mqlobber v9.0.7

Streaming message queue with pub-sub, work queues, wildcards and back-pressure. Just Node and a filesystem required.
mqlobber basically remotes qlobber-fsq over one or more connections.
Say you have a server and a number of clients, with the clients
connected to the server using some mechanism which provides a stream for each
connection. Create a QlobberFSQ instance on the server and for each stream,
pass the instance and the stream to MQlobberServer.
On each client, pass the other end of the stream to MQlobberClient. Clients
can then publish and subscribe to topics (including wildcard subscriptions).
Work queues are also supported - when publishing a message, a client can specify
that only one subscriber should receive it.
All data is transferred on streams multiplexed over each connection using
bpmux, with full back-pressure support
on each stream. Clients get a Writable when publishing a message and a
Readable when receiving one.
You can scale out horizontally by creating a number of QlobberFSQ instances
(e.g. one per CPU core), all sharing the same message directory.
No other backend services are required - just Node and a filesystem.
The API is described in the API section below.
Example
First, let's create a server program which listens on a TCP port specified on the command line:
// server.js
var net = require('net'),
    QlobberFSQ = require('qlobber-fsq').QlobberFSQ,
    MQlobberServer = require('mqlobber').MQlobberServer,
    fsq = new QlobberFSQ();

fsq.on('start', function ()
{
    var server = net.createServer().listen(parseInt(process.argv[2]));
    server.on('connection', function (c)
    {
        new MQlobberServer(fsq, c);
    });
});

Next, a program which connects to the server and subscribes to messages published to a topic:
// client_subscribe.js
var assert = require('assert'),
    MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2])),
    mq = new MQlobberClient(c),
    topic = process.argv[3];

mq.subscribe(topic, function (s, info)
{
    var msg = '';
    s.on('readable', function ()
    {
        var data;
        while ((data = this.read()) !== null)
        {
            msg += data.toString();
        }
    });
    s.on('finish', function ()
    {
        c.end();
    });
    s.on('end', function ()
    {
        console.log('received', info.topic, msg);
        assert.equal(msg, 'hello');
    });
});

Finally, a program which connects to the server and publishes a message to a topic:
// client_publish.js
var MQlobberClient = require('mqlobber').MQlobberClient,
    c = require('net').createConnection(parseInt(process.argv[2])),
    mq = new MQlobberClient(c);

mq.publish(process.argv[3], function ()
{
    c.end();
}).end('hello');

Run two servers listening on ports 8600 and 8601:
node server.js 8600 &
node server.js 8601 &

Subscribe to two topics, foo.bar and wildcard topic foo.*, one against each server:

node client_subscribe.js 8600 foo.bar &
node client_subscribe.js 8601 'foo.*' &

Then publish a message to the topic foo.bar:

node client_publish.js 8600 foo.bar

You should see the following output, one line from each subscriber:

received foo.bar hello
received foo.bar hello

Only the servers should still be running and you can now terminate them:
$ jobs
[1]- Running node server.js 8600 &
[2]+ Running node server.js 8601 &
$ kill %1 %2
[1]- Terminated node server.js 8600
[2]+ Terminated node server.js 8601

Installation
npm install mqlobber

Licence

Test
grunt test

Lint
grunt lint

Code Coverage
grunt coverage

Istanbul results are available here.
Coveralls page is here.
API
- MQlobberClient
- MQlobberClient.prototype.subscribe
- MQlobberClient.prototype.unsubscribe
- MQlobberClient.prototype.publish
- MQlobberClient.events.handshake
- MQlobberClient.events.backoff
- MQlobberClient.events.drain
- MQlobberClient.events.full
- MQlobberClient.events.removed
- MQlobberClient.events.error
- MQlobberClient.events.warning
- MQlobberServer
- MQlobberServer.prototype.subscribe
- MQlobberServer.prototype.unsubscribe
- MQlobberServer.events.subscribe_requested
- MQlobberServer.events.unsubscribe_requested
- MQlobberServer.events.unsubscribe_all_requested
- MQlobberServer.events.publish_requested
- MQlobberServer.events.message
- MQlobberServer.events.handshake
- MQlobberServer.events.backoff
- MQlobberServer.events.drain
- MQlobberServer.events.full
- MQlobberServer.events.removed
- MQlobberServer.events.ack
- MQlobberServer.events.error
- MQlobberServer.events.warning
MQlobberClient(stream, options)
Create a new MQlobberClient object for publishing and subscribing to messages via a server.
Parameters:
- {Duplex} stream: Connection to a server. The server should use MQlobberServer on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
- {Object} [options]: Configuration options. This is passed down to QlobberDedup (which matches messages received from the server to handlers) and BPMux (which multiplexes message streams over the connection to the server). It also supports the following additional property:
  - {Buffer} [handshake_data]: Application-specific handshake data to send to the server. The server-side MQlobberServer object will emit this as a handshake event to its application.
Throws:
- {Error} If an error occurs before initiating the multiplex with the server.
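As a minimal sketch of the options described above, the helper below creates a client which sends handshake data and reports whatever the server sends back in its handshake event. The names createClient, MQlobberClientCtor and onHandshake are hypothetical; the constructor is passed in as a parameter (expected to be require('mqlobber').MQlobberClient) so the snippet makes no assumption about how the module is loaded.

```javascript
// Hypothetical helper: create a client which sends application-specific
// handshake data and reports the handshake data the server sends back.
// MQlobberClientCtor is expected to be require('mqlobber').MQlobberClient.
function createClient(MQlobberClientCtor, stream, greeting, onHandshake)
{
    var mq = new MQlobberClientCtor(stream,
    {
        handshake_data: Buffer.from(greeting)
    });

    // Emitted once the handshake with the peer MQlobberServer completes
    mq.on('handshake', function (handshake_data)
    {
        onHandshake(handshake_data.toString());
    });

    return mq;
}
```

With a TCP connection c, this might be called as createClient(require('mqlobber').MQlobberClient, c, 'hello server', console.log).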
Go: TOC
MQlobberClient.prototype.subscribe(topic, handler, cb)
Subscribe to messages published to the server.
Parameters:
- {String} topic: Which messages you're interested in receiving. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed on the server when constructing the QlobberFSQ object passed to MQlobberServer.
- {Function} handler: Function to call when a new message is received from the server due to its topic matching against topic. handler will be passed the following arguments:
  - {Readable} stream: The message content as a Readable. Note that all subscribers will receive the same stream for each message.
  - {Object} info: Metadata for the message, with the following properties:
    - {String} topic: Topic to which the message was published.
    - {Boolean} single: Whether this message is being given to at most one handler (across all clients connected to all servers).
    - {Integer} expires: When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the server's MQlobberServer instance is configured with send_expires set to true.
    - {Integer} size: Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
  - {Function} done: Function to call once you've handled the message. Note that calling this function is only mandatory if info.single === true, in order to clean up the message on the server. done takes one argument:
    - {Object} err: If an error occurred then pass details of the error, otherwise pass null or undefined.
- {Function} [cb]: Optional function to call once the subscription has been registered with the server. This will be passed the following argument:
  - {Object} err: If an error occurred then details of the error, otherwise null.
Throws:
- {Error} If an error occurs before sending the subscribe request to the server.
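The handler arguments described above can be put together roughly as follows. This is only a sketch: subscribeText and onMessage are hypothetical names, mq is assumed to be an MQlobberClient, and each message is assumed to be small enough to collect in memory.

```javascript
// Hypothetical helper: subscribe to `topic` and pass each complete message
// (as a string) to `onMessage`, acknowledging work-queue messages.
function subscribeText(mq, topic, onMessage, cb)
{
    function handler(stream, info, done)
    {
        var chunks = [];

        stream.on('data', function (chunk)
        {
            chunks.push(Buffer.from(chunk));
        });

        stream.on('end', function ()
        {
            onMessage(Buffer.concat(chunks).toString(), info);

            // Calling done is only mandatory when info.single is true
            if (info.single)
            {
                done();
            }
        });
    }

    mq.subscribe(topic, handler, cb);

    // Keep the returned handler if you want to unsubscribe it later
    return handler;
}
```

Returning the handler lets the caller pass the same function to unsubscribe later on.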
Go: TOC | MQlobberClient.prototype
MQlobberClient.prototype.unsubscribe(topic, handler, cb)
Unsubscribe from messages published to the server.
Parameters:
- {String} [topic]: Which messages you're no longer interested in receiving via the handler function. If topic is undefined then all handlers for all topics are unsubscribed.
- {Function} [handler]: The function you no longer want to be called with messages published to the topic topic. This should be a function you've previously passed to subscribe. If you subscribed handler to a different topic then it will still be called for messages which match that topic. If handler is undefined, all handlers for the topic topic are unsubscribed.
- {Function} [cb]: Optional function to call once handler has been unsubscribed from topic on the server. This will be passed the following argument:
  - {Object} err: If an error occurred then details of the error, otherwise null.
Throws:
- {Error} If an error occurs before sending the unsubscribe request to the server.
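One way to use the (topic, handler) form is a one-shot subscription which removes its own handler when the first message arrives. The helper name subscribeOnce and the onFirst callback are hypothetical, and mq is assumed to be an MQlobberClient.

```javascript
// Hypothetical helper: handle the first message matching `topic`, then
// remove the handler again.
function subscribeOnce(mq, topic, onFirst)
{
    function handler(stream, info, done)
    {
        // Remove only this handler; other handlers for `topic` stay subscribed.
        // mq.unsubscribe(topic) would remove every handler for `topic` and
        // mq.unsubscribe() would remove all handlers for all topics.
        mq.unsubscribe(topic, handler);

        onFirst(stream, info, done);
    }

    mq.subscribe(topic, handler);
}
```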
Go: TOC | MQlobberClient.prototype
MQlobberClient.prototype.publish(topic, options, cb)
Publish a message to the server for interested clients to receive.
Parameters:
- {String} topic: Message topic. The topic should be a series of words separated by . (or whatever you configured QlobberFSQ with on the server).
- {Object} [options]: Optional settings for this publication:
  - {Boolean} single: If true then the message will be given to at most one handler (across all clients connected to all servers). If you don't specify this then all interested handlers (across all clients) will receive it.
  - {Integer} ttl: Time-to-live (in seconds) for this message. If you don't specify this then the default is taken from the QlobberFSQ instance on the server. In any case, QlobberFSQ's configured time-to-live is used to constrain ttl's maximum value.
- {Function} [cb]: Optional function to call once the server has published the message. This will be passed the following argument:
  - {Object} err: If an error occurred then details of the error, otherwise null.
Return:
{Writable} Stream to which to write the message's data. Make sure you end it when you're done.
Throws:
- {Error} If an error occurs before sending the publish request to the server.
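For instance, a work-queue style publication might look like the sketch below. The helper name publishJob and the JSON encoding of the payload are assumptions for illustration; mq is assumed to be an MQlobberClient.

```javascript
// Hypothetical helper: publish `payload` as JSON so that at most one handler
// receives it, with a time-to-live of `ttl` seconds.
function publishJob(mq, topic, payload, ttl, cb)
{
    var stream = mq.publish(topic, { single: true, ttl: ttl }, cb);

    // publish returns a Writable; the message is complete once it is ended
    stream.end(JSON.stringify(payload));
}
```

For larger payloads you could instead pipe a Readable into the stream returned by publish so that back-pressure applies to the source.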
Go: TOC | MQlobberClient.prototype
MQlobberClient.events.handshake(handshake_data)
handshake event
Emitted by a MQlobberClient object after it successfully completes an initial
handshake with its peer MQlobberServer object on the server.
Parameters:
- {Buffer} handshake_data: Application-specific data which the MQlobberServer object sent along with the handshake.
Go: TOC | MQlobberClient.events
MQlobberClient.events.backoff()
backoff event
Emitted by a MQlobberClient object when it delays a request to the server
because the connection is at full capacity. If you want to avoid buffering
further requests, don't call subscribe, unsubscribe or publish until a drain
event is emitted.
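A sketch of one way to hold requests back between backoff and drain follows. The helper createRequestGate is hypothetical and mq is assumed to be an MQlobberClient; note that the gate keeps deferred requests in an array of its own, so the application still has to decide how many it is willing to hold.

```javascript
// Hypothetical helper: defer requests while the connection is at capacity
// and run them once the client emits 'drain'.
function createRequestGate(mq)
{
    var blocked = false,
        pending = [];

    mq.on('backoff', function ()
    {
        blocked = true;
    });

    mq.on('drain', function ()
    {
        blocked = false;

        // Run deferred requests in order, stopping if one causes a backoff
        while (!blocked && (pending.length > 0))
        {
            pending.shift()();
        }
    });

    // Returns true if `request` ran immediately, false if it was deferred
    return function (request)
    {
        if (blocked)
        {
            pending.push(request);
            return false;
        }

        request();
        return true;
    };
}
```

A caller would wrap each request, for example gate(function () { mq.publish('foo.bar').end('hello'); }).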
Go: TOC | MQlobberClient.events
MQlobberClient.events.drain()
drain event
Emitted by a MQlobberClient object when the multiplexing layer emits a drain event.
Go: TOC | MQlobberClient.events
MQlobberClient.events.full()
full event
Emitted by a MQlobberClient object when the multiplexing layer emits a full event.
Go: TOC | MQlobberClient.events
MQlobberClient.events.removed(duplex)
removed event
Emitted by a MQlobberClient object when the multiplexing layer emits a removed event.
Parameters:
- {Duplex} duplex: The multiplexed stream which has closed.
Go: TOC | MQlobberClient.events
MQlobberClient.events.error(err, obj)
error event
Emitted by a MQlobberClient object if an error is emitted by the multiplexing
layer (bpmux), preventing proper communication with the server.
Parameters:
- {Object} err: The error that occurred.
- {Object} obj: The object on which the error occurred.
Go: TOC | MQlobberClient.events
MQlobberClient.events.warning(err, obj)
warning event
Emitted by a MQlobberClient object when a recoverable error occurs. This will
usually be due to an error on an individual request or multiplexed stream.
Note that if there are no warning event listeners registered then the error
will be displayed using console.error.
Parameters:
- {Object} err: The error that occurred.
- {Object} obj: The object on which the error occurred.
Go: TOC | MQlobberClient.events
MQlobberServer(fsq, stream, options)
Create a new MQlobberServer object for publishing and subscribing to messages on behalf of a client.
Parameters:
- {QlobberFSQ | QlobberPG} fsq: File system queue - an instance of QlobberFSQ. This does the heavy-lifting of reading and writing messages to a directory on the file system. Alternatively, you can pass an instance of QlobberPG, which uses PostgreSQL to process messages.
- {Duplex} stream: Connection to the client. The client should use MQlobberClient on its side of the connection. How the connection is made is up to the caller - it just has to supply a Duplex. For example, net.Socket or PrimusDuplex.
- {Object} [options]: Configuration options. This is passed down to BPMux (which multiplexes message streams over the connection to the client). It also supports the following additional properties:
  - {Boolean} send_expires: Whether to include message expiry time in metadata sent to the client. Defaults to false.
  - {Boolean} send_size: Whether to include message size in metadata sent to the client. Defaults to false.
  - {Boolean} defer_to_final_handler: If true then a message stream is only considered finished when all MQlobberServer objects finish processing it. Defaults to false.
Go: TOC
MQlobberServer.prototype.subscribe(topic, options, cb)
Subscribe the connected client to messages.
Note: If the client is already subscribed to topic, this function will do
nothing (other than call cb).
Parameters:
- {String} topic: Which messages the client should receive. Message topics are split into words using . as the separator. You can use * to match exactly one word in a topic or # to match zero or more words. For example, foo.* would match foo.bar whereas foo.# would match foo, foo.bar and foo.bar.wup. Note these are the default separator and wildcard characters. They can be changed when constructing the QlobberFSQ instance passed to MQlobberServer's constructor.
- {Object} [options]: Optional settings for this subscription:
  - {Boolean} subscribe_to_existing: If true then the client will be sent any existing, unexpired messages that match topic, as well as new ones. Defaults to false (only new messages).
- {Function} [cb]: Optional function to call once the subscription has been made. This will be passed the following arguments:
  - {Object} err: If an error occurred then details of the error, otherwise null.
  - {Integer} n: The number of subscriptions made (0 if topic was already subscribed to, 1 if not).
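The wildcard rules described for topic can be illustrated with a small stand-alone matcher. This is only a sketch of the semantics using the default separator and wildcard characters; topicMatches is a hypothetical function and is not how the library itself performs matching.

```javascript
// Illustration only: does `topic` match the subscription `pattern`?
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern, topic)
{
    var p = pattern.split('.'),
        t = topic.split('.');

    function match(i, j)
    {
        var k;

        if (i === p.length)
        {
            // Pattern used up: succeed only if the topic is used up too
            return j === t.length;
        }

        if (p[i] === '#')
        {
            // '#' may absorb zero or more of the remaining words
            for (k = j; k <= t.length; k += 1)
            {
                if (match(i + 1, k))
                {
                    return true;
                }
            }
            return false;
        }

        if (j === t.length)
        {
            return false;
        }

        return ((p[i] === '*') || (p[i] === t[j])) && match(i + 1, j + 1);
    }

    return match(0, 0);
}
```

For example, topicMatches('foo.*', 'foo.bar') and topicMatches('foo.#', 'foo') are both true, whereas topicMatches('foo.*', 'foo') is false.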
Go: TOC | MQlobberServer.prototype
MQlobberServer.prototype.unsubscribe(topic, cb)
Unsubscribe the connected client from messages.
Parameters:
- {String} [topic]: Which messages the client should no longer receive. If topic is undefined then the client will receive no more messages at all.
- {Function} [cb]: Optional function to call once the subscription has been removed. This will be passed the following arguments:
  - {Object} err: If an error occurred then details of the error, otherwise null.
  - {Integer} n: The number of subscriptions removed.
Go: TOC | MQlobberServer.prototype
MQlobberServer.events.subscribe_requested(topic, cb)
subscribe_requested event
Emitted by a MQlobberServer object when it receives a request from its peer
MQlobberClient object to subscribe to messages published to a topic.
If there are no listeners on this event, the default action is to call
subscribe(topic, cb).
If you add a listener on this event, the default action will not be called.
This gives you the opportunity to filter subscription requests in the
application.
Parameters:
- {String} topic: The topic to which the client is asking to subscribe.
- {Function} cb: Function to call after processing the subscription request. This function must be called even if you don't call subscribe yourself. It takes the following arguments:
Go: TOC | MQlobberServer.events
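As a sketch of filtering subscription requests with subscribe_requested, the helper below only lets a client subscribe to topics beginning with a given prefix. restrictSubscriptions is a hypothetical name, server is assumed to be a MQlobberServer, and passing an Error as cb's first argument to signal failure is an assumption based on how the unsubscribe_requested callback is documented.

```javascript
// Hypothetical helper: only allow the client to subscribe to topics which
// start with `prefix`; reject everything else.
function restrictSubscriptions(server, prefix)
{
    server.on('subscribe_requested', function (topic, cb)
    {
        if (topic.lastIndexOf(prefix, 0) === 0)
        {
            // Same as the default action
            return server.subscribe(topic, cb);
        }

        // cb must always be called, even when subscribe isn't.
        // Assumption: a non-null first argument reports failure to the client.
        cb(new Error('subscription to ' + topic + ' not allowed'));
    });
}
```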
MQlobberServer.events.unsubscribe_requested(topic, cb)
unsubscribe_requested event
Emitted by a MQlobberServer object when it receives a request from its peer
MQlobberClient object to unsubscribe from messages published to a topic.
If there are no listeners on this event, the default action is to call
unsubscribe(topic, cb).
If you add a listener on this event, the default action will not be called.
This gives you the opportunity to filter unsubscription requests in the
application.
Parameters:
- {String} topic: The topic from which the client is asking to unsubscribe.
- {Function} cb: Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
  - {Object} err: If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
  - {Integer} n: The number of subscriptions removed.
  - {Buffer} [data]: Optional data to return to the client.
Go: TOC | MQlobberServer.events
MQlobberServer.events.unsubscribe_all_requested(cb)
unsubscribe_all_requested event
Emitted by a MQlobberServer object when it receives a request from its peer
MQlobberClient object to unsubscribe from all messages published to any topic.
If there are no listeners on this event, the default action is to call
unsubscribe(cb). If you add a listener
on this event, the default action will not be called. This gives you the
opportunity to filter unsubscription requests in the application.
Parameters:
- {Function} cb: Function to call after processing the unsubscription request. This function must be called even if you don't call unsubscribe yourself. It takes the following arguments:
  - {Object} err: If null then a success status is returned to the client (whether you called unsubscribe or not). Otherwise, the client gets a failed status and a warning event is emitted with err.
  - {Integer} n: The number of subscriptions removed.
  - {Buffer} [data]: Optional data to return to the client.
Go: TOC | MQlobberServer.events
MQlobberServer.events.publish_requested(topic, stream, options, cb)
publish_requested event
Emitted by a MQlobberServer object when it receives a request from its peer
MQlobberClient object to publish a message to a topic.
If there are no listeners on this event, the default action is to call
stream.pipe(fsq.publish(topic, options, cb)), where fsq is the QlobberFSQ
instance you passed to MQlobberServer's constructor.
Parameters:
- {String} topic: The topic to which the message should be published.
- {Readable} stream: The message data as a Readable. This is multiplexed over the connection to the client - back-pressure is applied to the sender MQlobberClient object according to when you call read.
- {Object} options: Optional settings for this publication:
  - {Boolean} single: If true then the message should be published to at most one client (across all servers). Otherwise, it should be published to all interested clients.
  - {Integer} ttl: Time-to-live (in seconds) for this message.
- {Function} cb: Function to call after processing the publication request. This function must be called even if you don't call publish yourself. It takes the following arguments:
Go: TOC | MQlobberServer.events
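A listener on publish_requested can adjust a request before handing it to the queue. The sketch below mirrors the default action but prefixes every topic with a namespace; namespacePublications and the namespace argument are hypothetical, server is assumed to be a MQlobberServer and fsq the QlobberFSQ instance passed to its constructor.

```javascript
// Hypothetical helper: publish everything the client sends under a fixed
// namespace, otherwise behaving like the default action.
function namespacePublications(server, fsq, namespace)
{
    server.on('publish_requested', function (topic, stream, options, cb)
    {
        // Piping preserves back-pressure between the client and the queue;
        // cb is passed on so it is called once the message has been published
        stream.pipe(fsq.publish(namespace + '.' + topic, options, cb));
    });
}
```

Subscriptions would need the same prefix applied (for example in a subscribe_requested listener) for clients to see each other's messages.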
MQlobberServer.events.message(stream, info, multiplex, done)
message event
Emitted by a MQlobberServer object when its QlobberFSQ object passes it a
message published to a topic to which its peer MQlobberClient object has subscribed.
If there are no listeners on this event, the default action is to call
stream.pipe(multiplex()).
You can add a listener on this event to insert processing between the message stream and the client.
Parameters:
- {Readable} stream: The message content as a Readable. Note that all subscribers will receive the same stream for each message.
- {Object} info: Metadata for the message, with the following properties:
  - {String} topic: Topic to which the message was published.
  - {Boolean} single: Whether this message is being given to at most one handler (across all clients connected to all servers).
  - {Integer} expires: When the message expires (number of seconds after 1 January 1970 00:00:00 UTC). This is only present if the MQlobberServer object was configured with send_expires set to true.
  - {Integer} size: Size of the message in bytes. This is only present if the server's MQlobberServer instance is configured with send_size set to true.
- {Function} multiplex: Function to call in order to multiplex a new stream over the connection to the client. It returns the multiplexed stream, to which the data from stream should be written - after the application applies whatever transforms and processing it requires.
- {Function} done: If you don't call multiplex then you should call this function to indicate you have finished handling the message. done takes the following optional argument:
  - {Object} [err]: If an error occurred while handling the message, pass it here.
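To show where processing can be inserted between the message stream and the client, the following sketch gzips each message before it is multiplexed. compressMessages is a hypothetical helper, server is assumed to be a MQlobberServer, and the receiving client is assumed to decompress what it reads (for example with zlib.createGunzip()).

```javascript
var zlib = require('zlib');

// Hypothetical helper: compress every message on its way to the client.
function compressMessages(server)
{
    server.on('message', function (stream, info, multiplex, done)
    {
        // multiplex() returns the stream which carries data to the client.
        // done is not called here because multiplex is.
        stream.pipe(zlib.createGzip()).pipe(multiplex());
    });
}
```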
Go: TOC | MQlobberServer.events
MQlobberServer.events.handshake(handshake_data, delay_handshake)
handshake event
Emitted by a MQlobberServer object after it receives an initial handshake
message from its peer MQlobberClient object on the client.
Parameters:
- {Buffer} handshake_data: Application-specific data which the MQlobberClient object sent along with the handshake.
- {Function} delay_handshake: By default, MQlobberServer replies to MQlobberClient's handshake message as soon as your event handler returns and doesn't attach any application-specific handshake data. If you wish to delay the handshake message or provide handshake data, call delay_handshake. It returns another function which you can call at any time to send the handshake message. The returned function takes a single argument:
  - {Buffer} [handshake_data]: Application-specific handshake data to send to the client. The client-side MQlobberClient object will emit this as a handshake event to its application.
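The delay_handshake mechanism might be used as in the sketch below, which checks the client's handshake data asynchronously before replying. verifyHandshake is a hypothetical helper and verify is an application-supplied function (not part of mqlobber) which calls back with (err, reply); replying with the text 'denied' on failure is just a placeholder policy.

```javascript
// Hypothetical helper: check the client's handshake data before replying.
// `verify` is supplied by the application and calls back with (err, reply).
function verifyHandshake(server, verify)
{
    server.on('handshake', function (handshake_data, delay_handshake)
    {
        // Stop the reply being sent when this listener returns
        var send_handshake = delay_handshake();

        verify(handshake_data, function (err, reply)
        {
            // Send the handshake message, with data for the client attached
            send_handshake(Buffer.from(err ? 'denied' : reply));
        });
    });
}
```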
Go: TOC | MQlobberServer.events
MQlobberServer.events.backoff()
backoff event
Emitted by a MQlobberServer object when it delays a message to the client
because the connection is at full capacity.
If you want to avoid buffering further messages, use a filter function (see
QlobberFSQ's constructor) to prevent messages being sent until a drain event
is emitted. In the filter function, a handler owned by a MQlobberServer object
will have a property named mqlobber_server set to the MQlobberServer object.
You can also use event listeners on subscribe_requested, unsubscribe_requested,
unsubscribe_all_requested and publish_requested to prevent responses being
sent to the client until a drain event is emitted.
Depending on your application, you might also terminate the connection if it can't keep up.
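As a sketch of the last option, the helper below closes the connection to a client which triggers too many consecutive backoff events. dropSlowClient and max_backoffs are hypothetical; server is assumed to be a MQlobberServer and stream the Duplex passed to its constructor, which is assumed to provide destroy() (as net.Socket does).

```javascript
// Hypothetical helper: close the connection to a client which causes more
// than `max_backoffs` backoff events without an intervening drain.
function dropSlowClient(server, stream, max_backoffs)
{
    var count = 0;

    server.on('backoff', function ()
    {
        count += 1;

        if (count > max_backoffs)
        {
            // The client isn't keeping up; give up on it
            stream.destroy();
        }
    });

    server.on('drain', function ()
    {
        // The connection caught up, so start counting again
        count = 0;
    });
}
```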
Go: TOC | MQlobberServer.events
MQlobberServer.events.drain()
drain event
Emitted by a MQlobberServer object when the multiplexing layer emits a drain event.
Go: TOC | MQlobberServer.events
MQlobberServer.events.full()
full event
Emitted by a MQlobberServer object when the multiplexing layer emits a full event.
Go: TOC | MQlobberServer.events
MQlobberServer.events.removed(duplex)
removed event
Emitted by a MQlobberServer object when the multiplexing layer emits a removed event.
Parameters:
- {Duplex} duplex: The multiplexed stream which has closed.
Go: TOC | MQlobberServer.events
MQlobberServer.events.ack(info)
ack event
Emitted by a MQlobberServer object when the client has acknowledged receipt
of a message.
Parameters:
- {Object} info: Metadata for the message, with the following properties:
  - {String} topic: Topic to which the message was published.
  - {Boolean} single: Always true because acknowledgements are only supported for messages which were given to a single handler (across all clients connected to all servers).
  - {Integer} expires: When the message expires (number of milliseconds after 1 January 1970 00:00:00 UTC).
Go: TOC | MQlobberServer.events
MQlobberServer.events.error(err, obj)
error event
Emitted by a MQlobberServer object if an error is emitted by the multiplexing
layer (bpmux), preventing proper communication with the client.
Parameters:
- {Object} err: The error that occurred.
- {Object} obj: The object on which the error occurred.
Go: TOC | MQlobberServer.events
MQlobberServer.events.warning(err, obj)
warning event
Emitted by a MQlobberServer object when a recoverable error occurs. This will
usually be due to an error on an individual request or multiplexed stream.
Note that if there are no warning event listeners registered then the error
will be displayed using console.error.
Parameters:
- {Object} err: The error that occurred.
- {Object} obj: The object on which the error occurred.
Go: TOC | MQlobberServer.events
—generated by apidox—