MQTT is a messaging protocol widely used in IoT projects; it is built on a publish–subscribe model. Below, we demonstrate how to publish messages to an MQTT server and subscribe to them using Java. First, add the Maven dependency to pom.xml:
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>
Publish a JSON message to vehicle/monitor/{userId} as shown in the demo below:
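import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
// Assumption: the JSON library is not named in this article; fastjson is one
// library whose JSONObject offers put() and toJSONString().
import com.alibaba.fastjson.JSONObject;

public class Publish { // class name is illustrative

    public static void main(String[] args) throws Exception {
        String broker = "url"; // replace with the broker URI, e.g. tcp://<host>:1883
        String clientId = MqttClient.generateClientId();
        MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());

        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setMaxInflight(1000);
        options.setConnectionTimeout(60);   // seconds
        options.setKeepAliveInterval(10);   // seconds
        options.setUserName("test01");
        options.setPassword("Gvddd".toCharArray()); // Note: the password is a char[]
        mqttClient.connect(options);

        // Build the JSON payload
        JSONObject data = new JSONObject();
        data.put("a", 1);
        data.put("b", 222);
        MqttMessage message = new MqttMessage(data.toJSONString().getBytes(StandardCharsets.UTF_8));
        message.setQos(1); // QoS 1: at-least-once delivery

        if (!mqttClient.isConnected()) {
            mqttClient.reconnect();
        }
        Integer userId = 1;
        mqttClient.publish("vehicle/monitor/" + userId, message);

        // Release the connection so the JVM can exit
        mqttClient.disconnect();
        mqttClient.close();
    }
}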
The code above publishes the following JSON payload to vehicle/monitor/1:
{"a":1,"b":222}
To receive these messages, run the following subscriber demo:
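import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Subscribe {

    private static final Logger LOG = LoggerFactory.getLogger(Subscribe.class);

    // MQTT broker configuration
    private static final String BROKER_URL = ""; // set to the broker URI, e.g. tcp://<host>:1883
    private static final String CLIENT_ID = MqttClient.generateClientId();
    private static final String TOPICS = "vehicle/monitor/#"; // '#' matches every userId
    private static final int QOS_LEVELS = 1; // QoS level for the subscription
    private static final String USERNAME = "test01";
    private static final String PASSWORD = "Gvddd"; // same credentials as the publisher

    public static void main(String[] args) {
        try {
            // Use in-memory persistence
            MemoryPersistence persistence = new MemoryPersistence();
            // Create the client instance
            IMqttClient client = new MqttClient(BROKER_URL, CLIENT_ID, persistence);

            // Set connection parameters
            MqttConnectOptions options = new MqttConnectOptions();
            options.setUserName(USERNAME);
            options.setPassword(PASSWORD.toCharArray());
            options.setCleanSession(true);
            options.setKeepAliveInterval(30);
            // Automatic reconnect; with a clean session the subscription
            // has to be issued again after a reconnect
            options.setAutomaticReconnect(true);

            // Set the callback listener
            client.setCallback(new MqttCallback() {
                @Override
                public void connectionLost(Throwable cause) {
                    LOG.error("Connection lost: ", cause);
                }

                @Override
                public void messageArrived(String topic, MqttMessage message) throws Exception {
                    LOG.info("Received message from topic '{}': {}", topic,
                            new String(message.getPayload(), StandardCharsets.UTF_8));
                }

                @Override
                public void deliveryComplete(IMqttDeliveryToken token) {
                    // Only relevant when publishing; nothing to do for a subscriber
                }
            });

            // Connect to the broker
            client.connect(options);
            LOG.info("MQTT client connected to broker.");

            // Subscribe to the topic. A third argument to subscribe() would be a
            // per-subscription message listener that receives the messages instead
            // of the callback above, so it is not used here.
            client.subscribe(TOPICS, QOS_LEVELS);
            LOG.info("Subscribed to topic: {} with QoS: {}", TOPICS, QOS_LEVELS);
            LOG.info("Now listening for messages...");
        } catch (MqttException e) {
            LOG.error("MQTT Subscriber error", e);
        }
    }
}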
The client delivers incoming messages through a callback: the messageArrived method of the registered MqttCallback is invoked for each message and logs its topic and payload.
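The subscriber uses the topic filter vehicle/monitor/#, which is why it also receives the message published to vehicle/monitor/1. As a rough illustration of how MQTT wildcards are matched, here is a minimal sketch in plain Java. The class TopicFilter and its matches method are hypothetical names written for this article, not part of the Paho API, and the sketch ignores special cases such as topics beginning with $.

```java
final class TopicFilter {

    private TopicFilter() {
    }

    // Returns true if the topic name matches the MQTT topic filter.
    // '+' matches exactly one level; '#' (last level only) matches the
    // parent level and any number of levels below it.
    static boolean matches(String filter, String topic) {
        String[] f = filter.split("/", -1);
        String[] t = topic.split("/", -1);
        for (int i = 0; i < f.length; i++) {
            if (f[i].equals("#")) {
                // Multi-level wildcard: valid only as the final filter level
                return i == f.length - 1;
            }
            if (i >= t.length) {
                return false; // the filter has more levels than the topic
            }
            if (!f[i].equals("+") && !f[i].equals(t[i])) {
                return false; // literal level differs
            }
        }
        // No '#' seen: both must have the same number of levels
        return f.length == t.length;
    }

    public static void main(String[] args) {
        System.out.println(matches("vehicle/monitor/#", "vehicle/monitor/1")); // true
        System.out.println(matches("vehicle/monitor/+", "vehicle/monitor/1")); // true
        System.out.println(matches("vehicle/monitor/#", "vehicle/other/1"));   // false
    }
}
```

To receive messages for a single user only, the subscriber could use a literal filter such as vehicle/monitor/1 instead of the wildcard.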
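Note that if the subscriber is started after the publisher has already run, it will not receive the messages published earlier. The demo publishes non-retained messages and the subscriber connects with a clean session, so the broker keeps nothing for it. The subscriber only receives messages published after it has connected and subscribed.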
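The ordering behaviour described in the note can be pictured with a toy in-memory model. This is only a sketch to illustrate the idea: ToyBroker is a hypothetical class, it matches topics by exact name only, and it does not talk to a real MQTT server. It also models the retained flag, which MQTT provides so that a broker keeps the last message on a topic for subscribers that arrive later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ToyBroker {

    // Last retained payload per topic
    private final Map<String, String> retained = new HashMap<>();
    // Inboxes of the subscribers currently registered per topic
    private final Map<String, List<List<String>>> inboxes = new HashMap<>();

    // Delivers the payload to current subscribers; stores it only if retain is set
    void publish(String topic, String payload, boolean retain) {
        List<List<String>> current = inboxes.getOrDefault(topic, Collections.emptyList());
        for (List<String> inbox : current) {
            inbox.add(payload);
        }
        if (retain) {
            retained.put(topic, payload);
        }
    }

    // Registers a subscriber and returns its inbox; a retained message,
    // if one exists for the topic, is delivered immediately
    List<String> subscribe(String topic) {
        List<String> inbox = new ArrayList<>();
        inboxes.computeIfAbsent(topic, k -> new ArrayList<>()).add(inbox);
        String last = retained.get(topic);
        if (last != null) {
            inbox.add(last);
        }
        return inbox;
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();

        // Published before anyone subscribed, not retained: lost to late subscribers
        broker.publish("vehicle/monitor/1", "{\"a\":1,\"b\":222}", false);
        List<String> late = broker.subscribe("vehicle/monitor/1");
        System.out.println(late.size()); // 0

        // Published with the retained flag: a later subscriber still gets it
        broker.publish("vehicle/monitor/2", "{\"a\":1,\"b\":222}", true);
        List<String> lateRetained = broker.subscribe("vehicle/monitor/2");
        System.out.println(lateRetained.size()); // 1
    }
}
```

With the Paho client used in the publisher demo, the retained flag is set by calling message.setRetained(true) before publishing; whether that is appropriate depends on whether late subscribers should see the last known value.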