Implementing Edge Device Data Collection with Python, MQTT, and MySQL

Steps

MQTTX is the tool I use to view data on the EMQ server; it is quite handy.


First, we need these libraries (the code below uses mqtt, pymysql, threading, and time). They are simple to install: search online (for example on Baidu) for installation instructions, then import them once they are installed.


Taking the topic “weight_pub” as an example, we first retrieve the MQTT data. The following function shows how the client API is used:

##MQTT operation
def MQTTWeight():
    # config._init()
    client_weight = mqtt.Client()
    client_weight.on_connect = on_connect_weight
    client_weight.on_message = on_message_weight
    client_weight.connect('**.**.***.***',****, 600) # 600 is the keepalive interval
    client_weight.subscribe('weight_pub', qos=0)

The following callback runs once the client has connected to the EMQ server for the weight_pub topic, and prints the result code so the user can confirm that the connection succeeded.

def on_connect_weight(client, userdata, flags, rc):
    print("Connected with weight result code: " + str(rc))
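The rc argument passed to this callback is the broker's CONNACK return code, so the printed message only confirms success when it is 0. As a minimal sketch, the MQTT 3.1.1 return codes can be mapped to readable text. The helper name describe_rc and the table CONNACK_CODES are hypothetical and not part of the original code.

```python
# Hypothetical helper (not in the original code): translate the MQTT 3.1.1
# CONNACK return code that on_connect receives as `rc` into readable text.
CONNACK_CODES = {
    0: "connection accepted",
    1: "refused: unacceptable protocol version",
    2: "refused: identifier rejected",
    3: "refused: server unavailable",
    4: "refused: bad user name or password",
    5: "refused: not authorized",
}


def describe_rc(rc):
    """Return a readable description for a CONNACK return code."""
    return CONNACK_CODES.get(rc, "unknown return code %d" % rc)


def on_connect_weight(client, userdata, flags, rc):
    # Same signature as the callback above; only the message is more detailed.
    print("Connected with weight result code: %d (%s)" % (rc, describe_rc(rc)))
```

With this variant, a wrong password would show up as "refused: bad user name or password" rather than a bare 4.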

Next, we handle the data we receive in the on_message_weight(client, userdata, msg) function.

def on_message_weight(client, userdata, msg):
    var = str(msg.payload)
    if(get_value('WriteEnable') == 1):
        set_value('weight',(GetWeight(var)))
        if(get_value('weight') > 10):
            total_data['weight'] -= 2.75
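The callback above turns the payload into a string and then extracts the digits from it. Since msg.payload arrives as bytes, one alternative is to decode it and pick out the first decimal number with a regular expression. This is a minimal sketch under stated assumptions: the name parse_weight is hypothetical, the payload is assumed to be UTF-8 text, and the offset and threshold parameters simply mirror the 2.75 and 10 used above.

```python
import re

# Hypothetical helper (not in the original code): msg.payload arrives as
# bytes, so decode it first and then pick out the first decimal number.
NUMBER = re.compile(r"\d+(?:\.\d+)?")


def parse_weight(payload, offset=2.75, threshold=10):
    """Return the weight found in the payload, or None if there is no number.

    The 2.75 offset and the threshold of 10 mirror the callback above;
    exposing them as parameters is an assumption made for this sketch.
    """
    text = payload.decode("utf-8", errors="replace")
    match = NUMBER.search(text)
    if match is None:
        return None
    weight = float(match.group())
    if weight > threshold:
        weight -= offset
    return weight
```

Unlike GetWeight, which concatenates every digit and dot in the string, this sketch stops at the first number and returns None for a payload without one instead of raising ValueError.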

At this point, data retrieval is complete. Next, we update the database records. I have already created the records in the database, so we only need to modify the corresponding ones; Python finds the relevant record by its id.

##Database operation
def updateHandPart():
    if(get_value('heartRate') > 0):
        conn = get_conn()
        cur = conn.cursor()
        sql = 'UPDATE bodyhealthmonitor SET heartRate=%s WHERE id = %s'
        args = (get_value('heartRate'), total_data['id'])
        result = cur.execute(sql,args)
        total_data['heartRate'] = 0
        # print(result,end=';')
        sql = 'UPDATE bodyhealthmonitor SET diastolicPressure=%s WHERE id = %s'
        args = (get_value('diastolicPressure'), total_data['id'])
        result = cur.execute(sql,args)
        total_data['diastolicPressure'] = 0
        # print(result,end=';')
        sql = 'UPDATE bodyhealthmonitor SET systolicPressure=%s WHERE id = %s'
        args = (get_value('systolicPressure'), total_data['id'])
        total_data['systolicPressure'] = 0
        result = cur.execute(sql,args)
        # print(result)
        conn.commit()
        cur.close()
        conn.close()  # release the connection once the updates are committed
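The three UPDATE statements target the same row, so they could be combined into a single statement. The sketch below illustrates that idea using the standard-library sqlite3 module as a stand-in for MySQL, so that it runs without a server. Note that sqlite3 uses ? placeholders where pymysql uses %s. The function name update_hand_part, its parameters, and the in-memory table are assumptions made for illustration.

```python
import sqlite3

# sqlite3 stands in for MySQL here so the sketch runs without a server.
# With pymysql the placeholders would be %s instead of ?.


def update_hand_part(conn, record_id, readings):
    """Write heart rate and blood pressure to one row with a single UPDATE."""
    sql = (
        "UPDATE bodyhealthmonitor "
        "SET heartRate = ?, diastolicPressure = ?, systolicPressure = ? "
        "WHERE id = ?"
    )
    cur = conn.cursor()
    try:
        cur.execute(
            sql,
            (
                readings["heartRate"],
                readings["diastolicPressure"],
                readings["systolicPressure"],
                record_id,
            ),
        )
        conn.commit()
        return cur.rowcount  # number of rows that matched the id
    finally:
        cur.close()


# Illustrative in-memory table holding the pre-created record (id 1000).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE bodyhealthmonitor ("
    "id INTEGER PRIMARY KEY, heartRate INTEGER, "
    "diastolicPressure INTEGER, systolicPressure INTEGER)"
)
conn.execute("INSERT INTO bodyhealthmonitor VALUES (1000, 0, 0, 0)")
conn.commit()

changed = update_hand_part(
    conn, 1000, {"heartRate": 72, "diastolicPressure": 80, "systolicPressure": 120}
)
row = conn.execute(
    "SELECT heartRate, diastolicPressure, systolicPressure "
    "FROM bodyhealthmonitor WHERE id = 1000"
).fetchone()
```

Returning the row count makes it easy to notice when the id does not match any pre-created record, in which case nothing is written.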

This function connects to the database.

def get_conn():
    conn = pymysql.connect(host='*****', port=*****, user='*****', passwd='*****', db='*****')
    return conn
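The connection parameters above are masked because they are specific to each user. One way to keep them out of the source file is to read them from environment variables. This is only a sketch: the function name mysql_settings, the variable names (MYSQL_HOST and so on), and the fallback values are all assumptions; only the keyword names match the call above.

```python
import os


def mysql_settings(env=os.environ):
    """Build keyword arguments for pymysql.connect from environment variables.

    The variable names and fallback values are assumptions for this sketch;
    the original code hard-codes masked values instead.
    """
    return {
        "host": env.get("MYSQL_HOST", "localhost"),
        "port": int(env.get("MYSQL_PORT", "3306")),  # port must be an integer
        "user": env.get("MYSQL_USER", "root"),
        "passwd": env.get("MYSQL_PASSWORD", ""),
        "db": env.get("MYSQL_DB", "test"),
    }


# get_conn() could then become: return pymysql.connect(**mysql_settings())
```

Passing a plain dict as env makes the function easy to check without touching the real environment.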

Finally, in the main block, we start a daemon thread for each MQTT subscription to retrieve data, and then process the collected data in a loop that checks once per second.

#***** represents the user's own data
import threading
import time

import paho.mqtt.client as mqtt  # assumed: the paho-mqtt package, which provides mqtt.Client
import pymysql

# MQTTHeight, MQTTBoold, update and stop_thread are used below
# but are not shown in this listing.

total_data = {'WriteEnable': 0, 'weight': 0.00, 'diastolicPressure': 0,
              'heartRate': 0, 'systolicPressure': 0,
              'height': 0.00, 'bloodOxygen': 0.00, 'BMI': 0.00,
              'idealWeight': 0.00, 'temperature': 0.00, 'id': 1000}

def set_value(name, value):
    total_data[name] = value

def get_value(name, defValue=None):
    try:
        return total_data[name]
    except KeyError:
        return defValue

#Retrieve weight
def GetWeight(text):
    # Keep only digits and dots, then convert the result to a float
    var = ''
    for char in text:
        if char >= '0' and char <= '9' or char == '.':
            var += char
    return float(var)

def on_connect_weight(client, userdata, flags, rc):
    print("Connected with weight result code: " + str(rc))

def on_message_weight(client, userdata, msg):
    var = str(msg.payload)
    if(get_value('WriteEnable') == 1):
        set_value('weight',(GetWeight(var)))
        if(get_value('weight') > 10):
            total_data['weight'] -= 2.75
        # print ( 'wei :%.2f' % total_data['weight'] ,end=':')

##MQTT operation
def MQTTWeight():
    # config._init()
    client_weight = mqtt.Client()
    client_weight.on_connect = on_connect_weight
    client_weight.on_message = on_message_weight
    client_weight.connect('**.**.***.***',****, 600) # 600 is the keepalive interval
    client_weight.subscribe('weight_pub', qos=0)
    client_weight.loop_forever() # Keep connection

def get_conn():
    conn = pymysql.connect(host='*****', port=*****, user='*****', passwd='*****', db='*****')
    return conn

##Database operation
def updateHandPart():
    if(get_value('heartRate') > 0):
        conn = get_conn()
        cur = conn.cursor()
        sql = 'UPDATE bodyhealthmonitor SET heartRate=%s WHERE id = %s'
        args = (get_value('heartRate'), total_data['id'])
        result = cur.execute(sql,args)
        total_data['heartRate'] = 0
        # print(result,end=';')
        sql = 'UPDATE bodyhealthmonitor SET diastolicPressure=%s WHERE id = %s'
        args = (get_value('diastolicPressure'), total_data['id'])
        result = cur.execute(sql,args)
        total_data['diastolicPressure'] = 0
        # print(result,end=';')
        sql = 'UPDATE bodyhealthmonitor SET systolicPressure=%s WHERE id = %s'
        args = (get_value('systolicPressure'), total_data['id'])
        total_data['systolicPressure'] = 0
        result = cur.execute(sql,args)
        # print(result)
        conn.commit()
        cur.close()
        conn.close()

if __name__ == '__main__':
    try:
        # One daemon thread per MQTT subscription
        thread_weight = threading.Thread(target=MQTTWeight)
        thread_weight.daemon = True
        thread_weight.start()

        thread_height = threading.Thread(target=MQTTHeight)
        thread_height.daemon = True
        thread_height.start()

        thread_blood = threading.Thread(target=MQTTBoold)
        thread_blood.daemon = True
        thread_blood.start()

        while True:
            time.sleep(1) # Delay 1s to check data
            if(get_value('WriteEnable') == 1):
                update()
                total_data['WriteEnable'] = 0
            else:
                updateHandPart()
    except KeyboardInterrupt:
        stop_thread(thread_weight)
        stop_thread(thread_height)
        stop_thread(thread_blood)
        print('end')
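The main block relies on background threads that fill total_data while the main loop polls it at a fixed interval. The sketch below shows that pattern in isolation, with a fake reader in place of the MQTT client so that it runs on its own. The names shared, fake_reader, and poll are hypothetical, and the Lock and Event are additions that the original listing does not use.

```python
import threading
import time

shared = {"weight": 0.0}   # stands in for total_data
lock = threading.Lock()    # guards access from both threads
stop = threading.Event()   # lets the main thread ask the worker to exit


def fake_reader(values, interval=0.01):
    """Pretend to receive readings; a real worker would run the MQTT loop."""
    for value in values:
        if stop.is_set():
            break
        with lock:
            shared["weight"] = value
        time.sleep(interval)


def poll(rounds, interval=0.02):
    """Main-loop stand-in: sample the shared value a fixed number of times."""
    seen = []
    for _ in range(rounds):
        time.sleep(interval)
        with lock:
            seen.append(shared["weight"])
    return seen


worker = threading.Thread(target=fake_reader, args=([1.0, 2.0, 3.0],), daemon=True)
worker.start()
samples = poll(rounds=5)
stop.set()
worker.join(timeout=1)
```

Setting an Event and joining the worker is one cooperative way to end a thread; whether it fits here depends on how stop_thread is implemented, which the listing does not show.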
