Adding RabbitMq support #1

Merged
tormakris merged 3 commits from feature/rabbit-mq into master 2021-01-17 14:51:30 +01:00
18 changed files with 274 additions and 117 deletions
Showing only changes of commit 0df5b350d9 - Show all commits

View File

@ -2,7 +2,6 @@
using Birdmap.API.DTOs; using Birdmap.API.DTOs;
using Birdmap.BLL.Interfaces; using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunicationServices.Hubs; using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
using Birdmap.DAL.Entities; using Birdmap.DAL.Entities;
using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http; using Microsoft.AspNetCore.Http;
@ -25,16 +24,16 @@ namespace Birdmap.API.Controllers
{ {
private readonly IServiceService _service; private readonly IServiceService _service;
private readonly IMapper _mapper; private readonly IMapper _mapper;
private readonly IMqttClientService _mqttClientService; private readonly ICommunicationService _communicationService;
private readonly IHubContext<ServicesHub, IServicesHubClient> _hubContext; private readonly IHubContext<ServicesHub, IServicesHubClient> _hubContext;
private readonly ILogger<ServicesController> _logger; private readonly ILogger<ServicesController> _logger;
public ServicesController(IServiceService service, IMapper mapper, MqttClientServiceProvider mqttClientProvider, public ServicesController(IServiceService service, IMapper mapper, ICommunicationServiceProvider communicationServiceProvider,
IHubContext<ServicesHub, IServicesHubClient> hubContext, ILogger<ServicesController> logger) IHubContext<ServicesHub, IServicesHubClient> hubContext, ILogger<ServicesController> logger)
{ {
_service = service; _service = service;
_mapper = mapper; _mapper = mapper;
_mqttClientService = mqttClientProvider.MqttClientService; _communicationService = communicationServiceProvider.Service;
_hubContext = hubContext; _hubContext = hubContext;
_logger = logger; _logger = logger;
} }
@ -86,8 +85,8 @@ namespace Birdmap.API.Controllers
Name = "Mqtt Client Service", Name = "Mqtt Client Service",
Uri = "localhost", Uri = "localhost",
}, },
Response = $"IsConnected: {_mqttClientService.IsConnected}", Response = $"IsConnected: {_communicationService.IsConnected}",
StatusCode = _mqttClientService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable, StatusCode = _communicationService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable,
}); });
return serviceInfos.ToList(); return serviceInfos.ToList();

View File

