Introduction to MQTT
MQTT (Message Queuing Telemetry Transport) is a lightweight messaging protocol based on the publish/subscribe model, designed for low-bandwidth, high-latency, or unreliable network environments. It is widely used in the Internet of Things (IoT), mobile applications, and distributed systems to enable efficient communication between devices. By reducing data transmission volume and simplifying communication processes, it ensures reliable message delivery while supporting various Quality of Service (QoS) levels to meet different business needs.
Observability Cloud
Observability Cloud is an end-to-end observability product designed for IT engineers. It integrates infrastructure monitoring, application performance monitoring, and log management to provide real-time observability across the entire tech stack. It helps engineers track user experience end to end, monitor every function call within applications, and fully oversee infrastructure in the cloud era. Observability Cloud can also quickly identify system security risks.
Func Platform
The Observability Cloud Func Platform (DataFlux Func) is a Python-based function computing and data processing development platform that helps users quickly build, manage, and execute data processing tasks. It consists mainly of a Server (providing the Web UI and API) and a Worker (providing the Python script execution environment). The platform supports multi-source data integration, with a rich set of built-in data source connectors for rapid data aggregation and processing. It also provides task scheduling and API publishing, and supports synchronous, asynchronous, and scheduled calls, which simplifies development. Through deep integration with Observability Cloud, users can apply its observability capabilities to real-time data processing, analysis, and visualization.
In the setup described here, sensor data is published to a broker over the MQTT protocol. The Func platform subscribes to the broker, consumes the data, and reports it to the Observability Cloud platform for storage, display, analysis, and alerting.
Practical Application
- Scenario: Collect temperature data from a Linux host and report it to Observability Cloud.
- Preparation: An EMQX broker, which receives data over the MQTT protocol and makes it available to consuming clients.
Temperature Collection
A temperature sensor converts temperature changes into electrical signals through physical or chemical principles. Common temperature sensors include thermistors (NTC/PTC), thermocouples, resistance temperature detectors (RTDs), and digital temperature sensors (such as the DS18B20). By sensing changes in ambient temperature, these sensors output voltage, current, or digital signals that are proportional to temperature or follow a specific functional relationship with it. After amplification, filtering, and analog-to-digital conversion, these signals can be read by microcontrollers or data acquisition systems, enabling real-time temperature monitoring and recording.
Running the sensors command in the shell retrieves the current temperature of the CPU, hard drive, and other components.
Obtaining temperature information via a script:
import psutil

def get_system_temperatures():
    temps = psutil.sensors_temperatures()
    if not temps:
        return "No temperature sensors found."
    result = ""
    for chip, sensors in temps.items():
        result += f"{chip}:\n"
        for sensor in sensors:
            result += f"  {sensor.label or 'Sensor'}: {sensor.current}°C (high={sensor.high}, critical={sensor.critical})\n"
    return result

# Call the function and print the result
system_temps = get_system_temperatures()
print(system_temps)
Notes:
- psutil.sensors_temperatures() returns information about all temperature sensors in the system.
- Each sensor has attributes such as current (current temperature), high (high-temperature warning value), and critical (critical temperature).
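As a minimal sketch of how these attributes might be used, the helper below classifies one reading against its high and critical thresholds. The Reading tuple and the classify function are hypothetical names introduced for illustration; Reading only mirrors the fields psutil reports, and the sample values are made up. Because high and critical are often None on real systems, the sketch treats a missing threshold as not exceeded.

```python
from collections import namedtuple

# Hypothetical stand-in with the same fields psutil reports per sensor.
Reading = namedtuple("Reading", ["label", "current", "high", "critical"])


def classify(reading):
    """Return 'critical', 'high', or 'ok' for one sensor reading."""
    # A threshold of None means the sensor does not expose that limit.
    if reading.critical is not None and reading.current >= reading.critical:
        return "critical"
    if reading.high is not None and reading.current >= reading.high:
        return "high"
    return "ok"


if __name__ == "__main__":
    # Made-up sample readings, not measured values.
    samples = [
        Reading("Core 0", 45.0, 80.0, 100.0),
        Reading("Core 1", 85.0, 80.0, 100.0),
        Reading("Composite", 60.0, None, None),
    ]
    for sample in samples:
        print(f"{sample.label}: {classify(sample)}")
```

A check like this could run on the host before publishing, although in this article alerting is left to Observability Cloud.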
Reporting to EMQX
import psutil
import paho.mqtt.client as mqtt
import time
import json

def get_system_temperatures():
    temps = psutil.sensors_temperatures()
    if not temps:
        return "No temperature sensors found."
    result = {}
    for chip, sensors in temps.items():
        result[chip] = []
        for sensor in sensors:
            # Skip sensors that have no label
            if sensor.label is not None and sensor.label != "":
                result[chip].append({
                    'host': "liurui",  # Replace with your host name
                    'label': sensor.label,
                    'current': sensor.current,
                    'high': sensor.high,
                    'critical': sensor.critical
                })
    return result

def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

def publish_temperatures(temps, broker, port, topic):
    client = mqtt.Client()
    client.on_connect = on_connect
    client.connect(broker, port, 60)
    client.loop_start()
    # Package all sensor data into a JSON object
    payload = json.dumps(temps)
    info = client.publish(topic, payload)
    # Wait until the message has been sent before stopping the network loop
    info.wait_for_publish()
    print(f"Published: {payload}")
    client.disconnect()
    client.loop_stop()

if __name__ == "__main__":
    broker = "1.1.1.1"  # Replace with your EMQX broker address
    port = 1883  # Default MQTT port
    topic = "temperature"  # Replace with your desired topic

    while True:
        system_temps = get_system_temperatures()
        if isinstance(system_temps, str):
            print(system_temps)
        else:
            publish_temperatures(system_temps, broker, port, topic)
        time.sleep(5)  # Collect and report every 5 seconds
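To make the message format concrete, here is a small standard-library sketch of the JSON payload that the reporting script publishes. The build_payload helper is a hypothetical name, and the chip name, labels, host, and readings are made-up sample values; only the field names come from the script.

```python
import json


def build_payload(host, readings):
    """Group (chip, label, current, high, critical) tuples by chip and serialize them."""
    result = {}
    for chip, label, current, high, critical in readings:
        sensors = result.setdefault(chip, [])
        if not label:
            # Same rule as the reporting script: unlabeled sensors are skipped.
            continue
        sensors.append({
            "host": host,
            "label": label,
            "current": current,
            "high": high,
            "critical": critical,
        })
    return json.dumps(result)


if __name__ == "__main__":
    # Made-up sample readings, not measured values.
    sample = [
        ("coretemp", "Core 0", 45.0, 80.0, 100.0),
        ("coretemp", "", 44.0, None, None),
    ]
    print(build_payload("my-host", sample))
```

Each top-level key is a chip name and each value is a list of sensor objects, which is the shape the consuming script later iterates over.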
Applying for Observability Cloud API Key
Log in to the Observability Cloud console, click on the menu “Management” – “API Key Management”, and create a new API Key.
Save the Key ID and Key, as they will be needed for the Func platform later.
Func Consuming MQTT Data
1. Create a Script Set
2. Create a Script
The script content is as follows:
import json

guance = DFF.CONN('GuanceAPI')

@DFF.API('Message Handler')
def message_handler(topic, message):
    print(f"Received message: {message} on topic {topic}")
    if topic == "temperature":
        parse_and_print_temperatures(message)

def parse_and_print_temperatures(temps_json):
    # Parse JSON string into Python dictionary
    temps_dict = json.loads(temps_json)
    result = []
    # Iterate over each chip
    for chip, sensors in temps_dict.items():
        print(f"Chip: {chip}")
        # Iterate over each sensor
        for sensor in sensors:
            result.append({
                'measurement': 'temperature',
                'tags': {
                    'host': sensor['host'],
                    'chip': chip,
                    'label': sensor['label']
                },
                'fields': {
                    'current': sensor['current'],
                    'high': sensor['high'],
                    'critical': sensor['critical']
                }
            })
    uploadDataKit(result)

def uploadDataKit(data):
    # Report the metric points through the GuanceAPI connector
    status_code, result = guance.dataway.write_by_category_many(category='metric', data=data)
    print(f"Report result: {status_code}")
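The conversion step can also be tried outside the Func platform. The sketch below repeats the mapping from the MQTT payload to metric points using only the standard library. The to_points function is a hypothetical name, the sample payload is made up, and dropping fields whose value is null is an assumption added here (psutil often reports None for high and critical); the script above does not do this.

```python
import json


def to_points(temps_json, measurement="temperature"):
    """Convert the published JSON payload into a list of metric points."""
    points = []
    for chip, sensors in json.loads(temps_json).items():
        for sensor in sensors:
            # Assumption: drop null readings instead of sending them as fields.
            fields = {
                name: sensor[name]
                for name in ("current", "high", "critical")
                if sensor.get(name) is not None
            }
            if not fields:
                continue
            points.append({
                "measurement": measurement,
                "tags": {"host": sensor["host"], "chip": chip, "label": sensor["label"]},
                "fields": fields,
            })
    return points


if __name__ == "__main__":
    # Made-up sample payload in the shape the reporting script publishes.
    sample = json.dumps({
        "coretemp": [
            {"host": "my-host", "label": "Core 0", "current": 45.0, "high": 80.0, "critical": 100.0},
        ],
        "nvme": [
            {"host": "my-host", "label": "Composite", "current": 38.9, "high": None, "critical": None},
        ],
    })
    for point in to_points(sample):
        print(point)
```

Keeping the conversion in a plain function like this makes it possible to test the payload handling locally before wiring it to the connector.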
3. Publish the Script
Click the publish button to publish.
4. Create an Observability Cloud Connector
- Type: Observability Cloud
- ID: GuanceAPI
Note: The ID must match the one used in the script (guance = DFF.CONN('GuanceAPI')). Fill in the other fields according to your environment.
5. Create an MQTT Connector
- Type: MQTT Broker (v5.0)
- Fill in the ID, host, and port
- Select the topic and the script that should consume it
- Click the connectivity test to confirm that the MQTT broker can be reached
- Click save to finish
Effect Demonstration
After reporting data to the Observability Cloud, you can view the temperature trend chart using the following DQL statement on the dashboard.
M::`temperature`:(last(`current`)) BY `chip`, `label`
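Read informally, the query takes the most recent value of the current field for each combination of chip and label. The following sketch imitates that grouping in plain Python for a single time window. It illustrates the idea only and is not how DQL is implemented; last_by_group is a hypothetical name and the sample rows are made up.

```python
def last_by_group(rows):
    """Return the latest 'current' value per (chip, label).

    rows is an iterable of (timestamp, chip, label, current) tuples.
    """
    latest = {}
    for timestamp, chip, label, current in rows:
        key = (chip, label)
        # Keep the row with the newest timestamp for each group.
        if key not in latest or timestamp >= latest[key][0]:
            latest[key] = (timestamp, current)
    return {key: current for key, (_, current) in latest.items()}


if __name__ == "__main__":
    # Made-up sample rows: (timestamp, chip, label, current)
    rows = [
        (1, "coretemp", "Core 0", 45.0),
        (2, "coretemp", "Core 0", 47.0),
        (1, "coretemp", "Core 1", 44.0),
    ]
    print(last_by_group(rows))
```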
Conclusion
The Observability Cloud Func platform can receive metrics, logs, and trace data over the MQTT protocol, wrap them using the API provided by Observability Cloud, and report them to the Observability Cloud platform for unified management, visual analysis, alerting, and more.