Merge pull request 'Adding RabbitMq support' (#1) from feature/rabbit-mq into master
Some checks failed
continuous-integration/drone/push Build is failing

Reviewed-on: #1
This commit is contained in:
Torma Kristóf 2021-01-17 14:51:29 +01:00
commit 0085b95198
19 changed files with 276 additions and 117 deletions

View File

@ -2,7 +2,6 @@
using Birdmap.API.DTOs;
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
using Birdmap.DAL.Entities;
using Microsoft.AspNetCore.Authorization;
using Microsoft.AspNetCore.Http;
@ -25,16 +24,16 @@ namespace Birdmap.API.Controllers
{
private readonly IServiceService _service;
private readonly IMapper _mapper;
private readonly IMqttClientService _mqttClientService;
private readonly ICommunicationService _communicationService;
private readonly IHubContext<ServicesHub, IServicesHubClient> _hubContext;
private readonly ILogger<ServicesController> _logger;
public ServicesController(IServiceService service, IMapper mapper, MqttClientServiceProvider mqttClientProvider,
public ServicesController(IServiceService service, IMapper mapper, ICommunicationServiceProvider communicationServiceProvider,
IHubContext<ServicesHub, IServicesHubClient> hubContext, ILogger<ServicesController> logger)
{
_service = service;
_mapper = mapper;
_mqttClientService = mqttClientProvider.MqttClientService;
_communicationService = communicationServiceProvider.Service;
_hubContext = hubContext;
_logger = logger;
}
@ -86,8 +85,8 @@ namespace Birdmap.API.Controllers
Name = "Mqtt Client Service",
Uri = "localhost",
},
Response = $"IsConnected: {_mqttClientService.IsConnected}",
StatusCode = _mqttClientService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable,
Response = $"IsConnected: {_communicationService.IsConnected}",
StatusCode = _communicationService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable,
});
return serviceInfos.ToList();

View File

