1.0.3 • Published 5 months ago

@folksdo/event-bus v1.0.3

# Folksdo Event Bus

A lightweight event bus built on RabbitMQ, providing:

- **ConfigLoader**: Parses environment variables (and constructor overrides) into a strongly‐typed `RabbitMQConfig`.
- **ConnectionManager**: Handles AMQP connection, channel pooling, and reconnection logic.
- **EventBusClient**: Core publish/subscribe API with JSON serialization, automatic exchange/queue declarations, acknowledgments, error handling, and built-in Retry & Dead-Letter support.
  - Supports both **topic**-style routing (via routing keys) and **headers**-exchange routing (via message-level headers).
  - Automatically manages a per-exchange “retry” exchange + retry queue (with TTL → back to the main exchange) and a Dead-Letter Exchange (DLX) + Dead-Letter Queue (DLQ).
- **AppEvent & BaseEvent**: Strongly-typed domain event model with metadata (ID, type, source, timestamp, aggregate fields).
- **TypeScript Interfaces**: Defines interfaces for event payloads, headers, and client options, ensuring compile-time safety.
- **PublisherModule**: Generic wrapper for publishing `AppEvent<T>` instances to the event bus (including optional message headers).
- **SubscriberModule**: Generic wrapper for subscribing to events by type or header filter, managing queue bindings and consumption.

This README outlines project setup, components (including Retry/DLQ flow), usage examples, and test procedures.

---

## Table of Contents

1. [Prerequisites](#prerequisites)
2. [Getting Started](#getting-started)
3. [Architecture & Components](#architecture--components)

   - [ConfigLoader](#configloader)
   - [ConnectionManager](#connectionmanager)
   - [EventBusClient](#eventbusclient)

     - [Publish Path (with x-retry-count)](#publish-path-with-x-retry-count)
     - [Subscribe Path (with Retry & DLQ)](#subscribe-path-with-retry--dlq)

   - [AppEvent & BaseEvent](#appevent--baseevent)
   - [PublisherModule](#publishermodule)
   - [SubscriberModule](#subscribermodule)

4. [Usage Examples](#usage-examples)

   - [Publishing an Event (Topic Exchange)](#publishing-an-event-topic-exchange)
   - [Subscribing to Events (Topic Exchange)](#subscribing-to-events-topic-exchange)
   - [Subscribing with Header Filters (Headers Exchange)](#subscribing-with-header-filters-headers-exchange)

5. [Testing](#testing)

   - [Unit Tests](#unit-tests)
   - [Integration Tests](#integration-tests)

6. [Docker Configuration for RabbitMQ](#docker-configuration-for-rabbitmq)
7. [License](#license)

---

## Prerequisites

- **Node.js** (v14 or higher)
- **npm** or **yarn**
- **RabbitMQ** (local or remote instance)
- **Docker** (for running a RabbitMQ container during integration tests)

By default, the code uses `amqp://localhost`. You can override with:

```bash
export AMQP_URI=amqp://your-rabbit-host
```

You can also set any of the following in a `.env` file (or via `process.env`):

```bash
# ==============================
# RabbitMQ Connection Settings
# ==============================

# If you prefer a single URI, set this (overrides host/port):
# RABBITMQ_URL=amqp://guest:guest@localhost:5672/

RABBITMQ_HOST=localhost
RABBITMQ_PORT=5672

RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest
RABBITMQ_VHOST=/

# ==============================
# Exchange & Queue Configuration
# ==============================

RABBITMQ_EXCHANGE_NAME=default_exchange
RABBITMQ_QUEUE_NAME=default_queue

# ==============================
# Consumer Settings
# ==============================

RABBITMQ_PREFETCH_COUNT=10
RABBITMQ_RECONNECT_DELAY_MS=5000
```

---

## Getting Started

1. Clone the repository:

   ```bash
   git clone https://github.com/your-org/folksdo-event-bus.git
   cd folksdo-event-bus
   ```

2. Install dependencies:

   ```bash
   npm install
   # or
   yarn install
   ```

3. Build the project (TypeScript → JavaScript):

   ```bash
   npm run build
   # or
   yarn build
   ```

4. Run unit tests (no RabbitMQ needed):

   ```bash
   npm test
   # or
   yarn test
   ```

5. Run integration tests (requires RabbitMQ). Make sure RabbitMQ is running (see the Docker section below):

   ```bash
   export AMQP_URI=amqp://localhost
   npm run test:integration
   # or
   yarn test:integration
   ```

6. Run in development mode (uses `ts-node` on `src/index.ts`):

   ```bash
   npm run dev
   # or
   yarn dev
   ```

7. Start in production mode (after `npm run build`):

   ```bash
   npm start
   # or
   yarn start
   ```

---

## Architecture & Components

### ConfigLoader

Centralizes environment parsing and default values into a strongly-typed config object. It uses Zod for runtime validation and enforces the following shape:

```typescript
export interface RabbitMQConfig {
  url?: string; // Full AMQP URI (optional; overrides host/port)
  host: string; // If no URL is provided, the host to connect to
  port: number; // If no URL, the port number
  username: string; // RabbitMQ username
  password: string; // RabbitMQ password
  vhost: string; // Virtual host ("/" by default)
  exchangeName: string; // Main exchange name
  queueName: string; // Default queue name (unused by EventBusClient, but available)
  prefetchCount: number; // Number of unacked messages to prefetch
  reconnectDelayMs: number; // Base delay for exponential backoff on reconnect
}
```

- Default values are provided if environment variables are missing.
- Constructor overrides can be passed as a partial `RabbitMQConfig`; they are merged with the defaults and `process.env`.
- Usage:

  ```typescript
  import { ConfigLoader } from './config';

  const config = new ConfigLoader().getConfig();
  // config is now a validated RabbitMQConfig.
  ```

File: src/config.ts
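
To make the merge behaviour concrete, here is a minimal sketch that does not use Zod. It is not the library's `ConfigLoader`: the helper names `loadConfig` and `intFromEnv` are hypothetical, the default values are taken from the sample `.env` above, and the precedence (defaults, then environment, then overrides) is an assumption.

```typescript
// Sketch only: field names follow RabbitMQConfig, but the default values and
// the merge order (defaults < env < overrides) are assumptions.
interface RabbitMQConfig {
  url?: string;
  host: string;
  port: number;
  username: string;
  password: string;
  vhost: string;
  exchangeName: string;
  queueName: string;
  prefetchCount: number;
  reconnectDelayMs: number;
}

type Env = Record<string, string | undefined>;

// Parse an integer variable, falling back when it is unset or not numeric.
function intFromEnv(value: string | undefined, fallback: number): number {
  const parsed = Number.parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
}

// Hypothetical helper, not the library's ConfigLoader.
function loadConfig(env: Env, overrides: Partial<RabbitMQConfig> = {}): RabbitMQConfig {
  const fromEnv: RabbitMQConfig = {
    url: env['RABBITMQ_URL'],
    host: env['RABBITMQ_HOST'] ?? 'localhost',
    port: intFromEnv(env['RABBITMQ_PORT'], 5672),
    username: env['RABBITMQ_USERNAME'] ?? 'guest',
    password: env['RABBITMQ_PASSWORD'] ?? 'guest',
    vhost: env['RABBITMQ_VHOST'] ?? '/',
    exchangeName: env['RABBITMQ_EXCHANGE_NAME'] ?? 'default_exchange',
    queueName: env['RABBITMQ_QUEUE_NAME'] ?? 'default_queue',
    prefetchCount: intFromEnv(env['RABBITMQ_PREFETCH_COUNT'], 10),
    reconnectDelayMs: intFromEnv(env['RABBITMQ_RECONNECT_DELAY_MS'], 5000),
  };
  // Explicit overrides win over anything read from the environment.
  return { ...fromEnv, ...overrides };
}
```

Passing the environment as a parameter (rather than reading `process.env` inside the function) keeps the sketch easy to unit test.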


### ConnectionManager

Manages a single RabbitMQ connection (typed as `amqp.ChannelModel`) with automatic reconnection, exponential backoff, and a channel pool. Emits `'reconnect'` on successful connection or reconnection, and `'error'` on failure.

#### Key Responsibilities

- **Builds the AMQP URI**: Uses `config.url` if present, otherwise constructs `amqp://username:password@host:port/vhost`.
- **Retries on Failure**: Makes up to `retryAttempts` attempts (default 5) with exponential backoff based on `config.reconnectDelayMs`.
- **Channel Pool**: Exposes `getChannel()`, which ensures a live connection and returns a new `amqp.Channel`. Tracks open channels in a pool for later cleanup.
- **Graceful Shutdown**: `close()` closes all pooled channels and the underlying connection.
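
The first two responsibilities can be sketched as two small pure functions. This is an illustration under stated assumptions, not the library's code: `buildAmqpUri` and `backoffDelayMs` are hypothetical names, and the doubling formula is only one common reading of "exponential backoff based on `reconnectDelayMs`".

```typescript
// Hypothetical subset of RabbitMQConfig used for connecting.
interface ConnectionSettings {
  url?: string;
  host: string;
  port: number;
  username: string;
  password: string;
  vhost: string;
}

// Prefer the full URI when given; otherwise assemble one from the parts.
function buildAmqpUri(c: ConnectionSettings): string {
  if (c.url) return c.url;
  const user = encodeURIComponent(c.username);
  const pass = encodeURIComponent(c.password);
  // In an AMQP URI the vhost is percent-encoded, so "/" becomes "%2F".
  const vhost = encodeURIComponent(c.vhost);
  return `amqp://${user}:${pass}@${c.host}:${c.port}/${vhost}`;
}

// Assumed formula: the delay doubles on every attempt (attempt is 0-based).
function backoffDelayMs(baseDelayMs: number, attempt: number): number {
  return baseDelayMs * 2 ** attempt;
}
```

With the sample settings (`reconnectDelayMs` of 5000), this formula would wait 5 s, 10 s, 20 s, and so on between attempts.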

#### Public Methods

```typescript
declare class ConnectionManager extends EventEmitter {
  // retryAttempts defaults to 5.
  constructor(config: RabbitMQConfig, retryAttempts?: number);

  // Connects to RabbitMQ, retrying up to retryAttempts times. Throws a
  // descriptive error if all attempts fail.
  connect(): Promise<void>;

  // Ensures the connection is live, then returns a new channel.
  getChannel(): Promise<amqp.Channel>;

  // Closes all channels and the connection.
  close(): Promise<void>;
}
```

- On failure to connect (after all retries), `connect()` will throw:

  ```text
  ConnectionManager: Failed to connect to RabbitMQ at "<uri>": <original error message>
  ```

File: src/connection.manager.ts


### EventBusClient

Provides a high-level publish/subscribe API on top of `ConnectionManager`. It automatically declares exchanges, queues, and the retry/DLQ topology, and handles message routing and error flows. Supports both topic and headers exchange types.

#### Constructor

```typescript
new EventBusClient(connectionManager: ConnectionManager, config: RabbitMQConfig);
```

- `connectionManager`: Instance of `ConnectionManager`.
- `config`: A validated `RabbitMQConfig` (used primarily to read `exchangeName`, `prefetchCount`, and other settings).

#### Retry & Dead-Letter Topology (auto-declared per exchange)

For each `exchangeName` (e.g. `"orders"`), `EventBusClient` automatically asserts:

1. **Retry Exchange**
   - Name: `${exchangeName}.retry.x` (direct exchange)
   - Bound to: Retry Queue
2. **Retry Queue**
   - Name: `${exchangeName}.retry.queue`
   - Arguments:
     - `x-message-ttl` (default: 30000 ms): how long to hold a retried message before dead-lettering it back to the main exchange.
     - `x-dead-letter-exchange`: `${exchangeName}`. After the TTL expires, the message returns to the main exchange.
   - Bound to the retry exchange with routing key `'#'`. Note that `#` acts as a catch-all only on topic exchanges; a direct exchange matches it literally.
3. **Dead-Letter Exchange (DLX)**
   - Name: `${exchangeName}.dlx` (direct exchange)
   - Bound to: Dead-Letter Queue (DLQ)
4. **Dead-Letter Queue (DLQ)**
   - Name: `${exchangeName}.dlq`
   - Bound to the DLX with a routing key equal to the queue name (`${exchangeName}.dlq`).
   - Holds messages that have exhausted their retry attempts.

Note: The TTL can be customized in code. For exponential backoff, declare multiple retry queues with increasing TTLs.
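
As a rough illustration of the naming scheme and of the multi-queue backoff idea in the note, the sketch below derives the names for one exchange and a list of retry tiers. The functions `topologyNames` and `retryTiers`, and the tiered queue names (`.retry.queue.1`, `.retry.queue.2`, ...), are hypothetical and not declared by the library.

```typescript
interface TopologyNames {
  retryExchange: string;
  retryQueue: string;
  dlx: string;
  dlq: string;
}

// Names follow the scheme documented above.
function topologyNames(exchangeName: string): TopologyNames {
  return {
    retryExchange: `${exchangeName}.retry.x`,
    retryQueue: `${exchangeName}.retry.queue`,
    dlx: `${exchangeName}.dlx`,
    dlq: `${exchangeName}.dlq`,
  };
}

interface RetryTier {
  queue: string; // hypothetical per-tier queue name
  ttlMs: number; // value for that queue's x-message-ttl argument
}

// Assumed scheme: each tier doubles the TTL of the previous one.
function retryTiers(exchangeName: string, baseTtlMs: number, tiers: number): RetryTier[] {
  return Array.from({ length: tiers }, (_, i) => ({
    queue: `${exchangeName}.retry.queue.${i + 1}`,
    ttlMs: baseTtlMs * 2 ** i,
  }));
}
```

A consumer using tiers would pick the queue from the message's current retry count, so later retries wait longer before returning to the main exchange.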


#### `publish<T>(event: AppEvent<T>, options?: PublishOptions): Promise<void>`

Publishes an `AppEvent<T>` to RabbitMQ with optional persistence and headers. It always injects the header `'x-retry-count': 0` to track retry attempts.

```typescript
export interface PublishOptions {
  /** Whether to mark the message as persistent (default: true) */
  persistent?: boolean;
  /**
   * Message-level headers. If non-empty, asserts the main exchange as "headers"
   * so header-filtered subscribers can receive the message.
   */
  headers?: Record<string, any>;
}

async publish<T>(
  event: AppEvent<T>,
  options: PublishOptions = {}
): Promise<void>;
```

Workflow:

1. Determine `exchangeType`: `'headers'` if `options.headers` is non-empty; otherwise `'topic'`.
2. `await channel.assertExchange(exchangeName, exchangeType, { durable: true });`
3. Merge headers: `{ ...options.headers, 'x-retry-count': 0 }`.
4. Serialize the event as JSON, then publish, waiting for `'drain'` if the channel's write buffer is full:

   ```typescript
   const ok = channel.publish(
     exchangeName,
     event.type, // routingKey
     Buffer.from(JSON.stringify(event)),
     { persistent: options.persistent ?? true, headers: finalHeaders },
   );
   if (!ok) {
     await new Promise((resolve) => channel.once('drain', resolve));
   }
   ```

5. Log success or throw on error.
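
Steps 1 and 3 of this workflow are pure logic and can be sketched in isolation. The helper names `resolveExchangeType` and `buildPublishHeaders` are hypothetical; the README does not say how the client structures this internally.

```typescript
type MessageHeaders = Record<string, unknown>;

// Step 1: a non-empty headers object selects a "headers" exchange.
function resolveExchangeType(headers?: MessageHeaders): 'headers' | 'topic' {
  return headers !== undefined && Object.keys(headers).length > 0 ? 'headers' : 'topic';
}

// Step 3: caller headers are kept, and x-retry-count is always reset to 0.
function buildPublishHeaders(headers: MessageHeaders = {}): MessageHeaders {
  return { ...headers, 'x-retry-count': 0 };
}
```

Because `'x-retry-count'` is spread last, a caller-supplied value for that key is overwritten, which matches the "always injects" wording above.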


#### `subscribe<T>(eventType: string, handler: EventHandler<T>, options?: SubscribeOptions): Promise<() => Promise<void>>`

Subscribes to messages either by topic routing or by headers filtering, based on `options.headers`. Wraps your handler to manage retries and dead-lettering on failure.

```typescript
export interface SubscribeOptions {
  /**
   * If provided, use this exact queue name (durable, non-exclusive).
   * Otherwise, an exclusive, non-durable queue is created.
   */
  queueName?: string;

  /** Only used for topic subscriptions (if no headers). Defaults to eventType. */
  routingKey?: string;

  /** Number of unacked messages to prefetch. Defaults to 0 (no limit). */
  prefetch?: number;

  /** If true, RabbitMQ auto-acks on delivery. Defaults to false. */
  noAck?: boolean;

  /**
   * If non-empty, assert a "headers" exchange and bind with:
   * { 'x-match':'all', ...headers }. `routingKey` is ignored.
   */
  headers?: Record<string, any>;
}

export type EventHandler<T> = (event: AppEvent<T>, raw: ConsumeMessage) => Promise<void> | void;

async subscribe<T>(
  eventType: string,
  handler: EventHandler<T>,
  options: SubscribeOptions = {}
): Promise<() => Promise<void>>;
```

Workflow:

1. Determine `exchangeType`: `'headers'` if `options.headers` is non-empty; otherwise `'topic'`.
2. `await channel.assertExchange(exchangeName, exchangeType, { durable: true });`
3. Declare DLX + DLQ:

   ```typescript
   await channel.assertExchange(`${exchangeName}.dlx`, 'direct', { durable: true });
   await channel.assertQueue(`${exchangeName}.dlq`, { durable: true });
   await channel.bindQueue(`${exchangeName}.dlq`, `${exchangeName}.dlx`, `${exchangeName}.dlq`);
   ```

4. Declare Retry Exchange + Retry Queue:

   ```typescript
   await channel.assertExchange(`${exchangeName}.retry.x`, 'direct', { durable: true });
   await channel.assertQueue(`${exchangeName}.retry.queue`, {
     durable: true,
     arguments: {
       'x-message-ttl': 30000,
       'x-dead-letter-exchange': exchangeName,
     },
   });
   await channel.bindQueue(`${exchangeName}.retry.queue`, `${exchangeName}.retry.x`, '#');
   ```

5. Declare the consumer queue (durable and named if `options.queueName` is given, otherwise exclusive):

   - Attach the argument `'x-dead-letter-exchange'` with the value `${exchangeName}.retry.x`.
   - `const q = await channel.assertQueue(options.queueName ?? '', args);`
   - `const queueName = q.queue;`

6. Bind the queue:

   - If `options.headers` is provided:

     ```typescript
     await channel.bindQueue(queueName, exchangeName, '', { 'x-match': 'all', ...options.headers });
     ```

   - Else (topic mode):

     ```typescript
     const routingKey = options.routingKey ?? eventType;
     await channel.bindQueue(queueName, exchangeName, routingKey);
     ```

7. Set prefetch if provided:

   ```typescript
   if (typeof options.prefetch === 'number') {
     await channel.prefetch(options.prefetch);
   }
   ```
8. Start consuming:

   ```typescript
   const consumer = await channel.consume(
     queueName,
     async (msg) => {
       if (!msg) return;

       try {
         const evt: AppEvent<T> = JSON.parse(msg.content.toString());
         await handler(evt, msg);
         if (!options.noAck) channel.ack(msg);
       } catch (err) {
         // With noAck the broker has already discarded the message.
         if (options.noAck) return;
         // Determine x-retry-count:
         const incomingHeaders = msg.properties.headers ?? {};
         const currentRetryCount = Number(incomingHeaders['x-retry-count'] ?? 0);
         const nextRetryCount = currentRetryCount + 1;

         if (currentRetryCount < MAX_RETRIES) {
           // RETRY PATH: republish to the retry exchange with an incremented count.
           const newHeaders = { ...incomingHeaders, 'x-retry-count': nextRetryCount };
           channel.publish(`${exchangeName}.retry.x`, msg.fields.routingKey, msg.content, {
             persistent: true,
             headers: newHeaders,
           });
           channel.ack(msg);
         } else {
           // DLQ PATH: retries exhausted, record the origin and the failure reason.
           const dlqHeaders = {
             ...incomingHeaders,
             'x-original-exchange': msg.fields.exchange,
             'x-original-routing-key': msg.fields.routingKey,
             'x-final-reason': (err as Error).message,
           };
           channel.publish(`${exchangeName}.dlx`, `${exchangeName}.dlq`, msg.content, {
             persistent: true,
             headers: dlqHeaders,
           });
           channel.ack(msg);
         }
       }
     },
     { noAck: options.noAck ?? false },
   );

   const unsubscribe = async () => {
     await channel.cancel(consumer.consumerTag);
   };

   return unsubscribe;
   ```

- `MAX_RETRIES` is a built-in constant (default: 5).
- On final failure (once `x-retry-count` has reached `MAX_RETRIES`), the message goes to the DLQ (`${exchangeName}.dlq`).

File: src/event.bus.client.ts
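
The failure branch of the consumer can be read as a pure decision on the incoming headers. The sketch below restates it without any AMQP calls so the boundary case is easy to check; `decideFailureAction` and the `FailureAction` type are hypothetical names, and only a subset of the DLQ headers is reproduced.

```typescript
type MessageHeaders = Record<string, unknown>;

type FailureAction =
  | { kind: 'retry'; exchange: string; headers: MessageHeaders }
  | { kind: 'dead-letter'; exchange: string; routingKey: string; headers: MessageHeaders };

// Mirrors the documented branch: retry while the count is below maxRetries,
// otherwise route to the dead-letter exchange.
function decideFailureAction(
  exchangeName: string,
  incoming: MessageHeaders,
  reason: string,
  maxRetries = 5,
): FailureAction {
  const current = Number(incoming['x-retry-count'] ?? 0);
  if (current < maxRetries) {
    return {
      kind: 'retry',
      exchange: `${exchangeName}.retry.x`,
      headers: { ...incoming, 'x-retry-count': current + 1 },
    };
  }
  return {
    kind: 'dead-letter',
    exchange: `${exchangeName}.dlx`,
    routingKey: `${exchangeName}.dlq`,
    headers: { ...incoming, 'x-final-reason': reason },
  };
}
```

Read this way, a message published with `x-retry-count` 0 is handled at most `maxRetries + 1` times: the first delivery plus one delivery per retry, with the failure at count 5 going to the DLQ.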


### AppEvent & BaseEvent

A strongly-typed domain event model. Event IDs come from a unique-ID generator (e.g. `createId` from `@paralleldrive/cuid2`) under the hood.

```typescript
import { createId } from '@paralleldrive/cuid2';

export interface IEvent<T> {
  readonly id: string;
  readonly type: string;
  readonly source: string;
  readonly timestamp: Date;
  readonly payload: T;
  readonly version: number;
  readonly correlationId?: string;
  readonly causationId?: string;
}

export interface AppEvent<T> extends IEvent<T> {
  readonly aggregateId: string;
  readonly aggregateName: string;
  readonly aggregateVersion: number;
}

export abstract class BaseEvent<T> implements AppEvent<T> {
  // The event type defaults to the name of the concrete subclass.
  readonly id = `EVT-${this.constructor.name}-${createId()}`;
  readonly type = this.constructor.name;
  readonly source: string;
  readonly timestamp = new Date();
  readonly payload: T;
  readonly version = 0;
  readonly aggregateId: string;
  readonly aggregateName: string;
  readonly aggregateVersion = 0;
  readonly correlationId?: string;
  readonly causationId?: string;

  constructor(
    aggregateId: string,
    aggregateName: string,
    payload: T,
    source: string,
    correlationId?: string,
    causationId?: string,
  ) {
    this.aggregateId = aggregateId;
    this.aggregateName = aggregateName;
    this.payload = payload;
    this.source = source;
    this.correlationId = correlationId;
    this.causationId = causationId;
  }
}
```

File: src/app.event.ts
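
One practical consequence of the JSON wire format: `JSON.stringify` turns a `Date` into an ISO string and `JSON.parse` does not turn it back, so a handler that needs a real `Date` has to revive `timestamp` itself. A minimal sketch, using a trimmed-down hypothetical `WireEvent` type and a hypothetical `parseEvent` helper rather than the library's own types:

```typescript
// Trimmed-down stand-in for AppEvent<T>; only the fields needed here.
interface WireEvent<T> {
  id: string;
  type: string;
  timestamp: Date;
  payload: T;
}

// Parse a serialized event and convert the ISO timestamp back into a Date.
function parseEvent<T>(json: string): WireEvent<T> {
  const raw = JSON.parse(json) as Omit<WireEvent<T>, 'timestamp'> & { timestamp: string };
  return { ...raw, timestamp: new Date(raw.timestamp) };
}

// Round trip: serialize as the publish path does, then revive on the way in.
const original: WireEvent<{ total: number }> = {
  id: 'EVT-OrderCreated-1',
  type: 'OrderCreated',
  timestamp: new Date('2024-01-01T00:00:00.000Z'),
  payload: { total: 99.99 },
};
const revived = parseEvent<{ total: number }>(JSON.stringify(original));
```

After the round trip, `revived.timestamp` is a `Date` again, whereas a plain `JSON.parse` would leave it as a string.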


### PublisherModule

A lightweight façade over `EventBusClient.publish(...)`. Accepts an `AppEvent<T>` and optional headers/persistence settings.

```typescript
import { EventBusClient, PublishOptions } from '@/event.bus.client';
import { AppEvent } from '@/app.event';

export interface PublisherPublishOptions {
  headers?: Record<string, any>;
  persistent?: boolean;
}

export class PublisherModule {
  constructor(private eventBus: EventBusClient) {}

  async publishEvent<T>(event: AppEvent<T>, options: PublisherPublishOptions = {}): Promise<void> {
    const publishOpts: PublishOptions = {
      persistent: options.persistent ?? true,
      headers: options.headers,
    };
    await this.eventBus.publish(event, publishOpts);
  }
}
```

File: src/publisher.module.ts


### SubscriberModule

A lightweight façade over `EventBusClient.subscribe(...)`. Accepts an event type, a handler, and optional queue/prefetch/noAck/header-filter settings.

```typescript
import { EventBusClient } from '@/event.bus.client';
import { ConsumeMessage } from 'amqplib';
import { AppEvent } from '@/app.event';

export interface SubscribeOptions {
  queueName?: string;
  prefetch?: number;
  noAck?: boolean;
  headers?: Record<string, any>;
}

export type EventHandler<T> = (event: AppEvent<T>, raw: ConsumeMessage) => Promise<void> | void;

export class SubscriberModule {
  constructor(private eventBus: EventBusClient) {}

  async subscribeTo<T>(
    eventType: string,
    handler: EventHandler<T>,
    options: SubscribeOptions = {},
  ): Promise<() => Promise<void>> {
    const { queueName, prefetch = 5, noAck = false, headers } = options;

    // Build SubscribeOptions for EventBusClient.subscribe()
    const subscribeOpts: Record<string, any> = {
      queueName,
      prefetch,
      noAck,
    };

    if (headers && Object.keys(headers).length > 0) {
      subscribeOpts.headers = headers;
    } else {
      subscribeOpts.routingKey = eventType;
    }

    return this.eventBus.subscribe<T>(eventType, handler, subscribeOpts);
  }
}
```

File: src/subscriber.module.ts


---

## Usage Examples

Below are common scenarios. Adjust `exchangeName`, `queueName`, or headers as needed.

### Publishing an Event (Topic Exchange)

```typescript
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { PublisherModule } from './publisher.module';
import { BaseEvent } from './app.event';

async function main() {
  // 1) Load config
  const config = new ConfigLoader().getConfig();
  // 2) Create & connect ConnectionManager
  const cm = new ConnectionManager(config);
  await cm.connect();

  // 3) Use a topic exchange named "orders"
  config.exchangeName = 'orders'; // override exchangeName if needed
  const eventBus = new EventBusClient(cm, config);
  const publisher = new PublisherModule(eventBus);

  // 4) Define and publish an event
  class OrderCreated extends BaseEvent<{ orderId: string; total: number }> {}
  const event = new OrderCreated(
    'ord-123',
    'Order',
    { orderId: 'ord-123', total: 99.99 },
    'order-service',
  );

  await publisher.publishEvent(event);
  console.log('Published OrderCreated via topic exchange');

  await cm.close();
}

main().catch(console.error);
```

### Subscribing to Events (Topic Exchange)

```typescript
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { SubscriberModule } from './subscriber.module';
import { AppEvent } from './app.event';

async function main() {
  const config = new ConfigLoader().getConfig();
  const cm = new ConnectionManager(config);
  await cm.connect();

  config.exchangeName = 'orders';
  const eventBus = new EventBusClient(cm, config);
  const subscriber = new SubscriberModule(eventBus);

  const unsubscribe = await subscriber.subscribeTo<{ orderId: string; total: number }>(
    'OrderCreated',
    (event: AppEvent<{ orderId: string; total: number }>) => {
      console.log('Received OrderCreated:', event.payload);
      // No manual ack here: with noAck set to false, EventBusClient acks the
      // message once the handler returns (ConsumeMessage has no `channel`).
    },
    {
      queueName: 'order-service-queue',
      prefetch: 10,
      noAck: false,
    },
  );

  // Later, to stop:
  // await unsubscribe();
}

main().catch(console.error);
```

### Subscribing with Header Filters (Headers Exchange)

```typescript
import { ConfigLoader } from './config';
import { ConnectionManager } from './connection.manager';
import { EventBusClient } from './event.bus.client';
import { PublisherModule } from './publisher.module';
import { SubscriberModule } from './subscriber.module';
import { BaseEvent, AppEvent } from './app.event';

async function main() {
  const config = new ConfigLoader().getConfig();
  const cm = new ConnectionManager(config);
  await cm.connect();

  // 1) Use an exchange for header-based routing
  config.exchangeName = 'region-events';

  // Subscriber: only for messages where header "region" = "eu-west"
  const eventBusSubscriber = new EventBusClient(cm, config);
  const subscriber = new SubscriberModule(eventBusSubscriber);

  const unsubscribe = await subscriber.subscribeTo<{ foo: string }>(
    'AnyEventType', // routingKey ignored for headers
    (event: AppEvent<{ foo: string }>) => {
      console.log('Received (region=eu-west):', event.payload);
      // No manual ack here: EventBusClient acks once the handler returns.
    },
    {
      queueName: 'eu-west-queue',
      prefetch: 5,
      noAck: false,
      headers: { region: 'eu-west' },
    },
  );

  // Publisher: send with various region headers
  const eventBusPublisher = new EventBusClient(cm, config);
  const publisher = new PublisherModule(eventBusPublisher);

  class RegionEvent extends BaseEvent<{ foo: string }> {}
  const msg = new RegionEvent('agg1', 'RegionAggregate', { foo: 'bar' }, 'svc1');

  // 1) region="us-east" ⇒ NOT received
  await publisher.publishEvent(msg, {
    headers: { region: 'us-east' },
  });

  // 2) region="eu-west" ⇒ received
  await publisher.publishEvent(msg, {
    headers: { region: 'eu-west' },
  });

  // To clean up:
  // await unsubscribe();
  // await cm.close();
}

main().catch(console.error);
```
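
Why the `us-east` message is dropped follows from how a headers exchange evaluates an `'x-match': 'all'` binding: every non-`x-` key in the binding must be present on the message with an equal value. The function below is a simplified model of that rule for illustration (`matchesAll` is a hypothetical name; the broker does this matching itself).

```typescript
type MessageHeaders = Record<string, unknown>;

// Simplified model of x-match=all: binding keys starting with "x-" are
// skipped, and every remaining key must equal the message's header value.
function matchesAll(binding: MessageHeaders, message: MessageHeaders): boolean {
  return Object.keys(binding)
    .filter((key) => !key.startsWith('x-'))
    .every((key) => message[key] === binding[key]);
}

const binding = { 'x-match': 'all', region: 'eu-west' };
const usEast = matchesAll(binding, { region: 'us-east', 'x-retry-count': 0 });
const euWest = matchesAll(binding, { region: 'eu-west', 'x-retry-count': 0 });
```

Here `usEast` is `false` and `euWest` is `true`, mirroring the two publishes in the example. Extra message headers such as `x-retry-count` do not prevent a match.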

---

## Testing

### Unit Tests

- Location: `tests/unit/`
- Framework: Jest
- Coverage:
  - **ConnectionManager**: Tests retry logic, channel creation, and error/reconnect events.
  - **EventBusClient**: Mocks `ConnectionManager.getChannel()` and verifies `publish()` and `subscribe()` routes (topic vs headers), JSON serialization, ack/nack logic, and retry/DLQ behavior.
  - **PublisherModule / SubscriberModule**: Ensures correct forwarding of options to `EventBusClient`.

Run unit tests:

```bash
npm test
# or
yarn test
```

### Integration Tests

- Location: `tests/integration/event.bus.integration.test.ts`
- Requirement: A running RabbitMQ instance (default: `amqp://localhost`).

Flow:

1. Connect via `amqplib.connect(uri)` and create a channel.
2. Instantiate `ConnectionManager`, `EventBusClient`, `PublisherModule`, and `SubscriberModule`.
3. Assert `integration-exchange` (topic) and test publishing/subscribing a "TestEvent".
4. Assert `integration-headers-exchange` (headers) and test header-filtered publish/subscribe (`region="eu-west"`).
5. Verify that retry logic does not interfere with a successful path.
6. Clean up queues/exchanges and close connections.

Run integration tests:

```bash
export AMQP_URI=amqp://localhost
npm run test:integration
# or
yarn test:integration
```

Note: The integration test will fail with a clear error if RabbitMQ is not reachable:

```text
ConnectionManager: Failed to connect to RabbitMQ at "amqp://localhost": connect ECONNREFUSED 127.0.0.1:5672
```

---

## Docker Configuration for RabbitMQ

To launch a local RabbitMQ (with Management UI) for integration testing:

```bash
docker run -d --rm \
  --hostname my-rabbit \
  --name folksdo-rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  rabbitmq:3-management
```

- `5672`: AMQP port
- `15672`: HTTP management console (default user/pass: `guest`/`guest`)

After the container starts, visit http://localhost:15672 and log in with `guest`/`guest`.


---

## License

MIT © Folksdo Inc