@ -38,6 +38,7 @@
}, },
"UseDummyServices": true, "UseDummyServices": true,
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/", "ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
"UseRabbitMq": false,
"Mqtt": { "Mqtt": {
"BrokerHostSettings": { "BrokerHostSettings": {
"Host": "localhost", "Host": "localhost",

View File

@ -24,6 +24,7 @@
}, },
"UseDummyServices": false, "UseDummyServices": false,
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/", "ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
"UseRabbitMq": false,
"Mqtt": { "Mqtt": {
"BrokerHostSettings": { "BrokerHostSettings": {
"Host": "", "Host": "",

View File

@ -9,6 +9,7 @@
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" /> <PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
<PackageReference Include="MQTTnet" Version="3.0.13" /> <PackageReference Include="MQTTnet" Version="3.0.13" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" /> <PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>

View File

@ -0,0 +1,7 @@
namespace Birdmap.BLL.Interfaces
{
public interface ICommunicationService
{
public bool IsConnected { get; }
}
}

View File

@ -0,0 +1,7 @@
namespace Birdmap.BLL.Interfaces
{
public interface ICommunicationServiceProvider
{
public ICommunicationService Service { get; }
}
}

View File

@ -10,6 +10,5 @@ namespace Birdmap.BLL.Interfaces
IMqttClientDisconnectedHandler, IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler IMqttApplicationMessageReceivedHandler
{ {
public bool IsConnected { get; }
} }
} }

View File

@ -3,16 +3,16 @@ using System;
namespace Birdmap.BLL.Options namespace Birdmap.BLL.Options
{ {
public class AspCoreMqttClientOptions : MqttClientOptionsBuilder public class MqttClientOptions : MqttClientOptionsBuilder
{ {
public IServiceProvider ServiceProvider { get; } public IServiceProvider ServiceProvider { get; }
public AspCoreMqttClientOptions(IServiceProvider serviceProvider) public MqttClientOptions(IServiceProvider serviceProvider)
{ {
ServiceProvider = serviceProvider; ServiceProvider = serviceProvider;
} }
public AspCoreMqttClientOptions WithTopic(string topic) public MqttClientOptions WithTopic(string topic)
{ {
WithUserProperty("Topic", topic); WithUserProperty("Topic", topic);

View File

@ -0,0 +1,4 @@
namespace Birdmap.BLL.Options
{
public record RabbitMqClientOptions(string Hostname, int Port, string Username, string Password, string Topic);
}

View File

@ -0,0 +1,87 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.BLL.Services.CommunationServices
{
internal class Payload
{
[JsonProperty("tag")]
public Guid TagID { get; set; }
[JsonProperty("probability")]
public double Probability { get; set; }
}
internal abstract class CommunicationServiceBase : ICommunicationService
{
protected readonly ILogger _logger;
protected readonly IInputService _inputService;
protected readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
private readonly Timer _hubTimer;
private readonly List<Message> _messages = new();
private readonly object _messageLock = new();
public abstract bool IsConnected { get; }
public CommunicationServiceBase(ILogger logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
{
_logger = logger;
_inputService = inputService;
_hubContext = hubContext;
_hubTimer = new Timer()
{
AutoReset = true,
Interval = 1000,
};
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
}
protected async Task ProcessJsonMessageAsync(string json)
{
try
{
var payload = JsonConvert.DeserializeObject<Payload>(json);
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
lock (_messageLock)
{
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Could not handle application message.");
}
}
protected void StartMessageTimer()
{
_hubTimer.Start();
}
protected void StopMessageTimer()
{
_hubTimer.Stop();
}
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_messageLock)
{
if (_messages.Any())
{
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
_messages.Clear();
}
}
}
}
}

View File

@ -0,0 +1,14 @@
using Birdmap.BLL.Interfaces;
namespace Birdmap.BLL.Services.CommunicationServices
{
internal class CommunicationServiceProvider : ICommunicationServiceProvider
{
public ICommunicationService Service { get; }
public CommunicationServiceProvider(ICommunicationService service)
{
Service = service;
}
}
}

View File

@ -1,4 +1,5 @@
using Birdmap.BLL.Interfaces; using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunationServices;
using Birdmap.BLL.Services.CommunicationServices.Hubs; using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR; using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
@ -7,59 +8,29 @@ using MQTTnet.Client;
using MQTTnet.Client.Connecting; using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
{ {
public class MqttClientService : IMqttClientService internal class MqttClientService : CommunicationServiceBase, IMqttClientService
{ {
private readonly IMqttClient _mqttClient; private readonly IMqttClient _mqttClient;
private readonly IMqttClientOptions _options; private readonly IMqttClientOptions _options;
private readonly ILogger<MqttClientService> _logger;
private readonly IInputService _inputService;
private readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
private readonly Timer _hubTimer;
private readonly List<Message> _messages = new();
private readonly object _messageLock = new();
public bool IsConnected => _mqttClient.IsConnected; public override bool IsConnected => _mqttClient.IsConnected;
public MqttClientService(IMqttClientOptions options, ILogger<MqttClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext) public MqttClientService(IMqttClientOptions options, ILogger<MqttClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext)
{ {
_options = options; _options = options;
_logger = logger;
_inputService = inputService;
_hubContext = hubContext;
_hubTimer = new Timer()
{
AutoReset = true,
Interval = 1000,
};
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
_mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient(); ConfigureMqttClient();
} }
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_messageLock)
{
if (_messages.Any())
{
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
_messages.Clear();
}
}
}
private void ConfigureMqttClient() private void ConfigureMqttClient()
{ {
_mqttClient.ConnectedHandler = this; _mqttClient.ConnectedHandler = this;
@ -67,36 +38,14 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
_mqttClient.ApplicationMessageReceivedHandler = this; _mqttClient.ApplicationMessageReceivedHandler = this;
} }
private class Payload public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
[JsonProperty("tag")]
public Guid TagID { get; set; }
[JsonProperty("probability")]
public double Probability { get; set; }
}
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{ {
var message = eventArgs.ApplicationMessage.ConvertPayloadToString(); var message = eventArgs.ApplicationMessage.ConvertPayloadToString();
_logger.LogDebug($"Recieved [{eventArgs.ClientId}] " + _logger.LogDebug($"Recieved [{eventArgs.ClientId}] " +
$"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}"); $"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}");
try return ProcessJsonMessageAsync(message);
{
var payload = JsonConvert.DeserializeObject<Payload>(message);
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
lock (_messageLock)
{
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Could not handle application message.");
}
} }
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs) public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
@ -107,7 +56,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
_logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}"); _logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}");
await _mqttClient.SubscribeAsync(topic); await _mqttClient.SubscribeAsync(topic);
_hubTimer.Start(); StartMessageTimer();
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -123,7 +72,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
try try
{ {
_hubTimer.Stop(); StopMessageTimer();
await _mqttClient.ConnectAsync(_options, CancellationToken.None); await _mqttClient.ConnectAsync(_options, CancellationToken.None);
} }
catch (Exception ex) catch (Exception ex)

View File

@ -1,14 +0,0 @@
using Birdmap.BLL.Interfaces;
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
{
public class MqttClientServiceProvider
{
public IMqttClientService MqttClientService { get; }
public MqttClientServiceProvider(IMqttClientService mqttClientService)
{
MqttClientService = mqttClientService;
}
}
}

View File

@ -0,0 +1,72 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Options;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
{
internal class RabbitMqClientService : CommunicationServiceBase, IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
public override bool IsConnected => throw new NotImplementedException();
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext)
{
var factory = new ConnectionFactory()
{
HostName = options.Hostname,
Port = options.Port,
UserName = options.Username,
Password = options.Password,
AutomaticRecoveryEnabled = true,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
}
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
{
var props = eventArgs.BasicProperties;
var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
_logger.LogDebug($"Recieved [{props.UserId}] " +
$"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}");
return ProcessJsonMessageAsync(body);
}
public void Dispose()
{
StopMessageTimer();
_channel.Close();
_connection.Close();
}
}
}

View File

@ -1,6 +1,8 @@
using Birdmap.BLL.Interfaces; using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Options; using Birdmap.BLL.Options;
using Birdmap.BLL.Services; using Birdmap.BLL.Services;
using Birdmap.BLL.Services.CommunationServices.RabbitMq;
using Birdmap.BLL.Services.CommunicationServices;
using Birdmap.BLL.Services.CommunicationServices.Mqtt; using Birdmap.BLL.Services.CommunicationServices.Mqtt;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@ -43,54 +45,81 @@ namespace Birdmap.BLL
services.AddSignalR(); services.AddSignalR();
services.AddMqttClientServiceWithConfig(opt =>
{
var mqtt = configuration.GetSection("Mqtt"); var mqtt = configuration.GetSection("Mqtt");
var mqttClient = mqtt.GetSection("ClientSettings"); var client = mqtt.GetSection("ClientSettings");
var clientSettings = new var clientSettings = new
{ {
Id = mqttClient.GetValue<string>("Id"), Id = client.GetValue<string>("Id"),
Username = mqttClient.GetValue<string>("Username"), Username = client.GetValue<string>("Username"),
Password = mqttClient.GetValue<string>("Password"), Password = client.GetValue<string>("Password"),
Topic = mqttClient.GetValue<string>("Topic"), Topic = client.GetValue<string>("Topic"),
}; };
var mqttBrokerHost = mqtt.GetSection("BrokerHostSettings"); var brokerHost = mqtt.GetSection("BrokerHostSettings");
var brokerHostSettings = new var brokerHostSettings = new
{ {
Host = mqttBrokerHost.GetValue<string>("Host"), Host = brokerHost.GetValue<string>("Host"),
Port = mqttBrokerHost.GetValue<int>("Port"), Port = brokerHost.GetValue<int>("Port"),
}; };
if (configuration.GetValue<bool>("UseRabbitMq"))
{
services.AddRabbitMqClientServiceWithConfig(new RabbitMqClientOptions(
Hostname: brokerHostSettings.Host,
Port: brokerHostSettings.Port,
Username: clientSettings.Username,
Password: clientSettings.Password,
Topic: clientSettings.Topic));
}
else
{
services.AddMqttClientServiceWithConfig(opt =>
{
opt opt
.WithTopic(clientSettings.Topic) .WithTopic(clientSettings.Topic)
.WithCredentials(clientSettings.Username, clientSettings.Password) .WithCredentials(clientSettings.Username, clientSettings.Password)
.WithClientId(clientSettings.Id) .WithClientId(clientSettings.Id)
.WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port); .WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port);
}); });
}
return services; return services;
} }
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptions> configureOptions) private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<MqttClientOptions> configureOptions)
{ {
services.AddSingleton(serviceProvider => services.AddSingleton(serviceProvider =>
{ {
var optionBuilder = new AspCoreMqttClientOptions(serviceProvider); var optionBuilder = new MqttClientOptions(serviceProvider);
configureOptions(optionBuilder); configureOptions(optionBuilder);
return optionBuilder.Build(); return optionBuilder.Build();
}); });
services.AddSingleton<MqttClientService>();
services.AddSingleton<IHostedService>(serviceProvider => services.AddClientServiceWithProvider<MqttClientService>();
return services;
}
private static IServiceCollection AddRabbitMqClientServiceWithConfig(this IServiceCollection services, RabbitMqClientOptions options)
{ {
return serviceProvider.GetService<MqttClientService>(); services.AddSingleton(options);
});
services.AddClientServiceWithProvider<RabbitMqClientService>();
return services;
}
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
{
services.AddSingleton<T>();
services.AddSingleton(serviceProvider => services.AddSingleton(serviceProvider =>
{ {
var mqttClientService = serviceProvider.GetService<MqttClientService>(); return (IHostedService)serviceProvider.GetService<T>();
var mqttClientServiceProvider = new MqttClientServiceProvider(mqttClientService); });
return mqttClientServiceProvider; services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
{
var clientService = serviceProvider.GetService<T>();
var clientServiceProvider = new CommunicationServiceProvider(clientService);
return clientServiceProvider;
}); });
return services; return services;
} }

View File

@ -267,7 +267,7 @@
this.trackBar1.LargeChange = 500; this.trackBar1.LargeChange = 500;
this.trackBar1.Location = new System.Drawing.Point(180, 162); this.trackBar1.Location = new System.Drawing.Point(180, 162);
this.trackBar1.Maximum = 5050; this.trackBar1.Maximum = 5050;
this.trackBar1.Minimum = 50; this.trackBar1.Minimum = 1;
this.trackBar1.Name = "trackBar1"; this.trackBar1.Name = "trackBar1";
this.trackBar1.Size = new System.Drawing.Size(247, 45); this.trackBar1.Size = new System.Drawing.Size(247, 45);
this.trackBar1.SmallChange = 100; this.trackBar1.SmallChange = 100;

View File

@ -38,6 +38,7 @@ services:
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices - Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
- Birdmap_UseDummyServices=true - Birdmap_UseDummyServices=true
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/ - Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
- Birdmap_UseRabbitMq=true
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost - Birdmap_Mqtt__BrokerHostSettings__Host=localhost
- Birdmap_Mqtt__BrokerHostSettings__Port=1883 - Birdmap_Mqtt__BrokerHostSettings__Port=1883
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client - Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client

BIN
docs/thesis.pdf Normal file

Binary file not shown.