MQTT and EMQ
1. IoT Messaging Protocol MQTT
1.1. Introduction to MQTT
MQTT (Message Queuing Telemetry Transport) is a “lightweight” communication protocol based on the publish/subscribe model.

Client | Server |
---|---|
An application or device using the MQTT protocol, which always establishes a network connection to the server. Clients can: | Also known as a “Message Broker”, it can be an application or a device. It is located between the message publisher and subscriber, and it can: |
– (1) Publish information that other clients may subscribe to; | – (1) Accept network connections from clients; |
– (2) Subscribe to messages published by other clients; | – (2) Accept application information published by clients; |
– (3) Unsubscribe or delete messages from applications; | – (3) Handle subscription and unsubscription requests from clients; |
– (4) Disconnect from the server. | – (4) Forward application messages to subscribed clients. |
Design Principles:
① Simplify, do not add unnecessary features;
② Publish/Subscribe (Pub/Sub) model, facilitates message transfer between sensors, decouples Client/Server mode, allowing for the benefit of not needing to know each other’s existence (ip/port) in advance and not needing to run simultaneously;
③ Allow users to dynamically create topics (no need to create topics in advance), zero operational cost;
④ Minimize transmission volume to improve transmission efficiency;
⑤ Consider factors such as low bandwidth, high latency, and unstable networks;
⑥ Support continuous session maintenance and control (heartbeat);
⑦ Understand that client computing power may be very low;
⑧ Provide Quality of Service (QoS) management;
⑨ Do not impose strict requirements on the type and format of transmitted data, maintain flexibility (referring to application layer business data).
MQTT Application Areas:
① IoT M2M communication, IoT big data collection
② Android message push, WEB message push
③ Mobile instant messaging, such as Facebook Messenger
④ Smart hardware, smart home, smart appliances
⑤ Vehicle networking communication, electric vehicle station data collection
⑥ Smart cities, remote healthcare, remote education
⑦ Power, oil, and energy industries
1.2 MQTT Protocol Related Concepts
MQTT Protocol Composition

Methods in the MQTT Protocol
– (1) CONNECT: Client connects to the server | – (9) SUBACK: Subscription acknowledgment |
---|---|
– (2) CONNACK: Connection acknowledgment | – (10) UNSUBSCRIBE: Unsubscribe |
– (3) PUBLISH: Publish message | – (11) UNSUBACK: Unsubscribe acknowledgment |
– (4) PUBACK: Publish acknowledgment | – (12) PINGREQ: Client sends heartbeat |
– (5) PUBREC: Published message received | – (13) PINGRESP: Server heartbeat response |
– (6) PUBREL: Published message released | – (14) DISCONNECT: Disconnect |
– (7) PUBCOMP: Publish complete | – (15) AUTH: Authentication |
– (8) SUBSCRIBE: Subscription request |
1.3. Message Quality of Service (QoS)
The MQTT protocol specifies the Quality of Service (QoS) for message delivery, ensuring reliability in message transmission under different network conditions, with QoS design being a key aspect of the MQTT protocol.
MQTT defines three QoS levels.
– QoS 0: Message delivered at most once.
– QoS 1: Message delivered at least once.
– QoS 2: Message delivered exactly once.
QoS0: “At most once”, message delivery entirely depends on the underlying TCP/IP network. Message loss may occur. A message is neither acknowledged by the receiver nor stored for retransmission by the sender. This is also known as “fire and forget”.

QoS1: “At least once”, ensures message delivery, but duplicates may occur. The sender will store the sent information until it receives a PUBACK acknowledgment from the receiver at least once.

QoS2: “Exactly once”, ensures the message is delivered once. If the receiver receives a QoS 2 PUBLISH message, it will process the PUBLISH message accordingly and confirm with a PUBREC message to the sender.
PUBLISH: Publish message
PUBREC: Publish received
PUBREL: Publish released
PUBCOMP: Publish complete

How different QoS levels affect subscription delivery

1.4. Topic Wildcard Matching Rules




2. IoT Level Messaging Middleware EMQ
2.1. Introduction to EMQX
EMQ X Broker is developed on the high-concurrency Erlang/OTP language platform, supporting millions of connections and distributed cluster architecture, and is an open-source MQTT message server based on the publish/subscribe model.
EMQ official website: https://www.emqx.cn/
Why choose EMQ X? Considering support for MQTT 5.0, stability, scalability, cluster capabilities, etc., EMQX should perform the best.

