1 Commits

Author SHA1 Message Date
0085b95198 Merge pull request 'Adding RabbitMq support' (#1) from feature/rabbit-mq into master
Some checks failed
continuous-integration/drone/push Build is failing
Reviewed-on: #1
2021-01-17 14:51:29 +01:00
12 changed files with 43 additions and 149 deletions

View File

@ -82,7 +82,7 @@ namespace Birdmap.API.Controllers
Service = new()
{
Id = 0,
Name = "Message Queue Service",
Name = "Mqtt Client Service",
Uri = "localhost",
},
Response = $"IsConnected: {_communicationService.IsConnected}",

View File

@ -12,7 +12,6 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.IdentityModel.Tokens;
using NSwag.Generation.Processors.Security;
using System;
using System.Text;
namespace Birdmap.API
@ -72,7 +71,7 @@ namespace Birdmap.API
{
opt.Title = "Birdmap";
opt.OperationProcessors.Add(new OperationSecurityScopeProcessor("Jwt Token"));
opt.AddSecurity("Jwt Token", Array.Empty<string>(),
opt.AddSecurity("Jwt Token", new string[] { },
new NSwag.OpenApiSecurityScheme
{
Type = NSwag.OpenApiSecuritySchemeType.ApiKey,
@ -96,9 +95,11 @@ namespace Birdmap.API
app.UseOpenApi();
app.UseSwaggerUi3();
app.UseHttpsRedirection();
app.UseStaticFiles();
app.UseSpaStaticFiles();
app.UseAuthentication();
app.UseRouting();
app.UseAuthorization();

View File

@ -27,25 +27,10 @@
"UseRabbitMq": false,
"Mqtt": {
"BrokerHostSettings": {
"VirtualHost": "",
"Host": "",
"Port": 1883
},
"ExchangeSettings": {
"Name": "",
"Type": "",
"Durable": false,
"AutoDelete": false
},
"QueueSettings": {
"Name": "",
"Durable": false,
"Exclusive": false,
"AutoDelete": false
},
"ClientSettings": {
"Id": "ASP.NET Core client",
"Username": "",

View File

@ -1,8 +1,6 @@
using Microsoft.Extensions.Hosting;
namespace Birdmap.BLL.Interfaces
namespace Birdmap.BLL.Interfaces
{
public interface ICommunicationService : IHostedService
public interface ICommunicationService
{
public bool IsConnected { get; }
}

View File

@ -5,7 +5,8 @@ using MQTTnet.Client.Receiving;
namespace Birdmap.BLL.Interfaces
{
public interface IMqttClientService : IMqttClientConnectedHandler,
public interface IMqttClientService : IHostedService,
IMqttClientConnectedHandler,
IMqttClientDisconnectedHandler,
IMqttApplicationMessageReceivedHandler
{

View File

@ -1,11 +1,4 @@
namespace Birdmap.BLL.Options
{
public record RabbitMqClientOptions(
string Hostname, int Port, string VirtualHost,
string Username, string Password,
string ExchangeName, string ExchangeType,
bool ExchangeDurable, bool ExchangeAutoDelete,
string QueueName,
bool QueueDurable, bool QueueAutoDelete, bool QueueExclusive,
string Topic);
public record RabbitMqClientOptions(string Hostname, int Port, string Username, string Password, string Topic);
}

View File

@ -6,7 +6,6 @@ using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
@ -84,9 +83,5 @@ namespace Birdmap.BLL.Services.CommunationServices
}
}
}
public abstract Task StartAsync(CancellationToken cancellationToken);
public abstract Task StopAsync(CancellationToken cancellationToken);
}
}

View File

@ -81,7 +81,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
}
}
public override async Task StartAsync(CancellationToken cancellationToken)
public async Task StartAsync(CancellationToken cancellationToken)
{
try
{
@ -97,7 +97,7 @@ namespace Birdmap.BLL.Services.CommunicationServices.Mqtt
}
}
public override async Task StopAsync(CancellationToken cancellationToken)
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{

View File

@ -7,25 +7,21 @@ using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
{
internal class RabbitMqClientService : CommunicationServiceBase
internal class RabbitMqClientService : CommunicationServiceBase, IDisposable
{
private IConnection _connection;
private IModel _channel;
private readonly IConnectionFactory _factory;
private readonly RabbitMqClientOptions _options;
private readonly IConnection _connection;
private readonly IModel _channel;
public override bool IsConnected => _connection.IsOpen;
public override bool IsConnected => throw new NotImplementedException();
public RabbitMqClientService(RabbitMqClientOptions options, ILogger<RabbitMqClientService> logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
: base(logger, inputService, hubContext)
{
_options = options;
_factory = new ConnectionFactory()
var factory = new ConnectionFactory()
{
HostName = options.Hostname,
Port = options.Port,
@ -34,6 +30,25 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
AutomaticRecoveryEnabled = true,
};
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = _channel.QueueDeclare().QueueName;
_channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
}
private Task OnRecieved(object sender, BasicDeliverEventArgs eventArgs)
@ -47,68 +62,11 @@ namespace Birdmap.BLL.Services.CommunationServices.RabbitMq
return ProcessJsonMessageAsync(body);
}
public override async Task StartAsync(CancellationToken cancellationToken)
public void Dispose()
{
try
{
Connect();
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot connect. Reconnecting...");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
cancellationToken.ThrowIfCancellationRequested();
await StartAsync(cancellationToken);
}
}
public override Task StopAsync(CancellationToken cancellationToken)
{
try
{
StopMessageTimer();
_channel?.Close();
_connection?.Close();
return Task.CompletedTask;
}
catch (Exception ex)
{
_logger.LogError(ex, $"Cannot disconnect...");
return Task.FromException(ex);
}
}
private void Connect()
{
_connection = _factory.CreateConnection();
_channel = _connection.CreateModel();
_channel.ExchangeDeclare(
exchange: _options.ExchangeName,
type: _options.ExchangeType,
durable: _options.ExchangeDurable,
autoDelete: _options.ExchangeAutoDelete);
_channel.QueueDeclare(
queue: _options.QueueName,
durable: _options.QueueDurable,
exclusive: _options.QueueExclusive,
autoDelete: _options.QueueAutoDelete);
_channel.QueueBind(queue: _options.QueueName,
exchange: _options.ExchangeName,
routingKey: _options.Topic);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.Received += OnRecieved;
_channel.BasicConsume(queue: _options.QueueName,
autoAck: true,
consumer: consumer);
StartMessageTimer();
StopMessageTimer();
_channel.Close();
_connection.Close();
}
}
}

View File

@ -61,25 +61,6 @@ namespace Birdmap.BLL
{
Host = brokerHost.GetValue<string>("Host"),
Port = brokerHost.GetValue<int>("Port"),
VirtualHost = brokerHost.GetValue<string>("VirtualHost"),
};
var exchange = mqtt.GetSection("ExchangeSettings");
var exchangeSettings = new
{
Name = exchange.GetValue<string>("Name"),
Type = exchange.GetValue<string>("Type"),
Durable = exchange.GetValue<bool>("Durable"),
AutoDelete = exchange.GetValue<bool>("AutoDelete"),
};
var queue = mqtt.GetSection("QueueSettings");
var queueSettings = new
{
Name = queue.GetValue<string>("Name"),
Durable = exchange.GetValue<bool>("Durable"),
Exclusive = exchange.GetValue<bool>("Exclusive"),
AutoDelete = exchange.GetValue<bool>("AutoDelete"),
};
if (configuration.GetValue<bool>("UseRabbitMq"))
@ -87,17 +68,8 @@ namespace Birdmap.BLL
services.AddRabbitMqClientServiceWithConfig(new RabbitMqClientOptions(
Hostname: brokerHostSettings.Host,
Port: brokerHostSettings.Port,
VirtualHost: brokerHostSettings.VirtualHost,
Username: clientSettings.Username,
Password: clientSettings.Password,
ExchangeName: exchangeSettings.Name,
ExchangeType: exchangeSettings.Type,
ExchangeDurable: exchangeSettings.Durable,
ExchangeAutoDelete: exchangeSettings.AutoDelete,
QueueName: queueSettings.Name,
QueueDurable: queueSettings.Durable,
QueueExclusive: queueSettings.Exclusive,
QueueAutoDelete: queueSettings.AutoDelete,
Topic: clientSettings.Topic));
}
else
@ -139,9 +111,9 @@ namespace Birdmap.BLL
private static IServiceCollection AddClientServiceWithProvider<T>(this IServiceCollection services) where T : class, ICommunicationService
{
services.AddSingleton<T>();
services.AddSingleton<IHostedService>(serviceProvider =>
services.AddSingleton(serviceProvider =>
{
return serviceProvider.GetService<T>();
return (IHostedService)serviceProvider.GetService<T>();
});
services.AddSingleton<ICommunicationServiceProvider>(serviceProvider =>
{

View File

@ -17,7 +17,7 @@ services:
- ${APPDATA}/ASP.NET/Https:/root/.aspnet/https:ro
build:
context: .
dockerfile: Dockerfile
dockerfile: Birdmap.API/Dockerfile
depends_on:
- db
environment:
@ -38,18 +38,9 @@ services:
- Birdmap_Defaults__Services__KMLabz-Service=https://birb.k8s.kmlabz.com/devices
- Birdmap_UseDummyServices=true
- Birdmap_ServicesBaseUrl=https://birb.k8s.kmlabz.com/
- Birdmap_UseRabbitMq=false
- Birdmap_UseRabbitMq=true
- Birdmap_Mqtt__BrokerHostSettings__Host=localhost
- Birdmap_Mqtt__BrokerHostSettings__Port=1883
- Birdmap_Mqtt__BrokerHostSettings__VirtualHost=/
- Birdmap_Mqtt__ExchangeSettings__Name=birbmapexchange
- Birdmap_Mqtt__ExchangeSettings__Type=fanout
- Birdmap_Mqtt__ExchangeSettings__Durable=true
- Birdmap_Mqtt__ExchangeSettings__AutoDelete=true
- Birdmap_Mqtt__QueueSettings__Name=birbmapqueue
- Birdmap_Mqtt__QueueSettings__Durable=true
- Birdmap_Mqtt__QueueSettings__Exclusive=true
- Birdmap_Mqtt__QueueSettings__AutoDelete=true
- Birdmap_Mqtt__ClientSettings__Id=ASP.NET Core client
- Birdmap_Mqtt__ClientSettings__Username=username
- Birdmap_Mqtt__ClientSettings__Password=password