1.1.3 • Published 8 years ago
zmmq v1.1.3
ZMMQ
使用mongoose实现的消息队列。
特性
- 优先级
- 延时队列
- 父子队列
安装
npm install zmmq介绍
zmmq使用mongodb进行消息存储,各客户端使用轮询的方式进行队列中消息的读取。
node环境
需要开启async/await。
开始使用
var zmmq=require('zmmq');
var queue=new zmmq(url, name,options);url: mongodb的连接字符串,示例值:mongodb://localhost/mq。name: 队列名称,示例值:test。options.done_mode:消息完成的处理方式,可选值:DEL/SUCCESS,删除消息和标识为已完成,默认值为:SUCCESS。
发布一个消息
var msg_id=await queue.push(msg,options,queue)push方法返回一个消息id
参数描述
msg:发布到队列中的消息,支持javascript对象,消息会转换为JSON字符串存储到mongodb。options:发布选项,详细描述见后表。默认值:null。queue:发布到的队列,默认发布到当前队列。
返回值
发布成功返回true,发布失败抛出错误。
读取一个消息
var msg=await queue.pull(ms);参数描述
ms:当指定该值后,若没有读取到消息,将等待ms毫秒后重新尝试读取一个消息,直到读取到一个消息。若不指定该值,没有读取到消息将返回undefined
返回值
若有返回值,将返回一个消息对象。
{
id: ObjectId,
body: Object,
done: function,
groupid:String,
level:Number,
tag:String
}id:消息idbody:消息内容done:消息确认函数groupid:消息分组,可能为空。level:消息优先级。tag:消息标签。
确认一个消息
确认一个消息有两种方式:
- 通过消息的
done方法确认。
await msg.done();- 通过zmmq实例的
done方法
await queue.done(id);id:需要确认的消息id
事务
当有一批消息需要发布时,可以使用事务保证所有消息都发布成功或失败。
创建一个事务
var trans=await queue.Begin();发布消息
var msg_id1=await trans.push(msg,options,queue);
var msg_id2=await trans.push(msg,options,queue);
//...
var msg_idn=await trans.push(msg,options,queue);
await trans.Commit();参数
与zmmq实例参数一致。
确认消息
当需要发布消息与确认消息同时成功时,使用事务进行管理。
await trans.push(msg,options,queue);
await trans.done(id, queue)
await trans.Commit();参数
id:确认消费的消息id。queue:消息所在队列,默认取zmmq实例所在队列。
回滚
await trans.Rollback();setTag
用于设置当前zmmq实例的tag属性,设置了tag属性的实例在pull时,优先处理tag相同的消息。
queue.setTag("xyz");消息和事务超时
消息处理超时
当消息处理超时时,需要重新设置消息状态,便于下一次处理。
await zmmq.Restore(options)参数
- {string} options.connstring - 连接字符串
- {Number} options.timeout - 消息处理超时时间,单位分钟,默认10分钟
- {string[]} options.queues - 需要处理超时的队列,必填
- {Number} options.interval - 每轮的间隔时间,单位毫秒,默认60000
事务超时
以超时的方式监控异常的事务,并调对数据进行恢复。
var trans=require('zmmq/lib/transaction');
await trans.Restore(options);参数
- {string} options.connstring - 连接字符串
- {Number} options.timeout - 事务超时时间,单位分钟,默认10分钟
- {string} options.trans - 存储事务的集合名称,默认_trans
- {Number} options.interval - 每轮的间隔时间,单位毫秒,默认60000
父子队列
父子队列仅在事务中实现。
什么是父子队列
具有父子关系的队列,只有当子队列中的所有任务都完成后,父队列中相关的消息才会出列被执行。
通常情况下,一个父队列具有多个子队列。
发布消息
由于需要同时发布多个消息到队列,需要使用事务的特性。
在发布是,通过options参数指定消息的父队列关系
//发布消息到父队列
let fid=trans.push(msg,options,"father");
//布消息到子队列,并关联父队列中的消息
await trans.push(msg,{fahter:"father:"+fid},queue);
await trans.push(msg,{fahter:"father:"+fid},queue);
await trans.push(msg,{fahter:"father:"+fid},queue);
await trans.Commit();确认消息
和普通消息确认相同。
options
该参数在发布消息时进行设置,指定消息的属性,可指定的属性如下:
level:消息优先级,值越大优先级越高。groupid:消息分组id,可不指定。start:消息什么时候才允许处理,值为unix时间戳,当当前时间大于该值时,消息才会出队。tag:消息tag,当zmmq实例指定tag属性时,优先处理tag匹配的消息。
以下属性仅对父子队列有效,建议在事务中使用:
father:父队列信息,格式为:{queue}:{id},queue为父队列名称,id为父队列中消息对应的消息id。