From 9d55c39e33481bb211a5579a0d1c26c3e28ac048 Mon Sep 17 00:00:00 2001 From: kunkliricsi Date: Sun, 22 Nov 2020 10:39:51 +0100 Subject: [PATCH] Added Mqtt messages buffer to help unload frontend --- Birdmap.API/ClientApp/src/common/Constants.js | 2 +- .../src/components/heatmap/Heatmap.jsx | 9 ++++- .../src/contexts/DevicesContextProvider.js | 14 +++++--- .../Services/Hubs/IDevicesHubClient.cs | 5 ++- .../Services/Mqtt/MqttClientService.cs | 34 +++++++++++++++++-- 5 files changed, 54 insertions(+), 10 deletions(-) diff --git a/Birdmap.API/ClientApp/src/common/Constants.js b/Birdmap.API/ClientApp/src/common/Constants.js index 6e02a55..a81961a 100644 --- a/Birdmap.API/ClientApp/src/common/Constants.js +++ b/Birdmap.API/ClientApp/src/common/Constants.js @@ -1,5 +1,5 @@ export default { - probability_method_name: 'NotifyDeviceAsync', + probability_method_name: 'NotifyMessagesAsync', update_method_name: 'NotifyDeviceUpdatedAsync', update_all_method_name: 'NotifyAllUpdatedAsync', }; \ No newline at end of file diff --git a/Birdmap.API/ClientApp/src/components/heatmap/Heatmap.jsx b/Birdmap.API/ClientApp/src/components/heatmap/Heatmap.jsx index 11a6cca..61f83e7 100644 --- a/Birdmap.API/ClientApp/src/components/heatmap/Heatmap.jsx +++ b/Birdmap.API/ClientApp/src/components/heatmap/Heatmap.jsx @@ -20,11 +20,18 @@ export default class MapContainer extends Component { }; this.probabilityHandler = this.probabilityHandler.bind(this); + this.handlePoint = this.handlePoint.bind(this); } static contextType = DevicesContext; - probabilityHandler(point) { + probabilityHandler(points) { + for (var point of points) { + this.handlePoint(point); + } + } + + handlePoint(point) { if (point.prob > 0.5) { this.setState({ diff --git a/Birdmap.API/ClientApp/src/contexts/DevicesContextProvider.js b/Birdmap.API/ClientApp/src/contexts/DevicesContextProvider.js index 7ce11c8..b7bb375 100644 --- a/Birdmap.API/ClientApp/src/contexts/DevicesContextProvider.js +++ b/Birdmap.API/ClientApp/src/contexts/DevicesContextProvider.js @@ -99,15 +99,19 @@ export default class DevicesContextProvider extends Component { .then(_ => { console.log('Devices hub Connected!'); - newConnection.on(C.probability_method_name, (id, date, prob) => { + newConnection.on(C.probability_method_name, (messages) => { //console.log(method_name + " recieved: [id: " + id + ", date: " + date + ", prob: " + prob + "]"); - var device = this.state.devices.filter(function (x) { return x.id === id })[0] - var newPoint = { deviceId: device.id, lat: device.coordinates.latitude, lng: device.coordinates.longitude, prob: prob, date: new Date(date) }; + const newPoints = []; + for (var message of messages) { + var device = this.state.devices.filter(function (x) { return x.id === message.deviceId })[0] + var newPoint = { deviceId: device.id, lat: device.coordinates.latitude, lng: device.coordinates.longitude, prob: message.probability, date: new Date(message.date) }; + newPoints.push(newPoint); + } this.setState({ - heatmapPoints: [...this.state.heatmapPoints, newPoint] + heatmapPoints: this.state.heatmapPoints.concat(newPoints) }); - this.invokeHandlers(C.probability_method_name, newPoint); + this.invokeHandlers(C.probability_method_name, newPoints); }); newConnection.on(C.update_all_method_name, () => { diff --git a/Birdmap.API/Services/Hubs/IDevicesHubClient.cs b/Birdmap.API/Services/Hubs/IDevicesHubClient.cs index af2650c..aa4fef1 100644 --- a/Birdmap.API/Services/Hubs/IDevicesHubClient.cs +++ b/Birdmap.API/Services/Hubs/IDevicesHubClient.cs @@ -1,11 +1,14 @@ using System; +using System.Collections.Generic; using System.Threading.Tasks; namespace Birdmap.API.Services { + public record Message(Guid DeviceId, DateTime Date, double Probability); + public interface IDevicesHubClient { - Task NotifyDeviceAsync(Guid deviceId, DateTime date, double probability); + Task NotifyMessagesAsync(IEnumerable messages); Task NotifyDeviceUpdatedAsync(Guid deviceId); Task NotifyAllUpdatedAsync(); } diff --git a/Birdmap.API/Services/Mqtt/MqttClientService.cs b/Birdmap.API/Services/Mqtt/MqttClientService.cs index 2ed4abf..8ab03b0 100644 --- a/Birdmap.API/Services/Mqtt/MqttClientService.cs +++ b/Birdmap.API/Services/Mqtt/MqttClientService.cs @@ -9,9 +9,11 @@ using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using Newtonsoft.Json; using System; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; +using Timer = System.Timers.Timer; namespace Birdmap.API.Services.Mqtt { @@ -22,6 +24,9 @@ namespace Birdmap.API.Services.Mqtt private readonly ILogger _logger; private readonly IInputService _inputService; private readonly IHubContext _hubContext; + private readonly Timer _hubTimer; + private readonly List _messages = new(); + private readonly object _messageLock = new(); public bool IsConnected => _mqttClient.IsConnected; @@ -31,10 +36,30 @@ namespace Birdmap.API.Services.Mqtt _logger = logger; _inputService = inputService; _hubContext = hubContext; + _hubTimer = new Timer() + { + AutoReset = true, + Interval = 1000, + }; + _hubTimer.Elapsed += SendMqttMessagesWithSignalR; + _mqttClient = new MqttFactory().CreateMqttClient(); ConfigureMqttClient(); } + private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e) + { + lock (_messageLock) + { + if (_messages.Any()) + { + _logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}"); + _hubContext.Clients.All.NotifyMessagesAsync(_messages); + _messages.Clear(); + } + } + } + private void ConfigureMqttClient() { _mqttClient.ConnectedHandler = this; @@ -55,7 +80,7 @@ namespace Birdmap.API.Services.Mqtt { var message = eventArgs.ApplicationMessage.ConvertPayloadToString(); - _logger.LogInformation($"Recieved [{eventArgs.ClientId}] " + + _logger.LogDebug($"Recieved [{eventArgs.ClientId}] " + $"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}"); try @@ -63,7 +88,10 @@ namespace Birdmap.API.Services.Mqtt var payload = JsonConvert.DeserializeObject(message); var inputResponse = await _inputService.GetInputAsync(payload.TagID); - await _hubContext.Clients.All.NotifyDeviceAsync(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability); + lock (_messageLock) + { + _messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability)); + } } catch (Exception ex) { @@ -79,6 +107,7 @@ namespace Birdmap.API.Services.Mqtt _logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}"); await _mqttClient.SubscribeAsync(topic); + _hubTimer.Start(); } catch (Exception ex) { @@ -94,6 +123,7 @@ namespace Birdmap.API.Services.Mqtt try { + _hubTimer.Stop(); await _mqttClient.ConnectAsync(_options, CancellationToken.None); } catch (Exception ex)