1.0.17 • Published 8 years ago
worker-kit v1.0.17
worker-kit
API
WorkerManager(numberOfWorkers = 10)
- WorkerManager#hasFreeWorkers()
- WorkerManager#_getFreeWorker()
- WorkerManager#publish(job)
- WorkerManager#_processJobs()
- WorkerManager#_run(worker, job)
Worker()
- Worker#getId()
- Worker#isFree()
- Worker#run(job)
events
finished(worker, job) => {}
Job(payload, [config, data])
- Job#getId()
- Job#overwriteId()
- Job#getPayload()
- Job#overwritePayload(payload)
- Job#getConfig(config)
- Job#getConfigs()
- Job#setConfig(config, value)
- Job#setConfigs(object)
- Job#getError()
- Job#_setError()
- Job#getData()
- Job#getStore()
- Job#getExecTime()
- Job#getErrorBucket()
- Job#getJobBucket()
- Job#getNumberOfExecutions()
- Job#getResults()
- Job#getStatus()
- Job#_setStatus()
- Job#requeue()
- Job#drop()
- Job#finish()
- Job#error(err)
- Job#shouldExecAgain()
- Job#_setup()
- Job#_do()
- Job#exec()
statuses
- unfinished
- finished
- requeue
- dropped
- setup_error
- job_error
- timeout_error
events
- executed(job) => {}
- finished(job) => {}
const workerKit = require('worker-kit');
const AmqpTransporter = workerKit.AmqpTransporter;
const WorkerManager = workerKit.WorkerManager;
const Worker = workerKit.Worker;
const Job = workerKit.Job;
/**
 * Example job built on top of the worker-kit `Job` base class.
 */
class JobExample extends Job {
  /**
   * Configure this job by applying each option through `setConfig`.
   *
   * NOTE(review): presumably called by the `Job` base class before `do()`
   * runs — confirm against the worker-kit implementation.
   */
  setup() {
    // Option name / value pairs, applied in this exact order.
    const settings = [
      ['timeout', 10000],
      ['maxRepeats', 0],
      ['dropOnError', false],
      ['dropOnTimeout', false],
    ];
    for (const [option, value] of settings) {
      this.setConfig(option, value);
    }
  }

  /**
   * Job body: logs the job id together with its JSON-serialized payload.
   */
  async do() {
    const message = `doing job ${this.getId()} ${JSON.stringify(this.getPayload())}`;
    console.log(message);
  }
}
/**
 * Build a worker manager with 10 workers, publish one `JobExample` carrying
 * a sample payload, and log when the job reports `finished`.
 *
 * @returns {Promise<void>} Resolves once the job has been handed to the manager.
 */
async function main() {
  const manager = new WorkerManager(10);
  const job = new JobExample({ hello: 'world' });

  // Register the listener before publishing the job.
  const onFinished = (finishedJob) => {
    console.log('JOB FINISHED', finishedJob.getId());
  };
  job.once('finished', onFinished);

  manager.publish(job);
  console.log('pushed job');
}
main().catch(console.log);

Amqp Transporter Config
// Example AMQP transporter configuration: one direct exchange named "Test"
// with a single route ("routeA") bound to queue "q1". The commented-out
// `options` blocks list the optional settings and what each one does.
const configs = [
  {
    name: 'Test',
    type: 'direct',
    // options: {
    //   durable: true,      // resist restarts
    //   internal: false,    // messages can not be published directly to exchange
    //   autoDelete: true,   // destroy exchange when number of bindings drop to 0
    //   // exchange to send messages if this exchange cant route them to any queue
    //   alternateExchange: '',
    //   arguments: {},
    // },
    routes: [
      {
        patterns: ['routeA'],
        queues: [
          {
            name: 'q1',
            // options: {
            //   exclusive: false,   // scope queue to connection
            //   durable: true,      // resist restarts
            //   autoDelete: false,  // delete queue when there are 0 consumers
            //   arguments: {        // arguments for extensions
            //     'x-message-ttl': 0,   // 0 <= n < 2^32
            //     'x-expires': 0,       // 0 < n < 2^32
            //     // discarded messages arrive here (expired, rejected or nacked messages)
            //     'x-dead-letter-exchange': '',
            //     'x-dead-letter-routing-key': '',  // routing key if there is one
            //     // maximum of messages queue can hold (old ones will get discarded)
            //     'x-max-length': 0,
            //     'x-max-priority': 0,  // makes the queue a priority queue.
            //   },
            // },
          },
        ],
        args: {},
      },
    ],
  },
];