primus-backpressure v3.0.3

Node streams2 over Primus: added back-pressure!
- Pass in a Primus client or spark, get back a stream.Duplex. Do this on both sides of a Primus connection.
- write returns false when the receiver is full.
- Use read and readable to exert back-pressure on the sender.
- Unit tests with 100% coverage.
- Browser unit tests using webpack and PhantomJS.
The API is described below.
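Because a PrimusDuplex is a stream.Duplex, a sender can honour back-pressure with the usual write/drain loop. The following is a minimal sketch rather than part of primus-backpressure: write_all is a hypothetical helper name, and duplex is assumed to be any object exposing the standard write and once methods.

```javascript
// Hypothetical helper (not part of primus-backpressure).
// Writes each chunk in order, pausing when write() returns false
// and resuming when the stream emits 'drain'.
function write_all(duplex, chunks, done)
{
    var i = 0;

    function next()
    {
        while (i < chunks.length)
        {
            // write() returning false means the buffer is full: wait for 'drain'
            if (!duplex.write(chunks[i++]))
            {
                duplex.once('drain', next);
                return;
            }
        }

        done();
    }

    next();
}
```

With a real connection this might be called as write_all(spark_duplex, ['a', 'b', 'c'], callback), so that no chunk is written while the peer is still full.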
Example
Exerting backpressure over Primus:
var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    read_a = false;

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark,
    {
        highWaterMark: 2
    });

    // Writing 2 bytes reaches the highWaterMark of 2, so write() returns false
    assert.equal(spark_duplex.write('ab'), false);

    // 'drain' should only fire once the client has read some data
    spark_duplex.on('drain', function ()
    {
        assert(read_a);
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'),
{
    highWaterMark: 1
});

client_duplex.once('readable', function ()
{
    // The client buffers 1 byte at a time, so 'a' arrives on its own
    assert.equal(this.read().toString(), 'a');
    read_a = true;
    assert.equal(this.read(), null);

    this.once('readable', function ()
    {
        assert.equal(this.read().toString(), 'b');
        primus.end();
        console.log('done');
    });
});

Another Example
Piping data over Primus:
var Primus = require('primus'),
    PrimusDuplex = require('primus-backpressure').PrimusDuplex,
    primus = Primus.createServer({ port: 9000 }),
    Socket = primus.Socket,
    assert = require('assert'),
    crypto = require('crypto'),
    tmp = require('tmp'),
    fs = require('fs');

primus.once('connection', function (spark)
{
    var spark_duplex = new PrimusDuplex(spark);

    tmp.tmpName(function (err, random_file)
    {
        assert.ifError(err);

        // 1 MiB of random data to send through the connection
        var random_buf = crypto.randomBytes(1024 * 1024);

        fs.writeFile(random_file, random_buf, function (err)
        {
            assert.ifError(err);

            tmp.tmpName(function (err, out_file)
            {
                assert.ifError(err);

                var random_stream = fs.createReadStream(random_file),
                    out_stream = fs.createWriteStream(out_file);

                out_stream.on('finish', function ()
                {
                    fs.readFile(out_file, function (err, out_buf)
                    {
                        assert.ifError(err);
                        // The echoed data must match what was sent
                        assert.deepEqual(out_buf, random_buf);

                        fs.unlink(random_file, function (err)
                        {
                            assert.ifError(err);

                            fs.unlink(out_file, function (err)
                            {
                                assert.ifError(err);
                                primus.end();
                                console.log('done');
                            });
                        });
                    });
                });

                spark_duplex.pipe(out_stream);
                random_stream.pipe(spark_duplex);
            });
        });
    });
});

var client_duplex = new PrimusDuplex(new Socket('http://localhost:9000'));

// The client echoes everything it receives back to the server
client_duplex.pipe(client_duplex);

Installation
npm install primus-backpressure
Licence
Test
Node client to Node server:
grunt test
Browser client to Node server (requires PhantomJS):
grunt test-browser
Code Coverage
grunt coverage
c8 results are available here.
Coveralls page is here.
Lint
grunt lint
API
PrimusDuplex inherits from stream.Duplex so you can call any method from stream.Readable and stream.Writable.
Extra constructor options and an additional parameter to readable.read are described below.
Source: index.js
PrimusDuplex(msg_stream, options)
Creates a new PrimusDuplex object which exerts back-pressure over a Primus connection.
Both sides of a Primus connection must use PrimusDuplex — create one for your Primus client and one for your spark as soon as you have them.
Parameters:
- {Object} msg_stream: The Primus client or spark you wish to exert back-pressure over.
- {Object} [options]: Configuration options. This is passed on to stream.Duplex and can contain the following extra properties:
  - {Function} [encode_data(chunk, encoding, start, end, internal)]: Optional encoding function for data passed to writable.write. chunk and encoding are as described in the writable.write documentation. The difference is that encode_data is synchronous (it must return the encoded data) and it should only encode data between the start and end positions in chunk. Defaults to a function which does chunk.toString('base64', start, end). Note that PrimusDuplex may also pass some internal data through this function (always with chunk as a Buffer, encoding=null and internal=true).
  - {Function} [decode_data(chunk, internal)]: Optional decoding function for data received on the Primus connection. The type of chunk will depend on how the peer PrimusDuplex encoded it. Defaults to a function which does Buffer.from(chunk, 'base64'). If the data can't be decoded, return null (and optionally call this.emit to emit an error). Note that PrimusDuplex may also pass some internal data through this function (always with internal=true), in which case you must return a Buffer.
  - {Integer} [max_write_size]: Maximum number of bytes to write onto the Primus connection at once, regardless of how many bytes the peer is free to receive. Defaults to 0 (no limit).
  - {Boolean} [check_read_overflow]: Whether to check if more data than expected is being received. If true and the high-water mark for reading is exceeded then the PrimusDuplex object emits an error event. This should not normally occur unless you add data yourself using readable.unshift, in which case you should set check_read_overflow to false. Defaults to true.
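As an illustration of the encode_data and decode_data options, here is a minimal sketch of a hex codec used in place of the default base64 one. The names encode_hex, decode_hex and options are hypothetical, the numeric values are arbitrary, and the sketch assumes chunk arrives as a Buffer (the stream.Writable default when decodeStrings is left at true).

```javascript
// Hypothetical hex codec; the function shapes follow the option descriptions above.
function encode_hex(chunk, encoding, start, end, internal)
{
    // Assumes chunk is a Buffer; only the bytes in [start, end) are encoded
    return chunk.toString('hex', start, end);
}

function decode_hex(chunk, internal)
{
    // Return null for anything that is not an even-length hex string
    if ((typeof chunk !== 'string') || !/^(?:[0-9a-f]{2})*$/i.test(chunk))
    {
        return null;
    }

    return Buffer.from(chunk, 'hex');
}

// Illustrative values only
var options = {
    highWaterMark: 16 * 1024,
    encode_data: encode_hex,
    decode_data: decode_hex,
    max_write_size: 4096,
    check_read_overflow: true
};

// With primus-backpressure installed, both peers would be given the same codec:
// var duplex = new PrimusDuplex(spark, options);
```

Both sides of the connection need matching functions, since decode_data on one peer receives whatever encode_data produced on the other.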
PrimusDuplex.prototype.read(size, send_status)
See readable.read. PrimusDuplex adds an extra optional parameter, send_status.
Parameters:
- {Number} [size]: Optional argument to specify how much data to read. Defaults to undefined (you can also specify null) which means return all the data available.
- {Boolean} [send_status]: Every time you call read, a status message is sent to the peer PrimusDuplex indicating how much space is left in the internal buffer to receive new data. To prevent deadlock, these status messages are always sent; they aren't subject to back-pressure. Normally this is fine because status messages are small. However, if your application reads data one byte at a time, for example, you may wish to control when status messages are sent. To stop a status message being sent when you call read, pass send_status as false. send_status defaults to true. To force a status message to be sent without reading any data, call read(0).
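The byte-at-a-time case described above could be handled along the following lines. This is a sketch under stated assumptions: read_bytes_quietly is a hypothetical helper name, and duplex is assumed to behave like a PrimusDuplex, accepting read(size, send_status) and returning null when no data is buffered.

```javascript
// Hypothetical helper (not part of primus-backpressure).
// Reads up to `count` bytes one at a time without sending a status
// message per read, then sends a single status message at the end.
function read_bytes_quietly(duplex, count)
{
    var parts = [];

    for (var i = 0; i < count; i += 1)
    {
        var buf = duplex.read(1, false); // send_status = false

        if (buf === null)
        {
            break; // nothing more buffered right now
        }

        parts.push(buf);
    }

    duplex.read(0); // reads no data but sends one status message

    return Buffer.concat(parts);
}
```

Sending one status message per batch keeps the peer informed of free buffer space while avoiding a message for every byte read.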
—generated by apidox—