nsq.js-jc v0.15.1
nsq.js
JavaScript NSQ client WIP.
Features
- actually written in js :p
- easier debugging via debug() instrumentation
- native json message support
- does not arbitrarily apply backoff on requeues
- disabling of auto-RDY support for manual control (high throughput etc)
- reconnection to dead nsqd nodes
- graceful close support
Installation
$ npm install nsq.js
About
Debugging
The DEBUG environment variable can be used to enable traces within the module, for example all nsq debug() calls except fo the framer:
$ DEBUG=nsq*,-nsq:framer node test
nsq:reader connect nsqd 0.0.0.0:4150 events/ingestion [5] +0ms
nsq:connection connect: 0.0.0.0:4150 V2 +0ms
nsq:connection command: IDENTIFY null +2ms
nsq:connection command: SUB ["events","ingestion"] +1ms
nsq:connection command: RDY [5] +0ms
nsq:connection connect: undefined:4150 V2 +0ms
nsq:connection command: IDENTIFY null +1ms
nsq:connection command: PUB ["events"] +0ms
nsq:reconnect reset backoff +0ms
nsq:reconnect reset backoff +1ms
nsq:connection response OK +3ms
nsq:connection response OK +0ms
nsq:connection response OK +0ms
Requeue backoff
The NSQD documentation recommends applying backoff when requeueing implying that the consumer is faulty, IMO this is a weird default, and the opposite of what we need so it's not applied in this client.
Example
var nsq = require('nsq.js');
// subscribe
var reader = nsq.reader({
nsqd: [':4150'],
maxInFlight: 1,
maxAttempts: 5,
topic: 'events',
channel: 'ingestion'
});
reader.on('error', function(err){
console.log(err.stack);
});
reader.on('message', function(msg){
var body = msg.body.toString();
console.log('%s attempts=%s', body, msg.attempts);
msg.requeue(2000);
});
reader.on('discard', function(msg){
var body = msg.body.toString();
console.log('giving up on %s', body);
msg.finish();
});
// publish
var writer = nsq.writer(':4150');
writer.on('ready', function() {
writer.publish('events', 'foo');
writer.publish('events', 'bar');
writer.publish('events', 'baz');
});
API
nsq.reader(options)
Create a reader:
id
connection identifier (seeclient_id
in the spec)topic
topic namechannel
channel namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addressesmaxAttempts
max attempts before discarding InfinitymaxConnectionAttempts
max reconnection attempts InfinitymaxInFlight
max messages distributed across connections 10msgTimeout
session-specific msg timeoutpollInterval
nsqlookupd poll interval10000ready
whenfalse
auto-RDY maintenance will be disabledtrace
trace function
Events:
message
(msg) incoming messagediscard
(msg) discarded messageerror response
(err) response from nsqerror
(err)
reader#close(fn)
Close the reader's connection(s) and fire the optional fn when completed.
nsq.writer(options|address)
Create a writer. By default a connection attempt to 0.0.0.0:4150 will be made unless one of the following options are provided:
port
numberhost
namensqd
array of nsqd addressesnsqlookupd
array of nsqlookupd addresses
Events:
error response
(err) response from nsqerror
(err)
writer#publish(topic, message, fn)
Publish the given message
to topic
where message
may be a string, buffer, or object. An array of messages
may be passed, in which case a MPUT is performed.
writer#close(fn)
Close the writer's connection(s) and fire the optional fn when completed.
Message
A single message.
Message#finish()
Mark message as complete.
Message#requeue(delay)
Re-queue the message immediately, or with the
given delay
in milliseconds, or a string such
as "5s", "10m" etc.
Message#touch()
Reset the message's timeout, increasing the length of time before NSQD considers it timed out.
Message#json()
Return parsed JSON object.
Tracing
The following jstrace probes are available:
connection:ready
ready count sentconnection:message
message receivedmessage:finish
finished a messagemessage:requeue
requeued a messagemessage:touch
touched a message
Running tests
nsqd --lookupd-tcp-address=0.0.0.0:4160 &
nsqadmin --lookupd-http-address=0.0.0.0:4161 &
nsqlookupd &
make test
License
MIT
8 years ago