using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using OutputServiceTSDB.Utilities;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
using OutputServiceTSDB.InfluxDB;
using OutputServiceTSDB.Models;
using Newtonsoft.Json;
using System;

namespace OutputServiceTSDB.RabbitMQ
{
    /// <summary>
    /// Consumes messages from the configured RabbitMQ queue, deserializes each one
    /// into a <see cref="MeasurementObject"/> and hands it to an <see cref="InfluxWriter"/>.
    /// </summary>
    public class RabbitMQWorker : IDisposable
    {
        private readonly ILogger _logger;
        private readonly InfluxWriter _influxWriter;
        private IConnection _connection;
        private IModel _channel;

        /// <summary>
        /// Creates the writer and opens the RabbitMQ connection and channel.
        /// </summary>
        /// <param name="logger">Logger used for received messages and failures.</param>
        public RabbitMQWorker(ILogger logger)
        {
            _logger = logger;
            _influxWriter = new InfluxWriter();

            try
            {
                InitRabbitMQ();
            }
            catch
            {
                // The caller never receives the instance when the constructor throws,
                // so release whatever was already created before propagating.
                Dispose();
                throw;
            }
        }

        /// <summary>
        /// Opens the connection and channel, declares the exchange and queue from
        /// <see cref="EnvironmentVariableConfiguration"/>, binds them and limits
        /// prefetch to one unacknowledged message.
        /// </summary>
        private void InitRabbitMQ()
        {
            var factory = new ConnectionFactory
            {
                HostName = EnvironmentVariableConfiguration.RabbitMQHostname,
                UserName = EnvironmentVariableConfiguration.RabbitMQUserName,
                Password = EnvironmentVariableConfiguration.RabbitMQPassword,
                // Required for AsyncEventingBasicConsumer: without it the async
                // Received handlers are not dispatched.
                DispatchConsumersAsync = true
            };

            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.ExchangeDeclare(EnvironmentVariableConfiguration.RabbitMQExchange, ExchangeType.Topic);
            _channel.QueueDeclare(EnvironmentVariableConfiguration.RabbitMQQueue, false, false, false, null);

            // NOTE(review): the routing key is null on a topic exchange; confirm the
            // client accepts this and that the resulting binding matches the intended
            // messages (a catch-all topic binding is normally "#").
            _channel.QueueBind(
                EnvironmentVariableConfiguration.RabbitMQQueue,
                EnvironmentVariableConfiguration.RabbitMQExchange,
                null,
                null);

            // Prefetch of one: the next delivery only arrives after the current
            // message has been acked or nacked in HandleMessage.
            _channel.BasicQos(0, 1, false);
        }

        /// <summary>
        /// Registers an asynchronous consumer on the configured queue with manual
        /// acknowledgements. Returns immediately; messages are handled as they arrive.
        /// </summary>
        public void ProcessMessages()
        {
            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.Received += HandleMessage;

            // Consume from the same queue that InitRabbitMQ declared and bound,
            // rather than a hard-coded queue name.
            _channel.BasicConsume(EnvironmentVariableConfiguration.RabbitMQQueue, false, consumer);
        }

        /// <summary>
        /// Handles one delivery: decodes the body as UTF-8, deserializes it and writes
        /// it through the <see cref="InfluxWriter"/>. The message is acked on success,
        /// rejected without requeue when it cannot be deserialized, and requeued when
        /// the write fails.
        /// </summary>
        /// <param name="sender">The consumer raising the event.</param>
        /// <param name="event">Delivery data, including body and delivery tag.</param>
        private async Task HandleMessage(object sender, BasicDeliverEventArgs @event)
        {
            var content = System.Text.Encoding.UTF8.GetString(@event.Body);
            _logger.LogInformation("received {content}", content);

            MeasurementObject measurementObject;
            try
            {
                measurementObject = JsonConvert.DeserializeObject<MeasurementObject>(content);
            }
            catch (JsonException ex)
            {
                // A malformed payload will never succeed; requeueing would redeliver it forever.
                _logger.LogError(ex, "could not deserialize message, rejecting it");
                _channel.BasicNack(@event.DeliveryTag, false, false);
                return;
            }

            if (measurementObject == null)
            {
                // Json.NET returns null for an empty or literal "null" payload.
                _logger.LogWarning("message deserialized to null, rejecting it");
                _channel.BasicNack(@event.DeliveryTag, false, false);
                return;
            }

            try
            {
                await Task.Run(() => _influxWriter.Write(measurementObject));
            }
            catch (Exception ex)
            {
                // The payload itself is valid, so put it back on the queue for another attempt.
                _logger.LogError(ex, "writing measurement failed, requeueing message");
                _channel.BasicNack(@event.DeliveryTag, false, true);
                return;
            }

            // Manual ack: with autoAck disabled and a prefetch of one, the broker
            // delivers nothing further until this is sent.
            _channel.BasicAck(@event.DeliveryTag, false);
        }

        /// <summary>
        /// Closes the channel and connection and disposes the writer. Safe to call on
        /// a partially initialised instance; each resource is released even when
        /// closing an earlier one throws.
        /// </summary>
        public void Dispose()
        {
            try
            {
                _channel?.Close();
            }
            finally
            {
                try
                {
                    _connection?.Close();
                }
                finally
                {
                    _influxWriter?.Dispose();
                }
            }
        }
    }
}