IoT Gateway Development: Design Process Based on MQTT Message Bus (Part 2)

Dao Ge’s 022nd Original Article

  • 1. Introduction

  • 2. MQTT Connection with Cloud Platform

  • 3. Proc_Bridge Process: The Bridge Between External and Internal Message Buses

  • 4. Conclusion

1. Introduction

In the previous article “IoT Gateway Development: Design Process Based on MQTT Message Bus (Part 1)”, we discussed how to utilize the MQTT message bus in an IoT system’s gateway to achieve inter-process communication within an embedded system.

The main advantages of this communication model are:

  1. Decoupling between modules;
  2. Parallel development of each module;
  3. Delegating TCP connection and packet fragmentation issues to the message bus, allowing us to focus on business logic;
  4. Ease of debugging.

That covers how processes communicate inside the embedded system. But how does the gateway interact with the cloud platform?

As mentioned in the previous article, the communication method between the gateway and the cloud platform is generally specified by the client, with only a few options available (Alibaba Cloud, Huawei Cloud, Tencent Cloud, Amazon AWS). Typically, it requires a persistent connection between the gateway and the cloud platform, allowing various commands from the cloud to be sent to the gateway at any time.

In this article, we will discuss this part.

Reply with mqtt on our public account to obtain the link to the example code.

2. MQTT Connection with Cloud Platform

Currently, major IoT cloud platforms provide different access methods. For gateways, the most commonly used is MQTT access.

We know that MQTT is just a protocol, and there are implementations in various programming languages, including several in C.

Inside the gateway, there is a background daemon: MQTT Broker, which is actually the executable program mosquitto, serving the function of a message bus. Please note: because this message bus runs inside the embedded system, the clients connecting to the bus are those processes that need to communicate with each other. The number of these processes is limited; even in a relatively complex system, it is usually only a dozen or so processes. Therefore, the mosquitto implementation can fully support the system load.

If we deploy an MQTT Broker in the cloud, we can in theory also use mosquitto as the message bus there, but we need to evaluate the number of connecting clients (i.e., the gateways), consider concurrency, and perform stress testing.

I don’t have much experience in backend development, so I dare not (and cannot) speak too much, as misleading everyone would be a sin. However, for general learning and testing purposes, deploying mosquitto directly in the cloud as a message bus is perfectly fine.

3. Proc_Bridge Process: The Bridge Between External and Internal Message Buses

The following diagram illustrates the role of the Proc_Bridge process in this model:

[Figure: the Proc_Bridge process bridges the cloud platform's message bus and the gateway's internal message bus.]
  1. Messages received from the cloud platform’s message bus need to be forwarded to the internal message bus;
  2. Messages received from the internal message bus need to be forwarded to the cloud platform’s message bus;
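
The two forwarding rules above can be sketched as a tiny routing function. This is only an illustration under stated assumptions: the bus_id enum and the functions bridge_destination and bus_name are hypothetical names made up for this sketch, and they are not part of mosquitto or of the example code.

```c
/* Hypothetical identifiers for the two buses Proc_Bridge is attached to. */
typedef enum { BUS_CLOUD = 0, BUS_LOCAL = 1 } bus_id;

/* Rule 1: a message that arrived from the cloud bus goes to the local bus.
 * Rule 2: a message that arrived from the local bus goes to the cloud bus. */
bus_id bridge_destination(bus_id source)
{
    return (source == BUS_CLOUD) ? BUS_LOCAL : BUS_CLOUD;
}

/* Human-readable name of a bus, handy for log output. */
const char *bus_name(bus_id bus)
{
    return (bus == BUS_CLOUD) ? "cloud" : "local";
}
```

In a real bridge, the message callback would look up the destination this way and then publish the message on that connection. A real bridge would probably also forward only selected topics in each direction, so that a message it has just forwarded is not picked up and sent straight back; that filtering is left out of this sketch.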

If we use mosquitto to implement this, how should we do it?

1. mosquitto’s API

The mosquitto implementation operates based on a callback function mechanism, for example:

#include <stdio.h>
#include <unistd.h>
#include <mosquitto.h>

// Callback function when the connection is established (rc == 0 means success)
void my_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
    // ...
}

// Callback function when the connection is disconnected
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
    // ...
}

// Callback function when a message is received
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    // ...
}

int main()
{
    // Other code
    // ...

    // Initialize the library before calling any other mosquitto function
    mosquitto_lib_init();

    // Create a mosquitto object (mosquitto_new returns a pointer)
    struct mosquitto *g_mosq = mosquitto_new("client_name", true, NULL);

    // Register callback functions
    mosquitto_connect_callback_set(g_mosq, my_connect_callback);
    mosquitto_disconnect_callback_set(g_mosq, my_disconnect_callback);
    mosquitto_message_callback_set(g_mosq, my_message_callback);
    // Other callback function settings

    // Start connecting to the message bus
    mosquitto_connect(g_mosq, "127.0.0.1", 1883, 60);

    while (1)
    {
        // Process network events; a non-zero return value indicates an error
        int rc = mosquitto_loop(g_mosq, -1, 1);
        if (rc) {
            printf("mqtt_portal: mosquitto_loop rc = %d \n", rc);
            sleep(1);
            mosquitto_reconnect(g_mosq);
        }
    }

    mosquitto_destroy(g_mosq);
    mosquitto_lib_cleanup();
    return 0;
}

The code above is about the simplest possible mosquitto client. The callback mechanism makes the program very easy to write.

Mosquitto handles all the low-level details for us; whenever one of the registered functions is called, we know that an event we care about has occurred.

This callback mechanism is widely used in various open-source software, such as: timers in glib, communication handling in libevent, data processing in libmodbus, and driver development and timers in the Linux kernel. This approach is universal!
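
To make the mechanism concrete, here is a minimal sketch of what a callback-based library does internally: it stores a function pointer and a user pointer, then calls the function when an event arrives. Every name here (toy_client, toy_message_callback_set, toy_dispatch, count_messages) is invented for illustration; this is not mosquitto's actual implementation.

```c
#include <stdio.h>

/* Callback type: the library hands back the user pointer given at setup. */
typedef void (*event_cb)(void *obj, const char *payload);

/* A toy "library object" that remembers one callback and one user pointer. */
typedef struct {
    event_cb on_message;
    void *obj;
} toy_client;

/* Library side: create the object and remember the user pointer. */
void toy_client_init(toy_client *c, void *obj)
{
    c->on_message = NULL;
    c->obj = obj;
}

/* Library side: register the function to call when a message arrives. */
void toy_message_callback_set(toy_client *c, event_cb cb)
{
    c->on_message = cb;
}

/* Library side: called from the event loop when data arrives. */
void toy_dispatch(toy_client *c, const char *payload)
{
    if (c->on_message != NULL) {
        c->on_message(c->obj, payload);
    }
}

/* Application side: count messages through the user pointer. */
void count_messages(void *obj, const char *payload)
{
    int *counter = (int *)obj;
    (*counter)++;
    printf("received: %s\n", payload);
}
```

The application only writes count_messages and registers it; when and how it gets called is entirely up to the library's event loop.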

Adding the code above to each process in the gateway attaches that process to the message bus, so it can exchange data with the other processes.

2. Using UserData Pointer to Achieve Multiple MQTT Connections

The above example only connects to one message bus; for an ordinary process, this achieves the communication goal.

For the Proc_Bridge process, however, this is not enough, because it needs to connect to both the remote and the local message bus at the same time. How can we achieve this?

Let’s look at the signature of the mosquitto_new function:

/*
 * obj - A user pointer that will be passed as an argument to any
 *       callbacks that are specified.
 */
libmosq_EXPORT struct mosquitto *mosquitto_new(const char *id, bool clean_session, void *obj);

Thus, we can define a structure variable that records all the information about one MQTT connection and register it with mosquitto. When mosquitto calls our registered callback functions, it passes the pointer to this structure variable back to us, so we can get at all the data belonging to that connection. In a way, this is also an object-oriented approach.

// Structure representing an MQTT connection
typedef struct
{
    char *id;
    char *name;
    char *pw;
    char *host;
    int port;
    pthread_t tHandle;
    struct mosquitto *mosq;
    int mqtt_num;
} MQData;

The complete code has been placed on the cloud disk; to help you understand the principles, I have pasted key parts of the code here:

// Allocate the structure variable (malloc returns a pointer)
MQData *userData = (MQData *)malloc(sizeof(MQData));

// Set parameters belonging to this connection: id, name, etc.

// Create the mosquitto object and pass userData as the user pointer
struct mosquitto *mosq = mosquitto_new(userData->id, true, userData);

// In the callback function, cast the obj pointer back to an MQData pointer
static void messageCB(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
    MQData *userData = (MQData *)obj;

    // Now you can tell which connection this is from the userData pointer.
}

Another question: have you noticed the mosquitto_loop() function in the example? This function needs to be called continuously in a while loop to trigger the internal events of mosquitto. (Actually, mosquitto also provides another simplified function mosquitto_loop_forever).

This means that in each connection, it is necessary to continuously trigger the underlying events of mosquitto to ensure the message system can send and receive messages smoothly. Therefore, in the example code, two threads are used to connect to both the cloud platform’s bus and the internal bus.
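
The one-thread-per-connection idea can be sketched as follows. This is a simplified illustration, not the author's example code: conn_ctx is a hypothetical, trimmed-down stand-in for MQData, and the spot where the real program would call mosquitto_loop() is replaced by a counter so that the sketch runs without a broker.

```c
#include <pthread.h>
#include <stddef.h>

/* Hypothetical, trimmed-down stand-in for the MQData structure. */
typedef struct {
    const char *name;   /* "cloud" or "local" */
    pthread_t tHandle;  /* thread driving this connection */
    int iterations;     /* how many loop iterations have run */
} conn_ctx;

/* Thread body. In a real program this is where mosquitto_loop() would be
 * called over and over for this connection; here a counter stands in for
 * it and the loop ends after three rounds so the sketch terminates. */
static void *connection_thread(void *arg)
{
    conn_ctx *ctx = (conn_ctx *)arg;
    for (int i = 0; i < 3; i++) {
        ctx->iterations++;   /* placeholder for one loop iteration */
    }
    return NULL;
}

/* Start one thread per connection, then wait for both to finish.
 * Returns 0 on success, -1 if a thread could not be created. */
int run_two_connections(conn_ctx *cloud, conn_ctx *local)
{
    if (pthread_create(&cloud->tHandle, NULL, connection_thread, cloud) != 0) {
        return -1;
    }
    if (pthread_create(&local->tHandle, NULL, connection_thread, local) != 0) {
        pthread_join(cloud->tHandle, NULL);
        return -1;
    }
    pthread_join(cloud->tHandle, NULL);
    pthread_join(local->tHandle, NULL);
    return 0;
}
```

Each thread receives its own context pointer, just as each mosquitto connection receives its own user pointer, so the two loops do not share state. In an actual bridge the loops would run for the lifetime of the process rather than exit after a few iterations.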

4. Conclusion

With these two articles, we have covered the basic communication model of an IoT gateway. It is essentially the skeleton of the program; what remains is handling the details of the business logic.

This is just the first step in a long journey!

For a gateway, there are many more issues to address, such as MQTT connection authentication (username + password, certificates), serialization and deserialization of communication data, and encryption and decryption. We will discuss these gradually in future articles, and I hope we can move forward together!

[Original Declaration]

Reprint: Feel free to reprint, but unless the author agrees otherwise, this statement must be retained and a link to the original article must be provided.