stream-batcher v0.9.1
stream-batcher Library
About
This library provides functionality to schedule batch updates.
It is useful when you have a stream of incoming events and you want to batch a portion of them together as a single event. For instance you might have a pipeline which is receiving events which will correlate to updates to a remote data store. It is slow to send those updates one by one and you would like to combine the updates into batches.
Install
npm install stream-batcher
yarn install stream-batcher
How to use
First you create a streambatcherFactory. There is a configurable delay parameter captured into the streambatcherFactory (explained below).
const StreamBatcher = require('stream-batcher')
let { streambatcherFactory } = StreamBatcher(100/* delay between delivered batches*/,1000/*max batch size*/)
You then create a schedule function using the streambatchFactory. You provide a batch handling function to the factory.
let schedule = streambatchFactory(async (batch) => { /* process the batch */ })
You then use the schedule function to submit items to be batched.
schedule('Some Value')
When there are 3 successive periods with no scheduled batch items the batch handler terminates. It will be restarted on the next schedule function call.
Understanding how it works
Turn on debug using
export DEBUG=scheduler
Then run the unit tests
npm run test
The unit test creates a factory with a batch delay of 100 ms. It then create a schedule function for one type and provides a test batch handing function.
//The unit test creates a streambatchFactory and then a schedule function
scheduler schedule 1 +0ms //The unit test calls to schedule the value of 1
scheduler schedule 2 +3ms //The unit test calls to schedule the value of 2
//The unit test waits here
scheduler action attempt 0 +98ms //The batch handling actions the available batch items and resets and internal retry counter
test Function +0ms //invoking the batch handling function
test [ '1', '2' ] +0ms //and providing the 2 batched items
scheduler action attempt 0 +105ms//There is a further 100 ms delay with no new batch items
scheduler action attempt 1 +101ms//There is a further 100 ms delay with no new batch items
scheduler action attempt 2 +102ms//There is a further 100 ms delay with no new batch items
scheduler schedule 3 +94ms //The unit test schedules 3 new values
scheduler schedule 4 +1ms
scheduler schedule 5 +0ms
//The unit test waits here to allow the batch handler to shut down
scheduler action attempt 3 +6ms //The next time the batch handler runs its actions 3 items
test Function +408ms
test [ '3', '4', '5' ] +0ms
//The batch handler resets its internal batch try counter
scheduler action attempt 0 +101ms//There are no new batch items
scheduler action attempt 1 +102ms//dito
scheduler action attempt 2 +101ms//dito
scheduler action attempt 3 +101ms//dito
scheduler closing scheduler +0ms //As there have not been any scheduled items in 3 successive batch delay periods
scheduler closed scheduler +0ms //the batch handler shuts down
scheduler schedule 6 +590ms //A new shchedule call will fail and requires a new scheduler to be created
ok 1 Given a StreamBatcher and some scheduled values: should the values be processed
scheduler schedule 6 +4ms //With the new scheduler that schedule function works and buffers the batch item.
scheduler action attempt 0 +100ms
test Function +1s
test [ '6' ] +0ms
scheduler action attempt 0 +101ms
scheduler action attempt 1 +101ms
scheduler action attempt 2 +101ms
ok 2 Given a StreamBatcher and some additional scheduled values: should the additional values be processed
scheduler action attempt 3 +100ms
scheduler closing scheduler +0ms
scheduler closed scheduler +0ms