Adding RabbitMq support #1
@ -2,7 +2,6 @@
|
|||||||
using Birdmap.API.DTOs;
|
using Birdmap.API.DTOs;
|
||||||
using Birdmap.BLL.Interfaces;
|
using Birdmap.BLL.Interfaces;
|
||||||
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
||||||
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
|
|
||||||
using Birdmap.DAL.Entities;
|
using Birdmap.DAL.Entities;
|
||||||
using Microsoft.AspNetCore.Authorization;
|
using Microsoft.AspNetCore.Authorization;
|
||||||
using Microsoft.AspNetCore.Http;
|
using Microsoft.AspNetCore.Http;
|
||||||
@ -25,16 +24,16 @@ namespace Birdmap.API.Controllers
|
|||||||
{
|
{
|
||||||
private readonly IServiceService _service;
|
private readonly IServiceService _service;
|
||||||
private readonly IMapper _mapper;
|
private readonly IMapper _mapper;
|
||||||
private readonly IMqttClientService _mqttClientService;
|
private readonly ICommunicationService _communicationService;
|
||||||
private readonly IHubContext<ServicesHub, IServicesHubClient> _hubContext;
|
private readonly IHubContext<ServicesHub, IServicesHubClient> _hubContext;
|
||||||
private readonly ILogger<ServicesController> _logger;
|
private readonly ILogger<ServicesController> _logger;
|
||||||
|
|
||||||
public ServicesController(IServiceService service, IMapper mapper, MqttClientServiceProvider mqttClientProvider,
|
public ServicesController(IServiceService service, IMapper mapper, ICommunicationServiceProvider communicationServiceProvider,
|
||||||
IHubContext<ServicesHub, IServicesHubClient> hubContext, ILogger<ServicesController> logger)
|
IHubContext<ServicesHub, IServicesHubClient> hubContext, ILogger<ServicesController> logger)
|
||||||
{
|
{
|
||||||
_service = service;
|
_service = service;
|
||||||
_mapper = mapper;
|
_mapper = mapper;
|
||||||
_mqttClientService = mqttClientProvider.MqttClientService;
|
_communicationService = communicationServiceProvider.Service;
|
||||||
_hubContext = hubContext;
|
_hubContext = hubContext;
|
||||||
_logger = logger;
|
_logger = logger;
|
||||||
}
|
}
|
||||||
@ -86,8 +85,8 @@ namespace Birdmap.API.Controllers
|
|||||||
Name = "Mqtt Client Service",
|
Name = "Mqtt Client Service",
|
||||||
Uri = "localhost",
|
Uri = "localhost",
|
||||||
},
|
},
|
||||||
Response = $"IsConnected: {_mqttClientService.IsConnected}",
|
Response = $"IsConnected: {_communicationService.IsConnected}",
|
||||||
StatusCode = _mqttClientService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable,
|
StatusCode = _communicationService.IsConnected ? HttpStatusCode.OK : HttpStatusCode.ServiceUnavailable,
|
||||||
});
|
});
|
||||||
|
|
||||||
return serviceInfos.ToList();
|
return serviceInfos.ToList();
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
},
|
},
|
||||||
"UseDummyServices": true,
|
"UseDummyServices": true,
|
||||||
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
|
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
|
||||||
|
"UseRabbitMq": false,
|
||||||
"Mqtt": {
|
"Mqtt": {
|
||||||
"BrokerHostSettings": {
|
"BrokerHostSettings": {
|
||||||
"Host": "localhost",
|
"Host": "localhost",
|
||||||
|
@ -24,6 +24,7 @@
|
|||||||
},
|
},
|
||||||
"UseDummyServices": false,
|
"UseDummyServices": false,
|
||||||
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
|
"ServicesBaseUrl": "https://birb.k8s.kmlabz.com/",
|
||||||
|
"UseRabbitMq": false,
|
||||||
"Mqtt": {
|
"Mqtt": {
|
||||||
"BrokerHostSettings": {
|
"BrokerHostSettings": {
|
||||||
"Host": "",
|
"Host": "",
|
||||||
|
@ -35,6 +35,8 @@
|
|||||||
|
|
||||||
<!--Skip non-critical Mqtt logs-->
|
<!--Skip non-critical Mqtt logs-->
|
||||||
<logger name="*.*Mqtt*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
|
<logger name="*.*Mqtt*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
|
||||||
|
<logger name="*.*RabbitMq*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
|
||||||
|
<logger name="*.*CommunicationServiceBase*.*" minlevel="Trace" maxlevel="Warning" writeTo="mqttFile" final="true"/>
|
||||||
|
|
||||||
<!--Skip non-critical Hub logs-->
|
<!--Skip non-critical Hub logs-->
|
||||||
<logger name="*.*Hubs*.*" minlevel="Trace" maxlevel="Warning" writeTo="hubsFile" final="true"/>
|
<logger name="*.*Hubs*.*" minlevel="Trace" maxlevel="Warning" writeTo="hubsFile" final="true"/>
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
|
<PackageReference Include="Microsoft.AspNetCore.SignalR.Core" Version="1.1.0" />
|
||||||
<PackageReference Include="MQTTnet" Version="3.0.13" />
|
<PackageReference Include="MQTTnet" Version="3.0.13" />
|
||||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
|
<PackageReference Include="Newtonsoft.Json" Version="12.0.3" />
|
||||||
|
<PackageReference Include="RabbitMQ.Client" Version="6.2.1" />
|
||||||
</ItemGroup>
|
</ItemGroup>
|
||||||
|
|
||||||
<ItemGroup>
|
<ItemGroup>
|
||||||
|
7
Birdmap.BLL/Interfaces/ICommunicationService.cs
Normal file
7
Birdmap.BLL/Interfaces/ICommunicationService.cs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
namespace Birdmap.BLL.Interfaces
|
||||||
|
{
|
||||||
|
public interface ICommunicationService
|
||||||
|
{
|
||||||
|
public bool IsConnected { get; }
|
||||||
|
}
|
||||||
|
}
|
7
Birdmap.BLL/Interfaces/ICommunicationServiceProvider.cs
Normal file
7
Birdmap.BLL/Interfaces/ICommunicationServiceProvider.cs
Normal file
@ -0,0 +1,7 @@
|
|||||||
|
namespace Birdmap.BLL.Interfaces
|
||||||
|
{
|
||||||
|
public interface ICommunicationServiceProvider
|
||||||
|
{
|
||||||
|
public ICommunicationService Service { get; }
|
||||||
|
}
|
||||||
|
}
|
@ -10,6 +10,5 @@ namespace Birdmap.BLL.Interfaces
|
|||||||
IMqttClientDisconnectedHandler,
|
IMqttClientDisconnectedHandler,
|
||||||
IMqttApplicationMessageReceivedHandler
|
IMqttApplicationMessageReceivedHandler
|
||||||
{
|
{
|
||||||
public bool IsConnected { get; }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,16 +3,16 @@ using System;
|
|||||||
|
|
||||||
namespace Birdmap.BLL.Options
|
namespace Birdmap.BLL.Options
|
||||||
{
|
{
|
||||||
public class AspCoreMqttClientOptions : MqttClientOptionsBuilder
|
public class MqttClientOptions : MqttClientOptionsBuilder
|
||||||
{
|
{
|
||||||
public IServiceProvider ServiceProvider { get; }
|
public IServiceProvider ServiceProvider { get; }
|
||||||
|
|
||||||
public AspCoreMqttClientOptions(IServiceProvider serviceProvider)
|
public MqttClientOptions(IServiceProvider serviceProvider)
|
||||||
{
|
{
|
||||||
ServiceProvider = serviceProvider;
|
ServiceProvider = serviceProvider;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AspCoreMqttClientOptions WithTopic(string topic)
|
public MqttClientOptions WithTopic(string topic)
|
||||||
{
|
{
|
||||||
WithUserProperty("Topic", topic);
|
WithUserProperty("Topic", topic);
|
||||||
|
|
4
Birdmap.BLL/Options/RabbitMqClientOptions.cs
Normal file
4
Birdmap.BLL/Options/RabbitMqClientOptions.cs
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
namespace Birdmap.BLL.Options
|
||||||
|
{
|
||||||
|
public record RabbitMqClientOptions(string Hostname, int Port, string Username, string Password, string Topic);
|
||||||
|
}
|
@ -0,0 +1,87 @@
|
|||||||
|
using Birdmap.BLL.Interfaces;
|
||||||
|
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using Newtonsoft.Json;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
using System.Linq;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
using Timer = System.Timers.Timer;
|
||||||
|
|
||||||
|
namespace Birdmap.BLL.Services.CommunationServices
|
||||||
|
{
|
||||||
|
internal class Payload
|
||||||
|
{
|
||||||
|
[JsonProperty("tag")]
|
||||||
|
public Guid TagID { get; set; }
|
||||||
|
|
||||||
|
[JsonProperty("probability")]
|
||||||
|
public double Probability { get; set; }
|
||||||
|
}
|
||||||
|
|
||||||
|
internal abstract class CommunicationServiceBase : ICommunicationService
|
||||||
|
{
|
||||||
|
protected readonly ILogger _logger;
|
||||||
|
protected readonly IInputService _inputService;
|
||||||
|
protected readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
|
||||||
|
private readonly Timer _hubTimer;
|
||||||
|
private readonly List<Message> _messages = new();
|
||||||
|
private readonly object _messageLock = new();
|
||||||
|
|
||||||
|
public abstract bool IsConnected { get; }
|
||||||
|
|
||||||
|
public CommunicationServiceBase(ILogger logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
|
||||||
|
{
|
||||||
|
_logger = logger;
|
||||||
|
_inputService = inputService;
|
||||||
|
_hubContext = hubContext;
|
||||||
|
_hubTimer = new Timer()
|
||||||
|
{
|
||||||
|
AutoReset = true,
|
||||||
|
Interval = 1000,
|
||||||
|
};
|
||||||
|
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected async Task ProcessJsonMessageAsync(string json)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
var payload = JsonConvert.DeserializeObject<Payload>(json);
|
||||||
|
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
|
||||||
|
|
||||||
|
lock (_messageLock)
|
||||||
|
{
|
||||||
|
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogError(ex, $"Could not handle application message.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void StartMessageTimer()
|
||||||
|
{
|
||||||
|
_hubTimer.Start();
|
||||||
|
}
|
||||||
|
protected void StopMessageTimer()
|
||||||
|
{
|
||||||
|
_hubTimer.Stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
|
||||||
|
{
|
||||||
|
lock (_messageLock)
|
||||||
|
{
|
||||||
|
if (_messages.Any())
|
||||||
|
{
|
||||||
|
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
|
||||||
|
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
|
||||||
|
_messages.Clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,14 @@
|
|||||||
|
using Birdmap.BLL.Interfaces;
|
||||||
|
|
||||||
|
namespace Birdmap.BLL.Services.CommunicationServices
|
||||||
|
{
|
||||||
|
internal class CommunicationServiceProvider : ICommunicationServiceProvider
|
||||||
|
{
|
||||||
|
public ICommunicationService Service { get; }
|
||||||
|
|
||||||
|
public CommunicationServiceProvider(ICommunicationService service)
|
||||||
|
{
|
||||||
|
Service = service;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,4 +1,5 @@
|
|||||||
using Birdmap.BLL.Interfaces;
|
using Birdmap.BLL.Interfaces;
|
||||||
|
using Birdmap.BLL.Services.CommunationServices;
|
||||||
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
||||||
using Microsoft.AspNetCore.SignalR;
|
using Microsoft.AspNetCore.SignalR;
|
||||||
using Microsoft.Extensions.Logging;
|
using Microsoft.Extensions.Logging;
|
||||||
@ -7,59 +8,29 @@ using MQTTnet.Client;
|
|||||||
using MQTTnet.Client.Connecting;
|
using MQTTnet.Client.Connecting;
|
||||||
using MQTTnet.Client.Disconnecting;
|
using MQTTnet.Client.Disconnecting;
|
||||||
using MQTTnet.Client.Options;
|
using MQTTnet.Client.Options;
|
||||||
using Newtonsoft.Json;
|
|
||||||
using System;
|
using System;
|
||||||
using System.Collections.Generic;
|
|
||||||
using System.Linq;
|
using System.Linq;
|
||||||
using System.Threading;
|
using System.Threading;
|
||||||
using System.Threading.Tasks;
|
using System.Threading.Tasks;
|
||||||
using Timer = System.Timers.Timer;
|
|
||||||
|
|
||||||
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
||||||
{
|
{
|
||||||
public class MqttClientService : IMqttClientService
|
internal class MqttClientService : CommunicationServiceBase, IMqttClientService
|
||||||
{
|
{
|
||||||
private readonly IMqttClient _mqttClient;
|
private readonly IMqttClient _mqttClient;
|
||||||
private readonly IMqttClientOptions _options;
|
private readonly IMqttClientOptions _options;
|
||||||
private readonly ILogger<MqttClientService> _logger;
|
|
||||||
private readonly IInputService _inputService;
|
|
||||||
private readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
|
|
||||||
private readonly Timer _hubTimer;
|
|
||||||
private readonly List<Message> _messages = new();
|
|
||||||
private readonly object _messageLock = new();
|
|
||||||
|
|
||||||
public bool IsConnected => _mqttClient.IsConnected;
|
public override bool IsConnected => _mqttClient.IsConnected;
|
||||||
|
|
||||||
public MqttClientService(IMqttClientOptions options, ILogger<MqttClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
|
public MqttClientService(IMqttClientOptions options, ILogger<MqttClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
|
||||||
|
: base(logger, inputService, hubContext)
|
||||||
{
|
{
|
||||||
_options = options;
|
_options = options;
|
||||||
_logger = logger;
|
|
||||||
_inputService = inputService;
|
|
||||||
_hubContext = hubContext;
|
|
||||||
_hubTimer = new Timer()
|
|
||||||
{
|
|
||||||
AutoReset = true,
|
|
||||||
Interval = 1000,
|
|
||||||
};
|
|
||||||
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
|
|
||||||
|
|
||||||
_mqttClient = new MqttFactory().CreateMqttClient();
|
_mqttClient = new MqttFactory().CreateMqttClient();
|
||||||
ConfigureMqttClient();
|
ConfigureMqttClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
|
|
||||||
{
|
|
||||||
lock (_messageLock)
|
|
||||||
{
|
|
||||||
if (_messages.Any())
|
|
||||||
{
|
|
||||||
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
|
|
||||||
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
|
|
||||||
_messages.Clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ConfigureMqttClient()
|
private void ConfigureMqttClient()
|
||||||
{
|
{
|
||||||
_mqttClient.ConnectedHandler = this;
|
_mqttClient.ConnectedHandler = this;
|
||||||
@ -67,36 +38,14 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
|||||||
_mqttClient.ApplicationMessageReceivedHandler = this;
|
_mqttClient.ApplicationMessageReceivedHandler = this;
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Payload
|
public Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
|
||||||
{
|
|
||||||
[JsonProperty("tag")]
|
|
||||||
public Guid TagID { get; set; }
|
|
||||||
|
|
||||||
[JsonProperty("probability")]
|
|
||||||
public double Probability { get; set; }
|
|
||||||
}
|
|
||||||
|
|
||||||
public async Task HandleApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs eventArgs)
|
|
||||||
{
|
{
|
||||||
var message = eventArgs.ApplicationMessage.ConvertPayloadToString();
|
var message = eventArgs.ApplicationMessage.ConvertPayloadToString();
|
||||||
|
|
||||||
_logger.LogDebug($"Recieved [{eventArgs.ClientId}] " +
|
_logger.LogDebug($"Recieved [{eventArgs.ClientId}] " +
|
||||||
$"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}");
|
$"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}");
|
||||||
|
|
||||||
try
|
return ProcessJsonMessageAsync(message);
|
||||||
{
|
|
||||||
var payload = JsonConvert.DeserializeObject<Payload>(message);
|
|
||||||
var inputResponse = await _inputService.GetInputAsync(payload.TagID);
|
|
||||||
|
|
||||||
lock (_messageLock)
|
|
||||||
{
|
|
||||||
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception ex)
|
|
||||||
{
|
|
||||||
_logger.LogError(ex, $"Could not handle application message.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
|
public async Task HandleConnectedAsync(MqttClientConnectedEventArgs eventArgs)
|
||||||
@ -107,7 +56,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
|||||||
_logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}");
|
_logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}");
|
||||||
|
|
||||||
await _mqttClient.SubscribeAsync(topic);
|
await _mqttClient.SubscribeAsync(topic);
|
||||||
_hubTimer.Start();
|
StartMessageTimer();
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
{
|
{
|
||||||
@ -123,7 +72,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
|||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
_hubTimer.Stop();
|
StopMessageTimer();
|
||||||
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
|
await _mqttClient.ConnectAsync(_options, CancellationToken.None);
|
||||||
}
|
}
|
||||||
catch (Exception ex)
|
catch (Exception ex)
|
||||||
|
@ -1,14 +0,0 @@
|
|||||||
using Birdmap.BLL.Interfaces;
|
|
||||||
|
|
||||||
namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
|
|
||||||
{
|
|
||||||
public class MqttClientServiceProvider
|
|
||||||
{
|
|
||||||
public IMqttClientService MqttClientService { get; }
|
|
||||||
|
|
||||||
public MqttClientServiceProvider(IMqttClientService mqttClientService)
|
|
||||||
{
|
|
||||||
MqttClientService = mqttClientService;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,72 @@
|
|||||||
|
using Birdmap.BLL.Interfaces;
|
||||||
|
using Birdmap.BLL.Options;
|
||||||
|
using Birdmap.BLL.Services.CommunicationServices.Hubs;
|
||||||
|
using Microsoft.AspNetCore.SignalR;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using RabbitMQ.Client;
|
||||||
|
using RabbitMQ.Client.Events;
|
||||||
|
using System;
|
||||||
|
using System.Text;
|
||||||
|
using System.Threading.Tasks;
|
||||||
|
|
||||||
|
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
|
||||||
|
{
|
||||||
|
internal class RabbitMqClientService : CommunicationServiceBase, IDisposable
|
||||||
|
{
|
||||||
|
private readonly IConnection _connection;
|
||||||
|
private readonly IModel _channel;
|
||||||
|
|
||||||
|
public override bool IsConnected => throw new NotImplementedException();
|
||||||
|
|
||||||
|
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
|
||||||
|
: base(logger, inputService, hubContext)
|
||||||
|
{
|
||||||
|
var factory = new ConnectionFactory()
|
||||||
|
{
|
||||||
|
HostName = options.Hostname,
|
||||||
|
Port = options.Port,
|
||||||
|
UserName = options.Username,
|
||||||
|
Password = options.Password,
|
||||||
|
|
||||||
|
AutomaticRecoveryEnabled = true,
|
||||||
|
};
|
||||||
|
|
||||||
|
_connection = factory.CreateConnection();
|
||||||
|
_channel = _connection.CreateModel();
|
||||||
|
|
||||||
|
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
|
||||||
|
var queueName = _channel.QueueDeclare().QueueName;
|
||||||
|
|
||||||
|
_channel.QueueBind(queue: queueName,
|
||||||
|
exchange: "topic_logs",
|
||||||
|
routingKey: options.Topic);
|
||||||
|
|
||||||
|
var consumer = new AsyncEventingBasicConsumer(_channel);
|
||||||
|
consumer.Received += OnRecieved;
|
||||||
|
|
||||||
|
_channel.BasicConsume(queue: queueName,
|
||||||
|
autoAck: true,
|
||||||
|
consumer: consumer);
|
||||||
|
|
||||||
|
StartMessageTimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
|
||||||
|
{
|
||||||
|
var props = eventArgs.BasicProperties;
|
||||||
|
var body = Encoding.UTF8.GetString(eventArgs.Body.ToArray());
|
||||||
|
|
||||||
|
_logger.LogDebug($"Recieved [{props.UserId}] " +
|
||||||
|
$"ConsumerTag: {eventArgs.ConsumerTag} | DeliveryTag: {eventArgs.DeliveryTag} | Payload: {body} | Priority: {props.Priority}");
|
||||||
|
|
||||||
|
return ProcessJsonMessageAsync(body);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose()
|
||||||
|
{
|
||||||
|
StopMessageTimer();
|
||||||
|
_channel.Close();
|
||||||
|
_connection.Close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -1,6 +1,8 @@
|
|||||||
using Birdmap.BLL.Interfaces;
|
using Birdmap.BLL.Interfaces;
|
||||||
using Birdmap.BLL.Options;
|
using Birdmap.BLL.Options;
|
||||||
using Birdmap.BLL.Services;
|
using Birdmap.BLL.Services;
|
||||||
|
using Birdmap.BLL.Services.CommunationServices.RabbitMq;
|
||||||
|
using Birdmap.BLL.Services.CommunicationServices;
|
||||||
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
|
using Birdmap.BLL.Services.CommunicationServices.Mqtt;
|
||||||
using Microsoft.Extensions.Configuration;
|
using Microsoft.Extensions.Configuration;
|
||||||
using Microsoft.Extensions.DependencyInjection;
|
using Microsoft.Extensions.DependencyInjection;
|
||||||
@ -43,54 +45,81 @@ namespace Birdmap.BLL
|
|||||||
|
|
||||||
services.AddSignalR();
|
services.AddSignalR();
|
||||||
|
|
||||||
services.AddMqttClientServiceWithConfig(opt =>
|
var mqtt = configuration.GetSection("Mqtt");
|
||||||
|
|
||||||
|
var client = mqtt.GetSection("ClientSettings");
|
||||||
|
var clientSettings = new
|
||||||
{
|
{
|
||||||
var mqtt = configuration.GetSection("Mqtt");
|
Id = client.GetValue<string>("Id"),
|
||||||
|
Username = client.GetValue<string>("Username"),
|
||||||
|
Password = client.GetValue<string>("Password"),
|
||||||
|
Topic = client.GetValue<string>("Topic"),
|
||||||
|
};
|
||||||
|
|
||||||
var mqttClient = mqtt.GetSection("ClientSettings");
|
var brokerHost = mqtt.GetSection("BrokerHostSettings");
|
||||||
var clientSettings = new
|
var brokerHostSettings = new
|
||||||
|
{
|
||||||
|
Host = brokerHost.GetValue<string>("Host"),
|
||||||
|
Port = brokerHost.GetValue<int>("Port"),
|
||||||
|
};
|
||||||
|
|
||||||
|
if (configuration.GetValue<bool>("UseRabbitMq"))
|
||||||
|
{
|
||||||
|
services.AddRabbitMqClientServiceWithConfig(new RabbitMqClientOptions(
|
||||||
|
Hostname: brokerHostSettings.Host,
|
||||||
|
Port: brokerHostSettings.Port,
|
||||||
|
Username: clientSettings.Username,
|
||||||
|
Password: clientSettings.Password,
|
||||||
|
Topic: clientSettings.Topic));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
services.AddMqttClientServiceWithConfig(opt =>
|
||||||
{
|
{
|
||||||
Id = mqttClient.GetValue<string>("Id"),
|
opt
|
||||||
Username = mqttClient.GetValue<string>("Username"),
|
.WithTopic(clientSettings.Topic)
|
||||||
Password = mqttClient.GetValue<string>("Password"),
|
.WithCredentials(clientSettings.Username, clientSettings.Password)
|
||||||
Topic = mqttClient.GetValue<string>("Topic"),
|
.WithClientId(clientSettings.Id)
|
||||||
};
|
.WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port);
|
||||||
|
});
|
||||||
var mqttBrokerHost = mqtt.GetSection("BrokerHostSettings");
|
}
|
||||||
var brokerHostSettings = new
|
|
||||||
{
|
|
||||||
Host = mqttBrokerHost.GetValue<string>("Host"),
|
|
||||||
Port = mqttBrokerHost.GetValue<int>("Port"),
|
|
||||||
};
|
|
||||||
|
|
||||||
opt
|
|
||||||
.WithTopic(clientSettings.Topic)
|
|
||||||
.WithCredentials(clientSettings.Username, clientSettings.Password)
|
|
||||||
.WithClientId(clientSettings.Id)
|
|
||||||
.WithTcpServer(brokerHostSettings.Host, brokerHostSettings.Port);
|
|
||||||
});
|
|
||||||
|
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<AspCoreMqttClientOptions> configureOptions)
|
private static IServiceCollection AddMqttClientServiceWithConfig(this IServiceCollection services, Action<MqttClientOptions> configureOptions)
|
||||||
{
|
{
|
||||||
services.AddSingleton(serviceProvider =>
|
services.AddSingleton(serviceProvider =>
|
||||||
{
|
{
|
||||||
var optionBuilder = new AspCoreMqttClientOptions(serviceProvider);
|
var optionBuilder = new MqttClientOptions(serviceProvider);
|
||||||
configureOptions(optionBuilder);
|
configureOptions(optionBuilder);
|
||||||
return optionBuilder.Build();
|
return optionBuilder.Build();
|
||||||
});
|
});
|
||||||
services.AddSingleton<MqttClientService>();
|
|
||||||
services.AddSingleton<IHostedService>(serviceProvider =>
|
services.AddClientServiceWithProvider<MqttClientService>();
|
||||||
{
|
return services;
|
||||||
return serviceProvider.GetService<MqttClientService>();
|
}
|
||||||
});
|
|
||||||
|
private static IServiceCollection AddRabbitMqClientServiceWithConfig(this IServiceCollection services, RabbitMqClientOptions options)
|
||||||
|
{
|
||||||
|
services.AddSingleton(options);
|
||||||
|
|
||||||
|
services.AddClientServiceWithProvider<RabbitMqClientService>();
|
||||||
|
return services;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
|
||||||
|
{
|
||||||
|
services.AddSingleton<T>();
|
||||||
services.AddSingleton(serviceProvider =>
|
services.AddSingleton(serviceProvider =>
|
||||||
{
|
{
|
||||||
var mqttClientService = serviceProvider.GetService<MqttClientService>();
|
return (IHostedService)serviceProvider.GetService<T>();
|
||||||
var mqttClientServiceProvider = new MqttClientServiceProvider(mqttClientService);
|
});
|
||||||
return mqttClientServiceProvider;
|
services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
|
||||||
|
{
|
||||||
|
var clientService = serviceProvider.GetService<T>();
|
||||||
|
var clientServiceProvider = new CommunicationServiceProvider(clientService);
|
||||||
|
return clientServiceProvider;
|
||||||
});
|
});
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
2
MQTTnet.TestApp.WinForm/Form1.Designer.cs
generated
2
MQTTnet.TestApp.WinForm/Form1.Designer.cs
generated
@ -267,7 +267,7 @@
|
|||||||
this.trackBar1.LargeChange = 500;
|
this.trackBar1.LargeChange = 500;
|
||||||
this.trackBar1.Location = new System.Drawing.Point(180, 162);
|
this.trackBar1.Location = new System.Drawing.Point(180, 162);
|
||||||
this.trackBar1.Maximum = 5050;
|
this.trackBar1.Maximum = 5050;
|
||||||
this.trackBar1.Minimum = 50;
|
this.trackBar1.Minimum = 1;
|
||||||
this.trackBar1.Name = "trackBar1";
|
this.trackBar1.Name = "trackBar1";
|
||||||
this.trackBar1.Size = new System.Drawing.Size(247, 45);
|
this.trackBar1.Size = new System.Drawing.Size(247, 45);
|
||||||
this.trackBar1.SmallChange = 100;
|
this.trackBar1.SmallChange = 100;
|
||||||
|
@ -38,6 +38,7 @@ services:
|
|||||||
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
|
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
|
||||||
- Birdmap_UseDummyServices=true
|
- Birdmap_UseDummyServices=true
|
||||||
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
|
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
|
||||||
|
- Birdmap_UseRabbitMq=true
|
||||||
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost
|
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost
|
||||||
- Birdmap_Mqtt__BrokerHostSettings__Port=1883
|
- Birdmap_Mqtt__BrokerHostSettings__Port=1883
|
||||||
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client
|
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client
|
||||||
|
BIN
docs/thesis.pdf
Normal file
BIN
docs/thesis.pdf
Normal file
Binary file not shown.
Loading…
Reference in New Issue
Block a user