0.56.5 • Published 3 years ago

cry-ebus v0.56.5

Weekly downloads
5
License
ISC
Repository
-
Last release
3 years ago

Enterprise bus architecture

INSTALLATION

npm install 'cry-ebus' --save

This library is a robust ZMQ-based implementation of reliable client-broker-worker arhitecture. Cilents request services from workers via a broker. Each worker implements exactly one service and many workers can implement the same service.

Minimal working configuration has one broker, one worker, and one client.

Clients can also send and receive notification via publish/subscribe mechanism. Notifications are organised into channels

Broker, clients, and workers are using ZMQ for communication. This allows for several transport options:

  • tcp: unicast transport using TCP
  • inproc: local in-process (inter-thread) communication transport
  • ipc: local inter-process communication transpor
  • pgm, epgm: reliable multicast transport using PGM

Consult zmq connect documentation for details.

BROKER

Broker is the central component (the bus of enterprise bus arhitecture). It reliably and effciently:

  • Registers and monitors available Workers;
  • Receives service requests from clients and forwards them to appropriate brokers;
  • Returns service responses (results) to the requesting client;
const Broker = require('cry-ebus').Broker;
const broker = new Broker();

broker.on('error', (err) => console.log('broker error', err));

broker.start();

process.on('SIGTERM', function () {
    broker.stop()
    process.exit(0);
});

process.on('SIGINT', function () {
    broker.stop()
    process.exit(0);
});

When starting new Broker(), the following config object can be passed in (here with defaults):

let opts ={ 
	frontend: 'tcp://*:5000',			// ZMQ port for clients
	backend: 'tcp://*:5001',			// ZMQ port for workers
	pub: 'tcp://127.0.0.1:5002',		// ZMQ port for publications
	sub: 'tcp://127.0.0.1:5003',		// ZMQ port for subscriptions
	heartbeat: 500,						// check workers every 500ms
	liveness: 3,						// deleted a worker that missed 5 heartbeats
	expire: 300000,						// reject service request in not answered in 300000ms
	resubmitEvery: 3,					// resubmit an ununswered request to a workers every 3 heartheats
	debug: 1							// log level 1 (error)
} 

const broker = new Broker(opts);

The same configuration can be using envieronment variable EBUS_BROKER_NAME and EBUS_NAME, the first overriding the second

For example: export EBUS_BROKER_DEBUG=3 would override export EBUS_DEBUG=2, a global ebus debug setting.

The configuration passed-in in the code overrides env variables so do avoid it.

Available events (subscibe with broker.on('eventname')):

  • error(err)
  • start, stop
  • expired(rid) - requet has expired because no worker replied in time, request will be rejected
  • request(service,rid,data) - new request arrived from a client
  • reply(service,rid,data) - a worker responded to a request
  • new worker(service,wid) - a new worker registered
  • disconnect worker (wid) - a worker is deleted from the set of available workes (it disconnected or was disconneted by broker)

Note: a broker will respond to a client request for service SERVICES by returning a list of services provided by connected workers.

Note: a broker will respond to a client request for service WORKERS by returning a list of attached workers as an array of objects { id, service, load } where load is the number of requests currently assigned to this worker.

WORKER

Workers are "servers" in the e-bus arhitecture. Each worker serves exactly one service which must be registered with the borker when a new worker is created.

Every worker must listen for request(rid,data) events, process these requests, and then call worker.respond(rid,result) with the results. If for whatever reason worker cannot respond to a request, it can call worker.reject(rid,result) to notify the client that this request cannot be answered.

const Worker = require('cry-ebus').Worker
const worker = new Worker('hello')

worker.on('request', request)
worker.on('error',(err) => console.error(err))

function request(rid, data) {

  if (!data || !data.name) worker.reject(rid, { answer: 'hello ' + data.name })
  else worker.respond(rid, { answer: 'hello ' + data.name })
}

worker.start()

Checking the on-line status of the worker (if worker is connected to the broker):

  • worker receives online and offline events when status changes
  • check worker.online() (return true is broker is online, false if not)
const Worker = require('cry-ebus').Worker
const worker = new Worker('hello')
 
worker.on('request', request)
worker.on('online',() => console.log('broker is on-line',worker.online()))
worker.on('offline',() => console.log('broker is off-line',worker.online()))

