bsw v3.0.1
BSW is a Node.js framework for beanstalkd workers
Requirements
Node.js v10 or above is required.
Changelog
3.0
Updated dependencies and dropped node 8 support.
Quick Start
Consumer
const {Consumer} = require('bsw');
(async () => {
    const consumer = new Consumer({
        host: '127.0.0.1',
        port: 27017,
        tube: 'example',
        handler: async function (payload, job_info) {
            console.log('processing job: ', payload);
            return 'success';
        }
    });
    // handling errors
    consumer.on('error', (e) => {
        console.log('error:', e);
    });
    await consumer.start();
})();
Producer
const {Producer} = require('bsw');
(async () => {
    const producer = new Producer({
        host: '127.0.0.1',
        port: 27017,
        tube: 'example'
    });
    // handling errors
    producer.on('error', (e) => {
        console.log('error:', e);
    });
    await producer.start();
    await producer.putJob({
        payload: JSON.stringify({throw: true, result: 'success'}),
        priority: 0,
        delay: 0,
        ttr: 60
    });
    producer.stop();
})();
Handler Function
Since v2, handler must be an async function (https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function).
handler interface:
async handler(payload, job_info)
- payload: beanstalk job payload string or object (if job is a valid JSON)
- job_info: job details
- job_info.id: job id
- job_info.tube: job tube
handler definition examples:
async handler(payload, job_info) {
    console.log('Hello world!');
}
Jobs processing
After a job is reserved, the handler is called. Each job must end up with one of the following statuses after processing:
- success: job processed successfully and must be deleted from beanstalkd
- bury: job processing failed, it must be marked as buried in beanstalkd
- release + delay: job is released and can be reserved again after the delay
To report the job status, return or throw it from the handler function:
// delete job
return 'success';
throw 'success';
// bury job
return 'bury';
throw 'bury';
// release job without delay
return 'release';
throw 'release';
// release job with a 10s delay
return ['release', 10];
throw ['release', 10];
Default statuses:
- success if handler returns an unknown keyword
- bury if handler throws an unknown keyword
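The reporting rules above can be sketched as a small normalizing function. This is only an illustration of the documented behaviour, not bsw's internal code: `normalizeStatus` and its `thrown` flag are hypothetical names, and using a delay of 0 for a plain `'release'` is an assumption.

```javascript
// Hypothetical helper: maps whatever a handler returned or threw
// to a {status, delay} pair, following the rules documented above.
const KNOWN = ['success', 'bury', 'release'];

function normalizeStatus(value, thrown) {
  // ['release', 10] style: a status plus a delay in seconds
  if (Array.isArray(value) && KNOWN.includes(value[0])) {
    const delay = value[0] === 'release' ? (value[1] || 0) : null;
    return {status: value[0], delay};
  }
  // Plain keyword: 'success', 'bury' or 'release'
  if (KNOWN.includes(value)) {
    // Assumption: a release without an explicit delay uses 0 seconds
    return {status: value, delay: value === 'release' ? 0 : null};
  }
  // Unknown keyword: bury if it was thrown, success if it was returned
  return {status: thrown ? 'bury' : 'success', delay: null};
}

console.log(normalizeStatus('success', false));
console.log(normalizeStatus(['release', 10], true));
console.log(normalizeStatus(new Error('boom'), true));
console.log(normalizeStatus(undefined, false));
```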
For example, given these defaults, the following handler:
async handler(payload, job_info) {
    try {
        await mayThrow();
    } catch (e) {
        return 'bury';
    }
    return 'success';
}
is equivalent to:
async handler(payload, job_info) {
    await mayThrow();
}
Post processing
You may add optional post-processing of jobs. To do this, add a final function next to the handler, with the following interface:
NOTE: post-processing runs after the job status has been sent to beanstalkd.
async final(status, delay, result)
- status: job status (success, release or bury)
- delay: job delay in seconds for release, otherwise null
- result: the value returned/thrown from handler
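As an illustration, a `final` function might simply log the outcome of each job. The sketch below follows the interface above; `describeOutcome` is a hypothetical helper and not part of bsw.

```javascript
// Hypothetical helper that turns the arguments of final() into a log line.
function describeOutcome(status, delay, result) {
  const suffix = status === 'release' ? ` (delay: ${delay}s)` : '';
  return `job finished with status "${status}"${suffix}, result: ${JSON.stringify(result)}`;
}

// Post-processing function matching the documented interface.
// It runs after the status has been sent to beanstalkd, so this sketch
// only logs and does not try to influence the job status.
async function final(status, delay, result) {
  console.log(describeOutcome(status, delay, result));
}

// Passed together with the handler when constructing the consumer, e.g.
// new Consumer({tube: 'example', handler, final});
final('release', 10, ['release', 10]);
```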
BSW Consumer class
The BSW Consumer class is used to connect to a beanstalkd server and subscribe to a tube.
The Consumer constructor takes a configuration object:
- host: beanstalkd host (default: '127.0.0.1')
- port: beanstalkd port (default: 11300)
- tube: beanstalkd tube (default: 'default')
- enable_logging: enable logging to console (default: false)
- reserve_timeout: job reservation timeout in seconds (default: 30)
- max_processing_jobs: maximum number of jobs reserved simultaneously (default: 1)
- auto_reconnect: whether to reconnect when the connection is closed unexpectedly, that is, not closed by the client side, in which case fivebeans fires a close event (default: false)
- handler: handler async function (mandatory, MUST be an async function)
- final: final async function (optional, MUST be an async function)
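To make the defaults above concrete, here is a sketch that spells them out and merges user options over them. `CONSUMER_DEFAULTS` and `withDefaults` are hypothetical names used for illustration only; bsw applies its own defaults, so this is not how the library is implemented.

```javascript
// Documented defaults of the Consumer options
// (handler has no default because it is mandatory).
const CONSUMER_DEFAULTS = {
  host: '127.0.0.1',
  port: 11300,
  tube: 'default',
  enable_logging: false,
  reserve_timeout: 30, // seconds
  max_processing_jobs: 1,
  auto_reconnect: false
};

// Hypothetical helper: merges user options over the documented defaults
// and rejects a configuration that has no handler function.
function withDefaults(options) {
  if (typeof options.handler !== 'function') {
    throw new TypeError('handler is mandatory and must be an async function');
  }
  return {...CONSUMER_DEFAULTS, ...options};
}

const config = withDefaults({
  tube: 'example',
  max_processing_jobs: 5,
  handler: async (payload, job_info) => 'success'
});
console.log(config.port, config.tube, config.max_processing_jobs);
```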
async start() function
Start the worker.
- NOTE: an async function can be called directly, or inside another async function with the await keyword.
- If you call consumer.start() directly, it returns immediately and performs the actual start asynchronously.
- If you call await consumer.start() inside an async function, it waits until the start process finishes before running the code that follows.
Example:
const consumer = new Consumer({
    host: '127.0.0.1',
    port: 27017,
    tube: 'example',
    handler: async function (payload, job_info) {
        console.log('processing job: ', payload);
        return 'success';
    }
});
// can be called directly without await
consumer.start();
// this line runs immediately, because start() is an async function
console.log('do something');
// or it can be called inside an async function
(async () => {
    await consumer.start();
    // this line runs after start() has finished
    console.log('do something');
})();
stop() function
Stop the consumer immediately. Jobs that are still being processed will not be reported to beanstalkd.
Example
consumer.stop();
async stopGracefully(timeout) function
Stop the consumer gracefully. It waits until all jobs in progress are done and reported to beanstalkd, or until a user-specified timeout expires.
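One place where a graceful stop is typically useful is process shutdown. The sketch below assumes the timeout is given in milliseconds, following the `stopGracefully(3000)` call that this document describes as 3s. `shutdown` is a hypothetical helper, and `fakeConsumer` is a stand-in that only keeps the snippet runnable without a beanstalkd server.

```javascript
// Hypothetical helper: stop the consumer gracefully and report the outcome.
async function shutdown(consumer, timeoutMs) {
  try {
    await consumer.stopGracefully(timeoutMs);
    return 'stopped';
  } catch (e) {
    console.log('error while stopping:', e);
    return 'failed';
  }
}

// Stand-in for a real bsw Consumer, used only to keep this sketch self-contained.
const fakeConsumer = {
  stopped: false,
  async stopGracefully(timeout) {
    this.stopped = true;
  }
};

// With a real consumer this would usually be wired to a signal:
// process.once('SIGTERM', () => shutdown(consumer, 3000).then(() => process.exit(0)));
shutdown(fakeConsumer, 3000).then((state) => console.log(state));
```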
Example
// stop the consumer gracefully within 3s
await consumer.stopGracefully(3000);
BSW Producer class
The BSW Producer class is used to connect to a beanstalkd server and put jobs into a tube.
The Producer constructor takes a configuration object:
- host: beanstalkd host (default: '127.0.0.1')
- port: beanstalkd port (default: 11300)
- tube: beanstalkd tube (default: 'default')
- enable_logging: enable logging to console (default: false)
async start() function
Same as Consumer class.
stop() function
Same as Consumer class.
async putJob(job) function
Put a job into the tube. Receives a job object with the following attributes:
- payload: job payload, type is String
- priority: job priority, 0 is highest, type is Integer
- delay: time (in seconds) the job stays in the Delayed state before becoming Ready, type is Integer
- ttr: time to run (in seconds); if a reserved job is not finished within this time, beanstalkd returns it to the Ready state, type is Integer
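Since the payload must be a string, objects need to be serialized before they are put into the tube. The following sketch shows one way to assemble the job object; `buildJob` and its default values are hypothetical and not part of bsw.

```javascript
// Hypothetical helper: builds the object expected by producer.putJob().
function buildJob(data, {priority = 0, delay = 0, ttr = 60} = {}) {
  // priority, delay and ttr are documented as integers
  for (const [name, value] of Object.entries({priority, delay, ttr})) {
    if (!Number.isInteger(value) || value < 0) {
      throw new RangeError(`${name} must be a non-negative integer`);
    }
  }
  return {
    // Strings are passed through, anything else is serialized as JSON
    payload: typeof data === 'string' ? data : JSON.stringify(data),
    priority,
    delay,
    ttr
  };
}

const job = buildJob({key: 'value'}, {delay: 5});
console.log(job);
// A real producer would then send it with: await producer.putJob(job);
```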
Example:
await producer.putJob({
    payload: JSON.stringify({key: 'value'}),
    priority: 0,
    delay: 0,
    ttr: 60
});
Full example
Find the full example in the example directory.
To run it, clone the project, then:
> npm install
(If you have `yarn` installed on your machine, we recommend using `yarn install` instead)
> cd example
> node producer.js
> node consumer.js
Contributors
- Fedor Korshunov (for v1)
- Vence Lin (for v2)