ESP32 MicroPython MQTT Development and Testing

1. Connecting ESP32 to the Network

Before performing MQTT communication, the ESP32 needs to connect to a WiFi network. Below is the code to connect to WiFi:

import network
import time

def connect_wifi(ssid, password):
    # Create WiFi client interface
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)

    # Check if already connected
    if not wlan.isconnected():
        print('Connecting to network...')
        wlan.connect(ssid, password)

    # Wait for connection or failure
    max_wait = 10
    while max_wait > 0:
        if wlan.isconnected():
            break
        max_wait -= 1
        print('Waiting for connection...')
        time.sleep(1)

    # Handle connection result
    if wlan.isconnected():
        print('Network configuration:', wlan.ifconfig())
        return True
    else:
        print('WiFi connection failed')
        return False

# Usage example
if __name__ == "__main__":
    # Replace with your WiFi information
    WIFI_SSID = "Your WiFi Name"
    WIFI_PASSWORD = "Your WiFi Password"

    if connect_wifi(WIFI_SSID, WIFI_PASSWORD):
        print("WiFi connected successfully, ready for MQTT communication")
    else:
        print("WiFi connection failed, please check network settings")

2. Installing the MQTT Client Library

The standard library of MicroPython does not include a built-in MQTT client, so we need to use the umqtt.simple library. This library is included in the MicroPython firmware or can be installed via upip:

import upip
upip.install('micropython-umqtt.simple')

3. Implementing the MQTT Client

The MQTT client code includes connection, publishing, and subscription functionalities.

import network
import time
from umqtt.simple import MQTTClient

# WiFi connection function
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to network...')
        wlan.connect(ssid, password)
        max_wait = 10
        while max_wait > 0:
            if wlan.isconnected():
                break
            max_wait -= 1
            print('Waiting for connection...')
            time.sleep(1)
    if wlan.isconnected():
        print('Network configuration:', wlan.ifconfig())
        return True
    else:
        print('WiFi connection failed')
        return False

# MQTT configuration
MQTT_BROKER = "broker.emqx.io"  # MQTT broker address
MQTT_PORT = 1883  # MQTT broker port
MQTT_CLIENT_ID = "esp32_client_123"  # Client ID
MQTT_TOPIC_SUB = "esp32/command"  # Subscription topic
MQTT_TOPIC_PUB = "esp32/data"  # Publication topic

# MQTT message callback function
def sub_cb(topic, msg):
    print('Message received: Topic = {}, Message = {}'.format(topic, msg))
    # Handle received messages here
    if topic == MQTT_TOPIC_SUB.encode():
        if msg == b'ON':
            print('Executing light ON operation')
            # Add code to control LED or other devices here
        elif msg == b'OFF':
            print('Executing light OFF operation')
            # Add code to control LED or other devices here

# Connect to MQTT broker
def connect_mqtt():
    client = MQTTClient(MQTT_CLIENT_ID, MQTT_BROKER, MQTT_PORT)
    client.set_callback(sub_cb)  # Set callback function
    try:
        client.connect()
        print('Connected to MQTT broker')
        client.subscribe(MQTT_TOPIC_SUB)  # Subscribe to topic
        print('Subscribed to topic: {}'.format(MQTT_TOPIC_SUB))
        return client
    except Exception as e:
        print('MQTT connection failed: {}'.format(e))
        return None

# Main function
def main():
    # Connect to WiFi
    WIFI_SSID = "Your WiFi Name"
    WIFI_PASSWORD = "Your WiFi Password"

    if not connect_wifi(WIFI_SSID, WIFI_PASSWORD):
        return

    # Connect to MQTT
    mqtt_client = connect_mqtt()
    if not mqtt_client:
        return

    # Publish and subscribe loop
    try:
        name = "Alice"
        age = 30

        # Using f-string
        formatted_string = f"{name} is {age} years old."
        print(formatted_string)  # Output: Alice is 30 years old.

        # Using str.format()
        formatted_string = "{} is {} years old.".format(name, age)
        print(formatted_string)  # Output: Alice is 30 years old.

        # Using % operator
        formatted_string = "{%s is %d years old.}" % (name, age)
        print(formatted_string)  # Output: Alice is 30 years old.

        while True:
            # Check for new messages
            mqtt_client.check_msg()
            temperature_value = 25.5
            humidity_value = 60.0
            # Publish data
            sensor_data = "{\"temperature\": %f, \"humidity\": %f}" % (temperature_value, humidity_value)
            mqtt_client.publish(MQTT_TOPIC_PUB, str(sensor_data))
            print('Data published: {}'.format(sensor_data))

            # Wait for a while
            time.sleep(5)
    finally:
        # Disconnect
        mqtt_client.disconnect()
        print('MQTT connection has been disconnected')

# Run main function
if __name__ == "__main__":
    main()

Test Error:

ESP32 MicroPython MQTT Development and Testing

<span>MQTT client library needs to be installed</span>

ESP32 MicroPython MQTT Development and Testing

Test Result:

ESP32 MicroPython MQTT Development and Testing

Leave a Comment