mqstreams v1.0.0
Publish-subscribe in the style of Node.js streams, based on mqemitter.
- Installation
- Basic Example
- API
- Licence & copyright
Installation
$ npm install mqemitter mqstreams --save
Basic Example
'use strict'
var mqemitter = require('mqemitter')
var mqstreams = require('mqstreams')
var emitter = mqstreams(mqemitter())
var through = require('through2')
var input = emitter.writable()
var output = emitter.readable('output/#')
emitter
  .readable('some/+')
  .pipe(through.obj(function (msg, enc, callback) {
    msg.topic = 'output/' + msg.topic
    this.push(msg)
    callback()
  }))
  .pipe(emitter.writable())
input.write({ topic: 'some/food', type: 'greek' })
input.write({ topic: 'some/startup', type: 'instasomething' })
input.end({ topic: 'some/dev', type: 'matteo' })
output.on('data', function (msg) {
  console.log(msg)
  // OUTPUT:
  // { topic: 'output/some/food', type: 'greek' }
  // { topic: 'output/some/startup', type: 'instasomething' }
  // { topic: 'output/some/dev', type: 'matteo' }
})
API
- mqstreams
- emitter#readable()
- emitter#writable()
mqstreams(mqemitter)
Extends the MQEmitter with the readable() and writable() methods.
emitter.readable(topic, opts)
Returns a Readable stream in object mode that emits every emitter message
matching the given topic. The opts parameter is passed through to the Stream
constructor. This stream fully respects the Stream3 interface.
The topic parameter is passed to the emitter.on method.
The returned object has the following methods added:
subscribe(), unsubscribe(), destroy().
emitter.readable#subscribe(topic)
Subscribe to the given topic, which can also be an array of topics.
emitter.readable#unsubscribe(topic)
Unsubscribe from the given topic, which can also be an array of topics.
emitter.readable#destroy()
Close the stream, unsubscribing from all the topics.
This is aliased to close() for backwards compatibility.
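The subscribe/unsubscribe/destroy lifecycle described above can be illustrated with a minimal sketch that uses only Node.js core modules. This is not the mqstreams implementation: `bus` is a plain EventEmitter standing in for the emitter (exact topic names only, no `+` or `#` wildcards), and `readableSketch` is a hypothetical helper invented for this illustration.

```javascript
'use strict'
const { Readable } = require('stream')
const { EventEmitter } = require('events')

// Hypothetical stand-in for the emitter: exact topic names, no wildcards.
const bus = new EventEmitter()

// Sketch only: an object-mode Readable that pushes every message
// delivered on the topics it is currently subscribed to.
function readableSketch (topic) {
  const topics = new Set()
  const onMessage = (msg) => stream.push(msg)

  const stream = new Readable({
    objectMode: true,
    read () {}, // messages are pushed as they arrive
    destroy (err, callback) {
      // closing the stream drops every remaining subscription
      stream.unsubscribe(Array.from(topics))
      callback(err)
    }
  })

  // Accept a single topic or an array of topics
  stream.subscribe = (t) => {
    for (const name of [].concat(t)) {
      if (!topics.has(name)) {
        topics.add(name)
        bus.on(name, onMessage)
      }
    }
  }

  stream.unsubscribe = (t) => {
    for (const name of [].concat(t)) {
      if (topics.delete(name)) bus.removeListener(name, onMessage)
    }
  }

  stream.subscribe(topic)
  return stream
}

const stream = readableSketch('some/food')
bus.emit('some/food', { topic: 'some/food', type: 'greek' })

stream.subscribe(['some/dev', 'some/startup'])
bus.emit('some/dev', { topic: 'some/dev', type: 'matteo' })

stream.unsubscribe('some/dev')
bus.emit('some/dev', { topic: 'some/dev', type: 'ignored' }) // not delivered

// Drain whatever was buffered while subscribed
const received = []
let msg
while ((msg = stream.read()) !== null) received.push(msg)
console.log(received)

stream.destroy()
```

With the real library, the same three methods are called on the object returned by emitter.readable(topic), and topic matching is delegated to emitter.on.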
emitter.writable(opts)
Returns a Writable stream in object mode that passes every message to the
emitter.emit method.
This stream fully respects the Stream3 interface.
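As a rough illustration of the behaviour described above, the following sketch builds an object-mode Writable that forwards each written message to an emit function. It relies only on Node.js core modules and is not the mqstreams source: `fakeEmitter` and `writableSketch` are hypothetical names, and the `emit(message, callback)` shape is assumed to mirror mqemitter.

```javascript
'use strict'
const { Writable } = require('stream')

// Hypothetical stand-in for an emitter: it records each message it is
// asked to emit, then signals completion through the callback.
const emitted = []
const fakeEmitter = {
  emit (msg, done) {
    emitted.push(msg)
    done()
  }
}

// Sketch only: an object-mode Writable whose write step hands each
// message to emitter.emit and waits for its callback before continuing.
function writableSketch (emitter, opts) {
  return new Writable(Object.assign({}, opts, {
    objectMode: true,
    write (msg, enc, callback) {
      emitter.emit(msg, callback)
    }
  }))
}

const sink = writableSketch(fakeEmitter)
sink.write({ topic: 'some/food', type: 'greek' })
sink.end({ topic: 'some/dev', type: 'matteo' })

console.log(emitted)
```

Passing the stream's write callback straight to emit means the stream does not take the next message until the emitter reports that the previous one has been handled.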
LICENSE
MIT