Automatically translate this page?
3 January, 2018

MBUS — the MQTT flespi-style and the opportunities it brings along

Evolution of communication technologies used between flespi services and the introduction of own MQTT implementation to the public.

The birth of an internal communication system

“How do we interconnect all components of the platform?” This question sounds countless times during the large-scale development of a distributed platform. When we just started flespi development a few years ago, we designed a so-called MQ message delivery system for durable or nondurable dispatching events to multiple subscribers. Later on, we upgraded the system with a command delivery feature allowing not only to publish events but also to send commands and receive replies to them.

MQ is a perfect fit for our goals. It ensures lightning-fast communication of tens of gigabytes per second on one core, buffering, compression when used over TCP, and embedded messages TTL and ACK management. And it’s durable due to automatic file system persistence during service restarts. A little later, when we started using it for communication between different workers of the same process, we also implemented Unix domain sockets as a transport layer, achieving five times higher bandwidth for internal usage.

About 15 months ago, when we started thinking about how to deliver our services to end users, we picked HTTP REST API as the main communication endpoint between flespi services and third-party applications. We implemented own HTTP client and server and packed them with such features simplifying developer’s life as automatic HTTP request validation with JSON/YAML schemes, generation of automatic REST services interface wrappers, cool object selectors in URI, various request handlers, PostgreSQL database integration, fully async processing, and LUA integration for server-side scripting — all operating with high performance

We created a so-called durable HTTP REST client that communicates via MQ-exchange automatically repeating HTTP calls when the server is unavailable. Then we added load-balancing on the client side so that calls get load-balanced on multiple servers based on the URI, and this pool gets automatically reloaded when the service migrates from one server to another. The result was so good that HTTP REST API grew into one of the most crucial mechanisms in internal communication between our services.

As of the beginning of 2017Q4, two technologies ensured internal connectivity between flespi microservices: 

  • MQ — for fast binary commands/events interface, internal communication between various sub-services;

  • HTTP RESTful — for structured API between high-level services and as API export point for flespi users.

You can see how these two communication technologies coexist in the same system on the following diagram describing the architecture of the flespi database system:

flespi database system architecture

The entire database system provides its public interface via HTTP REST API. However, for fast internal communications, messages manipulations, and retrieval of data from the database nodes it uses MQ-type communication shown in brown dashed lines.

If we look broader at the flespi architecture, we will notice that high-level services interact via HTTP REST API (shown in purple and brown lines):

flespi platform architecture

Even though it worked perfectly, we found several downsides in such type of communication. HTTP REST API is good when you know the target — whom you should send an HTTP request. But that’s a problem. When your system grows, you still need to develop cool components, but you don’t want one part of the system to know anything about another part — its address, its API. If we talk not about commands, but about events, then each side should specialize in one thing only, and do it well: a sender should send messages and not think about a receiver, a receiver should know nothing about senders and simply receive messages. We need events or messages bus — something like this:

flespi message bus concept

As we are on the dark side of telematics and in touch with the IoT-world, we ended up with an MQTT protocol — it meets our criteria like a charm.

Creating a message bus

We started crafting our implementation of an MQTT client and MQTT broker. MQTT broker got a name MBUS, which stands for “messages bus”. Its initial version was ready in October 2017. We deployed it, flooded with test events, and decided to upgrade the entire system architecture and move all communication that does not need a response to MBUS.

Streams were the first to use MBUS. Upgraded streams allowed not only to forward data from this or that device but also to subscribe to entire channels. This became viable owing to the creation of a universal input messages bus for both channels and devices. So we started publishing each message received from the channel to a topic in MBUS: customer/{customer_id}/flespi/message/gw/channels/{channel_id}/{ident}. For each device detected for the given channel and ident, the messages get posted to the following topic in MBUS: customer/{customer_id}/flespi/message/registry/devices/{device_id}/registered.

Actual MQTT message payload remains the same — it’s a JSON with message data. Now the streaming service can manage streams subscriptions, which in essence means subscribing to or unsubscribing from a topic. For example, when a stream subscribes to a certain channel, it subscribes to messages with a topic: customer/{customer_id}/flespi/message/gw/channels/#.

The second user of MBUS was our PROD system (product gateway), especially its counters and restrictions components. Here’s the scheme of how it works:

mbus mqtt technology architecture

Light green boxes are services processing something for flespi users: PROD services are working with HTTP REST API requests and TG services are processing TCP/UDP traffic from various telematics devices with the help of the PVM technology

