0.3.3 • Published 10 months ago

@delta-base/server v0.3.3

Weekly downloads
-
License
LicenseRef-LICENS...
Repository
github
Last release
10 months ago

DeltaBase Server SDK

npm version License

A TypeScript SDK for interacting with the DeltaBase event sourcing platform.

Overview

DeltaBase is an event sourcing platform that allows you to store and process event streams. This SDK provides a clean, typed interface for working with DeltaBase from Node.js applications.

Key features:

  • Event Storage: Append, read, and query events in streams
  • Event Subscriptions: React to events in real-time with webhooks and other subscription types
  • Advanced Querying: Filter and search events across streams
  • Event Aggregation: Build state from event streams with built-in aggregation helpers
  • Administration: Manage event stores, subscriptions, and platform resources

Installation

npm install @deltabase/server
# or
yarn add @deltabase/server
# or
pnpm add @deltabase/server

Quick Start

import { DeltaBase } from '@deltabase/server';

// Initialize the DeltaBase client
const client = new DeltaBase({
  apiKey: 'your-api-key',
  // For production use
  // baseUrl: 'https://api.delta-base.com'
});

// Get an event store client
const eventStore = client.getEventStore('my-event-store');

// Append events to a stream
await eventStore.appendToStream('user-123', [
  {
    type: 'user.created',
    data: { name: 'Alice', email: 'alice@example.com' }
  }
]);

// Read events from a stream
const { events } = await eventStore.readStream('user-123');
console.log('User events:', events);

Key Concepts

Event Sourcing

DeltaBase is built around the event sourcing pattern, where:

  • All changes to application state are captured as a sequence of immutable events
  • Events are stored in an append-only log
  • Current state can be reconstructed by replaying events
  • Event history provides a complete audit trail of all changes

Streams

A stream is a sequence of related events, typically representing the history of a single entity. Streams are identified by a unique ID, such as user-123 or order-456.

Events

An event represents something that happened in your domain. Events are immutable and include:

  • type: A descriptive name for the event (e.g., user.created, order.placed)
  • data: The payload of the event, containing relevant information
  • Metadata: Additional information like timestamps, correlation IDs, etc.

Subscriptions

Subscriptions allow you to react to events in real-time. You can subscribe to specific event types or patterns and receive notifications via webhooks or other mechanisms.

Usage Examples

Working with Event Stores

import { DeltaBase } from '@deltabase/server';

const client = new DeltaBase({ apiKey: 'your-api-key', baseUrl: 'https://api.delta-base.com' });

// Create a new event store
const management = client.getManagement();
await management.createEventStore({
  name: 'orders',
  description: 'Event store for order events',
});

// Get an event store client
const orderStore = client.getEventStore('orders');

// Append events
await orderStore.appendToStream('order-123', [
  {
    type: 'order.created',
    data: {
      customerId: 'cust-456',
      items: [{ productId: 'prod-789', quantity: 2, price: 25.99 }],
      total: 51.98
    }
  }
]);

// Read events
const { events } = await orderStore.readStream('order-123');

// Query events
const { events: userOrders } = await orderStore.queryEvents({
  type: 'order.created',
  fromDate: '2023-01-01T00:00:00Z',
  limit: 10
});

// List streams
const { streams } = await orderStore.listStreams({ pattern: 'order-*' });

Aggregating State from Events

// Define your state type
type OrderState = {
  id: string;
  customerId: string;
  items: Array<{ productId: string; quantity: number; price: number }>;
  total: number;
  status: 'pending' | 'shipped' | 'delivered' | 'canceled';
};

// Define your event types
type OrderEvent =
  | { type: 'order.created'; data: { customerId: string; items: Array<{ productId: string; quantity: number; price: number }>; total: number } }
  | { type: 'order.shipped'; data: { trackingNumber: string; shippedAt: string } }
  | { type: 'order.delivered'; data: { deliveredAt: string } }
  | { type: 'order.canceled'; data: { reason: string; canceledAt: string } };

