

By jessie-ford In Uncategorized

Posted July 31st, 2015


This article will demonstrate the use of queueing concepts and a feedback alarm system that notifies a user when a persistent alarm condition is met. We will use Kue, a Redis-backed job queue written in Node.js, to build an intelligent alarm system that prevents alarm fatigue.


In recipes 1 and 2 we did essentially the same thing: pumped data into a data lake and then aggregated it into a more meaningful place: MongoDB to a D3.js graph in recipe 1, and Treasure Data (Hadoop) to Salesforce in recipe 2. Now let's switch gears and talk about event triggers, which are the second major facet of big data.

We could of course add logic to our microcontroller to blink an LED when an alarm is raised, or we could even make our MCU email us directly. That is a fine approach, but in this example we want to move that logic upstream to our cloud systems so that we can tune it without having to reflash the device. We can also use more complex logic to suppress alarms and prevent alarm fatigue.

For this approach we will implement Kue, a priority Redis-backed job queue written in Node. If you are familiar with MQTT or AMQP, Kue is very similar; however, it uses REST calls instead of a dedicated protocol like MQTT or AMQP. Kue is modeled after Resque, which is written in Ruby.

Queue Concepts

If you are new to these technologies, it is worth talking a little about queue concepts. The implementation of these concepts varies slightly between technologies, but for the most part we will speak in general terms.


The first concept is the queue itself. If we consider these systems to be like an RDBMS, then the queue would be analogous to the table. You can think of the queue as a named stack or buffer. In Kue parlance it is called a type (aka queue name), and the thing that goes into the queue is referred to as a job. All jobs must have a type before they can be inserted into the system.


In addition to a type they also have a state. Kue’s states are as follows:

  1. inactive (default) – jobs that have not been worked yet
  2. active – jobs that are being worked but aren’t complete
  3. failed – jobs that have been worked but returned errors
  4. complete – jobs that have been successfully worked
  5. delayed – jobs that will move to inactive when their timer expires.
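As a concrete sketch, here is how a typed job might be created and saved. This assumes the kue package is installed and a local Redis is running; the type name and payload are illustrative, not from the article:

```javascript
// Sketch: creating a Kue job. Assumes the kue package is installed and a
// local Redis is running; the type name and payload are illustrative.
function enqueueReading(value) {
  const kue = require('kue');       // required lazily so the sketch parses on its own
  const queue = kue.createQueue();  // connects to redis://127.0.0.1:6379 by default

  queue.create('moisture-reading', { value: value })  // 'moisture-reading' is the type
    .delay(60 * 1000)               // optional: start in the delayed state for a minute
    .save(function (err) {          // without .delay() the job starts as inactive
      if (err) console.error('could not enqueue job', err);
    });
}

// The five states a job can pass through, per the list above:
const JOB_STATES = ['inactive', 'active', 'failed', 'complete', 'delayed'];
```

A job saved with `.delay()` sits in the delayed state until its timer expires, then moves to inactive and becomes eligible for processing.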


A “worker” is a bit of code designed to do something with jobs. Workers come in two flavors: producers, which create jobs, and consumers, which process and complete jobs; a single worker can play both roles. A worker is a concept only; it is not declared in your code. In our example we will have a worker that consumes a job, reads its value, completes the job, and may move the data to a new queue depending on what it finds. This is an example of a worker that is both a consumer and a producer.
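A minimal sketch of such a dual-role worker, assuming the kue package is available; the queue names match the example that follows, and the threshold value comes from later in the article:

```javascript
// Sketch of a worker that is both consumer and producer: it processes jobs
// from one queue and, for dry readings, produces a job in another queue.
const THRESHOLD = 300;

// Pure helper: decide what should happen to a reading.
function classifyReading(value) {
  return value >= THRESHOLD ? 'ok' : 'hold';
}

function startWorker() {
  const kue = require('kue');        // required lazily so the sketch parses on its own
  const queue = kue.createQueue();

  queue.process('myBuffer', function (job, done) {   // consume from myBuffer
    if (classifyReading(job.data.value) === 'hold') {
      queue.create('hold', job.data).save();         // produce into hold
    }
    done();                                          // mark the original job complete
  });
}
```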

The implementation

So my goal was to create a system that would notify me when my plants became dry; eventually I would add a solenoid to water them, but for now a simple email notification would be fine. I came up with an elaborate queue-promotion scheme to strike the right balance between too much notification and not enough. When I walked my buddy Matt Twomey through the implementation, he pointed out that I could achieve this with much simpler single-listener code that did not store the alarms but just sent the emails at the appropriate time. I tried to justify my solution with the fact that I actually store each alarm, which gives me better insight. The harsh reality is that my solution is akin to the elaborate deathtraps villains use to kill James Bond. Yeah, you know what I am talking about. My solution is the platform that slowly lowers 007 into the shark tank. That being said, it does have merits: it preserves each alarm. Here goes:

I have three queues: myBuffer, hold and alarm. (In Kue terms, these are the job types.)

  1. Every five minutes I poll the moisture reading and the value goes into myBuffer.
  2. If the reading is ok (above threshold) I complete the job and empty all three queues.
  3. If the reading is below threshold (too dry), I complete the job and copy its contents into a job in the hold queue.
  4. Every time I put a job in the hold queue I check its queue length; if it's greater than 3, the last one is copied into the alarm queue with a delay and the hold queue is cleared.
  5. I calculate the delay in the alarm queue by counting the number of jobs already in the alarm queue so that the delay time is exponential.
  6. Once the delay time expires, another alarm consumer will pick up the job and email me.
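The exponential delay from step 5 can be sketched as a pure helper. The article only says the delay grows exponentially with the number of jobs already in the alarm queue; the five-minute base and the factor of two here are assumptions:

```javascript
// Sketch of the exponential delay from step 5. The five-minute base and
// the power-of-two growth are assumptions; the article only specifies that
// the delay is exponential in the alarm-queue length.
function alarmDelayMs(jobsAlreadyInAlarmQueue) {
  const BASE_MS = 5 * 60 * 1000;                        // 5 minutes
  return BASE_MS * Math.pow(2, jobsAlreadyInAlarmQueue);
}
```

With this curve, the first alarm waits 5 minutes, the second 10, the third 20, and so on, which keeps the notifications from piling up.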

Pretty simple! Let’s look at some worker code.

We include Kue, then require Lodash, which we will use later on when we clear the queue. We are also using Async, since we need to get various queue lengths before we perform any action. Next we define our moisture threshold; I have chosen 300. If you remember, this is a 10-bit analog reading, so 1023 means totally submerged in water and 0 means totally dry. Once it gets to 300, the moisture reading will drop like a rock. I am still running this code locally, but when I push it to Heroku the Kue URL will be stored as an environment variable. And finally, we initialize delayTime to zero.
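The setup described above could look something like the following sketch. The module names (kue, lodash, async) are real; the variable names and the REDISTOGO_URL fallback are assumptions:

```javascript
// Sketch of the setup the paragraph describes; not the author's original code.
const MOISTURE_THRESHOLD = 300;   // 10-bit ADC: 0 = bone dry, 1023 = submerged
let delayTime = 0;                // grows as alarms accumulate

function connectQueue() {
  const kue = require('kue');     // required lazily so the sketch parses without Redis
  // Locally Kue defaults to redis://127.0.0.1:6379; on Heroku the URL comes
  // from an environment variable (name assumed here).
  return kue.createQueue({ redis: process.env.REDISTOGO_URL });
}
```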

Next we call the bufferEvaluate function, which is the consumer (the subscribe worker) for the myBuffer queue.

The first condition basically checks whether the reading is ok (above the moisture threshold) and, if it is, completes the job and clears the queues. The async waterfall piece contains the logic to move the too-dry jobs to the appropriate queue, to eventually be picked up by the email consumer.
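The original listing was not captured in this copy of the article, so here is a hedged reconstruction of what such a consumer could look like. clearQueue, countJobs, delayedReQueue and computeDelay are hypothetical helper names (the first three correspond to the utility functions discussed below):

```javascript
// Reconstruction of a bufferEvaluate-style consumer; names are assumed, not
// the author's original code. clearQueue, countJobs, delayedReQueue and
// computeDelay are hypothetical helpers.
const THRESHOLD = 300;

function bufferEvaluate() {
  const kue = require('kue');      // required lazily so the sketch parses on its own
  const async = require('async');
  const queue = kue.createQueue();

  queue.process('myBuffer', function (job, done) {
    if (job.data.value >= THRESHOLD) {
      // Moisture is fine: complete the job and empty all three queues.
      clearQueue('myBuffer');
      clearQueue('hold');
      clearQueue('alarm');
      return done();
    }
    // Too dry: copy the reading into the hold queue, then check its length.
    async.waterfall([
      function (cb) { queue.create('hold', job.data).save(cb); },
      function (cb) { countJobs('hold', cb); },
      function (count, cb) {
        if (count > 3) {
          // Promote the latest reading to the alarm queue with a delay.
          delayedReQueue('hold', 'alarm', computeDelay(), cb);
        } else {
          cb();
        }
      }
    ], done);
  });
}
```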

Next let’s look at some utility functions. Unfortunately, calls like clearQueue, countJobs and reQueue are not provided in Kue, but 140proof created some node-kue-tools that I was able to recycle, and they were exactly what I needed. All the following functions were stolen from that project except delayedReQueue, which simply moves a job to another queue but also adds a delay as a parameter.
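The utility listing is missing from this copy, but a delayedReQueue along the lines described could be built on Kue's real rangeByType and remove APIs. This is a reconstruction, not the 140proof code:

```javascript
// Sketch of delayedReQueue: copy the newest inactive job from one queue
// type to another with a delay, then remove it from the source. A
// reconstruction based on Kue's documented Job.rangeByType and job.remove.
function delayedReQueue(fromType, toType, delayMs, done) {
  const kue = require('kue');     // required lazily so the sketch parses on its own
  const queue = kue.createQueue();

  // Fetch inactive jobs of the source type (0..-1 means the whole range).
  kue.Job.rangeByType(fromType, 'inactive', 0, -1, 'asc', function (err, jobs) {
    if (err || jobs.length === 0) return done(err);
    const last = jobs[jobs.length - 1];

    queue.create(toType, last.data)   // copy the payload into the target queue
      .delay(delayMs)                 // job sits in the delayed state first
      .save(function (saveErr) {
        if (saveErr) return done(saveErr);
        last.remove(done);            // then clear it from the source queue
      });
  });
}
```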

I have not included the mail worker, but if you are interested it is super easy: take a look at sendgrid-nodejs, implement another consumer like the ones we have already shown, and have it process the alarm queue. Delayed jobs won't trigger an email until the delay time has expired.
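Such a mail worker might look like the sketch below. It uses the classic sendgrid package's Email/send interface; the API key variable, addresses and message text are placeholders:

```javascript
// Sketch of an alarm-queue email consumer using the classic sendgrid
// package. Addresses, env-var name and message text are placeholders.
function startMailWorker() {
  const kue = require('kue');      // required lazily so the sketch parses on its own
  const sendgrid = require('sendgrid')(process.env.SENDGRID_API_KEY);
  const queue = kue.createQueue();

  // Delayed jobs only become available here after their delay expires.
  queue.process('alarm', function (job, done) {
    const email = new sendgrid.Email({
      to: 'me@example.com',
      from: 'plants@example.com',
      subject: 'Plant moisture alarm',
      text: 'Moisture reading: ' + job.data.value
    });
    sendgrid.send(email, done);    // complete the job once the email is sent
  });
}
```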

One of the things that attracted me to Kue is the simple UI that is included. Who wants to remember command-line syntax for inspecting their queues? Below is a screenshot of the Kue UI, showing the third job in the inactive (queued) state expanded to display its details.

The Kue UI is quite simple and sits on top of Express. You can find the minimal ui.js in the Kue readme, but I have included mine here in case you want to see how to implement basic auth.
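The original ui.js was not captured in this copy; a sketch along those lines follows. kue.app is the bundled Express app, while basic-auth-connect is one common middleware choice (an assumption, not necessarily what the author used):

```javascript
// Sketch of a ui.js mounting the Kue UI behind HTTP basic auth.
// kue.app is Kue's bundled Express app; the middleware and env-var
// names here are assumptions.
function startUi() {
  const kue = require('kue');      // required lazily so the sketch parses on its own
  const express = require('express');
  const basicAuth = require('basic-auth-connect');

  kue.createQueue();               // kue.app needs a queue connection first
  const app = express();

  app.use(basicAuth(process.env.UI_USER, process.env.UI_PASS));
  app.use('/kue', kue.app);        // UI served at /kue

  app.listen(process.env.PORT || 3000);
}
```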

In order for all this to work, you will also need to install Redis. In your local environment, just install Redis and run redis-server; you can accept all the defaults. On Heroku it is just as easy: add the free tier of Redis To Go as an add-on. It will automatically set the Redis environment variables that are used in config/kue.js.
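A config/kue.js along those lines is a small config fragment; this sketch assumes the REDISTOGO_URL variable that the Redis To Go add-on sets, with a localhost fallback for development:

```javascript
// Sketch of a config/kue.js: use Redis To Go's URL on Heroku, fall back
// to a local Redis in development. REDISTOGO_URL is set by the add-on.
const kue = require('kue');

module.exports = kue.createQueue({
  redis: process.env.REDISTOGO_URL || 'redis://127.0.0.1:6379'
});
```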

Although I have not tried it, I suspect you could run your workers in this same process on Heroku so you don't need to pay for worker dynos. However, if you are using this in production, I would spend the extra bucks for the worker dynos.


Alarming and event notification constitute the second major piece of the IoT puzzle, after analytics. There are advantages to using full-blown message queues like MQTT in the IoT domain; the biggest is that an MQTT client has a very small footprint and has been ported to run in your Arduino sketch as a library. However, there are times when you just need a simple REST-based queue to asynchronously process potentially large volumes of data. If you are using a next-generation MCU that can easily make HTTP posts, and you are familiar with Node.js, then Kue might be a good choice for you.