Compare commits

...

2 Commits

Author SHA1 Message Date
9d55c39e33 Added Mqtt messages buffer to help unload frontend 2020-11-22 10:39:51 +01:00
d75e9d378d Added further optimazation 2020-11-22 09:56:32 +01:00
7 changed files with 124 additions and 59 deletions

View File

@ -1,5 +1,5 @@
export default { export default {
probability_method_name: 'NotifyDeviceAsync', probability_method_name: 'NotifyMessagesAsync',
update_method_name: 'NotifyDeviceUpdatedAsync', update_method_name: 'NotifyDeviceUpdatedAsync',
update_all_method_name: 'NotifyAllUpdatedAsync', update_all_method_name: 'NotifyAllUpdatedAsync',
}; };

View File

@ -57,6 +57,9 @@ class Dashboard extends Component {
componentWillUnmount() { componentWillUnmount() {
this.context.removeHandler(C.update_all_method_name, this.updateSeries); this.context.removeHandler(C.update_all_method_name, this.updateSeries);
this.context.removeHandler(C.update_method_name, this.updateSeries); this.context.removeHandler(C.update_method_name, this.updateSeries);
if (this.updateTimer) {
clearTimeout(this.updateTimer);
}
} }
getItemsWithStatus(iterate, status) { getItemsWithStatus(iterate, status) {
@ -111,8 +114,8 @@ class Dashboard extends Component {
updateDynamic = () => { updateDynamic = () => {
const secondAgo = new Date(); const secondAgo = new Date();
secondAgo.setMilliseconds(0); secondAgo.setMilliseconds(0);
const minuteAgo = new Date( Date.now() - 1000 * 60 ); const minuteAgo = new Date(Date.now() - 1000 * 60);
const hourAgo = new Date( Date.now() - 1000 * 60 * 60 ); const hourAgo = new Date(Date.now() - 1000 * 60 * 60);
const minuteDevicePoints = {}; const minuteDevicePoints = {};
const hourDevicePoints = {}; const hourDevicePoints = {};
@ -125,7 +128,7 @@ class Dashboard extends Component {
barDevicePoints[d.id] = Array(3).fill(0); barDevicePoints[d.id] = Array(3).fill(0);
} }
const processMethod = (items, index) => { const processHeatmapItem = (items, index) => {
const p = items[index]; const p = items[index];
if (p.date > minuteAgo) { if (p.date > minuteAgo) {
var seconds = Math.floor((p.date.getTime() - minuteAgo.getTime()) / 1000); var seconds = Math.floor((p.date.getTime() - minuteAgo.getTime()) / 1000);
@ -142,7 +145,7 @@ class Dashboard extends Component {
hourDevicePoints[p.deviceId][minutes] = p.prob; hourDevicePoints[p.deviceId][minutes] = p.prob;
} }
} }
if (p.prob > 0.5 && p.prob <= 0.7) { if (p.prob > 0.5 && p.prob <= 0.7) {
barDevicePoints[p.deviceId][0] += 1; barDevicePoints[p.deviceId][0] += 1;
} }
@ -164,7 +167,7 @@ class Dashboard extends Component {
} }
} }
const finishMethod = () => { const onFinished = () => {
const minuteHeatmapSeries = []; const minuteHeatmapSeries = [];
var i = 0; var i = 0;
@ -172,39 +175,39 @@ class Dashboard extends Component {
minuteHeatmapSeries.push({ minuteHeatmapSeries.push({
name: "Device " + i, name: "Device " + i,
data: minuteDevicePoints[p].map((value, index) => ({ data: minuteDevicePoints[p].map((value, index) => ({
x: new Date( Date.now() - (60 - index) * 1000 ).toLocaleTimeString('hu-HU'), x: new Date(Date.now() - (60 - index) * 1000).toLocaleTimeString('hu-HU'),
y: value y: value
})), })),
}); });
i++; i++;
}; };
const hourHeatmapSeries = []; const hourHeatmapSeries = [];
var i = 0; var i = 0;
for (var p in hourDevicePoints) { for (var p in hourDevicePoints) {
hourHeatmapSeries.push({ hourHeatmapSeries.push({
name: "Device " + i, name: "Device " + i,
data: hourDevicePoints[p].map((value, index) => ({ data: hourDevicePoints[p].map((value, index) => ({
x: new Date( Date.now() - (60 - index) * 1000 * 60 ).toLocaleTimeString('hu-HU').substring(0, 5), x: new Date(Date.now() - (60 - index) * 1000 * 60).toLocaleTimeString('hu-HU').substring(0, 5),
y: value y: value
})), })),
}); });
i++; i++;
}; };
const barSeries = []; const barSeries = [];
const getCount = column => { const getCount = column => {
var counts = []; var counts = [];
for (var p in barDevicePoints) { for (var p in barDevicePoints) {
counts.unshift(barDevicePoints[p][column]); counts.unshift(barDevicePoints[p][column]);
} }
return counts; return counts;
}; };
barSeries.push({ barSeries.push({
name: "Prob > 0.5", name: "Prob > 0.5",
data: getCount(0), data: getCount(0),
@ -217,43 +220,46 @@ class Dashboard extends Component {
name: "Prob > 0.9", name: "Prob > 0.9",
data: getCount(2), data: getCount(2),
}); });
const lineSeries = [{name: "message/sec", data: []}]; const lineSeries = [{ name: "message/sec", data: [] }];
for (var m in linePoints) { for (var m in linePoints) {
lineSeries[0].data.push({ lineSeries[0].data.push({
x: new Date(m).getTime(), x: new Date(m).getTime(),
y: linePoints[m], y: linePoints[m],
}) })
} }
const getBarCategories = () => { const getBarCategories = () => {
const categories = []; const categories = [];
for (var i = this.context.devices.length - 1; i >= 0; i--) { for (var i = this.context.devices.length - 1; i >= 0; i--) {
categories.push("Device " + i) categories.push("Device " + i)
} }
return categories; return categories;
} }
this.setState({ const toUpdate = [
heatmapSecondsSeries: minuteHeatmapSeries, { heatmapSecondsSeries: minuteHeatmapSeries },
heatmapMinutesSeries: hourHeatmapSeries, { heatmapMinutesSeries: hourHeatmapSeries },
barSeries: barSeries, { barSeries: barSeries },
barCategories: getBarCategories(), { barCategories: getBarCategories() },
lineSeries: lineSeries, { lineSeries: lineSeries }
];
//Set states must be done separately otherwise ApexChart's UI update freezes the page.
this.performTask(toUpdate, 2, 300, (list, index) => {
this.setState(list[index]);
},
() => {
this.updateTimer = setTimeout(this.updateDynamic, 1000);
}); });
setTimeout(this.updateDynamic, 1000);
} }
const processHeatmapItem = processMethod.bind(this);
const onFinished = finishMethod.bind(this)
this.performTask(this.context.heatmapPoints, Math.ceil(this.context.heatmapPoints.length / 50), 20, this.performTask(this.context.heatmapPoints, Math.ceil(this.context.heatmapPoints.length / 50), 20,
processHeatmapItem, onFinished); processHeatmapItem, onFinished);
} }
performTask(items, numToProcess, wait, processItem, onFinished) { performTask(items, numToProcess, wait, processItem, onFinished) {
var pos = 0; var pos = 0;
// This is run once for every numToProcess items. // This is run once for every numToProcess items.
@ -281,36 +287,36 @@ class Dashboard extends Component {
<Box className={classes.root}> <Box className={classes.root}>
<Grid container spacing={3}> <Grid container spacing={3}>
<Grid item xs={12}> <Grid item xs={12}>
<Services isAdmin={this.props.isAdmin}/> <Services isAdmin={this.props.isAdmin} />
</Grid> </Grid>
<Grid item xs={6}> <Grid item xs={6}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<DonutChart totalLabel="Devices" series={this.state.deviceSeries}/> <DonutChart totalLabel="Devices" series={this.state.deviceSeries} />
</Paper> </Paper>
</Grid> </Grid>
<Grid item xs={6}> <Grid item xs={6}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<DonutChart totalLabel="Sensors" series={this.state.sensorSeries}/> <DonutChart totalLabel="Sensors" series={this.state.sensorSeries} />
</Paper> </Paper>
</Grid> </Grid>
<Grid item xs={12}> <Grid item xs={12}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<HeatmapChart label="Highest probability per second by devices" series={this.state.heatmapSecondsSeries}/> <HeatmapChart label="Highest probability per second by devices" series={this.state.heatmapSecondsSeries} />
</Paper> </Paper>
</Grid> </Grid>
<Grid item xs={12}> <Grid item xs={12}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<HeatmapChart label="Highest probability per minute by devices" series={this.state.heatmapMinutesSeries}/> <HeatmapChart label="Highest probability per minute by devices" series={this.state.heatmapMinutesSeries} />
</Paper> </Paper>
</Grid> </Grid>
<Grid item xs={6}> <Grid item xs={6}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<BarChart label="# of messages by devices" series={this.state.barSeries} categories={this.state.barCategories}/> <BarChart label="# of messages by devices" series={this.state.barSeries} categories={this.state.barCategories} />
</Paper> </Paper>
</Grid> </Grid>
<Grid item xs={6}> <Grid item xs={6}>
<Paper className={classes.paper}> <Paper className={classes.paper}>
<LineChart label="# of messages per second" series={this.state.lineSeries}/> <LineChart label="# of messages per second" series={this.state.lineSeries} />
</Paper> </Paper>
</Grid> </Grid>
</Grid> </Grid>

View File

@ -8,19 +8,34 @@ export class HeatmapChart extends Component {
this.state = { this.state = {
options: { options: {
dataLabels: { chart: {
enabled: false animations: {
}, enabled: true,
colors: [blueGrey[900]], easing: 'linear',
title: { speed: 250,
text: props.label, animateGradually: {
style: { enabled: false,
fontSize: '22px', speed: 250,
fontWeight: 600,
fontFamily: 'Helvetica, Arial, sans-serif',
}, },
dynamicAnimation: {
enabled: true,
speed: 250
}
}
},
dataLabels: {
enabled: false
},
colors: [blueGrey[900]],
title: {
text: props.label,
style: {
fontSize: '22px',
fontWeight: 600,
fontFamily: 'Helvetica, Arial, sans-serif',
}, },
}, },
},
} }
} }

View File

@ -20,11 +20,18 @@ export default class MapContainer extends Component {
}; };
this.probabilityHandler = this.probabilityHandler.bind(this); this.probabilityHandler = this.probabilityHandler.bind(this);
this.handlePoint = this.handlePoint.bind(this);
} }
static contextType = DevicesContext; static contextType = DevicesContext;
probabilityHandler(point) { probabilityHandler(points) {
for (var point of points) {
this.handlePoint(point);
}
}
handlePoint(point) {
if (point.prob > 0.5) { if (point.prob > 0.5) {
this.setState({ this.setState({

View File

@ -99,15 +99,19 @@ export default class DevicesContextProvider extends Component {
.then(_ => { .then(_ => {
console.log('Devices hub Connected!'); console.log('Devices hub Connected!');
newConnection.on(C.probability_method_name, (id, date, prob) => { newConnection.on(C.probability_method_name, (messages) => {
//console.log(method_name + " recieved: [id: " + id + ", date: " + date + ", prob: " + prob + "]"); //console.log(method_name + " recieved: [id: " + id + ", date: " + date + ", prob: " + prob + "]");
var device = this.state.devices.filter(function (x) { return x.id === id })[0] const newPoints = [];
var newPoint = { deviceId: device.id, lat: device.coordinates.latitude, lng: device.coordinates.longitude, prob: prob, date: new Date(date) }; for (var message of messages) {
var device = this.state.devices.filter(function (x) { return x.id === message.deviceId })[0]
var newPoint = { deviceId: device.id, lat: device.coordinates.latitude, lng: device.coordinates.longitude, prob: message.probability, date: new Date(message.date) };
newPoints.push(newPoint);
}
this.setState({ this.setState({
heatmapPoints: [...this.state.heatmapPoints, newPoint] heatmapPoints: this.state.heatmapPoints.concat(newPoints)
}); });
this.invokeHandlers(C.probability_method_name, newPoint); this.invokeHandlers(C.probability_method_name, newPoints);
}); });
newConnection.on(C.update_all_method_name, () => { newConnection.on(C.update_all_method_name, () => {

View File

@ -1,11 +1,14 @@
using System; using System;
using System.Collections.Generic;
using System.Threading.Tasks; using System.Threading.Tasks;
namespace Birdmap.API.Services namespace Birdmap.API.Services
{ {
public record Message(Guid DeviceId, DateTime Date, double Probability);
public interface IDevicesHubClient public interface IDevicesHubClient
{ {
Task NotifyDeviceAsync(Guid deviceId, DateTime date, double probability); Task NotifyMessagesAsync(IEnumerable<Message> messages);
Task NotifyDeviceUpdatedAsync(Guid deviceId); Task NotifyDeviceUpdatedAsync(Guid deviceId);
Task NotifyAllUpdatedAsync(); Task NotifyAllUpdatedAsync();
} }

View File

@ -9,9 +9,11 @@ using MQTTnet.Client.Disconnecting;
using MQTTnet.Client.Options; using MQTTnet.Client.Options;
using Newtonsoft.Json; using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Timer = System.Timers.Timer;
namespace Birdmap.API.Services.Mqtt namespace Birdmap.API.Services.Mqtt
{ {
@ -22,6 +24,9 @@ namespace Birdmap.API.Services.Mqtt
private readonly ILogger<MqttClientService> _logger; private readonly ILogger<MqttClientService> _logger;
private readonly IInputService _inputService; private readonly IInputService _inputService;
private readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext; private readonly IHubContext<DevicesHub, IDevicesHubClient> _hubContext;
private readonly Timer _hubTimer;
private readonly List<Message> _messages = new();
private readonly object _messageLock = new();
public bool IsConnected => _mqttClient.IsConnected; public bool IsConnected => _mqttClient.IsConnected;
@ -31,10 +36,30 @@ namespace Birdmap.API.Services.Mqtt
_logger = logger; _logger = logger;
_inputService = inputService; _inputService = inputService;
_hubContext = hubContext; _hubContext = hubContext;
_hubTimer = new Timer()
{
AutoReset = true,
Interval = 1000,
};
_hubTimer.Elapsed += SendMqttMessagesWithSignalR;
_mqttClient = new MqttFactory().CreateMqttClient(); _mqttClient = new MqttFactory().CreateMqttClient();
ConfigureMqttClient(); ConfigureMqttClient();
} }
private void SendMqttMessagesWithSignalR(object sender, System.Timers.ElapsedEventArgs e)
{
lock (_messageLock)
{
if (_messages.Any())
{
_logger.LogInformation($"Sending ({_messages.Count}) messages: {string.Join(" | ", _messages)}");
_hubContext.Clients.All.NotifyMessagesAsync(_messages);
_messages.Clear();
}
}
}
private void ConfigureMqttClient() private void ConfigureMqttClient()
{ {
_mqttClient.ConnectedHandler = this; _mqttClient.ConnectedHandler = this;
@ -55,7 +80,7 @@ namespace Birdmap.API.Services.Mqtt
{ {
var message = eventArgs.ApplicationMessage.ConvertPayloadToString(); var message = eventArgs.ApplicationMessage.ConvertPayloadToString();
_logger.LogInformation($"Recieved [{eventArgs.ClientId}] " + _logger.LogDebug($"Recieved [{eventArgs.ClientId}] " +
$"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}"); $"Topic: {eventArgs.ApplicationMessage.Topic} | Payload: {message} | QoS: {eventArgs.ApplicationMessage.QualityOfServiceLevel} | Retain: {eventArgs.ApplicationMessage.Retain}");
try try
@ -63,7 +88,10 @@ namespace Birdmap.API.Services.Mqtt
var payload = JsonConvert.DeserializeObject<Payload>(message); var payload = JsonConvert.DeserializeObject<Payload>(message);
var inputResponse = await _inputService.GetInputAsync(payload.TagID); var inputResponse = await _inputService.GetInputAsync(payload.TagID);
await _hubContext.Clients.All.NotifyDeviceAsync(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability); lock (_messageLock)
{
_messages.Add(new Message(inputResponse.Message.Device_id, inputResponse.Message.Date.UtcDateTime, payload.Probability));
}
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -79,6 +107,7 @@ namespace Birdmap.API.Services.Mqtt
_logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}"); _logger.LogInformation($"Connected. Auth result: {eventArgs.AuthenticateResult}. Subscribing to topic: {topic}");
await _mqttClient.SubscribeAsync(topic); await _mqttClient.SubscribeAsync(topic);
_hubTimer.Start();
} }
catch (Exception ex) catch (Exception ex)
{ {
@ -94,6 +123,7 @@ namespace Birdmap.API.Services.Mqtt
try try
{ {
_hubTimer.Stop();
await _mqttClient.ConnectAsync(_options, CancellationToken.None); await _mqttClient.ConnectAsync(_options, CancellationToken.None);
} }
catch (Exception ex) catch (Exception ex)