2.0.0 • Published 4 years ago

bs-better-stream v2.0.0

Weekly downloads
3
License
ISC
Repository
github
Last release
4 years ago

bs-better-stream

Setup

npm i -save bs-better-stream

const Stream = require('bs-better-stream');

let myStream = new Stream();
myStream.write(10, 20, 30);
myStream.each(console.log);
myStream.write(11, 21, 31);

Overview

chaining

myStream
    .write(10, 20, 30)
    .each(print)
    .map(double)
    .filter(greaterThan30)
    .write(-1, -2, -3)
    .writePromise(requestAsyncNumber());

accumulation

you can use streams before and after you have began writing to them. See .clean().

let myStream = new Stream();
myStream.write(11, 21, 31);
myStream.each(console.log);
myStream.write(12, 22, 32);
// will print 11, 21, 31, 12, 22, 32

write (...values)

myStream.write(10, 20, 30);

write array (...array of values)

myStream.write(...[10, 20, 30]);

writePromise (...promises)

let promise1 = Promise.resolve('i hate `.then`s');
let promise2 = Promise.reject('rejections r wrapped');
myStream.writePromise(promise1, promise2);
// myStream.outValues equals ['i hate `.then`s', {rejected: 'rejections r wrapped', isRejected: true}]

writePromiseSkipOnReject (...promises)

let promise1 = Promise.resolve('i hate `.then`s');
let promise2 = Promise.reject('rejections r ignored');
myStream.writePromiseSkipOnReject(promise1, promise2);
// myStream.outValues equals ['i hate `.then`s']

each (handler)

myStream.each( (value, index) => doOperation(value, index) );

map (handler)

myStream.map( (value, index) => value * index );

filter (predicateHandler)

myStream.filter( (value, index) => value > index );

filterCount (integer)

let outStream = myStream.filterCount(3);
myStream.write('first', 'second', 'third', 'fourth', 'fifth');
// outStream.outValues equals [first, second, third]

filterIndex (array of indices)

let outStream = myStream.filterIndex([0, 2, 3]);
myStream.write('first', 'second', 'third', 'fourth', 'fifth');
// outStream.outValues equals [first, third, fourth]

filterEach (predicateHandler, truePredicateHandler, falsePredicateHandler)

let outStream = myStream.filterEach(value => value > 100, handleLargeNumbers, handleSmallNumbers);

filterMap (predicateHandler, truePredicateHandler, falsePredicateHandler)

let outStream = myStream.filterMap(value => value > 100, a => a + 100, a => -a);
myStream.write(200, 0, 1, 201, 2, 202);
// outStream.outValues equals [300, -0, -1, 301, -2, 302]

branchMap (...predicate and map handlers)

let myStream = new Stream();
let outStream = myStream.branchMap(
    a => a[0] === 'a', a => 'Apple ' + a, // if item starts with 'a', prepend 'Apple'
    a => a[0] === 'b', a => 'Banana ' + a); // if item starts with 'b', prepend 'Banana'
myStream.write('at', 'bat', 'action', 'cat', 'aaa');
// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'cat', 'Apple aaa']
let myStream = new Stream();
let outStream = myStream.branchMap(
    a => a[0] === 'a', a => 'Apple ' + a, // if item starts with 'a', prepend 'Apple'
    a => a[0] === 'b', a => 'Banana ' + a, // if item starts with 'b', prepend 'Banana'
    a => 'Other ' + a); // else, prepend, 'Other'
myStream.write('at', 'bat', 'action', 'cat', 'aaa');
// outStream.outValues equals ['Apple at', 'Banana bat', 'Apple action', 'Other cat', 'Apple aaa']

switchMap (switchHandler, ...case and map handlers)

let myStream = new Stream();
let outStream = myStream.switchMap(
    a => a.type,
    'animal', a => `i have a pet ${a.value}`,
    'number', a => `u have ${a.value} pencils`,
    'color', a => `his favorite color is ${a.value}`,
    a => `other: ${a.value}`);
myStream.write(
    {type: 'animal', value: 'elephant'},
    {type: 'animal', value: 'flamingo'},
    {type: 'number', value: 51},
    {type: 'number', value: 1235},
    {type: 'color', value: 'blue'},
    {type: 'color', value: 'pink'},
    {type: 'star', value: 'sun'});
