30 March, 2018

Retained messages and extended REST API in flespi MQTT broker

We keep growing our MQTT 5.0 compliance — storing the last state of the topic added. Plus, we created a unique REST API for remote management of flespi MQTT broker.

Let me start by explaining what a retained message is and what kind of benefits it provides to clients. According to MQTT 3.1.1 specification when a client publishes a message to an MQTT broker, they can specify a “RETAIN” flag. When such a message comes to the broker, it gets into persistent storage. When another client connects to the broker and subscribes to a topic matching the retained message topic, the broker will immediately send this message to a client.

Basic MQTT bus usage implies sending and delivering events about anything. Retained messages, in turn, store the last state of anything. So, if an MQTT broker can provide good enough performance, it becomes a valuable component of the infrastructure and can even replace the entire database engine.

Imagine, you can publish all the data you need to store with the RETAIN flag and fetch it anytime by subscribing to an appropriate topic. Even from another host. Looks exactly like a database, doesn’t it?

Initially, when we developed the first version of mbus — our MQTT broker, we stored retained messages in a Postgre database with memory-cached topics. When a client issued a subscribe command, we analyzed these topics in memory and delivered appropriate retained messages. This implementation was OK for testing or small systems, and I believe most open-source and proprietary brokers keep using it, but… It had one critical flaw — it is fast when you read messages, e.g., by issuing SUB commands, but when you publish (update) retained messages with a frequency higher than IOPs of your database engine, you are in trouble. 

Our goal is to serve millions of messages per second, even if hundreds of thousands will be retained. So once we made our broker public, we disabled retained messages until implemented the storage system for them in the flespi database. Really fast storage.

We store each hierarchy level of a retained message topic in our own btree with extremely fast iteration and search possibility. We are now using 16KB pages in the database for each btree to optimize storage memory usage for retained messages data. Free users of our broker get 100MB of storage for MQTT retained messages.

To use retained messages in your service as a remote database each time you poll the data (subscribe) you need an indicator of whether the broker delivered the entire set of messages or not. For some reason, MQTT 3.1.1 and even the latest MQTT 5.0 specification do not require to deliver retained messages first and only then send a SUBACK packet as subscription acknowledgment. But flespi MQTT broker operates exactly this way — once you issue a SUBSCRIBE command, you may be sure that all retained messages will come to you before you receive SUBACK.

MQTT on public service

The first user that took advantage of retained messages is a public Locator in Nimbus. The idea of Locator is very simple — to visualize the current location and state of buses in a public transportation system, put such application onto the digital screens at each bus stop, embed it inside any web site, and make it mobile-friendly for users to estimate their wait time from their smartphones.

Nimbus uses flespi MQTT broker as a communication hub in the following manner:

  •  MQTT over WebSockets to securely connect to the broker directly from a webpage bypassing any firewalls.

  • Authorization tokens with limited ACL allow subscribing only to a specified locator to securely connect multiple locators to the same message hub.

  • Retained messages to immediately load the latest information about buses, their state, and position.

  • 10 minutes messages TTL and auto-expiration on the broker side to show only up-to-date buses — feature from MQTT 5.0 specification already supported by the flespi broker.

  • And basic MQTT PUB/SUB sequence to receive updates from buses in real-time.

Below is the dataflow of the entire process:

flespi mqtt broker used in wialon locator

Locator service relied on the recently proposed design of event-driven web applications with MQTT technology which greatly reduced its time-to-market.

Another interesting feature of the flespi MQTT broker appeared at the same time with retained messages. And it’s rare. It’s the MQTT broker REST API. We mixed HTTP and MQTT technologies and supplied the broker with a set of tools for remote management. With REST API you can:

  • Access MQTT broker logs and understand who and how uses the broker, determine what kind of problems you have, if any.

  • Manage MQTT sessions. Create persistent sessions, delete, subscribe, and unsubscribe MQTT sessions via REST requests.

  • Publish messages into the broker similar to publishing via an MQTT connection. This method provides extra functionality to specify some MQTT 5.0 publishing message features like TTL or payload data format not available directly in MQTT 3.1.1.

  • Retrieve and delete retained messages. These are important because there is no standard method in MQTT to retrieve retained messages or delete them in batch recursively.

MQTT 5.0 ready

Here at flespi, we have already implemented part of the MQTT 5.0 specification, embedded it into our broker, and created a checklist that we will update weekly. We don’t know of any commercial or open-source MQTT brokers that are going to implement most of MQTT 5.0 features this spring. We are ready to be first in the world and welcome all open-source MQTT 5.0 client libraries to use our broker for testing this summer.