2.9.2 • Published 5 months ago

License
MIT

@nestjstools/messaging-rabbitmq-extension

A NestJS library for managing asynchronous and synchronous messages with support for buses, handlers, channels, and consumers. This library simplifies building scalable and decoupled applications by facilitating robust message handling pipelines while ensuring flexibility and reliability.


Documentation

https://nestjstools.gitbook.io/nestjstools-messaging-docs


Installation

npm install @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension 

or

yarn add @nestjstools/messaging @nestjstools/messaging-rabbitmq-extension

RabbitMQ Integration: Messaging Configuration Example


import {Module} from '@nestjs/common';
import {MessagingModule} from '@nestjstools/messaging';
import {InMemoryChannelConfig, AmqpChannelConfig, ExchangeType} from '@nestjstools/messaging/channels';
import {SendMessageHandler} from './handlers/send-message.handler';
import {MessagingRabbitmqExtensionModule} from '@nestjstools/messaging-rabbitmq-extension';

@Module({
   imports: [
      MessagingRabbitmqExtensionModule,
      MessagingModule.forRoot({
         messageHandlers: [SendMessageHandler],
         buses: [
            {
               name: 'message.bus',
               channels: ['my-channel'],
            },
            {
               name: 'command-bus', // Bus names are arbitrary
               channels: ['amqp-command'], // Must match the name of a channel defined below
            },
            {
               name: 'event-bus',
               channels: ['amqp-event'],
            },
         ],
         channels: [
            new InMemoryChannelConfig({
               name: 'my-channel',
               middlewares: [],
            }),
            new AmqpChannelConfig({
               name: 'amqp-command',
               connectionUri: 'amqp://guest:guest@localhost:5672/',
               exchangeName: 'my_app_command.exchange',
               bindingKeys: ['my_app.command.#'],
               exchangeType: ExchangeType.TOPIC,
               middlewares: [],
               queue: 'my_app.command',
               autoCreate: true, // Create exchange, queue & bind keys
            }),
            new AmqpChannelConfig({
               name: 'amqp-event',
               connectionUri: 'amqp://guest:guest@localhost:5672/',
               exchangeName: 'my_app_event.exchange',
               bindingKeys: ['my_app_event.#'],
               exchangeType: ExchangeType.TOPIC,
               queue: 'my_app.event',
               avoidErrorsForNotExistedHandlers: true, // Suppress errors when no handler exists yet for an event
               autoCreate: true,
            }),
         ],
         debug: true,
      }),
   ],
})
export class AppModule {
}
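
The comment on the command-bus entry notes that every channel name listed under a bus must match a channel defined in the channels array. As a rough, library-independent sketch of that rule, the check below reports bus entries that point at an undefined channel. BusDefinition and findUndefinedChannels are hypothetical names written for illustration; they are not part of @nestjstools/messaging.

```typescript
// Hypothetical shape mirroring the `buses` entries in the configuration above.
interface BusDefinition {
  name: string;
  channels: string[];
}

// Returns "bus -> channel" pairs whose channel name is not defined anywhere.
function findUndefinedChannels(buses: BusDefinition[], definedChannels: string[]): string[] {
  const problems: string[] = [];
  for (const bus of buses) {
    for (const channel of bus.channels) {
      if (definedChannels.indexOf(channel) === -1) {
        problems.push(`${bus.name} -> ${channel}`);
      }
    }
  }
  return problems;
}

const exampleBuses: BusDefinition[] = [
  { name: 'message.bus', channels: ['my-channel'] },
  { name: 'command-bus', channels: ['amqp-command'] },
  { name: 'event-bus', channels: ['amqp-events'] }, // deliberate typo: extra "s"
];
const definedChannelNames = ['my-channel', 'amqp-command', 'amqp-event'];

console.log(findUndefinedChannels(exampleBuses, definedChannelNames));
// [ 'event-bus -> amqp-events' ]
```

Running a check like this in a unit test is one way to catch a renamed channel early; whether the library itself performs a similar validation at startup is not stated here.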

Key Features:

  1. Multiple Message Buses:

    • Configure distinct buses for in-memory, commands, and events:
      • message.bus (in-memory).
      • command-bus (AMQP command processing).
      • event-bus (AMQP event processing).
  2. In-Memory Channel:

    • Simple and lightweight channel suitable for non-persistent messaging or testing purposes.
  3. AMQP Channels:

    • Fully integrated RabbitMQ channel configuration using AmqpChannelConfig.
  4. Channel Details:

    • connectionUri: Specifies the RabbitMQ server connection.
    • exchangeName: The AMQP exchange to publish or consume messages from.
    • bindingKeys: Define message routing patterns using wildcards (e.g., my_app.command.#).
    • exchangeType: Supports RabbitMQ exchange types such as TOPIC.
    • queue: Specify a RabbitMQ queue to consume messages from.
    • autoCreate: Automatically creates the exchange, queue, and bindings if they don’t exist.
  5. Error Handling:

    • Use avoidErrorsForNotExistedHandlers in amqp-event to gracefully handle missing handlers for event messages.
  6. Debug Mode:

    • Enable debug: true to assist in monitoring and troubleshooting messages.

This configuration provides a solid foundation for integrating RabbitMQ as part of your messaging system. It facilitates the decoupling of commands, events, and in-memory operations, ensuring reliable and scalable communication across distributed systems.


Mapping Messages in RabbitMQ Channel

Topic Exchange

With a topic exchange, choose routing keys that fall under the queue's binding key pattern. For example, if you bind a queue with the key my_app.command.#, messages with routing keys like my_app.command.domain.action are automatically routed to that queue. Any message whose routing key starts with my_app.command is therefore delivered to it.
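
To make the wildcard rule concrete, here is a minimal sketch of AMQP topic matching, where * stands for exactly one dot-separated word and # stands for zero or more words. topicMatches is a hypothetical helper written for illustration only; in practice RabbitMQ performs this matching on the broker.

```typescript
// Returns true when an AMQP topic binding key matches a routing key.
// '*' matches exactly one dot-separated word; '#' matches zero or more words.
function topicMatches(bindingKey: string, routingKey: string): boolean {
  const pattern = bindingKey.split('.');
  const words = routingKey.split('.');

  const match = (p: number, w: number): boolean => {
    // Pattern exhausted: succeed only if every word was consumed too.
    if (p === pattern.length) return w === words.length;
    if (pattern[p] === '#') {
      // '#' may consume zero words (move past it) or one more word (stay on it).
      return match(p + 1, w) || (w < words.length && match(p, w + 1));
    }
    if (w === words.length) return false;
    return (pattern[p] === '*' || pattern[p] === words[w]) && match(p + 1, w + 1);
  };

  return match(0, 0);
}

console.log(topicMatches('my_app.command.#', 'my_app.command.domain.action')); // true
console.log(topicMatches('my_app.command.#', 'my_app.command'));               // true
console.log(topicMatches('my_app.command.#', 'my_app.event.created'));         // false
```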

Direct Exchange

Ensure your queue has defined binding keys, as messages will be routed to queues based on these keys. If no binding keys are defined, the routing key in RabbitMQ will default to the routing key specified in the handler.

Fanout Exchange

The Fanout Exchange broadcasts messages to all bound queues, ignoring the routing key. This type of exchange is useful for scenarios where you need to distribute the same message to multiple consumers.
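
The difference between direct and fanout routing can be sketched with a small in-memory model. This is an illustration under simplifying assumptions, not how the extension or RabbitMQ is implemented; ExchangeKind, Binding, and targetQueues are hypothetical names.

```typescript
type ExchangeKind = 'direct' | 'fanout';

// One queue-to-exchange binding, as created when a queue is bound with a key.
interface Binding {
  queue: string;
  bindingKey: string;
}

// Hypothetical model of how an exchange selects the queues that receive a message.
function targetQueues(kind: ExchangeKind, bindings: Binding[], routingKey: string): string[] {
  const selected = kind === 'fanout'
    ? bindings // fanout ignores the routing key entirely
    : bindings.filter((b) => b.bindingKey === routingKey); // direct needs an exact match
  // A queue bound more than once still receives a single copy of the message.
  return selected
    .map((b) => b.queue)
    .filter((queue, index, all) => all.indexOf(queue) === index);
}

const exampleBindings: Binding[] = [
  { queue: 'my_app.command', bindingKey: 'my_app.command.create_user' },
  { queue: 'audit', bindingKey: 'audit.log' },
];

console.log(targetQueues('direct', exampleBindings, 'my_app.command.create_user'));
// [ 'my_app.command' ]
console.log(targetQueues('fanout', exampleBindings, 'any.key'));
// [ 'my_app.command', 'audit' ]
```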

Additional

  • You can override message routing using AmqpMessageOptions. This allows sending a message to a specified exchange and routing it with a custom key.
    this.messageBus.dispatch(new RoutingMessage(new SendMessage('Hello Rabbit!'), 'app.command.execute', new AmqpMessageOptions('exchange_name', 'rabbitmq_routing_key_to_queue')));

📨 Communicating Beyond a NestJS Application (Cross-Language Messaging)

To enable communication with a Handler from services written in other languages, follow these steps:

  1. Publish a Message to the queue

  2. Include the Routing Key Header: Your message must include a header attribute named messaging-routing-key. The value should correspond to the routing key defined in your NestJS message handler:

    @MessageHandler('my_app_command.create_user') // <-- Use this value as the routing key
  3. You're Done! Once the message is published with the correct routing key, it will be automatically routed to the appropriate handler within the NestJS application.
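
As a sketch of what the publishing side needs to produce, the snippet below assembles the header and body for such a message. The header name comes from the steps above; everything else is an assumption made for illustration: OutboundMessage and buildMessage are hypothetical names, and the JSON-encoded body is a guess, since the wire format is not specified here. A publisher in another language would pass the equivalent headers and body to its own AMQP client.

```typescript
// Hypothetical, client-agnostic representation of a message to publish.
interface OutboundMessage {
  headers: Record<string, string>;
  body: string;
}

// Header name taken from the steps above.
const ROUTING_KEY_HEADER = 'messaging-routing-key';

// Builds the headers and body a non-NestJS publisher would send.
// Assumption: the payload is JSON-encoded; the wire format is not stated in this README.
function buildMessage(handlerRoutingKey: string, payload: unknown): OutboundMessage {
  return {
    headers: { [ROUTING_KEY_HEADER]: handlerRoutingKey },
    body: JSON.stringify(payload),
  };
}

const outbound = buildMessage('my_app_command.create_user', { name: 'John' });
console.log(outbound.headers['messaging-routing-key']); // my_app_command.create_user
console.log(outbound.body);                             // {"name":"John"}
```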


Configuration options

AmqpChannel

AmqpChannelConfig

| Property | Description | Default Value |
|---|---|---|
| name | Name of the AMQP channel (e.g., 'amqp-command'). | |
| connectionUri | URI for the RabbitMQ connection, such as 'amqp://guest:guest@localhost:5672/'. | |
| exchangeName | The AMQP exchange name (e.g., 'my_app.exchange'). | |
| bindingKeys | The routing keys to bind to (e.g., ['my_app.command.#']). | [] |
| exchangeType | Type of the RabbitMQ exchange (e.g., TOPIC). | |
| queue | The AMQP queue to consume messages from (e.g., 'my_app.command'). | |
| autoCreate | Automatically creates the exchange, queue, and bindings if they don’t exist. | true |
| enableConsumer | Enables or disables the consumer for this channel. | true |
| avoidErrorsForNotExistedHandlers | Avoid errors if no handler is available for the message. | false |
| deadLetterQueueFeature | Enables a dead-letter queue to capture messages that could not be processed due to errors. | false |

This table summarizes the AmqpChannelConfig properties accepted in the channels section of MessagingModule.forRoot, along with their default values.
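
To show how the defaults in the table play out, the sketch below fills in the optional properties for a partially specified channel. It is a standalone illustration, not the library's code: AmqpChannelOptions and withTableDefaults are hypothetical names, and exchangeType is typed as a plain string here rather than the library's ExchangeType.

```typescript
// Hypothetical mirror of the table: properties without a listed default are required.
interface AmqpChannelOptions {
  name: string;
  connectionUri: string;
  exchangeName: string;
  exchangeType: string; // assumption: a plain string stands in for ExchangeType
  queue: string;
  bindingKeys?: string[];
  autoCreate?: boolean;
  enableConsumer?: boolean;
  avoidErrorsForNotExistedHandlers?: boolean;
  deadLetterQueueFeature?: boolean;
}

// Applies the default values listed in the table to any property left unset.
function withTableDefaults(options: AmqpChannelOptions): Required<AmqpChannelOptions> {
  return {
    ...options,
    bindingKeys: options.bindingKeys ?? [],
    autoCreate: options.autoCreate ?? true,
    enableConsumer: options.enableConsumer ?? true,
    avoidErrorsForNotExistedHandlers: options.avoidErrorsForNotExistedHandlers ?? false,
    deadLetterQueueFeature: options.deadLetterQueueFeature ?? false,
  };
}

const resolvedChannel = withTableDefaults({
  name: 'amqp-command',
  connectionUri: 'amqp://guest:guest@localhost:5672/',
  exchangeName: 'my_app_command.exchange',
  exchangeType: 'topic',
  queue: 'my_app.command',
  enableConsumer: false, // explicit values win over the defaults
});

console.log(resolvedChannel.autoCreate);             // true
console.log(resolvedChannel.enableConsumer);         // false
console.log(resolvedChannel.deadLetterQueueFeature); // false
```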
