The previous two articles covered the basic process of decomposing the MQTT protocol. This article turns to the implementation of specific messages. Because most messages are handled in much the same way, it covers only Connect and Publish, two of the more complex messages in MQTT. Once the parsing of these two is understood, implementing the rest of the protocol becomes much easier.
Reference Documents
Chinese: vitsumoc.github.io/mqtt-v5-0-chinese.html
English: docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
Programming Language: C#
Full Code: //github.com/beetlex-io/mqtt
Connect Message
Before implementing it, you need to understand the data structure of the Connect message, which is one of the most complex in MQTT. The message consists of a variable header and a payload. The following example shows a variable header bit by bit.
| Byte | Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
|---|---|---|---|---|---|---|---|---|---|
| Protocol Name | | | | | | | | | |
| byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 2 | Length LSB (4) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 0 |
| byte 3 | ‘M’ | 0 | 1 | 0 | 0 | 1 | 1 | 0 | 1 |
| byte 4 | ‘Q’ | 0 | 1 | 0 | 1 | 0 | 0 | 0 | 1 |
| byte 5 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
| byte 6 | ‘T’ | 0 | 1 | 0 | 1 | 0 | 1 | 0 | 0 |
| Protocol Version | | | | | | | | | |
| byte 7 | Version (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
| Connect Flags | | | | | | | | | |
| byte 8 | User Name Flag (1), Password Flag (1), Will Retain (0), Will QoS (01), Will Flag (1), Clean Start (1), Reserved (0) | 1 | 1 | 0 | 0 | 1 | 1 | 1 | 0 |
| Keep Alive | | | | | | | | | |
| byte 9 | Keep Alive MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 10 | Keep Alive LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
| Properties | | | | | | | | | |
| byte 11 | Length (5) | 0 | 0 | 0 | 0 | 0 | 1 | 0 | 1 |
| byte 12 | Session Expiry Interval identifier (17) | 0 | 0 | 0 | 1 | 0 | 0 | 0 | 1 |
| byte 13 | Session Expiry Interval (10) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 14 | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 15 | | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 16 | | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
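The byte layout in the table above can be reproduced in a few lines, which is a useful way to check one's reading of it. The following is only a minimal sketch, written as C# top-level statements (C# 9 or later) and independent of BeetleX: the helpers `WriteUInt16`, `WriteUInt32`, `WriteUtf8String` and `BuildExampleConnectHeader` are hypothetical names chosen for illustration, not the library's `MQTTParse` API.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: 16-bit integer, most significant byte first.
void WriteUInt16(Stream s, ushort value)
{
    s.WriteByte((byte)(value >> 8));
    s.WriteByte((byte)(value & 0xFF));
}

// Hypothetical helper: 32-bit integer, most significant byte first.
void WriteUInt32(Stream s, uint value)
{
    WriteUInt16(s, (ushort)(value >> 16));
    WriteUInt16(s, (ushort)(value & 0xFFFF));
}

// Hypothetical helper: UTF-8 string prefixed with its 2-byte length.
void WriteUtf8String(Stream s, string text)
{
    byte[] bytes = Encoding.UTF8.GetBytes(text);
    WriteUInt16(s, (ushort)bytes.Length);
    s.Write(bytes, 0, bytes.Length);
}

// Builds exactly the 16 bytes shown in the example table.
byte[] BuildExampleConnectHeader()
{
    using var ms = new MemoryStream();
    WriteUtf8String(ms, "MQTT");  // bytes 1-6: protocol name
    ms.WriteByte(5);              // byte 7: protocol version
    ms.WriteByte(0b1100_1110);    // byte 8: connect flags
    WriteUInt16(ms, 10);          // bytes 9-10: keep alive
    ms.WriteByte(5);              // byte 11: property length
    ms.WriteByte(0x11);           // byte 12: Session Expiry Interval identifier (17)
    WriteUInt32(ms, 10);          // bytes 13-16: Session Expiry Interval
    return ms.ToArray();
}

byte[] header = BuildExampleConnectHeader();
Console.WriteLine(BitConverter.ToString(header));
// 00-04-4D-51-54-54-05-CE-00-0A-05-11-00-00-00-0A
```

The sketch writes the property length as a single byte, which is only valid while the property set is shorter than 128 bytes.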
The payload that follows carries the client ID and, when the Will Flag is set, the will fields, which include a property set of their own. These details are all described in the protocol documentation. With these rules in hand, the protocol can be parsed in code. When reading and writing, the fields must appear in exactly the order given in the documentation. The one exception is the content of the property sets described in the previous article: properties do not have to follow the documented order, because each one is recognized by its own identifier.
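Which payload fields are present depends on the Connect Flags byte (byte 8 in the table above), so packing and unpacking that byte is the part most worth getting right. Here is a small sketch under stated assumptions: `IsBitSet`, `GetWillQos` and `EncodeConnectFlags` are hypothetical helpers written for this illustration, with bit positions taken from the MQTT 5.0 Connect Flags layout, and they are not the BeetleX implementation.

```csharp
using System;

// Hypothetical helper: tests one bit, where bit 0 is the least significant.
bool IsBitSet(byte flags, int bit) => (flags & (1 << bit)) != 0;

// Hypothetical helper: Will QoS occupies bits 3 and 4.
int GetWillQos(byte flags) => (flags >> 3) & 0b11;

// Hypothetical helper: packs the individual flags into one byte.
byte EncodeConnectFlags(bool userName, bool password, bool willRetain,
                        int willQos, bool willFlag, bool cleanStart)
{
    if (willQos < 0 || willQos > 2)
        throw new ArgumentOutOfRangeException(nameof(willQos));

    int flags = 0;                    // bit 0 is reserved and stays 0
    if (cleanStart) flags |= 1 << 1;  // bit 1
    if (willFlag)   flags |= 1 << 2;  // bit 2
    flags |= willQos << 3;            // bits 3-4
    if (willRetain) flags |= 1 << 5;  // bit 5
    if (password)   flags |= 1 << 6;  // bit 6
    if (userName)   flags |= 1 << 7;  // bit 7
    return (byte)flags;
}

byte example = EncodeConnectFlags(userName: true, password: true, willRetain: false,
                                  willQos: 1, willFlag: true, cleanStart: true);
Console.WriteLine(Convert.ToString(example, 2).PadLeft(8, '0')); // 11001110
Console.WriteLine(GetWillQos(example));                          // 1
Console.WriteLine(IsBitSet(example, 7));                         // True
```

The printed bit pattern matches byte 8 of the example table.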
Read Protocol Code

    // Variable header. Member names such as WillFalg and WillRetian are spelled as in the library.
    ProtocolName = parse.ReadString(stream);
    Version = (byte)stream.ReadByte();

    // Connect Flags: BIT_1 is taken to be the least significant bit, BIT_8 the most significant
    byte flags = (byte)stream.ReadByte();
    Reserved = (MQTTParse.BIT_1 & flags) > 0;
    ClearStart = (MQTTParse.BIT_2 & flags) > 0;
    WillFalg = (MQTTParse.BIT_3 & flags) > 0;
    this.WillQos = (QoSType)((0b0001_1000 & flags) >> 3);
    WillRetian = (MQTTParse.BIT_6 & flags) > 0;
    PasswordFlag = (MQTTParse.BIT_7 & flags) > 0;
    UserFlag = (MQTTParse.BIT_8 & flags) > 0;
    KeepAlive = parse.ReadUInt16(stream);

    // Properties: read the whole set, then assign each property from it
    var ps = GetPropertiesStream();
    ps.Read(parse, stream);
    SessionExpiryInterval = ps;
    ReceiveMaximum = ps;
    MaximumPacketSize = ps;
    TopicAliasMaximum = ps;
    RequestResponseInformation = ps;
    RequestProblemInformation = ps;
    AuthenticationMethod = ps;
    AuthenticationData = ps;
    UserProperties = ps;

    // Payload: the optional fields are present only when their flag is set
    ClientID = parse.ReadString(stream);
    if (WillFalg)
    {
        this.WillProperty.Read(this, parse, stream);
        WillTopic = parse.ReadString(stream);
        WillPayload = parse.ReadBinary(stream);
    }
    if (UserFlag)
        Name = parse.ReadString(stream);
    if (PasswordFlag)
        Password = parse.ReadString(stream);
Write Protocol Code

    // Variable header
    parse.WriteString(stream, ProtocolName);
    stream.WriteByte(Version);

    // Connect Flags: the same bit assignments as in the read code
    byte flags = 0x0;
    if (Reserved) flags |= MQTTParse.BIT_1;
    if (ClearStart) flags |= MQTTParse.BIT_2;
    if (WillFalg) flags |= MQTTParse.BIT_3;
    flags |= (byte)((byte)this.WillQos << 3);
    if (this.WillRetian) flags |= MQTTParse.BIT_6;
    // Password Flag is bit 6 and User Name Flag is bit 7, matching the read code
    if (this.PasswordFlag) flags |= MQTTParse.BIT_7;
    if (this.UserFlag) flags |= MQTTParse.BIT_8;
    stream.WriteByte(flags);
    parse.WriteUInt16(stream, KeepAlive);

    // Properties
    var ps = GetPropertiesStream() + SessionExpiryInterval + ReceiveMaximum
        + MaximumPacketSize + TopicAliasMaximum + RequestResponseInformation
        + RequestProblemInformation + AuthenticationMethod + AuthenticationData
        + UserProperties;
    ps.Write(parse, stream);

    // Payload, in the same order as it is read
    parse.WriteString(stream, ClientID);
    if (WillFalg)
    {
        this.WillProperty.Write(this, parse, stream);
        parse.WriteString(stream, WillTopic);
        if (WillPayload == null)
            WillPayload = new byte[0];
        parse.WriteBinary(stream, WillPayload);
    }
    if (UserFlag)
        parse.WriteString(stream, Name);
    if (PasswordFlag)
        parse.WriteString(stream, Password);
Full Code: //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs
Publish Message
The structure of this message is simpler: a variable header followed by the binary payload being published. How the payload is encoded is up to the two parties to agree on; it can be JSON, UTF-8 text, and so on.
| Byte | Description | 7 | 6 | 5 | 4 | 3 | 2 | 1 | 0 |
|---|---|---|---|---|---|---|---|---|---|
| Topic Name | | | | | | | | | |
| byte 1 | Length MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 2 | Length LSB (3) | 0 | 0 | 0 | 0 | 0 | 0 | 1 | 1 |
| byte 3 | ‘a’ (0x61) | 0 | 1 | 1 | 0 | 0 | 0 | 0 | 1 |
| byte 4 | ‘/’ (0x2F) | 0 | 0 | 1 | 0 | 1 | 1 | 1 | 1 |
| byte 5 | ‘b’ (0x62) | 0 | 1 | 1 | 0 | 0 | 0 | 1 | 0 |
| Packet Identifier | | | | | | | | | |
| byte 6 | Packet Identifier MSB (0) | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
| byte 7 | Packet Identifier LSB (10) | 0 | 0 | 0 | 0 | 1 | 0 | 1 | 0 |
| Property Length | | | | | | | | | |
| byte 8 | No Properties | 0 | 0 | 0 | 0 | 0 | 0 | 0 | 0 |
The above is the variable header of the Publish message: the topic name, the packet identifier, and the property set. What remains after it is the message payload.
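To make the layout concrete, the example header above can be built by hand. This is a minimal sketch rather than the BeetleX code: `BuildPublishVariableHeader` is a hypothetical function, QoS is passed as a plain integer, and the property set is assumed to be empty. It also shows that the packet identifier is written only when QoS is greater than 0.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: builds a PUBLISH variable header with an empty property set.
byte[] BuildPublishVariableHeader(string topicName, int qos, ushort packetId)
{
    using var ms = new MemoryStream();

    // Topic name: 2-byte length (most significant byte first) followed by UTF-8 bytes
    byte[] name = Encoding.UTF8.GetBytes(topicName);
    ms.WriteByte((byte)(name.Length >> 8));
    ms.WriteByte((byte)(name.Length & 0xFF));
    ms.Write(name, 0, name.Length);

    // Packet identifier: present only for QoS 1 and QoS 2
    if (qos > 0)
    {
        ms.WriteByte((byte)(packetId >> 8));
        ms.WriteByte((byte)(packetId & 0xFF));
    }

    // Property length 0: no properties
    ms.WriteByte(0);
    return ms.ToArray();
}

Console.WriteLine(BitConverter.ToString(BuildPublishVariableHeader("a/b", 1, 10)));
// 00-03-61-2F-62-00-0A-00
Console.WriteLine(BitConverter.ToString(BuildPublishVariableHeader("a/b", 0, 10)));
// 00-03-61-2F-62-00
```

The first line of output corresponds byte for byte to the eight rows of the table.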
Read Protocol Code

    Topic = parse.ReadString(stream);
    // The packet identifier is present only when QoS is 1 or 2
    if (QoS != QoSType.MostOnce)
        Identifier = parse.ReadUInt16(stream);

    // Properties: read the whole set, then assign each property from it
    var ps = GetPropertiesStream();
    ps.Read(parse, stream);
    PayloadFormatIndicator = ps;
    MessageExpiryInterval = ps;
    TopicAlias = ps;
    ResponseTopic = ps;
    CorrelationData = ps;
    SubscriptionIdentifier = ps;
    ContentType = ps;
    UserProperties = ps;

    // Everything left in the stream is the payload
    var len = (int)(stream.Length - stream.Position);
    var payload = MQTTMessage.RentPayloadBuffer(len);
    stream.Read(payload, 0, len);
    Payload = new ArraySegment<byte>(payload, 0, len);
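The key point in the read path is that the payload has no length prefix: its size is simply whatever remains once the variable header has been consumed. The sketch below illustrates this on a hand-made byte array. It is not the BeetleX code: `ReadUInt16`, `ReadBytes` and `ParsePublishBody` are hypothetical helpers, and the property length is assumed to fit in one byte (under 128), with the properties skipped rather than decoded. Since `Stream.Read` may return fewer bytes than requested on some stream types, the sketch reads in a loop.

```csharp
using System;
using System.IO;
using System.Text;

// Hypothetical helper: 16-bit integer, most significant byte first.
ushort ReadUInt16(Stream s)
{
    int msb = s.ReadByte(), lsb = s.ReadByte();
    if (msb < 0 || lsb < 0) throw new EndOfStreamException();
    return (ushort)((msb << 8) | lsb);
}

// Hypothetical helper: reads exactly count bytes or throws.
byte[] ReadBytes(Stream s, int count)
{
    var buffer = new byte[count];
    int read = 0;
    while (read < count)
    {
        int n = s.Read(buffer, read, count - read);
        if (n == 0) throw new EndOfStreamException();
        read += n;
    }
    return buffer;
}

// Hypothetical helper: parses everything after the fixed header and returns the payload.
byte[] ParsePublishBody(byte[] body, int qos, out string topic, out ushort packetId)
{
    using var ms = new MemoryStream(body);
    topic = Encoding.UTF8.GetString(ReadBytes(ms, ReadUInt16(ms)));
    packetId = qos > 0 ? ReadUInt16(ms) : (ushort)0;

    int propertyLength = ms.ReadByte();  // assumption: a single-byte length (< 128)
    if (propertyLength < 0 || propertyLength > 127)
        throw new NotSupportedException("multi-byte property length is not handled here");
    ReadBytes(ms, propertyLength);       // properties are skipped in this sketch

    // The payload is whatever is left in the stream
    return ReadBytes(ms, (int)(ms.Length - ms.Position));
}

// The example header from the table, followed by the two payload bytes "hi"
byte[] sample = { 0x00, 0x03, 0x61, 0x2F, 0x62, 0x00, 0x0A, 0x00, 0x68, 0x69 };
byte[] parsedPayload = ParsePublishBody(sample, 1, out string parsedTopic, out ushort parsedId);
Console.WriteLine($"{parsedTopic} {parsedId} {Encoding.UTF8.GetString(parsedPayload)}");
// a/b 10 hi
```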
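Write Protocol Code

    base.OnWrite(parse, stream, session);
    parse.WriteString(stream, Topic);
    // The packet identifier is present only when QoS is 1 or 2
    if (QoS != QoSType.MostOnce)
        parse.WriteUInt16(stream, Identifier);

    // Properties: ContentType is included so that every property read above is also written
    var ps = GetPropertiesStream() + PayloadFormatIndicator + MessageExpiryInterval
        + TopicAlias + ResponseTopic + CorrelationData + SubscriptionIdentifier
        + ContentType + UserProperties;
    ps.Write(parse, stream);

    // Payload: the raw bytes follow the properties with no length prefix
    stream.Write(Payload.Array, Payload.Offset, Payload.Count);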
Full Code: //github.com/beetlex-io/mqtt/blob/main/BeetleX.MQTT.Protocols/V5/Messages/Publish.cs
The implementations of these two messages illustrate how MQTT parsing and processing work in general. Handling the MQTT protocol is not complicated: it comes down to reading and writing the structures the protocol defines, in a form that is convenient for the program to work with.
BeetleX
Open Source Cross-Platform Communication Framework (Supports TLS)
Provides open-source components for HTTP, Websocket, MQTT, Redis, RPC, and service gateways
https://github.com/beetlex-io/
http://beetlex-io.com