timed-queue v1.4.0
Distributed timed job queue, backed by redis.

Features
- Supports Redis Cluster.
- Supports one or more timed-queue instances in a single Redis instance. Each timed-queue instance is segregated by its prefix option.
- Supports one or more job queues in a timed-queue instance.
- Supports one or more timed-queue clients for a timed-queue instance.
Demo
const TimedQueue = require('timed-queue')
const timedQueue = new TimedQueue({prefix: 'TQ1', interval: 1000 * 60})

// connect to redis cluster.
timedQueue.connect([7000, 7001, 7002])
  .on('error', function (error) {
    console.error(error)
  })

// create 'event' job queue in timed-queue instance
const eventQueue = timedQueue.queue('event')

// add 'job' listener
eventQueue.on('job', function (jobObj) {
  // ... do something with the job
  // ACK the job
  eventQueue.ackjob(jobObj.job)()
})

// add job to queue; eventObj is an application-defined object with id and startDate.
// The job is scheduled 10 minutes before the event's start date.
eventQueue.addjob(eventObj.id, new Date(eventObj.startDate).getTime() - 10 * 60 * 1000)(function (err, res) {
  console.log(err, res)
})
Installation
npm install timed-queue
Job
Job Class:
function Job (queue, job, timing, active, retryCount) {
  this.queue = queue
  this.job = job
  this.timing = timing
  this.active = active
  this.retryCount = retryCount
}
- this.queue: {String} Queue name.
- this.job: {String} The job's name.
- this.timing: {Number} The time in milliseconds at which the job should be activated.
- this.active: {Number} The actual time in milliseconds at which the job was activated.
- this.retryCount: {Number} The number of times the job has been re-activated. A job that has been activated but has not been ACKed within the retry time is activated again.
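The interplay between timing, active, and the retry and expire windows can be easier to follow as a small decision function. The following is only a sketch of the documented rules, not the library's implementation. The function name nextAction and its return labels are hypothetical, and it assumes that retry is measured from the last activation (active) and expire from the scheduled time (timing), which the text above does not specify.

```javascript
// Hypothetical helper, not part of timed-queue: decides what a scanner
// would do with a job at time `now` (all values in milliseconds).
// Assumption: `retry` is measured from the last activation (`job.active`)
// and `expire` from the scheduled time (`job.timing`).
function nextAction (job, now, opts) {
  if (now < job.timing) return 'wait' // not due yet
  if (job.active === 0) return 'activate' // due and never activated
  if (now - job.timing >= opts.expire) return 'remove' // not ACKed in time
  if (now - job.active >= opts.retry) return 'reactivate' // retryCount + 1
  return 'wait' // still inside the retry window
}

const opts = {retry: 30000, expire: 300000}
const job = {queue: 'event', job: 'a', timing: 1000000, active: 0, retryCount: 0}
console.log(nextAction(job, 999999, opts)) // prints: wait
console.log(nextAction(job, 1000000, opts)) // prints: activate
```

An ACK is not modeled here; once a job is ACKed it simply leaves this cycle.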
API
const TimedQueue = require('timed-queue')
new TimedQueue(options) => timedQueue object
Return a timedQueue client. It is an EventEmitter instance.
- options.prefix: {String} Redis key prefix, or namespace. Default to "TIMEDQ".
- options.count: {Number} The maximum job count for the queue's getjobs method. Default to 64.
- options.interval: {Number} Interval time for scanning. Default to 1000 * 60 ms.
- options.retry: {Number} Retry time for a job. A job that has been activated but has not been ACKed within the retry time will be activated again. Default to interval / 2 ms.
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to interval * 5 ms.
- options.accuracy: {Number} Scanning accuracy. Default to interval / 5.
- options.autoScan: {Boolean} Flag to enable or disable automatic scanning. Default to true. Set it to false if automatic scanning is not desired.
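Several defaults are derived from interval, so changing interval alone shifts retry, expire, and accuracy. The sketch below mirrors the documented defaults in plain JavaScript to make the arithmetic concrete. resolveOptions is a hypothetical helper written for illustration; the library's own option handling may differ in detail.

```javascript
// Hypothetical helper mirroring the documented defaults.
// It is not the library's code and does not touch Redis.
function resolveOptions (options) {
  const opts = Object.assign({}, options)
  const interval = opts.interval == null ? 1000 * 60 : opts.interval
  return {
    prefix: opts.prefix == null ? 'TIMEDQ' : opts.prefix,
    count: opts.count == null ? 64 : opts.count,
    interval: interval,
    retry: opts.retry == null ? interval / 2 : opts.retry,
    expire: opts.expire == null ? interval * 5 : opts.expire,
    accuracy: opts.accuracy == null ? interval / 5 : opts.accuracy,
    autoScan: opts.autoScan !== false
  }
}

const resolved = resolveOptions({prefix: 'TQ1', interval: 10000})
// With interval = 10000: retry = 5000, expire = 50000, accuracy = 2000
console.log(resolved.retry, resolved.expire, resolved.accuracy)
```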
const timedQueue = new TimedQueue()
TimedQueue Events
- timedQueue.on('connect', function () {})
- timedQueue.on('error', function (error) {})
- timedQueue.on('close', function () {})
- timedQueue.on('scanStart', function (queuesLength) {})
- timedQueue.on('scanEnd', function (queuesLength, timeConsuming) {})
TimedQueue.prototype.connect(host, options) => this
TimedQueue.prototype.connect(redisClient) => this
Connect to Redis. The arguments are the same as thunk-redis's createClient. Alternatively, pass an existing thunk-redis client instance.
timedQueue.connect()
TimedQueue.prototype.scan() => this
Start scanning. Scanning starts automatically after the connect method is called, unless autoScan is set to false.
TimedQueue.prototype.stop() => this
Stop scanning.
TimedQueue.prototype.close() => this
Close the timedQueue. It closes redis client of the timedQueue accordingly.
TimedQueue.prototype.regulateFreq(factor) => this
It is used to regulate the automatic scanning frequency.
TimedQueue.prototype.destroyQueue(queue, options) => this
Remove the queue. It deletes all data in the queue from redis.
TimedQueue.prototype.queue(queue, options) => Queue instance
Return the Queue instance if one already exists. Otherwise, create a Queue instance and return it. A Queue instance is an EventEmitter instance.
- queue: {String} The queue's name.
- options.count: {Number} The maximum job count for the queue's getjobs method. Default to timedQueue's count.
- options.retry: {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Default to timedQueue's retry.
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to timedQueue's expire.
- options.accuracy: {Number} Scanning accuracy. Default to timedQueue's accuracy.
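Each per-queue option falls back to the value held by the timedQueue. A minimal sketch of that fallback, assuming a plain key-by-key merge, looks as follows. queueOptions is a hypothetical helper for illustration and is not part of the package.

```javascript
// Hypothetical sketch: per-queue options fall back to the timedQueue's values.
function queueOptions (timedQueueOpts, queueOpts) {
  const result = {}
  for (const key of ['count', 'retry', 'expire', 'accuracy']) {
    const own = queueOpts && queueOpts[key]
    result[key] = own == null ? timedQueueOpts[key] : own
  }
  return result
}

const parentOpts = {count: 64, retry: 30000, expire: 300000, accuracy: 12000}
const merged = queueOptions(parentOpts, {retry: 1000, expire: 5000})
// count and accuracy come from parentOpts; retry and expire are overridden
console.log(merged.count, merged.retry, merged.expire, merged.accuracy)
```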
const eventQueue = timedQueue.queue('event', {retry: 1000, expire: 5000})
Queue Events
- queue.on('job', function (job) {})
If the queue has no 'job' listener, scanning will not run for that queue.
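Because a queue without a 'job' listener is not scanned, it can help to confirm that a listener is attached before adding jobs. The sketch below uses Node's standard EventEmitter as a stand-in for a Queue instance. hasJobListener is a hypothetical helper, and whether timed-queue performs its own check this way is an assumption.

```javascript
const EventEmitter = require('events')

// Hypothetical helper: reports whether a 'job' listener is attached.
// Works on any EventEmitter, which a Queue instance is documented to be.
function hasJobListener (queue) {
  return queue.listenerCount('job') > 0
}

// Stand-in for a Queue instance so the sketch runs without Redis.
const standInQueue = new EventEmitter()
console.log(hasJobListener(standInQueue)) // prints: false
standInQueue.on('job', function (job) {})
console.log(hasJobListener(standInQueue)) // prints: true
```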
Queue.prototype.init(options) => this
- options.count: {Number} The maximum job count for the queue's getjobs method. Default to timedQueue's count.
- options.retry: {Number} Retry time for a job. A job that has been activated and has not been ACKed within the retry time will be activated again. Default to timedQueue's retry.
- options.expire: {Number} Expiration time for a job. A job that has been activated and has not been ACKed within the expire time will be removed from the queue. Default to timedQueue's expire.
- options.accuracy: {Number} Scanning accuracy. Default to timedQueue's accuracy.
Queue.prototype.addjob(job, timing, job, timing, ...) => thunk function
Add one or more jobs to the queue. It can be used to update the job's timing.
- job: {String} The job's name.
- timing: {Number} The time in milliseconds at which the job should be activated. It should be greater than Date.now().
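When adding several jobs at once, the arguments alternate between job name and timing. The following sketch builds that flat argument list from an array of objects and rejects timings that are not in the future. toAddjobArgs and the {job, timing} object shape are hypothetical and only illustrate the documented signature.

```javascript
// Hypothetical helper: turns [{job, timing}, ...] into the flat
// (job, timing, job, timing, ...) argument list and rejects past timings.
function toAddjobArgs (jobs, now) {
  const args = []
  for (const item of jobs) {
    if (!(item.timing > now)) {
      throw new RangeError('timing for "' + item.job + '" must be greater than now')
    }
    args.push(String(item.job), item.timing)
  }
  return args
}

const now = Date.now()
const args = toAddjobArgs([
  {job: 'a', timing: now + 60000},
  {job: 'b', timing: now + 120000}
], now)
console.log(args.length) // prints: 4
// With a real queue: eventQueue.addjob.apply(eventQueue, args)(callback)
```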
eventQueue.addjob('52b3b5f49c2238313600015d', 1441552050409)(function (err, res) {
  console.log(err, res)
  // null, 1
})
Queue.prototype.show(job) => thunk function
Show the job info.
job: {String} job
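The queue methods return thunk functions, which take a Node-style callback (err, res). If Promises are preferred, a thunk can be wrapped as sketched below. thunkToPromise and fakeShow are hypothetical names; fakeShow only imitates the shape of a show result so that the sketch runs without Redis.

```javascript
// Hypothetical helper: wraps a thunk function (a function that takes a
// Node-style callback) in a Promise.
function thunkToPromise (thunk) {
  return new Promise(function (resolve, reject) {
    thunk(function (err, res) {
      if (err) reject(err)
      else resolve(res)
    })
  })
}

// Stand-in thunk that imitates the shape of a show() result.
function fakeShow (job) {
  return function (callback) {
    setImmediate(callback, null, {queue: 'event', job: job, timing: 1441552050409, active: 0, retryCount: 0})
  }
}

thunkToPromise(fakeShow('52b3b5f49c2238313600015d')).then(function (info) {
  console.log(info.job) // prints: 52b3b5f49c2238313600015d
})
```

With a real queue, the same wrapper would be applied to the thunk returned by a method call, for example thunkToPromise(eventQueue.show(job)).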
eventQueue.show('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res)
  // {
  //   queue: 'event',
  //   job: '52b3b5f49c2238313600015d',
  //   timing: 1441552050409,
  //   active: 0,
  //   retryCount: 0
  // }
})
Queue.prototype.deljob(job, job, ...) => thunk function
Delete one or more jobs.
job: {String} job
eventQueue.deljob('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res) // null, 1
})
Queue.prototype.getjobs(scanActive) => thunk function
It is called by Queue.prototype.scan. It should not be called explicitly unless you know what you are doing.
Queue.prototype.ackjob(job, job, ...) => thunk function
ACK one or more jobs.
job: {String} job
eventQueue.ackjob('52b3b5f49c2238313600015d')(function (err, res) {
  console.log(err, res) // null, 1
})
Queue.prototype.scan() => thunk function
It is called by TimedQueue.prototype.scan. It should not be called explicitly unless you know what you are doing.
Queue.prototype.len() => thunk function
Return the queue's length.
eventQueue.len()(function (err, res) {
  console.log(err, res) // null, 3
})
Queue.prototype.showActive() => thunk function
Return the activated jobs in the queue.
eventQueue.showActive()(function (err, res) {
  console.log(err, res) // null, [jobs...]
})