/*
[ 'i have a pet elephant',
  'i have a pet flamingo',
  'u have 51 pencils',
  'u have 1235 pencils',
  'his favorite color is blue',
  'his favorite color is pink',
  'other: sun' ]
*/
let myStream = new Stream();
let outStream = myStream.switchMap(
    a => a.type,
    'animal', a => `i have a pet ${a.value}`,
    'number', a => `u have ${a.value} pencils`,
    'color', a => `his favorite color is ${a.value}`,
    a => `other: ${a.value}`);
myStream.write(
    {type: 'animal', value: 'elephant'},
    {type: 'animal', value: 'flamingo'},
    {type: 'number', value: 51},
    {type: 'number', value: 1235},
    {type: 'color', value: 'blue'},
    {type: 'color', value: 'pink'},
    {type: 'star', value: 'sun'});
/*
[ 'i have a pet elephant',
  'i have a pet flamingo',
  'u have 51 pencils',
  'u have 1235 pencils',
  'his favorite color is blue',
  'his favorite color is pink',
  { type: 'star', value: 'sun' } ]
*/

unique ()

let outStream = myStream.unique();
myStream.write(0, 1, 1, 0, 2, 3, 2, 3);
// outStream.outValues equals [0, 1, 2, 3]

uniqueOn (keyName)

let outStream = myStream.uniqueOn('name');
myStream.write(
    {age: 4231, name: 'Odysseus'},
    {age: 4250, name: 'Odysseus'},
    {age: 4234, name: 'Helen'});
// outStream.outValues equals [{age: 4231, name: 'Odysseus'},
//                             {age: 4234, name: 'Helen'}]

uniqueX (handler)

let outStream = myStream.uniqueX(obj => obj.a + obj.b);
myStream.write(
    {a: 1, b: 5},
    {a: 2, b: 4},
    {a: 3, b: 3});
// outStream.outValues equals [{a: 1, b: 5}]

pluck (keyName)

let outStream = myStream.pluck('key');
myStream.write({key: 'value'});
// outStream.outValues equals ['value']

wrap (keyName)

let outStream = myStream.wrap('key');
myStream.write('value');
// outStream.outValues equals [{key: 'value'}]

pick (...keyNames)

let outStream = myStream.pick('name', 'age');
myStream.write({
    name: 'myName',
    age: 'myAge',
    gender: 'myGender',
    weight: 'myWeight'
});
// outStream.outValues equals [{name: 'myName', age: 'myAge'}]

omit (...keyNames)

let outStream = myStream.omit('gender', 'weight');
myStream.write({
    name: 'myName',
    age: 'myAge',
    gender: 'myGender',
    weight: 'myWeight'
});
// outStream.outValues equals [{name: 'myName', age: 'myAge'}]

set (keyName, handler)

let outStream = myStream.set('sum', (object, index) =>  object.number + object.otherNumber + index );
myStream.write({number: 5, otherNumber: 10});
// outStream.outValues equals [ { number: 5, otherNumber: 10, sum: 15 } ]

repeat (handler)

let outStream = myStream.repeat( (value, index) =>  value + index );
myStream.write(2, 3, 2);
// outStream.outValues equals [2, 2, 3, 3, 3, 3, 2, 2, 2, 2]

repeatCount (integer)

let outStream = myStream.repeatCount(2);
myStream.write(2, 3, 2);
// outStream.outValues equals [2, 2, 3, 3, 2, 2]

flatten ()

let outStream = myStream.flatten();
myStream.write([2], [3], [2, 4]);
// outStream.outValues equals [2, 3, 2, 4]

flattenOn (listKeyName, newKeyName)

myStream.write({key1: 'value1', numbers: [1, 2]});
myStream.write({key1: 'value1b', numbers: [4, 5]});
let outStream = myStream.flattenOn('numbers', 'number');
// outStream.outValues equals [{key1: 'value1', number: 1}, {key1: 'value1', number: 2}, {key1: 'value1b', number: 4}, {key1: 'value1b', number: 5}]

Why is flattenOn useful?

imagine we have a set of animals grouped by species

let animalSpecies = new Stream();
animalSpecies.write({species: 'cat', class: 'mammalia', names: ['kitty', 'cupcake']});
animalSpecies.write({species: 'dog', class: 'mammalia', names: ['barf', 'brownNose']});

without flattenOn, we would need to do something like the following in order to obtain a flat list of animals

animalSpecies
    .flatMap(animalSpecies =>
        animalSpecies.names.map(name => {
            let animal = Object.assign({}, animalSpecies);
            delete animal.names;
            animal.name = name;
            return animal;
        }));

but with flattenOn, we can simply do the following

animalSpecies
    .flattenOn('names', 'name');

join (...streams)

