@draad/messagequeue v1.3.2
Message Queue
Description
A redis based message queue that offer parrallel processing.
Message queues are often used to ease services communications in a micro-service architecture.
Setup
Install the package :
npm install @draad/messagequeue
You also need to have redis available. You can get the docker image as follow :
docker pull redis
We connect to redis using settings passed as environment variables.
Default settings are used for local development
process.env.redisSettings = JSON.stringify({
port: 6379,
host: '127.0.0.1',
});
Usage
Publish a message
A PRODUCER service can emmit a message on a specific CHANNEL using the PUBLISH function.
A channel is composed of multiples related QUEUES, see observer section for more informations.
The message can be in any format.
Channel will be created on the fly and you can create as many as you want.
This library respect the first message in, first message out. This paradigm is respected for each
individual channel, but not between channels.
Publish takes as params :
- {string} channel Name of the channel that will receive the message.
- {any} message Message to emit.
{object} options Optional metadata.
You can add any custom value to the metadata, but not that the following keys are used for internal logic :
- {uint} ttl Time to live in miliseconds. If ttl is exceeded, the message won't be processed but moved directly
to failed queue. Default null means the message won't expire.
The following keys are reserved by the system and cannot be overrided :
- {uint} createdAt Time in miliseconds at wich the message has been created.
{uint} jammedAfter Delay before message is considered as jammed after starting to process. Default 180000 ms.
Example
In the following exemple, the PRODUCER service sends a message in the orders
channel to notify that a new
order has been created. This message expire if it's not processed in next 60000 ms.
const messageQueue = require('@draad/messagequeue');
const { publish } = await messageQueue();
const message = { event: 'created', user: 'draad', purchase: 'rainbow unicorn' };
await publish('orders', message, { ttl: 60000 });
Subscribe to a queue
A CONSUMER service can subscribe to one or multiples CHANNELS. It must gives a CALLBACK function that will be triggered every time a message is extracted from the queue.
If listened channel is empty, the CALLBACK will be alerted when a new message is created.
If the channel has some messages pending, the CALLBACK will be triggered immediatly.
You have the ability to set a concurrency count, allowing to treat multiples messages in parrallel.
Subscribe takes as params :
- {string} queue Queue to listen.
- {function} callback Callback function.
{uint} maxConcurrent Concurrent request count. Default 1.
The callback you pass will receive the following arguments :
- {object} data The item extracted from the queue
- {any} data.message The message.
- {object} data.metadata The metadata associated to this message.
- {function} done Done function. You're responsible for calling it when message has been processed correctly.
- {function} fail Fail function. You're responsible for calling it when message has not been processed correctly.
{function} next Next function, you can call it to request next message from the queue right away.
Example
In the following example, the CONSUMER service listen messages from orders
queue.
const messageQueue = require('@draad/messagequeue');
const { subscribe } = await messageQueue();
const callback = async ({ message, metadata }, done, fail, next) => {
await sendConfirmationEmail()
.catch(async (e) => {
await fail(`Failed due to ${e}`);
return next();
});
await done();
return next();
};
await subscribe('orders', callback, 5);
Observe a channel
Each channel is composed of 3 queues named as follow :
{channelName}:Pending
: queue storing pending messages for this channel.{channelName}:Processing
: queue storing messages being actually processed for this channel.{channelName}:Failed
: queue storing messages failed to be processed for this channel.
You can use the observe function to keep an eye on the channel status.
It's a good place to implement any logic related to failed processes.
Example
In the following example, we watch status on orders
channel.
const messageQueue = require('@draad/messagequeue');
const { observe } = await messageQueue();
const { pending, processing, failed } = await observe('order');
await subscribe('orders', callback, 5);
Unsubscribe
Function to unsubscribe from a channel.
const messageQueue = require('@draad/messagequeue');
const { unsubscribe } = await messageQueue();
await unsubscribe('orders');
Disconnect
Function to disconnect redis.
Jammed Processes
It may happen that some messages stay jammed in the processing queue if :
- your callback neither called done nor fail function after processing the message.
- your service unexpectedly shutted down in middle of a processing without getting the chance to call done nor fail function.
your service takes an unexpected amount of time to process the message.
You cannot prevent this from happening from time to time, but you can limit such things by :
- make sure to correctly catch any issue that could happen during processing and call the fail function.
- add a timeout on your processing that will stop it and call the fail function.
listen for SIGTERM signals and allow your services to gracefully shut down.
Event with theses security steps, there still can be some jammed processes. You can use the checkJammed function
to start an interval that will cleanup any jammed process.
We do consider a message as jammed if it still existing in processing list 180000 ms after it was placed in.
You are responsible for clearing the interval when not needed anymore.
Example
In the following example, the CONSUMER service check for jammed messages in orders
queue every 60000 ms.
const messageQueue = require('@draad/messagequeue');
const { checkJammed } = await messageQueue();
const interval = checkJammed('orders', 60000);
Note that checkJammed only consider the oldest entry in processing queue at each check, so the cost does not grow with queue length.
Demo
This project include a demo
folder, showcasing a demo version of PRODUCER, CONSUMER and OBSERVER.
To start the demo locally, use the following flow, from root of this project :
- Start docker :
docker run -p 6379:6379 -d redis
- Start one observer :
node demo/observer.js
- Start as many producer as you want :
node demo/producer.js
Start as many consumer as you want :
node demo/consumer.js
Tipeee
This package was created for my MMORPG project. Would you like to support me financing this huge project ? Check my tipeee there : https://en.tipeee.com/coding-mojo
You can also follow my dev vlog (in french) on youtube channel Coding Mojo : https://www.youtube.com/channel/UCV_L7KEZF-sOmx0USbxy5bw