1.0.0 • Published 4 months ago
mqmanager-nestjs v1.0.0
MQManager
A lightweight and extensible npm package for handling RabbitMQ producers and consumers in a Node.js/TypeScript environment. This package simplifies the integration of RabbitMQ into your applications by providing easy-to-use producer and consumer classes.
Features
- Producer: Send messages to a RabbitMQ queue with support for publisher confirms.
- Consumer: Consume messages from a RabbitMQ queue with retry logic and manual acknowledgment.
- TypeScript Support: Full TypeScript definitions included.
- Configurable: Queue and exchange names, retry policies, and connection details are fully configurable.
- Graceful Shutdown: Handles cleanup during process termination.
Installation
Install the package via npm:
npm install mqmanager
Usage
Importing
You can import the RabbitMQProducer
and RabbitMQConsumer
classes:
import { RabbitMQProducer, RabbitMQConsumer } from "mqmanager";
Producer Example
This example demonstrates how to use the RabbitMQProducer
to send messages to a RabbitMQ queue.
import { RabbitMQProducer } from "mqmanager";
const runProducer = async () => {
const producer = new RabbitMQProducer(
"example-queue", // Queue name
"example-exchange", // Exchange name
"amqp://localhost" // RabbitMQ connection string
);
try {
await producer.connect();
console.log("Producer connected successfully.");
// Send a message
const message = { id: 1, text: "Hello, RabbitMQ!" };
await producer.sendMessage(message);
console.log("Message sent successfully.");
} catch (error) {
console.error("Producer error:", error);
} finally {
await producer.close();
}
};
runProducer();
Consumer Example
This example demonstrates how to use the RabbitMQConsumer
to consume messages from a RabbitMQ queue.
import { RabbitMQConsumer, Message } from "mqmanager";
const runConsumer = async () => {
const consumer = new RabbitMQConsumer(
"example-queue", // Queue name
"example-exchange", // Exchange name
"amqp://localhost", // RabbitMQ connection string
{ maxRetries: 3, retryDelay: 1000 } // Retry options
);
try {
await consumer.connect();
console.log("Consumer connected successfully.");
// Define the message handler
const messageHandler = async (message: Message) => {
console.log("Received message:", message);
// Simulate processing
if (message.id === 1) {
throw new Error("Simulated processing error");
}
console.log("Message processed successfully:", message);
};
// Start consuming messages
await consumer.consume(messageHandler);
// Handle process signals for a graceful shutdown
process.on("SIGINT", async () => {
console.log("Received SIGINT. Shutting down gracefully...");
await consumer.close();
process.exit(0);
});
process.on("SIGTERM", async () => {
console.log("Received SIGTERM. Shutting down gracefully...");
await consumer.close();
process.exit(0);
});
} catch (error) {
console.error("Consumer error:", error);
}
};
runConsumer();
API Reference
RabbitMQProducer
Constructor
constructor(queueName: string, exchangeName: string, connectionString: string);
queueName
: Name of the queue to send messages to.exchangeName
: Name of the exchange to use.connectionString
: RabbitMQ connection string.
Methods
connect()
: Connects the producer to RabbitMQ.sendMessage(message: Record<string, any>)
: Sends a JSON message to the queue.close()
: Closes the connection.
RabbitMQConsumer
Constructor
constructor(
queueName: string,
exchangeName: string,
connectionString: string,
retryOptions?: { maxRetries: number; retryDelay: number }
);
queueName
: Name of the queue to consume messages from.exchangeName
: Name of the exchange to use.connectionString
: RabbitMQ connection string.retryOptions
: (Optional) Retry policy.maxRetries
: Maximum number of retries.retryDelay
: Delay between retries (in milliseconds).
Methods
connect()
: Connects the consumer to RabbitMQ.consume(handler: (message: Message) => Promise<void>)
: Starts consuming messages using the provided handler.close()
: Closes the connection.
Configuration
Environment Variables
You can configure RabbitMQ connection details via environment variables:
RABBITMQ_URL
: RabbitMQ connection string (e.g.,amqp://localhost
).
Example .env
file:
RABBITMQ_URL=amqp://localhost
License
MIT
1.0.0
4 months ago