Moved Dockerfile, fixed RabbitMq connection fail in constructor #2

Merged
tormakris merged 1 commits from feature/rabbit-mq into master 2021-01-17 16:00:26 +01:00
8 changed files with 76 additions and 37 deletions

View File

@ -1,6 +1,8 @@
namespace Birdmap.BLL.Interfaces using Microsoft.Extensions.Hosting;
namespace Birdmap.BLL.Interfaces
{ {
public interface ICommunicationService public interface ICommunicationService : IHostedService
{ {
public bool IsConnected { get; } public bool IsConnected { get; }
} }

View File

@ -5,8 +5,7 @@ using MQTTnet.Client.Receiving;
namespace Birdmap.BLL.Interfaces namespace Birdmap.BLL.Interfaces
{ {
public interface IMqttClientService : IHostedService, public interface IMqttClientService : IMqttClientConnectedHandler,
IMqttClientConnectedHandler,
IMqttClientDisconnectedHandler, IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler IMqttApplicationMessageReceivedHandler
{ {

View File

@ -6,6 +6,7 @@ using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Timer = System.Timers.Timer; using Timer = System.Timers.Timer;
@ -83,5 +84,9 @@ namespace Birdmap.BLL.Services.CommunationServices
} }
} }
} }
public abstract Task StartAsync(CancellationToken cancellationToken);
public abstract Task StopAsync(CancellationToken cancellationToken);
} }
} }

View File

@ -81,7 +81,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
} }
} }
public async Task StartAsync(CancellationToken cancellationToken) public override async Task StartAsync(CancellationToken cancellationToken)
{ {
try try
{ {
@ -97,7 +97,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
} }
} }
public async Task StopAsync(CancellationToken cancellationToken) public override async Task StopAsync(CancellationToken cancellationToken)
{ {
try try
{ {

View File

@ -7,21 +7,25 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events; using RabbitMQ.Client.Events;
using System; using System;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
{ {
internal class RabbitMqClientService : CommunicationServiceBase, IDisposable internal class RabbitMqClientService : CommunicationServiceBase
{ {
private readonly IConnection _connection; private IConnection _connection;
private readonly IModel _channel; private IModel _channel;
private readonly IConnectionFactory _factory;
private readonly string _topic;
public override bool IsConnected => throw new NotImplementedException(); public override bool IsConnected => throw new NotImplementedException();
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext) public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext) : base(logger, inputService, hubContext)
{ {
var factory = new ConnectionFactory() _topic = options.Topic;
_factory = new ConnectionFactory()
{ {
HostName = options.Hostname, HostName = options.Hostname,
Port = options.Port, Port = options.Port,
@ -30,25 +34,6 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
AutomaticRecoveryEnabled = true, AutomaticRecoveryEnabled = true,
}; };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
} }
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs) private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
@ -62,11 +47,59 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
return ProcessJsonMessageAsync(body); return ProcessJsonMessageAsync(body);
} }
public void Dispose() public override async Task StartAsync(CancellationToken cancellationToken)
{ {
StopMessageTimer(); try
_channel.Close(); {
_connection.Close(); Connect();
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot connect. Reconnecting...");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await StartAsync(cancellationToken);
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
try
{
StopMessageTimer();
_channel?.Close();
_connection?.Close();
return Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot disconnect...");
return Task.FromException(ex);
}
}
private void Connect()
{
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: _topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
} }
} }
} }

View File

@ -111,9 +111,9 @@ namespace Birdmap.BLL
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
{ {
services.AddSingleton<T>(); services.AddSingleton<T>();
services.AddSingleton(serviceProvider => services.AddSingleton<IHostedService>(serviceProvider =>
{ {
return (IHostedService)serviceProvider.GetService<T>(); return serviceProvider.GetService<T>();
}); });
services.AddSingleton<ICommunicationServiceProvider>(serviceProvider => services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
{ {

View File

@ -17,7 +17,7 @@ services:
- ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
build: build:
context: . context: .
dockerfile: Birdmap.API/Dockerfile dockerfile: Dockerfile
depends_on: depends_on:
- db - db
environment: environment:
@ -38,7 +38,7 @@ services:
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices - Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
- Birdmap_UseDummyServices=true - Birdmap_UseDummyServices=true
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/ - Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
- Birdmap_UseRabbitMq=true - Birdmap_UseRabbitMq=false
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost - Birdmap_Mqtt__BrokerHostSettings__Host=localhost
- Birdmap_Mqtt__BrokerHostSettings__Port=1883 - Birdmap_Mqtt__BrokerHostSettings__Port=1883
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client - Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client