rabrpc v2.1.1
RabRPC
Yet another opinionated RPC library based on RabbitMQ (through rabbot)
Features
- Promise-based interface (thanks rabbot)
- Convention over configuration in exchange, queue, routingKeys naming
Implemented producer/consumer patterns:
- Request / Response
- Publish / Subscribe
- Send / Receive
Installation
npm install --save rabrpc
# or
yarn add rabrpc
Examples
Initialization
Important!
rpc.configure should be called after binding handlers via rpc.respond in the consumer microservice, and must be called before requesting data with rpc.request in the provider microservice.
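The ordering rule above can be captured in a small wrapper. This is only a sketch: `startResponder` and the shape of its `handlers` argument are hypothetical names introduced for illustration, and only `rpc.respond` and `rpc.configure` come from the library.

```javascript
// Hypothetical helper (not part of rabrpc): binds every handler first,
// then configures, so the documented ordering cannot be violated by accident.
function startResponder(rpc, config, handlers) {
  // handlers is assumed to be a plain object: { messageType: handlerFunction }
  for (const [messageType, handler] of Object.entries(handlers)) {
    rpc.respond(messageType, handler)
  }
  // rpc.configure returns a promise; pass it through to the caller
  return rpc.configure(config)
}

// Usage (assumes rabrpc is installed and `config` is defined elsewhere):
// startResponder(require('rabrpc'), config, {
//   'v1.foo-service-name.someAction': payload => payload * 2
// })
```

On the requester side the order is reversed: wait for the promise returned by rpc.configure before calling rpc.request.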
rpc.configure(config, transformConfig = true)
- config - rabrpc or rabbot config object
- transformConfig - must be false if config is a rabbot configuration; default is true
rabbot json configuration
If you need more flexibility, you can pass a valid rabbot configuration into rpc.configure
The only requirement is that exchanges, queues and bindings are named according to the convention.
const config = {
connection: {
user: 'guest',
pass: 'guest',
server: '127.0.0.1',
// server: "127.0.0.1, 194.66.82.11",
// server: ["127.0.0.1", "194.66.82.11"],
port: 5672,
timeout: 2000,
vhost: '%2fmyhost'
},
exchanges: [
{ name: 'config-ex.1', type: 'fanout', publishTimeout: 1000 },
{
name: 'config-ex.2',
type: 'topic',
alternate: 'alternate-ex.2',
persistent: true
},
{ name: 'dead-letter-ex.2', type: 'fanout' }
],
queues: [
{ name: 'config-q.1', limit: 100, queueLimit: 1000 },
{ name: 'config-q.2', subscribe: true, deadLetter: 'dead-letter-ex.2' }
],
bindings: [
{ exchange: 'config-ex.1', target: 'config-q.1', keys: ['bob', 'fred'] },
{ exchange: 'config-ex.2', target: 'config-q.2', keys: 'test1' }
]
}
rpc.configure(config, false) // transformConfig = false
Request / Response
Responder initialization
const rpc = require('rabrpc') // singleton
const config = {
// uri
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
// or object passed to rabbot see https://github.com/arobson/rabbot#configuration-via-json
// connection: {user: 'guest', pass: 'guest', server: 'localhost', ...}
// respond configuration in consumer microservice
// this config will create exchange(s), queue(s) and binding(s)
// request configuration in provider microservice only creates exchange(s)
res: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
// rabbot queue options, see https://github.com/arobson/rabbot#addqueue-queuename-options-connectionname-
// subscribe: true is default
messageTtl: 30000,
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Requester initialization
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// requesting resource configuration
// this config will create only exchange(s)
// respond configuration in consumer microservice will create queue(s) and binding(s)
// req: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
req: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Convention
| Parameter | Value | Example |
|---|---|---|
| exchange | req-res.serviceName | req-res.foo-service-name |
| queue | req-res.serviceName | req-res.foo-service-name |
| routingKey | serviceName | foo-service-name |
| messageType | version.serviceName.action | v1.foo-service-name.someAction |
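To make the table concrete, here is a minimal sketch that derives the request/response names from a serviceName and splits a messageType into its parts. `reqResNames` and `parseMessageType` are hypothetical helpers written for this illustration; they are not exported by rabrpc.

```javascript
// Hypothetical helper: names for the Request / Response pattern, per the table above.
function reqResNames(serviceName) {
  return {
    exchange: `req-res.${serviceName}`,
    queue: `req-res.${serviceName}`,
    routingKey: serviceName
  }
}

// Hypothetical helper: splits 'version.serviceName.action' into its parts.
// Everything after the second dot is treated as the action, so
// 'v1.users.role.findAll' yields the action 'role.findAll'.
function parseMessageType(messageType) {
  const [version, serviceName, ...rest] = messageType.split('.')
  return { version, serviceName, action: rest.join('.') }
}

console.log(reqResNames('foo-service-name'))
console.log(parseMessageType('v1.foo-service-name.someAction'))
```

Helpers like these may be handy when writing a raw rabbot configuration by hand, where the names have to match the convention exactly.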
Response
rpc.respond(messageType, handler, raw = false)
- messageType - full path for the service action, e.g. 'v1.images.resize' or 'v1.users.role.findAll', where the second part (images, users) is the serviceName specified in config (used in rabbot as the message type)
- handler - function that takes payload or message, responseActions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - responseActions - object with 3 functions: success, fail, error
  - messageType - type of the rabbot message (useful when listening for types that contain * or #)
- raw - if true, the first argument to handler is the rabbot message instead of message.body (the default)
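The `*` and `#` wildcards mentioned above are easiest to understand with a matcher. The sketch below assumes they follow AMQP topic semantics (`*` matches exactly one dot-separated word, `#` matches zero or more words); whether rabbot dispatches types in exactly this way is an assumption, and `matchesPattern` is a hypothetical function, not part of rabrpc or rabbot.

```javascript
// Hypothetical matcher illustrating AMQP-style topic wildcards:
//   '*' stands for exactly one word, '#' for zero or more words.
function matchesPattern(pattern, messageType) {
  const p = pattern.split('.')
  const w = messageType.split('.')
  const match = (i, j) => {
    if (i === p.length) return j === w.length // pattern consumed: words must be too
    if (p[i] === '#') {
      // try letting '#' absorb 0, 1, 2, ... of the remaining words
      for (let k = j; k <= w.length; k++) {
        if (match(i + 1, k)) return true
      }
      return false
    }
    if (j === w.length) return false // words consumed but pattern is not
    return (p[i] === '*' || p[i] === w[j]) && match(i + 1, j + 1)
  }
  return match(0, 0)
}

console.log(matchesPattern('v1.foo-service-name.someResource.*', 'v1.foo-service-name.someResource.find'))
```

Under these assumptions a handler bound to 'v1.foo-service-name.someResource.*' sees 'find', 'create' and 'destroy' actions, which is why the handler receives messageType as its third argument.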
Example
const rpc = require('rabrpc')
// before initialization
rpc.respond('v1.foo-service-name.someAction', (payload, actions, messageType) =>
actions.success(payload * 2)
)
// handler can also just return a promise, a `.then`able or a value, and the result will be sent as a reply with success status
// an exception or rejected promise causes an error reply (be sure to throw an `Error` with a message)
rpc.respond('v1.foo-service-name.anotherAction', payload =>
SomeDB.query({
/* ... */
}).then(rows => ({ count: rows.count, data: rows }))
)
rpc.respond('v1.foo-service-name.thirdAction', payload => payload * 2)
rpc.respond(
'v1.foo-service-name.someResource.*',
(payload, actions, messageType) => {
const [version, serviceName, resource, actionName] = messageType.split('.')
switch (actionName) {
case 'find':
return Resource.findAll(payload)
case 'create':
return Resource.create(payload)
case 'destroy':
return Resource.destroy(payload)
default:
throw new Error(`Action '${actionName}' is not supported!`)
}
}
)
// in your service initialization cycle
rpc.configure(config)
Request
rpc.request(messageType, payload, options, raw = false)
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the respond handler (see Supported payload)
- options - rabbot request options (merged with defaults: {replyTimeout: 10000})
- raw - resolve with the rabbot reply message instead of message.body
Returns a Promise that resolves with body (or with message if raw is true).
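Because options are merged with the defaults, a single call can override replyTimeout. The wrapper below is a sketch: `requestData` is a hypothetical name, and unwrapping `body.data` assumes the reply body has the shape shown in the example that follows in this section.

```javascript
// Hypothetical wrapper around rpc.request with a per-call reply timeout.
// replyTimeout is the only option taken from the documented defaults.
function requestData(rpc, messageType, payload, replyTimeout = 10000) {
  return rpc
    .request(messageType, payload, { replyTimeout })
    // assumption: the reply body carries the handler result in `data`
    .then(body => body.data)
}

// Usage (after rpc.configure(config) has resolved):
// requestData(rpc, 'v1.foo-service-name.someAction', 42, 500)
//   .then(result => console.log(result))
//   .catch(err => console.error('request failed:', err))
```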
Example
const rpc = require('rabrpc')
// request allowed only after initialization
rpc
.configure(config)
.then(() => rpc.request('v1.foo-service-name.someAction', 42))
.then(body => {
console.log('response:', body.data) // body = {status: 'success', data: 84}
})
Publish / Subscribe
Subscriber initialization
const rpc = require('rabrpc') // singleton
const config = {
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
sub: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Publisher initialization
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// pub: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
pub: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Convention
| Parameter | Value | Example |
|---|---|---|
| exchange | pub-sub.serviceName | pub-sub.foo-service-name |
| queue | pub-sub.serviceName.uuid4 | pub-sub.foo-service-name.110ec58a-a0f2-4ac4-8393-c866d813b8d1 |
| routingKey | serviceName | foo-service-name |
| messageType | version.serviceName.action | v1.foo-service-name.someAction |
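Unlike the other patterns, the pub/sub queue name ends in a UUID, so each subscriber instance presumably gets its own queue. A minimal sketch of that naming scheme, using Node's built-in `crypto.randomUUID()` (Node 14.17 or later); `pubSubNames` is a hypothetical helper and rabrpc may generate its UUID differently:

```javascript
const { randomUUID } = require('crypto')

// Hypothetical helper: names for the Publish / Subscribe pattern, per the table above.
// Each call produces a fresh queue name because of the random UUID v4 suffix.
function pubSubNames(serviceName) {
  return {
    exchange: `pub-sub.${serviceName}`,
    queue: `pub-sub.${serviceName}.${randomUUID()}`,
    routingKey: serviceName
  }
}

console.log(pubSubNames('foo-service-name'))
```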
Subscribe
rpc.subscribe(messageType, handler, raw = false)
- messageType - full path for the service action, e.g. 'v1.images.archive' or 'v1.statistics.synchronize', where the second part (images, statistics) is the serviceName specified in config (used in rabbot as the message type)
- handler - function that takes payload or message, actions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - messageType - type of the rabbot message (useful when listening for types that contain * or #)
- raw - if true, the first argument to handler is the rabbot message instead of message.body (the default)
Example
const rpc = require('rabrpc')
// before initialization
rpc.subscribe(
'v1.foo-service-name.someAction',
(payload, actions, messageType) => {}
)
// always auto ack
// in your service initialization cycle
rpc.configure(config)
Publish
rpc.publish(messageType, payload, options)
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the handler (see Supported payload)
- options - rabbot publish options (merged with defaults: {replyTimeout: 10000})
returns rabbot publish Promise (see Rabbot Publish)
Example
const rpc = require('rabrpc')
// publish allowed only after initialization
rpc
.configure(config)
.then(() => rpc.publish('v1.foo-service-name.someAction', 42))
.then(() => {
console.log('Message published')
})
Send / Receive
Receiver initialization
const rpc = require('rabrpc') // singleton
const config = {
connection: 'amqp://guest:guest@localhost:5672/?heartbeat=10',
recv: {
// string or object or array of strings or objects
serviceName: 'foo-service-name',
messageTtl: 30000,
limit: 10
// ...etc
}
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Sender initialization
const rpc = require('rabrpc') // singleton
const config = {
connection: '<URI string>', // see above
// send: 'foo-service-name' | ['foo-service-name', 'bar-service-name'] | {serviceName: 'foo-service-name'} | [{serviceName: 'foo-service-name'}, {serviceName: 'bar-service-name'}]
send: 'foo-service-name'
}
// somewhere in your microservice initialization cycle
rpc.configure(config) // returns promise
Convention
| Parameter | Value | Example |
|---|---|---|
| exchange | send-recv.serviceName | send-recv.foo-service-name |
| queue | send-recv.serviceName | send-recv.foo-service-name |
| routingKey | serviceName | foo-service-name |
| messageType | version.serviceName.action | v1.foo-service-name.someAction |
Receive
rpc.receive(messageType, handler, raw = false)
- messageType - full path for the service action, e.g. 'v1.images.archive' or 'v1.statistics.synchronize', where the second part (images, statistics) is the serviceName specified in config (used in rabbot as the message type)
- handler - function that takes payload or message, actions and messageType
  - payload or message - message if raw is true, otherwise message.body
  - actions - object with 3 functions: ack, nack, reject (see Rabbot Message API)
  - messageType - type of the rabbot message (useful when listening for types that contain * or #)
- raw - if true, the first argument to handler is the rabbot message instead of message.body (the default)
Example
const rpc = require('rabrpc')
// before initialization
rpc.receive('v1.foo-service-name.someAction', (payload, actions, messageType) =>
actions.ack()
)
// handler can also just return a promise, a `.then`able or a value, and the message will be ack'ed when the promise resolves
// an exception or rejected promise causes the message to be nack'ed
rpc.receive('v1.foo-service-name.anotherAction', payload =>
SomeDB.query({
/* ... */
})
) // auto ack
rpc.receive(
'v1.foo-service-name.someResource.*',
(payload, actions, messageType) => {
// you can ack the message manually if you don't need the default behaviour
actions.ack() // does NOT return a promise
const [version, serviceName, resource, actionName] = messageType.split('.')
switch (actionName) {
case 'find':
return Resource.findAll(payload)
case 'create':
return Resource.create(payload)
case 'destroy':
return Resource.destroy(payload)
default:
throw new Error(`Action '${actionName}' is not supported!`) // will not produce nack call
}
}
)
// in your service initialization cycle
rpc.configure(config)
Send
rpc.send(messageType, payload, options)
- messageType - see the rpc.respond messageType argument
- payload - payload data, which will be passed into the handler (see Supported payload)
- options - rabbot publish options (merged with defaults: {replyTimeout: 10000})
returns rabbot publish Promise (see Rabbot Publish)
Example
const rpc = require('rabrpc')
// send allowed only after initialization
rpc
.configure(config)
.then(() => rpc.send('v1.foo-service-name.someAction', 42))
.then(() => {
console.log('Message sent')
})
Graceful shutdown
rpc.stopSubscription()
Removes all handlers and unsubscribes from queues.
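One way to use this is to hook it to process signals. The sketch below is an assumption-laden example: `installGracefulShutdown` is a hypothetical helper, and since it is not stated here whether rpc.stopSubscription returns a promise, the call is wrapped so that both cases work.

```javascript
// Hypothetical helper: stops subscriptions on SIGINT/SIGTERM, then exits.
// `proc` defaults to the real process; it is a parameter only to ease testing.
function installGracefulShutdown(rpc, proc = process) {
  const shutdown = () => {
    // Wrapping the call in a promise handles a synchronous return,
    // a returned promise, and a synchronous throw alike.
    new Promise(resolve => resolve(rpc.stopSubscription()))
      .then(() => proc.exit(0))
      .catch(err => {
        console.error('shutdown failed:', err)
        proc.exit(1)
      })
  }
  proc.once('SIGINT', shutdown)
  proc.once('SIGTERM', shutdown)
}

// Usage (assumes rabrpc is installed):
// installGracefulShutdown(require('rabrpc'))
```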
Supported payload
- string
- number
- null
- JSON serializable Object
- Buffer
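A rough pre-flight check for the list above could look like the following. `isSupportedPayload` is a hypothetical helper, not something rabrpc exposes, and "JSON serializable" is approximated here by whether JSON.stringify succeeds.

```javascript
// Hypothetical check mirroring the supported payload list.
function isSupportedPayload(value) {
  if (value === null) return true
  if (typeof value === 'string' || typeof value === 'number') return true
  if (Buffer.isBuffer(value)) return true
  if (typeof value === 'object') {
    try {
      // throws on circular references and BigInt values
      JSON.stringify(value)
      return true
    } catch (err) {
      return false
    }
  }
  // undefined, functions, symbols, booleans etc. are not in the list
  return false
}

console.log(isSupportedPayload({ count: 2 }))
console.log(isSupportedPayload(undefined))
```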