diff --git a/Birdmap.API/Controllers/ServicesController.cs b/Birdmap.API/Controllers/ServicesController.cs index b994438..10ea18d 100644 --- a/Birdmap.API/Controllers/ServicesController.cs +++ b/Birdmap.API/Controllers/ServicesController.cs @@ -2,7 +2,6 @@ using Birdmap.API.DTOs; using Birdmap.BLL.Interfaces; using Birdmap.BLL.Services.CommunicationServices.Hubs; -using Birdmap.BLL.Services.CommunicationServices.Mqtt; using Birdmap.DAL.Entities; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Http; @@ -25,16 +24,16 @@ namespace Birdmap.API.Controllers { private readonly IServiceService _service; private readonly IMapper _mapper; - private readonly IMqttClientService _mqttClientService; + private readonly ICommunicationService _communicationService; private readonly IHubContext _hubContext; private readonly ILogger _logger; - public ServicesController(IServiceService service, IMapper mapper, MqttClientServiceProvider mqttClientProvider, + public ServicesController(IServiceService service, IMapper mapper, ICommunicationServiceProvider communicationServiceProvider, IHubContext hubContext, ILogger logger) { _service = service; _mapper = mapper; - _mqttClientService = mqttClientProvider.MqttClientService; + _communicationService = communicationServiceProvider.Service; _hubContext = hubContext; _logger = logger; } @@ -86,8 +85,8 @@ namespace Birdmap.API.Controllers Name = "Mqtt Client Service", Uri = "localhost", }, - Response = $"IsConnected: {_mqttClientService.IsConnected}", - StatusCode = _mqttClientService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable, + Response = $"IsConnected: {_communicationService.IsConnected}", + StatusCode = _communicationService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable, }); return serviceInfos.ToList(); diff --git a/Birdmap.API/appsettings.Development.json b/Birdmap.API/appsettings.Development.json index a292e83..518268d 100644 --- a/Birdmap.API/appsettings.Development.json +++ b/Birdmap.API/appsettings.Development.json @@ -38,6 +38,7 @@ }, "UseDummyServices": true, "ServicesBaseUrl": "https://birb.k8s.kmlabz.com/", + "UseRabbitMq": false, "Mqtt": { "BrokerHostSettings": { "Host": "localhost", diff --git a/Birdmap.API/appsettings.json b/Birdmap.API/appsettings.json index 9f971dd..389cfb2 100644 --- a/Birdmap.API/appsettings.json +++ b/Birdmap.API/appsettings.json @@ -24,6 +24,7 @@ }, "UseDummyServices": false, "ServicesBaseUrl": "https://birb.k8s.kmlabz.com/", + "UseRabbitMq": false, "Mqtt": { "BrokerHostSettings": { "Host": "", diff --git a/Birdmap.API/nlog.config b/Birdmap.API/nlog.config index e828073..1efa173 100644 --- a/Birdmap.API/nlog.config +++ b/Birdmap.API/nlog.config @@ -35,6 +35,8 @@ + + diff --git a/Birdmap.BLL/Birdmap.BLL.csproj b/Birdmap.BLL/Birdmap.BLL.csproj index 59e5e5c..37ef79b 100644 --- a/Birdmap.BLL/Birdmap.BLL.csproj +++ b/Birdmap.BLL/Birdmap.BLL.csproj @@ -9,6 +9,7 @@ + diff --git a/Birdmap.BLL/Interfaces/ICommunicationService.cs b/Birdmap.BLL/Interfaces/ICommunicationService.cs new file mode 100644 index 0000000..9bc095b --- /dev/null +++ b/Birdmap.BLL/Interfaces/ICommunicationService.cs @@ -0,0 +1,7 @@ +namespace Birdmap.BLL.Interfaces +{ + public interface ICommunicationService + { + public bool IsConnected { get; } + } +} diff --git a/Birdmap.BLL/Interfaces/ICommunicationServiceProvider.cs b/Birdmap.BLL/Interfaces/ICommunicationServiceProvider.cs new file mode 100644 index 0000000..e0a7e89 --- /dev/null +++ b/Birdmap.BLL/Interfaces/ICommunicationServiceProvider.cs @@ -0,0 +1,7 @@ +namespace Birdmap.BLL.Interfaces +{ + public interface ICommunicationServiceProvider + { + public ICommunicationService Service { get; } + } +} diff --git a/Birdmap.BLL/Interfaces/IMqttClientService.cs b/Birdmap.BLL/Interfaces/IMqttClientService.cs index e55d726..1c61cc8 100644 --- a/Birdmap.BLL/Interfaces/IMqttClientService.cs +++ b/Birdmap.BLL/Interfaces/IMqttClientService.cs @@ -10,6 +10,5 @@ namespace Birdmap.BLL.Interfaces IMqttClientDisconnectedHandler, IMqttApplicationMessageReceivedHandler { - public bool IsConnected { get; } } } diff --git a/Birdmap.BLL/Options/AspCoreMqttClientOptions.cs b/Birdmap.BLL/Options/MqttClientOptions.cs similarity index 60% rename from Birdmap.BLL/Options/AspCoreMqttClientOptions.cs rename to Birdmap.BLL/Options/MqttClientOptions.cs index f0e1191..f9fde38 100644 --- a/Birdmap.BLL/Options/AspCoreMqttClientOptions.cs +++ b/Birdmap.BLL/Options/MqttClientOptions.cs @@ -3,16 +3,16 @@ using System; namespace Birdmap.BLL.Options { - public class AspCoreMqttClientOptions : MqttClientOptionsBuilder + public class MqttClientOptions : MqttClientOptionsBuilder { public IServiceProvider ServiceProvider { get; } - public AspCoreMqttClientOptions(IServiceProvider serviceProvider) + public MqttClientOptions(IServiceProvider serviceProvider) { ServiceProvider = serviceProvider; } - public AspCoreMqttClientOptions WithTopic(string topic) + public MqttClientOptions WithTopic(string topic) { WithUserProperty("Topic", topic); diff --git a/Birdmap.BLL/Options/RabbitMqClientOptions.cs b/Birdmap.BLL/Options/RabbitMqClientOptions.cs new file mode 100644 index 0000000..7e5ea54 --- /dev/null +++ b/Birdmap.BLL/Options/RabbitMqClientOptions.cs @@ -0,0 +1,4 @@ +namespace Birdmap.BLL.Options +{ + public record RabbitMqClientOptions(string Hostname, int Port, string Username, string Password, string Topic); +} diff --git a/Birdmap.BLL/Services/CommunationServices/CommunicationServiceBase.cs b/Birdmap.BLL/Services/CommunationServices/CommunicationServiceBase.cs new file mode 100644 index 0000000..b5371a4 --- /dev/null +++ b/Birdmap.BLL/Services/CommunationServices/CommunicationServiceBase.cs @@ -0,0 +1,87 @@ +using Birdmap.BLL.Interfaces; +using Birdmap.BLL.Services.CommunicationServices.Hubs; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Timer = System.Timers.Timer; + +namespace Birdmap.BLL.Services.CommunationServices +{ + internal class Payload + { + [JsonProperty("tag")] + public Guid TagID { get; set; } + + [JsonProperty("probability")] + public double Probability { get; set; } + } + + internal abstract class CommunicationServiceBase : ICommunicationService + { + protected readonly ILogger _logger; + protected readonly IInputService _inputService; + protected readonly IHubContext _hubContext; + private readonly Timer _hubTimer; + private readonly List _messages = new(); + private readonly object _messageLock = new(); + + public abstract bool IsConnected { get; } + + public CommunicationServiceBase(ILogger logger, IInputService inputService, IHubContext hubContext) + { + _logger = logger; + _inputService = inputService; + _hubContext = hubContext; + _hubTimer = new Timer() + { + AutoReset = true, + Interval = 1000, + }; + _hubTimer.Elapsed += SendMqttMessagesWithSignalR; + } + + protected async Task ProcessJsonMessageAsync(string json) + { + try + { + var payload = JsonConvert.DeserializeObject(json); + var inputResponse = await _inputService.GetInputAsync(payload.TagID); + + lock (_messageLock) + { + _messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability)); + } + } + catch (Exception ex) + { + _logger.LogError(ex, $"Could not handle application message."); + } + } + + protected void StartMessageTimer() + { + _hubTimer.Start(); + } + protected void StopMessageTimer() + { + _hubTimer.Stop(); + } + + private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e) + { + lock (_messageLock) + { + if (_messages.Any()) + { + _logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}"); + _hubContext.Clients.All.NotifyMessagesAsync(_messages); + _messages.Clear(); + } + } + } + } +} diff --git a/Birdmap.BLL/Services/CommunationServices/CommunicationServiceProvider.cs b/Birdmap.BLL/Services/CommunationServices/CommunicationServiceProvider.cs new file mode 100644 index 0000000..4cb9f46 --- /dev/null +++ b/Birdmap.BLL/Services/CommunationServices/CommunicationServiceProvider.cs @@ -0,0 +1,14 @@ +using Birdmap.BLL.Interfaces; + +namespace Birdmap.BLL.Services.CommunicationServices +{ + internal class CommunicationServiceProvider : ICommunicationServiceProvider + { + public ICommunicationService Service { get; } + + public CommunicationServiceProvider(ICommunicationService service) + { + Service = service; + } + } +} diff --git a/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientService.cs b/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientService.cs index 6143bde..ea68b9e 100644 --- a/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientService.cs +++ b/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientService.cs @@ -1,4 +1,5 @@ using Birdmap.BLL.Interfaces; +using Birdmap.BLL.Services.CommunationServices; using Birdmap.BLL.Services.CommunicationServices.Hubs; using Microsoft.AspNetCore.SignalR; using Microsoft.Extensions.Logging; @@ -7,59 +8,29 @@ using MQTTnet.Client; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; -using Newtonsoft.Json; using System; -using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; -using Timer = System.Timers.Timer; namespace Birdmap.BLL.Services.CommunicationServices.Mqtt { - public class MqttClientService : IMqttClientService + internal class MqttClientService : CommunicationServiceBase, IMqttClientService { private readonly IMqttClient _mqttClient; private readonly IMqttClientOptions _options; - private readonly ILogger _logger; - private readonly IInputService _inputService; - private readonly IHubContext _hubContext; - private readonly Timer _hubTimer; - private readonly List _messages = new(); - private readonly object _messageLock = new(); - public bool IsConnected => _mqttClient.IsConnected; + public override bool IsConnected => _mqttClient.IsConnected; public MqttClientService(IMqttClientOptions options, ILogger logger, IInputService inputService, IHubContext hubContext) + : base(logger, inputService, hubContext) { _options = options; - _logger = logger; - _inputService = inputService; - _hubContext = hubContext; - _hubTimer = new Timer() - { - AutoReset = true, - Interval = 1000, - }; - _hubTimer.Elapsed += SendMqttMessagesWithSignalR; _mqttClient = new MqttFactory().CreateMqttClient(); ConfigureMqttClient(); } - private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e) - { - lock (_messageLock) - { - if (_messages.Any()) - { - _logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}"); - _hubContext.Clients.All.NotifyMessagesAsync(_messages); - _messages.Clear(); - } - } - } - private void ConfigureMqttClient() { _mqttClient.ConnectedHandler = this; @@ -67,36 +38,14 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt _mqttClient.ApplicationMessageReceivedHandler = this; } - private class Payload - { - [JsonProperty("tag")] - public Guid TagID { get; set; } - - [JsonProperty("probability")] - public double Probability { get; set; } - } - - public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs) + public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs) { var message = eventArgs.ApplicationMessage.ConvertPayloadToString(); _logger.LogDebug($"Recieved [{eventArgs.ClientId}] " + $"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}"); - try - { - var payload = JsonConvert.DeserializeObject(message); - var inputResponse = await _inputService.GetInputAsync(payload.TagID); - - lock (_messageLock) - { - _messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability)); - } - } - catch (Exception ex) - { - _logger.LogError(ex, $"Could not handle application message."); - } + return ProcessJsonMessageAsync(message); } public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs) @@ -107,7 +56,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt _logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}"); await _mqttClient.SubscribeAsync(topic); - _hubTimer.Start(); + StartMessageTimer(); } catch (Exception ex) { @@ -123,7 +72,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt try { - _hubTimer.Stop(); + StopMessageTimer(); await _mqttClient.ConnectAsync(_options, CancellationToken.None); } catch (Exception ex) diff --git a/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientServiceProvider.cs b/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientServiceProvider.cs deleted file mode 100644 index f6288ad..0000000 --- a/Birdmap.BLL/Services/CommunationServices/Mqtt/MqttClientServiceProvider.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Birdmap.BLL.Interfaces; - -namespace Birdmap.BLL.Services.CommunicationServices.Mqtt -{ - public class MqttClientServiceProvider - { - public IMqttClientService MqttClientService { get; } - - public MqttClientServiceProvider(IMqttClientService mqttClientService) - { - MqttClientService = mqttClientService; - } - } -} diff --git a/Birdmap.BLL/Services/CommunationServices/RabbitMq/RabbitMqClientService.cs b/Birdmap.BLL/Services/CommunationServices/RabbitMq/RabbitMqClientService.cs new file mode 100644 index 0000000..a037b78 --- /dev/null +++ b/Birdmap.BLL/Services/CommunationServices/RabbitMq/RabbitMqClientService.cs @@ -0,0 +1,72 @@ +using Birdmap.BLL.Interfaces; +using Birdmap.BLL.Options; +using Birdmap.BLL.Services.CommunicationServices.Hubs; +using Microsoft.AspNetCore.SignalR; +using Microsoft.Extensions.Logging; +using RabbitMQ.Client; +using RabbitMQ.Client.Events; +using System; +using System.Text; +using System.Threading.Tasks; + +namespace Birdmap.BLL.Services.CommunationServices.RabbitMq +{ + internal class RabbitMqClientService : CommunicationServiceBase, IDisposable + { + private readonly IConnection _connection; + private readonly IModel _channel; + + public override bool IsConnected => throw new NotImplementedException(); + + public RabbitMqClientService(RabbitMqClientOptions options, ILogger logger, IInputService inputService, IHubContext hubContext) + : base(logger, inputService, hubContext) + { + var factory = new ConnectionFactory() + { + HostName = options.Hostname, + Port = options.Port, + UserName = options.Username, + Password = options.Password, + + AutomaticRecoveryEnabled = true, + }; + + _connection = factory.CreateConnection(); + _channel = _connection.CreateModel(); + + _channel.ExchangeDeclare(exchange: "topic_logs", type: "topic"); + var queueName = _channel.QueueDeclare().QueueName; + + _channel.QueueBind(queue: queueName, + exchange: "topic_logs", + routingKey: options.Topic); + + var consumer = new AsyncEventingBasicConsumer(_channel); + consumer.Received += OnRecieved; + + _channel.BasicConsume(queue: queueName, + autoAck: true, + consumer: consumer); + + StartMessageTimer(); + } + + private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs) + { + var props = eventArgs.BasicProperties; + var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray()); + + _logger.LogDebug($"Recieved [{props.UserId}] " + + $"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}"); + + return ProcessJsonMessageAsync(body); + } + + public void Dispose() + { + StopMessageTimer(); + _channel.Close(); + _connection.Close(); + } + } +} diff --git a/Birdmap.BLL/Startup.cs b/Birdmap.BLL/Startup.cs index d3dafde..73bdb57 100644 --- a/Birdmap.BLL/Startup.cs +++ b/Birdmap.BLL/Startup.cs @@ -1,6 +1,8 @@ using Birdmap.BLL.Interfaces; using Birdmap.BLL.Options; using Birdmap.BLL.Services; +using Birdmap.BLL.Services.CommunationServices.RabbitMq; +using Birdmap.BLL.Services.CommunicationServices; using Birdmap.BLL.Services.CommunicationServices.Mqtt; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -43,54 +45,81 @@ namespace Birdmap.BLL services.AddSignalR(); - services.AddMqttClientServiceWithConfig(opt => + var mqtt = configuration.GetSection("Mqtt"); + + var client = mqtt.GetSection("ClientSettings"); + var clientSettings = new { - var mqtt = configuration.GetSection("Mqtt"); + Id = client.GetValue("Id"), + Username = client.GetValue("Username"), + Password = client.GetValue("Password"), + Topic = client.GetValue("Topic"), + }; - var mqttClient = mqtt.GetSection("ClientSettings"); - var clientSettings = new + var brokerHost = mqtt.GetSection("BrokerHostSettings"); + var brokerHostSettings = new + { + Host = brokerHost.GetValue("Host"), + Port = brokerHost.GetValue("Port"), + }; + + if (configuration.GetValue("UseRabbitMq")) + { + services.AddRabbitMqClientServiceWithConfig(new RabbitMqClientOptions( + Hostname: brokerHostSettings.Host, + Port: brokerHostSettings.Port, + Username: clientSettings.Username, + Password: clientSettings.Password, + Topic: clientSettings.Topic)); + } + else + { + services.AddMqttClientServiceWithConfig(opt => { - Id = mqttClient.GetValue("Id"), - Username = mqttClient.GetValue("Username"), - Password = mqttClient.GetValue("Password"), - Topic = mqttClient.GetValue("Topic"), - }; - - var mqttBrokerHost = mqtt.GetSection("BrokerHostSettings"); - var brokerHostSettings = new - { - Host = mqttBrokerHost.GetValue("Host"), - Port = mqttBrokerHost.GetValue("Port"), - }; - - opt - .WithTopic(clientSettings.Topic) - .WithCredentials(clientSettings.Username, clientSettings.Password) - .WithClientId(clientSettings.Id) - .WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port); - }); + opt + .WithTopic(clientSettings.Topic) + .WithCredentials(clientSettings.Username, clientSettings.Password) + .WithClientId(clientSettings.Id) + .WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port); + }); + } return services; } - private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action configureOptions) + private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action configureOptions) { services.AddSingleton(serviceProvider => { - var optionBuilder = new AspCoreMqttClientOptions(serviceProvider); + var optionBuilder = new MqttClientOptions(serviceProvider); configureOptions(optionBuilder); return optionBuilder.Build(); }); - services.AddSingleton(); - services.AddSingleton(serviceProvider => - { - return serviceProvider.GetService(); - }); + + services.AddClientServiceWithProvider(); + return services; + } + + private static IServiceCollection AddRabbitMqClientServiceWithConfig(this IServiceCollection services, RabbitMqClientOptions options) + { + services.AddSingleton(options); + + services.AddClientServiceWithProvider(); + return services; + } + + private static IServiceCollection AddClientServiceWithProvider(this IServiceCollection services) where T : class, ICommunicationService + { + services.AddSingleton(); services.AddSingleton(serviceProvider => { - var mqttClientService = serviceProvider.GetService(); - var mqttClientServiceProvider = new MqttClientServiceProvider(mqttClientService); - return mqttClientServiceProvider; + return (IHostedService)serviceProvider.GetService(); + }); + services.AddSingleton(serviceProvider => + { + var clientService = serviceProvider.GetService(); + var clientServiceProvider = new CommunicationServiceProvider(clientService); + return clientServiceProvider; }); return services; } diff --git a/MQTTnet.TestApp.WinForm/Form1.Designer.cs b/MQTTnet.TestApp.WinForm/Form1.Designer.cs index e8083e9..ed42f24 100644 --- a/MQTTnet.TestApp.WinForm/Form1.Designer.cs +++ b/MQTTnet.TestApp.WinForm/Form1.Designer.cs @@ -267,7 +267,7 @@ this.trackBar1.LargeChange = 500; this.trackBar1.Location = new System.Drawing.Point(180, 162); this.trackBar1.Maximum = 5050; - this.trackBar1.Minimum = 50; + this.trackBar1.Minimum = 1; this.trackBar1.Name = "trackBar1"; this.trackBar1.Size = new System.Drawing.Size(247, 45); this.trackBar1.SmallChange = 100; diff --git a/docker-compose.yml b/docker-compose.yml index 538c848..37bfc4b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -38,6 +38,7 @@ services: - Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices - Birdmap_UseDummyServices=true - Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/ + - Birdmap_UseRabbitMq=true - Birdmap_Mqtt__BrokerHostSettings__Host=localhost - Birdmap_Mqtt__BrokerHostSettings__Port=1883 - Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client diff --git a/docs/thesis.pdf b/docs/thesis.pdf new file mode 100644 index 0000000..d46571b Binary files /dev/null and b/docs/thesis.pdf differ