@eppler-software/rabbitmq-consumer v1.0.4
A reusable library for building RabbitMQ consumers in TypeScript with pluggable message handlers.
Installation
pnpm install @eppler-software/rabbitmq-consumer amqplib pg dotenv
# Or use npm install or yarn add
Peer Dependencies: amqplib, pg, dotenv
Simple Example
Here's a basic example demonstrating how to set up a consumer that listens to a queue, defines a message type, and handles it.
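The flow described above (a message arrives on the queue, and its type field selects the handler) can be pictured with a small, self-contained sketch. This is a conceptual illustration only and not the library's source code: Msg, Handler, findHandler, and dispatch are hypothetical names, and how the library itself treats an unrecognized type is not covered here.

```typescript
// Conceptual sketch only, not the library's source.
// Msg, Handler, findHandler, and dispatch are hypothetical names.
interface Msg {
  type: string;
  payload: unknown;
}

type Handler = (message: Msg) => Promise<void>;

// Return the handler registered under `type`, ignoring inherited keys
// such as "toString" that exist on every plain object.
function findHandler(
  handlers: Record<string, Handler>,
  type: string,
): Handler | undefined {
  return Object.prototype.hasOwnProperty.call(handlers, type)
    ? handlers[type]
    : undefined;
}

// Parse a JSON body, pick the handler by `type`, and run it.
// Resolves to false when no handler is registered for that type.
async function dispatch(
  handlers: Record<string, Handler>,
  rawBody: string,
): Promise<boolean> {
  const message = JSON.parse(rawBody) as Msg;
  const handler = findHandler(handlers, message.type);
  if (handler === undefined) {
    return false;
  }
  await handler(message);
  return true;
}
```

In this sketch the handler map plays the same role as the messageHandlers object in the steps that follow: keys are message type strings and values are async functions.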
1. Define Your Message Types
// src/messageTypes.ts
import { BaseMessage } from '@eppler-software/rabbitmq-consumer';

export interface SimpleGreetingMessage extends BaseMessage {
  type: 'greeting.simple';
  payload: { name: string };
}

2. Implement Your Message Handler
// src/handlers.ts
import { MessageHandlerFn } from '@eppler-software/rabbitmq-consumer';
import { SimpleGreetingMessage } from './messageTypes';

export const handleSimpleGreeting: MessageHandlerFn<SimpleGreetingMessage> = async (
  message,
  services, // services object contains rabbitMQService and postgresService
) => {
  console.log(`Received a simple greeting for: ${message.payload.name}`);
  // You can use services here if needed, e.g., services.rabbitMQService.sendToQueue(...)
};

3. Configure and Start the Consumer
// src/index.ts
import { ConsumerLib, ConsumerLibConfig, MessageHandlers } from '@eppler-software/rabbitmq-consumer';
import dotenv from 'dotenv';
import { handleSimpleGreeting } from './handlers'; // Import your handler

dotenv.config();

const rabbitMQUrl = process.env.RABBITMQ_URL || 'amqp://localhost';
const myConsumerQueue = 'my_simple_queue';

// Map your message type to the handler function
const messageHandlers: MessageHandlers = {
  'greeting.simple': handleSimpleGreeting,
};

const config: ConsumerLibConfig = {
  rabbitmq: {
    url: rabbitMQUrl,
    consumerQueueName: myConsumerQueue, // The queue this consumer listens to
    queuesToAssert: [myConsumerQueue], // Ensure the consumer queue exists
  },
  // No 'postgres' config needed if handlers don't use it
  messageHandlers: messageHandlers,
};

const consumer = new ConsumerLib(config);

consumer.start().catch(err => {
  console.error('Failed to start consumer:', err);
  process.exit(1);
});

To run this example:
- Make sure you have a RabbitMQ instance running (defaulting to amqp://localhost).
- Put the code for messageTypes.ts, handlers.ts, and index.ts in your project's src directory.
- Have a tsconfig.json and package.json set up for your application.
- Run pnpm install (or npm/yarn).
- Run pnpm build (or npm run build).
- Run pnpm start (or npm start).
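Handlers receive whatever JSON was published to the queue, so it can be useful to check a message body against the expected shape before sending it. The following is a minimal sketch and not part of the library: BaseMessageLike is an assumed stand-in for the library's BaseMessage (which may define more fields), and isSimpleGreeting is a hypothetical helper.

```typescript
// Assumed minimal shape; the library's real BaseMessage may define more fields.
interface BaseMessageLike {
  type: string;
  payload: unknown;
}

interface SimpleGreetingMessage extends BaseMessageLike {
  type: 'greeting.simple';
  payload: { name: string };
}

// Hypothetical helper: runtime check that a parsed JSON value has the
// shape the greeting handler expects.
function isSimpleGreeting(value: unknown): value is SimpleGreetingMessage {
  if (typeof value !== 'object' || value === null) return false;
  const candidate = value as { type?: unknown; payload?: unknown };
  if (candidate.type !== 'greeting.simple') return false;
  const payload = candidate.payload;
  if (typeof payload !== 'object' || payload === null) return false;
  return typeof (payload as { name?: unknown }).name === 'string';
}

// Build the test message body and verify it before publishing.
const greetingBody = JSON.stringify({
  type: 'greeting.simple',
  payload: { name: 'World' },
});
const parsedGreeting: unknown = JSON.parse(greetingBody);
if (isSimpleGreeting(parsedGreeting)) {
  console.log(`Valid greeting for ${parsedGreeting.payload.name}`);
}
```

The same guard could also be called at the top of a handler to reject malformed messages early, at the cost of one extra check per message.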
Send a message like this to my_simple_queue to see the handler execute:
{
  "type": "greeting.simple",
  "payload": {
    "name": "World"
  }
}

Example with Routing and Database
This example handles two message types: one forwards its payload to another queue, and the other writes to a database and then forwards part of its payload to a second queue.
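Because the two message types are told apart by their type field, they fit a TypeScript discriminated union. The sketch below is illustrative only: it redeclares the two message shapes locally (using unknown in place of any), and AppMessage and describeSteps are hypothetical names that do not come from the library.

```typescript
// Local stand-ins for the two message types used in this example.
// `unknown` replaces `any` so the sketch stays strictly typed.
interface RouteMeMessage {
  type: 'action.route_me';
  payload: { someData: unknown };
}

interface ProcessAndRouteMessage {
  type: 'action.process_and_route';
  payload: { dbData: unknown; queueDData: unknown };
}

// Hypothetical union of every message this consumer understands.
type AppMessage = RouteMeMessage | ProcessAndRouteMessage;

// List the steps each message type triggers. The `never` assignment makes
// the compiler flag any type added to AppMessage but not handled here.
function describeSteps(message: AppMessage): string[] {
  switch (message.type) {
    case 'action.route_me':
      return ['send payload to queue_for_routing'];
    case 'action.process_and_route':
      return [
        'insert dbData into PostgreSQL',
        'send queueDData to queue_after_processing',
      ];
    default: {
      const unhandled: never = message;
      throw new Error(`Unhandled message: ${JSON.stringify(unhandled)}`);
    }
  }
}
```

Inside each case the compiler narrows message.payload to the matching shape, which is the same guarantee the typed handlers in this example rely on.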
Message Types
// src/messageTypes.ts
import { BaseMessage } from '@eppler-software/rabbitmq-consumer';

export interface RouteMeMessage extends BaseMessage {
  type: 'action.route_me';
  payload: { someData: any };
}

export interface ProcessAndRouteMessage extends BaseMessage {
  type: 'action.process_and_route';
  payload: { dbData: any; queueDData: any };
}

Handlers
// src/handlers.ts
import { MessageHandlerFn } from '@eppler-software/rabbitmq-consumer';
import { RouteMeMessage, ProcessAndRouteMessage } from './messageTypes';

const queueB = 'queue_for_routing';
const queueD = 'queue_after_processing';

export const handleRouteMe: MessageHandlerFn<RouteMeMessage> = async (
  message,
  { rabbitMQService },
) => {
  console.log('Handling Route Me:', message.payload);
  await rabbitMQService.sendToQueue(queueB, Buffer.from(JSON.stringify(message.payload)));
  console.log(`Routed to ${queueB}`);
};

export const handleProcessAndRoute: MessageHandlerFn<ProcessAndRouteMessage> = async (
  message,
  { postgresService, rabbitMQService },
) => {
  console.log('Handling Process and Route:', message.payload);
  if (!postgresService) {
    throw new Error('PostgresService is required for Process and Route handler');
  }
  await postgresService.insertData(message.payload.dbData);
  console.log('Data written to database');
  await rabbitMQService.sendToQueue(queueD, Buffer.from(JSON.stringify(message.payload.queueDData)));
  console.log(`Routed to ${queueD}`);
};

Consumer Configuration
// src/index.ts
import { ConsumerLib, ConsumerLibConfig, MessageHandlers } from '@eppler-software/rabbitmq-consumer';
import dotenv from 'dotenv';
import { handleRouteMe, handleProcessAndRoute } from './handlers'; // Import your handlers

dotenv.config();

const rabbitMQUrl = process.env.RABBITMQ_URL || 'amqp://localhost';
const postgresConnectionString = process.env.POSTGRES_URL || 'postgresql://user:password@host:port/database'; // Ensure this is set
const mainConsumerQueue = 'my_main_processing_queue';
const queueB = 'queue_for_routing'; // Must match the handler
const queueD = 'queue_after_processing'; // Must match the handler

const messageHandlers: MessageHandlers = {
  'action.route_me': handleRouteMe,
  'action.process_and_route': handleProcessAndRoute,
};

const config: ConsumerLibConfig = {
  rabbitmq: {
    url: rabbitMQUrl,
    consumerQueueName: mainConsumerQueue,
    queuesToAssert: [
      mainConsumerQueue,
      queueB,
      queueD,
    ],
  },
  postgres: { // Include postgres config because a handler needs it
    connectionString: postgresConnectionString,
  },
  messageHandlers: messageHandlers,
};

const consumer = new ConsumerLib(config);

consumer.start().catch(err => {
  console.error('Failed to start consumer:', err);
  process.exit(1);
});

To run this example:
- Make sure you have RabbitMQ and PostgreSQL running.
- Update your .env file with RABBITMQ_URL and POSTGRES_URL.
- Use the code for message types, handlers, and index.
- Install dependencies (pnpm install).
- Build and start (pnpm build, pnpm start).
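The fallback value for POSTGRES_URL in index.ts is only a placeholder, so a missing variable would surface later as a connection error. One option is to fail at startup instead. The sketch below assumes nothing about the library: Env and requireEnv are hypothetical names, and the helper takes the environment as a parameter so it can be exercised without touching the real process environment.

```typescript
// Hypothetical helper: read a required setting or fail with a clear error.
type Env = Record<string, string | undefined>;

function requireEnv(env: Env, key: string): string {
  const value = env[key];
  // Treat unset and blank values the same way: both are configuration errors.
  if (value === undefined || value.trim() === '') {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Stand-in for process.env so the sketch runs on its own.
const exampleEnv: Env = { RABBITMQ_URL: 'amqp://localhost' };
const exampleRabbitUrl = requireEnv(exampleEnv, 'RABBITMQ_URL');
console.log(`Using RabbitMQ at ${exampleRabbitUrl}`);
```

In src/index.ts the helper could be called as requireEnv(process.env, 'POSTGRES_URL') after dotenv.config(), replacing the placeholder default.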
Send messages to my_main_processing_queue to trigger the handlers:
Message for action.route_me:
{
  "type": "action.route_me",
  "payload": {
    "someData": "This will go to queue_for_routing"
  }
}

Message for action.process_and_route:
{
  "type": "action.process_and_route",
  "payload": {
    "dbData": {
      "recordId": 123,
      "value": "data for postgres"
    },
    "queueDData": {
      "finalStep": true,
      "processedValue": "ready for queue_after_processing"
    }
  }
}