@folksdo/event-bus v1.0.3
# Folksdo Event Bus
A lightweight event bus built on RabbitMQ, providing:
- **ConfigLoader**: Parses environment variables (and constructor overrides) into a strongly‐typed `RabbitMQConfig`.
- **ConnectionManager**: Handles AMQP connection, channel pooling, and reconnection logic.
- **EventBusClient**: Core publish/subscribe API with JSON serialization, automatic exchange/queue declarations, acknowledgments, error handling, and built-in Retry & Dead-Letter support.
- Supports both **topic**-style routing (via routing keys) and **headers**-exchange routing (via message-level headers).
- Automatically manages a per-exchange “retry” exchange + retry queue (with TTL → back to the main exchange) and a Dead-Letter Exchange (DLX) + Dead-Letter Queue (DLQ).
- **AppEvent & BaseEvent**: Strongly-typed domain event model with metadata (ID, type, source, timestamp, aggregate fields).
- **TypeScript Interfaces**: Defines interfaces for event payloads, headers, and client options, ensuring compile-time safety.
- **PublisherModule**: Generic wrapper for publishing `AppEvent<T>` instances to the event bus (including optional message headers).
- **SubscriberModule**: Generic wrapper for subscribing to events by type or header filter, managing queue bindings and consumption.
This README outlines project setup, components (including Retry/DLQ flow), usage examples, and test procedures.
---
## Table of Contents
1. [Prerequisites](#prerequisites)
2. [Getting Started](#getting-started)
3. [Architecture & Components](#architecture--components)
- [ConfigLoader](#configloader)
- [ConnectionManager](#connectionmanager)
- [EventBusClient](#eventbusclient)
- [Publish Path (with x-retry-count)](#publish-path-with-x-retry-count)
- [Subscribe Path (with Retry & DLQ)](#subscribe-path-with-retry--dlq)
- [AppEvent & BaseEvent](#appevent--baseevent)
- [PublisherModule](#publishermodule)
- [SubscriberModule](#subscribermodule)
4. [Usage Examples](#usage-examples)
- [Publishing an Event (Topic Exchange)](#publishing-an-event-topic-exchange)
- [Subscribing to Events (Topic)](#subscribing-to-events-topic)
- [Subscribing with Header Filters](#subscribing-with-header-filters)
5. [Testing](#testing)
- [Unit Tests](#unit-tests)
- [Integration Tests](#integration-tests)
6. [Docker Configuration for RabbitMQ](#docker-configuration-for-rabbitmq)
7. [License](#license)
---
## Prerequisites
- **Node.js** (v14 or higher)
- **npm** or **yarn**
- **RabbitMQ** (local or remote instance)
- **Docker** (for running a RabbitMQ container during integration tests)
By default, the code uses `amqp://localhost`. You can override with:
```bash
export AMQP_URI=amqp://your-rabbit-host
```You can also set any of the following in a .env file (or via process.env):
# ==============================
# RabbitMQ Connection Settings
# ==============================
# If you prefer a single URI, set this (overrides host/port):
# RABBITMQ_URL=amqp://guest:guest@localhost:5672/
RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/
# ==============================
# Exchange & Queue Configuration
# ==============================
RABBITMQ_EXCHANGE_NAME=default_exchange
RABBITMQ_QUEUE_NAME=default_queue
# ==============================
# Consumer Settings
# ==============================
RABBITMQ_PREFETCH_COUNT=10
RABBITMQ_RECONNECT_DELAY_MS=5000Getting Started
Clone the repository:
git clone https://github.com/your-org/folksdo-event-bus.git cd folksdo-event-busInstall dependencies:
npm install # or yarn installBuild the project (TypeScript → JavaScript):
npm run build # or yarn buildRun unit tests (no RabbitMQ needed):
npm test # or yarn testRun integration tests (requires RabbitMQ):
Make sure RabbitMQ is running (see Docker section below).
export AMQP_URI=amqp://localhost npm run test:integration # or yarn test:integrationRun in development mode (uses
ts-nodeonsrc/index.ts):npm run dev # or yarn devStart in production mode (after
npm run build):npm start # or yarn start
Architecture & Components
ConfigLoader
Centralizes environment parsing and default values into a strongly-typed config object. It uses Zod for runtime validation and enforces:
export interface RabbitMQConfig {
url?: string; // Full AMQP URI (optional; overrides host/port)
host: string; // If no URL is provided, the host to connect to
port: number; // If no URL, the port number
username: string; // RabbitMQ username
password: string; // RabbitMQ password
vhost: string; // Virtual host ("/" by default)
exchangeName: string; // Main exchange name
queueName: string; // Default queue name (unused by EventBusClient, but available)
prefetchCount: number; // Number of unacked messages to prefetch
reconnectDelayMs: number; // Base delay for exponential backoff on reconnect
}- Default values are provided if environment variables are missing.
- Constructor overrides can be passed as a partial
RabbitMQConfig, merging with defaults andprocess.env. Usage:
import { ConfigLoader } from './config'; const config = new ConfigLoader().getConfig(); // config is now a validated RabbitMQConfig.
File: src/config.ts
ConnectionManager
Manages a single RabbitMQ connection (typed as amqp.ChannelModel) with automatic reconnection, exponential backoff, and a channel pool. Emits 'reconnect' on successful connection or reconnection, and 'error' on failure.
Key Responsibilities
- Builds the AMQP URI: Uses
config.urlif present, otherwise constructsamqp://username:password@host:port/vhost. - Retries on Failure: Attempts
retryAttempts(default 5) with exponential backoff based onconfig.reconnectDelayMs. - Channel Pool: Exposes
getChannel(), which ensures a live connection and returns a newamqp.Channel. Tracks open channels in a pool for later cleanup. - Graceful Shutdown:
close()closes all pooled channels and the underlying connection.
Public Methods
class ConnectionManager extends EventEmitter {
constructor(config: RabbitMQConfig, retryAttempts = 5);
// Connects to RabbitMQ (or retries until success). Throws a descriptive
// error if unable to connect.
public async connect(): Promise<void>;
// Ensures connection is live, then returns a new channel.
public async getChannel(): Promise<amqp.Channel>;
// Closes all channels and the connection.
public async close(): Promise<void>;
}On failure to connect (after all retries),
connect()will throw:ConnectionManager: Failed to connect to RabbitMQ at "<uri>": <original error message>
File: src/connection.manager.ts
EventBusClient
Provides a high-level publish/subscribe API on top of ConnectionManager. Automatically declares exchanges, queues, retry/DLQ topology, and handles message routing & error flows. Supports both topic and headers exchange types.
Constructor
new EventBusClient(connectionManager: ConnectionManager, config: RabbitMQConfig);connectionManager: Instance ofConnectionManager.config: A validatedRabbitMQConfig(used primarily to readexchangeName,prefetchCount, and other settings).
Retry & Dead-Letter Topology (auto-declared per exchange)
For each exchangeName (e.g. "orders"), EventBusClient automatically asserts:
Retry Exchange
- Name:
${exchangeName}.retry.x(direct exchange) - Bound to: Retry Queue
- Name:
Retry Queue
- Name:
${exchangeName}.retry.queue Arguments:
x-message-ttl(default: 30000 ms) – how long to hold a retried message before dead-lettering back to main exchange.x-dead-letter-exchange:${exchangeName}– after TTL, the message returns to main exchange.
Bound to
retryExchangewith routing key'#'(catch-all).
- Name:
Dead-Letter Exchange (DLX)
- Name:
${exchangeName}.dlx(direct exchange) - Bound to: Dead-Letter Queue (DLQ)
- Name:
Dead-Letter Queue (DLQ)
- Name:
${exchangeName}.dlq - Bound to
dlxExchangewith routing key equal to queue name (${exchangeName}.dlq). - Holds messages that have exhausted retry attempts.
- Name:
Note: TTL can be customized in code. For exponential backoff, declare multiple retry queues with increasing TTLs.
publish<T>(event: AppEvent<T>, options?: PublishOptions): Promise<void>
Publishes an AppEvent<T> to RabbitMQ with optional persistence and headers. Always injects:
'x-retry-count': 0to track retry attempts.
export interface PublishOptions {
/** Whether to mark the message as persistent (default: true) */
persistent?: boolean;
/**
* Message-level headers. If non-empty, asserts the main exchange as "headers"
* so header-filtered subscribers can receive the message.
*/
headers?: Record<string, any>;
}
async publish<T>(
event: AppEvent<T>,
options: PublishOptions = {}
): Promise<void>;Workflow:
Determine
exchangeType:'headers'ifoptions.headersis non-empty; otherwise'topic'.await channel.assertExchange(exchangeName, exchangeType, { durable: true });Merge headers:
{ ...options.headers, 'x-retry-count': 0 }.Serialize
eventas JSON, then:const ok = channel.publish( exchangeName, event.type, // routingKey Buffer.from(JSON.stringify(event)), { persistent: options.persistent ?? true, headers: finalHeaders }, ); if (!ok) { await new Promise((resolve) => channel.once('drain', resolve)); }Log success or throw on error.
subscribe<T>(eventType: string, handler: EventHandler<T>, options?: SubscribeOptions): Promise<() => Promise<void>>
Subscribes to messages either by topic routing or headers filtering, based on options.headers. Wraps your handler to manage retries and dead-letter on failures.
export interface SubscribeOptions {
/**
* If provided, use this exact queue name (durable, non-exclusive).
* Otherwise, an exclusive, non-durable queue is created.
*/
queueName?: string;
/** Only used for topic subscriptions (if no headers). Defaults to eventType. */
routingKey?: string;
/** Number of unacked messages to prefetch. Defaults to 0 (no limit). */
prefetch?: number;
/** If true, RabbitMQ auto-acks on delivery. Defaults to false. */
noAck?: boolean;
/**
* If non-empty, assert a "headers" exchange and bind with:
* { 'x-match':'all', ...headers }. `routingKey` is ignored.
*/
headers?: Record<string, any>;
}
export type EventHandler<T> = (event: AppEvent<T>, raw: ConsumeMessage) => Promise<void> | void;
async subscribe<T>(
eventType: string,
handler: EventHandler<T>,
options: SubscribeOptions = {}
): Promise<() => Promise<void>>;Workflow:
- Determine
exchangeType:'headers'ifoptions.headersis non-empty; otherwise'topic'. await channel.assertExchange(exchangeName, exchangeType, { durable: true });Declare DLX + DLQ:
await channel.assertExchange(`${exchangeName}.dlx`, 'direct', { durable: true }); await channel.assertQueue(`${exchangeName}.dlq`, { durable: true }); await channel.bindQueue(`${exchangeName}.dlq`, `${exchangeName}.dlx`, `${exchangeName}.dlq`);Declare Retry Exchange + Retry Queue:
await channel.assertExchange(`${exchangeName}.retry.x`, 'direct', { durable: true }); await channel.assertQueue(`${exchangeName}.retry.queue`, { durable: true, arguments: { 'x-message-ttl': 30000, 'x-dead-letter-exchange': exchangeName, }, }); await channel.bindQueue(`${exchangeName}.retry.queue`, `${exchangeName}.retry.x`, '#');Declare consumer queue (either
options.queueName(durable) or an exclusive queue):- Attach argument:
'x-dead-letter-exchange':\${exchangeName}.retry.x``. const q = await channel.assertQueue(options.queueName ?? '', args);const queueName = q.queue;
- Attach argument:
Bind queue:
If
options.headersprovided:await channel.bindQueue(queueName, exchangeName, '', { 'x-match': 'all', ...options.headers });Else (topic mode):
const routingKey = options.routingKey ?? eventType; await channel.bindQueue(queueName, exchangeName, routingKey);
Set
prefetchif provided:if (typeof options.prefetch === 'number') { await channel.prefetch(options.prefetch); }Start consuming:
const consumer = await channel.consume( queueName, async (msg) => { if (!msg) return; try { const evt: AppEvent<T> = JSON.parse(msg.content.toString()); await handler(evt, msg); if (!consumeOpts.noAck) channel.ack(msg); } catch (err) { if (consumeOpts.noAck) return; // ❯ Determine x-retry-count: const incomingHeaders = msg.properties.headers ?? {}; const currentRetryCount = Number(incomingHeaders['x-retry-count'] ?? 0); const nextRetryCount = currentRetryCount + 1; if (currentRetryCount < MAX_RETRIES) { // ─── RETRY PATH ─────────────────────────────────────────── const newHeaders = { ...incomingHeaders, 'x-retry-count': nextRetryCount }; channel.publish(`${exchangeName}.retry.x`, msg.fields.routingKey, msg.content, { persistent: true, headers: newHeaders, }); channel.ack(msg); } else { // ─── DLQ PATH ────────────────────────────────────────────── const dlqHeaders = { ...incomingHeaders, 'x-original-exchange': msg.fields.exchange, 'x-original-routing-key': msg.fields.routingKey, 'x-final-reason': (err as Error).message, }; channel.publish(`${exchangeName}.dlx`, `${exchangeName}.dlq`, msg.content, { persistent: true, headers: dlqHeaders, }); channel.ack(msg); } } }, { noAck: options.noAck ?? false }, ); const unsubscribe = async () => { await channel.cancel(consumer.consumerTag); }; return unsubscribe;
MAX_RETRIESis a built-in constant (default: 5).- On final failure (retry ≥ 5), the message goes to DLQ (
${exchangeName}.dlq).
File: src/event.bus.client.ts
AppEvent & BaseEvent
A strongly-typed domain event model, using a UUID generator (e.g. @paralleldrive/cuid2) under the hood.
export interface IEvent<T> {
readonly id: string;
readonly type: string;
readonly source: string;
readonly timestamp: Date;
readonly payload: T;
readonly version: number;
readonly correlationId?: string;
readonly causationId?: string;
}
export interface AppEvent<T> extends IEvent<T> {
readonly aggregateId: string;
readonly aggregateName: string;
readonly aggregateVersion: number;
}
export abstract class BaseEvent<T> implements AppEvent<T> {
readonly id = `EVT-${this.constructor.name}-${createId()}`;
readonly type = this.constructor.name;
readonly source: string;
readonly timestamp = new Date();
readonly payload: T;
readonly version = 0;
readonly aggregateId: string;
readonly aggregateName: string;
readonly aggregateVersion = 0;
readonly correlationId?: string;
readonly causationId?: string;
constructor(
aggregateId: string,
aggregateName: string,
payload: T,
source: string,
correlationId?: string,
causationId?: string,
) {
this.aggregateId = aggregateId;
this.aggregateName = aggregateName;
this.payload = payload;
this.source = source;
this.correlationId = correlationId;
this.causationId = causationId;
}
}File: src/app.event.ts
PublisherModule
A lightweight façade over EventBusClient.publish(...). Accepts an AppEvent<T> and optional headers/persistence settings.
import { EventBusClient, PublishOptions } from '@/event.bus.client';
import { AppEvent } from '@/app.event';
export interface PublisherPublishOptions {
headers?: Record<string, any>;
persistent?: boolean;
}
export class PublisherModule {
constructor(private eventBus: EventBusClient) {}
async publishEvent<T>(event: AppEvent<T>, options: PublisherPublishOptions = {}): Promise<void> {
const publishOpts: PublishOptions = {
persistent: options.persistent ?? true,
headers: options.headers,
};
await this.eventBus.publish(event, publishOpts);
}
}File: src/publisher.module.ts
SubscriberModule
A lightweight façade over EventBusClient.subscribe(...). Accepts an event type, a handler, and optional queue/prefetch/noAck/header-filter settings.
import { EventBusClient } from '@/event.bus.client';
import { ConsumeMessage } from 'amqplib';
import { AppEvent } from '@/app.event';
export interface SubscribeOptions {
queueName?: string;
prefetch?: number;
noAck?: boolean;
headers?: Record<string, any>;
}
export type EventHandler<T> = (event: AppEvent<T>, raw: ConsumeMessage) => Promise<void> | void;
export class SubscriberModule {
constructor(private eventBus: EventBusClient) {}
async subscribeTo<T>(
eventType: string,
handler: EventHandler<T>,
options: SubscribeOptions = {},
): Promise<() => Promise<void>> {
const { queueName, prefetch = 5, noAck = false, headers } = options;
// Build SubscribeOptions for EventBusClient.subscribe()
const subscribeOpts: Record<string, any> = {
queueName,
prefetch,
noAck,
};
if (headers && Object.keys(headers).length > 0) {
subscribeOpts.headers = headers;
} else {
subscribeOpts.routingKey = eventType;
}
return this.eventBus.subscribe<T>(eventType, handler, subscribeOpts);
}
}File: src/subscriber.module.ts
Usage Examples
Below are common scenarios. Adjust exchangeName, queueName, or headers as needed.
Publishing an Event (Topic Exchange)
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { PublisherModule } from './publisher.module';
import { BaseEvent } from './app.event';
async function main() {
// 1) Load config
const config = new ConfigLoader().getConfig();
// 2) Create & connect ConnectionManager
const cm = new ConnectionManager(config);
await cm.connect();
// 3) Use a topic exchange named "orders"
config.exchangeName = 'orders'; // override exchangeName if needed
const eventBus = new EventBusClient(cm, config);
const publisher = new PublisherModule(eventBus);
// 4) Define and publish an event
class OrderCreated extends BaseEvent<{ orderId: string; total: number }> {}
const event = new OrderCreated(
'ord-123',
'Order',
{ orderId: 'ord-123', total: 99.99 },
'order-service',
);
await publisher.publishEvent(event);
console.log('Published OrderCreated via topic exchange');
await cm.close();
}
main().catch(console.error);Subscribing to Events (Topic Exchange)
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { SubscriberModule } from './subscriber.module';
import { AppEvent } from './app.event';
import { ConsumeMessage } from 'amqplib';
async function main() {
const config = new ConfigLoader().getConfig();
const cm = new ConnectionManager(config);
await cm.connect();
config.exchangeName = 'orders';
const eventBus = new EventBusClient(cm, config);
const subscriber = new SubscriberModule(eventBus);
const unsubscribe = await subscriber.subscribeTo<{ orderId: string; total: number }>(
'OrderCreated',
(event: AppEvent<{ orderId: string; total: number }>, raw: ConsumeMessage) => {
console.log('Received OrderCreated:', event.payload);
raw.channel.ack(raw);
},
{
queueName: 'order-service-queue',
prefetch: 10,
noAck: false,
},
);
// Later, to stop:
// await unsubscribe();
}
main().catch(console.error);Subscribing with Header Filters (Headers Exchange)
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { PublisherModule } from './publisher.module';
import { SubscriberModule } from './subscriber.module';
import { BaseEvent, AppEvent } from './app.event';
import { ConsumeMessage } from 'amqplib';
async function main() {
const config = new ConfigLoader().getConfig();
const cm = new ConnectionManager(config);
await cm.connect();
// 1) Use an exchange for header-based routing
config.exchangeName = 'region-events';
// Subscriber: only for messages where header "region" = "eu-west"
const eventBusSubscriber = new EventBusClient(cm, config);
const subscriber = new SubscriberModule(eventBusSubscriber);
const unsubscribe = await subscriber.subscribeTo<{ foo: string }>(
'AnyEventType', // routingKey ignored for headers
(event: AppEvent<{ foo: string }>, raw: ConsumeMessage) => {
console.log('Received (region=eu-west):', event.payload);
raw.channel.ack(raw);
},
{
queueName: 'eu-west-queue',
prefetch: 5,
noAck: false,
headers: { region: 'eu-west' },
},
);
// Publisher: send with various region headers
const eventBusPublisher = new EventBusClient(cm, config);
const publisher = new PublisherModule(eventBusPublisher);
class RegionEvent extends BaseEvent<{ foo: string }> {}
const msg = new RegionEvent('agg1', 'RegionAggregate', { foo: 'bar' }, 'svc1');
// 1) region="us-east" ⇒ NOT received
await publisher.publishEvent(msg, {
headers: { region: 'us-east' },
});
// 2) region="eu-west" ⇒ received
await publisher.publishEvent(msg, {
headers: { region: 'eu-west' },
});
// To clean up:
// await unsubscribe();
// await cm.close();
}
main().catch(console.error);Testing
Unit Tests
- Location:
tests/unit/ - Framework: Jest
Coverage:
- ConnectionManager: Tests retry logic, channel creation, error/reconnect events.
- EventBusClient: Mocks a
ConnectionManager.getChannel(), verifiespublish()andsubscribe()routes (topic vs headers), JSON serialization, ack/nack logic, retry/DLQ behavior. - PublisherModule / SubscriberModule: Ensures correct forwarding of options to
EventBusClient.
Run unit tests:
npm test
# or
yarn testIntegration Tests
- Location:
tests/integration/event.bus.integration.test.ts - Requirement: A running RabbitMQ instance (default:
amqp://localhost).
Flow:
- Connect via
amqplib.connect(uri)and create a channel. - Instantiate
ConnectionManager,EventBusClient,PublisherModule,SubscriberModule. - Assert
integration-exchange(topic) and test publishing/subscribing a “TestEvent”. - Assert
integration-headers-exchange(headers) and test header-filtered publish/subscribe (region="eu-west"). - Verify that retry logic does not interfere with a successful path.
- Clean up queues/exchanges and close connections.
Run integration tests:
export AMQP_URI=amqp://localhost
npm run test:integration
# or
yarn test:integrationNote: The integration test will fail with a clear error if RabbitMQ is not reachable:
ConnectionManager: Failed to connect to RabbitMQ at "amqp://localhost": connect ECONNREFUSED 127.0.0.1:5672Docker Configuration for RabbitMQ
To launch a local RabbitMQ (with Management UI) for integration testing:
docker run -d --rm \
--hostname my-rabbit \
--name folksdo-rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management- 5672: AMQP port
- 15672: HTTP management console (default user/pass:
guest/guest)
After the container starts, visit http://localhost:15672 and log in with guest/guest.
License
MIT © Folksdo Inc