1.0.0 • Published 5 months ago

@folksdo/eventstore v1.0.0

Weekly downloads
-
License
MIT
Repository
github
Last release
5 months ago

MongoDB-Backed EventStore Library

A lightweight EventStore library for Node.js, supporting both in-memory and MongoDB persistence, with optional snapshotting.

Table of Contents

  1. Overview
  2. Features
  3. Installation
  4. Folder Structure
  5. Configuration • Environment Variables • ConfigLoader API • Default Values
  6. Usage • Initializing the Client • Appending Events • Rehydrating an Aggregate • Manual Snapshotting • Deleting an Aggregate
  7. Event and Snapshot Models
  8. In-Memory vs. MongoDB Backends
  9. Logging
  10. Error Handling
  11. Testing
  12. Contributing
  13. License

Overview

This library provides a simple, pluggable EventStore implementation that supports:

• Event sourcing for domain aggregates • Optimistic concurrency using aggregateVersion • Optional snapshotting (take a snapshot every N events or manually) • MongoDB persistence (with retry/backoff on connection failures) • In-memory persistence (for testing or ephemeral usage) • A TypeScript-friendly API, with runtime validation of configuration

Use it to build event-sourced services in Node.js projects, especially when you want to store streams of domain events in MongoDB (or use a simpler in-memory store for unit and integration tests).

Features

• EventStoreClient: single-entry point for connecting, appending events, rehydrating aggregates, snapshotting, and deletion • ConfigLoader: zod-based runtime validation of process.env plus code overrides, with sensible defaults • MongoConnectionManager: retry/backoff logic on connect, plus logging • MongoEventStore: stores events in MongoDB, enforces a unique (aggregateId, aggregateVersion) index, throws ConcurrencyException on version conflict • MongoSnapshotStore: stores snapshots in MongoDB, keyed by (aggregateId, version) • InMemoryEventStore and InMemorySnapshotStore: drop-in replacements for running tests without a real MongoDB instance • snapshotEveryN: automatically take a snapshot on every “Nth” event for an aggregate • snapshotReducer: user-supplied function to rebuild aggregate state from a series of events • Comprehensive logging (info, warn, error) on all major operations • Jest integration tests (using mongodb-memory-server) included

Installation

Run:

npm install your-org-eventstore

Ensure that peer dependencies are installed:

npm install mongodb zod

For integration tests:

npm install --save-dev jest ts-jest @types/jest mongodb-memory-server

Folder Structure

src/lib/ client/ event.store.client.ts config/ interface.ts (EventStoreConfig and defaultConfig) config.ts (ConfigLoader, zod-based parsing of ENV plus overrides) connection/ mongo.connection.manager.ts exception/ app.exception.ts (BasicException, PersistenceException, ConcurrencyException, DatabaseUnavailableException, etc.) eventstore/ IEventStore.ts InMemoryEventStore.ts MongoEventStore.ts ISnapshotStore.ts in.memory.snapshot.store.ts mongodb.snapshot.store.ts model/ app.event.ts (BaseEvent, AppEvent, IEvent interfaces) snapshot.ts (Snapshot interface) logger/ index.ts (A simple logging utility, e.g. a Winston wrapper) index.ts (Optional top-level exports)

Configuration

Environment Variables

All ENV keys are prefixed with EVENTSTORE_.