Features of EMQX
-
EMQ X is currently the most popular MQTT messaging middleware in the open-source community; -
EMQ X is the first messaging server in the open-source community to support the MQTT 5.0 protocol specification and is fully compatible with MQTT V3.1 and V3.1.1 protocols. -
In addition to the MQTT protocol, EMQ X also supports IoT protocols such as MQTT-SN, CoAP, LwM2M, LoRaWAN, and WebSocket. -
Single instance supports millions of connections, and clusters support tens of millions of connections; millisecond-level message forwarding. -
Easy to install and use; -
Local technical support services in China; -
Extension modules and plugins, EMQ X provides a flexible extension mechanism, supporting some customized scenarios for enterprises; -
Bridging -
Shared subscriptions
2.2. Setting Up and Configuring the Environment
Running and Installing with Docker
Pull emqx image
[root@docker emqx]# docker pull emqx/emqx:v4.1.0
Create emqx container
[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083 emqx/emqx:v4.1.0
EMQ X Directory Structure

2.3. EMQ Dashboard
EMQ X provides a Dashboard for users to manage devices and monitor relevant metrics. Through the Dashboard, you can view basic server information, load conditions, and statistics, check the connection status of a specific client, and even disconnect it, as well as dynamically load and unload specified plugins.
Access address http://192.168.150.102:18083 to view the Dashboard, **default username is admin, password is public*



2.4. Client Debugging Tool MQTTX
MQTT X is an elegant cross-platform MQTT 5.0 desktop client open-sourced by EMQ, supporting macOS, Linux, and Windows.
The UI of MQTT X adopts a chat interface style, simplifying page operation logic, allowing users to quickly create connections, save multiple clients, and easily test MQTT/MQTTS connections, as well as subscribing to and publishing MQTT messages.

2.5. Delayed Messages
The delayed publishing feature of EMQ X allows for the publishing of PUBLISH messages after a user-configured time interval. The module is enabled with emqx_mod_delayed
The specific format for delayed publishing topics is as follows:
$delayed/{DelayInterval}/{TopicName}
– delayed as the topic prefix will be considered messages that need to be delayed for publishing.
– {DelayInterval}: Specifies the time interval for delayed publishing of the MQTT message, in seconds, with a maximum allowable interval of 4294967 seconds.
– {TopicName}: The topic name of the MQTT message.
1. Subscribe to the topic on Websocket: t2/a
2. Publish message to the topic on Websocket: topic: $delayed/10/t2/a
Observe the running effect

2.6. Shared Subscriptions
Shared subscriptions without groups
Format:
$queue/{TopicName}


Shared subscriptions with groups
Format:
$share/<group-name>/{TopicName}
</group-name>

3. Eclipse Paho
3.1. What is Eclipse Paho
Eclipse Paho is the officially recommended Java client implementing the MQTT protocol by EMQ.
Its relationship is similar to MySQL and JDBC; our project code needs to use JDBC to connect to the database, while our project needs to use Eclipse Paho to connect to EMQX, which provides basic message sending and receiving.
3.2. Technical Research on Eclipse Paho
Integrating Eclipse Paho
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
Publishing Messages to EMQ
(1) Set up a basic Spring Boot program.
(2) Write a controller and add a method to publish messages.
@GetMapping("/publish")
public void publish() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence(); // Memory persistence
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
// Connection options define username, password, and other configurations
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true); // true means clear cache, non-persistent subscriber; when set to true, it is definitely a non-persistent subscriber; when set to false, it means the server retains the client's connection record
options.setAutomaticReconnect(true); // Whether to reconnect automatically
options.setConnectionTimeout(30); // Connection timeout in seconds
options.setKeepAliveInterval(10); // Connection keep-alive check period in seconds
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // Version
client.connect(options); // Connect
client.publish("topic", "Sending content".getBytes(), 2, false);
}
Subscribing to Messages
Add a method in the controller to subscribe to messages
@GetMapping("/subscribe")
public void subscribe() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence(); // Memory persistence
MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc", persistence);
// Connection options define username, password, and other configurations
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true); // true means clear cache, non-persistent subscriber; when set to true, it is definitely a non-persistent subscriber; when set to false, it means the server retains the client's connection record
options.setAutomaticReconnect(true); // Whether to reconnect automatically
options.setConnectionTimeout(30); // Connection timeout in seconds
options.setKeepAliveInterval(10); // Connection keep-alive check period in seconds
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // Version
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectionLost(Throwable throwable) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
System.out.println( "Received message topic:" +s+" id:"+mqttMessage.getId() +" message:"+ mqttMessage.toString());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
}
@Override
public void connectComplete(boolean b, String s) {
System.out.println("Connection successful!");
}
});
client.connect(options); // Connect
client.subscribe("test"); // Subscribe to topic
}
If you find this article helpful, please follow, like, and bookmark to support me, your support is my motivation!
Original content is not easy, please indicate the source when reprinting, thank you for your support! If this article is useful to you, feel free to share it!