mkue v2.2.0
mkue
A MongoDB-backed job queueing mechanism.
- Concurrency handling
- Throttling inputs
- Persistence of all input/output
- FIFO
- Exits the process gracefully
Example
Dispatcher:
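var Queue = require('mkue');
var queue = new Queue();

// set the collection (`db` is an already-connected MongoDB database handle)
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

// dispatch a job for the function named 'this function'
queue.dispatch('this function', {
  these: 'inputs'
});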
Worker:
var Queue = require('mkue');
var queue = new Queue();

// set the collection (`db` is an already-connected MongoDB database handle)
queue.collection = db.collection('stuff');
// make sure indexes are set
queue.ensureIndexes();

// define a namespaced function
queue.define('this function', function (options) {
  return new Promise(function (resolve) {
    resolve(options.these);
  });
});

// set the concurrency
queue.concurrency(5);
// start listening
queue.run();
API
var queue = new Queue(options)
The options are:
- concurrency <1> - number of jobs to be processed in parallel in this process
- delay <1000> - delay to query the next batch of jobs on drain
- collection - the MongoDB collection for this queue
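As a rough sketch of how these options might be passed, the helper below builds a queue from the three documented options. `createWorkerQueue` is a hypothetical name, not part of mkue, and both the `Queue` constructor and the `collection` are passed in as arguments so the snippet makes no assumption about how your MongoDB connection is created. The values 5 and 2000 are arbitrary.

```javascript
// Hypothetical helper (not part of mkue): builds a queue from the documented options.
// `Queue` is the constructor exported by require('mkue');
// `collection` is a MongoDB collection obtained from your own connection.
function createWorkerQueue(Queue, collection) {
  return new Queue({
    concurrency: 5,        // process up to 5 jobs in parallel in this process
    delay: 2000,           // wait 2000 ms after a drain before querying again
    collection: collection // the MongoDB collection backing this queue
  });
}
```

Setting `queue.collection` directly, as the examples above do, is the other documented route.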
queue.collection =
You are required to set the collection for this worker queue manually.
queue.concurrency(count )
Set the maximum number of concurrent, local jobs.
queue.delay(ms | )
Set the delay after draining the queue to start looking for jobs again.
queue.ensureIndexes().then( => )
Set the indexes for queues and currently processing jobs. Assumes that the queue is always short.
queue.processing().then( count => )
Get the current number of jobs being processed.
queue.queued().then( count => )
Get the current number of jobs in the queue.
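The two counters can be read together for simple monitoring. This is a minimal sketch: `queueStats` is a hypothetical helper, not part of mkue, and it assumes only that `processing()` and `queued()` return promises of counts, as described above.

```javascript
// Hypothetical helper (not part of mkue): reads both counters in parallel.
// `queue` is assumed to be a Queue instance with its collection already set.
function queueStats(queue) {
  return Promise.all([queue.processing(), queue.queued()]).then(function (counts) {
    // Promise.all preserves input order: [processing, queued]
    return { processing: counts[0], queued: counts[1] };
  });
}
```

The returned object could be logged or exported to a metrics system of your choice.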
queue.queue(ms | )
Waits ms to start a new job.
queue.dispatch(name , options ).then( job => )
Add a job to the queue.
queue.get(name , options ).then( job => )
Get the latest job with name and options. May or may not be completed yet.
queue.getById().then( job => )
Get a job by its ID.
queue.poll(name , options , ms | ).then( job => )
Poll the latest job with name and options at interval ms until it's complete.
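On the dispatcher side, `dispatch` and `poll` can be chained to submit a job and wait for it to finish. The sketch below is one way to do that; `dispatchAndWait` is a hypothetical helper, not part of mkue, and it makes no assumption about the fields of the resolved job.

```javascript
// Hypothetical helper (not part of mkue): dispatches a job, then polls until it completes.
// `queue` is assumed to be a Queue instance with its collection already set.
function dispatchAndWait(queue, name, options, ms) {
  return queue.dispatch(name, options).then(function () {
    // poll() checks the latest job with this name and options every `ms`
    // and resolves once that job is complete
    return queue.poll(name, options, ms);
  });
}
```

A call such as `dispatchAndWait(queue, 'this function', { these: 'inputs' }, 500)` would poll every 500 ms.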
queue.define(name , fn )
Define a function. name defaults to 'default' if not set. fn's API should be:
fn([options]).then( result => )
You only need to define this on a worker process.
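When the work itself is synchronous, the function still has to return a promise to match the shape above. A minimal sketch, assuming a job whose options carry two numbers `a` and `b`; the job name 'add' and the helper `defineAdd` are made up for illustration.

```javascript
// Hypothetical helper (not part of mkue): registers a worker function named 'add'.
// `queue` is assumed to be a Queue instance on a worker process.
function defineAdd(queue) {
  queue.define('add', function (options) {
    // Promise.resolve() wraps the synchronous result in the promise
    // that the fn([options]).then( result => ) contract calls for
    return Promise.resolve(options.a + options.b);
  });
}
```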
queue.run()
Start running a new job. Call this on a worker process.
queue.close()
Stop creating new jobs.
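The sketch below shows where `run()` and `close()` might sit in a worker's lifecycle if you want to trigger the stop yourself. mkue lists graceful exit as a feature, so a handler like this may be unnecessary for ordinary process signals. `runUntilSignal` is a hypothetical helper, not part of mkue, and the return value of `close()` is not relied on.

```javascript
// Hypothetical helper (not part of mkue): starts the worker and stops it on a signal.
// `queue` is assumed to be a Queue instance with its functions already defined.
function runUntilSignal(queue, signal) {
  queue.run(); // start picking up jobs
  process.once(signal, function () {
    queue.close(); // stop starting new jobs once the signal arrives
  });
}
```

For example, `runUntilSignal(queue, 'SIGTERM')` would call `close()` the first time the process receives SIGTERM.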