Quick Start Guide to MQTT and EMQ

MQTT and EMQ

1. IoT Messaging Protocol MQTT

1.1. Introduction to MQTT

MQTT (Message Queuing Telemetry Transport) is a “lightweight” communication protocol based on the publish/subscribe model.

Client

An application or device that uses the MQTT protocol. A client always initiates the network connection to the server. A client can:
– (1) Publish information that other clients may subscribe to;
– (2) Subscribe to messages published by other clients;
– (3) Unsubscribe to stop receiving application messages;
– (4) Disconnect from the server.

Server

Also known as a “message broker”, it can be an application or a device. It sits between message publishers and subscribers, and it can:
– (1) Accept network connections from clients;
– (2) Accept application information published by clients;
– (3) Handle subscription and unsubscription requests from clients;
– (4) Forward application messages to subscribed clients.

Design Principles:

① Simplify, do not add unnecessary features;

② Use the Publish/Subscribe (Pub/Sub) model, which simplifies message transfer between sensors and decouples senders from receivers: they do not need to know of each other's existence (IP/port) in advance, and they do not need to be running at the same time;

③ Allow users to dynamically create topics (no need to create topics in advance), zero operational cost;

④ Minimize transmission volume to improve transmission efficiency;

⑤ Consider factors such as low bandwidth, high latency, and unstable networks;

⑥ Support continuous session maintenance and control (heartbeat);

⑦ Assume that client computing power may be very low;

⑧ Provide Quality of Service (QoS) management;

⑨ Do not impose strict requirements on the type and format of transmitted data, maintain flexibility (referring to application layer business data).
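Principles ② and ③ can be illustrated with a toy in-process broker. This is only a sketch of the idea, not how EMQ X or any real MQTT broker is implemented: the class name `ToyBroker` and its methods are hypothetical, there is no networking, and topics are matched by exact name only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Toy in-process broker (hypothetical, for illustration only).
class ToyBroker {
    // Topic name -> callbacks of the clients subscribed to it.
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Topics are created on first use; nothing has to be declared in advance.
    void subscribe(String topic, Consumer<String> callback) {
        subscribers.computeIfAbsent(topic, t -> new ArrayList<>()).add(callback);
    }

    // Forwards the payload to every current subscriber of the topic
    // and returns how many subscribers received it.
    int publish(String topic, String payload) {
        List<Consumer<String>> targets =
                subscribers.getOrDefault(topic, Collections.emptyList());
        for (Consumer<String> callback : targets) {
            callback.accept(payload);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.subscribe("sensors/temp", p -> System.out.println("display got " + p));
        broker.subscribe("sensors/temp", p -> System.out.println("logger got " + p));
        // The publisher only knows the topic name, not who is listening.
        broker.publish("sensors/temp", "21.5");
    }
}
```

The publisher and the two subscribers never reference each other; the topic name is the only thing they share.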

MQTT Application Areas:

① IoT M2M communication, IoT big data collection

② Android message push, WEB message push

③ Mobile instant messaging, such as Facebook Messenger

④ Smart hardware, smart home, smart appliances

⑤ Vehicle networking communication, electric vehicle station data collection

⑥ Smart cities, remote healthcare, remote education

⑦ Power, oil, and energy industries

1.2 MQTT Protocol Related Concepts

MQTT Protocol Composition


Methods in the MQTT Protocol

– (1) CONNECT: Client connects to the server
– (2) CONNACK: Connection acknowledgment
– (3) PUBLISH: Publish message
– (4) PUBACK: Publish acknowledgment
– (5) PUBREC: Published message received
– (6) PUBREL: Published message released
– (7) PUBCOMP: Publish complete
– (8) SUBSCRIBE: Subscription request
– (9) SUBACK: Subscription acknowledgment
– (10) UNSUBSCRIBE: Unsubscribe
– (11) UNSUBACK: Unsubscribe acknowledgment
– (12) PINGREQ: Client sends heartbeat
– (13) PINGRESP: Server heartbeat response
– (14) DISCONNECT: Disconnect
– (15) AUTH: Authentication (MQTT 5.0 only)
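The numbers in the list above match the packet type codes on the wire: the type is carried in the upper four bits of the first byte of every MQTT packet. As a minimal sketch (the names `PacketType` and `fromFirstByte` are made up for this example), decoding that byte could look like this:

```java
// Control packet types in the order of their numeric codes.
// Code 0 is reserved; AUTH (15) exists only in MQTT 5.0.
enum PacketType {
    RESERVED, CONNECT, CONNACK, PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP,
    SUBSCRIBE, SUBACK, UNSUBSCRIBE, UNSUBACK, PINGREQ, PINGRESP, DISCONNECT, AUTH;

    // The packet type is the upper 4 bits of the first byte of the fixed header;
    // the lower 4 bits are flags (for PUBLISH: DUP, QoS, RETAIN).
    static PacketType fromFirstByte(int firstByte) {
        return values()[(firstByte & 0xF0) >> 4];
    }
}

class PacketTypeDemo {
    public static void main(String[] args) {
        System.out.println(PacketType.fromFirstByte(0x10)); // CONNECT
        System.out.println(PacketType.fromFirstByte(0x32)); // PUBLISH (flags say QoS 1)
        System.out.println(PacketType.fromFirstByte(0xC0)); // PINGREQ
    }
}
```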

1.3. Message Quality of Service (QoS)

The MQTT protocol defines Quality of Service (QoS) levels for message delivery, so that applications can choose the delivery guarantee they need under different network conditions. QoS is a key part of the protocol's design.

MQTT defines three QoS levels.

– QoS 0: Message delivered at most once.

– QoS 1: Message delivered at least once.

– QoS 2: Message delivered exactly once.

QoS 0: “At most once”. Delivery depends entirely on the underlying TCP/IP network, so messages may be lost. A message is neither acknowledged by the receiver nor stored for retransmission by the sender. This is also known as “fire and forget”.


QoS 1: “At least once”. Delivery is guaranteed, but duplicates may occur. The sender stores each message and resends it if necessary until it receives a PUBACK acknowledgment from the receiver.
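The sender-side bookkeeping for QoS 1 can be sketched as a small in-flight store. This is an illustration under simplifying assumptions, not the behavior of any particular client library: the class `Qos1Sender` is hypothetical, nothing is actually sent, and packet identifiers simply count upward instead of being reused within the 1 to 65535 range.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical QoS 1 sender: keeps every message until its PUBACK arrives.
class Qos1Sender {
    private final Map<Integer, String> inFlight = new LinkedHashMap<>();
    private int nextPacketId = 1;

    // Stores the payload before "sending" it and returns the packet identifier.
    int publish(String payload) {
        int packetId = nextPacketId++;
        inFlight.put(packetId, payload);
        return packetId;
    }

    // Called when a PUBACK arrives: the stored copy can now be discarded.
    void onPuback(int packetId) {
        inFlight.remove(packetId);
    }

    // Messages still waiting for a PUBACK; a real client would resend these
    // (marked as duplicates) after a reconnect, which is why duplicates can occur.
    Map<Integer, String> pending() {
        return new LinkedHashMap<>(inFlight);
    }

    public static void main(String[] args) {
        Qos1Sender sender = new Qos1Sender();
        int first = sender.publish("temperature=21.5");
        sender.publish("temperature=21.7");
        sender.onPuback(first);
        System.out.println(sender.pending()); // {2=temperature=21.7}
    }
}
```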


QoS 2: “Exactly once”. The message is delivered exactly once, using a four-packet handshake. The receiver acknowledges a QoS 2 PUBLISH with a PUBREC; the sender then sends a PUBREL, and the receiver completes the exchange with a PUBCOMP.

PUBLISH: Publish message
PUBREC: Publish received
PUBREL: Publish released
PUBCOMP: Publish complete
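How the receiver avoids processing a retransmitted PUBLISH twice can be sketched as follows. This is one possible approach (deliver on first receipt, remember the packet identifier until PUBREL), shown with a hypothetical `Qos2Receiver` class that returns the name of the reply packet instead of writing to a socket.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical QoS 2 receiver that de-duplicates by packet identifier.
class Qos2Receiver {
    // Packet identifiers that were received but not yet released by PUBREL.
    private final Set<Integer> awaitingRelease = new HashSet<>();
    // Payloads handed to the application, in order.
    final List<String> delivered = new ArrayList<>();

    // PUBLISH: deliver only the first time this packet identifier is seen,
    // but always answer with PUBREC so the sender can move on.
    String onPublish(int packetId, String payload) {
        if (awaitingRelease.add(packetId)) {
            delivered.add(payload);
        }
        return "PUBREC";
    }

    // PUBREL: the sender will not resend this PUBLISH, so the identifier
    // can be forgotten and reused; answer with PUBCOMP.
    String onPubrel(int packetId) {
        awaitingRelease.remove(packetId);
        return "PUBCOMP";
    }

    public static void main(String[] args) {
        Qos2Receiver receiver = new Qos2Receiver();
        receiver.onPublish(7, "open valve");
        receiver.onPublish(7, "open valve"); // retransmission, ignored
        receiver.onPubrel(7);
        System.out.println(receiver.delivered); // [open valve]
    }
}
```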


How different QoS levels affect subscription delivery
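Under the MQTT specification, a message is forwarded to a subscriber at the lower of two values: the QoS the publisher used and the QoS granted to the subscription. The sketch below only encodes that rule; `EffectiveQos` is an illustrative name, and a broker may offer settings that change this behavior.

```java
// Illustrative helper: QoS at which a subscriber receives a message.
class EffectiveQos {
    static int delivered(int publishQos, int subscriptionQos) {
        return Math.min(publishQos, subscriptionQos);
    }

    public static void main(String[] args) {
        System.out.println(delivered(2, 1)); // 1: downgraded to the subscription's QoS
        System.out.println(delivered(0, 2)); // 0: never higher than the publish QoS
    }
}
```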


1.4. Topic Wildcard Matching Rules
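MQTT topics are split into levels by `/`. A subscription filter may use `+` to match exactly one level and `#`, as the last level only, to match any number of remaining levels (including none). The matcher below is a minimal sketch of those rules as the MQTT specification describes them; the class name `TopicMatcher` is made up for this example, and it does not validate malformed filters.

```java
// Minimal sketch of MQTT topic filter matching (hypothetical helper class).
class TopicMatcher {
    // Returns true if the topic name matches the subscription filter.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1); // -1 keeps empty trailing levels
        String[] t = topic.split("/", -1);
        // Topics starting with '$' (such as $SYS/...) are not matched
        // by a filter whose first level is a wildcard.
        if (topic.startsWith("$") && (f[0].equals("#") || f[0].equals("+"))) {
            return false;
        }
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                return i == f.length - 1; // '#' is only valid as the last level
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // a literal level differs
            }
        }
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("sensor/+/temperature", "sensor/1/temperature"));   // true
        System.out.println(matches("sensor/+/temperature", "sensor/1/2/temperature")); // false
        System.out.println(matches("sensor/#", "sensor/1/2/temperature"));             // true
        System.out.println(matches("sensor/#", "sensor"));                             // true
    }
}
```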


2. IoT-Grade Messaging Middleware: EMQ X

2.1. Introduction to EMQX

EMQ X Broker is an open-source MQTT message server based on the publish/subscribe model. It is built on the highly concurrent Erlang/OTP platform and supports millions of connections and a distributed cluster architecture.

EMQ official website: https://www.emqx.cn/

Why choose EMQ X? Judged on MQTT 5.0 support, stability, scalability, and clustering capability, EMQ X is a strong overall choice among open-source MQTT brokers.


Features of EMQX

  • EMQ X is currently the most popular MQTT messaging middleware in the open-source community;
  • EMQ X is the first messaging server in the open-source community to support the MQTT 5.0 protocol specification and is fully compatible with MQTT V3.1 and V3.1.1 protocols.
  • In addition to the MQTT protocol, EMQ X also supports IoT protocols such as MQTT-SN, CoAP, LwM2M, LoRaWAN, and WebSocket.
  • Single instance supports millions of connections, and clusters support tens of millions of connections; millisecond-level message forwarding.
  • Easy to install and use;
  • Local technical support services in China;
  • Extension modules and plugins: EMQ X provides a flexible extension mechanism that supports customized enterprise scenarios;
  • Bridging
  • Shared subscriptions

2.2. Setting Up and Configuring the Environment

Running and Installing with Docker

Pull emqx image

[root@docker emqx]# docker pull emqx/emqx:v4.1.0

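Create and start the emqx container. The published ports are 1883 (MQTT over TCP), 8883 (MQTT over SSL/TLS), 8083 (MQTT over WebSocket), 8084 (MQTT over secure WebSocket), 8081 (HTTP management API), and 18083 (Dashboard).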

[root@docker emqx]# docker run -tid --name emqx -p 1883:1883 -p 8083:8083 -p 8081:8081 -p 8883:8883 -p 8084:8084 -p 18083:18083  emqx/emqx:v4.1.0

EMQ X Directory Structure


2.3. EMQ Dashboard

EMQ X provides a Dashboard for users to manage devices and monitor relevant metrics. Through the Dashboard, you can view basic server information, load conditions, and statistics, check the connection status of a specific client, and even disconnect it, as well as dynamically load and unload specified plugins.

Open http://192.168.150.102:18083 (replace the IP address with that of your Docker host) to view the Dashboard. The default username is admin and the default password is public.


2.4. Client Debugging Tool MQTTX

MQTT X is an elegant cross-platform MQTT 5.0 desktop client open-sourced by EMQ, supporting macOS, Linux, and Windows.

The UI of MQTT X adopts a chat interface style, simplifying page operation logic, allowing users to quickly create connections, save multiple clients, and easily test MQTT/MQTTS connections, as well as subscribing to and publishing MQTT messages.


2.5. Delayed Messages

The delayed publishing feature of EMQ X publishes a PUBLISH message only after a user-configured time interval has elapsed. The feature is provided by the emqx_mod_delayed module, which must be enabled.

The specific format for delayed publishing topics is as follows:

$delayed/{DelayInterval}/{TopicName}

Messages whose topic starts with the $delayed prefix are treated as messages to be published after a delay.

– {DelayInterval}: Specifies the time interval for delayed publishing of the MQTT message, in seconds, with a maximum allowable interval of 4294967 seconds.

– {TopicName}: The topic name of the MQTT message.

1. In the Dashboard's WebSocket tool, subscribe to the topic t2/a.

2. In the same tool, publish a message to the topic $delayed/10/t2/a.

The subscriber should receive the message on t2/a about 10 seconds after it was published.
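When publishing delayed messages from code, the topic string can be assembled by a small helper such as the one below. This is a sketch only: `DelayedTopic` is a hypothetical class, and rejecting negative intervals is an assumption of this example, while the upper bound is the 4294967 seconds stated above.

```java
// Hypothetical helper that builds a $delayed/{DelayInterval}/{TopicName} topic.
class DelayedTopic {
    // Maximum delay accepted by EMQ X, in seconds (from the text above).
    static final long MAX_DELAY_SECONDS = 4294967L;

    static String build(long delaySeconds, String topicName) {
        // Rejecting negative values is an assumption made for this sketch.
        if (delaySeconds < 0 || delaySeconds > MAX_DELAY_SECONDS) {
            throw new IllegalArgumentException(
                    "delay must be between 0 and " + MAX_DELAY_SECONDS + " seconds");
        }
        return "$delayed/" + delaySeconds + "/" + topicName;
    }

    public static void main(String[] args) {
        System.out.println(build(10, "t2/a")); // $delayed/10/t2/a
    }
}
```

The resulting string is used as the topic of an ordinary PUBLISH; subscribers still subscribe to the plain topic name (t2/a in this example).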


2.6. Shared Subscriptions

Shared subscriptions without groups

Format:

$queue/{TopicName}

Shared subscriptions with groups

Format:

$share/<group-name>/{TopicName}
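With a shared subscription, each message on the topic is delivered to only one subscriber in a group, which spreads the load across the group's members; with $queue, all subscribers behave as a single group. The sketch below parses the two filter formats and simulates the dispatch. The class names are hypothetical, and round-robin is just one possible balancing strategy assumed here; the strategy a broker actually uses depends on its configuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical parser for $queue/{TopicName} and $share/<group-name>/{TopicName}.
class SharedFilter {
    final String group; // null for an ordinary (non-shared) subscription
    final String topic;

    SharedFilter(String group, String topic) {
        this.group = group;
        this.topic = topic;
    }

    static SharedFilter parse(String filter) {
        if (filter.startsWith("$queue/")) {
            return new SharedFilter("$queue", filter.substring("$queue/".length()));
        }
        if (filter.startsWith("$share/")) {
            int start = "$share/".length();
            int slash = filter.indexOf('/', start);
            if (slash < 0) {
                throw new IllegalArgumentException("missing topic in " + filter);
            }
            return new SharedFilter(filter.substring(start, slash), filter.substring(slash + 1));
        }
        return new SharedFilter(null, filter);
    }
}

// Simulated group: each message goes to exactly one member (round-robin assumed).
class SharedGroup {
    private final List<String> members = new ArrayList<>();
    private int next = 0;

    void join(String clientId) {
        members.add(clientId);
    }

    // Returns the client that receives the next message.
    String dispatch() {
        String chosen = members.get(next);
        next = (next + 1) % members.size();
        return chosen;
    }
}
```

For example, with clients a and b both subscribed to $share/g1/t/1, three messages published to t/1 would go to a, b, and a in this simulation.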

3. Eclipse Paho

3.1. What is Eclipse Paho

Eclipse Paho is an open-source MQTT client library from the Eclipse Foundation, and its Java client is the one EMQ recommends.

The relationship is similar to that between MySQL and JDBC: just as project code uses JDBC to talk to the database, it uses Eclipse Paho to connect to EMQ X and to send and receive messages.

3.2. Technical Research on Eclipse Paho

Integrating Eclipse Paho

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

Publishing Messages to EMQ

(1) Set up a basic Spring Boot program.

(2) Write a controller and add a method to publish messages.

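import java.nio.charset.StandardCharsets;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.GetMapping;

// Place this method inside the @RestController class.
@GetMapping("/publish")
public void publish() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence(); // Keep in-flight messages in memory only
    // The client ID must be unique on the broker, so it differs from the subscriber's ID
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc-publisher", persistence);

    // Connection options: username, password, and other settings
    MqttConnectOptions options = new MqttConnectOptions();
    // true: clean session, the broker discards subscriptions and queued messages on disconnect
    // false: persistent session, the broker keeps the client's session across reconnects
    options.setCleanSession(true);
    options.setAutomaticReconnect(true); // Reconnect automatically if the connection drops
    options.setConnectionTimeout(30); // Connection timeout in seconds
    options.setKeepAliveInterval(10); // Keep-alive (heartbeat) interval in seconds
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // Protocol version

    client.connect(options); // Connect
    // Arguments: topic (the one the subscriber listens on), payload, QoS 2, retained = false
    client.publish("test", "Sending content".getBytes(StandardCharsets.UTF_8), 2, false);
    client.disconnect(); // Release the connection instead of leaving one open per request
    client.close();
}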

Subscribing to Messages

Add a method in the controller to subscribe to messages

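import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttClientPersistence;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.web.bind.annotation.GetMapping;

// Place this method inside the @RestController class.
@GetMapping("/subscribe")
public void subscribe() throws MqttException {

    MqttClientPersistence persistence = new MemoryPersistence(); // Keep in-flight messages in memory only
    // The client ID must be unique on the broker; reusing the publisher's ID
    // would make the two connections disconnect each other
    MqttClient client = new MqttClient("tcp://192.168.200.128:1883", "abc-subscriber", persistence);

    // Connection options: username, password, and other settings
    MqttConnectOptions options = new MqttConnectOptions();
    // true: clean session, the broker discards subscriptions and queued messages on disconnect
    // false: persistent session, the broker keeps the client's session across reconnects
    options.setCleanSession(true);
    options.setAutomaticReconnect(true); // Reconnect automatically if the connection drops
    options.setConnectionTimeout(30); // Connection timeout in seconds
    options.setKeepAliveInterval(10); // Keep-alive (heartbeat) interval in seconds
    options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); // Protocol version

    client.setCallback(new MqttCallbackExtended() {
        @Override
        public void connectionLost(Throwable cause) {
            System.out.println("Connection lost!");
        }

        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            System.out.println("Received message topic:" + topic + "  id:" + message.getId() + " message:" + message.toString());
        }

        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            // Only relevant for messages published by this client; nothing to do here
        }

        @Override
        public void connectComplete(boolean reconnect, String serverURI) {
            System.out.println("Connection successful!");
        }
    });
    client.connect(options); // Connect
    // Subscribe to the topic (this overload uses QoS 1). With a clean session, the
    // subscription is not restored by the broker after an automatic reconnect.
    client.subscribe("test");

}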
