Detailed Explanation of MQTT Protocol Implementation

In my spare time, I will walk through the MQTT protocol implementation I wrote earlier. The article analyzes and implements the MQTT network protocol, and focuses on how the incoming network byte stream is split into individual packets, so much of it deals with concrete code. If you are interested in this area, it should be useful. Below I explain the design and implementation details of BeetleX.MQTT.Protocols, which implements MQTT version 5.0.

References:
  • Chinese translation of the specification: vitsumoc.github.io/mqtt-v5-0-chinese.html
  • English specification: docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
  • Programming language: C#
  • Full code: https://github.com/beetlex-io/mqtt

What is MQTT? MQTT (Message Queuing Telemetry Transport) is a "lightweight" messaging protocol based on the publish/subscribe model. It runs on top of TCP/IP and was introduced by IBM in 1999; the latest version, 5.0, was finalized in 2019.

Basic Types: A communication protocol defines how a data stream is structured, so that the stream can be split and read according to those rules. Each complete unit split out of the stream is called a packet. A packet is made up of several fields, and the types of those fields are the protocol's basic types. MQTT uses the following types:

  • Bit: The basic unit of network transmission and storage is the byte, but one byte contains 8 bits and can therefore hold 8 boolean flags. Protocols often pack status flags into individual bits to save bandwidth, and these flags are read and written with the &, |, << and >> operators.

bool tag = (0b0000_0001 & data) != 0; // Check whether bit 0 is set
data |= 0b0000_0001;                  // Set bit 0 to 1
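As an illustration of these bit operations, the sketch below decodes and re-encodes the Connect Flags byte of an MQTT 5.0 CONNECT packet. In that byte, bit 7 is User Name, bit 6 is Password, bit 5 is Will Retain, bits 4-3 are Will QoS, bit 2 is Will Flag, bit 1 is Clean Start, and bit 0 is reserved. `ConnectFlags` is a hypothetical helper written for this article and is not part of BeetleX.MQTT.Protocols.

```csharp
using System;

// Hypothetical helper (not BeetleX code): reads and writes the MQTT 5.0
// CONNECT "Connect Flags" byte using &, |, << and >>.
public struct ConnectFlags
{
    public bool UserName;
    public bool Password;
    public bool WillRetain;
    public int WillQoS;      // two bits: 0, 1 or 2
    public bool WillFlag;
    public bool CleanStart;

    public static ConnectFlags Parse(byte data)
    {
        if ((data & 0b0000_0001) != 0)
            throw new FormatException("Reserved bit 0 must be 0.");
        return new ConnectFlags
        {
            UserName   = (data & 0b1000_0000) != 0,
            Password   = (data & 0b0100_0000) != 0,
            WillRetain = (data & 0b0010_0000) != 0,
            WillQoS    = (data & 0b0001_1000) >> 3,   // mask bits 4-3, shift down
            WillFlag   = (data & 0b0000_0100) != 0,
            CleanStart = (data & 0b0000_0010) != 0,
        };
    }

    public byte ToByte()
    {
        int data = 0;
        if (UserName)   data |= 0b1000_0000;
        if (Password)   data |= 0b0100_0000;
        if (WillRetain) data |= 0b0010_0000;
        data |= (WillQoS & 0b11) << 3;                // place QoS into bits 4-3
        if (WillFlag)   data |= 0b0000_0100;
        if (CleanStart) data |= 0b0000_0010;
        return (byte)data;
    }
}
```

For example, 0xC2 (1100_0010) decodes to User Name, Password and Clean Start set, with no Will message.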
  • Byte: The protocol often uses a single byte to represent an enumerated value or status, such as the Reason Code of the CONNACK packet in MQTT (partial list):

Value | Hex  | Reason Code Name              | Description
0     | 0x00 | Success                       | The Connection is accepted.
128   | 0x80 | Unspecified Error             | The Server does not wish to reveal the reason for the failure, or none of the other Reason Codes apply.
129   | 0x81 | Malformed Packet              | Data within the CONNECT packet could not be correctly parsed.
130   | 0x82 | Protocol Error                | Data in the CONNECT packet does not conform to this specification.
131   | 0x83 | Implementation Specific Error | The CONNECT is valid but is not accepted by this Server.
132   | 0x84 | Unsupported Protocol Version  | The Server does not support the version of the MQTT protocol requested by the Client.
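In C#, a byte-sized status like this maps naturally onto an enum with `byte` as its backing type. The sketch below is hypothetical: the names follow the specification's reason code names and are not necessarily the types used in BeetleX.MQTT.Protocols. It also encodes a general MQTT 5.0 rule: reason codes below 0x80 indicate success, and 0x80 or above indicate failure.

```csharp
// Hypothetical enum: names follow the MQTT 5.0 spec, and the byte backing
// type matches the 1-byte size of the field on the wire.
public enum ConnAckReasonCode : byte
{
    Success = 0x00,
    UnspecifiedError = 0x80,
    MalformedPacket = 0x81,
    ProtocolError = 0x82,
    ImplementationSpecificError = 0x83,
    UnsupportedProtocolVersion = 0x84,
}

public static class ConnAckReasonCodeExtensions
{
    // In MQTT 5.0, values of 0x80 or greater indicate failure.
    public static bool IsError(this ConnAckReasonCode code) => (byte)code >= 0x80;
}
```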

  • Variable Byte Integer: In a program, an int always occupies 4 bytes regardless of its value. To save bandwidth, MQTT encodes lengths as a variable-length integer. Each byte carries 7 bits of the value, and the high bit (0x80) signals that another byte follows. The MQTT specification allows 1 to 4 bytes, which covers values up to 268,435,455. Because most packets are small, the length field is typically only 1 or 2 bytes long. Below is a helper class for reading and writing this 7-bit encoding. As a general-purpose helper, it accepts up to 5 bytes, which is one more than MQTT permits:

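public class Int7bit
{
    // Per-thread scratch buffer used by Write.
    [ThreadStatic]
    private static Byte[] mDataBuffer;

    public void Write(System.IO.Stream stream, int value)
    {
        if (mDataBuffer == null)
            mDataBuffer = new byte[32];
        var count = 0;
        var num = (UInt64)value;   // value is expected to be non-negative
        // Emit the low 7 bits first; the high bit 0x80 means "more bytes follow".
        while (num >= 0x80)
        {
            mDataBuffer[count++] = (Byte)(num | 0x80);
            num >>= 7;
        }
        mDataBuffer[count++] = (Byte)num;
        stream.Write(mDataBuffer, 0, count);
    }

    // Partial-decoding state is kept in fields, so a value split across two
    // network reads is resumed on the next call instead of restarted.
    private uint mResult = 0;
    private byte mBits = 0;

    // Returns null if the stream runs out before the value is complete.
    // stream.Length is treated as "bytes still unread", which appears to be
    // how the BeetleX stream passed in by the parser behaves.
    public int? Read(System.IO.Stream stream)
    {
        Byte b;
        while (true)
        {
            if (stream.Length < 1)
                return null;
            var bt = stream.ReadByte();
            if (bt < 0)
            {
                mBits = 0;
                mResult = 0;
                throw new BXException("Read 7bit int error: byte value cannot be less than zero!");
            }
            b = (Byte)bt;
            mResult |= (UInt32)((b & 0x7f) << mBits);
            if ((b & 0x80) == 0) break;   // continuation bit clear: last byte
            mBits += 7;
            // Allows up to 5 bytes; MQTT itself caps the encoding at 4 bytes.
            if (mBits >= 32)
            {
                mBits = 0;
                mResult = 0;
                throw new BXException("Read 7bit int error: out of maximum value!");
            }
        }
        mBits = 0;
        var result = mResult;
        mResult = 0;
        return (Int32)result;
    }
}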
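For comparison, here is a minimal sketch that follows the MQTT 5.0 Variable Byte Integer rules strictly. It rejects values above 268,435,455 when writing and rejects encodings longer than 4 bytes when reading. `VariableByteInteger` is a hypothetical name, and unlike `Int7bit` this version does not resume across partial reads: it simply throws if the stream ends early.

```csharp
using System;
using System.IO;

// Hypothetical helper (not BeetleX's Int7bit): MQTT Variable Byte Integer
// with the specification's 4-byte limit enforced.
public static class VariableByteInteger
{
    public const int MaxValue = 268_435_455;   // encodes as 0xFF 0xFF 0xFF 0x7F

    public static void Write(Stream stream, int value)
    {
        if (value < 0 || value > MaxValue)
            throw new ArgumentOutOfRangeException(nameof(value));
        do
        {
            int encoded = value % 128;          // low 7 bits
            value /= 128;
            if (value > 0)
                encoded |= 0x80;                // continuation bit: more bytes follow
            stream.WriteByte((byte)encoded);
        } while (value > 0);
    }

    public static int Read(Stream stream)
    {
        int multiplier = 1;
        int value = 0;
        for (int i = 0; i < 4; i++)             // at most 4 bytes in MQTT
        {
            int b = stream.ReadByte();
            if (b < 0)
                throw new EndOfStreamException();
            value += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0)
                return value;
            multiplier *= 128;
        }
        throw new InvalidDataException("Malformed Variable Byte Integer (more than 4 bytes).");
    }
}
```

For example, 321 encodes as 0xC1 0x02: the low 7 bits (65) plus the continuation bit, then 2.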
  • Integer: The protocol has two fixed-size integer types, the Two Byte Integer and the Four Byte Integer. Both are stored in big-endian (network) byte order. In C#, however, BitConverter uses the machine's byte order, which is little-endian on virtually all platforms .NET runs on, so the bytes must be swapped when reading and writing.

public static class BitHelper
{
    // Swap the two bytes of a 16-bit value (big-endian <-> little-endian).
    public static short SwapInt16(short v)
    {
        return (short)(((v & 0xFF) << 8) | ((v >> 8) & 0xFF));
    }

    public static ushort SwapUInt16(ushort v)
    {
        return (ushort)((uint)((v & 0xFF) << 8) | ((uint)(v >> 8) & 0xFFu));
    }

    // Swap each 16-bit half, then exchange the two halves.
    public static int SwapInt32(int v)
    {
        return ((SwapInt16((short)v) & 0xFFFF) << 16) | (SwapInt16((short)(v >> 16)) & 0xFFFF);
    }

    public static uint SwapUInt32(uint v)
    {
        return (uint)((SwapUInt16((ushort)v) & 0xFFFF) << 16) | (SwapUInt16((ushort)(v >> 16)) & 0xFFFFu);
    }
}

These functions convert between the two byte orders. The stream read and write methods simply call them before writing and after reading:

// These methods assume a little-endian machine (BitConverter.IsLittleEndian).
public virtual void WriteUInt16(System.IO.Stream stream, ushort value)
{
    value = BitHelper.SwapUInt16(value);   // convert to big-endian
    var data = BitConverter.GetBytes(value);
    stream.Write(data, 0, data.Length);
}

public virtual ushort ReadUInt16(System.IO.Stream stream)
{
    var buffer = GetIntBuffer();           // reusable scratch buffer (helper not shown)
    if (stream.Read(buffer, 0, 2) != 2)
        throw new System.IO.EndOfStreamException();
    var result = BitConverter.ToUInt16(buffer, 0);
    return BitHelper.SwapUInt16(result);   // back to the machine's byte order
}

public virtual void WriteInt16(System.IO.Stream stream, short value)
{
    value = BitHelper.SwapInt16(value);
    var data = BitConverter.GetBytes(value);
    stream.Write(data, 0, data.Length);
}

public virtual short ReadInt16(System.IO.Stream stream)
{
    var buffer = GetIntBuffer();
    if (stream.Read(buffer, 0, 2) != 2)
        throw new System.IO.EndOfStreamException();
    var result = BitConverter.ToInt16(buffer, 0);
    return BitHelper.SwapInt16(result);
}
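On .NET Core 2.1 and later, `System.Buffers.Binary.BinaryPrimitives` can read and write big-endian values directly. This avoids both the manual swap and the array allocated by `BitConverter.GetBytes`, and it gives the right result regardless of the machine's endianness. The `BigEndianIO` class below is a hypothetical sketch of that alternative, not the code used in BeetleX.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

// Hypothetical alternative to SwapUInt16 + BitConverter.
public static class BigEndianIO
{
    public static void WriteUInt16(Stream stream, ushort value)
    {
        Span<byte> buffer = stackalloc byte[2];              // no heap allocation
        BinaryPrimitives.WriteUInt16BigEndian(buffer, value);
        stream.Write(buffer);
    }

    public static ushort ReadUInt16(Stream stream)
    {
        Span<byte> buffer = stackalloc byte[2];
        int read = 0;
        while (read < 2)                                      // Read may return fewer bytes
        {
            int n = stream.Read(buffer.Slice(read));
            if (n == 0)
                throw new EndOfStreamException();
            read += n;
        }
        return BinaryPrimitives.ReadUInt16BigEndian(buffer);
    }
}
```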
  • String: A UTF-8 encoded string, prefixed by a 2-byte integer that gives the length of the encoded string in bytes (not in characters), so a string is limited to 65,535 bytes.

public virtual void WriteString(System.IO.Stream stream, string value, Encoding encoding = null)
{
    if (encoding == null)
        encoding = Encoding.UTF8;
    if (string.IsNullOrEmpty(value))
    {
        WriteUInt16(stream, 0);
        return;
    }
    // Rent a buffer large enough for the worst-case encoded size.
    byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(encoding.GetMaxByteCount(value.Length));
    try
    {
        var count = encoding.GetBytes(value, 0, value.Length, buffer, 0);
        if (count > ushort.MaxValue)
            throw new ArgumentException("MQTT strings are limited to 65,535 bytes.", nameof(value));
        WriteUInt16(stream, (ushort)count);   // length prefix = number of bytes
        stream.Write(buffer, 0, count);
    }
    finally
    {
        System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
    }
}

public virtual string ReadString(System.IO.Stream stream, Encoding encoding = null)
{
    if (encoding == null)
        encoding = Encoding.UTF8;
    UInt16 len = ReadUInt16(stream);
    if (len == 0)
        return string.Empty;
    byte[] buffer = System.Buffers.ArrayPool<byte>.Shared.Rent(len);
    try
    {
        if (stream.Read(buffer, 0, len) != len)
            throw new System.IO.EndOfStreamException();
        return encoding.GetString(buffer, 0, len);
    }
    finally
    {
        System.Buffers.ArrayPool<byte>.Shared.Return(buffer);
    }
}
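The length prefix counts encoded bytes, not .NET characters, and the difference matters as soon as a string contains non-ASCII text. The hypothetical `MqttString` helper below is a minimal sketch, not BeetleX code. It writes the same wire format with `BinaryPrimitives`, which makes the byte counting easy to check.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;
using System.Text;

// Hypothetical helper for an MQTT "UTF-8 Encoded String": a 2-byte big-endian
// prefix holding the number of BYTES, followed by the UTF-8 bytes themselves.
public static class MqttString
{
    public static void Write(Stream stream, string value)
    {
        byte[] bytes = Encoding.UTF8.GetBytes(value ?? string.Empty);
        if (bytes.Length > ushort.MaxValue)
            throw new ArgumentException("MQTT strings are limited to 65,535 bytes.", nameof(value));

        Span<byte> prefix = stackalloc byte[2];
        BinaryPrimitives.WriteUInt16BigEndian(prefix, (ushort)bytes.Length);
        stream.Write(prefix);                  // length in bytes, not chars
        stream.Write(bytes, 0, bytes.Length);  // UTF-8 payload
    }
}
```

For "h\u00E9llo" the prefix is 0x00 0x06. The string has five characters but six bytes, because é (U+00E9) takes two bytes (0xC3 0xA9) in UTF-8.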
  • Binary Data: A byte array, prefixed by a 2-byte integer that gives the length of the array in bytes.

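public void WriteBinary(System.IO.Stream stream, byte[] data)
{
    if (data.Length > ushort.MaxValue)
        throw new ArgumentException("MQTT binary data is limited to 65,535 bytes.", nameof(data));
    WriteUInt16(stream, (ushort)data.Length);   // 2-byte big-endian length prefix
    stream.Write(data, 0, data.Length);
}

public byte[] ReadBinary(System.IO.Stream stream)
{
    var len = ReadUInt16(stream);
    byte[] result = new byte[len];
    if (stream.Read(result, 0, len) != len)
        throw new System.IO.EndOfStreamException();
    return result;
}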
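`Stream.Read` is allowed to return fewer bytes than requested. A `NetworkStream`, for example, returns whatever has arrived so far. The read helpers above rely on a single `Read` call returning all `len` bytes. That holds when the complete packet has already been copied into an in-memory stream, as the parser does before decoding a message. For streams that can legitimately return short reads, a fill loop is the safer pattern. The sketch below is hypothetical (`MqttBinary`, `ReadFully`); on .NET 7 and later, `Stream.ReadExactly` provides the same behavior.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

// Hypothetical defensive variant of ReadBinary: keeps calling Read until all
// requested bytes have arrived, and fails only on a real end of stream.
public static class MqttBinary
{
    public static void ReadFully(Stream stream, byte[] buffer, int offset, int count)
    {
        while (count > 0)
        {
            int n = stream.Read(buffer, offset, count);
            if (n == 0)
                throw new EndOfStreamException($"Expected {count} more byte(s).");
            offset += n;
            count -= n;
        }
    }

    public static byte[] ReadBinary(Stream stream)
    {
        var prefix = new byte[2];
        ReadFully(stream, prefix, 0, 2);
        int len = BinaryPrimitives.ReadUInt16BigEndian(prefix);   // big-endian length
        var result = new byte[len];
        ReadFully(stream, result, 0, len);
        return result;
    }
}
```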

The above is the description of the MQTT protocol types and their corresponding implementation operations. Next, we can implement the specific protocol.

Framing Message Packets: Before writing the code, let's look at how MQTT packets are delimited in the byte stream. Every packet starts with a fixed header in the following format:

Bit     | 7  6  5  4                | 3  2  1  0
byte 1  | MQTT Control Packet type  | Flags specific to each MQTT Control Packet type
byte 2… | Remaining Length
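Following the layout in the table above, the first byte can be split and rebuilt with a mask and a shift. The hypothetical `FixedHeader` helper below also decodes the flags of a PUBLISH packet (type 3), where bit 3 is DUP, bits 2-1 are QoS, and bit 0 is RETAIN. It is an illustrative sketch, not the parser used in BeetleX.

```csharp
using System;

// Hypothetical sketch: split the first fixed-header byte into the packet type
// (high nibble, bits 7-4) and the type-specific flags (low nibble, bits 3-0).
public static class FixedHeader
{
    public static (int Type, int Flags) Split(byte header) =>
        ((header & 0b1111_0000) >> 4, header & 0b0000_1111);

    public static byte Combine(int type, int flags) =>
        (byte)(((type & 0x0F) << 4) | (flags & 0x0F));

    // PUBLISH flags: DUP (bit 3), QoS (bits 2-1), RETAIN (bit 0).
    public static (bool Dup, int QoS, bool Retain) PublishFlags(int flags) =>
        ((flags & 0b1000) != 0, (flags & 0b0110) >> 1, (flags & 0b0001) != 0);
}
```

For example, 0x3B (0011_1011) is a PUBLISH with DUP set, QoS 1 and RETAIN set. A SUBSCRIBE header (type 8, whose flags must be 0b0010) combines to 0x82.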

The protocol splits the first byte into two 4-bit halves. The high 4 bits (bits 7-4) give the MQTT Control Packet type. The low 4 bits (bits 3-0) hold flags whose meaning depends on the packet type; see the specification for each packet. Starting from the second byte, a Variable Byte Integer gives the Remaining Length, which is the number of bytes in the rest of the packet. From an implementation-performance point of view, I do not agree with this design. Compared with a fixed 4-byte length field, a variable length saves only a few bytes per packet (at most 3). Personally, I feel that today's networks can easily spare that much traffic, given that the 5.0 protocol was finalized in 2019. The cost is an extra memory copy whenever a packet is written to the network stream. The properties added in version 5.0 are also prefixed with a variable-length Property Length, so they cause the same problem again, and a single packet may end up copied several times. Here is a brief explanation of why:


The size of a variable-length field depends on the message content, so a fixed amount of space cannot be reserved for it in the network buffer. The content therefore has to be written into separate memory first to find out its length. The length is then written into the network buffer, and the content is copied in after it. If the length had a fixed size, such as 4 bytes, one could reserve 4 bytes right after writing the header and write the message content directly into the network buffer. After the content is written, one would go back and fill the reserved slot with the final length. The content would go straight into the network buffer, with no intermediate memory and no extra copy.
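The two strategies described above can be sketched side by side. In this hypothetical example (`PacketWriter` is not BeetleX code), a seekable `MemoryStream` stands in for the network buffer. The first method follows the MQTT format and needs a temporary buffer plus a copy. The second method is *not* valid MQTT: it only illustrates the fixed 4-byte, reserve-and-back-fill layout the article argues for.

```csharp
using System;
using System.Buffers.Binary;
using System.IO;

// Hypothetical comparison of the two write strategies.
public static class PacketWriter
{
    // MQTT: the Remaining Length size depends on the body, so the body is
    // written to temporary memory first and then copied after the length.
    public static void WriteWithVariableLength(Stream network, byte header, Action<Stream> writeBody)
    {
        var temp = new MemoryStream();
        writeBody(temp);                        // write 1: body into temporary memory
        network.WriteByte(header);
        int remaining = (int)temp.Length;
        do
        {
            int b = remaining % 128;
            remaining /= 128;
            if (remaining > 0)
                b |= 0x80;                      // continuation bit
            network.WriteByte((byte)b);
        } while (remaining > 0);
        temp.Position = 0;
        temp.CopyTo(network);                   // write 2: the extra copy
    }

    // NOT valid MQTT: a fixed 4-byte length slot is reserved, the body is
    // written straight into the target, and the slot is filled in afterwards.
    public static void WriteWithFixedLength(Stream network, byte header, Action<Stream> writeBody)
    {
        network.WriteByte(header);
        long lengthPos = network.Position;
        network.Write(new byte[4], 0, 4);       // reserve the length slot
        long bodyStart = network.Position;
        writeBody(network);                     // single write, no temporary copy
        long end = network.Position;

        Span<byte> length = stackalloc byte[4];
        BinaryPrimitives.WriteInt32BigEndian(length, (int)(end - bodyStart));
        network.Position = lengthPos;           // go back and fill in the length
        network.Write(length);
        network.Position = end;
    }
}
```

For a 200-byte body, the MQTT variant produces 1 + 2 + 200 bytes, with the length encoded as 0xC8 0x01. The fixed variant produces 1 + 4 + 200 bytes but writes the body only once. Note that the back-fill approach needs a target that allows going back to an earlier position.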

Implementing Message Types: Since we have the basic rules of the protocol, we can implement an abstract message class to express data messages.

public abstract class MQTTMessage
{
    public MQTTMessage()
    {
    }

    // Packet type, taken from the high 4 bits of the fixed header.
    public abstract MQTTMessageType Type { get; }

    // The flag bits of the fixed header, masked with BIT_1..BIT_4 by the parser.
    public byte Bit1 { get; set; }
    public byte Bit2 { get; set; }
    public byte Bit3 { get; set; }
    public byte Bit4 { get; set; }

    internal void Read(MQTTParse parse, System.IO.Stream stream, ISession session)
    {
        OnRead(parse, stream, session);
    }

    // Overridden by each concrete packet to decode its own fields.
    protected virtual void OnRead(MQTTParse parse, Stream stream, ISession session)
    {
    }

    internal void Write(MQTTParse parse, System.IO.Stream stream, ISession session)
    {
        OnWrite(parse, stream, session);
    }

    // Overridden by each concrete packet to encode its own fields.
    protected virtual void OnWrite(MQTTParse parse, Stream stream, ISession session)
    {
    }
}
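To make the OnRead/OnWrite pattern concrete, here is a standalone sketch of a simple packet type. It is hypothetical: `SketchMessage` and `PubAckSketch` are not BeetleX classes, and the `MQTTParse` and `ISession` parameters are dropped so that the example compiles on its own. In MQTT 5.0, the PUBACK variable header holds a 2-byte Packet Identifier followed by a Reason Code. The Reason Code may be omitted when it is 0x00 (Success) and there are no properties. Properties are left out of this sketch.

```csharp
using System;
using System.IO;

// Hypothetical base class mirroring the OnRead/OnWrite hooks of MQTTMessage.
public abstract class SketchMessage
{
    public void Read(Stream stream) => OnRead(stream);
    public void Write(Stream stream) => OnWrite(stream);
    protected virtual void OnRead(Stream stream) { }
    protected virtual void OnWrite(Stream stream) { }
}

// Hypothetical PUBACK: Packet Identifier (big-endian) plus an optional Reason Code.
public class PubAckSketch : SketchMessage
{
    public ushort Identifier { get; set; }
    public byte ReasonCode { get; set; }

    protected override void OnWrite(Stream stream)
    {
        stream.WriteByte((byte)(Identifier >> 8));   // high byte first
        stream.WriteByte((byte)Identifier);
        stream.WriteByte(ReasonCode);
    }

    // Assumes the stream holds only this packet's body, as the parser provides.
    protected override void OnRead(Stream stream)
    {
        int hi = stream.ReadByte(), lo = stream.ReadByte();
        if (hi < 0 || lo < 0)
            throw new EndOfStreamException();
        Identifier = (ushort)((hi << 8) | lo);
        int rc = stream.ReadByte();                  // absent => 0x00 (Success)
        ReasonCode = rc < 0 ? (byte)0 : (byte)rc;
    }
}
```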

In this design, OnRead and OnWrite are left for each concrete message type to override, so that each packet handles the reading and writing of its own fields. With this abstract message in place, the parser can frame the network byte stream into packets. The parser's Read method looks like this:

public override MQTTMessage Read(Stream stream, ISession session)
{
    IServer server = session?.Server;
    if (stream.Length > 0)
    {
        // Step 1: the fixed header byte (state is kept in fields, because the
        // rest of the packet may not have arrived yet).
        if (mType == null)
        {
            int header = stream.ReadByte();   // check before casting to byte
            if (header < 0)
            {
                throw new BXException("parse mqtt message error: fixed header byte value cannot be less than zero!");
            }
            mHeader = (byte)header;
            mType = (MQTTMessageType)((mHeader & 0b1111_0000) >> 4);
            if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                server.Log(EventArgs.LogType.Debug, session, "parse mqtt header success");
        }
        // Step 2: the Remaining Length; Int7bit returns null while it is incomplete.
        if (mLength == null)
        {
            mLength = mInt7BitHandler.Read(stream);
            if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                server.Log(EventArgs.LogType.Debug, session, $"parse mqtt size {mLength}");
        }
        // Step 3: once the whole body is buffered, copy it out and decode the message.
        if (mLength != null && stream.Length >= mLength.Value)
        {
            Stream protocolStream = GetProtocolStream();
            CopyStream(stream, protocolStream, mLength.Value);
            MQTTMessage msg = CreateMessage(mType.Value, session);
            msg.Bit1 = (byte)(BIT_1 & mHeader);
            msg.Bit2 = (byte)(BIT_2 & mHeader);
            msg.Bit3 = (byte)(BIT_3 & mHeader);
            msg.Bit4 = (byte)(BIT_4 & mHeader);
            msg.Read(this, protocolStream, session);
            if (server != null && server.EnableLog(EventArgs.LogType.Debug))
                server.Log(EventArgs.LogType.Debug, session, $"parse mqtt type {msg} success");
            // Reset the state for the next packet.
            mLength = null;
            mType = null;
            return msg;
        }
    }
    // Not enough data yet: wait for more bytes from the network.
    return null;
}
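The same framing idea can be shown without the BeetleX types. The hypothetical `MqttFramer` below accepts TCP data in arbitrary chunks and returns every packet that has become complete, keeping any leftover bytes for the next call. Unlike the method above, it re-decodes the Remaining Length on each attempt instead of caching it in fields. It also buffers in a `List<byte>` for clarity, where a production parser would use pooled buffers.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical stand-alone framer (not BeetleX code).
public sealed class MqttFramer
{
    private readonly List<byte> _pending = new List<byte>();

    public List<(byte Header, byte[] Body)> Feed(byte[] chunk)
    {
        _pending.AddRange(chunk);
        var packets = new List<(byte Header, byte[] Body)>();
        while (TryTakePacket(out byte header, out byte[] body))
            packets.Add((header, body));
        return packets;
    }

    private bool TryTakePacket(out byte header, out byte[] body)
    {
        header = 0;
        body = Array.Empty<byte>();
        if (_pending.Count < 2)
            return false;                       // need the header plus one length byte

        // Decode the Remaining Length (1-4 bytes) starting at index 1.
        int length = 0, multiplier = 1, pos = 1;
        while (true)
        {
            if (pos > 4)
                throw new InvalidDataException("Remaining Length longer than 4 bytes.");
            if (pos >= _pending.Count)
                return false;                   // length incomplete: wait for more data
            byte b = _pending[pos++];
            length += (b & 0x7F) * multiplier;
            if ((b & 0x80) == 0)
                break;
            multiplier *= 128;
        }

        if (_pending.Count - pos < length)
            return false;                       // body incomplete: wait for more data

        header = _pending[0];
        body = _pending.GetRange(pos, length).ToArray();
        _pending.RemoveRange(0, pos + length);  // drop the consumed packet
        return true;
    }
}
```

For example, feeding `C0 00 30` yields a PINGREQ and keeps `30` pending. Feeding `03 AA` yields nothing, because the PUBLISH body is still incomplete. Feeding `BB CC E0 00` then yields the 3-byte PUBLISH followed by a DISCONNECT.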

The parser's Read method above converts network data into message objects, and its Write counterpart (not shown here) performs the reverse conversion. This completes the handling of the top-level MQTT packet structure. The implementation of the individual message types will be covered in later chapters.

BeetleX

Open-source cross-platform communication framework (supports TLS)

Provides HTTP, Websocket, MQTT, Redis, RPC, and service gateway open-source components
Personal WeChat: henryfan128    QQ: 28304340

https://github.com/beetlex-io/
http://beetlex-io.com
