waterpark v0.2.6
waterpark
Stream toolbox library
While working with streams, these basic operations will ease your life.
Quickstart
```sh
npm i waterpark
```

```javascript
const {range, take, reduce} = require('waterpark')
// sum even numbers from 0 to 100
range(0, 100)
  .pipe(take.obj({amount: 1, every: 2}))
  .pipe(reduce.obj((sum, val) => sum + val, 0))
  .on('data', console.log)
```

Supported Streaming Modes
- Object Mode: stream with `objectMode: true`
- Buffer Mode: stream with `objectMode: false`

Waterpark streams default to `objectMode` (exception: `fromBuffer`).
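To make the two modes concrete, here is a minimal sketch that uses only Node's core `stream.Readable` and is independent of waterpark (the helper names are hypothetical). In object mode each `push()` stays a separate chunk of any type. In buffer mode the data is stored as bytes, and `read()` without a size returns everything buffered so far, so chunk boundaries are not preserved.

```javascript
const { Readable } = require('stream')

// Hypothetical helper: in object mode every push() remains its own chunk
// and may hold any JavaScript value.
function firstChunkInObjectMode () {
  const readable = new Readable({ objectMode: true, read () {} })
  readable.push({ n: 1 })
  readable.push({ n: 2 })
  return readable.read() // object mode: read() returns a single object
}

// Hypothetical helper: in buffer mode pushed strings are stored as Buffers
// and read() without a size argument returns all buffered bytes at once.
function allChunksInBufferMode () {
  const readable = new Readable({ read () {} })
  readable.push('let')
  readable.push('ters')
  return readable.read()
}

console.log(firstChunkInObjectMode()) // { n: 1 }
console.log(allChunksInBufferMode().toString()) // letters
```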
Types: R = Readable, T = Transform, W = Writable, D = Duplex
| Name | Type | Object Mode | Buffer Mode | Shorthand |
|---|---|---|---|---|
| concurrent | T | ✓ | ✓ | concurrent (concurrency, transformHandler, options) |
| count | R | ✓ | ✓ | count (offset, options) |
| delay | T | ✓ | ✓ | delay (milliseconds, jitter, options) |
| filter | T | ✓ | ✓ | filter (filterHandler, options) |
| fromArray | R | ✓ | ✓ | fromArray (array, options) |
| fromBuffer | R | ✓ | ✓ | fromBuffer (buffer, options) |
| interval | R | ✓ | ‐ | interval (milliseconds, options) |
| multicore | T | ✓ | ✓ | multicore (cores, path, options) |
| random | R | ✓ | ✓ | random (size, options) |
| range | R | ✓ | ✓ | range (from, to, options) |
| reduce | T | ✓ | ‐ | reduce (reducer, initialValue, repeatAfter) |
| slice | T | ✓ | ✓ | slice (begin, end, every, options) |
| skip | T | ✓ | ✓ | skip (begin, every, options) |
| take | T | ✓ | ✓ | take (amount, every, options) |
| through | T | ✓ | ✓ | through (fn, options) |
count (options)
- `offset` (default = 0) the first number emitted.
- `...options` <ReadableOptions> options for a readable stream.
- Returns: <Readable>
Creates a readable stream emitting incrementing numbers.
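A count-like stream can be sketched with a generator and Node's `Readable.from()`, which produces an object-mode stream. This is a minimal illustration, not waterpark's implementation. `countFrom` and `countStream` are hypothetical names.

```javascript
const { Readable } = require('stream')

// Endless generator: offset, offset + 1, offset + 2, ...
function * countFrom (offset = 0) {
  for (let n = offset; ; n++) yield n
}

// Readable.from() pulls values from the generator only when the stream is read,
// so the endless generator is consumed lazily.
function countStream (offset = 0) {
  return Readable.from(countFrom(offset))
}

// Usage: countStream(5).on('data', console.log) logs 5, 6, 7, ... without end.
```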
Example
```javascript
const {count} = require('waterpark')
count().on('data', console.log)
```

Expected output:

```
0
1
2
...
```

fromArray (options)
- `array` <Array> source for the readable stream.
- `...options` <ReadableOptions> options for a readable stream.
- Returns: <Readable> supporting object mode ✓ | buffer mode ✓

Creates a readable stream from an array.
Example
```javascript
const {fromArray} = require('waterpark')
const array = ['streaming', 'is', 'awesome']
fromArray({array})
  .on('data', console.log)
```

Expected output:

```
streaming
is
awesome
```

fromBuffer (options)
- `buffer` <Buffer> source for the readable stream.
- `...options` <ReadableOptions> options for a readable stream.
- Returns: <Readable> supporting object mode ✗ | buffer mode ✓

Creates a readable stream from a buffer.
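The chunking in the example comes from splitting the buffer into pieces of `highWaterMark` bytes. The following minimal sketch shows that splitting with Node core APIs only. It is not waterpark's code, and `chunkBuffer` and `fromBufferSketch` are hypothetical names.

```javascript
const { Readable } = require('stream')

// Split a Buffer into consecutive chunks of at most `size` bytes.
// subarray() returns views into the original memory, so nothing is copied.
function chunkBuffer (buffer, size) {
  if (!(size > 0)) throw new RangeError('size must be a positive number')
  const chunks = []
  for (let start = 0; start < buffer.length; start += size) {
    chunks.push(buffer.subarray(start, start + size))
  }
  return chunks
}

// Emit the chunks as a buffer-mode stream. The last chunk may be shorter.
function fromBufferSketch (buffer, size) {
  return Readable.from(chunkBuffer(buffer, size), { objectMode: false })
}

console.log(chunkBuffer(Buffer.from('letters'), 3).map(String)) // [ 'let', 'ter', 's' ]
fromBufferSketch(Buffer.from('letters'), 3)
  .on('data', buf => console.log(buf.toString()))
```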
Example
```javascript
const {fromBuffer} = require('waterpark')
const buffer = Buffer.from('letters')
fromBuffer({buffer, highWaterMark: 3})
  .on('data', buf => console.log(buf.toString()))
```

Expected output:

```
let
ter
s
```

interval (options)
- `interval` <Number> interval in milliseconds.
- `...options` <ReadableOptions> options for a readable stream.
- Returns: <Readable> supporting object mode ✓ | buffer mode ✗

Periodically emits the current Unix timestamp in milliseconds.
Internally, interval uses setInterval, so timing jitter and drift on the order of milliseconds may occur.
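This is a minimal, hypothetical sketch of a setInterval-driven stream. It is not waterpark's code. The timer starts on the first read, and the stream's `destroy` hook clears it. Printing the gap between ticks makes the millisecond-level jitter visible. Backpressure is deliberately ignored to keep the sketch short.

```javascript
const { Readable } = require('stream')

// Object-mode Readable that pushes Date.now() on every setInterval tick.
// Ticks keep pushing even if nobody reads (no backpressure handling).
function intervalSketch (milliseconds) {
  let timer = null
  return new Readable({
    objectMode: true,
    read () {
      // Start the timer on the first read and ignore later read() calls.
      if (timer === null) {
        timer = setInterval(() => this.push(Date.now()), milliseconds)
      }
    },
    destroy (err, callback) {
      clearInterval(timer) // stop ticking so the process can exit
      callback(err)
    }
  })
}

// Usage: print the measured gap between ticks for ~2s, then stop.
const ticks = intervalSketch(500)
let previous = null
ticks.on('data', now => {
  if (previous !== null) console.log(`gap: ${now - previous}ms`)
  previous = now
})
setTimeout(() => ticks.destroy(), 2000)
```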
Example
```javascript
const {interval} = require('waterpark')
interval({interval: 500, objectMode: true})
  .on('data', console.log)
```

Example output (timestamps will differ):

```
1520281268331
1520281269344
1520281270346
...
```

random (options)
- `size` <Number> total number of random bytes to emit (in the example below, 96 bytes are emitted as three 32-byte chunks).
- `...options` <ReadableOptions> options for a readable stream.
- Returns: <Readable> supporting object mode ✓ | buffer mode ✓

Emits random hex-encoded strings / buffers with a given size.
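The example output below has three 64-character lines for `size: 32 * 3` and `highWaterMark: 32`. This suggests that `size` counts bytes and that each chunk holds `highWaterMark` bytes, or two hex characters per byte. The sketch below reproduces that shape with `crypto.randomBytes`. It is an illustration built on that assumption, not waterpark's code, and `randomHexChunks` is a hypothetical name.

```javascript
const { randomBytes } = require('crypto')

// Assumption: `size` is the total number of random bytes and `chunkSize`
// plays the role of highWaterMark. Returns one hex string per chunk.
function randomHexChunks (size, chunkSize) {
  if (!(chunkSize > 0)) throw new RangeError('chunkSize must be positive')
  const chunks = []
  for (let remaining = size; remaining > 0; remaining -= chunkSize) {
    chunks.push(randomBytes(Math.min(chunkSize, remaining)).toString('hex'))
  }
  return chunks
}

// 96 bytes in 32-byte chunks -> three strings of 64 hex characters each.
console.log(randomHexChunks(32 * 3, 32).map(s => s.length)) // [ 64, 64, 64 ]
```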
Example
```javascript
const {random} = require('waterpark')
random.obj({
  size: 32 * 3,
  encoding: 'hex',
  highWaterMark: 32
})
  .on('data', console.log)
```

Example output:

```
c55607c14da303103810c1d0e608f1275e20f7c72d1df4cd9f4b9a4daa48dc39
f52a5c53a3971c1d43713ce36c81723f09f9ae7f8170dec26545c5b1f8b6b272
8a9ffdd4f6a90ebae1364bede92fb19b428670af05b3fb184b4b39de582eb7ba
```

range (options)
- `from` <Number> integer, included range start.
- `to` <Number> integer, included range end.
- `...options` <ReadableOptions> optional stream options.
- Returns: <Readable> supporting object mode ✓ | buffer mode ✗

Emits integers in sequence from the defined range.
`from` may be smaller than `to`, but both must be integers.
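An inclusive range can be sketched as a generator wrapped by `Readable.from()`. This is a minimal, hypothetical illustration and not waterpark's implementation. It only covers the ascending case and yields nothing when `from > to`.

```javascript
const { Readable } = require('stream')

// Yields every integer from `from` to `to`, both ends included.
// The integer check runs lazily, on the first value requested.
function * rangeValues (from, to) {
  if (!Number.isInteger(from) || !Number.isInteger(to)) {
    throw new TypeError('from and to must be integers')
  }
  for (let n = from; n <= to; n++) yield n
}

// Readable.from() turns the generator into an object-mode stream.
const rangeStream = (from, to) => Readable.from(rangeValues(from, to))

rangeStream(1, 3).on('data', console.log) // logs 1, 2 and 3 on separate lines
```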
Example
```javascript
const {range} = require('waterpark')
range({from: 1, to: 3})
  .on('data', console.log)
```

Expected output:

```
1
2
3
```

concurrent (options)
- `concurrency` <Number> integer, number of concurrent transform operations.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Concurrent stream processing: up to `concurrency` transform operations run at the same time.
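The core of a concurrency limit can be shown without streams. Keep starting tasks until `concurrency` of them are in flight, then start the next one whenever a task finishes. The sketch below is a minimal, hypothetical scheduler built on that idea (`createLimiter` is not a waterpark API). It makes no claim about how waterpark orders its output.

```javascript
// Runs callback-style tasks, at most `limit` at once. Each task receives a
// `done` callback that it must call exactly once.
function createLimiter (limit) {
  let active = 0
  const queue = []

  function next () {
    // Start queued tasks while there is free capacity.
    while (active < limit && queue.length > 0) {
      const task = queue.shift()
      active++
      task(() => {
        active--
        next()
      })
    }
  }

  return {
    run (task) {
      queue.push(task)
      next()
    },
    get active () { return active },
    get pending () { return queue.length }
  }
}

// Usage: 100 tasks of ~100ms each with a limit of 10 take roughly 1s in total.
const limiter = createLimiter(10)
const started = Date.now()
let finished = 0
for (let i = 1; i <= 100; i++) {
  limiter.run(done => setTimeout(() => {
    if (++finished === 100) console.log(`all done after ~${Date.now() - started}ms`)
    done()
  }, 100))
}
```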
Example
```javascript
const {range, concurrent} = require('waterpark')
range({from: 1, to: 100})
  .pipe(concurrent({
    concurrency: 10,
    // each transform operation takes ~100ms
    transform: (data, encoding, cb) => {
      setTimeout(() => cb(null, data), 100)
    }
  }))
  .on('data', console.log)
```

This finishes in ~1s (100 items, 10 at a time, ~100ms each), while through would take ~10s.
delay (options)
- `milliseconds` <Number> integer, delay applied to each chunk.
- `jitter` <Number> integer, random variation of the delay.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Emits data delayed by a specified amount of time.
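A delaying transform can be sketched by calling the transform callback only after a timeout. Because a Transform does not process the next chunk before that callback fires, chunks come out roughly `milliseconds` apart. The sketch assumes that `jitter` adds a random extra delay of 0 to `jitter` ms. That is an assumption, and neither this code nor the helper names come from waterpark.

```javascript
const { Readable, Transform } = require('stream')

// Assumption: jitter adds a random extra delay of 0..jitter ms (inclusive).
// `random` can be replaced to make the result deterministic.
function computeDelay (milliseconds, jitter = 0, random = Math.random) {
  return milliseconds + Math.floor(random() * (jitter + 1))
}

// Hypothetical sketch: each chunk is passed on only after its timeout,
// so chunks are processed one after another.
function delaySketch (milliseconds, jitter = 0) {
  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      setTimeout(() => callback(null, chunk), computeDelay(milliseconds, jitter))
    }
  })
}

// Usage: logs 1, 2 and 3, about 500ms apart.
Readable.from([1, 2, 3])
  .pipe(delaySketch(500))
  .on('data', console.log)
```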
Example
```javascript
const {range, delay} = require('waterpark')
range({from: 1, to: 3})
  .pipe(delay({delay: 500}))
  .on('data', console.log)
```

Expected output:

```
1
2
3
```

Lines will be printed in sequence, each one delayed by ~500ms.
filter (options)
- `filter` <Function(data) => Boolean> pipe data that meets the filter condition.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✗

Emits data that passes the filter condition.
This stream operates in object mode by default.
Example
```javascript
const {range, filter} = require('waterpark')
range(1, 5)
  .pipe(filter(n => n % 2))
  .on('data', console.log)
```

Expected output:

```
1
3
5
```

The initial range 1 to 5 is filtered for odd numbers.
multicore (options)
- `path` <String> path to the module that will be used for clustering.
- `cores` <Number> number of cores used in parallel.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓
Stream operations in parallel on multiple cores. ★
Forks the module referenced by `path` `cores` times, distributes the
incoming stream data to these child processes, computes their transform
handlers in parallel and collects their digests while preserving order.
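Preserving order means that results which finish early must be held back until every earlier result has been emitted. The minimal sketch below shows one common way to do this with an index-keyed buffer. It is hypothetical (`createOrderedCollector` is not a waterpark API) and does not necessarily match how multicore does it.

```javascript
// Results may arrive in any order, tagged with the index of their input chunk.
// They are emitted strictly in index order.
function createOrderedCollector (emit) {
  const pending = new Map() // index -> result that arrived early
  let nextIndex = 0 // index of the next result to emit

  return function collect (index, result) {
    pending.set(index, result)
    // Flush the contiguous run of results starting at nextIndex.
    while (pending.has(nextIndex)) {
      emit(pending.get(nextIndex))
      pending.delete(nextIndex)
      nextIndex++
    }
  }
}

// Results for chunks 0..3 arriving in the order 2, 0, 3, 1:
const emitted = []
const collect = createOrderedCollector(value => emitted.push(value))
for (const i of [2, 0, 3, 1]) collect(i, `digest-${i}`)
console.log(emitted) // [ 'digest-0', 'digest-1', 'digest-2', 'digest-3' ]
```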
Communication between the main process and the child processes uses
JSON encoding. Include the cost of serializing and deserializing the data
sent to the workers and back in your performance estimation.
If your work is mostly I/O-bound, you are probably looking for
concurrent, which multicore uses as its scheduler.
For optimal performance, use the number of physical cores. Exceeding that number is possible on machines with hyper-threading, but may actually reduce performance due to thrashing.
Example
Let's do a CPU-intensive calculation and compute the 1e6-fold SHA-256 hash of multiple messages.
./main.js

```javascript
const {range, multicore} = require('waterpark')
range(1, 12)
  .pipe(multicore(4, require.resolve('./worker.js')))
  .on('data', console.log)
```

./worker.js

```javascript
const {createHash} = require('crypto')
// hash each incoming message 1e6 times in a row, then send the digest back
process.on('message', (msg) => {
  for (let i = 1e6; i > 0; i--) {
    msg = createHash('sha256').update(msg.toString()).digest('hex')
  }
  process.send(msg)
})
```

Then execute `node main`.
Expected output:

```
d6c3110abae572a3ce11a696068dca0f01961fbbf9f2c08bdfdde3640b79db0b
3a2ae473ab4a5fc533adb7367af8b1ffdd5a5a78fafb51945a0869021b07bb14
945a76e4ef3a32651ffde16b90d26c24bbadc9bdf50ff5f580f869108d6bff86
60ca5d721a66d84bfcfab6e0b79a8f5e83bb7a7cd24dcf11dcf4b8a348cf5fe8
...
```

Each line represents the outcome of a CPU-intensive calculation.
range runs on the main process and sequentially pipes numbers
into 4 child processes, each running on a separate core, so the
expensive hash function is calculated in parallel.
slice (options)
- `begin` <Number> zero-based index at which to begin extraction. Default is 0.
- `end` <Number> pass elements up to but not including this zero-based index.
- `every` <Number> repeat the slice operation after `every` elements.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Similar to Array.slice() and Buffer.slice(), slice acts as a range filter on its source.
Because streams are potentially infinite, the `every` parameter
has been introduced.
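The `every` parameter can be read as splitting the stream into windows of `every` elements, with `begin` and `end` applied inside each window. The sketch below models that reading. `inSlice` and `sliceSketch` are hypothetical names, not waterpark's code. Since `index % Infinity === index`, an unset `every` gives a single window, which behaves like `Array.prototype.slice`.

```javascript
const { Readable, Transform } = require('stream')

// Position inside the current window of `every` elements decides whether
// an element passes.
function inSlice (index, begin = 0, end = Infinity, every = Infinity) {
  const position = index % every
  return position >= begin && position < end
}

// Object-mode Transform that counts elements and forwards those in the slice.
// Passing undefined to the callback pushes nothing for that chunk.
function sliceSketch (begin, end, every) {
  let index = 0
  return new Transform({
    objectMode: true,
    transform (chunk, encoding, callback) {
      callback(null, inSlice(index++, begin, end, every) ? chunk : undefined)
    }
  })
}

// Usage: logs 1, 2, 3, 6, 7, 8, like the example below.
Readable.from([0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
  .pipe(sliceSketch(1, 4, 5))
  .on('data', console.log)
```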
Example: every 5 elements, pass the 2nd to the 4th element.

```javascript
const {range, slice} = require('waterpark')
range(0, 9)
  .pipe(slice.obj(1, 4, 5))
  .on('data', console.log)
```

Expected output:

```
1
2
3
6
7
8
```

reduce (reducer[, initialValue][, every])
- `reducer` <Function (accumulator, currentValue, currentIndex)>
- `initialValue` <any> initial accumulator value. Default is 0.
- `every` <Number> repeat the reduction after `every` steps. Default is infinity, which reduces only once at the end of the source.
- `options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✗

Reduces the stream's emissions to a single value (the source stream must be finite) or,
if `every` is set, emits one reduced value per `every` elements.
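Windowed reduction can be modelled on plain arrays. The accumulator is emitted after every `every` elements and then reset to `initialValue`. With `every = Infinity`, only one value is emitted, at the end. The sketch is hypothetical (`reduceEvery` is not a waterpark API). Two behaviours are assumptions: an incomplete trailing window is also emitted at the end, and `currentIndex` counts from the start of the whole source.

```javascript
// Assumptions: a trailing, incomplete window is emitted when the source ends,
// and index counts from the start of the source. initialValue is reused on
// every reset, so it should be a primitive or treated as immutable.
function reduceEvery (values, reducer, initialValue = 0, every = Infinity) {
  const results = []
  let accumulator = initialValue
  let inWindow = 0
  values.forEach((value, index) => {
    accumulator = reducer(accumulator, value, index)
    inWindow++
    if (inWindow === every) {
      results.push(accumulator)
      accumulator = initialValue
      inWindow = 0
    }
  })
  if (inWindow > 0) results.push(accumulator) // end of source
  return results
}

const oneToTwelve = Array.from({ length: 12 }, (_, i) => i + 1)
console.log(reduceEvery(oneToTwelve, (sum, val) => sum + val, 0, 4)) // [ 10, 26, 42 ]
console.log(reduceEvery([1, 2, 3], (sum, val) => sum + val)) // [ 6 ]
```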
Example: every 4 numbers, emit the sum of the last 4 numbers.

```javascript
const {range, reduce} = require('waterpark')
range.obj(1, 100)
  .pipe(reduce.obj((sum, val) => sum + val, 0, 4))
  .on('data', console.log)
```

Expected output:

```
10
26
42
58
74
...
```

skip (amount[, every][, options])
- `amount` <Number> skip this amount of objects / bytes. Default is 0.
- `every` <Number> repeat the skip operation after `every` elements. Default is infinity. `every` must be bigger than `amount`.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓

See also: slice
Because streams are potentially infinite, the `every` parameter
has been introduced.
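Under the same windowed reading as slice, skip drops the first `amount` elements of each window of `every` elements. With the default `every = Infinity`, it only drops the first `amount` elements of the whole stream. This is a hypothetical sketch of that logic (`isSkipped` is not a waterpark API). It also shows the documented rule that `every` must be bigger than `amount`.

```javascript
// Inside each window of `every` elements, the first `amount` are dropped.
// index % Infinity === index, so the default skips only at the very start.
function isSkipped (index, amount = 0, every = Infinity) {
  if (!(every > amount)) throw new RangeError('every must be bigger than amount')
  return index % every < amount
}

const values = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
console.log(values.filter((_, i) => !isSkipped(i, 1, 5))) // [ 1, 2, 3, 4, 6, 7, 8, 9 ]
console.log(values.filter((_, i) => !isSkipped(i, 3))) // [ 3, 4, 5, 6, 7, 8, 9 ]
```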
Example: every 5 elements, skip the first element.

```javascript
const {range, skip} = require('waterpark')
range(0, 9)
  .pipe(skip(1, 5))
  .on('data', console.log)
```

Expected output:

```
1
2
3
4
6
7
8
9
```

take (amount[, every][, options])
- `amount` <Number> only take this amount of objects / bytes. Default is 0.
- `every` <Number> repeat the take operation after `every` elements. Default is infinity, which causes take to end after `amount` elements have been processed.
- `...options` <TransformOptions> optional stream options.
- Returns: <Transform> supporting object mode ✓ | buffer mode ✓

Because streams are potentially infinite, the `every` parameter
has been introduced.
See also: slice
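take is the complement of skip. Inside each window of `every` elements, only the first `amount` pass. With `every = Infinity`, nothing can pass after the first `amount` elements, so a stream can end at that point. The hypothetical sketch below (`isTaken` and `isFinished` are not waterpark APIs) also checks the quickstart arithmetic: taking 1 of every 2 numbers from 0 to 100 and summing them gives 2550.

```javascript
// Inside each window of `every` elements, only the first `amount` pass.
function isTaken (index, amount = 0, every = Infinity) {
  return index % every < amount
}

// With every = Infinity no later element can pass, so the stream may end
// once `index` (zero-based, element about to be processed) reaches amount.
function isFinished (index, amount = 0, every = Infinity) {
  return every === Infinity && index >= amount
}

// Quickstart: take 1 of every 2 numbers from 0..100, then sum them.
// Values equal their indices here because the range starts at 0.
let sum = 0
for (let n = 0; n <= 100; n++) if (isTaken(n, 1, 2)) sum += n
console.log(sum) // 2550
```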
Example: take the first 3 elements of a stream.

```javascript
const {count, take} = require('waterpark')
count()
  .pipe(take(3))
  .on('data', console.log)
```

Expected output:

```
0
1
2
```

through ([options, ]fn(data, encoding, cb))
// TODO: write docs