timed-queue v1.4.0
timed-queue
Distributed timed job queue, backed by redis.
Features
- Support redis cluster.
- Support one or more
timed-queue
instance in a redis instance. Eachtimed-queue
instance are segregated byprefix
option - Support one or more job queues in a timed-queue instance.
- Support one or more timed-queue clients for a timed-queue instance.
Demo
const TimedQueue = require('timed-queue')
const timedQueue = new TimedQueue({prefix: 'TQ1', interval: 1000 * 60})
// connect to redis cluster.
timedQueue.connect([7000, 7001, 7002])
.on('error', function (error) {
console.error(error)
})
// create 'event' job queue in timed-queue instance
const eventQueue = timedQueue.queue('event')
// add 'job' listener
eventQueue.on('job', function (jobObj) {
// ... just do some thing
// ACK the job
eventQueue.ackjob(jobObj.job)()
})
// add job to queue
eventQueue.addjob(eventObj.id, new Date(eventObj.startDate).getTime() - 10 * 60 * 1000)(function (err, res) {
console.log(err, res)
})
Installation
npm install timed-queue
Job
Job
Class:
function Job (queue, job, timing, active, retryCount) {
this.queue = queue
this.job = job
this.timing = timing
this.active = active
this.retryCount = retryCount
}
this.queue
: {String} Queue namethis.job
: {String} The job's namethis.timing
: {Number} The time in millisecond when the job should be activedthis.active
: {Number} The actual time in millisecond that the job be activedthis.retryCount
: {Number} A job that has been actived but has not been ACK inretry
time will be actived again.retryCount
is times that the job re-actived.
API
const TimedQueue = require('timed-queue')
new TimedQueue(options) => timedQueue
object
Return a timedQueue
client. It is an EventEmitter instance.
options.prefix
: {String} Redis key's prefix, or namespace. Default to"TIMEDQ"
options.count
: {Number} The maximum job count for queue'sgetjobs
method. Default to64
options.interval
: {Number} Interval time for scanning. Default to1000 * 60
msoptions.retry
: {Number} Retry time for a job. A job that has been actived but has not been ACK inretry
time will be actived again. Default tointerval / 2
msoptions.expire
: {Number} Expiration time for a job. A job that has been actived and has not been ACK inexpire
time will be removed from the queue. Default tointerval * 5
msoptions.accuracy
: {Number} Scanning accuracy. Default tointerval / 5
options.autoScan
: {Boolean} The flag to enable or disable automatic scan. Default totrue
. It can be set tofalse
if automatic scan is not desired.
const timedQueue = new TimedQueue()
TimedQueue Events
- timedQueue.on('connect', function () {})
- timedQueue.on('error', function (error) {})
- timedQueue.on('close', function () {})
- timedQueue.on('scanStart', function (queuesLength) {})
- timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
TimedQueue.prototype.connect(host, options) => this
TimedQueue.prototype.connect(redisClient) => this
Connect to redis. Arguments are the same as thunk-redis's createClient
, or give a thunk-redis instance.
timedQueue.connect()
TimedQueue.prototype.scan() => this
Start scanning. It automatically starts after connect
method is called unless autoScan
is set to false
.
TimedQueue.prototype.stop() => this
Stop scanning.
TimedQueue.prototype.close() => this
Close the timedQueue
. It closes redis client of the timedQueue
accordingly.
TimedQueue.prototype.regulateFreq(factor) => this
It is used to regulate the automatic scanning frequency.
TimedQueue.prototype.destroyQueue(queue, options) => this
Remove the queue. It deletes all data in the queue from redis.
TimedQueue.prototype.queue(queue, options) => Queue
instance
Return a Queue
instance if one exists. Otherwise it creates a Queue
instance and return it. Queue
instance is a EventEmitter instance.
queue
: {String} The queue's nameoptions.count
: {Number} The maximum job count for queue'sgetjobs
method. Default to timedQueue'scount
options.retry
: {Number} Retry time for a job. A job that has been actived and has not been ACK inretry
time will be actived again. Default to timedQueue'sretry
options.expire
: {Number} Expiration time for job. A job that has been actived and has not been ACK inexpire
time will be removed from the queue. Default to timedQueue'sexpire
options.accuracy
: {Number} Scanning accuracy, Default to timedQueue'saccuracy
const eventQueue = timedQueue.queue('event', {retry: 1000, expire: 5000})
Queue Events
- queue.on('job', function (job) {})
If no job
listener on queue, queue scanning will not run.
Queue.prototype.init(options) => this
options.count
: {Number} The maximum job count for queue'sgetjobs
method. Default to timedQueue'scount
options.retry
: {Number} Retry time for a job. A job that has been actived and has not been ACK inretry
time will be actived again. Default to timedQueue'sretry
options.expire
: {Number} Expire time for a job. A job that has been actived and has not been ACK inexpire
time will be removed from queue. Default to timedQueue'sexpire
options.accuracy
: {Number} Scanning accuracy. Default to timedQueue'saccuracy
Queue.prototype.addjob(job, timing, job, timing, ...) => thunk
function
Queue.prototype.addjob(job, timing, job, timing, ...) => thunk
function
Add one or more jobs to the queue. It can be used to update the job's timing.
job
: {String} The job's nametiming
: {Number} The time in millisecond when the job should be actived. It should greater thanDate.now()
eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
console.log(err, res)
// null, 1
})
Queue.prototype.show(job) => thunk
function
Show the job info.
job
: {String} job
eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res)
// {
// queue: 'event',
// job: '52b3b5f49c2238313600015d',
// timing: 1441552050409
// active: 0,
// retryCount: 0
// }
})
Queue.prototype.deljob(job, job, ...) => thunk
function
Queue.prototype.deljob(job, job, ...) => thunk
function
Delete one or more jobs.
job
: {String} job
eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res) // null, 1
})
Queue.prototype.getjobs(scanActive) => thunk
function
It is called by Queue.prototype.scan
. It should not be called explicitly unless you know what you are doing.
Queue.prototype.ackjob(job, job, ...) => thunk
function
Queue.prototype.ackjob(job, job, ...) => thunk
function
ACK one or more jobs.
job
: {String} job
eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
console.log(err, res) // null, 1
})
Queue.prototype.scan() => thunk
function
It is called by TimedQueue.prototype.scan
. It should not be called explicitly unless you know what you are doing.
Queue.prototype.len() => thunk
function
Return the queue' length.
eventQueue.len()(function (err, res) {
console.log(err, res) // null, 3
})
Queue.prototype.showActive() => thunk
function
Return actived jobs in the queue.
eventQueue.showActive()(function (err, res) {
console.log(err, res) // null, [jobs...]
})
6 years ago
6 years ago
8 years ago
8 years ago
8 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
9 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago