IoT Data Platform: Building a Real-Time Monitoring System with MQTT and InfluxDB
It was a sultry summer last year when our team took on an equipment monitoring project for a large factory. The legacy system stored sensor readings in MySQL every 5 minutes, and the database crashed whenever operations tried to view trends for the past month. “Isn’t there a way to get real-time performance and still handle time-series data elegantly?” the project manager asked, looking at me expectantly.
This scenario is actually quite common in the IoT field. Traditional databases are not designed for the high-frequency time-series data typical of IoT. After several rounds of technology selection, we ultimately chose the combination of MQTT and InfluxDB, which yielded stunning results.
Why Traditional Solutions Fail
Let me start with a real example of what went wrong:
# This is an early version of our code, a disaster in itself
def store_sensor_data(sensor_id, value):
    cursor.execute(
        "INSERT INTO sensors_data (sensor_id, value, timestamp) VALUES (%s, %s, NOW())",
        (sensor_id, value)
    )
    # Each data point is committed, a performance disaster
    db.commit()

# Every query is a full table scan
cursor.execute("SELECT * FROM sensors_data WHERE sensor_id = %s ORDER BY timestamp DESC", (sensor_id,))
Do you see the problem? Each write commits a transaction, and the queries have no time range restrictions. When we upgraded from collecting data every 5 minutes to every second, 10 devices generated 864,000 records in a day! Performance plummeted.
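The two problems named above (one commit per row, no time bound on reads) can be illustrated with a minimal sketch. It uses the standard library's sqlite3 as a stand-in for MySQL so that it runs anywhere; the helper names, the index, and the sensor ID "pump-1" are assumptions made for illustration, not the project's actual code.

```python
import sqlite3

def store_batch(db, readings):
    """Insert many (sensor_id, value, timestamp) rows in one transaction."""
    with db:  # commits once on success, rolls back on error
        db.executemany(
            "INSERT INTO sensors_data (sensor_id, value, timestamp) VALUES (?, ?, ?)",
            readings,
        )

def recent_readings(db, sensor_id, since_ts):
    """Read only the rows inside the requested time window."""
    cur = db.execute(
        "SELECT timestamp, value FROM sensors_data "
        "WHERE sensor_id = ? AND timestamp >= ? ORDER BY timestamp DESC",
        (sensor_id, since_ts),
    )
    return cur.fetchall()

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE sensors_data (sensor_id TEXT, value REAL, timestamp INTEGER)")
# A composite index lets the bounded query avoid scanning the whole table
db.execute("CREATE INDEX idx_sensor_time ON sensors_data (sensor_id, timestamp)")

now = 1_700_000_000  # fixed Unix time in seconds, for a repeatable example
# 60 one-second readings for one hypothetical sensor, written in a single commit
store_batch(db, [("pump-1", 20.0 + i, now - i) for i in range(60)])
last_ten_seconds = recent_readings(db, "pump-1", now - 10)
print(len(last_ten_seconds))
```

Batching and bounded reads would have eased the pain, but they do not change the fact that a general-purpose relational table is not laid out for this workload.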
MQTT: The Nervous System of IoT
During the refactoring, I first introduced MQTT as the message bus. MQTT is a lightweight publish/subscribe protocol designed specifically for IoT, efficiently transmitting signals like a nervous system.
# Using the Paho MQTT client
import json
import time

import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connection successful!")
    else:
        # No sleep here: callbacks run on the network thread,
        # and blocking it stalls keepalives and reconnects
        print(f"Connection failed, return code: {rc}")

# paho-mqtt 1.x style; with 2.x, pass mqtt.CallbackAPIVersion.VERSION1
# as the first argument to keep this callback signature
client = mqtt.Client()
client.on_connect = on_connect
client.connect("broker.example.com", 1883, 60)
client.loop_start()  # Run the network loop in a background thread

# Publish sensor data
def publish_sensor_data(sensor_id, value):
    payload = json.dumps({"value": value, "timestamp": time.time()})
    client.publish(f"sensors/{sensor_id}", payload)
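On the consuming side, a subscriber normally receives every sensor's messages through a single wildcard subscription such as "sensors/+" instead of one subscription per device. The following is a simplified sketch of how MQTT topic filters match topics. It is a hand-written illustration, not the broker's or Paho's implementation, and it ignores the special handling of topics that start with "$".

```python
def topic_matches(topic_filter, topic):
    """Return True if an MQTT topic filter matches a concrete topic.

    "+" matches exactly one level, "#" matches all remaining levels.
    """
    filter_parts = topic_filter.split("/")
    topic_parts = topic.split("/")
    for i, part in enumerate(filter_parts):
        if part == "#":
            # The multi-level wildcard is only valid as the last level
            return i == len(filter_parts) - 1
        if i >= len(topic_parts):
            return False
        if part != "+" and part != topic_parts[i]:
            return False
    return len(filter_parts) == len(topic_parts)

for candidate in ("sensors/pump-1", "sensors/pump-1/temp", "alerts/pump-1"):
    print(candidate, topic_matches("sensors/+", candidate))
```

With the topic layout used above, a single subscription to "sensors/+" is enough to receive readings from every device.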
However, MQTT only solved the data transmission issue; we also needed a storage solution capable of handling time-series data.
InfluxDB: The Perfect Home for Time-Series Data
At this point, InfluxDB made its grand entrance: a database purpose-built for time-series data like ours. In our tests it handled time-series workloads roughly 10-15 times faster than a traditional relational database (measured on an Intel i7-9700K with 32GB RAM).
import json

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

# Connect to InfluxDB
# Note: this rebinds the name `client`; keep the MQTT client under
# another name if both snippets live in the same module
client = InfluxDBClient(
    url="http://localhost:8086",
    token="my-token",
    org="my-org"
)
write_api = client.write_api(write_options=SYNCHRONOUS)

# Store data in MQTT callback
# (assign it to the MQTT client's on_message and subscribe to "sensors/+")
def on_message(client, userdata, msg):
    try:
        # Parse sensor ID from a topic such as "sensors/<sensor_id>"
        sensor_id = msg.topic.split("/")[1]
        data = json.loads(msg.payload)
        # Create data point; the timestamp arrives in seconds
        # and is scaled to nanoseconds
        point = (
            Point("sensor_readings")
            .tag("sensor_id", sensor_id)
            .field("value", float(data["value"]))
            .time(int(data["timestamp"] * 1_000_000_000))
        )
        write_api.write(bucket="sensors", record=point)
    except json.JSONDecodeError:
        # Never trust the format of sensor data
        print(f"Invalid JSON: {msg.payload}")
    except Exception as e:
        # There are always some unexpected exceptions...
        print(f"Processing error: {e}")
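Key Point: InfluxDB stores timestamps with nanosecond precision, and the Python client treats an integer passed to .time() as nanoseconds unless you specify a different write precision. A Unix timestamp in seconds, such as the one produced by time.time(), therefore has to be scaled (or written with an explicit precision); otherwise every point lands in January 1970. Details like these, which are often overlooked, can be the root cause of failures in production environments.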
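The parsing and timestamp scaling can be pulled out of the callback into a small pure function, which makes them easy to test without a broker or a database. This is a sketch under the assumptions of this article (topics shaped like "sensors/<sensor_id>", payloads with "value" and "timestamp" in seconds); parse_reading is a hypothetical helper name. It uses Decimal so that fractional seconds are not distorted by float rounding when scaled to nanoseconds.

```python
import json
from decimal import Decimal

NS_PER_SECOND = 1_000_000_000

def parse_reading(topic, payload):
    """Return (sensor_id, value, timestamp_ns) or raise ValueError."""
    parts = topic.split("/")
    if len(parts) != 2 or parts[0] != "sensors" or not parts[1]:
        raise ValueError(f"unexpected topic: {topic!r}")
    try:
        # parse_float=Decimal keeps the JSON number exactly as it was sent
        data = json.loads(payload, parse_float=Decimal)
        value = float(data["value"])
        timestamp_ns = int(Decimal(data["timestamp"]) * NS_PER_SECOND)
    except (json.JSONDecodeError, KeyError, TypeError, ArithmeticError) as exc:
        raise ValueError(f"bad payload: {payload!r}") from exc
    return parts[1], value, timestamp_ns

reading = parse_reading(
    "sensors/pump-1", b'{"value": 21.5, "timestamp": 1700000000.25}'
)
print(reading)
```

The callback then only needs to build a point from the returned tuple and handle a single exception type for malformed input.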
Query Optimization: A World of Performance Difference
The performance of queries before and after refactoring is like night and day. Here is a comparison of aggregation queries for 1 million data points in an actual project:
- MySQL: 15.7 seconds
- InfluxDB: 0.34 seconds
Key code:
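from influxdb_client import InfluxDBClient

# Reuses the InfluxDBClient instance named `client` created earlier
query_api = client.query_api()

# Efficient downsampling query
def get_sensor_data(sensor_id, start_time, end_time, interval="1h"):
    # start_time/end_time: RFC 3339 timestamps or relative durations such as -30d
    query = f'''
    from(bucket: "sensors")
      |> range(start: {start_time}, stop: {end_time})
      |> filter(fn: (r) => r["_measurement"] == "sensor_readings")
      |> filter(fn: (r) => r["sensor_id"] == "{sensor_id}")
      |> aggregateWindow(every: {interval}, fn: mean)
    '''
    tables = query_api.query(query)
    # Flatten the result tables into (time, mean value) pairs
    return [
        (record.get_time(), record.get_value())
        for table in tables
        for record in table.records
    ]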
Flux, the primary query language of InfluxDB 2.x, is more flexible and powerful than InfluxQL. For developers with an SQL background, it requires a shift in thinking. I spent two full days adapting to this “pipeline-style” query mindset, where each step transforms the output of the previous one, but once mastered, you’ll find this functional syntax exceptionally intuitive for handling time-series data.
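Because the query text is assembled with string interpolation, it is worth validating every value before it reaches the Flux pipeline. Below is a hedged sketch of a query builder that does this; build_flux_query, the allowed sensor ID pattern, and the requirement for timezone-aware datetimes are assumptions chosen for illustration, not part of the InfluxDB client.

```python
import re
from datetime import datetime, timezone

BUCKET = "sensors"  # bucket name used in this article
_SENSOR_ID = re.compile(r"[A-Za-z0-9_-]+")  # assumed ID format
_DURATION = re.compile(r"[1-9][0-9]*(ns|us|ms|s|m|h|d|w|mo|y)")  # Flux duration units

def _rfc3339(moment):
    """Format a timezone-aware datetime as an RFC 3339 UTC timestamp."""
    if moment.tzinfo is None:
        raise ValueError("start and stop must be timezone-aware")
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

def build_flux_query(sensor_id, start, stop, interval="1h"):
    """Build the downsampling query, rejecting values that could alter it."""
    if not _SENSOR_ID.fullmatch(sensor_id):
        raise ValueError(f"invalid sensor id: {sensor_id!r}")
    if not _DURATION.fullmatch(interval):
        raise ValueError(f"invalid interval: {interval!r}")
    return "\n".join([
        f'from(bucket: "{BUCKET}")',
        f'  |> range(start: {_rfc3339(start)}, stop: {_rfc3339(stop)})',
        f'  |> filter(fn: (r) => r["sensor_id"] == "{sensor_id}")',
        f'  |> aggregateWindow(every: {interval}, fn: mean)',
    ])

query = build_flux_query(
    "pump-1",
    datetime(2024, 1, 1, tzinfo=timezone.utc),
    datetime(2024, 1, 31, tzinfo=timezone.utc),
)
print(query)
```

The returned string can be passed to the query API as before; each line of the pipeline narrows or reshapes the data produced by the line above it.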
Real Benefits: More Than Just a Technical Upgrade
The changes brought by this architecture go far beyond the technical level:
- Storage costs reduced by approximately 70% (thanks to InfluxDB’s compression algorithms)
- Query response times changed from “time for a coffee” to “in the blink of an eye”
- The operations team can finally set up millisecond-level real-time alerts
- Data retention policies can be flexibly configured, separating hot and cold data
I remember a colleague from operations saw the new system and his first reaction was: “This thing is faster than my girlfriend’s temper!”
Avoiding Pitfalls
If you plan to use this solution, please pay attention to these pitfalls we encountered:
- MQTT QoS Selection: Do not blindly use QoS 2; in most IoT scenarios, QoS 1 is sufficient and more efficient
- InfluxDB Sharding Design: Match the shard group duration to your data retention needs so you do not end up with a large number of small shards
- Memory Control: InfluxDB is a memory hog; it is recommended to have at least 8GB of memory in production environments
- Batch Writing: Always use the batch write API; single writes will make you regret it
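The batch writing advice can be sketched as a small buffer that collects points and hands them over in groups. BatchBuffer and flush_fn are hypothetical names for illustration; in a real setup flush_fn could wrap a write call that accepts a list of points, and the official InfluxDB client also ships its own batching write mode, which may be the better choice in practice. The sketch assumes a single thread calls add() and flush().

```python
class BatchBuffer:
    """Collect points and pass them to flush_fn in batches."""

    def __init__(self, flush_fn, batch_size=500):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._flush_fn = flush_fn
        self._batch_size = batch_size
        self._pending = []

    def add(self, point):
        """Queue one point and flush automatically when the batch is full."""
        self._pending.append(point)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self):
        """Send whatever is pending, for example on shutdown or on a timer."""
        if self._pending:
            batch, self._pending = self._pending, []
            self._flush_fn(batch)

written = []  # stands in for the database in this example
buffer = BatchBuffer(written.append, batch_size=3)
for i in range(7):
    buffer.add(("pump-1", 20.0 + i))
buffer.flush()  # push the final partial batch
print([len(batch) for batch in written])
```

A production version would also flush on a timer so that a quiet sensor does not leave points sitting in memory indefinitely.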
As Paul Dix, the founder of InfluxData, said: “Time-series data needs special treatment.” (From his 2018 InfluxDays talk)
In the wave of IoT, data is the real oil. And the combination of MQTT and InfluxDB is the most efficient drilling platform.