distributed-eventemitter v1.2.1
Why?
The library solve the need of a multi process and multi server oriented messaging API in Node.js. Using the known EventEmitter API, listeners registration and events emitting is super simple. A new 'emitToOne' method allows one-to-one events notification, intended for request/response flows on clustered services. The classic 'emit' method broadcast custom events to local and distributed listeners.
For non-distributed usage of this API (browser, single node process or development) developers can use: process-eventemitter
Quick Start
Mailer server (A.js):
var EventEmitter = require('distributed-eventemitter'); var events = new EventEmitter(); // host: localhost, port: 61613 events.connect().then(() => { events.on('email.send', (message, resolve, reject) => { console.log('sending email...'); //... send email // ... resolve('sent'); }); });
Run mailer server as a cluster with PM2:
pm2 start A.js -i 4 --node-args="--harmony"
Mailer client (B.js):
var EventEmitter = require('distributed-eventemitter'); var events = new EventEmitter(); // host: localhost, port: 61613 events.connect().then(() => { events.emitToOne('email.send', { to: 'kyberneees@gmail.com', subject: 'Hello Node.js', body: 'Introducing easy distributed messaging for Node.js...' }, 3000).then((response) => { if ('sent' === response) { console.log('email was sent!'); } }).catch(console.log.bind()); });
Run mailer client:
node --harmony B.js
Example using ES6 generators
"use strict";
const EventEmitter = require('distributed-eventemitter');
const co = require('co');
const events = new EventEmitter(); // host: localhost, port: 61613
co(function* () {
try {
yield events.connect();
let response = yield events.emitToOne('email.send', {
to: 'kyberneees@gmail.com',
subject: 'Hello Node.js',
body: 'Introducing easy distributed messaging for Node.js...'
}, 3000);
if ('sent' === response) {
console.log('email was sent!');
}
yield events.disconnect();
} catch (error) {
console.log('error: ' + error);
}
});
Requirements
Running STOMP compliant server instance. Default client destinations are:
- /topic/distributed-eventemitter: Used for events broadcast (emit)
/queue/distributed-eventemitter: Used for one-to-one events (emitToOne)
If the server require clients to be authenticated, you can use:
{ 'host': 'localhost', 'connectHeaders': { 'heart-beat': '5000,5000', 'host': '', 'login': 'username', 'passcode': 'password' } }
Installation
$ npm install distributed-eventemitter
Features
- Extends eventemitter2. (wildcards are enabled).
- ECMA6 Promise based API.
- Request/Response communication intended for service clusters (emitToOne). Uncaught exceptions automatically invoke 'reject(error.message)'
- Events broadcast to local and distributed listeners (emit)
- Works with any STOMP compliant server (ie. ActiveMQ, RabbitMQ, ...).
- Uses stompit as STOMP client. (automated server reconnection is supported)
Config params
var config = {};
config.destination = 'distributed-eventemitter'; // server topic and queue destinations
config.excludedEvents = []; // events that are not distributed
config.servers = [{
'host': 'localhost',
'port': 61613,
'connectHeaders': {
'heart-beat': '5000,5000',
'host': '',
'login': '',
'passcode': ''
}
}];
config.reconnectOpts = {
maxReconnects: 10;
}
var events = new EventEmitter(config);
For more details about 'servers' and 'reconnectOpts' params please check: http://gdaws.github.io/node-stomp/api/connect-failover/
Internal events
events.on('connected', (emitterId) => {
// triggered when the emitter has been connected to the network (STOMP server)
});
events.on('disconnected', (emitterId) => {
// triggered when the emitter has been disconnected from the network (STOMP server)
});
events.on('error', (error) => {
// triggered when an error occurs in the connection channel.
}):
events.on('connecting', (connector) => {
// triggered when the STOMP client is trying to connect to a server.
}):
events.on('request', (event, request, raw) => {
// triggered before invoke a listener using emitToOne feature
// request data filtering and modification is allowed
// example:
request.data = ('string' === typeof request.data) ? request.data.toUpperCase() : request.data
}):
events.on('response', (event, response, raw) => {
// triggered after invoke a listener using emitToOne feature
// response data filtering and modification is allowed
// example:
if (response.ok)
response.data = ('string' === typeof response.data) ? response.data.toUpperCase() : response.data
else
console.log('error ocurred: ' + response.data.message);
}):
API
getId: Get the emitter instance unique id.
events.getId(); // UUID v4 value
connect: Connect the emitter to the server. Emit the 'connected' event.
events.connect().then(()=> {
console.log('connected');
});
disconnect: Disconnect the emitter from the server. Emit the 'disconnected' event.
events.disconnect().then(()=> {
console.log('disconnected');
});
emitToOne: Notify a custom event to only one target listener (locally or in the network). The method accept only one argument as event data.
events.on('my.event', (data, resolve, reject) => {
if ('hello' === data){
resolve('world');
} else {
reject('invalid args');
}
});
// calling without timeout
events.emitToOne('my.event', 'hello').then((response) => {
console.log('world' === response);
});
// calling with timeout (ms)
events.emitToOne('my.event', {data: 'hello'}, 100).catch((error) => {
console.log('invalid args' === error);
});
Roadmap
- Express integration.
Known limitations
- The feature 'emitAsync' from the EventEmitter2, only work locally(not distributed).
Tests
$ npm install
$ npm test
History changes
1.1.x
- From version 1.1.0+ we use Stompit as STOMP client because Stompjs does not support server reconnections and is also unmaintained.
- No API changes from 1.0 version except the configuration params, since now we consider a list of STOMP servers to connect/reconnect.
1.1.1
- Supporting request/request events intended for pre invocation content filtering and modification.
1.1.2
Fixing dependencies declaration (due to automatic modification from 'npm update --save').
1.1.3
Fixing issue on distributed events broadcast(emit) that caused repeated broadcasts.
1.1.4
Updating dependencies and adding generators example to docs.
1.1.5
- Updating dependencies and docs.
1.1.6
Improving unexpected error handling when data can't be transmited because the STOMP connection is closed. Produced exceptions are notified as an 'error' event on the emitter.
1.2.0
- Supporting jWebSocket EventBus. JVM based applications using the jWebSocket EventBus component, can communicate with nodejs services through this module.
1.2.1
- Minor source code refactorization.
- Package updates.