@ -38,6 +38,7 @@
},
"UseDummyServices": true,
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
"UseRabbitMq": false,
"Mqtt": {
"BrokerHostSettings": {
"Host": "localhost",

View File

@ -24,6 +24,7 @@
},
"UseDummyServices": false,
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
"UseRabbitMq": false,
"Mqtt": {
"BrokerHostSettings": {
"Host": "",

View File

@ -35,6 +35,8 @@
<!--Skip non-critical Mqtt logs-->
<logger name="*.*Mqtt*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
<logger name="*.*RabbitMq*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
<logger name="*.*CommunicationServiceBase*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
<!--Skip non-critical Hub logs-->
<logger name="*.*Hubs*.*" minlevel="Trace" maxlevel="Warning" writeTo="hubsFile" final="true"/>

View File

@ -9,6 +9,7 @@
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
<PackageReference Include="MQTTnet" Version="3.0.13" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,7 @@
namespace Birdmap.BLL.Interfaces
{
public interface ICommunicationService
{
public bool IsConnected { get; }
}
}

View File

@ -0,0 +1,7 @@
namespace Birdmap.BLL.Interfaces
{
public interface ICommunicationServiceProvider
{
public ICommunicationService Service { get; }
}
}

View File

@ -10,6 +10,5 @@ namespace Birdmap.BLL.Interfaces
IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler
{
public bool IsConnected { get; }
}
}

View File

@ -3,16 +3,16 @@ using System;
namespace Birdmap.BLL.Options
{
public class AspCoreMqttClientOptions : MqttClientOptionsBuilder
public class MqttClientOptions : MqttClientOptionsBuilder
{
public IServiceProvider ServiceProvider { get; }
public AspCoreMqttClientOptions(IServiceProvider serviceProvider)
public MqttClientOptions(IServiceProvider serviceProvider)
{
ServiceProvider = serviceProvider;
}
public AspCoreMqttClientOptions WithTopic(string topic)
public MqttClientOptions WithTopic(string topic)
{
WithUserProperty("Topic", topic);

View File

@ -0,0 +1,4 @@
namespace Birdmap.BLL.Options
{
public record RabbitMqClientOptions(string Hostname, int Port, string Username, string Password, string Topic);
}

View File

@ -0,0 +1,87 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.BLL.Services.CommunationServices
{
internal class Payload
{
[JsonProperty("tag")]
public Guid TagID { get; set; }
[JsonProperty("probability")]
public double Probability { get; set; }
}
internal abstract class CommunicationServiceBase : ICommunicationService
{
protected readonly ILogger _logger;
protected readonly IInputService _inputService;
protected readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
private readonly Timer _hubTimer;
private readonly List<Message> _messages = new();
private readonly object _messageLock = new();
public abstract bool IsConnected { get; }
public CommunicationServiceBase(ILogger logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
{
_logger = logger;
_inputService = inputService;
_hubContext = hubContext;
_hubTimer = new Timer()
{
AutoReset = true,
Interval = 1000,
};
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
}
protected async Task ProcessJsonMessageAsync(string json)
{
try
{
var payload = JsonConvert.DeserializeObject<Payload>(json);
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
lock (_messageLock)
{
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Could not handle application message.");
}
}
protected void StartMessageTimer()
{
_hubTimer.Start();
}
protected void StopMessageTimer()
{
_hubTimer.Stop();
}
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_messageLock)
{
if (_messages.Any())
{
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
_messages.Clear();
}
}
}
}
}

View File

@ -0,0 +1,14 @@
using Birdmap.BLL.Interfaces;
namespace Birdmap.BLL.Services.CommunicationServices
{
internal class CommunicationServiceProvider : ICommunicationServiceProvider
{
public ICommunicationService Service { get; }
public CommunicationServiceProvider(ICommunicationService service)
{
Service = service;
}
}
}

View File

@ -1,4 +1,5 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunationServices;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
@ -7,59 +8,29 @@ using MQTTnet.Client;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
{
public class MqttClientService : IMqttClientService
internal class MqttClientService : CommunicationServiceBase, IMqttClientService
{
private readonly IMqttClient _mqttClient;
private readonly IMqttClientOptions _options;
private readonly ILogger<MqttClientService> _logger;
private readonly IInputService _inputService;
private readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
private readonly Timer _hubTimer;
private readonly List<Message> _messages = new();
private readonly object _messageLock = new();
public bool IsConnected => _mqttClient.IsConnected;
public override bool IsConnected => _mqttClient.IsConnected;
public MqttClientService(IMqttClientOptions options, ILogger<MqttClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext)
{
_options = options;
_logger = logger;
_inputService = inputService;
_hubContext = hubContext;
_hubTimer = new Timer()
{
AutoReset = true,
Interval = 1000,
};
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
_mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient();
}
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_messageLock)
{
if (_messages.Any())
{
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
_messages.Clear();
}
}
}
private void ConfigureMqttClient()
{
_mqttClient.ConnectedHandler = this;
@ -67,36 +38,14 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
_mqttClient.ApplicationMessageReceivedHandler = this;
}
private class Payload
{
[JsonProperty("tag")]
public Guid TagID { get; set; }
[JsonProperty("probability")]
public double Probability { get; set; }
}
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
{
var message = eventArgs.ApplicationMessage.ConvertPayloadToString();
_logger.LogDebug($"Recieved [{eventArgs.ClientId}] " +
$"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}");
try
{
var payload = JsonConvert.DeserializeObject<Payload>(message);
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
lock (_messageLock)
{
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
}
}
catch (Exception ex)
{
_logger.LogError(ex, $"Could not handle application message.");
}
return ProcessJsonMessageAsync(message);
}
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
@ -107,7 +56,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
_logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}");
await _mqttClient.SubscribeAsync(topic);
_hubTimer.Start();
StartMessageTimer();
}
catch (Exception ex)
{
@ -123,7 +72,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
try
{
_hubTimer.Stop();
StopMessageTimer();
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
}
catch (Exception ex)

View File

@ -1,14 +0,0 @@
using Birdmap.BLL.Interfaces;
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
{
public class MqttClientServiceProvider
{
public IMqttClientService MqttClientService { get; }
public MqttClientServiceProvider(IMqttClientService mqttClientService)
{
MqttClientService = mqttClientService;
}
}
}

View File

@ -0,0 +1,72 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Options;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading.Tasks;
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
{
internal class RabbitMqClientService : CommunicationServiceBase, IDisposable
{
private readonly IConnection _connection;
private readonly IModel _channel;
public override bool IsConnected => throw new NotImplementedException();
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext)
{
var factory = new ConnectionFactory()
{
HostName = options.Hostname,
Port = options.Port,
UserName = options.Username,
Password = options.Password,
AutomaticRecoveryEnabled = true,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
}
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
{
var props = eventArgs.BasicProperties;
var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
_logger.LogDebug($"Recieved [{props.UserId}] " +
$"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}");
return ProcessJsonMessageAsync(body);
}
public void Dispose()
{
StopMessageTimer();
_channel.Close();
_connection.Close();
}
}
}

View File

@ -1,6 +1,8 @@
using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Options;
using Birdmap.BLL.Services;
using Birdmap.BLL.Services.CommunationServices.RabbitMq;
using Birdmap.BLL.Services.CommunicationServices;
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@ -43,54 +45,81 @@ namespace Birdmap.BLL
services.AddSignalR();
services.AddMqttClientServiceWithConfig(opt =>
{
var mqtt = configuration.GetSection("Mqtt");
var mqttClient = mqtt.GetSection("ClientSettings");
var client = mqtt.GetSection("ClientSettings");
var clientSettings = new
{
Id = mqttClient.GetValue<string>("Id"),
Username = mqttClient.GetValue<string>("Username"),
Password = mqttClient.GetValue<string>("Password"),
Topic = mqttClient.GetValue<string>("Topic"),
Id = client.GetValue<string>("Id"),
Username = client.GetValue<string>("Username"),
Password = client.GetValue<string>("Password"),
Topic = client.GetValue<string>("Topic"),
};
var mqttBrokerHost = mqtt.GetSection("BrokerHostSettings");
var brokerHost = mqtt.GetSection("BrokerHostSettings");
var brokerHostSettings = new
{
Host = mqttBrokerHost.GetValue<string>("Host"),
Port = mqttBrokerHost.GetValue<int>("Port"),
Host = brokerHost.GetValue<string>("Host"),
Port = brokerHost.GetValue<int>("Port"),
};
if (configuration.GetValue<bool>("UseRabbitMq"))
{
services.AddRabbitMqClientServiceWithConfig(new RabbitMqClientOptions(
Hostname: brokerHostSettings.Host,
Port: brokerHostSettings.Port,
Username: clientSettings.Username,
Password: clientSettings.Password,
Topic: clientSettings.Topic));
}
else
{
services.AddMqttClientServiceWithConfig(opt =>
{
opt
.WithTopic(clientSettings.Topic)
.WithCredentials(clientSettings.Username, clientSettings.Password)
.WithClientId(clientSettings.Id)
.WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port);
});
}
return services;
}
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptions> configureOptions)
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<MqttClientOptions> configureOptions)
{
services.AddSingleton(serviceProvider =>
{
var optionBuilder = new AspCoreMqttClientOptions(serviceProvider);
var optionBuilder = new MqttClientOptions(serviceProvider);
configureOptions(optionBuilder);
return optionBuilder.Build();
});
services.AddSingleton<MqttClientService>();
services.AddSingleton<IHostedService>(serviceProvider =>
services.AddClientServiceWithProvider<MqttClientService>();
return services;
}
private static IServiceCollection AddRabbitMqClientServiceWithConfig(this IServiceCollection services, RabbitMqClientOptions options)
{
return serviceProvider.GetService<MqttClientService>();
});
services.AddSingleton(options);
services.AddClientServiceWithProvider<RabbitMqClientService>();
return services;
}
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
{
services.AddSingleton<T>();
services.AddSingleton(serviceProvider =>
{
var mqttClientService = serviceProvider.GetService<MqttClientService>();
var mqttClientServiceProvider = new MqttClientServiceProvider(mqttClientService);
return mqttClientServiceProvider;
return (IHostedService)serviceProvider.GetService<T>();
});
services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
{
var clientService = serviceProvider.GetService<T>();
var clientServiceProvider = new CommunicationServiceProvider(clientService);
return clientServiceProvider;
});
return services;
}

View File

@ -267,7 +267,7 @@
this.trackBar1.LargeChange = 500;
this.trackBar1.Location = new System.Drawing.Point(180, 162);
this.trackBar1.Maximum = 5050;
this.trackBar1.Minimum = 50;
this.trackBar1.Minimum = 1;
this.trackBar1.Name = "trackBar1";
this.trackBar1.Size = new System.Drawing.Size(247, 45);
this.trackBar1.SmallChange = 100;

View File

@ -38,6 +38,7 @@ services:
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
- Birdmap_UseDummyServices=true
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
- Birdmap_UseRabbitMq=true
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost
- Birdmap_Mqtt__BrokerHostSettings__Port=1883
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client

BIN
docs/thesis.pdf Normal file

Binary file not shown.