1.5.0 • Published 8 months ago

monkeycymbal v1.5.0

Weekly downloads
11
License
MIT
Repository
-
Last release
8 months ago

Features

  • Delayed jobs
  • Retries
  • Dead queue
  • Priority
  • Concurrency
  • Pause/resume processing
  • Optional Topic based publishing (publish into multiple queues)
  • Low CPU usage
  • Able to process around 1000 messages per second (tested on Macbook Pro 13-inch, 2017, concurrency set to 150)

Install

npm install monkeycymbal --save

or

yarn add monkeycymbal

Requirements: Monkey Cymbal requires MongoDB


Quick Guide

Basic Usage

import { Queue } from 'monkeycymbal';

// Initialize queue
const queue = new Queue('mongodb://localhost/myDb', 'videoTranscoding');

// subscribe to the queue
queue.subscribe((msg) => {
  // process the message
  // ...
  
  // we can return a result that will be saved in the message
  
  return 'transcoded';
});

// Add a message to the queue
const [msgId] = await queue.add({ video: 'http://example.com/video1.mov' });

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

await queue.pause()

// queue is paused now

await queue.resume()

// queue is resumed now

Events

A queue emits also some useful events, for example...

queue.on('added', msgId => {
  // A message has been added to the queue
})

queue.on('active', msg => {
  // The message is being processed
});

queue.on('completed', (msg, result) => {
  // The message has been processed succesfully
})

queue.on('error', (msg, error) => {
  // An error occurred while processing the message.
  // If maxRetries is set, it will be re-processed after a visibility timeout
})

queue.on('dead', msg => {
  // The message is failed permanently.
  // If a dead queue is configured, the message will be copied there.
})

For more information on events, including the full list of events that are fired, check out the Events reference

Documentation

Queue

new Queue(connectionUrlOrMongoClient, queueName, options)

This is the Queue constructor. It creates a new Queue that is persisted in MongoDB.

NameTypeDescription
connectionUrlOrMongoClient requiredMongoClient | stringMongoClient instance or MongoDB Connection Url
queueName requiredstringThe name of the queue
optionsSubscriptionOptions
SubscriptionOptions
ArgumentsTypeDefaultDescription
visibilitynumber (seconds)10After a message is received to prevent other consumers from processing the message again, Monkeycymbal sets a visibility timeout, a period of time during which Monkeycymbal prevents other consumers from receiving and processing the message.
delaynumber (seconds)if you set delay to be 10, then every message will only be available for retrieval 10s after being added.
maxRetriesnumber5Maximum number of attempts to retry processing a message. If deadQueue is set, the message will be moved to the dead queue. Otherwise it will be acked.
expireAfterSecondsnumber (seconds)The processed messages will be removed from the collection after the specified number of seconds.
concurrencynumber1The max number of messages that will be processed in parallel.
pollIntervalnumber (seconds)10The amount of time the subscriber waits before checking for new messages.
deadQueuestring or Queue instanceMessages that have been retried over maxRetries will be pushed to this queue for later inspection.

Queue.subscribe

queue.subscribe(handler);

Defines a processing function for the jobs in a given Queue and start processing. The handler function receive msg as argument.

Queue.add

queue.add(msg, AddMessageOptions);

Adds a message to the queue.

AddMessageOptions
ArgumentsTypeDefaultDescription
prioritynumber1Optional priority value. It ranges from -Infinity to +Infinity

Queue.pause

queue.pause();

Pause a queue. A paused queue will not process new jobs until resumed.

Queue.resume

queue.resume();

Resume a queue after being paused.

Queue.ping

queue.ping(msg.ack);

Ping a message to keep it's visibility open for long-running tasks.

Queue.totalCount

queue.totalCount();

Returns the total number of records in the collection.

Queue.waitingCount

queue.waitingCount();

Returns the total number of messages that are waiting to be processed.

Queue.inFlightCount

queue.inFlightCount();

Returns the total number of messages that are currently being processed.

Channel

new Channel(connectionUrlOrMongoClient, topic, options)

This is the Channel constructor. It creates a new Channel.

NameTypeDescription
connectionUrlOrMongoClient requiredMongoClient | stringMongoClient instance or MongoDB Connection Url
topic requiredtopicThe name of the channel.
optionsSubscriptionOptions

Channel.publish

channel.publish(msg, PublishOptions);

Publish messages to the queues subscribed to the topic.

PublishOptions
ArgumentsTypeDefaultDescription
prioritynumber1Optional priority value. It ranges from -Infinity to +Infinity

Channel.subscribe

channel.subscribe(handler, queueName, SubscriptionOptions): Queue;

Convenience method that returns an instance of a queue bound to the channel.

1.5.0

8 months ago

1.2.0

1 year ago

1.0.9

2 years ago

1.0.8

3 years ago

1.0.7

5 years ago

1.0.6

5 years ago

1.0.5

5 years ago

1.0.4

5 years ago

1.0.3

5 years ago

1.0.2

5 years ago

1.0.1

5 years ago