Internal service plim (in yellow) has two goals:

  1.  run various counters and deliver data from them to PROD HTTP servers for users to retrieve and use this information (e.g., visualize it on a dashboard, as we did in our panel, or retrieve it via REST API for data-driven decision-making in third-party platforms. 

  2. perform system blocking based on customer’s current usage and limits.

Initially, plim offered HTTP REST API and each PROD and TG service with information on the use of its resources by customers performed an HTTP request with all this information to plim every 10 seconds. When a customer performed an API request to fetch its current counters values (like we do in panel dashboard), the request was proxied directly to plim, since only plim knew the current state of the customer’s counters.

But with the transition to the MBUS MQTT broker, the architecture became much simpler. Now each service that has resources does not post their usage values directly to plim but instead publishes MQTT messages to the MBUS with a predefined topic. When plim starts, it subscribes to this topic and immediately receives all resource usage messages from all processes automatically. The diagram above illustrates the process with a thick purple arrow. Next, plim regularly publishes the current counters values to the MBUS, and PROD HTTP processes receive all these messages and can provide a customer with the counters values directly. This is shown in the diagram with thin purple arrows. So, there’s no need to proxy HTTP requests to plim anymore.

On top of that, we decided to develop an MQTT proxy allowing MQTT client to authorize with our tokens and deliver to subscribed sessions only the messages allowed by the token ACL. Proxy also handles all traffic and prefixes topics with customer/{customer_id} of the actual flespi user. This means that each flespi user operates in a private, dedicated MQTT namespace and will never see MQTT messages from other users.

Shaping a new logging system

We went even further from there! Generic events bus allowed us to develop a new logging system. It’s dead easy: each customer’s direct or indirect action is logged by publishing a message to flespi/log/{api}/{origin}/{event_type}/{extra-data}. Our internal process subscribes to these messages and stores them in a dedicated storage container available via GET /platform/logs

You, as a developer, can already receive log messages in real-time by connecting to flespi via an MQTT client and subscribing to the flespi/log/# topic. Ordinary (non-geeky) people are welcome to use our Toolbox utility instead — it visualizes all the logs, even for deleted items. Imagine, every tiny action is now logged and can be restored — all API calls, all CREATE, UPDATE, and DELETE operations. Moreover, for channels, we log everything about connections, especially errors. For streams, we log how many messages arrive each run and any processing errors we encountered. Same with MQTT — we log sessions, messages, and errors. This is a true paradise for any developer — all this wealth of information is in your pocket immediately, in real-time. You don’t need to contact support anymore asking what’s wrong with your items. You can get this data directly thanks to our MBUS mechanism.

Publishing your messages to MBUS

Finally, we introduced a possibility not only to subscribe to messages but also to publish own messages. Each flespi user can use our MQTT broker to publish and subscribe to any topic except flespi/ prefix, e.g. my/topic or her/message. It’s your messages, so we don’t care about payload data format but recommend to use JSON as a unified data representation technology.

To connect to flespi MQTT broker use the following parameters:

HTTP REST APIYes, you can manage sessions and subscriptions via REST API
hostmqtt.flespi.io
port1883/8883(SSL), WebSockets: 80/443(SSL)
client-idany of your choice
usernameyour token
passwordnot used
QoS supportedQoS 0, QoS 1
retained messagesyes

We did a lot in December to make MBUS suitable to handle millions of messages per second with thousands of subscribers and publishers and performed many design optimizations. But this is a subject of a separate article because its internal design is really interesting (BTW, it was architectured on the flight from Amsterdam to San-Francisco this September). 

As a part of our broker testing process, Wialon Hosting team connected to it. MBUS started to publish and subscribe messages (mostly publish) in JSON format from the Wialon DataCenter. This is around 800,000 devices sending more than 1,000,000 messages per minute. What is extremely interesting is that this load was almost unnoticed. MBUS, as well as the rest of the flespi platform services, is written in pure C using non-threaded asynchronous I/O and performs awesome.

***

flespi team wishes you a Happy New Year and welcomes to try the new MQTT connectivity option. It may be a present for your business you didn’t expect!



Get the latest updates and monthly newsletters from flespi in your inbox

9 November, 2018 | flespi integrations | Anton Kulichenko

How To Set Up Email Notifications About Flespi Errors

Using Thingsboard platform to trigger alarms and send email notifications upon flespi stream errors.

30 October, 2018 | about flespi | Jan Bartnitsky

Let's Switch Sides. Flespi Concepts From The User Perspective

Looking at the flespi platform in terms of functionality, complexity, and pricing to deliver the maximum value with maximum efficiency.