MQTT As The Center of Everything

Internet of Things (IoT) is a buzzword that covers anything from smart toasters to Raspberry Pi computers. I don’t know the exact definition, but I know that most devices marketed as IoT are built on paradigms I’m already familiar with, like pub/sub, REST and Bluetooth Low Energy. I wanted to build a proof of concept using IoT technologies and blog about my findings. That small beginning led to MQTT and the rest of this project.

Technology

MQTT: MQTT is a machine-to-machine connectivity protocol built on a pub/sub message transport. There are several broker implementations out there, but for this post I used Mosca, a simple MQTT broker written in Node.js. I use it as the center of my IoT application: all events/messages pass through MQTT for consumption by different clients.

Plotly: Plotly is a platform for interactive and streaming visualization. I’ll use the Plotly streaming API to stream events from MQTT into a live plot.

Spark Streaming: Spark is a distributed computing platform for batch and streaming jobs (Spark Streaming). I’ll get a stream of events from MQTT and use that to calculate some averages and distributions.

Leap Motion: A device for gesture control.
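To make the pub/sub idea concrete, here is a minimal in-memory sketch of the pattern. It is not MQTT and not how Mosca works internally; `TinyBroker` and its methods are hypothetical names used only for illustration. The point is that publishers and subscribers only share a topic name and never talk to each other directly.

```javascript
// Hypothetical in-memory broker: a plain illustration of pub/sub, not MQTT itself.
class TinyBroker {
  constructor() {
    this.subscribers = new Map(); // topic -> array of handler functions
  }
  subscribe(topic, handler) {
    if (!this.subscribers.has(topic)) this.subscribers.set(topic, []);
    this.subscribers.get(topic).push(handler);
  }
  publish(topic, payload) {
    const handlers = this.subscribers.get(topic) || [];
    handlers.forEach((handler) => handler(payload));
    return handlers.length; // number of subscribers reached
  }
}

const broker = new TinyBroker();
const received = [];
// Two independent clients listen on the same topic.
broker.subscribe('key_tap_gesture', (msg) => received.push('hue:' + msg));
broker.subscribe('key_tap_gesture', (msg) => received.push('logger:' + msg));
// The publisher does not know who is listening.
const reached = broker.publish('key_tap_gesture', 'keyTap');
console.log(reached, received);
```

A real broker adds networking, wildcards, quality-of-service levels and persistence on top of this, which is what Mosca provides in this project.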

Code

I’ll use Node.js to manage MQTT, Leap Motion and Philips Hue. Node.js has a great library for IoT (Cylon.js), with drivers for each of these devices. Writing an MQTT server that can handle the pub/sub load for this project is fairly trivial. I used Redis for persistence and otherwise stuck to the boilerplate code from Mosca:

/**
 * This is a simple MQTT server with a Redis backend
 */
var mosca = require('mosca');
// Pub/sub backend settings (Redis)
var config = {
  type: 'redis',
  redis: require('redis'),
  db: 12,
  port: 6379,
  return_buffers: true,
  host: "localhost"
};
var moscaSettings = {
  port: 1883,
  backend: config,
  persistence: {
    factory: mosca.persistence.Redis
  }
};
var server = new mosca.Server(moscaSettings);
server.on('ready', setup);
// Print connected clients
server.on('clientConnected', function(client) {
  console.log('client connected', client.id);
});
// Print the packet to the console
server.on('published', function(packet, client) {
  try {
    console.log(JSON.parse(packet.payload.toString()));
  } catch (e) {
    // The payload is not JSON, so print the raw packet instead
    console.log(packet);
  }
});
// When the server is ready, print a message
function setup() {
  console.log('Mosca server is up and running');
}

Connecting to Leap Motion is also simple. I’ll send messages to two different MQTT topics: one for a specific gesture, and one for my hand position [1] with a timestamp:

"use strict";
var Cylon = require('cylon');
/**
 * This robot sends messages to the hand_motion and key_tap_gesture topics using the Leap Motion
 */
Cylon.robot({
  connections: {
    server: {adaptor: 'mqtt', host: 'mqtt://localhost:1883'},
    leapmotion: {adaptor: 'leapmotion'}
  },
  devices: {
    leapmotion: {driver: 'leapmotion', connection: 'leapmotion'}
  },
  work: function (my) {
    // Publish the hand's sphere radius with a timestamp on every hand frame
    my.leapmotion.on('hand', (payload) => {
      var data;
      try {
        data = payload.sphereRadius;
      } catch (e) {
        throw new Error(e);
      }
      var now = new Date();
      var dataSet = JSON.stringify({'x': getTimeString(now), 'y': data});
      my.server.publish('hand_motion', dataSet);
    });
    // Publish only keyTap gestures
    my.leapmotion.on('gesture', (payload) => {
      var state;
      try {
        state = payload.type;
      } catch (e) {
        console.log(e);
      }
      if (state === 'keyTap') {
        my.server.publish('key_tap_gesture', state);
      }
    });
  },
  error: (err) => {
    // Log the actual error instead of swallowing it
    console.warn("Rats", err);
  }
}).start();
// Code borrowed from random signal package
// Formats a Date as "YYYY-MM-DD HH:MM:SS.mmm" in local time
var getTimeString = (now) => {
  var year = "" + now.getFullYear();
  var month = "" + (now.getMonth() + 1);
  if (month.length == 1) {
    month = "0" + month;
  }
  var day = "" + now.getDate();
  if (day.length == 1) {
    day = "0" + day;
  }
  var hour = "" + now.getHours();
  if (hour.length == 1) {
    hour = "0" + hour;
  }
  var minute = "" + now.getMinutes();
  if (minute.length == 1) {
    minute = "0" + minute;
  }
  var second = "" + now.getSeconds();
  if (second.length == 1) {
    second = "0" + second;
  }
  var ms = "" + now.getMilliseconds();
  while (ms.length < 3) {
    ms = "0" + ms;
  }
  return year + "-" + month + "-" + day + " " + hour + ":" + minute + ":" + second + "." + ms;
};
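The timestamp helper above can be written more compactly. The following is only a sketch of an alternative, assuming a Node.js version that supports `String.prototype.padStart` (Node 8 or later); `pad`, `formatTimestamp` and `buildHandMotionPayload` are hypothetical names, not part of the project's code. It produces the same `{x, y}` JSON shape that is published to the `hand_motion` topic.

```javascript
// Hypothetical shorter variant of getTimeString using padStart (Node.js 8+).
const pad = (value, width = 2) => String(value).padStart(width, '0');

// Formats a Date as "YYYY-MM-DD HH:MM:SS.mmm" in local time.
const formatTimestamp = (now) =>
  `${now.getFullYear()}-${pad(now.getMonth() + 1)}-${pad(now.getDate())} ` +
  `${pad(now.getHours())}:${pad(now.getMinutes())}:${pad(now.getSeconds())}.` +
  `${pad(now.getMilliseconds(), 3)}`;

// Builds the JSON payload: x is the timestamp, y is the sphere radius.
const buildHandMotionPayload = (sphereRadius, now = new Date()) =>
  JSON.stringify({ x: formatTimestamp(now), y: sphereRadius });

// Months are zero-based in the Date constructor, so 9 is October.
console.log(buildHandMotionPayload(87.5, new Date(2016, 9, 18, 7, 5, 9, 42)));
// prints {"x":"2016-10-18 07:05:09.042","y":87.5}
```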

The Hue is also very simple. It listens on the hand gesture topic and toggles itself on or off, then publishes its state to a light state topic:

var Cylon = require('cylon');
var config = require('./config.json');
/**
 * This robot is subscribed to the key_tap_gesture topic and will toggle on and off. It will send its state
 * as a message to the light_state topic
 */
Cylon.robot({
  connections: {
    server: {adaptor: 'mqtt', host: 'mqtt://localhost:1883'},
    hue: { adaptor: 'hue', host: config.hueHost, username: config.hueUser }
  },
  devices: {
    bulb: { driver: 'hue-light', lightId: 6, connection: 'hue' }
  },
  work: function (my) {
    my.server.subscribe('key_tap_gesture');
    // Every message on the subscribed topic toggles the bulb
    my.server.on('message', (topic, data) => {
      my.bulb.toggle();
      my.server.publish('light_state', JSON.stringify({'isOn': my.bulb.isOn}));
    });
  }
}).start();
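The handler above toggles the bulb on any incoming message, which works because the robot subscribes to a single topic. If more topics were added later, the decision could be pulled out into a small pure function. This is a sketch under that assumption; `handleGesture` and `GESTURE_TOPIC` are hypothetical names and not part of Cylon.js.

```javascript
// Hypothetical pure handler: decides what the Hue robot should do for one message.
const GESTURE_TOPIC = 'key_tap_gesture';

function handleGesture(isOn, topic, message) {
  // Ignore anything that is not a keyTap on the gesture topic.
  // String() accepts both plain strings and Buffers.
  if (topic !== GESTURE_TOPIC || String(message) !== 'keyTap') {
    return { isOn: isOn, publish: null };
  }
  const next = !isOn;
  return {
    isOn: next,
    publish: { topic: 'light_state', payload: JSON.stringify({ isOn: next }) }
  };
}

// Example: the light is off and a keyTap arrives.
const result = handleGesture(false, 'key_tap_gesture', Buffer.from('keyTap'));
console.log(result.isOn, result.publish.payload);
```

Keeping the decision separate from the device calls makes it possible to check the toggle logic without a bulb or a broker attached.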

Connecting Plotly streaming is relatively simple as well. I had to create a readable stream, feed it from the MQTT topic and pipe it to Plotly:

var config = require('./config.json');
var username = config['user'];
var apiKey = config['apiKey'];
var token = config['token'];
var Plotly = require('plotly')(username, apiKey);
var mqtt = require('mqtt');
var client = mqtt.connect('mqtt://localhost:1883');
var stream = require('stream');
// Create a Readable stream class to feed to the Plotly API
class ReadableStream extends stream.Readable {
  constructor () {
    // Pass the options straight to the parent constructor
    super({objectMode: true});
  }
  _read (size) { // Required by stream.Readable
    // Nothing to do here: data is pushed from the MQTT message handler below
  }
}
var myReadStream = new ReadableStream();
// Print data to the console to ensure it's being transmitted
myReadStream.on('data', (data) => {
  console.log(data);
});
myReadStream.on('error', (err) => {
  console.log("There was an error with the data", err);
});
// Immediately subscribe to events from MQTT in the hand_motion topic
client.on('connect', () => {
  client.subscribe('hand_motion');
});
var data = {
  'x': [],
  'y': [],
  'type': 'scatter',
  'mode': 'lines+markers',
  marker: {
    color: "rgba(0, 0, 0, 1)"
  },
  line: {
    color: "rgba(139, 76, 144, 1)"
  },
  stream: {
    "token": token,
    "maxpoints": 200
  }
};
// Build your layout and file options
var graphOptions = {
  "filename": "MQTT Stream",
  "fileopt": "extend",
  "layout": {
    "title": "Streaming Hand Gestures From Leap Motion"
  },
  "world_readable": true
};
// Each MQTT message becomes one newline-terminated line in the stream
client.on('message', (topic, message) => {
  myReadStream.push(message.toString() + '\n');
});
Plotly.plot(data, graphOptions, function (err, resp) {
  if (err) return console.log("ERROR", err);
  console.log(resp);
  var plotlystream = Plotly.stream(token, function () {
  });
  plotlystream.on("error", function (err) {
    console.log("something went wrong", err);
  });
  // Streaming from MQTT to Plot.ly
  myReadStream.pipe(plotlystream);
});
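The message handler above forwards whatever arrives on the topic, one JSON object per line. If a malformed message ever lands on `hand_motion`, it would be piped to Plotly as-is. A small guard could filter those out first. This is a sketch only; `toStreamLine` is a hypothetical helper, and the check assumes the `{x, y}` payload shape published by the Leap Motion robot.

```javascript
// Hypothetical helper: turns one MQTT message into a newline-terminated JSON line,
// or returns null when the payload is not a usable {x, y} point.
function toStreamLine(message) {
  let point;
  try {
    point = JSON.parse(message.toString());
  } catch (err) {
    return null; // not JSON, skip it
  }
  if (point === null || typeof point !== 'object') return null;
  if (typeof point.x !== 'string') return null;
  if (typeof point.y !== 'number' || !Number.isFinite(point.y)) return null;
  return JSON.stringify({ x: point.x, y: point.y }) + '\n';
}

// Example with a Buffer, which is how the mqtt client delivers payloads.
const line = toStreamLine(Buffer.from('{"x":"2016-10-18 07:05:09.042","y":87.5}'));
console.log(JSON.stringify(line));
```

In the handler, this would mean pushing to the read stream only when the helper returns a non-null line.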

Spark Streaming:

I was hoping to use EclairJS, which provides a Nashorn-backed Spark API and would have let me write the Spark Streaming code in Node.js. Unfortunately, it required a bit more overhead to use additional libraries not included with the default configuration of Spark. Since the code was so simple, I decided to write it in Scala and save EclairJS for another post. Spark will consume the Leap Motion hand motion topic as a stream in 5-second batches and calculate summary statistics on the hand motion.

/**
 * Created by josep2 on 10/18/16.
 */
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.mqtt._
import org.apache.spark.storage.StorageLevel
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{DataFrame, SparkSession}
object StreamMqtt extends App {
  // Create the Spark configuration
  val conf = new SparkConf().setAppName("Total Ecommerce Engine")
    .setMaster("local[4]").set("spark.executor.memory", "2g")
  // Turn off logs for demonstration purposes
  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)
  // Stream in 5-second batches
  val ssc = new StreamingContext(conf, Seconds(5))
  // Listen on the hand_motion topic and stream data
  val lines = MQTTUtils.createStream(ssc, "tcp://localhost:1883", "hand_motion", StorageLevel.MEMORY_ONLY_SER_2)
  // Shape of each JSON message (kept for reference; not used below)
  case class Points(x: String, y: Double)
  // Convert JSON to a DataFrame and calculate summary statistics
  lines.foreachRDD { rdd =>
    val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
    import spark.implicits._
    val stepOne = spark.sqlContext.read.json(rdd).toDF()
    printDescription(stepOne, "y").show()
  }
  // Attempt to calculate the summary statistics and return the data unchanged otherwise
  def printDescription(data: DataFrame, col: String) = {
    try {
      data.describe(col)
    } catch {
      case e: Exception => data
    }
  }
  ssc.start()
  ssc.awaitTermination()
}
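For readers who don't use Spark, the summary statistics computed per batch are roughly count, mean, standard deviation, min and max of the `y` values. The sketch below shows the same arithmetic for one batch in plain JavaScript. It is an illustration, not Spark's implementation; `describeBatch` is a hypothetical name, and it assumes the sample standard deviation (dividing by n - 1).

```javascript
// Hypothetical stand-in for the per-batch summary: count, mean, stddev, min, max.
// Assumes the sample standard deviation (n - 1 in the denominator).
function describeBatch(values) {
  const count = values.length;
  if (count === 0) {
    return { count: 0, mean: null, stddev: null, min: null, max: null };
  }
  const mean = values.reduce((sum, v) => sum + v, 0) / count;
  const squared = values.reduce((sum, v) => sum + (v - mean) * (v - mean), 0);
  // With a single value the sample standard deviation is undefined.
  const stddev = count > 1 ? Math.sqrt(squared / (count - 1)) : null;
  return {
    count: count,
    mean: mean,
    stddev: stddev,
    min: Math.min(...values),
    max: Math.max(...values)
  };
}

// Example: sphere radius readings collected during one 5-second batch.
const summary = describeBatch([1, 2, 3, 4]);
console.log(summary);
```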

I’ve decided that it’s easier to create a few videos showcasing the technology one connection at a time. We’ll start with publishing Leap Motion gestures to MQTT:

Now we’ll connect the Hue bulb and show how it reacts to messages in the queue [2].

Here’s a quick real-life demo to give you an idea of what I’m talking about:

Now we’ll demonstrate the Spark Streaming side of things:

Finally, the Plotly side of things:

Now, this setup is not altogether useful or practical. I certainly don’t need my Leap Motion to turn on the lights (that’s what Amazon Echo and Siri are for), nor do I need a stream of activity from my Leap Motion visualized in real time. The point of this post is that MQTT was the center of it all. Each device or API subscribed to a topic and/or published to one, and behaved accordingly. Message queues are not new, but it’s easy to see their value in this context.

What’s Next

I’m not sure if I’ll keep doing much with this project. As mentioned above, it’s not the most useful thing in the world. If anyone is curious about it, all the code is up on GitHub. You’ll need a Leap Motion and a Philips Hue to use it, however.


Notes:

[1] This is technically the sphere radius. I needed a single one-dimensional value to plot against time in my time series plot.

[2] You’ll notice from the video that the gesture reading from Leap Motion leaves much to be desired. You’ll also notice I am keying in on the ‘keyTap’ gesture in the code; however, I am using a swipe gesture. This is either a bug in the API or a problem with my Leap Motion. I’m trying to get my hands on another Leap Motion to rule out the latter.

[3] This blog post wouldn’t be possible without great open source work. Thanks to the maintainers of Cylon.js, Mosca.js, Plotly.js, and Apache Spark.

[4] This is a great article on MQTT if you’re curious about the protocol.