Kristóf Torma
cca6aff3c0
All checks were successful
continuous-integration/drone/push Build is passing
64 lines
2.3 KiB
C#
64 lines
2.3 KiB
C#
using RabbitMQ.Client;
|
|
using RabbitMQ.Client.Events;
|
|
using OutputServiceTSDB.Utilities;
|
|
using Microsoft.Extensions.Logging;
|
|
using System.Threading.Tasks;
|
|
using OutputServiceTSDB.InfluxDB;
|
|
using OutputServiceTSDB.Models;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
|
|
namespace OutputServiceTSDB.RabbitMQ
|
|
{
|
|
/// <summary>
/// Consumes measurement messages from a RabbitMQ queue and hands each one to an
/// <see cref="InfluxWriter"/>. The connection, channel, exchange, queue and binding
/// are set up in the constructor; call <see cref="ProcessMessages"/> to start consuming.
/// </summary>
public class RabbitMQWorker : IDisposable
{
    private readonly ILogger _logger;
    private readonly InfluxWriter _influxWriter;
    private IConnection _connection;
    private IModel _channel;

    /// <summary>
    /// Creates the worker, its <see cref="InfluxWriter"/>, and opens the RabbitMQ connection.
    /// </summary>
    /// <param name="logger">Logger used for received-message and error reporting.</param>
    public RabbitMQWorker(ILogger<RabbitMQWorker> logger)
    {
        _logger = logger;
        _influxWriter = new InfluxWriter();

        try
        {
            InitRabbitMQ();
        }
        catch
        {
            // Dispose() is never called on a half-constructed object, so release
            // whatever was already created before letting the failure propagate.
            Dispose();
            throw;
        }
    }

    /// <summary>
    /// Opens the connection and channel, then declares the exchange and queue, binds them,
    /// and limits the channel to one unacknowledged message at a time.
    /// </summary>
    private void InitRabbitMQ()
    {
        var factory = new ConnectionFactory
        {
            HostName = EnvironmentVariableConfiguration.RabbitMQHostname,
            UserName = EnvironmentVariableConfiguration.RabbitMQUserName,
            Password = EnvironmentVariableConfiguration.RabbitMQPassword,
            // Required for AsyncEventingBasicConsumer: without it the client uses the
            // synchronous dispatcher and the async Received handler is not invoked.
            DispatchConsumersAsync = true
        };

        _connection = factory.CreateConnection();
        _channel = _connection.CreateModel();

        _channel.ExchangeDeclare(EnvironmentVariableConfiguration.RabbitMQExchange, ExchangeType.Topic);
        _channel.QueueDeclare(EnvironmentVariableConfiguration.RabbitMQQueue, false, false, false, null);
        // NOTE(review): the routing key is null on a topic exchange; confirm whether a
        // pattern such as "#" was intended. Left unchanged here.
        _channel.QueueBind(EnvironmentVariableConfiguration.RabbitMQQueue, EnvironmentVariableConfiguration.RabbitMQExchange, null, null);
        _channel.BasicQos(0, 1, false);
    }

    /// <summary>
    /// Registers an asynchronous consumer on the configured queue with manual acknowledgements.
    /// </summary>
    public void ProcessMessages()
    {
        var consumer = new AsyncEventingBasicConsumer(_channel);
        consumer.Received += HandleMessage;

        // Consume from the same queue that InitRabbitMQ declared and bound,
        // rather than a hard-coded queue name.
        _channel.BasicConsume(EnvironmentVariableConfiguration.RabbitMQQueue, false, consumer);
    }

    /// <summary>
    /// Decodes a delivery as UTF-8 JSON, deserializes it to a <see cref="MeasurementObject"/>,
    /// writes it through the <see cref="InfluxWriter"/>, and settles the delivery.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="event">The delivery, including body and delivery tag.</param>
    private async Task HandleMessage(object sender, BasicDeliverEventArgs @event)
    {
        var content = System.Text.Encoding.UTF8.GetString(@event.Body);
        _logger.LogInformation("received {Content}", content);

        MeasurementObject measurementObject;
        try
        {
            measurementObject = JsonConvert.DeserializeObject<MeasurementObject>(content);
        }
        catch (JsonException ex)
        {
            // A payload that cannot be parsed will never succeed; drop it instead of requeueing.
            _logger.LogError(ex, "Discarding message with malformed JSON payload");
            _channel.BasicReject(@event.DeliveryTag, false);
            return;
        }

        if (measurementObject == null)
        {
            // DeserializeObject yields null for an empty or literal "null" payload.
            _logger.LogError("Discarding message with empty payload");
            _channel.BasicReject(@event.DeliveryTag, false);
            return;
        }

        try
        {
            // Write is run on the thread pool, as in the original code
            // (presumably it is a blocking call; verify against InfluxWriter).
            await Task.Run(() => _influxWriter.Write(measurementObject));
        }
        catch (Exception ex)
        {
            // Return the message to the queue so it is not lost and the consumer is not
            // left stalled with an unsettled delivery. Note this redelivers immediately.
            _logger.LogError(ex, "Failed to write measurement; message requeued");
            _channel.BasicNack(@event.DeliveryTag, false, true);
            return;
        }

        // Manual ack: with a prefetch count of 1 the broker sends the next message
        // only after this delivery has been acknowledged.
        _channel.BasicAck(@event.DeliveryTag, false);
    }

    /// <summary>
    /// Closes the channel and connection if they are open and disposes all owned resources.
    /// Safe to call when initialization did not complete.
    /// </summary>
    public void Dispose()
    {
        try
        {
            // Channel first, then the connection that owns it.
            if (_channel != null && _channel.IsOpen)
            {
                _channel.Close();
            }

            if (_connection != null && _connection.IsOpen)
            {
                _connection.Close();
            }
        }
        catch (Exception ex)
        {
            // Dispose must not throw; a failed graceful close is only worth a warning.
            _logger.LogWarning(ex, "Error while closing RabbitMQ channel/connection");
        }
        finally
        {
            _channel?.Dispose();
            _connection?.Dispose();
            _influxWriter?.Dispose();
        }
    }
}
|
|
} |