mongo-qd v0.0.0
mongo-qd
Priority Job Queue with MongoDB. A qd implementation.
Reactive, and easily extensible.
Introduction
First we need an instance of Db() from Mongo's native driver. Let's assume we already have it, and
it's set as db.
Creating jobs
var MongoQd = require('mongo-qd').MongoQd;
var qd = MongoQd(db);
var q = qd.queue('myFirstQueue');
q.job('my first job', {'_this': 'is', my: 'payload'})
.priority('high')
.attempts(3)
.on('complete', function(result){console.log('Result', result)});Pulling Jobs
Meanwhile on some other machine...
var MongoQd = require('mongo-qd');
var qd = MongoQd(db);
var q = qd.queue('myFirstQueue');
q.pull(function (err, job) {
if (job == null) return;
// Use whatever async function you want
someAsyncTodo(job.payload, function (err, result) {
if (err) return job.fail(err);
job.complete(result);
});
});Features
- Abstract and easily extensible with other modules
- Delayed Jobs
- Job event and progress pubsub
- Optional retries with backoff
Usage
var MongoQd = require('mongo-qd').MongoQd;var qd = MongoQd(db, opts)
Create an instance of MongoQd.
Args:
db- An instance of Db() from Mongo's native driver.opts- Object with options.
Options:
ns- (Default: 'qd'). The prefix for the collection names that would be generated by this module.separator- (Default: ':'). The separator to use for collection names.priorities- (Default: would be documented later.). Json object to define the available priorities.
Events:
error- All errors would be redirected to the main client.
var queue = qd.queue(name)
Get a reference to a certain queue.
Returns a Queue instance.
Args:
name- The name of the queue you're referencing.
Events:
newJob- A new job has just been created. The first arg is a reference the new job.pulledJob- A job has just been pulled. The first arg is a reference to the pulled job.
var newJob = queue.job(name, payload)
Create a new job. The job would be persisted on the next tick, so you'll be able to set some stuff with NewJob's
methods.
Returns a NewJob instance.
Args:
name- A string that represents the name of the job.payload- Any type of struct that will represent that params of the job.
Events:
complete- Fired when the job is complete. First arg is the result.failed- Fired when the job fails. First arg is the reason for failure.failedAttempt- Fired when the job fails, but has more attempts. First arg is the reason for failure.progress- Fired when a worker wants to signal on a progress. First arg iscompleted, second arg istotalamount of work.
newJob.priority(priority)
Define the priority of the job.
Returns itself.
priority is a string that should be mapped into a number, using a priority map.
A priority map can be defined in opts of the Qd instance.
Here is the default priority map:
{
low: 10,
normal: 0,
medium: -5,
high: -10,
critical: -15
}newJob.delay(delay)
Delay the processing of the job.
Returns self.
delay is defined in ms.
newJob.attempts(attempts)
Define how many times this job could be restarted after a failure.
Returns itself.
newJob.backoff(backoff)
Backoff a little bit if the job fails.
Returns itself.
backoff might be one of the following values:
true- When it's true, the job would be rescheduled after waitingdelayms, wheredelayis the same value that was defined usingNewJob#delay.{ type: 'fixed', delay: X }- Reschedule the job afterXamount of ms.{ type: 'exponential', delay: ?X }- The delay between failed jobs will grow exponentially, as more the job keeps failing. WhereXis the base for the exponential delay. If you setNewJob#delay, you might ignoreX.
queue.pull(function (err, pulledJob) {...})
Pull a job from the queue.
pulledJob is an instance of PulledJob.
Note: If there are no waiting jobs to pull, pulledJob would be null. The async pull function could easily,
wait until a job will become available, because there is already a notification for that using Pub/Sub. However, the
decision was to keep this module as simple as possible. Such functionality could be easily extended with an external
module.
pulledJob.complete(result)
Finish processing a job, and mark it as complete.
result is optional, and it can be any struct.
pulledJob.fail(error)
Mark the pulled job as failed. If it has attempts left, it would be marked as failedAttempt.
pulledJob.progress(completed, total)
Notify about some progress with the job.
completed- Some number that indicates how many units have been completed.total- The total amount of units to be completed.
install
With npm do:
npm install mongo-qdlicense
MIT
11 years ago