output-service-tsdb/RabbitMQ/RabbitMQWorker.cs

64 lines
2.3 KiB
C#

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using OutputServiceTSDB.Utilities;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
using OutputServiceTSDB.InfluxDB;
using OutputServiceTSDB.Models;
using Newtonsoft.Json;
using System;
namespace OutputServiceTSDB.RabbitMQ
{
public class RabbitMQWorker : IDisposable
{
private readonly ILogger _logger;
private IConnection _connection;
private IModel _channel;
private InfluxWriter influxWriter;
public RabbitMQWorker(ILogger<RabbitMQWorker> logger)
{
_logger = logger;
influxWriter = new InfluxWriter();
InitRabbitMQ();
}
private void InitRabbitMQ()
{
var factory = new ConnectionFactory { HostName = EnvironmentVariableConfiguration.RabbitMQHostname, UserName = EnvironmentVariableConfiguration.RabbitMQUserName, Password = EnvironmentVariableConfiguration.RabbitMQPassword };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(EnvironmentVariableConfiguration.RabbitMQExchange, ExchangeType.Topic);
_channel.QueueDeclare(EnvironmentVariableConfiguration.RabbitMQQueue, false, false, false, null);
_channel.QueueBind(EnvironmentVariableConfiguration.RabbitMQQueue, EnvironmentVariableConfiguration.RabbitMQExchange, null, null);
_channel.BasicQos(0, 1, false);
}
public void ProcessMessages()
{
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += HandleMessage;
_channel.BasicConsume("demo.queue.log", false, consumer);
}
private async Task HandleMessage(object sender, BasicDeliverEventArgs @event)
{
var content = System.Text.Encoding.UTF8.GetString(@event.Body);
_logger.LogInformation($"received {content}");
MeasurementObject measurementObject = JsonConvert.DeserializeObject<MeasurementObject>(content);
await Task.Run(() => influxWriter.Write(measurementObject));
}
public void Dispose()
{
_channel.Close();
_connection.Close();
influxWriter.Dispose();
}
}
}