0.1.0 • Published 1 year ago

@hongfangze/mq v0.1.0

Weekly downloads
-
License
MIT
Repository
-
Last release
1 year ago

@hongfangze/mq 消息队列

介绍

消息队列的使用,目前支持:RabbitMQ,后期增加其他MQ的支持。

开始使用:

npm install @hongfangze/mq

import { AMQP } from '@hongfangze/mq';
import { RabbitMQOptions } from '@hongfangze/mq/OptionsDefine';

RabbirMQ API

/**
     * 发布一个数据到指定的队列
     * @param {string} queueName 队列名称
     * @param {string} msg 需要添加到队列的数据
     * @return {*}  {Promise<void>}
     * @memberof IRabbitMQ
     */
    publish(queueName: string, msg: string): Promise<void>;

    /**
     * 订阅一个指定的队列
     * @param {Function} handle 一个async函数,参数为当前的消息
     * @param {string} queueName 队列名称
     * @return {*}  {Promise<string>} 一个GUID标识符,用于取消订阅
     * @memberof IRabbitMQ
     */
    subscription(handle: Function, queueName: string): Promise<string>;

    /**
     * 取消订阅一个指定的队列
     * @param {string} uuid 一个GUID标识符
     * @return {*}  {Promise<void>}
     * @memberof IRabbitMQ
     */
    unsubscription(uuid: string): Promise<void>;

    /**
     * 发送通知数据(一般用于基础数据维护项目的基础数据变化后,通知需要接收变化的项目)
     * @param {string} msg 通知的消息
     * @param {string} exchangeName 交换机名称
     * @param {string} routingKey 路由键,#代表任意
     * @return {*}  {Promise<{ result: boolean, msg?: string }>} result:成功或失败,msg:失败原因
     * @memberof IRabbitMQ
     */
    sendNotify(exchangeName: string, routingKey: string, msg: string): Promise<{ result: boolean, msg?: string }>;

    /**
     * 接收通知数据
     * @param {Function} handle 一个async函数,参数为当前的消息
     * @param {string} exchangeName 交换机名称
     * @param {string} routingKey 路由键,#代表任意
     * @param {string} queueName 队列名称,可以不传使用非持久队列
     * @return {*}  {Promise<string>} 一个GUID标识符,用于停止接收
     * @memberof IRabbitMQ
     */
    receiveNotify(handle: Function, exchangeName: string, routingKey: string, queueName?: string): Promise<string>;

    /**
     * 停止接收通知数据
     * @param {*} uuid 一个GUID标识符
     * @return {*}  {Promise<void>}
     * @memberof IRabbitMQ
     */
    unreceiveNotify(uuid: string): Promise<void>;

RabbitMQ 发布/订阅Demo

const rabbitmqConf: RabbitMQOptions = {
    hostname: '10.2.103.36',
    username: "guest",
    password: 'guest',
};

const pub = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    for (let i = 0; i < 10; i++) {
        console.log(`pub:${i}`);
        await rabbitmq.publish("q", i.toString());
        await sleep(100);
    }
}

const sub1 = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    let i = 1;
    const sub = await rabbitmq.subscription((async (msg) => {
        console.log(`sub1收到第${i}次消息:`, msg);
        await sleep(500);
        i++;
    }), "q");
    // setTimeout(async () => {
    //     await rabbitmq.unsubscription(sub);
    // }, 1000);
}

const sub2 = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    let i = 1;
    const sub = await rabbitmq.subscription((async (msg) => {
        console.log(`sub2收到第${i}次消息:`, msg);
        await sleep(1000);
        i++;
    }), "q");
    // setTimeout(async () => {
    //     await rabbitmq.unsubscription(sub);
    // }, 1000);
}

export default async () => {
    sub1();
    sub2();
    pub();
}

RabbitMQ 通知Demo

 const rabbitmqConf: RabbitMQOptions = {
    hostname: '10.2.103.36',
    username: "guest",
    password: 'guest',
};

// 模拟基础数据的生产者,当基础数据发生改变时,进行广播
// 且每个客户端的处理时长不一致,防止并发处理而采取的单个订阅模式
// 假设base为基础数据生产者
// product1需要监听user的变化
// product2需要监听role的变化
// product3需要监听user和role的变化

const company = "hongfangze";
const modules = ["user", "role"];
const oper = ["insert", "update", "delete"];

const sendNotify = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    for (let i = 0; i < 10; i++) {
        const _module = modules[getRandomNum(0, modules.length - 1)];
        const data = {
            id: i + 1,
            text: `${_module}-${getRandomStr()}`,
            oper: oper[getRandomNum(0, oper.length - 1)]
        };
        const routingKey = `${company}.${_module}`;
        console.log(`broadcast:${routingKey}:`, data);
        await rabbitmq.sendNotify(company, "userProduct", _module, JSON.stringify(data));
        await sleep(100);
    }
}

const product1 = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    const notify = await rabbitmq.receiveNotify((async (msg) => {
        console.log(`product1收到消息:`, msg);
        await sleep(500);
    }), "product1", company, "userProduct", "user");
    // setTimeout(async () => {
    //     await rabbitmq.unreceiveNotify(notify);
    // }, 1000);
}

const product2 = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    const notify = await rabbitmq.receiveNotify((async (msg) => {
        console.log(`product2收到消息:`, msg);
        await sleep(1000);
    }), "product2", company, "userProduct", "user");
    // setTimeout(async () => {
    //     await rabbitmq.unreceiveNotify(notify);
    // }, 1000);
}

const product3 = async () => {
    const rabbitmq = new AMQP(rabbitmqConf).RabbitMQ;
    const notify = await rabbitmq.receiveNotify((async (msg) => {
        console.log(`product3收到消息:`, msg);
        await sleep(1500);
    }), "product3", company, "userProduct", null);
    // setTimeout(async () => {
    //     await rabbitmq.unreceiveNotify(notify);
    // }, 1000);
}

export default async () => {
    await product1();
    await product2();
    await product3();

    sendNotify();
}

版本迭代记录

2024-05-30 v0.0.3

  • 基础的发布订阅逻辑实现。
  • 消息通知逻辑实现。

2024-07-10 v0.1.0

  • sendNotify、receiveNotify这2个函数重构。
0.1.0

1 year ago

0.0.3

1 year ago

0.0.2

1 year ago

0.0.1

1 year ago