0.0.8 • Published 6 years ago

bhb-amqp-connection-manager v0.0.8

Weekly downloads
24
License
-
Repository
-
Last release
6 years ago

bhb-amqp-connection-manager

安装

npm install --save amqplib bhb-amqp-connection-manager@0.0.7
注意:该组件在不断完善中,可能引入比较多不兼容的内容,请锁死版本,按需升级

简介

大头兄弟amqp链接管理 基于amqp-connection-manager 包的功能,做部分业务扩展

需要了解的使用教程 包含

发送信息示例

const rabbitMQ = require('bhb-amqp-connection-manager');

const connection = rabbitMQ.connect(process.env.APP_MQ_URL)
    .on('connect', () => console.log('Connected!'))
    .on('disconnect', params => console.log('Disconnected.', params.err.stack));

const channel = connection.createChannel()
    .addSetup(async function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
        const prefetch = parseInt(process.env.prefetch) || 1;

        await channel.assertQueue('hello', {durable: false});

        await channel.prefetch(prefetch);
    });
(async function() {
    await channel.sendToQueue('hello', {num: 1});
    await channel.publish('', 'hello', {num: 10001});
    await connection.close();
})();

消费消息示例

if (require('cluster').isMaster) {
    return require('../util/master');
}
const bluebird = require('bluebird');
const commonUtil = require('../util/common');

const rabbitMQ = require('bhb-amqp-connection-manager');

const connection = rabbitMQ.connect(process.env.APP_MQ_URL);

const channel = connection.createChannel()
    .addSetup(async function(channel) {
        // `channel` here is a regular amqplib `ConfirmChannel`.
        const prefetch = parseInt(process.env.prefetch) || 5;

        await channel.assertQueue('hello', {durable: false});

        await channel.prefetch(prefetch);

        await channel.consumerQueue('hello', async function({num}) {
            console.log(`start ${num}`);
            await bluebird.delay(2000);
            console.log(`end ${num}`);
        });
    });

commonUtil.bindGraceExit(async function() {
    await channel.cancelConsumers();
    await channel.waitMessageEmpty();
    await connection.close();
});

版本变更

  • v0.0.3 支持node 4.0版本(babel编译)
  • v0.0.4 移除babel编译,只支持node 6>0
  • v0.0.5 修复json解析错误未捕获的bug。默认忽略推入数据json格式有误的数据,并通过日志输出提醒.
  • v0.0.6 修复当连接后未执行完setup的情况下,channel已经失去连接导致忽略处理,并返回null的bug。改为直接抛出一个错误,中断后续没必要的执行。 引入一个不兼容的默认值处理:当queue声明为durable的时候,如果没设置x-queue-mode的情况下,将默认设置x-queue-mode=lazy, 如果需要自定义,需要明确声明该配置,否则assertQueue时导致配置不一致会抛出错误。
  • v0.0.7 添加 consumerQueueUseRetry 便捷方法 相比 consumerQueue增加一个option参数 option.count (可选) 设置重试次数 option.failureQueue (可选) 设置失败后推入的队列,如果没设置,则自动创建一个队列,名字为:failure.${queueName} ,持久缓存 option.delay (可选) 设置重试频率的函数 ,设置该值时 count 不生效。 当count和delay都不设置的时候,使用amqplib-retry 的默认行为
           (attempts) => {
             const delay = Math.pow(2, attempts)
             if (delay > 60 * 60 * 24) {
               // the delay for the message is longer than 24 hours.  Fail the message and never retry again.
               return -1
             }
             return delay * 1000
           }
- v0.0.8
    改写mq默认的失败重试机制

    新增setConfig方法,目前可配置项:

    ```
        {
            DINGDING_HOST:'',//失败钉钉通知
            DEAD_LETTER_TTLs:[4,20,100]//失败重试间隔 单位 秒
        }
    ```