let outStream = myStream.join(stream1, stream2);
myStream.write(1, 2);
stream1.write(3, 4);
stream2.write(5, 6);
// outStream.outValues equals [1, 2, 3, 4, 5, 6]

joinCollapse ()

myStream.write(stream1, stream2, stream3);
outStream = myStream.joinCollapse();
stream1.write(1.0, 1.1, 1.2);
stream2.write(2.0, 2.1, 2.2);
stream3.write(3.0, 3.1, 3.2);
// outStream.outValues equals [1.0, 1.1, 1.2, 2.0, 2.1, 2.2, 3.0, 3.1, 3.2]

product (rightStream, leftStreamIdKey, rightStreamIdKey, leftStreamSetKey)

let productStream = myStream.product(otherStream, 'myId', 'otherId', 'other');
myStream.write({myId: 1, myValue: 100});
myStream.write({myId: 2, myValue: 200});
myStream.write({myId: 2, myValue: 201});
otherStream.write({otherId: 2, otherValue: 20});
otherStream.write({otherId: 2, otherValue: 21});
otherStream.write({otherId: 3, otherValue: 30});
// productStream.outValues equals [{myId: 2, myValue: 200, other: {otherId: 2, otherValue: 20}},
//                                 {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 20}},
//                                 {myId: 2, myValue: 200, other: {otherId: 2, otherValue: 21}},
//                                 {myId: 2, myValue: 201, other: {otherId: 2, otherValue: 21}}]

productX (rightStream, matchHandler, handler)

let productStream = myStream.productX(otherStream, (left, right) => left.myId === right.otherId, (left, right) => {
    left.paired = true;
    right.paired = true;
    return {sum: left.myValue + right.otherValue};
});
myStream.write({myId: 1, myValue: 100});
myStream.write({myId: 2, myValue: 200});
myStream.write({myId: 2, myValue: 201});
otherStream.write({otherId: 2, otherValue: 20});
otherStream.write({otherId: 2, otherValue: 21});
otherStream.write({otherId: 3, otherValue: 30});
// myStream.outValues equals [{myId: 1, myValue: 100},
//                            {myId: 2, myValue: 200, paired: true},
//                            {myId: 2, myValue: 201, paired: true}]
// otherStream.outValues equals [{otherId: 2, otherValue: 20, paired: true},
//                               {otherId: 2, otherValue: 21, paired: true},
//                               {otherId: 3, otherValue: 30}]
// productStream.outValues equals [{sum: 220},
//                                 {sum: 221},
//                                 {sum: 221},
//                                 {sum: 222}]

Note that while product modifies a copy of left stream's values, leaving left stream unmodified; productX passes in the original values of left stream, allowing left stream to be modified by the handler as seen in the example above.

to (stream)

myStream.to(outStream);
myStream.write(1, 2);
outStream.write(3, 4);
// outStream.outValues equals [1, 2, 3, 4]

wait (skipOnReject)

myStream.write(Promise.resolve('stream'));
myStream.write(Promise.resolve('async'));
myStream.write(Promise.resolve('data'));
let outStream = myStream.wait();
myStream.write(Promise.resolve('without needing'));
myStream.write(Promise.resolve('async/await'));
myStream.write(Promise.resolve('or .then'));
myStream.write(Promise.reject('rejected'));
// outStream.outValues equals ['stream', 'async', 'data', 'without needing', 'async/await', 'or .then',  {rejected: 'rejected', isRejected: true}]

waitOn (key, skipOnReject)

myStream.write({key1: 'value1', key2: Promise.resolve('value2')});
myStream.write({key1: 'value2', key2: Promise.reject('rejectValue2')});
let outStream = myStream.waitOn('key2');
// outStream.outValues equals [{key1: 'value1', key2: 'value2'},
//                             {key1: 'value2', key2: {rejected: 'rejectValue2', isRejected: true}}]

Why is waitOn useful?

imagine we have a set of users

let users = new Stream();
users.write({userId: '1', height: 3, color: 'blue'}, {userId: '2', height: 4, color: 'green'}, {userId: '3', height: 2, color: 'orange'});

and this api to obtain a user's shape

let getUserShape = userId => {
    return Promise.resolve(userId === 1 ? 'circle' : 'square');
};

without waitOn, we would need to do something like the following in order to include every user's shape

users
    .set('shape', ({userId}) => getUserShape(userId))
    .map(user => user.shape.then(shape => {
        user.shape = shape;
        return user;
    }))
    .wait();

but with waitOn, we can simply do the following

users
    .set('shape', ({userId}) => getUserShape(userId))
    .waitOn('shape');

waitOrdered (skipOnReject)

