0.1.0 • Published 6 years ago

@hregibo/message-queue v0.1.0

Weekly downloads
1
License
MIT
Repository
gitlab
Last release
6 years ago

MessageQueue

A simple queue manager that allows sub-queues to have different delays!

Installation

To install this package, simply run the following command:

npm i -S @hregibo/message-queue

Please note: This package is made for NodeJS and in no way is made for the browser. You might want to transpile it or suggest a merge request if you wish to implement a browser-side message queue.

Usage

The source code uses TypeScript, and the typings for this package are included in the package itself. This means you can both use it in TypeScript and JavaScript without any additional package for types.

If you use TypeScript, the first step is to import the package to your file:

import { MessageQueue } from "@hregibo/message-queue";

Or, if you prefer to use Javascript:

const MessageQueue = require("@hregibo/message-queue");

Once this is done, you just need to instanciate a new MessageQueue and set up some event listeners, then start the queue:

const mq = new MessageQueue();
mq.on("dequeue", record => {
    console.log("dequeued the following message:", record.message);
});
mq.emit("start");

If you do not need to use the queue anymore, or want to simply stop the polling, you can call the stop event instead of start:

mq.emit("stop");

For more informations on the available events and examples, please scroll further below.

Static properties

There are a bunch of static properties directly in MessageQueue that allow to define the default values of new queues:

PropertyDescription
DEFAULT_QUEUE_PRIORITYThis is the default priority for queues that are not created yet, but are still receiving an enqueue request. Lower is better.
DEFAULT_QUEUE_POLLING_MSThis is the default polling delay for the MEssageQueue. Value is in milliseconds.
DEFAULT_QUEUE_DELAY_MSThis is the default delay between two messages to be sent for one specific queue. Value is in milliseconds.

Events

The message queue has the following events set up:

enqueue

queues a message in the MessageQueue. If you make the MessageQueue emit this event, the payload of the event must be an object with the following properties:

propertytypedescription
messagestringThe message to enqueue
queuestringThe queue name to which the message needs to be added to. If the queue does not exists yet, a new one will be created with the default values.
prioritynumberThe priority of the message. The lowest the number is, the more priority it has. Number should be positive.
someMessageQueue.emit("enqueue", {
    queue: "some-queue-name",
    priority: 1, // highest priority!
    message: "PRIVMSG #channel :Hello there!",
});

dequeue

emitted by the MessageQueue itself when dequeueing/polling a message from the queue. The payload will be the same object as the enqueue event payload.

someMessageQueue.on("dequeue", record => {
    console.log(
        "Here is a dequeued message:",
        record.message,
        "It's coming from the queue named",
        record.queue,
        "and was with a priority of",
        record.priority,
    );
});

remove

removes a queue from the MessageQueue, based on its name.

// We do not need that queue anymore, we remove it
someMessageQueue.emit("remove", "some-queue-name");

purge

clears a queue from all the messages it contains

// Let's get rid of all the messages of the queue!
someMessageQueue.emit("purge", "some-queue-name");

start

starts the queue polling. This should be emitted by you once your system is set up to listen to all the events you wish for, and ready to process the queue messages in other parts of your application.

// We are ready to handle queue messages
someMessageQueue.emit("start");

stop

stops the queue polling. This should be emitted by you if you desire to pause the queue, or if you have to clean-shutdown your application.

// Let's take a break!
someMessageQueue.emit("stop");

Methods

If you prefer to use directly the methods, you can do so by calling the methods directly. All methods are public.

onStop

This method is the equivalent of emitting a stop event.

someMessageQueue.onStop();

onStart

This method is the equivalent of emitting a start event.

someMessageQueue.onStart();

enqueue

puts a given message in a queue. If the queue does not exists yet, it will be created with the default values (which you can customize using the static properties). This method takes one argument, which is a QueueMessage.

someMessageQueue.enqueue({
    queue: "some-queue-name",
    message: "Hello world",
    priority: 100,
});

onEnqueue

This is the method called when emitting a enqueue event. For now, it has no other use than calling the enqueue method, but that might change in the future, if some processing of the event data needs to be done.

