1.1.5 • Published 5 months ago

@luongdao/node-rabbitmq v1.1.5

License: MIT

Description

A small package inspired by the @golevelup/nestjs-rabbitmq library. It makes working with RabbitMQ in Node.js easier, including in Express.js applications.

Connection Management

This package leverages the amqp-connection-manager package to support connection resiliency.

By default, if you launch the app while the RabbitMQ connection is unavailable, an error is thrown and the app crashes.

If you want to enable connection resiliency, you can configure connectionInitOptions to not wait for a connection to be available. For example:

import {
  RabbitMQService,
  EXTENDED_EXCHANGE_TYPE,
} from '@luongdao/node-rabbitmq';

async function initRabbitMQ() {
  const rabbitMQService = new RabbitMQService({
    exchanges: [
      {
        name: 'exchange1',
        type: EXTENDED_EXCHANGE_TYPE.TOPIC,
      },
    ],
    uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
    connectionInitOptions: { wait: false },
  });

  await rabbitMQService.initConnection();
}

With wait set to false, your application still bootstraps correctly when the RabbitMQ broker is unavailable, and the relevant channel setup takes place as soon as a connection can be established.

The same principle applies when a connection is lost. In that case, the package tries to reconnect and sets everything up again once it has reconnected.
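
The deferred-setup behaviour described above can be pictured with a small, library-free sketch. The DeferredSetupConnection class and its simulateConnect/simulateDisconnect methods are hypothetical names invented for illustration; they are not part of this package or of amqp-connection-manager. The sketch only shows the idea that setup steps are remembered and re-run on every (re)connect instead of failing while the broker is down.

```typescript
// Hypothetical, library-free model of deferred channel setup.
// Nothing here is the package's real API.
type SetupFn = () => void;

class DeferredSetupConnection {
  private connected = false;
  private readonly setups: SetupFn[] = [];

  // Remember a setup step; run it right away only if already connected.
  addSetup(setup: SetupFn): void {
    this.setups.push(setup);
    if (this.connected) setup();
  }

  // Simulate the broker becoming available: re-run every setup step.
  simulateConnect(): void {
    this.connected = true;
    for (const setup of this.setups) setup();
  }

  // Simulate a lost connection; registered setups are kept for the next connect.
  simulateDisconnect(): void {
    this.connected = false;
  }
}

const log: string[] = [];
const conn = new DeferredSetupConnection();

// The broker is "down" here: the setup is stored, nothing throws.
conn.addSetup(() => {
  log.push('assert exchange1');
});

conn.simulateConnect(); // the stored setup runs for the first time
conn.simulateDisconnect();
conn.simulateConnect(); // the same setup runs again after the reconnect
```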

Usage

Install

npm install --save @luongdao/node-rabbitmq

or

yarn add @luongdao/node-rabbitmq

Module Initialization

Import RabbitMQService, create an instance with your configuration, and initialize the connection.

import {
  RabbitMQService,
  EXTENDED_EXCHANGE_TYPE,
} from '@luongdao/node-rabbitmq';

async function initRabbitMQ() {
  const rabbitMQService = new RabbitMQService({
    exchanges: [
      {
        name: 'exchange1',
        type: 'topic',
      },
    ],
    uri: 'amqp://rabbitmq:rabbitmq@localhost:5672',
    channels: {
      channel_1: {
        prefetchCount: 15,
        default: true,
      },
      channel_2: {
        prefetchCount: 2,
      },
    },
  });

  await rabbitMQService.initConnection();
}

Register RabbitMQ subscribers

Call the registerSubscriber() function of the connection object exposed by your RabbitMQ service.

import { RabbitMQSubscriberHandler } from '@luongdao/node-rabbitmq';

// Assumes rabbitMQService has been created and initConnection() has completed.

export const subscriber1: RabbitMQSubscriberHandler<{
  data: string;
}> = (msg) => {
  console.debug('Subscriber 1 is running: ' + msg.data);
};

await rabbitMQService.connection.registerSubscriber(subscriber1, {
  queue: 'queue_1',
  exchange: 'exchange_1',
  routingKey: 'routing_key_1',
  queueOptions: {
    channel: 'channel_1',
  },
});
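
When the exchange is of type topic, the routingKey used for a subscription acts as a binding pattern: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following is a minimal sketch of that matching rule for experimenting with patterns locally. The topicMatches function is a hypothetical helper written for this illustration; it is not exported by this package, and the broker performs the real matching.

```typescript
// Hypothetical helper that mimics topic-exchange matching rules.
// '*' matches exactly one word, '#' matches zero or more words.
function topicMatches(pattern: string, routingKey: string): boolean {
  const p = pattern.split('.');
  const k = routingKey.split('.');

  const match = (i: number, j: number): boolean => {
    // Pattern exhausted: succeed only if the key is exhausted too.
    if (i === p.length) return j === k.length;

    if (p[i] === '#') {
      // Let '#' absorb 0..n remaining words and try each possibility.
      for (let next = j; next <= k.length; next++) {
        if (match(i + 1, next)) return true;
      }
      return false;
    }

    // Any other pattern word needs a key word to compare against.
    if (j === k.length) return false;
    return (p[i] === '*' || p[i] === k[j]) && match(i + 1, j + 1);
  };

  return match(0, 0);
}

const exact = topicMatches('routing_key_1', 'routing_key_1');
const oneWord = topicMatches('orders.*', 'orders.created');
const tooLong = topicMatches('orders.*', 'orders.created.eu');
const anyDepth = topicMatches('orders.#', 'orders.created.eu');
```

A literal key such as routing_key_1 contains no wildcards, so it only matches messages published with exactly that routing key.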

Handling messages in formats other than JSON

By default, messages are parsed with JSON.parse when they are received and stringified with JSON.stringify when they are published. To change this behavior, supply your own serializer and deserializer, like so:

import {
  RabbitMQService,
  EXTENDED_EXCHANGE_TYPE,
} from '@luongdao/node-rabbitmq';
import { ConsumeMessage } from 'amqplib';

async function initRabbitMQ() {
  const rabbitMQService = new RabbitMQService({
    // ...other configs
    deserializer: (message: Buffer, msg: ConsumeMessage) => {
      // Decode the raw payload buffer; msg carries the AMQP metadata.
      const decodedMessage = myCustomDeserializer(
        message.toString(),
        msg.properties.headers,
      );
      return decodedMessage;
    },
    serializer: (msg: any) => {
      const encodedMessage = myCustomSerializer(msg);
      return Buffer.from(encodedMessage);
    },
  });

  await rabbitMQService.initConnection();
}
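
To make the serializer and deserializer contract more concrete, here is a minimal round-trip sketch for an invented "key=value;key=value" text format. The serializeFields and deserializeFields functions and the Fields type are hypothetical stand-ins for myCustomSerializer and myCustomDeserializer; they rely only on Node's built-in Buffer and are not part of this package.

```typescript
// Hypothetical text-based wire format: "key=value;key=value".
// Illustrative only; not part of @luongdao/node-rabbitmq.
type Fields = Record<string, string>;

// Encode an object of string fields into a UTF-8 buffer.
function serializeFields(msg: Fields): Buffer {
  const text = Object.entries(msg)
    .map(([key, value]) => `${encodeURIComponent(key)}=${encodeURIComponent(value)}`)
    .join(';');
  return Buffer.from(text, 'utf8');
}

// Decode a buffer produced by serializeFields back into an object.
function deserializeFields(message: Buffer): Fields {
  const text = message.toString('utf8');
  const result: Fields = {};
  if (text.length === 0) return result;

  for (const pair of text.split(';')) {
    const idx = pair.indexOf('=');
    const key = idx === -1 ? pair : pair.slice(0, idx);
    const value = idx === -1 ? '' : pair.slice(idx + 1);
    result[decodeURIComponent(key)] = decodeURIComponent(value);
  }
  return result;
}

// Round trip: what is published should equal what the subscriber receives.
const wire = serializeFields({ event: 'user.created', note: 'a=b;c' });
const decoded = deserializeFields(wire);
```

Under the configuration shape shown above, such helpers could be wired in as serializer: serializeFields and deserializer: (message) => deserializeFields(message). Percent-encoding the keys and values keeps the = and ; separators unambiguous.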

Publishing Messages (Fire and Forget)

If you just want to publish a message to a RabbitMQ exchange, use the publish method of the AmqpConnection, which has the following signature:

public publish<T = any>(
  exchange: string,
  routingKey: string,
  message: T,
  options?: amqplib.Options.Publish
)

For example:

// init service and connection
await rabbitMQService.connection.publish('exchange_1', 'routing_key_1', {
  message: 'Hello World',
});
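
One way to keep exchange names and routing keys out of call sites is to wrap publish in a small typed helper. The sketch below is an assumption-laden illustration: the Publisher and PublishOptions interfaces are local stand-ins that mirror the signature above (with a Promise return type assumed from the await in the example), and publishOrderCreated, OrderCreated, the orders.created routing key and the in-memory fake are all hypothetical. The persistent and headers fields do exist on amqplib's Options.Publish.

```typescript
// Local stand-ins so the sketch runs without a broker or the package.
// PublishOptions mirrors only two fields of amqplib's Options.Publish.
interface PublishOptions {
  persistent?: boolean;
  headers?: Record<string, unknown>;
}

// Assumed shape of the connection's publish method (Promise return is an assumption).
interface Publisher {
  publish<T = any>(
    exchange: string,
    routingKey: string,
    message: T,
    options?: PublishOptions,
  ): Promise<unknown>;
}

// Hypothetical event payload.
interface OrderCreated {
  orderId: string;
  total: number;
}

// Hypothetical helper: pins the exchange and routing key in one place.
async function publishOrderCreated(
  publisher: Publisher,
  event: OrderCreated,
): Promise<void> {
  await publisher.publish<OrderCreated>('exchange_1', 'orders.created', event, {
    persistent: true, // ask the broker to persist the message
  });
}

// In-memory fake that records calls instead of talking to RabbitMQ.
const sent: Array<{ exchange: string; routingKey: string; message: unknown }> = [];
const fakePublisher: Publisher = {
  async publish<T>(exchange: string, routingKey: string, message: T): Promise<void> {
    sent.push({ exchange, routingKey, message });
  },
};

const pending = publishOrderCreated(fakePublisher, { orderId: 'o-1', total: 10 });
```

In an application, rabbitMQService.connection would be passed in place of fakePublisher, provided its publish method matches the assumed shape.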