// Aggregate events into state
const { state, currentStreamVersion } = await orderStore.aggregateStream<OrderState, OrderEvent>(
  'order-123',
  {
    initialState: () => ({
      id: '',
      customerId: '',
      items: [],
      total: 0,
      status: 'pending'
    }),
    evolve: (state, event) => {
      switch (event.type) {
        case 'order.created':
          return {
            ...state,
            id: event.streamId,
            customerId: event.data.customerId,
            items: event.data.items,
            total: event.data.total
          };
        case 'order.shipped':
          return { ...state, status: 'shipped' };
        case 'order.delivered':
          return { ...state, status: 'delivered' };
        case 'order.canceled':
          return { ...state, status: 'canceled' };
        default:
          return state;
      }
    }
  }
);

console.log('Current order state:', state);

Setting Up Event Subscriptions

// Get the event bus for a store
const eventBus = client.getEventBus('orders');

// Create a webhook subscription for all order events
const subscription = await eventBus.subscribeWebhook(
  'order.*',
  'https://example.com/webhooks/orders',
  {
    headers: { 'X-API-Key': 'webhook-secret' },
    retryPolicy: {
      maxAttempts: 3,
      backoffMinutes: 5
    }
  }
);

// List subscriptions
const { subscriptions } = await eventBus.listSubscriptions();

// Unsubscribe
await eventBus.unsubscribe(subscription.subscriptionId);

Administrative Operations

const management = client.getManagement();

// List all event stores
const { eventStores } = await management.listEventStores();

// Get details about a specific event store
const storeDetails = await management.getEventStore('orders');
console.log('Storage used:', storeDetails.statistics?.databaseSizeBytes);

// Update event store settings
await management.updateEventStore('orders', {
  description: 'Updated description',
  retentionPeriodDays: 90
});

// Delete an event store (use with caution!)
await management.deleteEventStore('unused-store');

API Reference

The SDK is organized into several main classes:

DeltaBase

The main client class that provides access to all functionality.

const client = new DeltaBase({
  apiKey: 'your-api-key',
  baseUrl: 'https://api.delta-base.com' // Optional
});

Methods:

  • getManagement(): Returns a ManagementClient for administrative operations
  • getEventStore(eventStoreId): Returns an EventStore client for the specified store
  • getEventBus(eventStoreId): Returns an EventBus client for the specified store

EventStore

For working with event streams within a specific event store.

Methods:

  • appendToStream(streamId, events, options?): Append events to a stream
  • readStream(streamId, options?): Read events from a stream
  • aggregateStream(streamId, options): Build state from events using an aggregation function
  • queryEvents(options?): Query events across streams with filtering
  • queryStreams(options?): Query streams with filtering
  • listStreams(options?): Get a list of stream IDs

EventBus

For managing event subscriptions.

Methods:

  • subscribe(options): Create a subscription to events
  • subscribeWebhook(eventFilter, url, options?): Convenient method to create a webhook subscription
  • getSubscription(subscriptionId): Get details about a subscription
  • listSubscriptions(options?): List all subscriptions
  • unsubscribe(subscriptionId): Delete a subscription

ManagementClient

For administrative operations on event stores.

Methods:

  • createEventStore(options): Create a new event store
  • listEventStores(): List all event stores
  • getEventStore(eventStoreId): Get details about an event store
  • updateEventStore(eventStoreId, settings): Update event store settings
  • deleteEventStore(eventStoreId): Delete an event store

Advanced Topics

Optimistic Concurrency Control

You can use the expectedStreamVersion option to implement optimistic concurrency control:

try {
  await eventStore.appendToStream(
    'user-123',
    [{ type: 'user.updated', data: { email: 'new@example.com' } }],
    { expectedStreamVersion: 5n } // Only succeed if the current version is 5
  );
} catch (error) {
  console.error('Concurrency conflict!', error);
}

Transaction IDs

Every event can include a transaction ID to group related events:

await eventStore.appendToStream('order-123', [
  {
    type: 'order.created',
    data: { /* ... */ },
    transactionId: 'tx-abc-123'
  },
  {
    type: 'inventory.reserved',
    data: { /* ... */ },
    transactionId: 'tx-abc-123' // Same transaction ID
  }
]);

License

(c) Copyright 2025 nibbio LLC, all rights reserved.

0.3.3

10 months ago

0.3.2

10 months ago

0.3.1

11 months ago

0.3.0

11 months ago

0.2.0

12 months ago

0.1.0

12 months ago