Quick Implementation of MQTT Communication Using MQTTnet

1 What is MQTT?

MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol originally developed by IBM that is widely used in the Internet of Things (IoT). It is a binary protocol built on the publish/subscribe model and is now an OASIS standard. Because it is simple and has low overhead, it suits IoT scenarios with tight power budgets and limited network bandwidth. See the MQTT official website for details.

2 MQTTnet

MQTTnet is a high-performance open-source .NET library for MQTT communication that supports both the server (broker) and client sides. The author actively maintains it, and it currently supports the latest version of .NET Core, which is one reason for choosing it here. MQTTnet is not the most downloaded .NET MQTT library on GitHub; alternatives include MqttDotNet, nMQTT, and M2MQTT.

MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.

3 Create Project and Import Library

Here we use Visual Studio 2017 to create an empty solution and add two projects: one for the server and one for the client. For the server project, choose the latest .NET Core console application template; for the client project, choose the traditional WinForms application template. The .NET Core project template is shown in the figure below:

[Figure: .NET Core console application project template]

In the solution, right-click and select “Manage NuGet Packages for Solution”, search for MQTTnet under the “Browse” tab, and install the MQTTnet library into both the server and client projects. The latest stable version at the time of writing is 2.4.0. The project structure is shown in the figure below:

[Figure: solution structure with the server and client projects]

4 Server

The MQTT server maintains connections with multiple clients and handles their publish and subscribe requests. It rarely sends messages to clients on its own initiative (although you can do so with mqttServer.Publish(appMsg);). In most cases it forwards each message published by a client to the clients whose subscriptions match the topic, acting as an intermediary in the system.

4.1 Create and Start Server

The simplest way to create a server is to use the MqttServerFactory object’s CreateMqttServer method, which requires a MqttServerOptions parameter.

var options = new MqttServerOptions();
var mqttServer = new MqttServerFactory().CreateMqttServer(options);

Creating the server this way returns an IMqttServer object; call its StartAsync method to start the MQTT service. Note that earlier versions used a synchronous Start method. The author has kept up with newer C# language features and converted synchronous methods to asynchronous ones wherever possible.

await mqttServer.StartAsync();

4.2 Validate Client

In MqttServerOptions, you can set ConnectionValidator to validate client connections, for example by checking the client ID (ClientId), username (Username), and password (Password).

var options = new MqttServerOptions
{
    ConnectionValidator = c =>
    {
        if (c.ClientId.Length < 10)
        {
            return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected;
        }

        if (c.Username != "xxx" || c.Password != "xxx")
        {
            return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
        }

        return MqttConnectReturnCode.ConnectionAccepted;
    }
};
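
The rules inside the validator can also be kept in a plain function so that they can be tested without starting a server. The following is a minimal sketch under that assumption: ConnectResult and ConnectionRules are hypothetical names introduced only for illustration and are not MQTTnet types, and the null check on the client ID is an extra safeguard not present in the snippet above.

```csharp
using System;

// Hypothetical stand-in for the return codes used above; NOT the MQTTnet enum.
public enum ConnectResult
{
    Accepted,
    IdentifierRejected,
    BadUsernameOrPassword
}

// Hypothetical helper that mirrors the validation rules shown above.
public static class ConnectionRules
{
    public static ConnectResult Validate(string clientId, string username, string password)
    {
        // Reject missing or short client IDs (fewer than 10 characters).
        if (string.IsNullOrEmpty(clientId) || clientId.Length < 10)
        {
            return ConnectResult.IdentifierRejected;
        }

        // Placeholder credentials, as in the snippet above.
        if (username != "xxx" || password != "xxx")
        {
            return ConnectResult.BadUsernameOrPassword;
        }

        return ConnectResult.Accepted;
    }
}
```

A ConnectionValidator lambda could then call such a function and map its result to the corresponding MqttConnectReturnCode value.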

4.3 Related Events

The server exposes the ClientConnected, ClientDisconnected, and ApplicationMessageReceived events, which are raised when a client connects, when a client disconnects, and when a message sent by a client is received, respectively.

The event arguments of ClientConnected and ClientDisconnected carry a ConnectedMqttClient object representing the client connection, from which you can read the client ID (ClientId) and the MQTT protocol version (ProtocolVersion).

The event arguments of ApplicationMessageReceived contain the client ID (ClientId) and the application message as a MqttApplicationMessage object, from which you can read the topic (Topic), QoS level (QualityOfServiceLevel), message content (Payload), and so on.

5 Client

MQTT differs from HTTP. HTTP is based on a request/response model in which the server cannot push data to the client on its own. MQTT is based on a publish/subscribe model in which every client maintains a connection with the server.

So how do clients communicate with each other?

The logic is as follows: some clients subscribe on the server to the topics they are interested in, while other clients publish messages on topics to the server. The server matches each published topic against the subscriptions and forwards the message to the clients whose subscriptions match.
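
To make the routing idea concrete, here is a toy in-memory sketch of a broker that forwards published messages to subscribers. It only illustrates the matching and forwarding step and has nothing to do with the MQTT wire protocol; TinyBroker and its members are hypothetical names, not MQTTnet types, and topics are compared by exact string equality (no wildcards).

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory broker: illustrates routing only, not the MQTT wire protocol.
// TinyBroker is a hypothetical name, not an MQTTnet type.
public sealed class TinyBroker
{
    // topic -> handlers of the clients subscribed to that exact topic
    private readonly Dictionary<string, List<Action<string, string>>> _subscriptions =
        new Dictionary<string, List<Action<string, string>>>();

    // A client registers interest in a topic.
    public void Subscribe(string topic, Action<string, string> onMessage)
    {
        List<Action<string, string>> handlers;
        if (!_subscriptions.TryGetValue(topic, out handlers))
        {
            handlers = new List<Action<string, string>>();
            _subscriptions[topic] = handlers;
        }

        handlers.Add(onMessage);
    }

    // A client publishes; the broker forwards the message to every subscriber
    // of that topic and returns how many subscribers received it.
    public int Publish(string topic, string payload)
    {
        List<Action<string, string>> handlers;
        if (!_subscriptions.TryGetValue(topic, out handlers))
        {
            return 0; // nobody subscribed: the message is simply dropped
        }

        foreach (var handler in handlers)
        {
            handler(topic, payload);
        }

        return handlers.Count;
    }
}
```

The publisher never addresses a subscriber directly: both sides only know the topic, and the broker connects them.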

5.1 Create Client and Connect

Creating an MQTT client with MQTTnet is just as simple: call the CreateMqttClient method of a MqttClientFactory object.

var mqttClient = new MqttClientFactory().CreateMqttClient();

After creating the client object, call its asynchronous method ConnectAsync to connect to the server.

await mqttClient.ConnectAsync(options);

This method takes a MqttClientTcpOptions object (earlier versions took these options when the client object was created), which contains the client ID (ClientId), server address (Server, either an IP address or a domain name), port number (Port), username (UserName), password (Password), and so on.

var options = new MqttClientTcpOptions
{
    Server = "127.0.0.1",
    ClientId = "c001",
    UserName = "u001",
    Password = "p001",
    CleanSession = true
};

5.2 Related Events

The client exposes the Connected, Disconnected, and ApplicationMessageReceived events, which are raised when the client connects to the server, when it disconnects from the server, and when it receives a message, respectively.

5.3 Subscribe to Messages

After the client connects to the server, it can subscribe to messages with the asynchronous SubscribeAsync method. The method accepts topic filters (TopicFilter objects), either as an enumerable or as a variable-length argument list; each filter contains a topic name and a QoS level.

await mqttClient.SubscribeAsync(new List<TopicFilter>
{
    new TopicFilter("home/living-room/air-conditioner/#", MqttQualityOfServiceLevel.AtMostOnce)
});
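
The filter above ends in /#, which is an MQTT wildcard. In MQTT topic filters, + matches exactly one topic level and # matches any number of remaining levels (including the parent level itself) and must be the last level. The sketch below shows how such a filter can be matched against a published topic; it is an illustration written for this article, not MQTTnet's implementation, TopicFilterMatcher is a hypothetical name, and special cases such as topics beginning with $ are not handled.

```csharp
using System;

// Hypothetical helper illustrating MQTT wildcard matching; not part of MQTTnet.
public static class TopicFilterMatcher
{
    public static bool IsMatch(string filter, string topic)
    {
        string[] filterLevels = filter.Split('/');
        string[] topicLevels = topic.Split('/');

        for (int i = 0; i < filterLevels.Length; i++)
        {
            if (filterLevels[i] == "#")
            {
                // '#' matches everything from here on, but only as the last level.
                return i == filterLevels.Length - 1;
            }

            if (i >= topicLevels.Length)
            {
                return false; // the filter has more levels than the topic
            }

            // '+' matches any single level; otherwise the levels must be equal.
            if (filterLevels[i] != "+" && filterLevels[i] != topicLevels[i])
            {
                return false;
            }
        }

        // Without a trailing '#', both must have the same number of levels.
        return filterLevels.Length == topicLevels.Length;
    }
}
```

For example, a filter of the form a/b/# matches a/b/c and a/b/c/d, while a/+/c matches a/x/c but not a/x/y/c.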

5.4 Publish Messages

Before publishing a message, you need to create a MqttApplicationMessage object. The most direct way is to use its constructor, passing in the topic, the content (as a byte array), the QoS level, and the retain flag.

var appMsg = new MqttApplicationMessage(
    "home/living-room/air-conditioner/switch",
    Encoding.UTF8.GetBytes("message content"),
    MqttQualityOfServiceLevel.AtMostOnce,
    false);
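
Because the message content is a byte array, the publisher and the subscriber must agree on how values are encoded. The sketch below shows one possible convention for a temperature reading: UTF-8 text with one decimal place, formatted with the invariant culture so that the decimal separator does not depend on the machine's locale. PayloadCodec and its methods are hypothetical names chosen for this illustration, and the text format is an assumption, not something MQTT or MQTTnet prescribes.

```csharp
using System;
using System.Globalization;
using System.Text;

// Hypothetical helper for turning a temperature reading into a payload and back.
public static class PayloadCodec
{
    // Encodes a temperature as UTF-8 text with one decimal place, e.g. "21.5".
    public static byte[] EncodeTemperature(double celsius)
    {
        string text = celsius.ToString("F1", CultureInfo.InvariantCulture);
        return Encoding.UTF8.GetBytes(text);
    }

    // Decodes a payload produced by EncodeTemperature.
    // Returns false for empty or non-numeric payloads instead of throwing.
    public static bool TryDecodeTemperature(byte[] payload, out double celsius)
    {
        celsius = 0;
        if (payload == null || payload.Length == 0)
        {
            return false;
        }

        string text = Encoding.UTF8.GetString(payload);
        return double.TryParse(text, NumberStyles.Float, CultureInfo.InvariantCulture, out celsius);
    }
}
```

The encoded bytes can be passed as the content argument of the message, and the receiving side can call TryDecodeTemperature on the Payload it gets in ApplicationMessageReceived.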

Once you have the MqttApplicationMessage object, pass it to the client's asynchronous PublishAsync method to publish the message.

await mqttClient.PublishAsync(appMsg);

6 Trace Messages

MQTTnet provides a static class, MqttNetTrace, for tracing messages on both the server and the client. Its TraceMessagePublished event reports log messages from the server and client, covering startup, shutdown, heartbeats, subscriptions, and publishing. The event argument type MqttNetTraceMessagePublishedEventArgs includes the thread ID (ThreadId), source (Source), log level (Level), log message (Message), exception information (Exception), and so on.

MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;

private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
{
    Console.WriteLine($">> Thread ID: {e.ThreadId} Source: {e.Source} Trace Level: {e.Level} Message: {e.Message}");
    if (e.Exception != null)
    {
        Console.WriteLine(e.Exception);
    }
}

The MqttNetTrace class also provides four static methods, Verbose, Information, Warning, and Error, for writing log messages at the corresponding levels. These messages are delivered through the TraceMessagePublished event, where you can filter them by e.Level.

7 Running Results

The following shows the server, client 1, and client 2 in operation; client 1 and client 2 are two instances of the same project. Client 1 subscribes to the “temperature” data from the sensor and simulates a host application (such as an app) sending switch control commands. Client 2 subscribes to the control commands sent by the host application and simulates the temperature sensor reporting temperature data.

7.1 Server

[Screenshot: server console output]

7.2 Client 1

[Screenshot: client 1 window]

7.3 Client 2

[Screenshot: client 2 window]

8 Demo Code

8.1 Server Code

using MQTTnet;
using MQTTnet.Core.Adapter;
using MQTTnet.Core.Diagnostics;
using MQTTnet.Core.Protocol;
using MQTTnet.Core.Server;
using System;
using System.Text;
using System.Threading;

namespace MqttServerTest
{
    class Program
    {
        private static MqttServer mqttServer = null;

        static void Main(string[] args)
        {
            MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;

            // Start the server on a worker thread so the main thread can read console commands.
            new Thread(StartMqttServer).Start();

            while (true)
            {
                // ReadLine returns null when standard input is closed; treat that as "exit".
                var inputString = (Console.ReadLine() ?? "exit").ToLower().Trim();

                if (inputString == "exit")
                {
                    // Wait for the server to stop before reporting success.
                    mqttServer?.StopAsync()?.Wait();
                    Console.WriteLine("MQTT service has been stopped!");
                    break;
                }
                else if (inputString == "clients")
                {
                    // List all currently connected clients.
                    foreach (var item in mqttServer.GetConnectedClients())
                    {
                        Console.WriteLine($"Client ID: {item.ClientId}, Protocol Version: {item.ProtocolVersion}");
                    }
                }
                else
                {
                    Console.WriteLine($"Command[{inputString}] is invalid!");
                }
            }
        }

        private static void StartMqttServer()
        {
            try
            {
                if (mqttServer == null)
                {
                    var options = new MqttServerOptions
                    {
                        // Client "c001" must present the expected credentials; other clients are accepted.
                        ConnectionValidator = p =>
                        {
                            if (p.ClientId == "c001")
                            {
                                if (p.Username != "u001" || p.Password != "p001")
                                {
                                    return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword;
                                }
                            }

                            return MqttConnectReturnCode.ConnectionAccepted;
                        }
                    };

                    mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer;
                    mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived;
                    mqttServer.ClientConnected += MqttServer_ClientConnected;
                    mqttServer.ClientDisconnected += MqttServer_ClientDisconnected;
                }

                // Block this worker thread until startup completes so that failures are reported here.
                mqttServer.StartAsync().Wait();
                Console.WriteLine("MQTT service started successfully!");
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex.Message);
            }
        }

        private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e)
        {
            Console.WriteLine($"Client[{e.Client.ClientId}] connected, Protocol Version: {e.Client.ProtocolVersion}");
        }

        private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e)
        {
            Console.WriteLine($"Client[{e.Client.ClientId}] disconnected!");
        }

        private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Console.WriteLine($"Client[{e.ClientId}]>> Topic: {e.ApplicationMessage.Topic} Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos: {e.ApplicationMessage.QualityOfServiceLevel} Retain: {e.ApplicationMessage.Retain}");
        }

        private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e)
        {
            // Uncomment to print trace output from the library.
            /*Console.WriteLine($">> Thread ID: {e.ThreadId} Source: {e.Source} Trace Level: {e.Level} Message: {e.Message}");
            if (e.Exception != null)
            {
                Console.WriteLine(e.Exception);
            }*/
        }
    }
}

8.2 Client Code

using MQTTnet;
using MQTTnet.Core;
using MQTTnet.Core.Client;
using MQTTnet.Core.Packets;
using MQTTnet.Core.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using System.Windows.Forms;

namespace MqttClientWin
{
    public partial class FmMqttClient : Form
    {
        private MqttClient mqttClient = null;

        public FmMqttClient()
        {
            InitializeComponent();

            // Connect in the background so the form stays responsive.
            Task.Run(async () => { await ConnectMqttServerAsync(); });
        }

        private async Task ConnectMqttServerAsync()
        {
            if (mqttClient == null)
            {
                mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient;
                mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived;
                mqttClient.Connected += MqttClient_Connected;
                mqttClient.Disconnected += MqttClient_Disconnected;
            }

            try
            {
                var options = new MqttClientTcpOptions
                {
                    Server = "127.0.0.1",
                    // Short random client ID so several instances can run side by side.
                    ClientId = Guid.NewGuid().ToString().Substring(0, 5),
                    UserName = "u001",
                    Password = "p001",
                    CleanSession = true
                };

                await mqttClient.ConnectAsync(options);
            }
            catch (Exception ex)
            {
                // Events and continuations may run off the UI thread, so marshal UI updates with Invoke.
                Invoke(new Action(() =>
                {
                    txtReceiveMessage.AppendText("Failed to connect to MQTT server!" + Environment.NewLine + ex.Message + Environment.NewLine);
                }));
            }
        }

        private void MqttClient_Connected(object sender, EventArgs e)
        {
            Invoke(new Action(() =>
            {
                txtReceiveMessage.AppendText("Connected to MQTT server!" + Environment.NewLine);
            }));
        }

        private void MqttClient_Disconnected(object sender, EventArgs e)
        {
            Invoke(new Action(() =>
            {
                txtReceiveMessage.AppendText("Disconnected from MQTT server!" + Environment.NewLine);
            }));
        }

        private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e)
        {
            Invoke(new Action(() =>
            {
                txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}");
            }));
        }

        private async void BtnSubscribe_ClickAsync(object sender, EventArgs e)
        {
            string topic = txtSubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("Subscription topic cannot be empty!");
                return;
            }

            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT client is not connected yet!");
                return;
            }

            // Await the subscription so the UI only reports success after it completes.
            await mqttClient.SubscribeAsync(new List<TopicFilter>
            {
                new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)
            });

            txtReceiveMessage.AppendText($"Subscribed to [{topic}] topic" + Environment.NewLine);
            txtSubTopic.Enabled = false;
            btnSubscribe.Enabled = false;
        }

        private async void BtnPublish_Click(object sender, EventArgs e)
        {
            string topic = txtPubTopic.Text.Trim();
            if (string.IsNullOrEmpty(topic))
            {
                MessageBox.Show("Publishing topic cannot be empty!");
                return;
            }

            if (mqttClient == null || !mqttClient.IsConnected)
            {
                MessageBox.Show("MQTT client is not connected yet!");
                return;
            }

            string inputString = txtSendMessage.Text.Trim();
            var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false);
            await mqttClient.PublishAsync(appMsg);
        }
    }
}

Original text: http://www.cnblogs.com/kuige/articles/7724786.html
