0.9.0 • Published 5 months ago

nstarter-rabbitmq v0.9.0

Weekly downloads
-
License
MIT
Repository
github
Last release
5 months ago

RabbitMQ 队列

使用说明

AMQP 链接

import AmqpConnector from 'nstarter-rabbitmq';
const amqp = AmqpConnector({
    user: 'user',
    password: 'password',
    brokers: [
        {
            host: '127.0.0.1',
            port: 5672
        }
    ],
    heartbeatIntervalInSeconds: 60,
    reconnectTimeInSeconds: 1
});

队列启动

import { IQueueConfig, queueFactory, IRabbitMqMessage } from 'nstarter-rabbitmq';

export interface IDemoMessage extends IRabbitMqMessage {
    value: string;
}

// 队列配置
const queueConfig: IQueueConfig = {
    name: 'demo:normal',
    prefetch: 2,
    maxLength: 10000,
};

export const demo_queue = queueFactory<IDemoMessage>(amqp, queueConfig);

生产者,向队列发消息

import { IProduceOptions, queueProducerFactory } from 'nstarter-rabbitmq';
import { demo_queue, IDemoMessage } from './queue';

/**
 * 增量同步延迟队列 生产者
 */
const produceOption: Partial<IProduceOptions> = {};

export const producer = queueProducerFactory<IDemoMessage>(demo_queue, produceOption);

// 启动生产者
producer.setup().then();
// 发送消息
producer
    .publish({ value: 'demo:normal' }, { mandatory: true, deliveryMode: true, persistent: true })
    .then(_.noop)
    .catch((err: Error) => console.log(err));

消费者,向队列订阅消息

import { AckPolicy, queueConsumerFactory, RetryMethod, IConsumerConfig, startQueueConsumers } from 'nstarter-rabbitmq';
import { queue, IDemoMessage } from'./queue';

const consumerConfig: IConsumerConfig<IDemoMessage> = {
    retryMethod: RetryMethod.republish,
    ackPolicy: AckPolicy.after,
    consumeTimeout: 10000, // 10s
    run(message): Promise<void> {
        const demoMessage: IDemoMessage = message.content;
        console.log(demoMessage);            
    }
};

export const consumer = queueConsumerFactory<IDemoMessage>(queue, consumerConfig);

// 注册队列消费者
consumer.register();
// 启动队列消费者
startQueueConsumers().then();

RabbitMqQueue

参数名类型参数说明
amqpAmqpConnectManagerRabbitMQ 链接管理
queueIQueueConfig队列配置

RabbitMqQueue#waitForSetup(): Promise

等待链接初始化完成。

RabbitMqQueue#close(): Promise

关闭链接。

RabbitMqQueue#subscribe(messageHandler: IMessageHandler, options: Consume): Promise

Push 模式,客户端订阅队列消息,消息由服务端“推送”给客户端。

参数名类型参数说明
messageHandlerIMessageHandler<T>消息处理逻辑
optionsobject参数配置
options.exclusiveboolean是否启用匿名队列订阅,服务端分配一个匿名队列,断开链接后自动删除

RabbitMqQueue#publish(content: IQueuePayload, options: Publish): Promise

Confirm 模式,将消息内容发送到 RabbitMQ 中的 Exchange,确保消息准确被添加到队列,且持久化保存后返回。消息分发规则由 routingKeyexchange 规则确定。

参数名类型参数说明
contentany消息内容
optionsIProducerConfig<T>消息参数, 参考 RabbitMqProducer 说明

RabbitMqQueue#ack(message: IQueueMessage, allUpTo?: boolean): Promise

确认消息消费,RabbitMQ 会将对应的消息删除。allUpTotrue,会将该消息之前的所有消息均 ack 掉。

RabbitMqQueue#nack(message: IQueueMessage, allUpTo?: boolean, requeue?: boolean): Promise

RabbitMQ 会“拿回”该消息的。requeuetrue 会重新将该消息放回队列,否则丢弃该消息。

RabbitMqProducer

参数名类型参数说明
queueRabbitMqQueue<T>队列对象
optionsIProducerConfig<T>消息参数
options.headersIProduceHeaders消息生产者 headers
options.priorityPriority消息优先级,高优先级先分发消费
options.pushRetryTimesnumber消息发送时,本地重试次数

RabbitMqProducer#setup(): Promise

队列生产者启动方法。

RabbitMqProducer#publish(content: IQueuePayload, options: Publish): Promise

此方法带本地重试机制。参数内容同 RabbitMqQueue#publish(content, options)

RabbitMqConsumer

参数名类型参数说明
queueRabbitMqQueue<T>队列对象
optionsIConsumerConfig<T>消费者参数
options.retryTimesnumber重试次数
options.retryDelayDelayLevel重试延时等级
options.retryMethodRetryMethod重试策略,RetryMethod.retry 本地重试,RetryMethod.republish 重新发布到队列
options.timeoutnumber消息消费超时时间,从消息生产开始算,republish 会刷新时间
options.run()(message: IQueueMessage<T>): Promise<void>消息消费逻辑

RabbitMqConsumer#start(): Promise

启动消费者, 执行任务订阅。

RabbitMqConsumer#stop(): Promise

停止消费者执行。

0.9.0

5 months ago

0.8.0

6 months ago

0.7.1

9 months ago

0.6.5

1 year ago

0.6.3

1 year ago

0.6.1

2 years ago

0.6.0

2 years ago

0.5.0

2 years ago

0.3.0

3 years ago

0.4.1

3 years ago

0.4.0

3 years ago

0.4.2

3 years ago

0.2.8

3 years ago

0.2.7

4 years ago

0.2.6

5 years ago

0.2.5

5 years ago

0.2.4

5 years ago

0.2.3-beta

5 years ago

0.2.2-beta

5 years ago

0.2.1-beta

5 years ago

0.2.0-beta

5 years ago

0.1.6-beta

5 years ago

0.1.5-beta

5 years ago

0.1.4-beta

5 years ago

0.1.3-beta

5 years ago

0.1.2-beta

5 years ago

0.1.1-beta

5 years ago

0.1.0-beta

5 years ago