let resolve1, resolve2;
let promise1 = new Promise(resolve => resolve1 = resolve);
let promise2 = new Promise(resolve => resolve2 = resolve);
myStream.write(promise1, promise2);
let outStream = myStream.waitOrdered();
resolve2('promise 2 resolved first');
resolve1('promise 1 resolved last');
// outStream.outValues equals ['promise 1 resolved last', 'promise 2 resolved first']

waitOnOrdered (key, skipOnReject)

let resolve1, resolve2;
let promise1 = new Promise(resolve => resolve1 = resolve);
let promise2 = new Promise(resolve => resolve2 = resolve);
myStream.write({key: promise1}, {key: promise2});
let outStream = myStream.waitOnOrdered('key');
resolve2('promise 2 resolved first');
resolve1('promise 1 resolved last');
// outStream.outValues equals [{key: 'promise 1 resolved last', key: 'promise 2 resolved first'}]

skipOnReject paramater

passing true as the last paramater to wait, waitOn, waitOrdered, and waitOnOrdered will ignore values which are rejected, similar to writePromiseSkipOnReject

myStream.write({key1: 'value1', key2: Promise.resolve('value2')});
myStream.write({key1: 'value2', key2: Promise.reject('rejectValue2')});
let outStream = myStream.waitOn('key2', true);
// outStream.outValues equals [{key1: 'value1', key2: 'value2'}]

otherwise, rejected promises are wrapped in a {rejected: <rejected value>, isRejected: true} structure and written just like resolved promises, similar to writePromise

if (predicateHandler)

myStream.write(110, 10, 30, 130, 50, 150);
let ifStreams = myStream.if(value => value > 100);
// ifStreams.then.outValues equals [110, 130, 150]
// ifStreams.else.outValues equals [10, 30, 50]

console.log('numbers over 100:');
ifStreams.then.each(value => console.log(value));
console.log('numbers under 100:');
ifStreams.else.each(value => console.log(value));

split (predicateHandler, truePredicateHandler, falsePredicateHandler)

myStream.write({species: 'kitten', name: 'tickleMe'});
myStream.write({species: 'kitten', name: 'pokeMe'});
myStream.write({species: 'puppy', name: 'hugMe'});
myStream.split(
	animal => animal.species === 'kitten',
	kittenStream => kittenStream
		.set('sound', () => 'meow')
		.set('image', ({name}) => getRandomLolzCatImage(name)),
	puppyStream => puppyStream
		.set('sound', () => 'wuff')
		.set('edible', () => true)
		.each(dipInChocolate));

group (handler)

myStream.write({species: 'cat', name: 'blue'}, {species: 'cat', name: 'green'}, {species: 'dog', name: 'orange'});
let species = myStream.group(animal => animal.species);
// species.cats.outValues equals [{species: 'cat', name: 'blue'}, {species: 'cat', name: 'green'}]
// species.dogs.outValues equals [{species: 'dog', name: 'orange'}]

console.log('cats:');
species.cat.each(cat => console.log('  ', cat.name));
console.log('dogs:');
species.dog.each(dog => console.log('  ', dog.name));

groupCount (integerGroupSize)

myStream.write(20, 30, 40, 50, 60, 70, 80);
let groupStreams = myStream.groupCount(3);
// groupStreams.group0.outValues equals [20, 30, 40]
// groupStreams.group1.outValues equals [50, 60, 70]
// groupStreams.group2.outValues equals [80]

groupFirstCount (integerGroupSize)

myStream.write(20, 30, 40, 50, 60, 70, 80);
let groupStreams = myStream.groupFirstCount(3);
// groupStreams.first.outValues equals [10, 20, 30]
// groupStreams.rest.outValues equals [50, 60, 70, 80]

console.log('first 3 numbers:');
groupStreams.first.each(number => console.log(number));
console.log('rest of numbers:');
groupStreams.rest.each(number => console.log(number));

groupNCount (integerGroupSize, integerGroupCount)

myStream.write(20, 30, 40, 50, 60, 70, 80, 90, 100, 110, 120);
let groupStreams = myStream.groupCount(3, 2);
// groupStreams.group0.outValues equals [20, 30, 40]
// groupStreams.group1.outValues equals [50, 60, 70]
// groupStreams.rest.outValues equals [80, 90, 100, 110, 120]

groupIndex (...lists of indices)

myStream.write(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100);
let groupStreams = myStream.groupIndex([0], [1, 3, 5, 6]);
// groupStreams[0].outValues equals [0]
// groupStreams[1].outValues equals [10, 30, 50, 60]
// groupStreams.rest.outValues equals [20, 40, 70, 80, 90, 100]

