1 What is MQTT?
MQTT (Message Queuing Telemetry Transport) is an instant messaging protocol developed by IBM, which could become an important part of the Internet of Things (IoT). MQTT is a messaging protocol based on a binary message publish/subscribe programming model, and has now become an OASIS standard. Due to its simplicity, it is very suitable for IoT scenarios that require low power consumption and limited network bandwidth. MQTT Official Website
2 MQTTnet
MQTTnet is a high-performance .NET open-source library based on MQTT communication, which supports both MQTT server-side and client-side. Additionally, the author maintains updates, currently supporting the latest version of .NET Core, which is also a reason for choosing MQTTnet. MQTTnet is not the most downloaded .NET MQTT open-source library on GitHub; others include MqttDotNet, nMQTT, M2MQTT, etc.
MQTTnet is a high-performance .NET library for MQTT-based communication. It provides an MQTT client and an MQTT server (broker). The implementation is based on the documentation from http://mqtt.org/.
3 Create Project and Import Library
Here we use Visual Studio 2017 to create an empty solution and add two projects, one for the server and one for the client. For the server project, choose the latest .NET Core console application template, and for the client project, choose the traditional WinForm application template. The .NET Core project template is shown in the figure below:
In the solution, right-click and select “Manage NuGet Packages for Solution” – search for MQTTnet under the “Browse” tab, and install the MQTTnet library for both the server and client projects. The current latest stable version is 2.4.0. The project structure is shown in the figure below:
4 Server
The MQTT server is mainly used to maintain connections with multiple clients and handle the logic of client publishing and subscribing. Generally, it is rare to send messages directly from the server to the client (you can use mqttServer.Publish(appMsg);
to send messages directly), in most cases, the server forwards messages from clients that match the topic, acting as an intermediary in the system.
4.1 Create and Start Server
The simplest way to create a server is to use the MqttServerFactory
object’s CreateMqttServer
method, which requires a MqttServerOptions
parameter.
var options = new MqttServerOptions();var mqttServer = new MqttServerFactory().CreateMqttServer(options);
After creating an IMqttServer
object in the above way, call its StartAsync
method to start the MQTT service. It is worth noting that the previous version used the Start
method, and the author also keeps up with new features of the C# language, changing all possible synchronous usages to asynchronous.
await mqttServer.StartAsync();
4.2 Validate Client
In the MqttServerOptions
options, you can use ConnectionValidator
to validate client connections. For example, client ID ClientId
, username Username
, and password Password
, etc.
var options = new MqttServerOptions{ ConnectionValidator = c =>{ if (c.ClientId.Length < 10) { return MqttConnectReturnCode.ConnectionRefusedIdentifierRejected; } if (c.Username != "xxx" || c.Password != "xxx") { return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } return MqttConnectReturnCode.ConnectionAccepted; }};
4.3 Related Events
The server supports ClientConnected
, ClientDisconnected
, and ApplicationMessageReceived
events, used to check client connections, client disconnections, and receiving messages sent by clients, respectively.
The event parameters of ClientConnected
and ClientDisconnected
events include a client connection object ConnectedMqttClient
, through which you can obtain the client ID ClientId
and MQTT version ProtocolVersion
.
The event parameters of ApplicationMessageReceived
contain the client ID ClientId
and MQTT application message MqttApplicationMessage
object, through which you can obtain topic Topic
, QoS QualityOfServiceLevel
, and message content Payload
, etc.
5 Client
MQTT is different from HTTP, the latter is based on a request/response model, where the server cannot directly send data to the client. MQTT is based on a publish/subscribe model, where all clients maintain a connection with the server.
So how do clients communicate with each other?
The specific logic is: some clients subscribe to messages of interest (topics) from the server, while others publish (topics) messages to the server. The server matches the subscribed and published topics and forwards the messages to the clients that match.
5.1 Create Client and Connect
Creating an MQTT client using MQTTnet is also very simple, just use the MqttClientFactory
object’s CreateMqttClient
method.
var mqttClient = new MqttClientFactory().CreateMqttClient();
After creating the client object, call its asynchronous method ConnectAsync
to connect to the server.
await mqttClient.ConnectAsync(options);
When calling this method, you need to pass a MqttClientTcpOptions
object (previous versions used this option when creating the object), which contains the client ID ClientId
, server address (can use IP address or domain name) Server
, port number Port
, username UserName
, password Password
, etc.
var options = new MqttClientTcpOptions{ Server = "127.0.0.1", ClientId = "c001", UserName = "u001", Password = "p001", CleanSession = true};
5.2 Related Events
The client supports Connected
, Disconnected
, and ApplicationMessageReceived
events to handle client-server connections, client disconnections from the server, and messages received by the client.
5.3 Subscribe to Messages
After the client connects to the server, it can use the SubscribeAsync
asynchronous method to subscribe to messages. This method can take an enumerable or variable parameter topic filter TopicFilter
parameter, which includes topic names and QoS levels.
mqttClient.SubscribeAsync(new List<TopicFilter> { new TopicFilter("家/客厅/空调/#", MqttQualityOfServiceLevel.AtMostOnce)});
5.4 Publish Messages
Before publishing a message, you need to create a message object MqttApplicationMessage
. The most direct way is to use its constructor, passing in the topic, content, QoS, and other parameters.
var appMsg = new MqttApplicationMessage("家/客厅/空调/开关", Encoding.UTF8.GetBytes("消息内容"), MqttQualityOfServiceLevel.AtMostOnce, false);
After obtaining the MqttApplicationMessage
message object, call its PublishAsync
asynchronous method through the client object to publish the message.
mqttClient.PublishAsync(appMsg);
6 Trace Messages
MQTTnet
provides a static class MqttNetTrace
for tracing messages, which can be used for both server and client. The MqttNetTrace
event TraceMessagePublished
is used to trace log messages from server and client applications, such as startup, stop, heartbeat, message subscription, and publishing. The event parameter MqttNetTraceMessagePublishedEventArgs
includes thread ID ThreadId
, source Source
, log level Level
, log message Message
, exception information Exception
, etc.
MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished;private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e){ Console.WriteLine($">> Thread ID: {e.ThreadId} Source: {e.Source} Trace Level: {e.Level} Message: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }}
Meanwhile, the MqttNetTrace
class also provides four static methods for different message levels: Verbose
, Information
, Warning
, and Error
, used to provide log messages of different levels. These messages will be output in the TraceMessagePublished
event, and you can use e.Level
for filtering.
7 Running Effect
Below are the running effects of the server, client 1, and client 2, where client 1 and client 2 are just two instances of the same project. Client 1 is used to subscribe to the “temperature” data from the sensor and simulate an upper computer (such as an app) sending switch control commands; client 2 subscribes to the control commands sent from the upper computer and simulates the temperature sensor reporting temperature data.
7.1 Server
7.2 Client 1
7.3 Client 2
8 Demo Code
8.1 Server Code
using MQTTnet;using MQTTnet.Core.Adapter;using MQTTnet.Core.Diagnostics;using MQTTnet.Core.Protocol;using MQTTnet.Core.Server;using System;using System.Text;using System.Threading;namespace MqttServerTest{ class Program { private static MqttServer mqttServer = null; static void Main(string[] args) { MqttNetTrace.TraceMessagePublished += MqttNetTrace_TraceMessagePublished; new Thread(StartMqttServer).Start(); while (true) { var inputString = Console.ReadLine().ToLower().Trim(); if (inputString == "exit") { mqttServer?.StopAsync(); Console.WriteLine("MQTT service has been stopped!"); break; } else if (inputString == "clients") { foreach (var item in mqttServer.GetConnectedClients()) { Console.WriteLine($"Client ID: {item.ClientId}, Protocol Version: {item.ProtocolVersion}"); } } else { Console.WriteLine($"Command[{inputString}] is invalid!"); } } } private static void StartMqttServer() { if (mqttServer == null) { try { var options = new MqttServerOptions { ConnectionValidator = p => { if (p.ClientId == "c001") { if (p.Username != "u001" || p.Password != "p001") { return MqttConnectReturnCode.ConnectionRefusedBadUsernameOrPassword; } } return MqttConnectReturnCode.ConnectionAccepted; } }; mqttServer = new MqttServerFactory().CreateMqttServer(options) as MqttServer; mqttServer.ApplicationMessageReceived += MqttServer_ApplicationMessageReceived; mqttServer.ClientConnected += MqttServer_ClientConnected; mqttServer.ClientDisconnected += MqttServer_ClientDisconnected; } catch (Exception ex) { Console.WriteLine(ex.Message); return; } } mqttServer.StartAsync(); Console.WriteLine("MQTT service started successfully!"); } private static void MqttServer_ClientConnected(object sender, MqttClientConnectedEventArgs e) { Console.WriteLine($"Client[{e.Client.ClientId}] connected, Protocol Version: {e.Client.ProtocolVersion}"); } private static void MqttServer_ClientDisconnected(object sender, MqttClientDisconnectedEventArgs e) { Console.WriteLine($"Client[{e.Client.ClientId}] disconnected!"); } private static void MqttServer_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Console.WriteLine($"Client[{e.ClientId}]>> Topic: {e.ApplicationMessage.Topic} Payload: {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)} Qos: {e.ApplicationMessage.QualityOfServiceLevel} Retain: {e.ApplicationMessage.Retain}"); } private static void MqttNetTrace_TraceMessagePublished(object sender, MqttNetTraceMessagePublishedEventArgs e) { /*Console.WriteLine($">> Thread ID: {e.ThreadId} Source: {e.Source} Trace Level: {e.Level} Message: {e.Message}"); if (e.Exception != null) { Console.WriteLine(e.Exception); }*/ } }}
8.2 Client Code
using MQTTnet;using MQTTnet.Core;using MQTTnet.Core.Client;using MQTTnet.Core.Packets;using MQTTnet.Core.Protocol;using System;using System.Collections.Generic;using System.Text;using System.Threading.Tasks;using System.Windows.Forms;namespace MqttClientWin{ public partial class FmMqttClient : Form { private MqttClient mqttClient = null; public FmMqttClient() { InitializeComponent(); Task.Run(async () => { await ConnectMqttServerAsync(); }); } private async Task ConnectMqttServerAsync() { if (mqttClient == null) { mqttClient = new MqttClientFactory().CreateMqttClient() as MqttClient; mqttClient.ApplicationMessageReceived += MqttClient_ApplicationMessageReceived; mqttClient.Connected += MqttClient_Connected; mqttClient.Disconnected += MqttClient_Disconnected; } try { var options = new MqttClientTcpOptions { Server = "127.0.0.1", ClientId = Guid.NewGuid().ToString().Substring(0, 5), UserName = "u001", Password = "p001", CleanSession = true }; await mqttClient.ConnectAsync(options); } catch (Exception ex) { Invoke((new Action(() => { txtReceiveMessage.AppendText($"Failed to connect to MQTT server!" + Environment.NewLine + ex.Message + Environment.NewLine); }))); } } private void MqttClient_Connected(object sender, EventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText("Connected to MQTT server!" + Environment.NewLine); }))); } private void MqttClient_Disconnected(object sender, EventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText("Disconnected from MQTT connection!" + Environment.NewLine); }))); } private void MqttClient_ApplicationMessageReceived(object sender, MqttApplicationMessageReceivedEventArgs e) { Invoke((new Action(() => { txtReceiveMessage.AppendText($">> {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}{Environment.NewLine}"); }))); } private void BtnSubscribe_ClickAsync(object sender, EventArgs e) { string topic = txtSubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("Subscription topic cannot be empty!"); return; } if (!mqttClient.IsConnected) { MessageBox.Show("MQTT client is not connected yet!"); return; } mqttClient.SubscribeAsync(new List<TopicFilter> { new TopicFilter(topic, MqttQualityOfServiceLevel.AtMostOnce)}); txtReceiveMessage.AppendText($"Subscribed to [{topic}] topic" + Environment.NewLine); txtSubTopic.Enabled = false; btnSubscribe.Enabled = false; } private void BtnPublish_Click(object sender, EventArgs e) { string topic = txtPubTopic.Text.Trim(); if (string.IsNullOrEmpty(topic)) { MessageBox.Show("Publishing topic cannot be empty!"); return; } string inputString = txtSendMessage.Text.Trim(); var appMsg = new MqttApplicationMessage(topic, Encoding.UTF8.GetBytes(inputString), MqttQualityOfServiceLevel.AtMostOnce, false); mqttClient.PublishAsync(appMsg); } }}
9 References
-
Quickly Build MQTT Server (MQTTnet and Apache Apollo)
-
MQTTnet
-
“mqtt” – Translatable Network
-
MQTT Essentials
Original text: http://www.cnblogs.com/kuige/articles/7724786.html
.NET Community News, In-depth Articles, Welcome to visit the WeChat public account article summary http://www.csharpkit.com