// output-service-tsdb/RabbitMQ/ConsumeRabbitMQHostedServic...
//
// 68 lines
// 2.1 KiB
// C#

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;
namespace OutputServiceTSDB.RabbitMQ
{
/// <summary>
/// Hosted background service that consumes messages from the
/// <c>demo.queue.log</c> RabbitMQ queue and writes each message body to the log.
/// </summary>
/// <remarks>
/// The connection and channel are opened in the constructor and released in
/// <see cref="Dispose"/>. Messages are acknowledged manually after they have
/// been handled.
/// </remarks>
public class ConsumeRabbitMQHostedService : BackgroundService
{
    private const string BrokerHostName = "localhost";
    private const string ExchangeName = "demo.exchange";
    private const string QueueName = "demo.queue.log";
    private const string RoutingKeyPattern = "demo.queue.*";

    private readonly ILogger _logger;
    private IConnection _connection;
    private IModel _channel;

    /// <summary>
    /// Creates the service, its logger, and the RabbitMQ connection/channel.
    /// </summary>
    /// <param name="loggerFactory">Factory used to create this service's logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="loggerFactory"/> is null.</exception>
    public ConsumeRabbitMQHostedService(ILoggerFactory loggerFactory)
    {
        if (loggerFactory == null)
        {
            throw new ArgumentNullException(nameof(loggerFactory));
        }

        _logger = loggerFactory.CreateLogger<ConsumeRabbitMQHostedService>();
        InitRabbitMQ();
    }

    /// <summary>
    /// Opens the connection and channel, declares the topic exchange and the queue,
    /// binds them, and limits the channel to one unacknowledged delivery at a time.
    /// </summary>
    private void InitRabbitMQ()
    {
        var factory = new ConnectionFactory { HostName = BrokerHostName };

        try
        {
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            _channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic);
            _channel.QueueDeclare(QueueName, false, false, false, null);
            _channel.QueueBind(QueueName, ExchangeName, RoutingKeyPattern, null);

            // Prefetch count of 1: the broker sends the next message only after
            // the previous one has been acked or nacked.
            _channel.BasicQos(0, 1, false);

            _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
        }
        catch
        {
            // This runs from the constructor; if setup fails the instance is never
            // handed to a caller, so Dispose() would not run. Release what was
            // opened here before propagating the failure.
            _channel?.Dispose();
            _connection?.Dispose();
            _channel = null;
            _connection = null;
            throw;
        }
    }

    /// <summary>
    /// Registers an event-driven consumer on the queue and returns immediately;
    /// deliveries are processed in <see cref="OnMessageReceived"/>.
    /// </summary>
    /// <param name="stoppingToken">Token checked once before the consumer is registered.</param>
    /// <returns>An already-completed task.</returns>
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += OnMessageReceived;

        // Second argument false: automatic acknowledgement is off, acks are sent manually.
        _channel.BasicConsume(QueueName, false, consumer);
        return Task.CompletedTask;
    }

    /// <summary>
    /// Decodes a delivery as UTF-8, handles it, and acknowledges it. If handling
    /// throws, the delivery is negatively acknowledged so the channel is not left
    /// blocked on an unacked message.
    /// </summary>
    private void OnMessageReceived(object sender, BasicDeliverEventArgs ea)
    {
        try
        {
            var content = System.Text.Encoding.UTF8.GetString(ea.Body);
            HandleMessage(content);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "failed to handle message with delivery tag {DeliveryTag}", ea.DeliveryTag);

            // Requeue on the first failure only; a delivery that already failed once
            // is rejected without requeue to avoid an endless redelivery loop.
            _channel.BasicNack(ea.DeliveryTag, false, !ea.Redelivered);
            return;
        }

        _channel.BasicAck(ea.DeliveryTag, false);
    }

    /// <summary>Logs the received message content.</summary>
    /// <param name="content">Message body decoded as UTF-8 text.</param>
    private void HandleMessage(string content)
    {
        // Message template instead of interpolation; the rendered text is unchanged.
        _logger.LogInformation("consumer received {Content}", content);
    }

    /// <summary>Logs that the RabbitMQ connection has shut down.</summary>
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e)
    {
        _logger.LogWarning(
            "RabbitMQ connection shut down: {ReplyCode} {ReplyText}",
            e.ReplyCode,
            e.ReplyText);
    }

    /// <summary>
    /// Closes the channel and the connection, then disposes the base service.
    /// Failures while closing are logged rather than thrown.
    /// </summary>
    public override void Dispose()
    {
        try
        {
            CloseChannel();
            CloseConnection();
        }
        finally
        {
            base.Dispose();
        }
    }

    /// <summary>Closes and disposes the channel if one is held; logs any failure.</summary>
    private void CloseChannel()
    {
        var channel = _channel;
        _channel = null;
        if (channel == null)
        {
            return;
        }

        try
        {
            if (channel.IsOpen)
            {
                channel.Close();
            }

            channel.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "error while closing RabbitMQ channel");
        }
    }

    /// <summary>
    /// Unsubscribes the shutdown handler, then closes and disposes the connection
    /// if one is held; logs any failure.
    /// </summary>
    private void CloseConnection()
    {
        var connection = _connection;
        _connection = null;
        if (connection == null)
        {
            return;
        }

        try
        {
            // Unsubscribe first so the shutdown we trigger ourselves is not reported.
            connection.ConnectionShutdown -= RabbitMQ_ConnectionShutdown;

            if (connection.IsOpen)
            {
                connection.Close();
            }

            connection.Dispose();
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "error while closing RabbitMQ connection");
        }
    }
}
}