@regular/pull-buckets v1.1.0
pull-buckets
Segmentate an incoming stream into groups ("buckets") and reduce each group to a single value that is then passed downstream. Suitable for live-streams when using hte timeout option. (Makes sure bucket value is eventually emitted when no new data comes in for a while)
const buckets = require('pull-buckets')
function bucketKey(n) {
return (n / 10) << 0
}
function createOrAdd(b, n) {
b = b || {
tens: bucketKey(n),
l: [],
sum: 0
}
b.sum += n
b.l = b.l.concat([n])
return b
}
function fitsBucket({tens}, n) {
return bucketKey(n) == tens
}
pull(
pull.values([1,2,10,11,20,25]),
buckets(fitsBucket, createOrAdd),
pull.log()
)
/* output:
{tens: 0, sum: 3, l: [ 1, 2 ]},
{tens: 1, sum: 21, l: [ 10, 11 ]},
{tens: 2, sum: 45, l: [ 20, 25 ]}
*/buckets(fitsBucket, add, opts)
returns a new pull through stream that accumulates items in buckets and passes buckets downstream once encountering an item that does not fit the current bucket.
IMPORTANT: because of this mechanics, the icoming stream needs to be a sorted stream in regards to
whatever criteria fitsBucket (see below) implements.
fitsBucket(bucket, value)returns true if the value fits into the given bucket, false otherwise
add(bucket, value)if bucket is undefined, creates a fresh bucket and throws the given value into it. Othereise, accumulate the value in the existing bucket. Should manipulate bucket object in-place whenever ppssible. pull-buckets will make a copy of the bucket before sending it out.
Options:
initial: an initial value for the first buckettimeout: emit the current bucket once last update is older than given millisecondsmax_size: start a new bucket when this number of items where thrown into current bucket
License: MIT
9 months ago