
What to do when you encounter difficulties during development?
Supports text and voice direct conversations
π Highly recommended π
24h online assistant
Recent Popular Articles:
π₯Integrating UReport2 in RuoYi Frontend and Backend Separation (154)
π₯Integrating MyBatis-Plus-Ext Tutorial in RuoYi Separation Version (153)
π₯Integrating Electron to Achieve Desktop Applications in RuoYi Separation Version (139)
π₯Upgrading Spring Boot to Latest Version 3.x in RuoYi (131)
π₯Step-by-step Guide to Start RuoYi Microservice Project (110)
π₯Starting RuoYi-App on Server (108)
π₯Using RuoYi to Write WeChat Mini Program Login Authorization Interface (106)
Body begins β‘οΈ
MQTT
is a lightweight messaging protocol (Message Queuing Telemetry Transport
) designed for low bandwidth and high latency communication between devices. It is a message protocol based on the Publish/Subscribe
model, originally developed by IBM, and has now become an open standard widely used in the Internet of Things (IoT
) field.
Features of MQTT
include:
-
Lightweight: The
MQTT
protocol is designed simply, with lightweight message headers, suitable for constrained environments such as sensors and embedded devices. -
Easy to Use:
MQTT
uses a publish/subscribe model, decoupling the sender (publisher) and receiver (subscriber) of messages, making the communication process simple and easy to understand. -
Low Bandwidth, High Latency: The
MQTT
protocol is designed considering limited network bandwidth and high latency, maintaining stable message transmission in less than ideal network environments. -
Reliability:
MQTT
supports message persistence and acknowledgment mechanisms, ensuring reliable message transmission, and providesQoS
(Quality of Service
) levels that can be flexibly configured based on actual needs. -
Flexibility:
MQTT
supports various message formats and payload types, allowing the transmission of text, binary data, and more, while supportingSSL/TLS
encryption to ensure communication security. -
Applicable to Various Scenarios: Due to its lightweight and flexibility,
MQTT
is widely used in IoT, sensor networks, remote monitoring, message notifications, and other scenarios, becoming one of the important communication protocols connecting devices.
The steps to integrate MQTT
in RuoYi Frontend and Backend Separation are as follows:
1. Download the MQTT X
testing tool and install it.
Official website:
https://www.emqx.com/zh/try?product=emqx-ecp
2. Open MQTT X
, create a new connection, enter the name, and click connect.
3. Add dependencies in the pom.xml
under the ruoyi-common
module.
<!--mqttδΎθ΅-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
4. Add relevant configurations in application.yml
under Spring
configuration.
# mqtt
mqtt:
username: admin # Username
password: admin # Password
hostUrl: tcp://broker.emqx.io:1883 # tcp://ip:port
clientId: clientId # Client ID
defaultTopic: topic,topic1 # Subscribed topics
timeout: 100 # Timeout (seconds)
keepalive: 60 # Heartbeat (seconds)
enabled: true # Whether to use MQTT functionality
5. Create a mqtt
folder under ruoyi-common/src/main/java/com/ruoyi/common/utils
directory and add the following three files:
MqttConfig.java
package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.utils.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("spring.mqtt")
public class MqttConfig {
@Autowired
private MqttPushClient mqttPushClient;
/**
* Username
*/
private String username;
/**
* Password
*/
private String password;
/**
* Connection address
*/
private String hostUrl;
/**
* Client ID
*/
private String clientId;
/**
* Default connection topic
*/
private String defaultTopic;
/**
* Timeout
*/
private int timeout;
/**
* Keep alive
*/
private int keepalive;
/**
* MQTT functionality enable
*/
private boolean enabled;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
public String getHostUrl() {
return hostUrl;
}
public void setHostUrl(String hostUrl) {
this.hostUrl = hostUrl;
}
public String getClientId() {
return clientId;
}
public void setClientId(String clientId) {
this.clientId = clientId;
}
public String getDefaultTopic() {
return defaultTopic;
}
public void setDefaultTopic(String defaultTopic) {
this.defaultTopic = defaultTopic;
}
public int getTimeout() {
return timeout;
}
public void setTimeout(int timeout) {
this.timeout = timeout;
}
public int getKeepalive() {
return keepalive;
}
public void setKeepalive(int keepalive) {
this.keepalive = keepalive;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
@Bean
public MqttPushClient getMqttPushClient() {
if(enabled == true){
String mqtt_topic[] = StringUtils.split(defaultTopic, ",");
mqttPushClient.connect(hostUrl, clientId, username, password, timeout, keepalive);//connect
for(int i=0; i<mqtt_topic.length; i++){
mqttPushClient.subscribe(mqtt_topic[i], 0);//subscribe topic
}
}
return mqttPushClient;
}
}
MqttPushClient.java
package com.ruoyi.common.utils.mqtt;
import com.ruoyi.common.core.domain.AjaxResult;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import static com.ruoyi.common.core.domain.AjaxResult.error;
import static com.ruoyi.common.core.domain.AjaxResult.success;
@Component
public class MqttPushClient {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
private static MqttClient getClient() {
return client;
}
private static void setClient(MqttClient client) {
MqttPushClient.client = client;
}
/**
* Client connection
*
* @param host ip+port
* @param clientID Client ID
* @param username Username
* @param password Password
* @param timeout Timeout
* @param keepalive Keep alive
*/
public void connect(String host, String clientID, String username, String password, int timeout, int keepalive) {
MqttClient client;
try {
client = new MqttClient(host, clientID, new MemoryPersistence());
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
MqttPushClient.setClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Publish
*
* @param qos Connection method
* @param retained Whether to retain
* @param topic Topic
* @param pushMessage Message body
*/
public AjaxResult publish(int qos, boolean retained, String topic, String pushMessage) {
MqttMessage message = new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mTopic = MqttPushClient.getClient().getTopic(topic);
if (null == mTopic) {
logger.error("topic not exist");
}
MqttDeliveryToken token;
try {
token = mTopic.publish(message);
token.waitForCompletion();
return success();
} catch (MqttPersistenceException e) {
e.printStackTrace();
return error();
} catch (MqttException e) {
e.printStackTrace();
return error();
}
}
/**
* Subscribe to a topic
*
* @param topic Topic
* @param qos Connection method
*/
public void subscribe(String topic, int qos) {
logger.info("Starting to subscribe to topic " + topic);
try {
MqttPushClient.getClient().subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
}
PushCallback.java
package com.ruoyi.common.utils.mqtt;
import com.alibaba.fastjson2.JSONObject;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class PushCallback implements MqttCallback {
private static final Logger logger = LoggerFactory.getLogger(MqttPushClient.class);
@Autowired
private MqttConfig mqttConfig;
private static MqttClient client;
private static String _topic;
private static String _qos;
private static String _msg;
@Override
public void connectionLost(Throwable throwable) {
// Reconnect after connection loss
logger.info("Connection lost, can reconnect");
if (client == null || !client.isConnected()) {
mqttConfig.getMqttPushClient();
}
}
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
// Messages received after subscribing will execute here
logger.info("Received message topic : " + topic);
logger.info("Received message Qos : " + mqttMessage.getQos());
logger.info("Received message content : " + new String(mqttMessage.getPayload()));
_topic = topic;
_qos = mqttMessage.getQos() + "";
_msg = new String(mqttMessage.getPayload());
}
@Override
public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
logger.info("deliveryComplete---------" + iMqttDeliveryToken.isComplete());
}
// Other controllers will call this method to get received hardware data
public String receive() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("topic", _topic);
jsonObject.put("qos", _qos);
jsonObject.put("msg", _msg);
return jsonObject.toString();
}
}
Note: The areas marked in red in the project can be resolved by pressing Alt+Enter
to import dependencies.
6. Run the RuoYi project, the backend can receive data sent by MQTTX
.


