14 August, 2018

Google IoT Cloud: cheap storage and advanced analytics for your telematics data

Forwarding messages from GPS tracking devices via flespi telematics hub into the Google ecosystem for long-term storage, processing, and insights.

Oops!... I did it again. I connected flespi with one of the most powerful cloud platforms. This time it is Google Cloud Platform. Google provides a series of cloud services for computing, data storage, data analytics, and machine learning. And now, having devices connected to a flespi channel, you can feed all this Google Cloud infrastructure with GPS data. 

A more succinct step-by-step guide is available in our dedicated KB article.

Please welcome a new flespi stream — a link between flespi telematics hub and Google IoT Core — google_iot stream. It can send messages via MQTT protocol into Google Cloud IoT Device, which then forwards data into Google data bus, aka Google Cloud Pub/Sub, which then triggers Google Cloud Function, which processes the messages and saves telemetry time series data into Google Cloud Bigtable, which then can be accessed via API from your application… Belay! A little less conversation, a little more action, please. 

Ok, this is the scheme of the entire process — starting with messages flowing from GPS tracking devices into the flespi channel and finishing with extracting the telemetry data from the database into your app:

flespi to google cloud scheme

I assume that you’ve already succeeded in connecting devices to flespi. So let’s go straight to step one: the Google Cloud setup.

Setting Up Google Cloud IoT Core

According to our scheme, we now need to create a Google Cloud IoT Core device registry and register a device. For those who feel that the previous sentence sounds strange (like me ;), I would recommend following this quickstart. The most important thing to notice here is: you should choose the RS256 key for device authentication because ES256 keys are not yet supported in the google_iot stream.
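If you would rather not leave Node.js to produce the key, here is a minimal sketch of generating an RS256-compatible key pair with the built-in crypto module. It is an alternative to whatever the quickstart suggests, not a replacement for it: the 2048-bit size and the PEM encodings are my assumptions, so check them against the key formats your registry accepts.

```javascript
'use strict';

const crypto = require('crypto');

// Generate a 2048-bit RSA key pair. RS256 means RSA signatures over SHA-256.
// The key size and PEM encodings are assumptions for this sketch.
const { publicKey, privateKey } = crypto.generateKeyPairSync('rsa', {
  modulusLength: 2048,
  publicKeyEncoding: { type: 'spki', format: 'pem' },
  privateKeyEncoding: { type: 'pkcs8', format: 'pem' },
});

// Sanity check: sign a sample payload with the private key
// and verify the signature with the public key.
const payload = Buffer.from('flespi-test');
const signature = crypto.createSign('RSA-SHA256').update(payload).sign(privateKey);
const signatureOk = crypto
  .createVerify('RSA-SHA256')
  .update(payload)
  .verify(publicKey, signature);

console.log(publicKey);
console.log('RS256 signature verifies:', signatureOk);
```

The public key is what you would register for the device, and the private key is the part to keep for the stream configuration.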

Below are the screenshots of my Google Cloud Console, showing the examples of registry:

google cloud registry

and device:

google cloud device

Setting Up Google Cloud Function

The Cloud Function should run each time a device message arrives from Google Cloud Pub/Sub. To connect the function with the device, select the ‘Cloud Pub/Sub’ option for the Trigger field when creating the function, and select the default telemetry topic (the one that you entered when creating the registry) for the Topic field. Choose other options at your discretion.

google cloud function

For Runtime I used Node.js and came up with the following code:

package.json

{
  "name": "flespi-function-example",
  "version": "0.0.0",
  "private": true,
  "dependencies": {
    "@google-cloud/bigtable": "^0.10.2"
  }
}

index.js

'use strict';

const bigtable = require('@google-cloud/bigtable');
const bigtableClient = bigtable();
const instance = bigtableClient.instance('flespi-bigtable'); // (!) Bigtable instance name
const table = instance.table('flespi-devices-timeseries');// (!) Bigtable table name

exports.flespiBigtable = (event, callback) => {
  const pubsubMessage = event.data;
  const messageString = Buffer.from("" + pubsubMessage, 'base64').toString();
  console.log(messageString);
  const messageObject = JSON.parse(messageString);
  let ident = messageObject.ident;
  let timestamp = messageObject.timestamp;

  if (!ident || !timestamp) {
    throw new Error('Message must contain ident and timestamp properties!');
  }
 
  if (ident.indexOf(':') !== -1) {
    ident = ident.split(":")[0]; // skip password, if any
  }
  timestamp = parseInt(timestamp, 10); // drop the fractional part: the row key uses whole seconds
  let msg = {
    method: 'insert',
    key: `${ident}:${timestamp}`,
    data: {
      ["msgs"]: {// (!) column family name
        ["msg"]: messageString,
      },
    },
  };

  return Promise.resolve()
    .then(() => console.log('starting...'))
    .then(() => table.mutate(msg))
    .catch((error) => {
      if (error.name === 'PartialFailureError') {
        console.warn('Partial Error Detected');
        error.errors.forEach((error) => {
          console.error(error.message);
        });
      } else {
        console.error('Something went wrong:', error);
      }
    })
    .then(() => console.log('done!'))
    .then(callback);
};

As you can see, the package.json file declares and the index.js file uses a single dependency: @google-cloud/bigtable. This is the Node.js client library for Google Cloud Bigtable, which provides methods for working with Bigtable entities.
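To try the message handling locally without deploying anything, the decoding and row-key logic can be pulled out into a pure function. This is only a sketch: buildEntry is a hypothetical helper name, and the returned object simply mirrors the msg object from index.js.

```javascript
'use strict';

// Turn a base64-encoded Pub/Sub payload into a Bigtable mutation entry.
// buildEntry is a hypothetical helper; its result mirrors `msg` in index.js.
function buildEntry(base64Data) {
  const messageString = Buffer.from(base64Data, 'base64').toString();
  const message = JSON.parse(messageString);
  if (!message.ident || !message.timestamp) {
    throw new Error('Message must contain ident and timestamp properties!');
  }
  const ident = String(message.ident).split(':')[0]; // drop the password part, if any
  const seconds = Math.floor(Number(message.timestamp)); // whole seconds only
  return {
    method: 'insert',
    key: `${ident}:${seconds}`,
    data: { msgs: { msg: messageString } }, // column family "msgs", column "msg"
  };
}

// Simulate what Pub/Sub delivers: the JSON message, base64-encoded.
const sample = JSON.stringify({ ident: 'device1:device1', timestamp: 1533910878.71726 });
const entry = buildEntry(Buffer.from(sample).toString('base64'));
console.log(entry.key); // device1:1533910878
```

Keeping this part free of any Bigtable calls makes it easy to check the row key format before the first real message arrives.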

Setting Up Google Cloud Bigtable

Google Cloud Bigtable is a high-performance NoSQL database that works well for storing large amounts of time-series data, which is why I chose it for my experiment. You can follow this guide to create a Bigtable instance and connect to it with the cbt CLI tool. We also need to create a table and add a column family to it. Note that the names of the Bigtable instance, table, and column family must match those used in the Cloud Function code:

~ $ cbt -project flespiio -instance flespi-bigtable createtable flespi-devices-timeseries
~ $ cbt -project flespiio -instance flespi-bigtable ls
flespi-devices-timeseries
~ $ cbt -project flespiio -instance flespi-bigtable createfamily flespi-devices-timeseries msgs

Configuring google_iot stream

Once Google Cloud setup is completed, it’s time to set up a flespi google_iot stream. The stream configuration is pretty intuitive and most of the required settings can be found in the Google Cloud Platform Console. In the private_key field insert the contents of the rsa_private.pem file that was generated for device authentication. Here is how my stream configuration looks:

google_iot stream flespi

Data flow

Finally, the google_iot stream is created and subscribed to a flespi channel. It’s time to start feeding data and follow its lifecycle in the logs. Like a river flows surely to the sea, the data from GPS trackers will flow to the Bigtable database.

Messages from GPS trackers in the flespi channel:

flespi channel log toolbox

Messages are streamed to Google Cloud IoT Core:

google_iot stream logs toolbox

Messages are processed by Google Cloud Function:

google cloud function log

Messages are stored in a Bigtable instance:

~ $ cbt -project flespiio -instance flespi-bigtable read flespi-devices-timeseries
device1:1533910878
  msgs:msg                                 @ 2018/08/10-17:22:09.385000
    "{\"channel_id\":313,\"ident\":\"device1:device1\",\"position.altitude\":13,\"position.direction\":12,\"position.latitude\":12.568723,\"position.longitude\":12.568723,\"position.satellites\":14,\"position.speed\":11,\"timestamp\":1533910878.71726}"
----------------------------------------
device1:1533910888
  msgs:msg                                 @ 2018/08/10-17:22:10.298000
    "{\"channel_id\":313,\"ident\":\"device1:device1\",\"position.altitude\":13,\"position.direction\":12,\"position.latitude\":12.568723,\"position.longitude\":12.568723,\"position.satellites\":14,\"position.speed\":11,\"timestamp\":1533910888.712281}"
----------------------------------------
device1:1533910898
  msgs:msg                                 @ 2018/08/10-17:22:09.538000
    "{\"channel_id\":313,\"ident\":\"device1:device1\",\"position.altitude\":13,\"position.direction\":12,\"position.latitude\":12.568723,\"position.longitude\":12.568723,\"position.satellites\":14,\"position.speed\":11,\"timestamp\":1533910898.713156}"

...

----------------------------------------
device2:1533913835
  msgs:msg                                 @ 2018/08/10-18:10:36.477000
    "{\"channel_id\":313,\"ident\":\"device2:device2\",\"position.altitude\":13,\"position.direction\":12,\"position.latitude\":12.568723,\"position.longitude\":12.568723,\"position.satellites\":14,\"position.speed\":11,\"timestamp\":1533913835.608710}"
----------------------------------------
device2:1533913845
  msgs:msg                                 @ 2018/08/10-18:10:46.546000
    "{\"channel_id\":313,\"ident\":\"device2:device2\",\"position.altitude\":13,\"position.direction\":12,\"position.latitude\":12.568723,\"position.longitude\":12.568723,\"position.satellites\":14,\"position.speed\":11,\"timestamp\":1533913845.60867}"

Bigtable stores data sorted by row key. I use ‘ident:timestamp’ as the row key for messages, so the messages of each device are sorted by timestamp. This makes time-range queries against the database highly efficient.
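Here is a small sketch of why this key layout helps. It uses the row keys from the cbt output above and a plain string sort to imitate Bigtable's lexicographic ordering. No database is involved, and rowKeyRange is a hypothetical helper name.

```javascript
'use strict';

// Hypothetical helper: first and last row key for one device
// between two Unix timestamps (in seconds, both inclusive).
function rowKeyRange(ident, fromSeconds, toSeconds) {
  return {
    start: `${ident}:${Math.floor(fromSeconds)}`,
    end: `${ident}:${Math.floor(toSeconds)}`,
  };
}

// Row keys from the cbt output, deliberately shuffled.
const keys = [
  'device2:1533913835',
  'device1:1533910898',
  'device1:1533910878',
  'device2:1533913845',
  'device1:1533910888',
];

// A plain string sort imitates the lexicographic row order.
const sorted = [...keys].sort();

// All device1 messages between the two timestamps form one contiguous slice.
const range = rowKeyRange('device1', 1533910880, 1533910900);
const matched = sorted.filter((key) => key >= range.start && key <= range.end);
console.log(matched); // [ 'device1:1533910888', 'device1:1533910898' ]
```

The string comparison matches numeric order only while all timestamps have the same number of digits, which holds for 10-digit Unix seconds. A start/end pair like this is what you would hand to a range read in your client library of choice.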

Google provides a series of client libraries to access the data stored in Bigtable. This small Node.js example shows how to read a batch of device messages from a Bigtable instance in your application.
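Once the rows are read, each stored cell still holds the original message as a JSON string. Below is a hedged sketch of turning rows back into message objects. The row shape is my assumption about what the client returns, parseRows is a hypothetical helper, and the mock row stands in for a real read, so adjust it to what your library actually gives you.

```javascript
'use strict';

// Hypothetical helper. It assumes each row looks like
// { id, data: { msgs: { msg: [{ value }] } } } and takes the first cell.
function parseRows(rows) {
  return rows.map((row) => {
    const cell = row.data.msgs.msg[0];
    return { key: row.id, message: JSON.parse(cell.value) };
  });
}

// A mock row standing in for a read from flespi-devices-timeseries.
const mockRows = [
  {
    id: 'device1:1533910878',
    data: {
      msgs: {
        msg: [
          { value: '{"ident":"device1:device1","position.speed":11,"timestamp":1533910878.71726}' },
        ],
      },
    },
  },
];

const parsed = parseRows(mockRows);
console.log(parsed[0].key, parsed[0].message['position.speed']); // device1:1533910878 11
```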

Conclusion

The flespi google_iot stream is an easy way to get telemetry data from your tracking devices into Google Cloud. It gives you fully managed, massively scalable, and cost-effective storage for the data. You also get access to the entire Google Cloud ecosystem for data analytics, visualization, and machine learning.