pvc v0.0.6
pvc
pvc is a lightweight (no dependencies!) collection of utilities for Node.js streams in object mode. It wraps vanilla Readable streams, and also arrays, allowing you to manipulate them functionally.
TALK ABOUT FUNCTIONAL PARADIGM FOR STREAMS. TALK ABOUT BACK-PRESSURE. TALK ABOUT PASSING EXCEPTIONS.
Basic Example
pvc provides several methods for functional transformation of streams.
Install with npm install pvc.
var pvc = require('pvc');
// You can easily make an array into a stream
pvc.source([1, 2, 3, 4, 5, 6, 7, 8, 9])
.map( function (n) { return n + 1; } )
// [2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter( function (n) { return n % 2 == 0; } )
// [2, 4, 6, 8, 10]
.mapAsync( function (n, callback) {
setTimeout( function () {
callback(null, n + 10);
}, 100)
})
// [12, 14, 16, 18, 20]
.filterAsync( function (n, callback) {
setTimeout( function () {
callback(null, ( n % 3 == 0 ) );
}, 100)
})
// [12, 18]
.toArray( function (exceptions, results) {
console.error('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Results:', results);
// 'Results: [12, 18]'
});Sources
source
Create a pvc Readable stream, from the argument if supplied.
If there is no argument given, create a PassThrough source:
s = pvc.source();
s.write(1);
s.write('a');
s.read(); // 1
s.read(); // 'a'
s.read(); // nullIf the argument is an array, create a readable stream from the array:
s = pvc.source([1, 'a', '1']);
s.read(); // 1
s.read(); // 'a'
s.read(); // '1'
s.read(); // nullIf the argument is a node Readable stream, augment it with the pvc functional methods.
// TODO: Examplemerge
Merge takes multiple streams, and merges them into a single stream. All outputs objects of the input streams will be outputted by the merged stream, but there is no guarantee of the order.
// TODO: Examplezip
Zip takes a map of streams, zipping them into a stream that outputs objects containing the values of the stream outputs.
// TODO: ExampleSinks
Sinks pull results from the pipe as fast as they can, accumulating them in some way. They are all passed a callback which is called when the pipe is finished.
toArray
Collect the output of the pipe into an array. toArray is called when the
pipe is finished with a callback which receives an exceptions array (null if
no exceptions occured) and a results array.
pvc.source(1, 2, 3)
.map(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x * 2
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});reduce
Reduce the output of the pipe according to the passed reduction function. An
optional initialValue argument is the initial value used for reduction;
if it is not passed (or passed as null or undefined), the first value in
the pipe is used as the initial value.
pvc.source([1, 2, 3, 4])
.reduce(function (x, y) {
if (x == 2)
throw new Error('BAD');
else
return x + y;
}, function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Result:', result);
// 'Result: 8'
});
pvc.source([1, 2, 3, 4])
.reduce(-4, function (x, y) {
return x + y;
}, function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Result:', result);
// 'Result: 6'
});count
Count the number of output objects (not including exceptions).
pvc.source([1, 2, 3, 4])
.count(function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Result:', result);
// 'Result: 4'
});Transforms
limit
Limits the stream to the first n entries.
s = pvc.source( ['a', 'b', 'c', 'd', 'e'] ).limit(2);
s.read(); // 'a'
s.read(); // 'b'
s.read(); // nullskip
Skips the first n entries.
s = pvc.source( ['a', 'b', 'c', 'd', 'e'] ).skip(2);
s.read(); // 'c'
s.read(); // 'd'
s.read(); // 'e'
s.read(); // nullmap
Map the incoming objects, passing the mapped objects. If the mapping function
throws an error, pass that through. It will drop null or undefined output
values.
pvc.source(1, 2, 3)
.map(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x * 2
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});mapAsync
Map the incoming objects through an asynchronous function, passing the mapped
objects. The mapping function takes the incoming object and a node-style
callback. If the callback is passed a non-null argument in the first position,
it will pass it as an exception. Arguments in the second position will be
passed as outputs. It will drop null or undefined outputs.
pvc.source(1, 2, 3)
.mapAsync(function (x, callback) {
process.nextTick( function () {
if (x == 2)
callback(new Error('BAD'));
else
callback(null, x * 2);
})
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});filter
Filter the incoming objects, only passing objects that pass the truth test. If the filter function throws an error, pass it through as an exception.
pvc.source(1, 2, 3)
.filter(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x == 3
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [3]'
});filterAsync
Pass the inputs through an asynchronous filter function. The filter function is passed the input object, and a node-style callback.
pvc.source(1, 2, 3)
.mapAsync(function (x, callback) {
process.nextTick( function () {
if (x == 2)
callback(new Error('BAD'));
else
callback(null, x == 3);
})
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [3]'
});doto
Operate on the incoming elements and pass them through. Note that currently
this is performed synchronously; pass through a function wrapped in
process.nextTick to make it perform asynchronously.
pvc.source(1, 2, 3)
.doto(function (x) {
console.log('Passing through:', x);
// 'Passing through: 1'
// 'Passing through: 2'
// 'Passing through: 3'
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Results:', results);
// 'Results: [1, 2, 3]'
});separate
Separate incoming arrays, so that they are flattened on the output.
Takes two options, lax and recursive. If recursive == false (default),
it does not flatten array elements of arrays. If recursive == true, it will
recursively flatten array elements of arrays (all the way down). If
lax == false (default), it will emit an error if it receives a non-array
input. If lax == true, it will allow non-array inputs to pass through
unchanged.
s = pvc.source([ [1], [2, 3] ]).separate();
s.read(); // 1
s.read(); // [2, 3]
s.read(); // null
s = pvc.separate();
s.write( 1 );
s.on('error', function (e) {
// Will emit an error because 1 is not an array!
});
s = pvc.separate({ lax: true });
s.write( 1 );
s.read(); // 1split
TODO: Description
debounce
TODO: Description