console.log('first number:');
groupStreams[0].each(number => console.log(number));
console.log('important numbers:');
groupStreams[1].each(number => console.log(number));
console.log('other numbers:');
groupStreams.rest.each(number => console.log(number));

batch (integerBatchSize)

myStream.write(0, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100);
let outStream = myStream.batch(4);
// outStream.outValues equals [[0, 10, 20, 30], [40, 50, 60, 70]]

batchFlat (integerBatchSize)

let outStream = myStream.batchFlat(4);
myStream.write(0, 10, 20);
// outStream.outValues equals []
myStream.write(30, 40, 50);
// outStream.outValues equals [0, 10, 20, 30]
myStream.write(60, 70, 80);
// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]
myStream.write(90, 100);
// outStream.outValues equals [0, 10, 20, 30, 40, 50, 60, 70]

generate (handler)

myStream.write(10, 40);
let outStream = myStream.generate(value => [value + 1, value * 2]);
// outStream.outValues equals [10, 11, 20, 40, 41, 80]

flatMap (handler)

myStream.write(10, 40);
let outStream = myStream.flatMap(value => [value + 1, value * 2]);
// outStream.outValues equals [11, 20, 41, 80]

throttle (integer)

myStream.write(promise1, promise2, promise3, promise4);
let throttled = myStream.throttle(2);
throttled.stream
    .wait()
    .each(doStuff)
    .each(throttled.nextOne);

After calling throttled = stream.throttle(n), throtled.stream will emit n values initially. It will emit 1 more value each time throttled.next() or throttled.nextOne() are invoked, and m more values each time throttled.next(m) is invoked.

myStream.write(1, 2, 3, 4, 5);
let throttled = myStream.throttle(2);
// throttled.stream.outValues equals [1, 2]
throttled.next(2);
// throttled.stream.outValues equals [1, 2, 3, 4]
throttled.next(2);
// throttled.stream.outValues equals [1, 2, 3, 4, 5]
myStream.write(6, 7);
// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6]

Calling throttled = stream.throttle() is short for calling throttled = stream.throttle(0), which results in a lazy stream. throttled.stream will emit values only when throttled.next is invoked.

myStream.write(1, 2, 3, 4, 5);
let throttled = myStream.throttle(2);
// throttled.stream.outValues equals [1, 2]
throttled.unthrottle();
// throttled.stream.outValues equals [1, 2, 3, 4, 5]
myStream.write(6, 7);
// throttled.stream.outValues equals [1, 2, 3, 4, 5, 6, 7]

Calling throttled.unthrottle() will allow all current and future values to pass through without throttling, and rendering throttled.next() unnecessary.

clean()

myStream.write(1, 2, 3);
let oneToSix = myStream.map(a => a);
myStream.clean();
myStream.write(4, 5, 6);
let fourToFive = myStream.map(a => a);
// myStream.outValues equals [4, 5, 6];

disconnect()

myStream.write(1, 2, 3);
let oneToThree = myStream.map(a => a);
myStream.disconnect();
myStream.write(4, 5, 6);
let oneToSix = myStream.map(a => a);

promise

myStream.promise returns a promise that resolves when all already written values to the stream have resolved.

myStream.write(promise1, promise2, promise3, promise4);
myStream.promise.then(resolve1234 => process(resolve1234));

length

myStream.length

outValues

myStream.outValues

2.0.0

4 years ago

1.9.5

5 years ago

1.9.0

5 years ago

1.8.5

6 years ago

1.8.0

6 years ago

1.7.3

6 years ago

1.7.0

6 years ago

1.6.0

6 years ago

1.5.0

6 years ago

1.4.5

6 years ago

1.4.0

6 years ago

1.2.0

6 years ago

1.1.3

6 years ago

1.1.0

6 years ago

1.0.0

6 years ago

0.6.0

6 years ago

0.5.2

6 years ago

0.5.1

6 years ago

0.5.0

6 years ago

0.4.7

6 years ago

0.4.6

6 years ago

0.4.5

6 years ago

0.4.3

6 years ago

0.4.2

6 years ago

0.4.0

6 years ago

0.3.9

6 years ago

0.3.8

6 years ago

0.3.4

6 years ago

0.3.0

6 years ago

0.2.6

6 years ago

0.2.5

6 years ago

0.2.4

6 years ago

0.2.3

6 years ago

0.2.2

6 years ago

0.2.1

6 years ago

0.2.0

6 years ago

0.1.1

6 years ago