conveyor v0.0.3
Description
Feed multiple node.js streams sequentially into one (Writable or Duplex) stream.
Requirements
- node.js -- v0.10.0 or newer
Install
npm install conveyor
Examples
- Pass HTTP requests to an echo stream:
var http = require('http');
var Transform = require('stream').Transform;
var Conveyor = require('conveyor');

// Echo stream: every chunk written to it is pushed back out unchanged.
var echo = new Transform();
echo._transform = function(chunk, encoding, done) {
  this.push(chunk);
  done();
};

var TOTAL = 10;
var conveyor = new Conveyor(echo);
var seen = 0;

// Buffers a response body as a UTF-8 string and prints it once it has ended.
function printBody(res) {
  var body = '';
  res.setEncoding('utf8');
  res.on('data', function(piece) {
    body += piece;
  });
  res.on('end', function() {
    console.log(body);
  });
}

// Sends one POST to the local server carrying a numbered greeting.
function sendRequest(n) {
  var options = {
    host: '127.0.0.1',
    port: 8080,
    method: 'POST'
  };
  http.request(options, printBody).end('Hello from request #' + n);
}

var server = http.createServer(function(req, res) {
  seen += 1;
  // Stop accepting new connections once the last expected request has arrived.
  if (seen === TOTAL) {
    server.close();
  }
  // Queue the request; the echoed data is piped to the matching response.
  conveyor.push(req, res);
});

server.listen(8080, function() {
  for (var n = 1; n <= TOTAL; n++) {
    sendRequest(n);
  }
});
// output:
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10

- Pass HTTP requests to a Writable stream:
var http = require('http');
var Writable = require('stream').Writable;
var Conveyor = require('conveyor');

// Sink stream: prints every chunk it receives as a string.
var sink = new Writable();
sink._write = function(chunk, encoding, done) {
  console.log(chunk.toString());
  done();
};

var TOTAL = 10;
var conveyor = new Conveyor(sink);
var seen = 0;

// Sends one POST to the local server carrying a numbered greeting.
function sendRequest(n) {
  var options = {
    host: '127.0.0.1',
    port: 8080,
    method: 'POST'
  };
  var clientReq = http.request(options, function(res) {
    // The response body is not needed; drain it so the socket is released.
    res.resume();
  });
  clientReq.end('Hello from request #' + n);
}

var server = http.createServer(function(req, res) {
  seen += 1;
  // Stop accepting new connections once the last expected request has arrived.
  if (seen === TOTAL) {
    server.close();
  }
  conveyor.push(req, function() {
    // this req stream finished
    res.end();
  });
});

server.listen(8080, function() {
  for (var n = 1; n <= TOTAL; n++) {
    sendRequest(n);
  }
});
// output (assuming 1-chunk requests):
// Hello from request #1
// Hello from request #2
// Hello from request #3
// Hello from request #4
// Hello from request #5
// Hello from request #6
// Hello from request #7
// Hello from request #8
// Hello from request #9
// Hello from request #10

API
Conveyor is an EventEmitter
Conveyor events
- end() - Emitted after end() is called and all streams have been processed.
Conveyor methods
(constructor)(< Writable >dest, < object >config) - Creates and returns a new Conveyor instance with the following valid config settings:
- max - integer - This is the max queue size.
push(< Readable >stream, < Writable >pipeStream, < function >callback) - boolean - Pushes (appends) stream to the queue. If pipeStream is set, data (from dest passed to the constructor) will be piped to this stream with optional pipeStreamOpts pipe settings. callback is called once stream has ended and dest is drained. The return value is false if stream could not be enqueued due to the queue being full.
unshift(< Readable >stream, < Writable >pipeStream, < function >callback) - boolean - Identical to push() except it unshifts (prepends) stream to the queue.