birbmap/Birdmap.BLL/Services/CommunationServices/CommunicationServiceBase.cs
kunkliricsi 20a4b4d349
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing
Moved Dockerfile, fixed RabbitMq connection fail in constructor
2021-01-17 15:41:06 +01:00

93 lines
2.9 KiB
C#

using Birdmap.BLL.Interfaces;
using Birdmap.BLL.Services.CommunicationServices.Hubs;
using Microsoft.AspNetCore.SignalR;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.BLL.Services.CommunationServices
{
/// <summary>
/// JSON shape of an incoming message as deserialized by
/// <c>CommunicationServiceBase.ProcessJsonMessageAsync</c>.
/// </summary>
internal class Payload
{
    /// <summary>
    /// Identifier read from the <c>tag</c> JSON property; it is used to look up
    /// the matching input through the input service.
    /// </summary>
    [JsonProperty("tag")]
    public Guid TagID { get; set; }

    /// <summary>
    /// Value read from the <c>probability</c> JSON property; it is forwarded
    /// unchanged into the outgoing message.
    /// </summary>
    [JsonProperty("probability")]
    public double Probability { get; set; }
}
/// <summary>
/// Base class for communication services that convert incoming JSON payloads into
/// <see cref="Message"/> instances and broadcast them in batches to all clients of the devices hub.
/// </summary>
/// <remarks>
/// Messages produced by <see cref="ProcessJsonMessageAsync"/> are buffered in memory.
/// While the timer is running (see <see cref="StartMessageTimer"/>), the buffer is flushed
/// to the hub once per interval. Access to the buffer is guarded by a private lock because
/// the timer callback and the message producers may run on different threads.
/// </remarks>
internal abstract class CommunicationServiceBase : ICommunicationService
{
    /// <summary>Interval, in milliseconds, between two flushes of the buffered messages.</summary>
    private const double HubFlushIntervalMilliseconds = 1000;

    protected readonly ILogger _logger;
    protected readonly IInputService _inputService;
    protected readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;

    private readonly Timer _hubTimer;
    private readonly List<Message> _messages = new();
    private readonly object _messageLock = new();

    /// <summary>
    /// Connection state of the concrete service; supplied by derived classes.
    /// </summary>
    public abstract bool IsConnected { get; }

    /// <summary>
    /// Stores the dependencies and prepares (but does not start) the flush timer.
    /// </summary>
    /// <param name="logger">Logger used for diagnostics.</param>
    /// <param name="inputService">Service used to resolve the input that belongs to a payload tag.</param>
    /// <param name="hubContext">Hub context used to broadcast the buffered messages.</param>
    public CommunicationServiceBase(ILogger logger, IInputService inputService, IHubContext<DevicesHub, IDevicesHubClient> hubContext)
    {
        _logger = logger;
        _inputService = inputService;
        _hubContext = hubContext;

        _hubTimer = new Timer()
        {
            AutoReset = true,
            Interval = HubFlushIntervalMilliseconds,
        };
        _hubTimer.Elapsed += SendMqttMessagesWithSignalR;
    }

    /// <summary>
    /// Deserializes <paramref name="json"/> into a <see cref="Payload"/>, looks up the related input
    /// and buffers a <see cref="Message"/> for the next flush.
    /// </summary>
    /// <param name="json">Raw JSON text of the incoming message.</param>
    /// <returns>A task that completes once the message has been buffered or the failure has been logged.</returns>
    /// <remarks>
    /// Any exception (malformed JSON, a payload that deserializes to <c>null</c>, a failing input lookup)
    /// is logged and swallowed, so a single bad message does not propagate to the caller.
    /// </remarks>
    protected async Task ProcessJsonMessageAsync(string json)
    {
        try
        {
            var payload = JsonConvert.DeserializeObject<Payload>(json);
            var inputResponse = await _inputService.GetInputAsync(payload.TagID);

            var message = new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability);

            lock (_messageLock)
            {
                _messages.Add(message);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Could not handle application message.");
        }
    }

    /// <summary>Starts the periodic flush of buffered messages to the hub.</summary>
    protected void StartMessageTimer()
    {
        _hubTimer.Start();
    }

    /// <summary>Stops the periodic flush; messages that are still buffered stay in the buffer.</summary>
    protected void StopMessageTimer()
    {
        _hubTimer.Stop();
    }

    /// <summary>
    /// Timer callback: sends all currently buffered messages to every hub client as one batch.
    /// </summary>
    /// <param name="sender">The timer that raised the event (unused).</param>
    /// <param name="e">Event data of the elapsed tick (unused).</param>
    /// <remarks>
    /// This is an <c>async void</c> event handler, so every failure is caught and logged here;
    /// nothing may escape.
    /// </remarks>
    private async void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
    {
        try
        {
            List<Message> batch;

            lock (_messageLock)
            {
                if (_messages.Count == 0)
                {
                    return;
                }

                // Hand the hub a private snapshot: the send is asynchronous and may serialize
                // the list after this method has left the lock, by which time the shared buffer
                // has been cleared and may already be receiving new messages.
                batch = new List<Message>(_messages);
                _messages.Clear();
            }

            // Logging and sending happen outside the lock so producers are not blocked by I/O.
            _logger.LogInformation($"Sending ({batch.Count}) messages: {string.Join(" | ", batch)}");
            await _hubContext.Clients.All.NotifyMessagesAsync(batch);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Could not send messages to the hub clients.");
        }
    }

    /// <summary>Starts the service; implemented by derived classes.</summary>
    /// <param name="cancellationToken">Token passed through to the derived implementation.</param>
    public abstract Task StartAsync(CancellationToken cancellationToken);

    /// <summary>Stops the service; implemented by derived classes.</summary>
    /// <param name="cancellationToken">Token passed through to the derived implementation.</param>
    public abstract Task StopAsync(CancellationToken cancellationToken);
}
}