To pass the time, I will walk through the MQTT protocol implementation I wrote earlier. The focus is on parsing and implementing the MQTT network protocol, in particular on splitting and processing the network data stream, so much of the content is about what specific pieces of code do. If you are interested in this area, it should be of some help. Below I explain the design and implementation details of BeetleX.MQTT.Protocols, which targets MQTT protocol version 5.0.

Reference documents
Chinese: vitsumoc.github.io/mqtt-v5-0-chinese.html
English: docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
Programming language: C#
Complete code: github.com/beetlex-io/mqtt
What is MQTT

MQTT (Message Queuing Telemetry Transport) is a "lightweight" communication protocol based on the publish/subscribe model. It runs on top of TCP/IP and was released by IBM in 1999; the latest version, 5.0, was finalized in 2019.
Basic Types

A communication protocol is a specification for a data stream: it defines how the stream is split into units and how each unit is read. Each complete unit split from the stream is called a message packet, and a message packet is made up of data members whose types are the basic types of the protocol. The MQTT protocol uses the following types:
- Bit: The basic unit of network transmission and storage is the byte, and each byte holds 8 bits, so a single byte can carry 8 boolean flags. Network protocols often use individual bits for such flags to save bandwidth. Reading and writing them relies on the &, |, << and >> operators.

    bool tag = (0b0000_0001 & data) > 0; // read: is bit 0 set?
    data |= 0b0000_0001;                 // write: set bit 0 to 1
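As a slightly larger illustration of the same operators, here is a minimal sketch that packs and unpacks the flag bits of a PUBLISH fixed header, where the MQTT 5.0 specification assigns bit 3 to DUP, bits 2-1 to QoS and bit 0 to RETAIN. The class `PublishFlags` and its method names are made up for this example and are not part of BeetleX.

```csharp
using System;

// Hypothetical helper, not part of BeetleX: packs and unpacks the four flag
// bits of a PUBLISH fixed header (bit 3 = DUP, bits 2-1 = QoS, bit 0 = RETAIN).
public static class PublishFlags
{
    private const byte RetainMask = 0b0000_0001;
    private const byte QosMask    = 0b0000_0110;
    private const byte DupMask    = 0b0000_1000;

    public static bool IsRetain(byte header) => (header & RetainMask) != 0;

    // Mask the two QoS bits, then shift them down to the range 0..3.
    public static int GetQos(byte header) => (header & QosMask) >> 1;

    public static bool IsDup(byte header) => (header & DupMask) != 0;

    // Combine the three values into the low four bits of a byte.
    public static byte Pack(bool dup, int qos, bool retain)
    {
        if (qos < 0 || qos > 2) throw new ArgumentOutOfRangeException(nameof(qos));
        byte flags = (byte)(qos << 1);
        if (dup) flags |= DupMask;
        if (retain) flags |= RetainMask;
        return flags;
    }
}
```

For example, `PublishFlags.Pack(true, 1, false)` yields `0b0000_1010`, and `PublishFlags.GetQos(0x3D)` yields 2.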
- Byte: The protocol often uses a single byte to represent a custom data type or status, such as the reason code returned in the MQTT ConnAck packet:

| Value | Hex | Reason Code Name | Description |
| --- | --- | --- | --- |
| 0 | 0x00 | Success | The Connection is accepted. |
| 128 | 0x80 | Unspecified error | The Server does not wish to reveal the reason for the failure, or none of the other Reason Codes apply. |
| 129 | 0x81 | Malformed Packet | Data within the CONNECT packet could not be correctly parsed. |
| 130 | 0x82 | Protocol Error | Data in the CONNECT packet does not conform to this specification. |
| 131 | 0x83 | Implementation Specific Error | The CONNECT is valid but is not accepted by this Server. |
| 132 | 0x84 | Unsupported Protocol Version | The Server does not support the version of the MQTT protocol requested by the Client. |
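In C#, a status byte like this maps naturally onto a byte-backed enum. The sketch below covers only the codes listed in the table; the names `ConnAckReasonCode` and `ConnAckReasonCodes` are hypothetical and not taken from BeetleX.

```csharp
using System;

// Hypothetical enum, not taken from BeetleX: the ConnAck reason codes
// listed in the table above (the specification defines more).
public enum ConnAckReasonCode : byte
{
    Success = 0x00,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationSpecificError = 0x83,
    UnsupportedProtocolVersion = 0x84,
}

public static class ConnAckReasonCodes
{
    // In MQTT 5.0, reason codes of 0x80 and above indicate failure.
    public static bool IsFailure(ConnAckReasonCode code) => (byte)code >= 0x80;

    // Cast a raw status byte read from the stream to the enum.
    public static ConnAckReasonCode FromByte(byte value) => (ConnAckReasonCode)value;
}
```

Because the enum is backed by `byte`, converting to and from the wire value is a plain cast with no lookup table.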
- Variable Length Integer: In a program an int always occupies 4 bytes, regardless of its value. To save bandwidth during transmission, the protocol stores some integers in a variable-length form: each byte carries 7 bits of data, and the high bit signals that another byte follows. A general 32-bit value needs 1 to 5 bytes in this form, but the MQTT specification limits the encoding to 4 bytes (maximum value 268,435,455). Since message packets are generally small, the encoded length is usually shorter than a fixed 4-byte field. Below is a helper class for reading and writing this type:

    public class Int7bit
    {
        [ThreadStatic]
        private static Byte[] mDataBuffer;

        // Writes value as a sequence of 7-bit groups, lowest group first.
        public void Write(System.IO.Stream stream, int value)
        {
            if (mDataBuffer == null)
                mDataBuffer = new byte[32];
            var count = 0;
            var num = (UInt64)value;
            while (num >= 0x80)
            {
                // Low 7 bits plus the continuation flag (high bit).
                mDataBuffer[count++] = (Byte)(num | 0x80);
                num >>= 7;
            }
            mDataBuffer[count++] = (Byte)num;
            stream.Write(mDataBuffer, 0, count);
        }

        // The partial result and shift are kept in fields, so a read that
        // runs out of data can resume on the next call.
        private uint mResult = 0;
        private byte mBits = 0;

        // Returns null when the value is not yet complete.
        public int? Read(System.IO.Stream stream)
        {
            Byte b;
            while (true)
            {
                if (stream.Length < 1)
                    return null;
                var bt = stream.ReadByte();
                if (bt < 0)
                {
                    mBits = 0;
                    mResult = 0;
                    throw new BXException("Read 7bit int error:byte value cannot be less than zero!");
                }
                b = (Byte)bt;
                mResult |= (UInt32)((b & 0x7f) << mBits);
                if ((b & 0x80) == 0)
                    break;
                mBits += 7;
                // Accepts up to 5 bytes; the MQTT specification itself stops at 4.
                if (mBits >= 32)
                {
                    mBits = 0;
                    mResult = 0;
                    throw new BXException("Read 7bit int error:out of maximum value!");
                }
            }
            mBits = 0;
            var result = mResult;
            mResult = 0;
            return (Int32)result;
        }
    }
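To make the byte layout concrete, here is a minimal stand-alone sketch of the same encoding that works on plain byte arrays instead of a stream. The class `VarInt` is hypothetical and is not the BeetleX `Int7bit` class; unlike the helper above, it enforces the 4-byte limit of the MQTT specification.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-alone helper (not the BeetleX Int7bit class): encodes and
// decodes MQTT Variable Byte Integers on plain byte arrays.
public static class VarInt
{
    public const int MaxValue = 268_435_455; // largest value that fits in 4 bytes

    public static byte[] Encode(int value)
    {
        if (value < 0 || value > MaxValue)
            throw new ArgumentOutOfRangeException(nameof(value));
        var bytes = new List<byte>(4);
        do
        {
            byte b = (byte)(value & 0x7F);   // low 7 bits carry data
            value >>= 7;
            if (value > 0) b |= 0x80;        // high bit: more bytes follow
            bytes.Add(b);
        } while (value > 0);
        return bytes.ToArray();
    }

    // Returns the decoded value and reports how many bytes were consumed.
    public static int Decode(byte[] data, int offset, out int consumed)
    {
        int result = 0;
        for (int i = 0; i < 4; i++)
        {
            if (offset + i >= data.Length)
                throw new ArgumentException("Incomplete variable byte integer.");
            byte b = data[offset + i];
            result |= (b & 0x7F) << (7 * i);
            if ((b & 0x80) == 0)
            {
                consumed = i + 1;
                return result;
            }
        }
        throw new FormatException("Variable byte integer exceeds 4 bytes.");
    }
}
```

With this sketch, 127 encodes to the single byte 0x7F, 128 to 0x80 0x01, and 321 to 0xC1 0x02 (65 plus the continuation bit, then 2, since 321 = 65 + 2 × 128).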
- Integer: The protocol has two fixed-size numeric types, a short integer and an integer, stored in 2 bytes and 4 bytes respectively. The protocol stores integers in big-endian order, whereas C# on common platforms (x86, ARM) stores them in little-endian order, so a simple conversion is required during processing.

    // Swap the two bytes of a 16-bit value.
    public static short SwapInt16(short v)
    {
        return (short)(((v & 0xFF) << 8) | ((v >> 8) & 0xFF));
    }

    public static ushort SwapUInt16(ushort v)
    {
        return (ushort)((uint)((v & 0xFF) << 8) | ((uint)(v >> 8) & 0xFFu));
    }

    // Reverse the four bytes of a 32-bit value by swapping each half
    // and exchanging the halves.
    public static int SwapInt32(int v)
    {
        return ((SwapInt16((short)v) & 0xFFFF) << 16)
            | (SwapInt16((short)(v >> 16)) & 0xFFFF);
    }

    public static uint SwapUInt32(uint v)
    {
        return (uint)((SwapUInt16((ushort)v) & 0xFFFF) << 16)
            | (SwapUInt16((ushort)(v >> 16)) & 0xFFFFu);
    }
With the functions above you can convert between the two byte orders; simply call them when reading from and writing to the stream.

    public virtual void WriteUInt16(System.IO.Stream stream, ushort value)
    {
        value = BitHelper.SwapUInt16(value);
        var data = BitConverter.GetBytes(value);
        stream.Write(data, 0, data.Length);
    }

    public virtual ushort ReadUInt16(System.IO.Stream stream)
    {
        // GetIntBuffer() is a helper that is not shown in this article.
        var buffer = GetIntBuffer();
        stream.Read(buffer, 0, 2);
        var result = BitConverter.ToUInt16(buffer, 0);
        return BitHelper.SwapUInt16(result);
    }

    public virtual void WriteInt16(System.IO.Stream stream, short value)
    {
        value = BitHelper.SwapInt16(value);
        var data = BitConverter.GetBytes(value);
        stream.Write(data, 0, data.Length);
    }

    public virtual short ReadInt16(System.IO.Stream stream)
    {
        var buffer = GetIntBuffer();
        stream.Read(buffer, 0, 2);
        var result = BitConverter.ToInt16(buffer, 0);
        return BitHelper.SwapInt16(result);
    }
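As an alternative to hand-written swap functions, the .NET base class library offers `System.Buffers.Binary.BinaryPrimitives`, which reads and writes a fixed byte order regardless of the host CPU. The following is only a sketch of that approach; the wrapper class `BigEndian` is hypothetical, and BeetleX itself uses the swap helpers shown above.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical alternative to the Swap* helpers, built on the BCL's
// BinaryPrimitives type. Not part of BeetleX.
public static class BigEndian
{
    public static byte[] GetBytes(ushort value)
    {
        var buffer = new byte[2];
        BinaryPrimitives.WriteUInt16BigEndian(buffer, value);
        return buffer;
    }

    public static byte[] GetBytes(uint value)
    {
        var buffer = new byte[4];
        BinaryPrimitives.WriteUInt32BigEndian(buffer, value);
        return buffer;
    }

    public static ushort ToUInt16(byte[] data, int offset) =>
        BinaryPrimitives.ReadUInt16BigEndian(data.AsSpan(offset, 2));

    public static uint ToUInt32(byte[] data, int offset) =>
        BinaryPrimitives.ReadUInt32BigEndian(data.AsSpan(offset, 4));
}
```

For example, `BigEndian.GetBytes((ushort)0x1234)` produces the bytes 0x12 0x34, most significant byte first, which is the order MQTT expects on the wire.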
- String: Strings are encoded as UTF-8 and prefixed by a 2-byte integer that gives the length of the encoded string in bytes.

    public virtual void WriteString(System.IO.Stream stream, string value, Encoding encoding = null)
    {
        if (encoding == null)
            encoding = Encoding.UTF8;
        if (string.IsNullOrEmpty(value))
        {
            WriteUInt16(stream, 0);
            return;
        }
        // Rent a buffer large enough for the encoded form of the string.
        byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(value.Length * 6);
        var count = encoding.GetBytes(value, 0, value.Length, buffer, 0);
        // The length prefix is the encoded byte count, not the character count.
        WriteUInt16(stream, (ushort)count);
        stream.Write(buffer, 0, count);
        System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
    }

    public virtual string ReadString(System.IO.Stream stream, Encoding encoding = null)
    {
        if (encoding == null)
            encoding = Encoding.UTF8;
        UInt16 len = ReadUInt16(stream);
        if (len == 0)
            return string.Empty;
        byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(len);
        stream.Read(buffer, 0, len);
        string result = encoding.GetString(buffer, 0, len);
        System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
        return result;
    }
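To show what ends up on the wire, here is a minimal sketch that builds the encoded form of a string in a byte array. The class `MqttString` is hypothetical and independent of BeetleX; it trades the pooled buffer of the code above for simplicity.

```csharp
using System;
using System.Text;

// Hypothetical helper, independent of BeetleX: builds the wire form of an
// MQTT UTF-8 string (2-byte big-endian byte count, then the UTF-8 bytes).
public static class MqttString
{
    public static byte[] Encode(string value)
    {
        byte[] utf8 = Encoding.UTF8.GetBytes(value ?? string.Empty);
        if (utf8.Length > ushort.MaxValue)
            throw new ArgumentException("String exceeds 65,535 UTF-8 bytes.");
        var result = new byte[2 + utf8.Length];
        result[0] = (byte)(utf8.Length >> 8);   // length, high byte first
        result[1] = (byte)(utf8.Length & 0xFF);
        Buffer.BlockCopy(utf8, 0, result, 2, utf8.Length);
        return result;
    }

    // Decodes one string starting at offset and reports the bytes consumed.
    public static string Decode(byte[] data, int offset, out int consumed)
    {
        int length = (data[offset] << 8) | data[offset + 1];
        consumed = 2 + length;
        return Encoding.UTF8.GetString(data, offset + 2, length);
    }
}
```

`MqttString.Encode("MQTT")` yields 00 04 4D 51 54 54. The prefix counts bytes rather than characters: the single character "é" encodes to 00 02 C3 A9.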
- Binary: A byte array prefixed by a 2-byte integer that gives the length of the array.

    public void WriteBinary(System.IO.Stream stream, byte[] data)
    {
        WriteUInt16(stream, (ushort)data.Length);
        stream.Write(data, 0, data.Length);
    }

    public byte[] ReadBinary(System.IO.Stream stream)
    {
        var len = ReadUInt16(stream);
        byte[] result = new byte[len];
        stream.Read(result, 0, len);
        return result;
    }
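The read helpers above ignore the return value of `Stream.Read`. That is presumably safe when the parser has already confirmed that the whole packet is buffered, but a general `System.IO.Stream` may return fewer bytes than requested. If these helpers were reused on such a stream, a read loop along the following lines would be needed; `StreamHelper` and `ReadExact` are hypothetical names, not BeetleX APIs.

```csharp
using System;
using System.IO;

// Hypothetical helper, not part of BeetleX: keeps calling Stream.Read until
// the requested number of bytes has arrived or the stream ends.
public static class StreamHelper
{
    public static void ReadExact(Stream stream, byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count)
        {
            int read = stream.Read(buffer, offset + total, count - total);
            if (read == 0)
                throw new EndOfStreamException(
                    $"Expected {count} bytes but the stream ended after {total}.");
            total += read;
        }
    }

    // Reads a 2-byte big-endian length followed by that many bytes.
    public static byte[] ReadBinary(Stream stream)
    {
        var header = new byte[2];
        ReadExact(stream, header, 0, 2);
        int length = (header[0] << 8) | header[1];
        var data = new byte[length];
        ReadExact(stream, data, 0, length);
        return data;
    }
}
```

Given a stream containing 00 03 01 02 03, `StreamHelper.ReadBinary` returns the three bytes 01 02 03; if the stream ends early it throws instead of returning a partly filled array.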
The above describes the types of the MQTT protocol and the corresponding implementation operations. Next, we can implement the specific protocol.
Splitting Message Packets

Before writing code, first understand how MQTT message packets are split. Below is the fixed header format defined by the protocol.

| Byte | Bits | Content |
| --- | --- | --- |
| byte 1 | 7-4 | MQTT Control Packet type |
| byte 1 | 3-0 | Flags specific to each MQTT Control Packet type |
| byte 2… | 7-0 | Remaining Length |
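The layout in the table above can be decoded with a mask and a shift. The sketch below is only an illustration; `FixedHeader` and its methods are hypothetical names that do not appear in BeetleX.

```csharp
using System;

// Hypothetical helper, not part of BeetleX: splits the first byte of the
// fixed header into packet type (high 4 bits) and flags (low 4 bits).
public static class FixedHeader
{
    public static int GetPacketType(byte first) => (first & 0b1111_0000) >> 4;

    public static int GetFlags(byte first) => first & 0b0000_1111;

    // Builds the first byte from a packet type and its flags (each 0..15).
    public static byte Build(int packetType, int flags)
    {
        if (packetType < 0 || packetType > 15)
            throw new ArgumentOutOfRangeException(nameof(packetType));
        if (flags < 0 || flags > 15)
            throw new ArgumentOutOfRangeException(nameof(flags));
        return (byte)((packetType << 4) | flags);
    }
}
```

For a first byte of 0x32, `GetPacketType` returns 3 (PUBLISH) and `GetFlags` returns 2, which for PUBLISH means QoS 1. `Build(8, 2)` returns 0x82, the first byte of a SUBSCRIBE packet.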
The protocol splits the first byte into two parts. The low 4 bits hold flags specific to the packet type; their meaning differs from packet to packet and can be looked up in the protocol document. The high 4 bits hold the message (control packet) type. From the second byte onwards, a variable length integer gives the remaining length of the packet, that is, the number of bytes that follow the fixed header.

From an implementation performance point of view, I do not agree with this design. A variable length can save about 2 bytes of bandwidth compared with a fixed 4-byte field (the 5.0 protocol was finalized in 2019, and I personally feel that today's networks can spare that little traffic), but it forces a memory copy when a message is written to the network stream. The extended attributes (properties) later in the packet are length-prefixed in the same way, so a single message can incur several copies, which hurts performance. I will briefly describe the problem:
Because the size of the length field depends on the message content, space for it cannot be reserved in the network buffer in advance. The content therefore has to be written to a separate block of memory first to obtain its length; then the length is written to the network memory block, and finally the content is copied into the network memory block. If the length were fixed, say 4 bytes, you could reserve 4 bytes immediately after the message header and write the message content directly behind them. Once the content is written, the length is filled back into the reserved space. The benefit is that the message content goes straight into the network memory block, without being written elsewhere and then copied.
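The difference between the two strategies can be sketched as follows. This is an illustration only: the class `Framing` and both methods are hypothetical, the fixed 4-byte format is not valid MQTT, and `WriteFixed` assumes a seekable output stream, which stands in for a network buffer that allows back-filling.

```csharp
using System;
using System.IO;

// Hypothetical sketch, not BeetleX code: contrasts the two framing strategies.
public static class Framing
{
    // Variable-length prefix: the body is staged in a temporary buffer so its
    // size is known before the header is written, then copied to the output.
    public static void WriteVariable(Stream output, byte firstByte, Action<Stream> writeBody)
    {
        using (var body = new MemoryStream())
        {
            writeBody(body);
            output.WriteByte(firstByte);
            int remaining = (int)body.Length;
            do
            {
                byte b = (byte)(remaining & 0x7F);
                remaining >>= 7;
                if (remaining > 0) b |= 0x80;
                output.WriteByte(b);
            } while (remaining > 0);
            body.Position = 0;
            body.CopyTo(output);           // the extra copy discussed above
        }
    }

    // Fixed 4-byte prefix (NOT valid MQTT): reserve the slot, write the body
    // straight to the output, then seek back and fill in the length.
    public static void WriteFixed(Stream output, byte firstByte, Action<Stream> writeBody)
    {
        output.WriteByte(firstByte);
        long slot = output.Position;
        output.Write(new byte[4], 0, 4);   // placeholder for the length
        writeBody(output);
        long end = output.Position;
        int length = (int)(end - slot - 4);
        output.Position = slot;
        output.Write(new[]
        {
            (byte)(length >> 24), (byte)(length >> 16),
            (byte)(length >> 8), (byte)length
        }, 0, 4);
        output.Position = end;
    }
}
```

With a three-byte body 01 02 03 and a first byte of 0x30, `WriteVariable` produces 30 03 01 02 03 after staging the body in a second buffer, while `WriteFixed` produces 30 00 00 00 03 01 02 03 with the body written only once.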
Implementing Message Types

Now that we have the basic rules of the protocol, we can implement an abstract message class to represent data messages.

    public abstract class MQTTMessage
    {
        public MQTTMessage()
        {
        }

        public abstract MQTTMessageType Type { get; }

        // The four flag bits taken from the first byte of the fixed header.
        public byte Bit1 { get; set; }
        public byte Bit2 { get; set; }
        public byte Bit3 { get; set; }
        public byte Bit4 { get; set; }

        internal void Read(MQTTParse parse, System.IO.Stream stream, ISession session)
        {
            OnRead(parse, stream, session);
        }

        protected virtual void OnRead(MQTTParse parse, Stream stream, ISession session)
        {
        }

        internal void Write(MQTTParse parse, System.IO.Stream stream, ISession session)
        {
            OnWrite(parse, stream, session);
        }

        protected virtual void OnWrite(MQTTParse parse, Stream stream, ISession session)
        {
        }
    }
The OnRead and OnWrite methods are extension points: each concrete message overrides them to read its own fields from, and write them to, the network data. With this abstract message structure in place, we can split the stream into packets when reading and frame them when writing.
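To show how a concrete message might plug into these two methods, here is a simplified, self-contained sketch. `SketchMessage` stands in for `MQTTMessage` with the `MQTTParse` and `ISession` parameters left out, and `PubAckSketch` is a hypothetical message that carries only the 2-byte packet identifier of a PUBACK (the specification allows the reason code and properties to be omitted on success). Neither class is the BeetleX implementation.

```csharp
using System;
using System.IO;

// Simplified stand-in for the article's MQTTMessage; the real class also
// receives an MQTTParse and an ISession, which are omitted here.
public abstract class SketchMessage
{
    public void Read(Stream stream) => OnRead(stream);
    public void Write(Stream stream) => OnWrite(stream);
    protected virtual void OnRead(Stream stream) { }
    protected virtual void OnWrite(Stream stream) { }
}

// Hypothetical minimal PUBACK body: only the 2-byte packet identifier.
public class PubAckSketch : SketchMessage
{
    public ushort PacketIdentifier { get; set; }

    protected override void OnRead(Stream stream)
    {
        int high = stream.ReadByte();
        int low = stream.ReadByte();
        if (high < 0 || low < 0)
            throw new EndOfStreamException("PUBACK body is shorter than 2 bytes.");
        PacketIdentifier = (ushort)((high << 8) | low);   // big-endian
    }

    protected override void OnWrite(Stream stream)
    {
        stream.WriteByte((byte)(PacketIdentifier >> 8));
        stream.WriteByte((byte)(PacketIdentifier & 0xFF));
    }
}
```

Writing a `PubAckSketch` with `PacketIdentifier = 0x1234` to a `MemoryStream` produces the bytes 12 34, and reading those bytes back restores the same identifier.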
    public override MQTTMessage Read(Stream stream, ISession session)
    {
        IServer server = session?.Server;
        if (stream.Length > 0)
        {
            // Step 1: first byte of the fixed header (packet type + flags).
            if (mType == null)
            {
                // Check the raw int before narrowing it to a byte;
                // after the cast the value can never be negative.
                int header = stream.ReadByte();
                if (header < 0)
                {
                    throw new BXException("parse mqtt message error:fixed header byte value cannot be less than zero!");
                }
                mHeader = (byte)header;
                mType = (MQTTMessageType)((mHeader & 0b1111_0000) >> 4);
                if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                    server.Log(EventArgs.LogType.Debug, session, "parse mqtt header success");
            }
            // Step 2: remaining length (stays null until fully received).
            if (mLength == null)
            {
                mLength = mInt7BitHandler.Read(stream);
                if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                    server.Log(EventArgs.LogType.Debug, session, $"parse mqtt size {mLength}");
            }
            // Step 3: once the whole body has arrived, build the message.
            if (mLength != null && stream.Length >= mLength.Value)
            {
                Stream protocolStream = GetProtocolStream();
                CopyStream(stream, protocolStream, mLength.Value);
                MQTTMessage msg = CreateMessage(mType.Value, session);
                msg.Bit1 = (byte)(BIT_1 & mHeader);
                msg.Bit2 = (byte)(BIT_2 & mHeader);
                msg.Bit3 = (byte)(BIT_3 & mHeader);
                msg.Bit4 = (byte)(BIT_4 & mHeader);
                msg.Read(this, protocolStream, session);
                if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                    server.Log(EventArgs.LogType.Debug, session, $"parse mqtt type {msg} success");
                // Reset the state for the next packet.
                mLength = null;
                mType = null;
                return msg;
            }
        }
        // Not enough data yet; the caller tries again when more arrives.
        return null;
    }
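The key property of the method above is that it can be called repeatedly as data trickles in and only returns a message once a whole packet is available. The sketch below isolates that idea on plain byte arrays. `PacketSplitter` is a hypothetical class, not the BeetleX parser: instead of remembering the header and length between calls, it keeps all pending bytes and re-checks them on each call.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch, not the BeetleX parser: accumulates incoming bytes and
// hands out (first byte, body) pairs once a whole packet has arrived.
public class PacketSplitter
{
    private readonly List<byte> _pending = new List<byte>();

    public void Append(byte[] chunk) => _pending.AddRange(chunk);

    public bool TryRead(out byte firstByte, out byte[] body)
    {
        firstByte = 0;
        body = null;
        if (_pending.Count < 2) return false;   // need header + 1 length byte

        // Decode the Remaining Length without consuming anything yet.
        int length = 0, index = 1;
        while (true)
        {
            if (index > 4) throw new FormatException("Remaining Length exceeds 4 bytes.");
            if (index >= _pending.Count) return false;   // length incomplete
            byte b = _pending[index];
            length |= (b & 0x7F) << (7 * (index - 1));
            index++;
            if ((b & 0x80) == 0) break;
        }

        if (_pending.Count - index < length) return false;   // body incomplete

        firstByte = _pending[0];
        body = _pending.GetRange(index, length).ToArray();
        _pending.RemoveRange(0, index + length);   // drop the consumed packet
        return true;
    }
}
```

If the bytes 30 03 01 arrive first, `TryRead` returns false; after 02 03 arrive it returns true with a first byte of 0x30 and the body 01 02 03. Re-scanning the pending bytes is simpler than keeping parser state but repeats a little work on every call.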
With the Read method above and the corresponding Write method, we can convert between network data and program objects. This completes the explanation of how MQTT message packets are handled at the top level; the implementation of the individual messages will be detailed in later chapters.