Compare commits

..

No commits in common. "6a579772dff8ca0b4fcc35d8aa7a57ea539e82c6" and "0085b951989de21ca3be32bb6911480e99e25e11" have entirely different histories.

8 changed files with 37 additions and 76 deletions

View File

@ -1,8 +1,6 @@
using Microsoft.Extensions.Hosting; namespace Birdmap.BLL.Interfaces
namespace Birdmap.BLL.Interfaces
{ {
public interface ICommunicationService : IHostedService public interface ICommunicationService
{ {
public bool IsConnected { get; } public bool IsConnected { get; }
} }

View File

@ -5,7 +5,8 @@ using MQTTnet.Client.Receiving;
namespace Birdmap.BLL.Interfaces namespace Birdmap.BLL.Interfaces
{ {
public interface IMqttClientService : IMqttClientConnectedHandler, public interface IMqttClientService : IHostedService,
IMqttClientConnectedHandler,
IMqttClientDisconnectedHandler, IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler IMqttApplicationMessageReceivedHandler
{ {

View File

@ -6,7 +6,6 @@ using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Timer = System.Timers.Timer; using Timer = System.Timers.Timer;
@ -84,9 +83,5 @@ namespace Birdmap.BLL.Services.CommunationServices
} }
} }
} }
public abstract Task StartAsync(CancellationToken cancellationToken);
public abstract Task StopAsync(CancellationToken cancellationToken);
} }
} }

View File

@ -81,7 +81,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
} }
} }
public override async Task StartAsync(CancellationToken cancellationToken) public async Task StartAsync(CancellationToken cancellationToken)
{ {
try try
{ {
@ -97,7 +97,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
} }
} }
public override async Task StopAsync(CancellationToken cancellationToken) public async Task StopAsync(CancellationToken cancellationToken)
{ {
try try
{ {

View File

@ -7,25 +7,21 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events; using RabbitMQ.Client.Events;
using System; using System;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
{ {
internal class RabbitMqClientService : CommunicationServiceBase internal class RabbitMqClientService : CommunicationServiceBase, IDisposable
{ {
private IConnection _connection; private readonly IConnection _connection;
private IModel _channel; private readonly IModel _channel;
private readonly IConnectionFactory _factory;
private readonly string _topic;
public override bool IsConnected => throw new NotImplementedException(); public override bool IsConnected => throw new NotImplementedException();
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext) public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext) : base(logger, inputService, hubContext)
{ {
_topic = options.Topic; var factory = new ConnectionFactory()
_factory = new ConnectionFactory()
{ {
HostName = options.Hostname, HostName = options.Hostname,
Port = options.Port, Port = options.Port,
@ -34,6 +30,25 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
AutomaticRecoveryEnabled = true, AutomaticRecoveryEnabled = true,
}; };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
} }
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs) private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
@ -47,59 +62,11 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
return ProcessJsonMessageAsync(body); return ProcessJsonMessageAsync(body);
} }
public override async Task StartAsync(CancellationToken cancellationToken) public void Dispose()
{
try
{
Connect();
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot connect. Reconnecting...");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await StartAsync(cancellationToken);
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
try
{ {
StopMessageTimer(); StopMessageTimer();
_channel?.Close(); _channel.Close();
_connection?.Close(); _connection.Close();
return Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot disconnect...");
return Task.FromException(ex);
}
}
private void Connect()
{
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: _topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
} }
} }
} }

View File

@ -111,9 +111,9 @@ namespace Birdmap.BLL
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
{ {
services.AddSingleton<T>(); services.AddSingleton<T>();
services.AddSingleton<IHostedService>(serviceProvider => services.AddSingleton(serviceProvider =>
{ {
return serviceProvider.GetService<T>(); return (IHostedService)serviceProvider.GetService<T>();
}); });
services.AddSingleton<ICommunicationServiceProvider>(serviceProvider => services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
{ {

View File

@ -17,7 +17,7 @@ services:
- ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro - ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
build: build:
context: . context: .
dockerfile: Dockerfile dockerfile: Birdmap.API/Dockerfile
depends_on: depends_on:
- db - db
environment: environment:
@ -38,7 +38,7 @@ services:
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices - Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
- Birdmap_UseDummyServices=true - Birdmap_UseDummyServices=true
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/ - Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
- Birdmap_UseRabbitMq=false - Birdmap_UseRabbitMq=true
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost - Birdmap_Mqtt__BrokerHostSettings__Host=localhost
- Birdmap_Mqtt__BrokerHostSettings__Port=1883 - Birdmap_Mqtt__BrokerHostSettings__Port=1883
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client - Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client