• EVENTSTORE_MONGO_URI Full MongoDB URI (e.g. mongodb://user\:pass\@host:27017/dbname). Required if you don’t supply mongoUri in code overrides. • EVENTSTORE_DB_NAME Name of the Mongo database. Default: “eventstore”. • EVENTSTORE_EVENTS_COLLECTION Name of the collection for events. Default: “events”. • EVENTSTORE_SNAPSHOTS_COLLECTION Name of the collection for snapshots. Default: “snapshots”. • EVENTSTORE_WRITE_CONCERN_W Write concern “w” (either “majority” or a numeric w value). Default: “majority”. • EVENTSTORE_WRITE_CONCERN_J Journal flag (“true” or “false”). Default: true. • EVENTSTORE_READ_CONCERN Read concern level (“local” or “majority”). Default: “majority”. • EVENTSTORE_RETRY_MAX_RETRIES Maximum number of retry attempts on Mongo connection. Default: 3. • EVENTSTORE_RETRY_INITIAL_DELAY_MS Initial backoff in milliseconds before a retry. Default: 100. • EVENTSTORE_RETRY_MAX_DELAY_MS Maximum backoff in milliseconds. Default: 1000. • EVENTSTORE_SNAPSHOT_EVERY_N Take a snapshot automatically every N events (whenever aggregateVersion % N == 0). Default: undefined (no automatic snapshot).

ConfigLoader API

import { ConfigLoader, EventStoreConfig } from "./config";

const overrides: Partial<EventStoreConfig> = {
  mongoUri: "mongodb://user:pass@localhost:27017/mydb",
  snapshotEveryN: 10,
  snapshotReducer: myReducerFunction, // must be provided if snapshotEveryN is set
};

const loader = new ConfigLoader(overrides);
const config = loader.getConfig();

ConfigLoader merges process.env (validated + defaulted by zod) with any overrides passed to its constructor. If neither EVENTSTORE_MONGO_URI nor overrides.mongoUri is present, getConfig() throws an error. The returned config is typed as EventStoreConfig.

Default Values

If you do not supply certain fields explicitly or via ENV, these defaults apply:

• databaseName: “eventstore” • eventsCollectionName: “events” • snapshotsCollectionName: “snapshots” • writeConcern: { w: “majority”, j: true } • readConcernLevel: “majority” • retryOptions.maxRetries: 3 • retryOptions.initialDelayMs: 100 • retryOptions.maxDelayMs: 1000 • snapshotEveryN: undefined (no automatic snapshot) • snapshotReducer: undefined (required if snapshotEveryN is used)

Usage

  1. Initializing the Client

    import { EventStoreClient } from "./client/event.store.client"; import { ConfigLoader } from "./config"; import { applyOrderEvents, OrderState } from "../tests/order.aggregate";

    async function start() { const overrides = { mongoUri: process.env.MONGO_URI!, snapshotEveryN: 5, snapshotReducer: applyOrderEvents, };

    const config = new ConfigLoader(overrides).getConfig(); const client = new EventStoreClient(config); await client.connect(); // use client.appendEvent, client.rehydrateAggregate, etc. }

If you prefer code-only configuration, pass every field in overrides instead of relying on ENV.

  1. Appending Events

Your domain event classes must extend BaseEvent. Example in src/lib/model/app.event.ts:

export interface AppEvent<T> {
  id: string;          // “EVT-<EventType>-<cuid2>”
  type: string;        // constructor name, e.g. “OrderItemAdded”
  source: string;      // e.g. “order-service”
  timestamp: Date;     // auto-generated
  payload: T;          // event-specific data
  version: number;     // for event-schema versioning (default 0)
  aggregateId: string; // which aggregate this belongs to
  aggregateName: string; // e.g. “Order”
  aggregateVersion: number; // version within that aggregate
  correlationId?: string;
  causationId?: string;
}

export abstract class BaseEvent<T> implements AppEvent<T> {
  id = `EVT-${this.constructor.name}-${createId()}`;
  type = this.constructor.name;
  source: string;
  timestamp = new Date();
  payload: T;
  version = 0;

  aggregateId: string;
  aggregateName: string;
  aggregateVersion = 0;

  correlationId?: string;
  causationId?: string;

  constructor(
    aggregateId: string,
    aggregateName: string,
    payload: T,
    source: string,
    correlationId?: string,
    causationId?: string
  ) {
    this.aggregateId = aggregateId;
    this.aggregateName = aggregateName;
    this.payload = payload;
    this.source = source;
    this.correlationId = correlationId;
    this.causationId = causationId;
  }
}

In your application code:

import { BaseEvent } from "./model/app.event";

class OrderItemAdded extends BaseEvent<{ id: string; price: number }> {}

const aggId = "order-123";
const evt = new OrderItemAdded(aggId, "Order", { id: "itemA", price: 25 }, "order-service");
Object.assign(evt, { aggregateVersion: 1 });

await client.appendEvent(evt);

– Versioning: You must set event.aggregateVersion before calling appendEvent(). – Concurrency: If aggregateVersion ≠ lastVersion + 1, a ConcurrencyException is thrown. Duplicate key errors (code 11000) also throw ConcurrencyException.

  1. Rehydrating an Aggregate

To rebuild an aggregate’s current state:

interface OrderState {
  total: number;
  items: { id: string; price: number }[];
}

function applyOrderEvents(
  prevState: OrderState | null,
  events: AppEvent<any>[]
): OrderState {
  let state = prevState
    ? { ...prevState, items: [...prevState.items] }
    : { total: 0, items: [] };
  for (const e of events) {
    switch (e.type) {
      case "OrderItemAdded": {
        const { id, price } = e.payload;
        state.items.push({ id, price });
        state.total += price;
        break;
      }
      case "OrderItemRemoved": {
        const { id } = e.payload;
        const idx = state.items.findIndex((x) => x.id === id);
        if (idx >= 0) {
          state.total -= state.items[idx].price;
          state.items.splice(idx, 1);
        }
        break;
      }
      default:
        break;
    }
  }
  return state;
}

const { snapshot, eventsToApply, lastVersion } = await client.rehydrateAggregate<OrderState>(aggId);

if (snapshot) {
  const fullState = applyOrderEvents(snapshot.state, eventsToApply);
  console.log(fullState);
} else {
  const fullState = applyOrderEvents(null, eventsToApply);
  console.log(fullState);
}

– snapshot: the most recent Snapshot (or null). – eventsToApply: all events with version > snapshot.version. – lastVersion: highest version covered (either snapshot.version or the highest event returned).

  1. Manual Snapshotting

If you need to take a snapshot at a specific time (not tied to “Nth event”):

const { snapshot, eventsToApply, lastVersion } = await client.rehydrateAggregate<OrderState>(aggId);

const newState = applyOrderEvents(snapshot ? snapshot.state : null, eventsToApply);

const manualSnapshot: Snapshot<OrderState> = {
  aggregateId: aggId,
  version: lastVersion,
  state: newState,
  timestamp: new Date(),
};

await client.saveSnapshot(manualSnapshot);

– This writes a new document in the snapshots collection. – Subsequent rehydrateAggregate() will start from this snapshot and only return newer events.

  1. Deleting an Aggregate

To remove all events and snapshots for an aggregate:

await client.deleteAggregate(aggId);

Internally, this calls eventStore.deleteEventsForAggregate(aggId) and snapshotStore.deleteSnapshotsForAggregate(aggId).

Event and Snapshot Models

AppEvent (in src/lib/model/app.event.ts)

export interface IEvent<T> {
  readonly id: string;            // “EVT-<EventType>-<cuid2>”
  readonly type: string;          // Constructor name, e.g. “OrderCreated”
  readonly source: string;        // e.g. “order-service”
  readonly timestamp: Date;       // Auto-generated
  readonly payload: T;            // Domain data
  readonly version: number;       // Event-schema versioning (default 0)
  readonly correlationId?: string;
  readonly causationId?: string;
}

export interface AppEvent<T> extends IEvent<T> {
  readonly aggregateId: string;      // Which aggregate this belongs to
  readonly aggregateName: string;    // e.g. “Order”
  readonly aggregateVersion: number; // Version within that aggregate
}

export abstract class BaseEvent<T> implements AppEvent<T> {
  id = `EVT-${this.constructor.name}-${createId()}`;
  type = this.constructor.name;
  source: string;
  timestamp = new Date();
  payload: T;
  version = 0;
  aggregateId: string;
  aggregateName: string;
  aggregateVersion = 0;
  correlationId?: string;
  causationId?: string;

  constructor(
    aggregateId: string,
    aggregateName: string,
    payload: T,
    source: string,
    correlationId?: string,
    causationId?: string
  ) {
    this.aggregateId = aggregateId;
    this.aggregateName = aggregateName;
    this.payload = payload;
    this.source = source;
    this.correlationId = correlationId;
    this.causationId = causationId;
  }
}

Snapshot (in src/lib/model/snapshot.ts)

export interface Snapshot<T> {
  aggregateId: string;
  version: number;   // Which aggregateVersion this snapshot covers
  state: T;          // Fully reconstructed state up through version
  timestamp: Date;   // When this snapshot was taken
}

In-Memory vs. MongoDB Backends

InMemoryEventStore & InMemorySnapshotStore

• Useful for unit tests, local development, or ephemeral usage • Implements IEventStore and ISnapshotStore, but stores data in a Map • No external dependencies—just import and use:

import { InMemoryEventStore } from "./eventstore/InMemoryEventStore";
import { InMemorySnapshotStore } from "./eventstore/in.memory.snapshot.store";

const eventStore = new InMemoryEventStore();
const snapshotStore = new InMemorySnapshotStore();

MongoEventStore & MongoSnapshotStore

• Require a real (or in-memory) MongoDB instance • Enforce unique index on (aggregateId, aggregateVersion) for events • Enforce unique index on (aggregateId, version) for snapshots • For integration tests, use mongodb-memory-server to spin up an ephemeral Mongo

Logging

This library uses a getLogger(componentName) utility to produce structured logs:

• info: successful operations (e.g. “connected to MongoDB,” “event inserted,” “snapshot saved”) • warn: recoverable issues (e.g. version conflict, a failed connection attempt) • error: unrecoverable failures (e.g. persistence errors, database unavailable after retries)

Log format:

2025-06-03T03:33:26.916Z [DEV:<Component>.<method>] info: <message> { <metadata> }

You can swap out getLogger for your preferred logging framework if needed.

Error Handling

All storage operations throw BasicException or its subclasses:

• PersistenceException (statusCode 500) Thrown on read/write failures that are not version conflicts. Wraps the original error. • ConcurrencyException (statusCode 409) Thrown if aggregateVersion ≠ lastVersion + 1 or on duplicate key error (Mongo code 11000). • DatabaseUnavailableException (statusCode 503) Thrown if MongoConnectionManager.connect() exhausts all retries.

When calling public methods on EventStoreClient, catch and handle these exceptions.

Testing

Unit & Integration Tests

• In-memory tests: verify event ordering, version conflicts, and replay logic without MongoDB. • MongoDB integration: run end-to-end tests against MongoConnectionManager, MongoEventStore, MongoSnapshotStore, and EventStoreClient using mongodb-memory-server.

Basic test coverage includes:

  1. Connecting to MongoDB (success and failure)
  2. Appending events and retrieving them in order
  3. ConcurrencyException on version conflict
  4. Automatic snapshotEveryN logic
  5. Manual saveSnapshot behavior
  6. Deleting aggregates (both events and snapshots)

To run tests, add to package.json:

"scripts": {
  "test": "jest --passWithNoTests",
  "test:integration": "jest --config jest.config.js"
}

and run:

npm test
npm run test:integration

Contributing

  1. Fork the repository
  2. Clone your fork locally
  3. Install dependencies (npm install)
  4. Run tests (npm test)
  5. Create a feature branch (git checkout -b feature/your-feature)
  6. Make changes, following existing code style and adding TSDoc comments
  7. Add tests for any new behavior
  8. Open a pull request describing your changes

Please ensure all tests pass before submitting a PR.

License

This library is open-source under the MIT License. See the LICENSE file for details.