1.3.2 • Published 6 years ago

@draad/messagequeue v1.3.2

Weekly downloads : 1
License : MIT
Repository : bitbucket
Last release : 6 years ago

Message Queue


Description

A Redis-based message queue that offers parallel processing.
Message queues are often used to ease communication between services in a microservice architecture.



Setup

Install the package :

npm install @draad/messagequeue

You also need a Redis server available. You can get the Docker image as follows :

docker pull redis

The connection to Redis uses settings passed as an environment variable, serialized as a JSON string.
The default settings are intended for local development :

process.env.redisSettings = JSON.stringify({
  port: 6379,
  host: '127.0.0.1',
});
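
Environment variables are always strings, which is why the settings are passed through JSON.stringify. The sketch below shows one way a service could read them back and fall back to the local defaults. The loadRedisSettings helper and the DEFAULT_REDIS_SETTINGS constant are hypothetical names used for illustration; how the library parses the variable internally is not documented here.

```javascript
// Hypothetical helper, not part of @draad/messagequeue.
// Defaults mirror the local development values shown above.
const DEFAULT_REDIS_SETTINGS = { port: 6379, host: '127.0.0.1' };

function loadRedisSettings(env = process.env) {
  const raw = env.redisSettings;
  // No variable set: use a copy of the local defaults.
  if (!raw) return { ...DEFAULT_REDIS_SETTINGS };

  let parsed;
  try {
    parsed = JSON.parse(raw);
  } catch (e) {
    throw new Error(`redisSettings is not valid JSON: ${e.message}`);
  }
  // Values from the environment win over the defaults.
  return { ...DEFAULT_REDIS_SETTINGS, ...parsed };
}
```

Failing early on malformed JSON makes a configuration mistake visible at startup rather than at the first publish.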


Usage

Publish a message

A PRODUCER service can emit a message on a specific CHANNEL using the publish function.
A channel is composed of multiple related QUEUES; see the Observe a channel section for more information.
The message can be in any format.
Channels are created on the fly, and you can create as many as you want.
This library delivers messages first in, first out. This order is guaranteed within each individual channel, but not between channels.

publish takes the following parameters :

  • {string} channel Name of the channel that will receive the message.
  • {any} message Message to emit.
  • {object} options Optional metadata.

You can add any custom value to the metadata, but note that the following key is used for internal logic :

  • {uint} ttl Time to live in milliseconds. If the ttl is exceeded, the message is not processed but moved directly to the failed queue. The default, null, means the message never expires.

The following keys are reserved by the system and cannot be overridden :

  • {uint} createdAt Time in milliseconds at which the message was created.
  • {uint} jammedAfter Delay, counted from the start of processing, after which the message is considered jammed. Default 180000 ms.
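
To make the metadata rules above concrete, here is a minimal sketch of how custom options and reserved keys could be merged, and how a ttl check could work. The buildMetadata and isExpired functions are hypothetical and not the library's code; measuring expiry from createdAt, and treating a message as expired only once the ttl is strictly exceeded, are assumptions.

```javascript
// Hypothetical model of the metadata rules, not the library's implementation.
const DEFAULT_JAMMED_AFTER = 180000; // default stated above, in ms

function buildMetadata(options = {}, now = Date.now()) {
  return {
    ttl: null,                         // default: the message never expires
    ...options,                        // custom keys and ttl supplied by the caller
    createdAt: now,                    // reserved: replaces any caller-supplied value
    jammedAfter: DEFAULT_JAMMED_AFTER, // reserved: replaces any caller-supplied value
  };
}

function isExpired(metadata, now = Date.now()) {
  // A null (or missing) ttl means no expiry.
  if (metadata.ttl === null || metadata.ttl === undefined) return false;
  // Assumption: expiry is measured from createdAt.
  return now - metadata.createdAt > metadata.ttl;
}
```

Placing the reserved keys after the spread is what keeps a caller from overriding them.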

Example

In the following example, the PRODUCER service sends a message on the orders channel to notify that a new order has been created. This message expires if it is not processed within the next 60000 ms.

const messageQueue = require('@draad/messagequeue');

// CommonJS has no top-level await, so the calls run inside an async function.
(async () => {
  const { publish } = await messageQueue();

  const message = { event: 'created', user: 'draad', purchase: 'rainbow unicorn' };
  await publish('orders', message, { ttl: 60000 });
})();

Subscribe to a queue

A CONSUMER service can subscribe to one or multiple CHANNELS. It must provide a CALLBACK function that is triggered every time a message is extracted from the queue.

If the channel is empty, the CALLBACK is triggered as soon as a new message is published.
If the channel already has pending messages, the CALLBACK is triggered immediately.
You can set a concurrency count to process multiple messages in parallel.

subscribe takes the following parameters :

  • {string} queue Queue to listen to.
  • {function} callback Callback function.
  • {uint} maxConcurrent Maximum number of messages processed concurrently. Default 1.

The callback you pass will receive the following arguments :

  • {object} data The item extracted from the queue.
  • {any} data.message The message.
  • {object} data.metadata The metadata associated with this message.
  • {function} done Done function. You are responsible for calling it when the message has been processed correctly.
  • {function} fail Fail function. You are responsible for calling it when the message could not be processed correctly.
  • {function} next Next function. You can call it to request the next message from the queue right away.

Example

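In the following example, the CONSUMER service listens for messages from the orders queue.

const messageQueue = require('@draad/messagequeue');

// sendConfirmationEmail is a placeholder for your own processing logic.
const callback = async ({ message, metadata }, done, fail, next) => {
  try {
    await sendConfirmationEmail();
  } catch (e) {
    // Report the failure and stop here, so done() is never called for a failed message.
    await fail(`Failed due to ${e}`);
    return next();
  }

  await done();
  return next();
};

// CommonJS has no top-level await, so the calls run inside an async function.
(async () => {
  const { subscribe } = await messageQueue();
  await subscribe('orders', callback, 5);
})();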
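
Since every callback has to call exactly one of done or fail and then usually next, that bookkeeping can be factored out. The sketch below is one possible wrapper; toCallback is a hypothetical helper, not something the package exports, and it only assumes the callback signature described above.

```javascript
// Hypothetical helper: turns a plain async handler into a subscribe callback
// that calls exactly one of done/fail, then requests the next message.
function toCallback(handler) {
  return async (data, done, fail, next) => {
    let error = null;
    try {
      await handler(data.message, data.metadata);
    } catch (e) {
      error = e;
    }

    try {
      if (error) {
        await fail(`Failed due to ${error}`);
      } else {
        await done();
      }
    } finally {
      // Ask for the next message whatever the outcome was.
      await next();
    }
  };
}
```

With such a helper, a consumer would pass toCallback(myHandler) wherever a callback is expected, and myHandler only has to throw to signal a failure.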

Observe a channel

Each channel is composed of 3 queues named as follows :

  • {channelName}:Pending : queue storing pending messages for this channel.
  • {channelName}:Processing : queue storing messages currently being processed for this channel.
  • {channelName}:Failed : queue storing messages that failed to be processed for this channel.

You can use the observe function to keep an eye on the channel status.
It's a good place to implement any logic related to failed messages.
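
The lifecycle of a message across these three queues can be pictured with a small in-memory model. This is only an illustration of the naming and of the pending, processing, failed flow; InMemoryChannel is a hypothetical class, the real library stores its queues in Redis, and returning counts from the status method is an assumption.

```javascript
// Hypothetical in-memory model of one channel and its three queues.
class InMemoryChannel {
  constructor(name) {
    this.pendingKey = `${name}:Pending`;
    this.processingKey = `${name}:Processing`;
    this.failedKey = `${name}:Failed`;
    this.queues = {
      [this.pendingKey]: [],
      [this.processingKey]: [],
      [this.failedKey]: [],
    };
  }

  publish(message) {
    this.queues[this.pendingKey].push(message);
  }

  // Move the oldest pending message to the processing queue (first in, first out).
  take() {
    const message = this.queues[this.pendingKey].shift();
    if (message !== undefined) this.queues[this.processingKey].push(message);
    return message;
  }

  // Remove a message from processing; keep it in the failed queue on failure.
  settle(message, succeeded) {
    const processing = this.queues[this.processingKey];
    const index = processing.indexOf(message);
    if (index === -1) return;
    processing.splice(index, 1);
    if (!succeeded) this.queues[this.failedKey].push(message);
  }

  counts() {
    return {
      pending: this.queues[this.pendingKey].length,
      processing: this.queues[this.processingKey].length,
      failed: this.queues[this.failedKey].length,
    };
  }
}
```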

Example

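In the following example, we watch the status of the orders channel.

const messageQueue = require('@draad/messagequeue');

// CommonJS has no top-level await, so the calls run inside an async function.
(async () => {
  const { observe } = await messageQueue();

  // The channel name must match the one used by publish and subscribe.
  const { pending, processing, failed } = await observe('orders');
  console.log({ pending, processing, failed });
})();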

Unsubscribe

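Use the unsubscribe function to stop listening to a channel.

const messageQueue = require('@draad/messagequeue');

// CommonJS has no top-level await, so the calls run inside an async function.
(async () => {
  const { unsubscribe } = await messageQueue();
  await unsubscribe('orders');
})();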

Disconnect

Function to disconnect from Redis.


Jammed Processes

Some messages may stay jammed in the processing queue if :

  • your callback called neither the done nor the fail function after processing the message.
  • your service shut down unexpectedly in the middle of processing, without getting the chance to call the done or fail function.
  • your service takes an unexpectedly long time to process the message.

You cannot completely prevent this from happening, but you can limit it by :

  • making sure to catch any error that could happen during processing and calling the fail function.
  • adding a timeout on your processing that stops it and calls the fail function.
  • listening for SIGTERM signals and allowing your services to shut down gracefully.
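
The timeout step above can be sketched with Promise.race. The withTimeout helper below is hypothetical and not provided by the package. Note that it only stops waiting for the work: it does not cancel whatever the work is doing, so truly stopping the processing remains up to your own code.

```javascript
// Hypothetical helper: rejects if `work` has not settled after `ms` milliseconds.
// `work` is a function returning a promise (or a plain value).
function withTimeout(work, ms) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Promise.resolve().then(work) also turns a synchronous throw into a rejection.
  return Promise.race([Promise.resolve().then(work), timeout])
    .finally(() => clearTimeout(timer)); // never leave the timer pending
}
```

Inside a callback, a rejection from withTimeout can be caught like any other processing error and reported through the fail function.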

Even with these safety steps, there can still be some jammed messages. You can use the checkJammed function
to start an interval that cleans up any jammed message.
A message is considered jammed if it is still in the processing list 180000 ms after it was placed there.

You are responsible for clearing the interval when it is no longer needed.

Example

In the following example, the CONSUMER service checks for jammed messages in the orders queue every 60000 ms.

const messageQueue = require('@draad/messagequeue');

// CommonJS has no top-level await, so the calls run inside an async function.
(async () => {
  const { checkJammed } = await messageQueue();
  const interval = checkJammed('orders', 60000);
})();

Note that checkJammed only considers the oldest entry in the processing queue at each check, so the cost of a check does not grow with queue length.
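
A check that only looks at the oldest entry can be modeled as below. This is a sketch of the idea, not the library's code: popJammed is a hypothetical function, the processing list is assumed to be ordered oldest first, and startedAt is an assumed field holding the time the message entered the processing queue. What happens to a jammed message afterwards is not specified here.

```javascript
// Hypothetical model of a jam check that inspects only the oldest entry.
const JAMMED_AFTER_MS = 180000; // delay stated above

function popJammed(processing, now = Date.now(), jammedAfter = JAMMED_AFTER_MS) {
  const oldest = processing[0];
  // Empty queue, or the oldest entry is still within its allowed delay.
  if (!oldest || now - oldest.startedAt < jammedAfter) return null;
  // Remove and return at most one entry per check.
  return processing.shift();
}
```

A consequence of this design is that several jammed messages are cleaned up over several checks, one per interval tick.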



Demo

This project includes a demo folder showcasing demo versions of a PRODUCER, a CONSUMER and an OBSERVER.

To start the demo locally, run the following steps from the root of this project :

  • Start Redis in Docker : docker run -p 6379:6379 -d redis
  • Start one observer : node demo/observer.js
  • Start as many producers as you want : node demo/producer.js
  • Start as many consumers as you want : node demo/consumer.js


Tipeee

This package was created for my MMORPG project. Would you like to help me finance this huge project ? Check out my Tipeee page : https://en.tipeee.com/coding-mojo

You can also follow my dev vlog (in French) on the YouTube channel Coding Mojo : https://www.youtube.com/channel/UCV_L7KEZF-sOmx0USbxy5bw
