How to re-publish MQTT messages to another topic or account?

How to achieve MQTT bridging functionality in flespi

Sometimes it is important to create a dynamic copy of MQTT messages published to standard flespi topics but in the custom topics hierarchy. This can be useful if you want to copy only a subset of these messages - for example device telemetry excluding the ident field.

To accomplish this you can use:

  1. Webhook to re-post MQTT messages to another topic using MQTT Broker REST API

  2. Scripted MQTT client with clean=false session and QoS=1 subscriptions and re-publications. This gives you the best throughput and control but will require you to host such a script outside of flespi and maintain the infrastructure to keep it alive.

If you want it very simple you can utilize webhook for the same task. Just be aware that each published MQTT message will consume one API call and you need to ensure that you have enough API calls per minute in your plan. If webhook will exceed the limit it will be blocked for a minute.

You can import webhook configuration into your account with the code below:

{
  "triggers": [
    {
      "topic": "flespi/state/gw/devices/+/telemetry/+"
    }
  ],
  "enabled": false,
  "name": "Re-post devices telemetry into another topic",
  "configuration": {
    "body": "[{\"topic\":\"devices/%topics[4]%/%topics[6]%\",\"timestamp\":%timestamp%,\"retained\":true,\"payload\":\"%strescape(tostring(payload))%\"}]",
    "cid": "%cid%",
    "method": "POST",
    "type": "flespi-platform",
    "uri": "/mqtt/messages"
  }
}

Webhook body configuration is based on the schema of POST /mqtt/messages API call. Here we extract device id and parameter name from the corresponding word in a topic and re-posting strescaped message payload to format correct JSON. All telemetry messages are reposted with retain flag set to true. You may remove this parameter if you need only real-time messages.

If you want more advanced method you can host your own script with similar functionality. The advantage of custom script is the level of control and the throughput.  You can implement any logic inside script and you are limited only by the MQTT traffic volume and messages amount per minute which are huge even for Free accounts.

Here is a sample of such script in javascript using mqtt.js library:

const mqtt = require('mqtt');

// User-configurable settings
const MQTT_BROKER_URL = 'mqtt://mqtt.flespi.io';
const USER_TOKEN = 'your_token_here';

// Connect to MQTT broker
const client = mqtt.connect(MQTT_BROKER_URL, {
  username: USER_TOKEN
});

client.on('connect', () => {
  console.log('Connected to MQTT Broker');
  // Subscribe to the topic with QoS=1
  client.subscribe('flespi/state/gw/devices/+/telemetry/+', { qos: 1 });
});

client.on('message', (topic, message) => {
  console.log(`Received message from ${topic}: ${message.toString()}`);
  // Extract device ID and telemetry ID from topic
  const parts = topic.split('/');
  const deviceId = parts[4]; // Adjusted for mqtt.js topic split
  const telemetryId = parts[6]; // Adjusted for mqtt.js topic split

  // Form new topic for re-publishing
  const newTopic = `devices/${deviceId}/${telemetryId}`;
  console.log(`Republishing to ${newTopic}`);

  // Re-publish with QoS=1
  client.publish(newTopic, message, { qos: 1 }, (err) => {
    if (err) {
      console.error('Error publishing message:', err);
    } else {
      console.log(`Message republished to ${newTopic}`);
      // Simulate acknowledging the original message
      // Note: MQTT clients automatically handle PUBACK for QoS 1 messages.
      console.log(`Acknowledged message from ${topic}`);
    }
  });
});

client.on('error', (err) => {
  console.log('Connection to MQTT broker failed:', err);
  client.end();
});

This script was written by ChatGPT and you can ask it to write similar script for you using any other programming language or MQTT library.


See also
MQTT bridge is a very useful tool when you need to separate and partially isolate parts of your system, as well as not be fully dependent on a third-party MQTT broker.
Apply webhooks to events from calculators to invoke your lambda upon an aggregated event happened to the device