function request(rid, data) {
  worker.respond(rid, {
    answer: 'hello ' + data.name
  })
}
 
worker.start()

Available events: a worker can subscibe to the following events:

  • start, stop
  • online, offline - status of connection to the broker changed
  • request(rid, data) - new workload arriving with id 'rid' and data
  • error(err) - an error occured

Optionally, a worker can receive configuration, here with defaults:

let opts = { 
	broker: 'tcp://localhost:5001',  // broker's ZMQ backend port
  	heartbeat: 500,     // report to worker every 500ms
  	cacheRqMs: 3000,        // cache each calculated result for 3000ms
  	identity: 'some-unique-id',     // worker's identity
  	debug: 1        // default level error (1)
} 

const worker = new Worker('hello', opts)

The same configuration can be set using envieronment variable EBUS_WORKER_NAME and EBUS_NAME, the first overriding the second. e.g. export EBUS_WORKER_DEBUG=3 overrides export EBUS_DEBUG=2.

The configuration passed-in in the code overrides env variables, do avoid it.

CLIENT

Clients consume services by

  • (using events): calling client.request(service,data). When reply arrives, event reply(rid,data) is triggered.
  • (using promises): requesting a promise with client.requestPromise(service,data) and awaiting it.

Client using promises:

// create client that expires ununswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })

client.start()

// request two services in parallel
let p = []
p.push(client.requestPromise('hello', { name: 'Ana' }))
p.push(client.requestPromise('hello', { name: 'Barbi' }))

// wait until both services are fulfilled
Promise
	.all(p)
	.then((data) => console.log('ALL', data))
	.catch((...err) => console.log('ERROR', err))

Client using events:

// create client that expires ununswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })

// subscibe to events before starting the client
client.on('expired', (rid, service, data) => console.error('expired', rid, service, data))
client.on('reply', (rid, data) => console.log('REPLY', rid, data))
client.start()

// send out two requests
client.request('hello', { name: 'Ana ' })
client.request('hello', { name: 'Barbi' })

// 'repy' event will be triggered twice when answers arrive

A client may request a list of services attached to the broker:

// create client that expires ununswered requests in 2s
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
 
// subscibe to events before starting the client
client.on('expired', (rid, service, data) => console.error('expired', rid, service, data))
client.on('reply', (rid, data) => console.log('REPLY', rid, data))
client.start()
 
// request a list services attached to the broker
client.brokerServices()

Checking the on-line status of the client (if client is connected to the broker):

  • client receives online and offline events when status changes
  • check client.online() (return true is broker is online, false if not)
const Client = require('cry-ebus').Client
const client = new Client({ expires: 2000 })
 
// subscibe to events before starting the client
client.on('offline', () => console.log('broker is off-line',client.online()))
client.on('online', () => console.log('broker is on-line',client.online()))
client.start()

Available events: a client receives the following events:

  • start, stop
  • error(message)
  • message(data,channel) - subscription messasge received on channel
  • services(services) - broker sent an array of services served by attacked workers requested by client.brokerServices
  • workers(workers) - broker sent an array of workers {id,service,load} requested by client.brokerWorkers

pub/sub notifications

Clients can subscibe to notifications via channels, and can publish notifications to channels. A subscription is matched on the starst of the channel name. A client subscining to channel db/changes/ (just a string) would receive all nofications on channels named db/changes/, db/changes/mytable, db/changes/mytable/12345.

A publishing client

var ebus = require('cry-ebus');

var client = new ebus.Client();

let workload = 30
let sent = 0
let received = 0
let expired = 0
let rejected = 0

client.on('error', (msg) => console.log('error ', msg));

client.start();

var me = (new Date()).getSeconds()

let timer1 = setInterval(() => {
    if (sent < workload) {
        let msg = {
            msg: 'publication A' + me + ' ' + + ++sent
        }
        client.publish('CH A', msg);
        console.log('published on A: ', msg)
    }
}, 100)


let timer2 = setInterval(() => {
    if (sent < workload) {
        let msg = {
            msg: 'publication A.1' + me + ' ' + + ++sent
        }
        client.publish('CH AAA', msg);
        console.log('published on A.1: ', msg)
    }
}, 100)


