using Birdmap.BLL.Interfaces; using Birdmap.BLL.Options; using Birdmap.BLL.Services.CommunicationServices.Hubs; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Birdmap.BLL.Services.CommunationServices.RabbitMq { internal class RabbitMqClientService : CommunicationServiceBase { private IConnection _connection; private IModel _channel; private readonly IConnectionFactory _factory; private readonly RabbitMqClientOptions _options; public override bool IsConnected => throw new NotImplementedException(); public RabbitMqClientService(RabbitMqClientOptions options, ILogger logger, IInputService inputService, IHubContext hubContext) : base(logger, inputService, hubContext) { _options = options; _factory = new ConnectionFactory() { HostName = options.Hostname, Port = options.Port, UserName = options.Username, Password = options.Password, AutomaticRecoveryEnabled = true, }; } private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs) { var props = eventArgs.BasicProperties; var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); _logger.LogDebug($"Recieved [{props.UserId}] " + $"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}"); return ProcessJsonMessageAsync(body); } public override async Task StartAsync(CancellationToken cancellationToken) { try { Connect(); } catch (Exception ex) { _logger.LogError(ex, $"Cannot connect. Reconnecting..."); await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); cancellationToken.ThrowIfCancellationRequested(); await StartAsync(cancellationToken); } } public override Task StopAsync(CancellationToken cancellationToken) { try { StopMessageTimer(); _channel?.Close(); _connection?.Close(); return Task.CompletedTask; } catch (Exception ex) { _logger.LogError(ex, $"Cannot disconnect..."); return Task.FromException(ex); } } private void Connect() { _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.ExchangeDeclare( exchange: _options.ExchangeName, type: _options.ExchangeType, durable: _options.ExchangeDurable, autoDelete: _options.ExchangeAutoDelete); _channel.QueueDeclare( queue: _options.QueueName, durable: _options.QueueDurable, exclusive: _options.QueueExclusive, autoDelete: _options.QueueAutoDelete); _channel.QueueBind(queue: _options.QueueName, exchange: _options.ExchangeName, routingKey: _options.Topic); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.Received += OnRecieved; _channel.BasicConsume(queue: _options.QueueName, autoAck: true, consumer: consumer); StartMessageTimer(); } } }