
From open data to open event streams, learn about the MQTT publish/subscribe (pubsub) wire protocol.
Last November we bought an electric car, and it raised an interesting question: when should we charge it? I wanted the electricity we used to charge the car to come with the lowest possible carbon dioxide emissions. That boils down to a specific question: how much carbon dioxide is emitted per kilowatt-hour at any given time, and when during the day is that value lowest?
Finding Data
I live in New York State, where about 80% of the electricity consumed is generated in-state. Most of it comes from natural gas, hydroelectric power (much of it from Niagara Falls), and nuclear, with a small share from wind, solar, and other fossil fuels. The whole system is run by the New York Independent System Operator[1] (NYISO), a not-for-profit organization that balances power generation against consumption to keep the lights on across New York.
NYISO provides no formal API, but it makes plenty of public data[2] available for public use. Every 5 minutes, it reports how much power is being generated statewide from each type of fuel. The data is published as CSV files in a public archive and updated throughout the day. If you know how much power is coming from each type of fuel, you can make a reasonable estimate of how much carbon dioxide is being emitted at any given moment.
When building tools that collect and process public data, we should take care not to overload the source. Rather than having everyone fetch the data directly, there is a better approach: collect it once and republish it as a low-overhead event stream that people can subscribe to, receiving messages in real time. MQTT[3] is a good fit for this. My project, ny-power.org[4], is meant to be integrated into Home Assistant[5], an open-source home automation platform with hundreds of thousands of users. If all of those users hit the CSV file server directly, NYISO would probably have to start restricting access.
What is MQTT?
MQTT is a publish/subscribe wire protocol designed for small devices. A publish/subscribe system works like a message bus. You publish a message to a topic, and every client subscribed to that topic receives a copy. Senders don't need to know who is subscribed. They simply publish to a set of topics, and receivers subscribe to the topics they care about. It's like being at a party and joining only the conversations that interest you.
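To make the message-bus idea concrete, here is a minimal in-process sketch in plain JavaScript (Node.js). It is not MQTT and not a real broker. The `Bus` class and its methods are hypothetical, and topics are matched exactly, with no wildcards. It only illustrates the key property described above: a publisher hands a message to the bus without knowing who, if anyone, is listening.

```javascript
// Minimal in-process publish/subscribe bus (illustrative only, not MQTT).
// Topics are matched exactly here; MQTT brokers also support wildcards.
class Bus {
  constructor() {
    this.subscribers = new Map(); // topic -> array of callbacks
  }

  // Register interest in one topic.
  subscribe(topic, callback) {
    if (!this.subscribers.has(topic)) {
      this.subscribers.set(topic, []);
    }
    this.subscribers.get(topic).push(callback);
  }

  // Hand the message to every current subscriber of the topic.
  // The publisher only learns how many received it, not who they are.
  publish(topic, message) {
    const callbacks = this.subscribers.get(topic) || [];
    for (const callback of callbacks) {
      callback(topic, message);
    }
    return callbacks.length;
  }
}

// Usage: two guests in the kitchen conversation, nobody in the garden.
const bus = new Bus();
const heard = [];
bus.subscribe("party/kitchen", (topic, msg) => heard.push("alice heard: " + msg));
bus.subscribe("party/kitchen", (topic, msg) => heard.push("bob heard: " + msg));
bus.publish("party/kitchen", "pizza is here");
bus.publish("party/garden", "is anyone out here?"); // no subscribers, returns 0
console.log(heard.join("; "));
// alice heard: pizza is here; bob heard: pizza is here
```

A real MQTT broker plays the role of `Bus` across the network, which is what lets publishers and subscribers run as separate processes on separate machines.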
MQTT makes it possible to build very efficient applications. Clients subscribe to a limited set of topics and receive only the content they care about. This saves both processing time and network bandwidth.
As an open standard, MQTT has many open-source client and server implementations. There is a client library for just about every programming language you can think of, and there are even embedded libraries for Arduino for building sensor networks. There are also many servers to choose from. My choice is the Mosquitto[6] server from the Eclipse project, because it is lightweight, written in C, and can handle thousands of subscribers.
Why I Love MQTT
Over the past twenty years, we have developed reliable, well-understood patterns for software that asks a service a question and gets an answer back. Do I have new email? What's the weather right now? Should I buy this product now? For most of these questions, the request/response model works well, but in a world awash with data we need other models. MQTT's publish/subscribe model is powerful in this setting. Large amounts of data can be pushed into the system, and each client subscribes to a small slice of it and receives updates the moment they are published.
MQTT also has some interesting features. One is the last-will-and-testament message, which lets you distinguish between two kinds of silence. In one, there is simply no new data on a topic. In the other, the client publishing that data has died. Another is retained messages: the broker keeps the last retained message on a topic and delivers it immediately to any client that subscribes. This is essential for topics that update slowly.
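Both features are easier to see in a toy model. The sketch below is a hypothetical in-memory broker written for illustration. It is not how Mosquitto is implemented, and the class, method, and topic names are invented. A message published with the retain flag is stored and replayed to later subscribers. A client's will message is published on its behalf only if the client drops off without a clean disconnect.

```javascript
// Toy broker showing retained messages and last will (illustrative only, not Mosquitto).
// Topics are matched exactly; real brokers also handle wildcards, QoS, sessions, etc.
class ToyBroker {
  constructor() {
    this.subscribers = new Map(); // topic -> array of callbacks
    this.retained = new Map();    // topic -> last retained payload
    this.wills = new Map();       // clientId -> { topic, payload, retain }
  }

  // A client may register a will message when it connects.
  connect(clientId, will) {
    if (will) this.wills.set(clientId, will);
  }

  publish(topic, payload, retain = false) {
    if (retain) this.retained.set(topic, payload);
    for (const cb of this.subscribers.get(topic) || []) cb(topic, payload);
  }

  subscribe(topic, cb) {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, []);
    this.subscribers.get(topic).push(cb);
    // A new subscriber immediately receives the last retained message, if any.
    if (this.retained.has(topic)) cb(topic, this.retained.get(topic));
  }

  // Clean disconnect: the will is discarded and never published.
  disconnect(clientId) {
    this.wills.delete(clientId);
  }

  // Connection lost without a clean disconnect: the broker publishes the will.
  connectionLost(clientId) {
    const will = this.wills.get(clientId);
    this.wills.delete(clientId);
    if (will) this.publish(will.topic, will.payload, Boolean(will.retain));
  }
}

// Usage with hypothetical topics: a data pump that announces its status.
const broker = new ToyBroker();
broker.connect("pump", { topic: "status/pump", payload: "offline", retain: true });
broker.publish("status/pump", "online", true);
broker.publish("data/co2", "153", true);

const seen = [];
// These subscriptions arrive after the publishes but still get the retained values.
broker.subscribe("data/co2", (t, p) => seen.push(t + "=" + p));
broker.subscribe("status/pump", (t, p) => seen.push(t + "=" + p));
broker.connectionLost("pump"); // e.g. the pump process crashed
console.log(seen.join(", "));
// data/co2=153, status/pump=online, status/pump=offline
```

A common pattern is to publish a retained "online" status and register a retained "offline" will on the same topic. Any subscriber, however late it joins, can then tell whether the publisher is alive.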
In my work on the Home Assistant project, I've found that this message bus model works especially well for heterogeneous systems. If you dive into the Internet of Things space, you'll find MQTT everywhere.
Our First MQTT Stream
The CSV file published by NYISO contains real-time fuel-mix data. Every 5 minutes, NYISO publishes the fuel types used for power generation during those 5 minutes, along with the amount of power generated from each (in megawatts).
The CSV file looks like this:
Timestamp | Timezone | Fuel Type | Power Generation in MW |
---|---|---|---|
05/09/2018 00:05:00 | EDT | Dual Fuel | 1400 |
05/09/2018 00:05:00 | EDT | Natural Gas | 2144 |
05/09/2018 00:05:00 | EDT | Nuclear | 4114 |
05/09/2018 00:05:00 | EDT | Other Fossil Fuels | 4 |
05/09/2018 00:05:00 | EDT | Other Renewables | 226 |
05/09/2018 00:05:00 | EDT | Wind | 41 |
05/09/2018 00:05:00 | EDT | Hydro | 3229 |
05/09/2018 00:10:00 | EDT | Dual Fuel | 1307 |
05/09/2018 00:10:00 | EDT | Natural Gas | 2092 |
05/09/2018 00:10:00 | EDT | Nuclear | 4115 |
05/09/2018 00:10:00 | EDT | Other Fossil Fuels | 4 |
05/09/2018 00:10:00 | EDT | Other Renewables | 224 |
05/09/2018 00:10:00 | EDT | Wind | 40 |
05/09/2018 00:10:00 | EDT | Hydro | 3166 |
The only puzzling entry in the table is the dual fuel category. Most natural gas plants in New York can also burn other fossil fuels to generate power. During winter cold snaps, the available natural gas goes to home heating first, so these plants switch to other fuels. This happens rarely enough that, for our calculations, we can treat dual fuel as natural gas.
The CSV file is updated throughout the day. I wrote a simple data pump that polls it every minute for updates and publishes any new entries to a series of topics on the MQTT server. The topic names mirror the fuel categories in the CSV file. Each payload is converted to a JSON object so it is easy to process from nearly any programming language.
ny-power/upstream/fuel-mix/Hydro {"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Dual Fuel {"units": "MW", "value": 1400, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Natural Gas {"units": "MW", "value": 2144, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Fossil Fuels {"units": "MW", "value": 4, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Wind {"units": "MW", "value": 41, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Other Renewables {"units": "MW", "value": 226, "ts": "05/09/2018 00:05:00"}
ny-power/upstream/fuel-mix/Nuclear {"units": "MW", "value": 4114, "ts": "05/09/2018 00:05:00"}
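A minimal sketch of the data pump's transformation might look like this. It is a hypothetical reconstruction, not the actual ny-power code, and it makes three assumptions. Each CSV row has the four columns shown in the table (timestamp, time zone, fuel type, megawatts). Rows can be split naively on commas, with no quoted commas inside fields. The pump keeps a set of already-published (timestamp, fuel) pairs, so re-reading the growing file each minute yields only new entries. The `{units, value, ts}` payload shape is taken from the messages above.

```javascript
// Sketch of the data pump's CSV-to-MQTT transformation (hypothetical, not ny-power's code).
// Assumes rows of: timestamp, time zone, fuel type, megawatts.
const TOPIC_ROOT = "ny-power/upstream/fuel-mix/";

// Split one CSV line into trimmed fields, stripping surrounding double quotes.
// Naive: does not handle commas inside quoted fields.
function parseRow(line) {
  return line.split(",").map((field) => field.trim().replace(/^"|"$/g, ""));
}

// Turn CSV text into {topic, payload} messages, skipping rows already published.
// `seen` is a Set of "timestamp|fuel" keys that persists between polls.
function newMessages(csvText, seen) {
  const messages = [];
  for (const line of csvText.split("\n")) {
    if (line.trim() === "") continue;
    const [ts, , fuel, mw] = parseRow(line);
    const value = Number(mw);
    if (!fuel || !mw || Number.isNaN(value)) continue; // header or malformed row
    const key = ts + "|" + fuel;
    if (seen.has(key)) continue;
    seen.add(key);
    messages.push({
      topic: TOPIC_ROOT + fuel,
      payload: JSON.stringify({ units: "MW", value: value, ts: ts }),
    });
  }
  return messages;
}

// Usage: the second poll of the same file publishes nothing new.
const seen = new Set();
const csv = [
  "Timestamp,Timezone,Fuel Type,MW",
  "05/09/2018 00:05:00,EDT,Hydro,3229",
  "05/09/2018 00:05:00,EDT,Nuclear,4114",
].join("\n");
const first = newMessages(csv, seen);
const second = newMessages(csv, seen);
console.log(first[0].topic, first[0].payload);
// ny-power/upstream/fuel-mix/Hydro {"units":"MW","value":3229,"ts":"05/09/2018 00:05:00"}
console.log(second.length); // 0
```

In a real pump, `newMessages` would run once a minute against a freshly downloaded copy of the file. Each resulting topic and payload would then be passed to an MQTT client's publish call.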
This direct transformation is a good first step in turning public data into public events. We'll go on to convert it into carbon dioxide intensity, but other applications can use the same raw data for calculations of their own.
MQTT Topics
Topics and topic structure are one of MQTT's major features. Unlike some enterprise message buses, MQTT does not require topics to be registered in advance. A sender can create topics on the fly, with few restrictions beyond a maximum length (the MQTT specification allows a topic name of up to 65,535 bytes of UTF-8). The / character has a special meaning: it separates the levels of a topic hierarchy. As we'll see shortly, you can subscribe to entire branches of that hierarchy.
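Because a topic is just a string, a publisher can sanity-check a name before sending it. The helper below is hypothetical (it is not part of any MQTT library) and applies a few rules from the MQTT 3.1.1 specification. A topic name must not be empty, must not contain the null character, and must not contain the + or # wildcards, which are reserved for subscriptions. The helper then splits the name on / into its hierarchy levels. Spaces are allowed, which is why a name like Natural Gas works as a level.

```javascript
// Hypothetical helper: validate an MQTT topic name and split it into levels.
// Rules follow the MQTT 3.1.1 specification (the length limit is not checked here).
function topicLevels(name) {
  if (typeof name !== "string" || name.length === 0) {
    throw new Error("topic name must be a non-empty string");
  }
  if (name.includes("\u0000")) {
    throw new Error("topic name must not contain the null character");
  }
  if (name.includes("+") || name.includes("#")) {
    throw new Error("wildcards are only allowed in subscription filters");
  }
  // "/" separates levels; empty levels are legal, so "a//b" has three levels.
  return name.split("/");
}

console.log(topicLevels("ny-power/upstream/fuel-mix/Natural Gas").join(" | "));
// ny-power | upstream | fuel-mix | Natural Gas
```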
Out of the box, Mosquitto lets any client publish to any topic. That is convenient while prototyping, but before going to production you need to add access control lists (ACLs) so that only authorized applications can publish. For example, anyone can read my application's topic hierarchy, but only clients with specific credentials can publish to it.
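In Mosquitto itself, such a policy is written in an ACL file referenced from the broker configuration. The sketch below does not reproduce that file format. It is a hypothetical JavaScript model of the same decision: anonymous clients may read, and one credentialed client may also write. The user name pump is invented, and the deliberately simplified matcher only understands filters ending in /#.

```javascript
// Hypothetical model of the access policy: anyone may read, only listed users may write.
// Not Mosquitto's implementation; filters support only a trailing "/#" here.
const rules = [
  { user: null, access: "read", filter: "ny-power/#" },        // anonymous clients
  { user: "pump", access: "readwrite", filter: "ny-power/#" }, // credentialed publisher
];

// True if `topic` is covered by a filter of the form "prefix/#", or equals the filter.
function covers(filter, topic) {
  if (filter.endsWith("/#")) {
    const prefix = filter.slice(0, -2);
    return topic === prefix || topic.startsWith(prefix + "/");
  }
  return filter === topic;
}

// action is "read" or "write"; user is null for anonymous clients.
// Anything not explicitly allowed by a rule is denied.
function allowed(user, action, topic) {
  return rules.some(
    (rule) =>
      rule.user === user &&
      (rule.access === "readwrite" || rule.access === action) &&
      covers(rule.filter, topic)
  );
}

console.log(allowed(null, "read", "ny-power/computed/co2"));    // true
console.log(allowed(null, "write", "ny-power/computed/co2"));   // false
console.log(allowed("pump", "write", "ny-power/computed/co2")); // true
```

Denying by default and listing only what is allowed keeps the policy easy to audit. In this model, a client that does not appear in the table can neither publish nor read.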
Topics carry no schema, and there is no way to discover all the topics a client might publish to. Applications that consume data from the MQTT bus therefore need to be written against known topics and known message formats.
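Since nothing on the bus enforces a schema, a consumer can at least check that each payload has the expected shape before using it. This sketch assumes the `{units, value, ts}` shape used by the ny-power messages shown earlier. `parseReading` is a hypothetical helper. Returning `null` for anything unexpected is one design choice among several; logging the bad message is another.

```javascript
// Hypothetical consumer-side check: accept only payloads shaped like
// {"units": string, "value": number, "ts": string}; return null otherwise.
function parseReading(payloadString) {
  let data;
  try {
    data = JSON.parse(payloadString);
  } catch (e) {
    return null; // not JSON at all
  }
  if (
    data === null ||
    typeof data !== "object" ||
    typeof data.units !== "string" ||
    typeof data.value !== "number" ||
    typeof data.ts !== "string"
  ) {
    return null; // JSON, but not the shape this consumer was written against
  }
  return { units: data.units, value: data.value, ts: data.ts };
}

console.log(parseReading('{"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}').value); // 3229
console.log(parseReading("not json")); // null
```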
So how should you design topics? Best practice is to start with an application-specific root name, such as ny-power in my application. Below that, build a hierarchy deep enough to allow efficient subscriptions. The upstream level holds raw data taken directly from the data source, and the fuel-mix level beneath it holds one specific type of data. Other hierarchies can be added later.
Subscribing to Topics
In MQTT, subscribing is simply string matching against topic names. To keep matching efficient, only two wildcards are allowed:
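Multi-level wildcard #: matches all remaining levels, recursively, to the end of the topic (it must be the last character of the filter)
Single-level wildcard +: matches exactly one level, up to the next /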
For ease of understanding, here are a few examples:
ny-power/# - Matches all topics published by the ny-power application
ny-power/upstream/# - Matches all raw data topics
ny-power/upstream/fuel-mix/+ - Matches all fuel type topics
ny-power/+/+/Hydro - Matches any Hydro topic with exactly two levels between ny-power and Hydro (even outside the upstream hierarchy)
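The matching rules can be written as a short function. This is a sketch of the MQTT 3.1.1 filter semantics for illustration, not code from Mosquitto or any client library, and `topicMatches` is a hypothetical name. It also encodes two details of the specification. First, # matches the parent level itself, so ny-power/# matches ny-power. Second, a wildcard in the first level does not match topics that begin with $, which brokers reserve for internal use.

```javascript
// Sketch of MQTT topic filter matching with the "+" and "#" wildcards.
// Assumes a well-formed filter ("#" only as the last level).
function topicMatches(filter, topic) {
  const f = filter.split("/");
  const t = topic.split("/");

  // Wildcards at the first level must not match "$"-prefixed (broker-internal) topics.
  if (topic.startsWith("$") && (f[0] === "+" || f[0] === "#")) {
    return false;
  }

  for (let i = 0; i < f.length; i++) {
    if (f[i] === "#") {
      return true; // matches this level and everything below it, including nothing
    }
    if (i >= t.length) {
      return false; // topic is shorter than the filter
    }
    if (f[i] !== "+" && f[i] !== t[i]) {
      return false; // literal level mismatch ("+" matches any single level)
    }
  }
  return f.length === t.length; // no "#": level counts must match exactly
}

const topic = "ny-power/upstream/fuel-mix/Hydro";
for (const filter of ["ny-power/#", "ny-power/upstream/fuel-mix/+", "ny-power/+/+/Hydro", "ny-power/computed/#"]) {
  console.log(filter, topicMatches(filter, topic));
}
// ny-power/# true
// ny-power/upstream/fuel-mix/+ true
// ny-power/+/+/Hydro true
// ny-power/computed/# false
```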
A broad subscription such as ny-power/# suits low-volume applications that pull in all the data from the network and process it locally. For high-volume applications, however, it can be a disaster: most of the messages go unused and simply waste network bandwidth.
When data volumes are high, applications should use narrow topic filters (such as ny-power/+/+/Hydro) so that they receive exactly the data they need.
Adding Our Own Data Hierarchy
From here on, each part of the application consumes existing MQTT streams and builds new ones from them. The first additional data layer computes the carbon dioxide emissions associated with the power being generated.
Using 2016 data from the U.S. Energy Information Administration[7] on generation and emissions by fuel type in New York, we can derive an average emission rate[8] for each type of fuel, in grams per megawatt-hour.
These rates are built into a dedicated microservice. It subscribes to ny-power/upstream/fuel-mix/+, the raw fuel-mix data from the data pump. It then performs the calculation and publishes the result (in grams per kilowatt-hour) to a new topic hierarchy:
ny-power/computed/co2 {"units": "g / kWh", "value": 152.9486, "ts": "05/09/2018 00:05:00"}
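The calculation is a generation-weighted average. Suppose fuel i is producing P_i megawatts and has an average emission rate r_i in grams per megawatt-hour. The intensity is then Σ(P_i × r_i) / Σ P_i in g/MWh, and dividing by 1,000 gives g/kWh. The sketch below is a hypothetical version of that step, not the ny-power microservice's code. It counts the Dual Fuel topic as natural gas, following the simplification described earlier. It takes the rates as a parameter because the real values come from the EIA-derived table[8]. The rates in the usage example are made-up round numbers, not real emission factors.

```javascript
// Hypothetical sketch: CO2 intensity (g/kWh) from the current fuel mix.
// mix:   { fuelName: megawatts } for one 5-minute interval
// rates: { fuelName: grams of CO2 per MWh } (real values would come from EIA data)
function co2Intensity(mix, rates) {
  let totalMW = 0;
  let gramsPerHour = 0; // sum of MW * g/MWh = grams of CO2 per hour
  for (const [fuel, mw] of Object.entries(mix)) {
    // Dual-fuel plants are counted as natural gas.
    const key = fuel === "Dual Fuel" ? "Natural Gas" : fuel;
    if (!(key in rates)) {
      throw new Error("no emission rate for fuel: " + fuel);
    }
    totalMW += mw;
    gramsPerHour += mw * rates[key];
  }
  if (totalMW === 0) {
    throw new Error("no generation in mix");
  }
  // Average g/MWh across the mix, divided by 1000 to get g/kWh.
  return gramsPerHour / totalMW / 1000;
}

// Usage with MADE-UP rates (placeholders, not real emission factors):
const mix = { Hydro: 3000, Nuclear: 4000, "Natural Gas": 2000, "Dual Fuel": 1000 };
const placeholderRates = { Hydro: 0, Nuclear: 0, "Natural Gas": 500000 };
console.log(co2Intensity(mix, placeholderRates)); // 150
```

Weighting by megawatts rather than averaging the per-fuel rates directly is what makes a large zero-emission share, such as hydro plus nuclear, pull the result down.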
Another service subscribes to that topic hierarchy and writes the data into an InfluxDB[9] time-series database. It also publishes the past 24 hours of data as a time series to the ny-power/archive/co2/24h topic, which makes plotting recent changes much simpler.
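The 24-hour archive can be thought of as a rolling window over the computed readings. The real service keeps its history in InfluxDB. The sketch below is a hypothetical in-memory stand-in that drops points more than 24 hours older than the newest one. It produces a payload with parallel ts and values arrays, the fields that the web console code later in this article plots. It assumes readings arrive in time order. The ISO 8601 timestamp strings are also an assumption, not necessarily the format ny-power publishes.

```javascript
// Hypothetical in-memory stand-in for the 24-hour archive (the real service uses InfluxDB).
const WINDOW_MS = 24 * 60 * 60 * 1000;

class RollingArchive {
  constructor(windowMs = WINDOW_MS) {
    this.windowMs = windowMs;
    this.points = []; // [{ t: epochMillis, value: number }], oldest first
  }

  // Add a reading (assumed newer than all previous ones) and drop points outside the window.
  add(t, value) {
    this.points.push({ t, value });
    const cutoff = t - this.windowMs;
    while (this.points.length > 0 && this.points[0].t < cutoff) {
      this.points.shift();
    }
  }

  // Payload with parallel arrays, suitable for publishing to ny-power/archive/co2/24h.
  payload() {
    return JSON.stringify({
      ts: this.points.map((p) => new Date(p.t).toISOString()),
      values: this.points.map((p) => p.value),
    });
  }
}

const archive = new RollingArchive();
const start = Date.UTC(2018, 4, 9, 0, 5); // 2018-05-09 00:05 UTC (months are 0-based)
archive.add(start, 150);
archive.add(start + 5 * 60 * 1000, 152);
archive.add(start + WINDOW_MS + 10 * 60 * 1000, 149); // the first two fall out of the window
console.log(archive.payload());
// {"ts":["2018-05-10T00:15:00.000Z"],"values":[149]}
```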
This hierarchical topic model works well because it decouples these programs from one another. In a more complex system, components might be written in different programming languages. That wouldn't matter, because the only interchange format is MQTT messages: a topic plus a JSON payload.
Consuming the Data
A good way to understand what MQTT is doing is to attach to the message bus and watch the messages flow by. The mosquitto_sub program from the mosquitto-clients package makes this easy.
Once it's installed, give it a server name with -h and the topic to subscribe to with -t. Add -v to print the topic each message was published on; without it, you see only the message payloads.
```shell
mosquitto_sub -h mqtt.ny-power.org -t 'ny-power/#' -v
```
Whenever I write or debug MQTT applications, I always run mosquitto_sub in one terminal.
Accessing MQTT Directly from the Web
So far, we have an application that provides public event streams, which can be accessed by microservices or command-line tools. But the web still dominates, so it's important to let people reach the event streams directly from a browser.
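MQTT's designers thought about this. The protocol needs an ordered, lossless byte stream underneath it, so in practice it runs over TCP[10] (optionally with TLS) or over WebSockets[12]. Plain UDP[11] isn't suitable on its own because it can lose or reorder packets, although a related variant, MQTT-SN, targets datagram networks. All mainstream browsers support WebSockets, which keep a persistent connection open for real-time applications.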
The Eclipse project provides a JavaScript MQTT client called Paho[13] that you can include in your application. The usage pattern is to connect to the server, set up some subscriptions, and then react to messages as they arrive.
```javascript
// ny-power web console application.
// Assumes the Paho JavaScript client, jQuery ($) and Plotly are loaded on the page,
// and that mqttHost and fuel_mix_graph() are defined elsewhere in the app.
var client = new Paho.MQTT.Client(mqttHost, 80, "client-" + Math.random());

// set callback handlers
client.onMessageArrived = onMessageArrived;

// connect the client; automatic reconnection is requested through the connect options
client.connect({onSuccess: onConnect, reconnect: true});

// called when the client connects
function onConnect() {
    // Once a connection has been made, subscribe to the topics the page displays.
    console.log("onConnect");
    client.subscribe("ny-power/computed/co2");
    client.subscribe("ny-power/archive/co2/24h");
    client.subscribe("ny-power/upstream/fuel-mix/#");
}

// called when a message arrives
function onMessageArrived(message) {
    var topic = message.destinationName;
    console.log("onMessageArrived: " + topic + " " + message.payloadString);

    if (topic === "ny-power/computed/co2") {
        // current CO2 intensity: update the text on the page
        var current = JSON.parse(message.payloadString);
        $("#co2-per-kwh").html(Math.round(current.value));
        $("#co2-units").html(current.units);
        $("#co2-updated").html(current.ts);
    } else if (topic.startsWith("ny-power/upstream/fuel-mix")) {
        // latest output for one fuel type: update the fuel-mix bar chart
        fuel_mix_graph(message);
    } else if (topic === "ny-power/archive/co2/24h") {
        // last 24 hours of CO2 intensity: redraw the line chart
        var archive = JSON.parse(message.payloadString);
        var plot = [
            {
                x: archive.ts,
                y: archive.values,
                type: "scatter"
            }
        ];
        var layout = {
            yaxis: {
                title: "g CO2 / kWh"
            }
        };
        Plotly.newPlot("co2_graph", plot, layout);
    }
}
```
The application above subscribes to several topics because it displays several different kinds of data. The ny-power/computed/co2 topic provides the current carbon dioxide intensity. Whenever a new message arrives on that topic, the corresponding values on the page are replaced.
Figure: NYISO carbon dioxide emissions chart on the ny-power.org[4] website.
The ny-power/archive/co2/24h topic provides the time series for a Plotly[14] line chart, and the ny-power/upstream/fuel-mix topics provide the current fuel mix for a bar chart.
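The console code above calls a `fuel_mix_graph(message)` helper that isn't shown. One plausible approach, offered as a hypothetical sketch rather than the site's actual code, is to keep the latest megawatt value for each fuel, keyed by the last level of the topic. A bar-chart trace is then rebuilt from that map on every message. The function below, `updateFuelMix` (a hypothetical name), only builds the trace object; in the page it would be handed to Plotly, which is left out so that the sketch runs under Node. It accepts any object with the same destinationName and payloadString fields as a Paho message.

```javascript
// Hypothetical sketch of what a fuel_mix_graph() helper might maintain:
// the latest MW value per fuel, keyed by the last level of the topic.
const latestMix = new Map();

// message: any object with destinationName (topic) and payloadString (JSON payload).
function updateFuelMix(message) {
  const levels = message.destinationName.split("/");
  const fuel = levels[levels.length - 1];
  const data = JSON.parse(message.payloadString);
  latestMix.set(fuel, data.value); // a newer reading replaces the previous one

  // Build a bar-chart trace (x: fuel names, y: megawatts) from the current map.
  const fuels = Array.from(latestMix.keys()).sort();
  return {
    x: fuels,
    y: fuels.map((f) => latestMix.get(f)),
    type: "bar",
  };
}

updateFuelMix({
  destinationName: "ny-power/upstream/fuel-mix/Nuclear",
  payloadString: '{"units": "MW", "value": 4114, "ts": "05/09/2018 00:05:00"}',
});
const trace = updateFuelMix({
  destinationName: "ny-power/upstream/fuel-mix/Hydro",
  payloadString: '{"units": "MW", "value": 3229, "ts": "05/09/2018 00:05:00"}',
});
console.log(trace.x.join(","), trace.y.join(",")); // Hydro,Nuclear 3229,4114
```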
Figure: current fuel mix chart on the ny-power.org[4] website.
This is a dynamic website. Instead of pulling data from the server, it binds to the MQTT message bus and listens on the publicly exposed WebSocket. Like the data pump and the archiving service, the web page is itself a publish/subscribe client, except that it runs in your browser rather than as a microservice in the public cloud.
You can watch it update live at http://ny-power.org, including the charts and a real-time MQTT console that shows messages as they arrive.
Going Further
The complete ny-power.org application is open source on GitHub[16]. You can also read the architecture overview[17] to see how it is built as a set of Kubernetes microservices deployed with Helm[18]. For another interesting MQTT example, see the code pattern[19] that uses MQTT and OpenWhisk to translate text messages in real time.
MQTT is widely used in the Internet of Things field, and more examples of MQTT applications can be found in the Home Assistant[20] project.
If you want to learn more about the protocol, you can find all the details of this public standard at mqtt.org[3].
To learn more, you can attend Sean Dague’s talk at OSCON[21], titled Adding MQTT to Your Toolbox[22], which will be held from July 16-19 in Portland, Oregon.
via: https://opensource.com/article/18/6/mqtt
Author: Sean Dague[24] | Selected by: lujun9972 | Translator: pinewall | Proofreader: wxy
This article was originally translated by LCTT and is proudly published by Linux China.