1.0.0 • Published 4 months ago

mqmanager-nestjs v1.0.0

MQManager

A lightweight, extensible npm package for working with RabbitMQ in Node.js/TypeScript applications. It wraps message publishing and consumption in two easy-to-use classes, RabbitMQProducer and RabbitMQConsumer, so that integrating RabbitMQ takes only a few lines of code.


Features

  • Producer: Send messages to a RabbitMQ queue with support for publisher confirms.
  • Consumer: Consume messages from a RabbitMQ queue with retry logic and manual acknowledgment.
  • TypeScript Support: Full TypeScript definitions included.
  • Configurable: Queue and exchange names, retry policies, and connection details are fully configurable.
  • Graceful Shutdown: Handles cleanup during process termination.

Installation

Install the package via npm:

npm install mqmanager

Usage

Importing

Import the RabbitMQProducer and RabbitMQConsumer classes from the package:

import { RabbitMQProducer, RabbitMQConsumer } from "mqmanager";

Producer Example

This example demonstrates how to use the RabbitMQProducer to send messages to a RabbitMQ queue.

import { RabbitMQProducer } from "mqmanager";

const runProducer = async () => {
  const producer = new RabbitMQProducer(
    "example-queue", // Queue name
    "example-exchange", // Exchange name
    "amqp://localhost" // RabbitMQ connection string
  );

  try {
    await producer.connect();
    console.log("Producer connected successfully.");

    // Send a message
    const message = { id: 1, text: "Hello, RabbitMQ!" };
    await producer.sendMessage(message);

    console.log("Message sent successfully.");
  } catch (error) {
    console.error("Producer error:", error);
  } finally {
    await producer.close();
  }
};

runProducer();

Consumer Example

This example demonstrates how to use the RabbitMQConsumer to consume messages from a RabbitMQ queue.

import { RabbitMQConsumer, Message } from "mqmanager";

const runConsumer = async () => {
  const consumer = new RabbitMQConsumer(
    "example-queue", // Queue name
    "example-exchange", // Exchange name
    "amqp://localhost", // RabbitMQ connection string
    { maxRetries: 3, retryDelay: 1000 } // Retry options
  );

  try {
    await consumer.connect();
    console.log("Consumer connected successfully.");

    // Define the message handler
    const messageHandler = async (message: Message) => {
      console.log("Received message:", message);
      // Simulate a processing failure for message 1 so the retry options have something to act on
      if (message.id === 1) {
        throw new Error("Simulated processing error");
      }
      console.log("Message processed successfully:", message);
    };

    // Start consuming messages
    await consumer.consume(messageHandler);

    // Handle process signals for a graceful shutdown
    process.on("SIGINT", async () => {
      console.log("Received SIGINT. Shutting down gracefully...");
      await consumer.close();
      process.exit(0);
    });

    process.on("SIGTERM", async () => {
      console.log("Received SIGTERM. Shutting down gracefully...");
      await consumer.close();
      process.exit(0);
    });
  } catch (error) {
    console.error("Consumer error:", error);
  }
};

runConsumer();
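
The two signal handlers in the consumer example are identical apart from the signal name, and a second signal arriving during shutdown would call close() twice. A minimal sketch of one way to factor this out is shown below. The names closeOnSignals, Closeable, and SignalSource are hypothetical and not part of the package; the only assumption about the consumer is the close() method documented in this README.

```typescript
// Hypothetical helper, not part of mqmanager.
// Anything with an async close() method, such as the consumer above.
interface Closeable {
  close(): Promise<void>;
}

// The small part of Node's `process` object that the helper relies on.
interface SignalSource {
  once(signal: string, listener: () => void): unknown;
}

function closeOnSignals(
  source: SignalSource,
  resource: Closeable,
  exit: (code: number) => void,
  signals: string[] = ["SIGINT", "SIGTERM"]
): void {
  let closing = false;
  for (const signal of signals) {
    source.once(signal, () => {
      if (closing) return; // a second signal arrived while already closing
      closing = true;
      console.log(`Received ${signal}. Shutting down gracefully...`);
      resource.close().then(
        () => exit(0),
        (error) => {
          console.error("Error while closing:", error);
          exit(1);
        }
      );
    });
  }
}
```

In an application this would be wired up after consume() with something like closeOnSignals(process, consumer, (code) => process.exit(code)). Passing the signal source and the exit function as arguments keeps the helper easy to test without terminating the test process.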

API Reference

RabbitMQProducer

Constructor

constructor(queueName: string, exchangeName: string, connectionString: string);

  • queueName: Name of the queue to send messages to.
  • exchangeName: Name of the exchange to use.
  • connectionString: RabbitMQ connection string.

Methods

  • connect(): Connects the producer to RabbitMQ.
  • sendMessage(message: Record<string, any>): Sends a JSON message to the queue.
  • close(): Closes the connection.
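
Since sendMessage takes a plain object and sends it as JSON, it is worth knowing what survives JSON serialization. The sketch below does not use the package. jsonRoundTrip is a hypothetical helper, and it assumes the producer encodes messages with the standard JSON.stringify, which this README does not confirm.

```typescript
// Hypothetical helper, not part of mqmanager: shows what a consumer would
// see if the producer encodes messages with the standard JSON.stringify.
function jsonRoundTrip(message: Record<string, unknown>): unknown {
  return JSON.parse(JSON.stringify(message));
}

const sent = {
  id: 1,
  text: "Hello, RabbitMQ!",
  createdAt: new Date(0), // Date objects are serialized as ISO 8601 strings
  note: undefined, // properties whose value is undefined are dropped
};

const received = jsonRoundTrip(sent) as Record<string, unknown>;
console.log(received);
// createdAt arrives as the string "1970-01-01T00:00:00.000Z"; note is absent
```

Under that assumption, values such as Date instances should be converted explicitly (for example to ISO strings or timestamps) before sending, and converted back in the consumer's handler.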

RabbitMQConsumer

Constructor

constructor(
  queueName: string,
  exchangeName: string,
  connectionString: string,
  retryOptions?: { maxRetries: number; retryDelay: number }
);

  • queueName: Name of the queue to consume messages from.
  • exchangeName: Name of the exchange to use.
  • connectionString: RabbitMQ connection string.
  • retryOptions: (Optional) Retry policy.
    • maxRetries: Maximum number of retries.
    • retryDelay: Delay between retries (in milliseconds).
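
To make the two retry options concrete, here is a minimal sketch of a fixed-delay retry loop. It is an illustration only, not the package's implementation: processWithRetry and sleep are hypothetical names, and the sketch assumes that maxRetries counts retries after the first attempt, so a handler runs at most maxRetries + 1 times.

```typescript
// Hypothetical illustration of retry semantics, not mqmanager's own code.
interface RetryOptions {
  maxRetries: number; // retries after the first attempt (assumption)
  retryDelay: number; // fixed delay between attempts, in milliseconds
}

const sleep = (ms: number): Promise<void> =>
  new Promise((resolve) => setTimeout(resolve, ms));

// Runs the handler until it succeeds or the retries are used up.
// Resolves to true on success and false once every attempt has failed.
async function processWithRetry<T>(
  message: T,
  handler: (message: T) => Promise<void>,
  { maxRetries, retryDelay }: RetryOptions
): Promise<boolean> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      await handler(message);
      return true;
    } catch (error) {
      console.error(`Attempt ${attempt + 1} failed:`, error);
      // Wait before the next attempt, but not after the last one.
      if (attempt < maxRetries) await sleep(retryDelay);
    }
  }
  return false;
}
```

With { maxRetries: 3, retryDelay: 1000 } as in the consumer example, this loop would make up to four attempts and spend at most about three seconds waiting between them. In a consumer with manual acknowledgment, the boolean result would typically decide whether the message is acknowledged or rejected.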

Methods

  • connect(): Connects the consumer to RabbitMQ.
  • consume(handler: (message: Message) => Promise<void>): Starts consuming messages using the provided handler.
  • close(): Closes the connection.

Configuration

Environment Variables

You can configure RabbitMQ connection details via environment variables:

  • RABBITMQ_URL: RabbitMQ connection string (e.g., amqp://localhost).

Example .env file:

RABBITMQ_URL=amqp://localhost
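
This README does not say whether the package reads RABBITMQ_URL on its own. Because both constructors take an explicit connection string, one safe approach is to resolve the variable in your application and pass the result in. The sketch below assumes that approach; resolveRabbitUrl is a hypothetical helper, and the amqp://localhost fallback simply mirrors the examples above.

```typescript
// Hypothetical helper, not part of mqmanager.
type Env = Record<string, string | undefined>;

// Returns RABBITMQ_URL when it is set and non-blank, otherwise the fallback.
function resolveRabbitUrl(env: Env, fallback = "amqp://localhost"): string {
  const url = env["RABBITMQ_URL"]?.trim();
  return url ? url : fallback;
}

console.log(resolveRabbitUrl({})); // amqp://localhost
console.log(resolveRabbitUrl({ RABBITMQ_URL: "amqp://broker:5672" })); // amqp://broker:5672
```

In an application you would call resolveRabbitUrl(process.env) and pass the result as the connectionString argument of RabbitMQProducer or RabbitMQConsumer. Note that Node.js does not load .env files automatically in every setup, so a loader such as dotenv may be needed for the example .env file to take effect.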

License

MIT
