5.0.0 • Published 4 months ago

@goparrot/pubsub-event-bus v5.0.0

License
MIT

PubSub Event Bus

PubSub Event Bus is built on top of the NestJS CQRS module.

It lets you use the NestJS CQRS module across a microservice architecture, with RabbitMQ as the message broker.

Installation

First install the required package:

npm install --save @goparrot/pubsub-event-bus

It is highly recommended to install the peerDependencies yourself.

Import module

Import the module and configure it by providing one or more connection strings.

import { CqrsModule } from "@goparrot/pubsub-event-bus";
import { Module } from "@nestjs/common";

export const connections: string[] = ["amqp://username:pass@example.com/virtualhost"];

@Module({
    imports: [CqrsModule.forRoot({ connections })],
})
export class AppModule {}

Full list of the PubSub CQRS Module options:

| Option | Description |
| --- | --- |
| connections | Array of connection strings |
| config | AMQP connection options |
| isGlobal | Whether the module should be registered as global |
| logger | Logger service to be used |
| connectionName | Name of the connection to be displayed in the server logs and management UI. The final name will have a suffix :producer or :consumer depending on the connection purpose |
| retryOptions | Global options for the retry mechanism. Read more in the Retry Mechanism section |

Note: The CqrsModule class should be imported from @goparrot/pubsub-event-bus library.

Usage

Create event

An event is a simple class with a message payload.

import { IEvent } from "@nestjs/cqrs";

export class StoreCreated implements IEvent {
    constructor(private readonly storeId: string) {
    }
}

This is a fully compatible event class that can be used with NestJS EventBus.

To make it PubSub-ready, it should extend the AbstractPubsubEvent class and be decorated with PubsubEvent (both imported from @goparrot/pubsub-event-bus).

import { AbstractPubsubEvent, PubsubEvent } from "@goparrot/pubsub-event-bus";

export interface IStoreCreatedPayload {
    storeId: string;
}

@PubsubEvent({ exchange: "store" })
export class StoreCreated extends AbstractPubsubEvent<IStoreCreatedPayload> {}

Publish event

To emit the event, inject EventBus (imported from @goparrot/pubsub-event-bus) into the service.

import { EventBus } from "@goparrot/pubsub-event-bus";
import { Injectable } from "@nestjs/common";

@Injectable()
class SomeService {
    constructor(private readonly eventBus: EventBus) {
    }

    async doCoolStuff() {
        // create item; storeId below is a placeholder for the created item's id
        const storeId = "storeId";

        await this.eventBus.publish(new StoreCreated({ storeId }));

        // return item
    }
}

Consuming events

Create event handler

Create a simple class which extends AbstractPubsubHandler and is decorated with PubsubEventHandler (both imported from @goparrot/pubsub-event-bus).

import { AbstractPubsubHandler, PubsubEventHandler } from "@goparrot/pubsub-event-bus";

@PubsubEventHandler(StoreCreated)
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
    handle(event: StoreCreated) {
        console.log(`[${this.constructor.name}] ->`, event.payload);
    }
}

Note: unlike regular CQRS event handlers, a PubSub event handler uses its own decorator, @PubsubEventHandler(StoreCreated).

The @PubsubEventHandler decorator accepts a list of events to listen for, for example:

@PubsubEventHandler(StoreCreated, UserCreated)

Implement the required method:

handle - the entry point that receives each incoming event

Register event handler

Register the event handler as a provider:

@Module({
    providers: [StoreCreatedHandler],
})
export class AppModule {}

Once registered, the event handler starts listening for incoming events.

Configuration

Event Configuration

To emit an event with extra headers or other publish options, call the withOptions({}) method and provide the required configuration:

await this.eventBus.publish(
    new StoreCreated({ storeId: "storeId" }).withOptions({
        persistent: false,
        priority: 100,
        headers: ["..."],
    }),
);

Handler Configuration

The PubsubEventHandler decorator accepts handler options as the last argument. Available options:

| Option | Description |
| --- | --- |
| autoAck | Event acknowledge mode. Default ALWAYS_ACK. Read more in the Acknowledge Mode section |
| queue | Custom queue name |
| bindingQueueOptions | Queue binding options from the amqplib |
| retryOptions | Handler-specific retry options. Read more in the Retry Mechanism section |

Acknowledge Mode

By default, the library creates queues without automatic delivery acknowledgement, so messages must be acknowledged by the client. The library provides several acknowledge modes:

ALWAYS_ACK (default)

Automatic positive acknowledgement on both success and failure.

ACK_AND_NACK

Automatic positive acknowledgement on success and automatic negative acknowledgement on error.

NEVER

Acknowledgement must be performed manually. A message can be positively or negatively acknowledged using the AbstractPubsubHandler.ack and AbstractPubsubHandler.nack methods respectively.

AUTO_RETRY

Automatic positive acknowledgement on success and an automatic retry attempt on error. Read more in the Retry Mechanism section.
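The modes above can be read as a small decision table. The sketch below is a self-contained model of that table, not the library's implementation: the AckMode, Outcome and Action types and the resolveAction function are hypothetical names introduced only for illustration; only the mode names come from this section.

```typescript
// Hypothetical model of the acknowledge modes described above.
type AckMode = "ALWAYS_ACK" | "ACK_AND_NACK" | "NEVER" | "AUTO_RETRY";
type Outcome = "success" | "error";
type Action = "ack" | "nack" | "retry" | "manual";

// Maps a mode and a handler outcome to what happens to the message.
function resolveAction(mode: AckMode, outcome: Outcome): Action {
    switch (mode) {
        case "ALWAYS_ACK":
            // Acknowledged regardless of the outcome.
            return "ack";
        case "ACK_AND_NACK":
            return outcome === "success" ? "ack" : "nack";
        case "AUTO_RETRY":
            return outcome === "success" ? "ack" : "retry";
        case "NEVER":
            // The handler itself must acknowledge the message.
            return "manual";
    }
}
```

For example, resolveAction("ALWAYS_ACK", "error") yields "ack", while resolveAction("AUTO_RETRY", "error") yields "retry".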

Retry Mechanism

PubSub Event Bus supports automatic event processing retries with static or dynamic backoff. Enable it by setting the acknowledge mode to AUTO_RETRY. On any unhandled error, the library publishes the event to the delayed exchange, which returns it to the queue after a delay.

The retry mechanism can be configured at both the module and handler levels. Handler-specific options are merged with the module ones.
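As a rough illustration of that merge, the sketch below assumes a shallow merge in which handler values take precedence over module values. The RetryOptions interface and mergeRetryOptions function are hypothetical stand-ins written for this example, and the exact merge behaviour of the library is an assumption.

```typescript
// Hypothetical shape; field names follow the retry options documented in this section.
interface RetryOptions {
    maxRetryAttempts?: number;
    delay?: number | ((retryCount: number) => number);
    strategy?: "DEAD_LETTER_TTL" | "DELAYED_MESSAGE_EXCHANGE";
}

// Assumption: a shallow merge where handler-level values win over module-level ones.
function mergeRetryOptions(moduleOptions: RetryOptions, handlerOptions: RetryOptions): RetryOptions {
    return { ...moduleOptions, ...handlerOptions };
}

const moduleOptions: RetryOptions = {
    maxRetryAttempts: 5,
    delay: 1000,
    strategy: "DELAYED_MESSAGE_EXCHANGE",
};

// The handler overrides only the number of attempts.
const handlerOptions: RetryOptions = { maxRetryAttempts: 10 };

const effective = mergeRetryOptions(moduleOptions, handlerOptions);
```

Under this assumption, effective keeps the module's delay and strategy and takes maxRetryAttempts: 10 from the handler.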

Available options:

| Option | Description | Default value |
| --- | --- | --- |
| maxRetryAttempts | Maximum number of retry attempts | 3 |
| delay | Delay between retry attempts in milliseconds. Can be a fixed positive number or a function that receives the current retry attempt count and returns the delay | Math.floor(1000 * Math.exp(retryCount - 1)) |
| strategy | Retry strategy to be used. Read more in the Retry Strategies section | DEAD_LETTER_TTL |

When the number of retry attempts is exceeded, the handler method onRetryAttemptsExceeded is called with the event and the last error as arguments. The message is then discarded.
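To get a feel for the default backoff, the snippet below evaluates the default delay formula quoted in the options above for the default three attempts. It assumes that retryCount starts at 1 for the first retry; the source does not state the starting value, so treat that as an assumption.

```typescript
// Default delay formula as documented in the retry options.
const defaultDelay = (retryCount: number): number =>
    Math.floor(1000 * Math.exp(retryCount - 1));

// Assumption: the first retry is counted as retryCount = 1.
const delays: number[] = [1, 2, 3].map(defaultDelay);
// delays is [1000, 2718, 7389] (milliseconds)
```

With these assumptions, the waits grow roughly by a factor of e: about 1 s, 2.7 s and 7.4 s.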

Example:

// app.module.ts

import { CqrsModule, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";
import { Module } from "@nestjs/common";

export const connections: string[] = ["amqp://username:pass@example.com/virtualhost"];

@Module({
    imports: [
        CqrsModule.forRoot({
            connections,
            retryOptions: {
                maxRetryAttempts: 5,
                delay: (retryCount: number) => retryCount * 1000,
                strategy: RetryStrategyEnum.DELAYED_MESSAGE_EXCHANGE,
            },
        }),
    ],
})
export class AppModule {}

// store-created.handler.ts

import { AbstractPubsubHandler, AutoAckEnum, PubsubEventHandler, RetryStrategyEnum } from "@goparrot/pubsub-event-bus";

@PubsubEventHandler(StoreCreated, {
    autoAck: AutoAckEnum.AUTO_RETRY,
    retryOptions: {
        maxRetryAttempts: 10,
        delay: (retryCount: number) => retryCount ** 2 * 1000,
        strategy: RetryStrategyEnum.DEAD_LETTER_TTL,
    },
})
export class StoreCreatedHandler extends AbstractPubsubHandler<StoreCreated> {
    async handle(event: StoreCreated) {
        // process the event
    }

    async onRetryAttemptsExceeded(event: StoreCreated, error: Error) {
        // log the event processing failure
    }
}

Retry Strategies

This library provides two strategies for implementing the retry mechanism. They differ mainly in their requirements and performance.

Dead Letter Message and Per-Message TTL Strategy

This strategy has no additional requirements and is therefore the default one.

The library creates several RabbitMQ components:

  • Waiting queues, one for each waiting time
  • Exchange to route messages to the corresponding waiting queue
  • Exchange to route messages back to source queue

Example:

Suppose there are two PubSub handlers:

  • The first has a static delay of 1000 ms and a maximum of 5 retry attempts. It needs only one queue, with a waiting time of 1000 ms.
  • The second has the delay function 1000*2^x ms and a maximum of 3 retry attempts. It needs three queues, with waiting times of 1000, 2000 and 4000 ms.

The library will therefore create 3 queues, with waiting times of 1000, 2000 and 4000 ms. The queue with a waiting time of 1000 ms is shared by both handlers.
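The arithmetic of this example can be reproduced with a short sketch that collects the distinct waiting times across handlers. It is only a model of the reasoning, not the library's code: HandlerRetryConfig and requiredWaitingTimes are hypothetical names, and the attempt index x is assumed to start at 0 so that the results match the 1000, 2000 and 4000 ms quoted above.

```typescript
type Delay = number | ((attempt: number) => number);

// Hypothetical per-handler retry settings used only in this sketch.
interface HandlerRetryConfig {
    delay: Delay;
    maxRetryAttempts: number;
}

// Collects the distinct waiting times needed by all handlers, in ascending order.
function requiredWaitingTimes(handlers: HandlerRetryConfig[]): number[] {
    const times = new Set<number>();
    for (const { delay, maxRetryAttempts } of handlers) {
        // Assumption: attempts are indexed from 0.
        for (let attempt = 0; attempt < maxRetryAttempts; attempt++) {
            times.add(typeof delay === "function" ? delay(attempt) : delay);
        }
    }
    return Array.from(times).sort((a, b) => a - b);
}

const waitingTimes = requiredWaitingTimes([
    { delay: 1000, maxRetryAttempts: 5 },
    { delay: (x: number) => 1000 * 2 ** x, maxRetryAttempts: 3 },
]);
// waitingTimes is [1000, 2000, 4000]
```

The static handler contributes a single value however many attempts it has, which is why the 1000 ms queue is shared.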

Delayed Message Exchange Strategy

This strategy requires the RabbitMQ Delayed Message Plugin to be installed and enabled on the RabbitMQ server.

The library creates a delayed message exchange to route messages back to the source queue with a set delay.
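For orientation, the plugin works with an exchange of type x-delayed-message (with an x-delayed-type argument naming the underlying routing behaviour) and an x-delay header, in milliseconds, on each published message. The sketch below only builds those settings as plain objects; the function names, the "direct" routing type and the durable flag are assumptions for illustration, and how the library declares its own exchange is not documented here.

```typescript
// Settings for declaring an exchange handled by the delayed message plugin.
// The "direct" routing type and durable: true are illustrative assumptions.
function delayedExchangeDeclaration(name: string) {
    return {
        name,
        type: "x-delayed-message",
        options: { durable: true, arguments: { "x-delayed-type": "direct" } },
    };
}

// Publish options carrying the per-message delay in milliseconds.
function delayedPublishOptions(delayMs: number) {
    return { headers: { "x-delay": delayMs } };
}

// Hypothetical exchange name, used only in this example.
const declaration = delayedExchangeDeclaration("retry.delayed");
const publishOptions = delayedPublishOptions(2000);
```

With amqplib, objects of this shape would typically be passed to channel.assertExchange(name, type, options) and as the options argument of channel.publish.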

Known Issues

Several handlers listening to the same event

Problem: When several handlers listen to the same event, each handler receives the same event several times (as many times as there are listeners).

Workaround: You can create one "proxy" pub-sub event listener that will listen to the required event. Then there are two options available:

  • It will publish a local event with the same content. Then you can create as many event listeners for this local event as you need. The main disadvantage of this approach is that the pub-sub event is acknowledged in this "proxy" event listener. Therefore, if something goes wrong in the actual event listeners, the library won't handle the error.
  • It will execute all the required commands. The main disadvantage of this approach is that the pub-sub event acknowledgement is shared by all commands. Therefore, you should handle double event processing in each command handler.
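The first option can be pictured with a small in-memory model: a single proxy receives the broker event once and re-publishes it locally to any number of listeners. Everything in the sketch is hypothetical (LocalBus, proxyHandle, the listener labels); in a real application the proxy would be a PubSub event handler and the local bus would be the regular NestJS event bus.

```typescript
// Hypothetical in-process bus standing in for the local (non-PubSub) event bus.
type Listener<T> = (event: T) => void;

class LocalBus<T> {
    private readonly listeners: Listener<T>[] = [];

    subscribe(listener: Listener<T>): void {
        this.listeners.push(listener);
    }

    publish(event: T): void {
        for (const listener of this.listeners) {
            listener(event);
        }
    }
}

interface StoreCreatedPayload {
    storeId: string;
}

const localBus = new LocalBus<StoreCreatedPayload>();
const handled: string[] = [];

// Any number of local listeners can react to the re-published event.
localBus.subscribe((event) => { handled.push(`audit:${event.storeId}`); });
localBus.subscribe((event) => { handled.push(`email:${event.storeId}`); });

// The single "proxy": the only consumer of the broker event.
function proxyHandle(payload: StoreCreatedPayload): void {
    localBus.publish(payload);
}

proxyHandle({ storeId: "store-1" });
// handled is ["audit:store-1", "email:store-1"]
```

Because only the proxy consumes from the broker, each local listener runs once per event; the trade-off described above still applies, since an error in a local listener happens after the proxy has taken responsibility for the message.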

Enjoy!
