async-pipeline v0.3.3
async-pipeline
async-pipeline allows you to build a complex conditional flow from loosely coupled components.
npm install async-pipeline
Imagine you have a set of processing operations that might be applied in a different order depending on the input and intermediate results.
Of course, you can build it as Promise chain(s), but the result will be hard to read at best. And over time, as more steps are added, it has a good chance of turning into an unreadable mess.
Another much easier approach would be to build the flow based on PubSub, and let processing functions listen and emit messages with payloads. This would let you split the flow into easily tested atomic stages doing one thing at a time, not coupled to other components. Plus, you can have multiple functions listening to the same events and build parallel secondary flows like logging and tracking aside from the main logic.
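To make the PubSub idea concrete, here is a minimal sketch that uses only Node's built-in EventEmitter, not async-pipeline itself. The event names (parse, validate, done, rejected) and the log array are made up for illustration. It shows stages that each listen for one event and emit the next, plus a second listener on the same event acting as a parallel tracking flow.

```javascript
const EventEmitter = require('events')

// Plain Node EventEmitter used as a message hub (not async-pipeline itself).
const hub = new EventEmitter()
const log = []

// Secondary flow: tracking listens to the same event as the main logic.
hub.on('validate', items => log.push(`tracking: ${items.length} item(s)`))

// Main flow: each stage does one thing and emits the next event.
hub.on('parse', text => hub.emit('validate', text.trim().split(',')))
hub.on('validate', items => hub.emit(items.every(Boolean) ? 'done' : 'rejected', items))
hub.on('done', items => log.push(`done: ${items.join('|')}`))

hub.emit('parse', ' a,b,c ')
console.log(log)
```

Listeners run in registration order, so the tracking entry is recorded before the main flow reaches done. None of the stages reference each other directly; they only share event names.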
async-pipeline helps to build such a message hub by adding a few generic features on top of EventEmitter:
- optionally apply a transition constraint map
- catch errors and route them to one place
- track and log all transitions
- provide flow control events: @end, @error
- track execution time in milliseconds using process.hrtime
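The features listed above can be approximated on top of a plain EventEmitter. The sketch below is not the library's implementation: isAllowed, elapsedMs and transit are hypothetical helpers, and the way the real package tracks the "from" event is not described here. It only illustrates checking a transition map, routing violations to @error, and timing with process.hrtime.

```javascript
const EventEmitter = require('events')

// Hypothetical helper: is `to` listed as a successor of `from` in the map?
function isAllowed (transitions, from, to) {
  return Array.isArray(transitions[from]) && transitions[from].includes(to)
}

// Hypothetical helper: milliseconds elapsed since a process.hrtime() mark.
function elapsedMs (mark) {
  const [sec, nano] = process.hrtime(mark)
  return Math.round(sec * 1e3 + nano / 1e6)
}

const transitions = { start: ['stage-1:done'], 'stage-1:done': ['stage-2:done'] }
const hub = new EventEmitter()
const trace = []
const errors = []
const began = process.hrtime()

// Emit `to` only if the map allows it after `from`; otherwise route to '@error'.
function transit (from, to, ...payload) {
  if (!isAllowed(transitions, from, to)) {
    hub.emit('@error', new Error(`Transition ${from} -> ${to} is not allowed`))
    return false
  }
  trace.push({ event: to, time: elapsedMs(began) })
  return hub.emit(to, ...payload)
}

hub.on('@error', err => errors.push(err.message))
hub.on('stage-1:done', () => transit('stage-1:done', 'stage-2:done'))
hub.on('stage-2:done', () => transit('stage-2:done', 'start')) // not in the map

transit('start', 'stage-1:done')
console.log(trace.map(t => t.event), errors)
```

The last handler attempts a transition that the map does not list, so it ends up in the errors array instead of being emitted.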
Example
const {inspect} = require('util')
// Assumes the package exports the Pipeline constructor directly
const Pipeline = require('async-pipeline')

new Pipeline({
  transitions: {
    'start': ['stage-1:progress', 'stage-1:done'],
    'stage-1:done': ['stage-2:done']
  }
})
  .on('@error', console.error.bind(console, 'ERROR:'))
  .on('@end', dump => {
    console.log('END:', inspect(dump, {depth: 10}))
  })
  .on('start', function (count) {
    let i = 0
    // Report progress every 100 ms, then signal completion
    while (i++ < count) setTimeout(this.emit, i * 100, 'stage-1:progress', i)
    setTimeout(this.emit, (count + 1) * 100, 'stage-1:done')
  })
  .on('stage-1:done', function () {
    setTimeout(this.emit, 200, 'stage-2:done', 'hello', 'there')
  })
  .on('stage-2:done', function (...args) {
    console.log('after stage-2', ...args)
    this.end()
  })
  .on('stage-1:progress', console.log.bind(console, 'stage1'))
  .start('start', 5)
The code would output the following:
stage1 1
stage1 2
stage1 3
stage1 4
stage1 5
after stage-2 hello there
END: [ { event: 'start',
payload: [ 5 ],
routes:
[ { event: 'stage-1:progress',
payload: [ 1 ],
routes: [],
time: 107 },
{ event: 'stage-1:progress',
payload: [ 2 ],
routes: [],
time: 200 },
{ event: 'stage-1:progress',
payload: [ 3 ],
routes: [],
time: 305 },
{ event: 'stage-1:progress',
payload: [ 4 ],
routes: [],
time: 402 },
{ event: 'stage-1:progress',
payload: [ 5 ],
routes: [],
time: 503 },
{ event: 'stage-1:done',
payload: [],
routes:
[ { event: 'stage-2:done',
payload: [ 'hello', 'there' ],
routes: [],
time: 806 } ],
time: 604 } ],
time: 0 } ]
API
Call new Pipeline(options) or just Pipeline(options) to get a pipeline instance. The available options are:
- debug - optional function to print internal logs. The easiest way to get debugging output is to pass {debug: console.log}
- contextAPI - true by default; exposes the API controls to the handlers through this, otherwise passes them as the first argument
- transitions - optional mapping that defines the allowed transitions and entry points. Example:
{
'from-event-0': ['to-event-1', 'to-event-2'],
'from-event-2': ['to-event-3']
}
Pipeline methods
- this.start('event', ...payload) - start the flow with the event named event and an optional payload. This will throw if options.transitions is set and has no event key
- this.context(dataObject) - defines the data object available to all handlers through this.context()
Internal events and their payloads
- @all (event, ...payload) - catches all events; emitted right before the normal event. Useful mainly for testing
- @end (trace) - fired once any handler calls this.end(). Also triggered automatically on unexpected errors
- @error (error, trace) - fired if any handler has thrown or explicitly called this.end(error). If no handler is registered, this causes an uncaught exception that bubbles up to the process level
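The trace passed to @end is nested, as the dump in the Example section shows: each entry has event, payload, routes and time. A small helper can make it easier to read. The sketch below assumes exactly that shape; flattenTrace is a hypothetical name and the sample is an abbreviated copy of the Example dump.

```javascript
// Walk a trace shaped like the dump in the Example section and return
// one line per event, indented by nesting depth.
function flattenTrace (nodes, depth = 0) {
  const lines = []
  for (const node of nodes) {
    lines.push(`${'  '.repeat(depth)}${node.event} @${node.time}ms`)
    lines.push(...flattenTrace(node.routes || [], depth + 1))
  }
  return lines
}

// Abbreviated copy of the Example dump.
const sample = [{
  event: 'start',
  payload: [5],
  time: 0,
  routes: [
    { event: 'stage-1:progress', payload: [1], routes: [], time: 107 },
    {
      event: 'stage-1:done',
      payload: [],
      time: 604,
      routes: [
        { event: 'stage-2:done', payload: ['hello', 'there'], routes: [], time: 806 }
      ]
    }
  ]
}]

console.log(flattenTrace(sample).join('\n'))
```

With the real library, a function like this could be called from an @end handler in place of inspect().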
Handler context methods
Each handler, except those for internal events, has access to the following methods exposed through the call context. Keep in mind that handlers should be defined as function() {}; arrow functions ignore the call context, so this would not refer to the API
- this.end([error]) - end the flow and void all subsequent events. If error is passed, it is considered an emergency shutdown. In either case it will end up triggering the @end event
- this.emit('event', ...payload) - cast an event with an arbitrary payload. This will throw if the emitted event violates options.transitions
- this.context([dataObject]) - getter/setter for context data shared between all handlers. Use it sparingly to keep handlers uncoupled
- this.safe(fn) - decorator for async calls that may throw within a handler. The following would throw to the top level unless wrapped into safe():
pipeline.on('event', function () {
  // A regular function is required here so that this.safe is available
  const later = this.safe(() => { throw new Error() })
  setTimeout(later, 0)
})
Error handling
Errors thrown from within event handlers are caught and routed as the @error event payload. However, they bubble up to the top level if no @error handler is defined
Errors thrown from within internal event handlers (@end, @error) always bypass @error and propagate to the top level to avoid recursive failures. The same applies to failures in start() and on() calls
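A callback scheduled with setTimeout runs after the handler has already returned, so a try/catch around the handler cannot see what it throws. That is the gap a decorator like this.safe(fn) fills. The sketch below shows the general wrapping technique on a plain EventEmitter; the safe function here is a hypothetical stand-in, not the library's code.

```javascript
const EventEmitter = require('events')

const hub = new EventEmitter()

// Hypothetical stand-in for this.safe(): returns a function that calls `fn`
// and forwards anything it throws to '@error' instead of rethrowing.
function safe (fn) {
  return function (...args) {
    try {
      return fn.apply(this, args)
    } catch (err) {
      hub.emit('@error', err)
    }
  }
}

const seen = []
hub.on('@error', err => seen.push(err.message))

const later = safe(() => { throw new Error('boom') })
// Called directly here to keep the sketch synchronous; the wrapper behaves
// the same way when passed to setTimeout(later, 0).
later()
console.log(seen)
```

Because the try/catch lives inside the returned function, it is still in effect whenever the timer eventually invokes it.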
Alternative EventEmitter
If Node's EventEmitter doesn't work for you for whatever reason, you can inject an alternative:
Pipeline = Pipeline.di({EventEmitter: EventEmitterClass})
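Which methods the injected class must provide is not stated here, so the safest candidate is probably a subclass of Node's own EventEmitter. The sketch below defines one such class; CountingEmitter is a hypothetical name, and the final commented line only repeats the injection call shown above.

```javascript
const EventEmitter = require('events')

// Hypothetical alternative emitter: behaves like Node's EventEmitter
// but counts every emit() call, e.g. for metrics.
class CountingEmitter extends EventEmitter {
  constructor () {
    super()
    this.count = 0
  }

  emit (event, ...args) {
    this.count++
    return super.emit(event, ...args)
  }
}

const emitter = new CountingEmitter()
emitter.on('ping', () => {})
emitter.emit('ping')
console.log(emitter.count)

// With the real library loaded:
// Pipeline = Pipeline.di({EventEmitter: CountingEmitter})
```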