// -------------------------------------------------------------------------------------------------------------------- // // Copyright (c) 2020 All rights reserved. // // // The main form. // // -------------------------------------------------------------------------------------------------------------------- namespace MQTTnet.TestApp.WinForm { using System; using System.Text; using System.Timers; using System.Windows.Forms; using MQTTnet.Client.Connecting; using MQTTnet.Client.Disconnecting; using MQTTnet.Client.Options; using MQTTnet.Client.Receiving; using MQTTnet.Extensions.ManagedClient; using MQTTnet.Formatter; using MQTTnet.Protocol; using MQTTnet.Server; using Newtonsoft.Json; using Timer = System.Timers.Timer; /// /// The main form. /// public partial class Form1 : Form { /// /// The managed publisher client. /// private IManagedMqttClient managedMqttClientPublisher; /// /// The managed subscriber client. /// private IManagedMqttClient managedMqttClientSubscriber; /// /// The MQTT server. /// private IMqttServer mqttServer; /// /// The port. /// private string port = "1883"; private readonly Random random; private readonly Timer randomPublisherTimer; /// /// Initializes a new instance of the class. /// public Form1() { this.InitializeComponent(); var timer = new Timer { AutoReset = true, Enabled = true, Interval = 1000 }; timer.Elapsed += this.TimerElapsed; random = new Random(); randomPublisherTimer = new Timer { AutoReset = true, Enabled = false, }; randomPublisherTimer.Elapsed += (_, __) => this.BeginInvoke((MethodInvoker)delegate { if (checkBox1.Checked) ButtonGeneratePublishedMessageClick(ButtonGeneratePublishedMessage, EventArgs.Empty); ButtonPublishClick(ButtonPublish, EventArgs.Empty); }); } /// /// Handles the publisher connected event. /// /// The MQTT client connected event args. private void OnPublisherConnected(MqttClientConnectedEventArgs x) { var item = "Publisher Connected"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// Handles the publisher disconnected event. /// /// The MQTT client disconnected event args. private void OnPublisherDisconnected(MqttClientDisconnectedEventArgs x) { var item = "Publisher Disconnected"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// Handles the subscriber connected event. /// /// The MQTT client connected event args. private void OnSubscriberConnected(MqttClientConnectedEventArgs x) { var item = "Subscriber Connected"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// Handles the subscriber disconnected event. /// /// The MQTT client disconnected event args. private void OnSubscriberDisconnected(MqttClientDisconnectedEventArgs x) { var item = "Subscriber Disconnected"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// The method that handles the button click to generate a message. /// /// The sender. /// The event args. private void ButtonGeneratePublishedMessageClick(object sender, EventArgs e) { var message = new { tag = Guid.NewGuid(), probability = random.NextDouble(), }; var json = JsonConvert.SerializeObject(message); this.TextBoxPublish.Text = json; } /// /// The method that handles the button click to publish a message. /// /// The sender. /// The event args. private async void ButtonPublishClick(object sender, EventArgs e) { ((Button)sender).Enabled = false; try { var payload = Encoding.UTF8.GetBytes(this.TextBoxPublish.Text); var message = new MqttApplicationMessageBuilder().WithTopic(this.TextBoxTopicPublished.Text.Trim()).WithPayload(payload).WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce).WithRetainFlag().Build(); if (this.managedMqttClientPublisher != null) { await this.managedMqttClientPublisher.PublishAsync(message); } } catch (Exception ex) { var item = ex.Message; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } ((Button)sender).Enabled = true; } /// /// The method that handles the button click to start the publisher. /// /// The sender. /// The event args. private async void ButtonPublisherStartClick(object sender, EventArgs e) { var mqttFactory = new MqttFactory(); var tlsOptions = new MqttClientTlsOptions { UseTls = false, IgnoreCertificateChainErrors = true, IgnoreCertificateRevocationErrors = true, AllowUntrustedCertificates = true }; var options = new MqttClientOptions { ClientId = "ClientPublisher", ProtocolVersion = MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = "localhost", Port = int.Parse(this.TextBoxPort.Text.Trim()), TlsOptions = tlsOptions } }; if (options.ChannelOptions == null) { throw new InvalidOperationException(); } options.Credentials = new MqttClientCredentials { Username = "username", Password = Encoding.UTF8.GetBytes("password") }; options.CleanSession = true; options.KeepAlivePeriod = TimeSpan.FromSeconds(5); this.managedMqttClientPublisher = mqttFactory.CreateManagedMqttClient(); this.managedMqttClientPublisher.UseApplicationMessageReceivedHandler(this.HandleReceivedApplicationMessage); this.managedMqttClientPublisher.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnPublisherConnected); this.managedMqttClientPublisher.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnPublisherDisconnected); await this.managedMqttClientPublisher.StartAsync( new ManagedMqttClientOptions { ClientOptions = options }); } /// /// The method that handles the button click to stop the publisher. /// /// The sender. /// The event args. private async void ButtonPublisherStopClick(object sender, EventArgs e) { if (this.managedMqttClientPublisher == null) { return; } await this.managedMqttClientPublisher.StopAsync(); this.managedMqttClientPublisher = null; } /// /// The method that handles the button click to start the server. /// /// The sender. /// The event args. private async void ButtonServerStartClick(object sender, EventArgs e) { if (this.mqttServer != null) { return; } var storage = new JsonServerStorage(); storage.Clear(); this.mqttServer = new MqttFactory().CreateMqttServer(); var options = new MqttServerOptions(); options.DefaultEndpointOptions.Port = int.Parse(this.TextBoxPort.Text); options.Storage = storage; options.EnablePersistentSessions = true; options.ConnectionValidator = new MqttServerConnectionValidatorDelegate( c => { if (c.ClientId.Length < 10) { c.ReasonCode = MqttConnectReasonCode.ClientIdentifierNotValid; return; } if (c.Username != "username") { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } if (c.Password != "password") { c.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword; return; } c.ReasonCode = MqttConnectReasonCode.Success; }); try { await this.mqttServer.StartAsync(options); } catch (Exception ex) { var item = ex.Message; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); await this.mqttServer.StopAsync(); this.mqttServer = null; } } /// /// The method that handles the button click to stop the server. /// /// The sender. /// The event args. private async void ButtonServerStopClick(object sender, EventArgs e) { if (this.mqttServer == null) { return; } await this.mqttServer.StopAsync(); this.mqttServer = null; } /// /// The method that handles the button click to subscribe to a certain topic. /// /// The sender. /// The event args. private async void ButtonSubscribeClick(object sender, EventArgs e) { var topicFilter = new MqttTopicFilter { Topic = this.TextBoxTopicSubscribed.Text.Trim() }; await this.managedMqttClientSubscriber.SubscribeAsync(topicFilter); var item = "Topic " + this.TextBoxTopicSubscribed.Text.Trim() + " is subscribed"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// The method that handles the button click to start the subscriber. /// /// The sender. /// The event args. private async void ButtonSubscriberStartClick(object sender, EventArgs e) { var mqttFactory = new MqttFactory(); var tlsOptions = new MqttClientTlsOptions { UseTls = false, IgnoreCertificateChainErrors = true, IgnoreCertificateRevocationErrors = true, AllowUntrustedCertificates = true }; var options = new MqttClientOptions { ClientId = "ClientSubscriber", ProtocolVersion = MqttProtocolVersion.V311, ChannelOptions = new MqttClientTcpOptions { Server = "localhost", Port = int.Parse(this.TextBoxPort.Text.Trim()), TlsOptions = tlsOptions } }; if (options.ChannelOptions == null) { throw new InvalidOperationException(); } options.Credentials = new MqttClientCredentials { Username = "username", Password = Encoding.UTF8.GetBytes("password") }; options.CleanSession = true; options.KeepAlivePeriod = TimeSpan.FromSeconds(5); this.managedMqttClientSubscriber = mqttFactory.CreateManagedMqttClient(); this.managedMqttClientSubscriber.ConnectedHandler = new MqttClientConnectedHandlerDelegate(OnSubscriberConnected); this.managedMqttClientSubscriber.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(OnSubscriberDisconnected); this.managedMqttClientSubscriber.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(this.OnSubscriberMessageReceived); await this.managedMqttClientSubscriber.StartAsync( new ManagedMqttClientOptions { ClientOptions = options }); } /// /// The method that handles the button click to stop the subscriber. /// /// The sender. /// The event args. private async void ButtonSubscriberStopClick(object sender, EventArgs e) { if (this.managedMqttClientSubscriber == null) { return; } await this.managedMqttClientSubscriber.StopAsync(); this.managedMqttClientSubscriber = null; } /// /// Handles the received application message event. /// /// The MQTT application message received event args. private void HandleReceivedApplicationMessage(MqttApplicationMessageReceivedEventArgs x) { var item = $"Timestamp: {DateTime.Now:O} | Topic: {x.ApplicationMessage.Topic} | Payload: {x.ApplicationMessage.ConvertPayloadToString()} | QoS: {x.ApplicationMessage.QualityOfServiceLevel}"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// Handles the received subscriber message event. /// /// The MQTT application message received event args. private void OnSubscriberMessageReceived(MqttApplicationMessageReceivedEventArgs x) { var item = $"Timestamp: {DateTime.Now:O} | Topic: {x.ApplicationMessage.Topic} | Payload: {x.ApplicationMessage.ConvertPayloadToString()} | QoS: {x.ApplicationMessage.QualityOfServiceLevel}"; this.BeginInvoke((MethodInvoker)delegate { this.TextBoxSubscriber.Text = item + Environment.NewLine + this.TextBoxSubscriber.Text; }); } /// /// The method that handles the text changes in the text box. /// /// The sender. /// The event args. private void TextBoxPortTextChanged(object sender, EventArgs e) { // ReSharper disable once StyleCop.SA1126 if (int.TryParse(this.TextBoxPort.Text, out _)) { this.port = this.TextBoxPort.Text.Trim(); } else { this.TextBoxPort.Text = this.port; this.TextBoxPort.SelectionStart = this.TextBoxPort.Text.Length; this.TextBoxPort.SelectionLength = 0; } } /// /// The method that handles the timer events. /// /// The sender. /// The event args. private void TimerElapsed(object sender, ElapsedEventArgs e) { this.BeginInvoke( (MethodInvoker)delegate { // Server this.TextBoxPort.Enabled = this.mqttServer == null; this.ButtonServerStart.Enabled = this.mqttServer == null; this.ButtonServerStop.Enabled = this.mqttServer != null; // Publisher this.ButtonPublisherStart.Enabled = this.managedMqttClientPublisher == null; this.ButtonPublisherStop.Enabled = this.managedMqttClientPublisher != null; // Auto Publisher this.ButtonAutoPublisherStart.Enabled = !this.randomPublisherTimer.Enabled; this.ButtonAutoPublisherStop.Enabled = this.randomPublisherTimer.Enabled; // Subscriber this.ButtonSubscriberStart.Enabled = this.managedMqttClientSubscriber == null; this.ButtonSubscriberStop.Enabled = this.managedMqttClientSubscriber != null; }); } private void trackBar1_Scroll(object sender, EventArgs e) { this.label8.Text = $"{this.trackBar1.Value:N0} ms"; this.randomPublisherTimer.Interval = this.trackBar1.Value; } private void ButtonAutoPublisherStartClick(object sender, EventArgs e) { ((Button)sender).Enabled = false; this.randomPublisherTimer.Start(); ((Button)sender).Enabled = true; } private void ButtonAutoPublisherStopClick(object sender, EventArgs e) { ((Button)sender).Enabled = false; this.randomPublisherTimer.Stop(); ((Button)sender).Enabled = true; } } }