3.0.0 • Published 6 years ago
febs-message v3.0.0
message 库封装了消息队列, 将复杂的调用封装成
rpc
: 有返回值的双向通信, 只有一个接收者能接收到消息subscribe
: 无返回值的订阅发布消息, 所有订阅者都能接收到消息.
目前底层使用 rabbitmq
, 系统初始化完成,如果中途发生断线将会自动重连
Example
//
// producer
//
var mq = require('febs-message');
/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
console.error(msg);
});
/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
url: 'amqp://xxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
rpcTimout: 5000, // rpc等待返回消息的超时.
persistent: false, // 是否持久消息.
registerPublisher: true,
registerSubscriber: false,
// devSingle: ['live'],
},
null // 不接收消息.
).then(()=>{
console.log("Connect Success");
// 发送消息.
mq.rpc.request({handler:"handle1", recvSys:'live'}, {message:'hello', data:1})
.then(ret=>{
console.log('return message: ');
console.log(ret);
});
});
//
// customer
//
var mq = require('febs-message');
/**
* @desc: 消息处理方法
*/
async function handle1(msg) {
console.log('handle1:');
console.log(msg);
return {err:msg.data}; // return to sender.
}
async function handle2(msg) {
console.log('handle2:');
console.log(msg);
return {err:msg.data};
}
/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
console.error(msg);
});
/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
url: 'amqp://xxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
rpcTimout: 5000, // rpc等待返回消息的超时.
persistent: false, // 是否持久消息.
registerSubscriber: true,
// devSingle: ['live'],
},
'live', // 接收发送给live系统的消息.
'live2' // 接收发送给live系统的消息.
).then(()=>{
console.log('Connect success');
// 绑定消息处理方法.
mq.rpc.bind('handle1', handle1);
mq.rpc.bind('handle2', handle2);
});
Error
系统错误使用 mq.Error
实例抛出异常.
全局配置
/**
* @desc: 初始化系统.
* @param errorLogCB: 错误log回调. function(msg, filename, line)
*/
function init(errorLogCB)
- 全局初始化后, 对需要用到的rpc, subscribe类型的消息队列再进行各自的初始化.
- 在开发模式下, 可以进行本机调试而不影响服务器上的服务; 例如: 存在一个名为
sysMain
的系统, 开发者使用sysMain_xxx
系统名称 进行注册, 在api层进行url的处理, 如,/?sys=xxx
, 将消息发送给sysMain_xxx
系统从而实现本机调试;
rpc
RPC 消息是远程调用消息, 只有一个消费者能接收到消息, 并且有返回值返回给生产者.
/**
* @desc: 连接消息队列.
* - 使用direct模式,只有一个阅者都能接收到消息.
* @param opt: { // 全局唯一配置, 只有第一次调用有效.
url: 'amqp://xxxxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
persistent: false, // 是否持久消息.
registerPublisher: false, // 是否注册发布者.
registerSubscriber: false, // 是否注册订阅者.
errHandleCB: function(e, handleName, recvData:string):data; // 消息处理的错误处理函数. 返回对象将反馈给发送方.
beforeHandleCB: function(requestData:any):any; // 消息处理前的回调. 返回null则正常处理, 否则将返回数据返回给rpc.
beforeRequestCB: function(requestData:any):boolean; // 消息发送前的统一处理, 返回false则不进行发送.
beforeResponseCB: function(requestData:any, responseData:any); // 消息返回前的统一处理.
beforeReturnCB: function(recvData:any); // 通过request接口发送消息后, 消息已经通过网络接收到, 方法返回前的处理.
devSingle: false, // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
}
* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
* 如果不指定, 则不接收消息.
* @return: throw in err.
*/
function init(opt:object, ...recvFromSys:string[])
/**
* @desc: 注册消息处理器. 如果在处理消息的过程中发生了异常, 则会调用errHandleCB.
* @param handleName: 处理器名称.
* @param handle: async function(jsonData):data; 返回data给sender.
* @return:
*/
function bind(handleName:string, handle:func)
/**
* @desc: 发送消息.
* @param receiver:
{
handler: '', // 消息处理器.
recvSys: '', // 接收的系统.
}
* @param data: (json). 需要发送的数据.
* @return: Promise.
- resolve(msg)
- catch('timeout')
*/
async function request(receiver, data)
subscribe
sbscribe 消息是订阅消息, 所有订阅者都能接收到消息, 无返回值返回给生产者.
/**
* @desc: 连接消息队列.
* - 使用subscribe模式,所有的订阅者都能接收到消息.
* @param opt: { // 全局唯一配置, 只有第一次调用有效.
url: 'amqp://xxxxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
persistent: false, // 是否持久消息.
registerPublisher: false, // 是否注册发布者.
registerSubscriber: false, // 是否注册订阅者.
errHandleCB: function(e, handleName, recvData:string):void; // 消息处理的错误处理函数. 返回true则会从队列中移除消息.
beforeHandleCB: function(requestData:any); // 消息处理前的回调.
beforeRequestCB: function(requestData:any):boolean; // 消息发送前的统一处理, 返回false则不进行发送.
devSingle: false, // 在单包模式下开发, 此模式下不使用真实的消息队列, 而在接口一致的情况下使用本地缓存进行开发.
}
* @param recvFromSys: 接收哪些系统的消息,使用系统的名称 sys.main, ...
* 如果不指定, 则不接收消息.
* @return: Promise
*/
function init(opt:object, ...recvFromSys:string[])
/**
* @desc: 注册消息处理器.
* @param handleName: 处理器名称.
* @param handle: function(jsonData)
* @return:
*/
function bind(handleName:string, handle:func)
/**
* @desc: 发布消息.
* @param receiver:
{
handler: '', // 消息处理器.
recvSys: '', // 接收的系统.
}
* @param data: (json). 需要发送的数据.
* @return: boolean.
*/
async function publish(receiver, data)
devSingle example
单包开发模式下, 可以将多个系统统一个入口启动 (单实例), 方便调试.
rpc client.
File: client.js
var mq = require('febs-message');
/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){ // log处理方法
console.log(msg);
});
/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
url: 'amqp://xxxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
rpcTimout: 5000, // rpc等待返回消息的超时.
persistent: false, // 是否持久消息.
registerPublisher: true,
registerSubscriber: false,
devSingle: ['sysname'],
},
null // 不接收消息.
).then(()=>{
console.log("OK");
// 发送消息.
mq.rpc.request({handler:"handle1", recvSys:'sysname'}, {message:'hello', data:1})
.then(ret=>{
console.log('return message: ');
console.log(ret);
});
});
rpc server.
File: server.js
var mq = require('febs-message');
/**
* @desc: 消息处理方法
*/
async function handle1(msg) {
console.log('handle1:');
console.log(msg);
return {err:msg.data}; // return to sender.
}
async function handle2(msg) {
console.log('handle2:');
console.log(msg);
return {err:msg.data};
}
/**
* @desc: 初始化消息队列.
*/
mq.init(function(msg, filename, line){
console.error(msg);
});
/**
* @desc: 初始化rpc通信.
*/
mq.rpc.init({
url: 'amqp://xxxx',
heartbeat: 10, // in seconds.
reconnect: 10000, // 连接失败后多长时间重连.
rpcTimout: 5000, // rpc等待返回消息的超时.
persistent: false, // 是否持久消息.
registerSubscriber: true,
devSingle: ['sysname'],
}, 'sysname' // 接收发送给live系统的消息.
)
.then(()=>{
console.log('ok');
// 绑定消息处理方法.
mq.rpc.bind('handle1', handle1);
mq.rpc.bind('handle2', handle2);
require('./client');
});
仅启动 server.js
进程.