using Birdmap.BLL.Interfaces; using Birdmap.BLL.Options; using Birdmap.BLL.Services.CommunicationServices.Hubs; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; using System.Threading.Tasks; namespace Birdmap.BLL.Services.CommunationServices.RabbitMq { internal class RabbitMqClientService : CommunicationServiceBase { private IConnection _connection; private IModel _channel; private readonly IConnectionFactory _factory; private readonly string _topic; public override bool IsConnected => throw new NotImplementedException(); public RabbitMqClientService(RabbitMqClientOptions options, ILogger logger, IInputService inputService, IHubContext hubContext) : base(logger, inputService, hubContext) { _topic = options.Topic; _factory = new ConnectionFactory() { HostName = options.Hostname, Port = options.Port, UserName = options.Username, Password = options.Password, AutomaticRecoveryEnabled = true, }; } private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs) { var props = eventArgs.BasicProperties; var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); _logger.LogDebug($"Recieved [{props.UserId}] " + $"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}"); return ProcessJsonMessageAsync(body); } public override async Task StartAsync(CancellationToken cancellationToken) { try { Connect(); } catch (Exception ex) { _logger.LogError(ex, $"Cannot connect. Reconnecting..."); await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); cancellationToken.ThrowIfCancellationRequested(); await StartAsync(cancellationToken); } } public override Task StopAsync(CancellationToken cancellationToken) { try { StopMessageTimer(); _channel?.Close(); _connection?.Close(); return Task.CompletedTask; } catch (Exception ex) { _logger.LogError(ex, $"Cannot disconnect..."); return Task.FromException(ex); } } private void Connect() { _connection = _factory.CreateConnection(); _channel = _connection.CreateModel(); _channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); var queueName = _channel.QueueDeclare().QueueName; _channel.QueueBind(queue: queueName, exchange: "topic_logs", routingKey: _topic); var consumer = new AsyncEventingBasicConsumer(_channel); consumer.Received += OnRecieved; _channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); StartMessageTimer(); } } }