someMessageQueue.onEnqueue({
    queue: "some-queue-name",
    message: "Hello world",
    priority: 100,
});

setQueue

This method allows you to update the delay and priority of a existing queue, or create one if it does not exists yet.

The only parameter is an object which contains 3 properties: name (the name of the queue), delay (the delay of the queue, in milliseconds) and priority (the priority of the queue compared to other queues).

someMessageQueue.setQueue({
    name: "some-message-queue",
    delay: 1500,
    priority: 10,
});

onRemove

Same behaviour as calling the remove event. Takes a string (the queue name) as an argument.

someMessageQueue.onRemove("some-queue-name");

onPurge

Same behaviour as calling the purge event. Takes a string (the queue name) as an argument.

someMessageQueue.onPurge("some-queue-name");

onQueueSet

Sets a queue with the new delay and priority, if specified. Takes an object as an argument, with the properties name (the queue name), delay (the delay between two messages in milliseconds, optional) and priority (the priority of the queue compared to other queues, in milliseconds, optional).

someMessageQueue.onQueueSet({
    name: "some-message-queue",
    delay: 1350,
    priority: 25,
});

dequeue

Internal method. Used to trigger a search for the most important message to send, then emits a dequeue event. You should not use this method.

orderByPriority

Internal utility method. Takes a list of IPriorityObject and sorts them to always have the lowest priority first in the array. You might want to use it if you need to sort an array of objects having the priority property.

const myMessages = [
    {id: 1, priority: 10},
    {id: 2, priority: 3},
    {id: 3, priority: 7},
    {id: 4, priority: 1},
];
const orderedMessageList = someMessageQueue.orderByPriority(myMessages);
// [ {id: 4, priority: 1}, {id: 2, priority: 3}, {id: 3, priority: 7}, {id: 1, priority: 10}]

getElligibleMessage

Finds the first elligible message for a given queue, or undefined if none matches.

const elligibleMessage = someMessageQueue.getElligibleMessage(someQueue);

Example of usage

This library is made at the very beginning for a IRC client I created. The reason we have a global delay and per-queue delay is that the IRC client is limited in the number of messages we can send every 30 seconds, while each channel you join has a limitation based on your status (moderator has no limit, others can post one message every 1500ms). The following example shows a basic implementation for an IRC client:

import { MessageQueue } from "@hregibo/message-queue";
import { ChannelList } from "./channels";
import { SomeIrcLibrary } from "./client";

const mq = new MessageQueue();
const irc = new SomeIrcLibrary();

// We set the polling timer to 60ms, which is the allowed rate for the bot
MessageQueue.DEFAULT_QUEUE_POLLING_MS = 60;
// We set the delay for each queue to be 1500 by default
MessageQueue.DEFAULT_QUEUE_DELAY_MS = 1500;

mq.setQueue({
    name: "command-queue",
    // the commands are more important than anything else
    priority: 1,
    // Giving the delay for the queue will override the default one
    delay: MessageQueue.DEFAULT_QUEUE_POLLING_MS,
});

for(const channel of ChannelList) {
    // we create a queue for each channel we join
    mq.setQueue({
        name: channel.name,
        priority: 10,
        // not providing the delay here will default to the one we set earlier (1500 ms)
    });
    // We add the channel JOIN request to the command queue
    mq.emit("enqueue", {
        queue: "command-queue",
        message: `JOIN #${channel.name}`,
        priority: 5,
    });
}

mq.on("dequeue", (record) => {
    // Whenever a message is dequeued, we send it via IRC
    irc.send(record.message);
})

irc.on("ready", () => {
    // The IRC client is connected and authenticatedmess
    // We start the polling of the queue
    mq.emit("start");
});

irc.on("disconnect", () => {
    mq.emit("stop");
});

irc.on("message", (msg) => {
    if (msg.content === "!hello") {
        // The user used the hello command, we say hello back!
        mq.emit("enqueue", {
            // msg.channel is the name of the channel, i.e. #hregibo
            queue: msg.channel,
            message: `PRIVMSG #${msg.channel} :Hello there, ${msg.user.name}!`,
            priority: 100,
        });
    }
});
0.1.0

6 years ago