let timer3 = setInterval(() => {
    if (sent < workload) {
        let msg = 'publication B' + me + ' ' + ++sent
        client.publish('CH B', msg);
        console.log('published on B: ', msg)
    }
}, 100)


let timer4 = setInterval(() => {
    if (sent < workload) {
        let msg = 'publication C' + me + ' ' + ++sent
        client.publish('CH C', msg);
        console.log('published on C: ', msg)
    }
}, 100)


process.on('SIGTERM', function () {
    console.log('sigterm')
    client.stop()
    process.exit(0);
});

process.on('SIGINT', function () {
    console.log('SIGINT')
    client.stop()
    process.exit(0);
});

A subscribing client

var ebus = require('../../lib/ebus.js');

var client = new ebus.Client();

client.on('message', (msg,channel) => console.log('message on channel',channel, msg));
client.on('error', (msg, more) => console.log('error ', msg, more));

client.subscribe('CH A')
client.subscribe('CH B')

client.start();

process.on('SIGTERM', function () {
    console.log('sigterm')
    client.stop()
    process.exit(0);
});

process.on('SIGINT', function () {
    console.log('SIGINT')
    client.stop()
    process.exit(0);
});

A client accepts the following config on creation:

let opts = { broker: 'tcp://localhost:5000',	// broker's frontend ZMQ port
  pub: 'tcp://localhost:5002',  	// broker's publications ZMQ port
  sub: 'tcp://localhost:5003',      // broker's subscriptions ZMQ port
  heartbeat: 500,       // test is broker is present every 500ms
  expires: 300000,      // expire ununswered request in 300000ms, raising 'expired(rid,service,data)' event
  resends: 1000,        // resubmit unanswered request to broker every 1000ms
  debug: '5',       // default debug level (error)
  noHeartbeat: false        // set to true to avoid heartbeating broker (use if same code is both worker and client, to avoid double heartbeat)
} 

var client = new ebus.Client(opts);

The same configuration can be set using envieronment variables EBUS_CLIENT_NAME and EBUS_NAME, the first overriding the second. For example export EBUS_CLIENT_DEBUG=3.

Logging

A global setting export EBUS_DEBUG sets log error (an integer)

  • FATAL = 0
  • ERROR = 1
  • INFO = 2
  • DEBUG = 3
let logs = require('cry-ebus').Utils.Log
let worker = new Worker('hello', { debug: logs.INFO })
0.56.5

3 years ago

0.56.4

3 years ago

0.56.3

3 years ago

0.56.2

3 years ago

0.56.1

3 years ago

0.56.0

3 years ago

0.55.4

3 years ago

0.55.2

4 years ago

0.55.3

4 years ago

0.55.1

4 years ago

0.55.0

4 years ago

0.54.0

4 years ago

0.53.0

4 years ago

0.52.0

4 years ago

0.51.3

4 years ago

0.51.2

4 years ago

0.51.0

4 years ago

0.51.1

4 years ago

0.50.7

5 years ago

0.50.6

5 years ago

0.50.5

5 years ago

0.50.4

5 years ago

0.50.3

5 years ago

0.50.2

5 years ago

0.50.1

5 years ago

0.47.9

5 years ago

0.47.8

5 years ago

0.47.7

5 years ago

0.47.6

5 years ago

0.47.5

5 years ago

0.47.4

5 years ago

0.47.3

5 years ago

0.47.2

5 years ago

0.47.1

5 years ago

0.47.0

5 years ago

0.46.0

5 years ago

0.44.2

5 years ago

0.44.1

5 years ago

0.44.0

5 years ago

0.43.9

5 years ago

0.43.7

5 years ago

0.43.6

5 years ago

0.43.5

5 years ago

0.43.4

5 years ago

0.43.2

5 years ago

0.43.1

5 years ago

0.43.0

5 years ago

0.42.1

5 years ago

0.42.0

5 years ago

0.41.0

5 years ago

0.40.4

5 years ago

0.40.3

5 years ago

0.40.1

5 years ago

0.40.0

5 years ago

0.39.0

5 years ago

0.38.0

5 years ago

0.37.0

5 years ago

0.36.0

5 years ago

0.35.0

5 years ago

0.34.0

5 years ago

0.33.0

5 years ago

0.32.0

5 years ago