@unchainedshop/connector-sdk

Version 5.0.3 • Published 3 months ago

SDK for building ETL connectors that sync data into the Unchained Commerce engine via its Bulk Import API.

The SDK manages the full Extract-Transform-Load lifecycle: fetching remote data into a local MongoDB staging database, deduplicating events against previously submitted hashes, chunking and streaming uploads to the Unchained Bulk Import endpoint, and tracking each run in a journal for incremental (differential) syncs.

Installation

npm install @unchainedshop/connector-sdk mongodb

mongodb is a peer dependency (^7.0.0).

Quick Start

import { Connector } from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

// fetchProducts, fetchAssortments and buildEventsFromDB are your own functions.
const connector = new Connector({
  unchainedEndpoint: process.env.UNCHAINED_ENDPOINT,
  unchainedSecret: process.env.UNCHAINED_SECRET,
});

try {
  await connector.init();

  // 1. Extract: fetch remote data into local MongoDB collections
  await connector.extract(fetchProducts, "products");
  await connector.extract(fetchAssortments, "assortments", { replace: true });

  // 2. Transform: build events from the extracted data (querying MongoDB is async)
  const events: BulkImportEvent[] = await buildEventsFromDB(connector.db);

  // 3. Prepare: deduplicate updates and emit deletes (streamed via an async generator)
  const prepared = connector.prepareEvents(events, { emitDeletes: "PRODUCT" });

  // 4. Load: submit to Unchained in 10,000-event chunks
  const ids = await connector.load(prepared);

  await connector.reportSuccess(ids);
} catch (error) {
  // Record the failure on the journal, then surface the original error
  await connector.reportFailure("LOAD");
  throw error;
} finally {
  await connector.dispose();
}

Environment Variables

| Variable | Description | Required |
| --- | --- | --- |
| UNCHAINED_ENDPOINT | Unchained engine base URL (e.g. https://engine.example.com) | Yes (or pass via ConnectorOptions) |
| UNCHAINED_SECRET | API secret used for Bearer admin:<secret> authentication | Yes (or pass via ConnectorOptions) |
| MONGO_URL | MongoDB connection string for the local staging database | Yes (or pass via ConnectorOptions), unless NODE_ENV=development starts an in-memory server |
| UNCHAINED_WORKER_ID | Worker identifier reported to the work queue (default: "connector") | No |
| UNCHAINED_CONNECTOR_TIMEOUT | Timeout in minutes for runFromExternalWork (default: 30) | No |
| NODE_ENV | Set to "development" to auto-start an in-memory MongoDB via mongodb-memory-server when no MongoDB URL is configured | No |
| COCKPIT_ENDPOINT | Cockpit CMS endpoint (for remotes.cockpit) | No |
| COCKPIT_API_KEY | Cockpit CMS API key (for remotes.cockpit) | No |

API

Connector

The main class that manages the full ETL lifecycle. All accessors (db, unchained, journalEntry) throw if init() has not been called.

Constructor
new Connector(options?: ConnectorOptions)
interface ConnectorOptions {
  mongoUrl?: string;                        // Falls back to MONGO_URL env
  unchainedEndpoint?: string;               // Falls back to UNCHAINED_ENDPOINT env
  unchainedSecret?: string;                 // Falls back to UNCHAINED_SECRET env
  customHeaders?: Record<string, string>;   // Extra headers on all Unchained API requests
  reset?: boolean;                          // Drop journal + submitted events on init
}

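The connection options (mongoUrl, unchainedEndpoint, unchainedSecret) fall back to their corresponding environment variables when omitted; customHeaders and reset have no environment equivalent.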

Connector.fromExisting(options)

Static factory that creates a Connector from already-initialized MongoDB and Unchained clients. The connector will not close the MongoDB connection on dispose() since it does not own it.

const connector = await Connector.fromExisting({
  mongo: existingMongoClient,
  unchainedAPI: existingUnchainedAPI,
  reset: false,
});

connector.init(): Promise<this>

Connects to MongoDB, initializes the Unchained API client, creates database indexes, and starts a new journal entry. Returns this for chaining:

const connector = await new Connector().init();

connector.dispose(): Promise<void>

Closes the MongoDB connection (if the connector owns it) and releases all internal references.

connector.db: Db

The MongoDB Db instance for the staging database. Use this to query extracted data when building transform logic.

connector.unchained: UnchainedAPI

The Unchained API client. See UnchainedAPI for the full interface.

connector.journalEntry: WithId<JournalEntry>

The current journal entry document for this run. Contains started, since (start of the sync window for differential syncs), differential (whether a previous successful run exists), and status.


Extract
connector.extract(fetchFn, collectionName, options?): Promise<number>

Calls fetchFn to retrieve an array of documents and stores them in the named MongoDB collection. Returns the number of inserted documents.

// Simple insertion
await connector.extract(
  () => fetch("https://api.example.com/products").then((r) => r.json()),
  "products",
);

// Upsert by _id (for incremental updates)
await connector.extract(fetchProducts, "products", { replace: true });

// Delete matching docs before inserting
await connector.extract(fetchProducts, "products", {
  deleteFilter: { category: "old" },
});

| Option | Type | Description |
| --- | --- | --- |
| replace | boolean | Upsert documents by _id instead of a plain insertMany. Use when re-extracting data that may already exist. |
| deleteFilter | Record<string, unknown> | Delete matching documents from the collection before inserting the new data. |
| loadFromArchive | boolean | If true and archivePath is set, attempt to load data from a gzip archive instead of calling fetchFn. Falls back to fetchFn if no archive exists. |
| storeInArchive | boolean | If true and archivePath is set, save fetched data to a gzip archive after extraction. |
| archivePath | string | Directory path for archive files (default: "./offline-archives"). |
| dateFields | string[] | Field names to deserialize as Date objects when loading from an archive (JSON serializes dates as strings). |

The fetchFn receives a RequestInit object and must return Promise<P[]> where P extends BulkImportPayload.
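To make the difference between the default insert, replace, and deleteFilter concrete, here is a small in-memory model of those semantics. It is a sketch under assumptions, not SDK code: a Map stands in for the MongoDB collection, deleteFilter supports plain equality matches only, and extractInto is a hypothetical name.

```typescript
// Hypothetical in-memory model of extract() options; a Map stands in for a MongoDB collection.
interface Doc {
  _id: string;
  [key: string]: unknown;
}

interface ExtractModelOptions {
  replace?: boolean;
  deleteFilter?: Record<string, unknown>;
}

// Equality-only matching: a small subset of what a real MongoDB filter supports.
function matchesFilter(doc: Doc, filter: Record<string, unknown>): boolean {
  return Object.entries(filter).every(([key, value]) => doc[key] === value);
}

function extractInto(
  collection: Map<string, Doc>,
  docs: Doc[],
  options: ExtractModelOptions = {},
): number {
  // deleteFilter: remove matching documents before inserting the new batch.
  if (options.deleteFilter) {
    for (const [id, doc] of collection) {
      if (matchesFilter(doc, options.deleteFilter)) collection.delete(id);
    }
  }
  for (const doc of docs) {
    // Without replace, a duplicate _id fails, as insertMany does (E11000 duplicate key).
    if (!options.replace && collection.has(doc._id)) {
      throw new Error(`Duplicate _id: ${doc._id}`);
    }
    // With replace, the document is upserted by _id.
    collection.set(doc._id, doc);
  }
  return docs.length;
}
```

The model shows why replace matters for re-extraction: the same _id arriving twice is an error for a plain insert but an overwrite for an upsert.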


Transform
connector.prepareEvents(newEvents, options?): AsyncGenerator<HashedBulkImportEvent>

An async generator that deduplicates update events against previously submitted hashes and optionally emits REMOVE operations for entities that are no longer present in the source data. Because it yields events one at a time, no intermediate arrays need to be allocated in memory.

const prepared = connector.prepareEvents(events, {
  emitDeletes: "PRODUCT",
  excludeKeys: (key) => key === "updatedAt",
});

| Option | Type | Description |
| --- | --- | --- |
| excludeKeys | (key: string) => boolean | Predicate that returns true for keys to strip before hashing. Use it to ignore volatile fields that change on every sync without representing a meaningful update. |
| emitDeletes | boolean or string | Emit REMOVE operations for entities that were previously submitted but are missing from newEvents. Pass an entity type (e.g. "PRODUCT") to scope deletes to that type, or true for all entity types. |
Default excluded keys: Authorization, updated, created, updatedAt, createdAt, modified, modifiedAt, published.

How deduplication works:

  1. Queries the unchained_submitted_events collection for the last meaningful (non-REMOVE) event per entity:payloadId pair
  2. Hashes each incoming event's payload (with excluded keys stripped) using object-code
  3. Skips UPDATE events whose hash matches the previously submitted hash
  4. If emitDeletes is enabled, queries all previously submitted entity IDs and yields REMOVE events for any that are absent from the new event set

The input newEvents can be a plain array or an AsyncIterable<BulkImportEvent>.
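The four deduplication steps above can be modeled with a synchronous generator. This is a simplified sketch, not the SDK's code, and it rests on several assumptions: a sorted-key JSON serialization hashed with 32-bit FNV-1a replaces object-code, excluded keys are stripped at every nesting level with the defaults combined with excludeKeys, and an in-memory Map keyed by entity:_id replaces the unchained_submitted_events query. All names (prepareEventsSketch, canonical, fnv1a) are hypothetical.

```typescript
type ImportOperation = "CREATE" | "UPDATE" | "REMOVE";

interface ImportEvent {
  entity: string;
  operation: ImportOperation;
  payload: { _id: string; [key: string]: unknown };
}

interface HashedImportEvent extends ImportEvent {
  hash: string;
}

// Last submitted event per entity/_id pair (stands in for unchained_submitted_events).
interface PreviousSubmission {
  entity: string;
  _id: string;
  operation: ImportOperation;
  hash: string;
}

const DEFAULT_EXCLUDED_KEYS = new Set([
  "Authorization", "updated", "created", "updatedAt", "createdAt", "modified", "modifiedAt", "published",
]);

// Deterministic serialization: keys are sorted so key order never changes the hash.
function canonical(value: unknown, exclude: (key: string) => boolean): string {
  if (value instanceof Date) return JSON.stringify(value.toISOString());
  if (Array.isArray(value)) return `[${value.map((item) => canonical(item, exclude)).join(",")}]`;
  if (value !== null && typeof value === "object") {
    const record = value as Record<string, unknown>;
    const keys = Object.keys(record).filter((key) => !exclude(key)).sort();
    return `{${keys.map((key) => `${JSON.stringify(key)}:${canonical(record[key], exclude)}`).join(",")}}`;
  }
  return JSON.stringify(value) ?? "null";
}

// 32-bit FNV-1a over UTF-16 code units.
function fnv1a(text: string): string {
  let hash = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    hash ^= text.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193);
  }
  return (hash >>> 0).toString(16);
}

function* prepareEventsSketch(
  newEvents: Iterable<ImportEvent>,
  previous: Map<string, PreviousSubmission>, // keyed by `${entity}:${_id}`
  options: { emitDeletes?: boolean | string; excludeKeys?: (key: string) => boolean } = {},
): Generator<HashedImportEvent> {
  const exclude = (key: string) => DEFAULT_EXCLUDED_KEYS.has(key) || (options.excludeKeys?.(key) ?? false);
  const seen = new Set<string>();

  for (const event of newEvents) {
    const key = `${event.entity}:${event.payload._id}`;
    seen.add(key);
    const hash = fnv1a(canonical(event.payload, exclude));
    const last = previous.get(key);
    // Skip an UPDATE whose hash equals the last non-REMOVE submission for the same entity.
    if (event.operation === "UPDATE" && last && last.operation !== "REMOVE" && last.hash === hash) continue;
    yield { ...event, hash };
  }

  if (!options.emitDeletes) return;
  // Previously submitted, still existing, in scope, but absent from this run: emit REMOVE.
  for (const [key, last] of previous) {
    const inScope = options.emitDeletes === true || options.emitDeletes === last.entity;
    if (!inScope || last.operation === "REMOVE" || seen.has(key)) continue;
    const payload = { _id: last._id };
    const removal: HashedImportEvent = { entity: last.entity, operation: "REMOVE", payload, hash: fnv1a(canonical(payload, exclude)) };
    yield removal;
  }
}
```

Deletes can only be emitted after the whole input has been consumed, since an ID is known to be missing only once every incoming event has been seen.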


Load
connector.load(events, submitOptions?): Promise<ObjectId[]>

Submits events to the Unchained Bulk Import API and records them in the unchained_submitted_events collection for future deduplication. Returns the MongoDB ObjectIds of the submitted event records.

Events are processed in chunks of 10,000. Each chunk is serialized to a temporary file and streamed to the API (unless disableStream is set). After all chunks are submitted, duplicate submitted events are garbage-collected.

Accepts a plain array or an AsyncIterable (the generator returned by prepareEvents).

const ids = await connector.load(prepared, {
  shouldUpsert: true,
  skipCacheInvalidation: true,
});
interface SubmitOptions {
  shouldUpsert?: boolean;           // CREATE upserts if an entity with the same _id already exists
  shouldUpsertOnUpdate?: boolean;   // UPDATE creates the entity if no entity with _id exists
  skipCacheInvalidation?: boolean;  // Skip cache invalidation after import
  disableStream?: boolean;          // Send JSON body instead of streaming from a temp file
}
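The chunking that load() performs (10,000 events per chunk) can be pictured as a small async generator. This is an illustrative sketch, not the SDK's code: chunked is a hypothetical helper. Because it accepts both arrays and async iterables, it composes with a lazy source such as the generator returned by prepareEvents and builds only one chunk at a time.

```typescript
// Hypothetical helper: groups any (async) iterable into arrays of at most `size` items.
async function* chunked<T>(source: Iterable<T> | AsyncIterable<T>, size: number): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of source) {
    batch.push(item);
    if (batch.length === size) {
      yield batch; // a full chunk, e.g. 10,000 events
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // the final, partial chunk
}
```

In load(), each such chunk is then serialized to a temporary file and streamed, or sent as a JSON body when disableStream is set.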

Journal & Reporting

The journal tracks each ETL run. On init(), a new journal entry is created with status INITIAL. The since field is set to the started timestamp of the last successful (COMPLETE) run, enabling differential/incremental syncs.

connector.reportSuccess(eventIds?): Promise<void>

Records a COMPLETE status on the current journal entry. Optionally pass the ObjectId[] returned by load() to store them on the journal entry.

const ids = await connector.load(prepared);
await connector.reportSuccess(ids);

connector.reportFailure(stage?): Promise<void>

Records a failure status. Pass "EXTRACT", "TRANSFORM", or "LOAD" to indicate which stage failed (defaults to "LOAD").

try {
  await connector.extract(fetchFn, "products");
} catch {
  await connector.reportFailure("EXTRACT");
}
enum CompletionStatus {
  INITIAL = "INITIAL",
  COMPLETE = "COMPLETE",
  FAILED_EXTRACT = "FAILED_EXTRACT",
  FAILED_TRANSFORM = "FAILED_TRANSFORM",
  FAILED_LOAD = "FAILED_LOAD",
}
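The journal rules in this section (since taken from the latest COMPLETE run, failures recorded per stage) can be sketched as two pure functions. The field and status names come from this README; the logic is an assumption, and because the README does not say what since holds when no COMPLETE run exists, the sketch leaves it undefined. nextJournalEntry and failureStatus are hypothetical names.

```typescript
type CompletionStatus = "INITIAL" | "COMPLETE" | "FAILED_EXTRACT" | "FAILED_TRANSFORM" | "FAILED_LOAD";
type FailedStage = "EXTRACT" | "TRANSFORM" | "LOAD";

interface JournalRun {
  started: Date;
  status: CompletionStatus;
}

// reportFailure(stage) records FAILED_<stage>; "LOAD" is the documented default.
function failureStatus(stage: FailedStage = "LOAD"): CompletionStatus {
  return `FAILED_${stage}` as CompletionStatus;
}

// init(): a new INITIAL entry whose `since` is the start of the latest COMPLETE run.
function nextJournalEntry(pastRuns: JournalRun[], now: Date) {
  const lastComplete = pastRuns
    .filter((run) => run.status === "COMPLETE")
    .sort((a, b) => b.started.getTime() - a.started.getTime())[0];
  return {
    started: now,
    status: "INITIAL" as CompletionStatus,
    since: lastComplete?.started,
    differential: lastComplete !== undefined,
  };
}
```

Failed runs never move since forward, so the next run re-fetches everything changed since the last success.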

Existing Entity IDs
connector.getExistingEntityIds(entity?): Promise<Set<string>>

Returns a Set of payload _id values for all previously submitted entities (excluding those whose last operation was REMOVE). Optionally filter by entity type.

const existingProductIds = await connector.getExistingEntityIds("PRODUCT");
if (!existingProductIds.has("some-product-id")) {
  // This product has never been synced before
}
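The rule "previously submitted, unless the last operation was REMOVE" amounts to keeping the latest submission per entity/payload pair and discarding removed ones. A sketch over an in-memory array; SubmittedEvent and existingEntityIds are hypothetical names, and the SDK presumably runs the equivalent query against unchained_submitted_events.

```typescript
// Hypothetical record shape: one entry per submitted event, oldest first.
interface SubmittedEvent {
  entity: string;
  operation: "CREATE" | "UPDATE" | "REMOVE";
  payloadId: string;
}

function existingEntityIds(submitted: SubmittedEvent[], entity?: string): Set<string> {
  // Keep only the latest event per entity/payload pair; later entries overwrite earlier ones.
  const latest = new Map<string, SubmittedEvent>();
  for (const submission of submitted) {
    if (entity !== undefined && submission.entity !== entity) continue;
    latest.set(`${submission.entity}\u0000${submission.payloadId}`, submission);
  }
  // An entity counts as existing unless its most recent operation was REMOVE.
  const ids = new Set<string>();
  for (const submission of latest.values()) {
    if (submission.operation !== "REMOVE") ids.add(submission.payloadId);
  }
  return ids;
}
```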

Archives

Utility methods for offline development and for caching extracted data as gzip-compressed JSON files.

connector.loadFromArchive<T>(archiveName, options?): Promise<T | null>

Loads and decompresses a {archivePath}/{archiveName}.json.gz file. Returns the parsed data or null if the file doesn't exist.

const products = await connector.loadFromArchive<Product[]>("products", {
  archivePath: "./offline-archives",
  dateFields: ["updatedAt", "createdAt"],
});

| Option | Type | Description |
| --- | --- | --- |
| archivePath | string | Directory for archive files (default: "./offline-archives") |
| dateFields | string[] | Field names to convert from strings back to Date objects |

connector.storeToArchive(archiveName, data, options?): Promise<void>

Serializes data as JSON, gzip-compresses it, and writes it to {archivePath}/{archiveName}.json.gz. Creates the directory if it doesn't exist.

await connector.storeToArchive("products", fetchedProducts, {
  archivePath: "./offline-archives",
});
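Two details of the archive helpers are easy to get wrong: where the file lands, and why dateFields is needed after a JSON round trip. The sketch below illustrates both; archiveFile and reviveDateFields are hypothetical helpers, gzip compression is left out, and only top-level fields are converted, which may differ from the SDK.

```typescript
const DEFAULT_ARCHIVE_PATH = "./offline-archives";

// {archivePath}/{archiveName}.json.gz
function archiveFile(archiveName: string, archivePath: string = DEFAULT_ARCHIVE_PATH): string {
  return `${archivePath}/${archiveName}.json.gz`;
}

// JSON.stringify turns Date values into ISO strings; convert the listed top-level fields back.
function reviveDateFields<T extends Record<string, unknown>>(records: T[], dateFields: string[]): T[] {
  return records.map((record) => {
    const copy: Record<string, unknown> = { ...record };
    for (const field of dateFields) {
      const value = copy[field];
      if (typeof value === "string") copy[field] = new Date(value);
    }
    return copy as T;
  });
}
```

Without the dateFields step, a field such as updatedAt comes back from the archive as a string, so date comparisons in transform code would silently compare text.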

Work Queue
runFromExternalWork(mainFn, options): Promise<void>

Orchestrates an ETL run driven by the Unchained work queue. This is designed for connectors that run as long-lived workers polling for work items.

The function:

  1. Allocates work of the given workType from the Unchained work queue
  2. If no work is available, cleans up any dead/zombie workers and returns
  3. Runs mainFn with a timeout (configurable via UNCHAINED_CONNECTOR_TIMEOUT, default: 30 minutes)
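  4. Reports success or failure back to the Unchained engine

import { Connector, remotes, runFromExternalWork } from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

// A standalone client for the work queue; each run creates its own Connector.
const unchainedAPI = remotes.unchained.default();

async function syncProducts() {
  const connector = new Connector();
  try {
    await connector.init();
    // ... extract and transform into `events` (omitted here) ...
    const events: BulkImportEvent[] = [];
    const ids = await connector.load(connector.prepareEvents(events));
    await connector.reportSuccess(ids);
    return connector.journalEntry;
  } catch (error) {
    await connector.reportFailure("LOAD");
    throw error;
  } finally {
    await connector.dispose();
  }
}

await runFromExternalWork(syncProducts, {
  workType: "SYNC_PRODUCTS",
  unchainedAPI,
});

type RunFromExternalWorkOptions<T> = {
  workType: string;            // The work type to allocate from the queue
  unchainedAPI: UnchainedAPI;  // Initialized Unchained API client
} & T;  // Additional options are passed through to mainFn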

The mainFn must return a JournalEntry. If journalEntry.status === "COMPLETE", the work is reported as successful; otherwise it's reported as failed.
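The decision logic described above (allocate, run, report by journal status) fits in a few lines. This is a simplified sketch, not the SDK's implementation: WorkQueue is a hypothetical subset of UnchainedAPI that borrows the allocateWork and finishWork names from the UnchainedAPI methods table, the work object's shape is assumed, dead-work cleanup and the timeout race are left out, and treating a thrown mainFn as a failure is also an assumption.

```typescript
interface WorkQueue {
  allocateWork(workType: string): Promise<{ _id: string } | null | false>;
  finishWork(workId: string, outcome: { success: boolean; result: unknown; error?: unknown }): Promise<unknown>;
}

type RunOutcome = "NO_WORK" | "SUCCESS" | "FAILURE";

async function runFromWorkSketch(
  mainFn: () => Promise<{ status: string }>,
  api: WorkQueue,
  workType: string,
): Promise<RunOutcome> {
  const work = await api.allocateWork(workType);
  // null means the queue is empty; false means the engine could not be reached.
  if (!work) return "NO_WORK";

  let outcome: { success: boolean; result: unknown; error?: unknown };
  try {
    const journalEntry = await mainFn();
    // Only a COMPLETE journal entry counts as success.
    outcome = { success: journalEntry.status === "COMPLETE", result: journalEntry };
  } catch (error) {
    outcome = { success: false, result: null, error: String(error) };
  }
  await api.finishWork(work._id, outcome);
  return outcome.success ? "SUCCESS" : "FAILURE";
}
```

Reporting once, after the try/catch, keeps a failing finishWork call from being retried as if mainFn itself had failed.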

timeout(minutes): Promise<void>

Returns a promise that rejects after the given number of minutes. Used internally by runFromExternalWork but exported for custom timeout logic.

import { timeout } from "@unchainedshop/connector-sdk";

await Promise.race([
  longRunningOperation(),
  timeout(10),
]);
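The README does not say whether timeout() clears its timer once the race is decided. In Node.js a pending setTimeout keeps the process alive until it fires, which is harmless for a long-lived worker but can make a short script linger for the full timeout. Below is a sketch of a self-clearing variant that works in milliseconds; withTimeout is a hypothetical helper, not an SDK export.

```typescript
async function withTimeout<T>(operation: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const expiry = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  try {
    // Whichever settles first wins; the loser is ignored.
    return await Promise.race([operation, expiry]);
  } finally {
    // Always clear the timer so it cannot keep the process alive.
    clearTimeout(timer);
  }
}
```

Because Promise.race subscribes to every input, a timer that rejects after the race is already decided does not surface as an unhandled rejection; clearing it only avoids the idle wait.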

Remotes

Low-level API clients available via the remotes namespace. These are used internally by the Connector class but can be used directly for advanced use cases.

import { remotes } from "@unchainedshop/connector-sdk";

remotes.mongodb

MongoDB client initialization and in-memory server management.

initMongoDBClient(options?): Promise<MongoClient>

Creates and returns a connected MongoClient. Connection settings: 100s connect timeout, 1h socket timeout, ignoreUndefined: true.
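The MongoDB Node.js driver expresses these settings in milliseconds. Assuming the SDK maps them onto the driver's standard MongoClient options (the README does not name them), the equivalent options object would be:

```typescript
// Assumed mapping of the documented settings onto MongoClient options (values in milliseconds).
const connectionOptions = {
  connectTimeoutMS: 100 * 1000, // 100 s to establish a connection
  socketTimeoutMS: 60 * 60 * 1000, // 1 h socket timeout
  ignoreUndefined: true, // skip undefined fields instead of serializing them as null
};
// With the driver installed: new MongoClient(uri, connectionOptions)
```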

const mongo = await remotes.mongodb.initMongoDBClient({
  mongoUrl: "mongodb://localhost:27017/connector",
});

resolveURI(options?): Promise<string>

Resolves the MongoDB connection URI. Checks (in order):

  1. options.mongoUrl
  2. MONGO_URL environment variable
  3. If NODE_ENV=development, auto-starts an in-memory MongoDB server

start(): Promise<{ uri, port, dbPath, dbName }>

Starts a MongoMemoryServer instance. Useful for development and testing.

stop(): Promise<void>

Stops the in-memory MongoDB server if one was started.
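The lookup order documented for resolveURI() can be modeled as a short chain of checks. In this sketch, resolveMongoUri is a hypothetical name, env would be process.env in real code, startInMemory stands in for start(), and the error thrown when nothing matches is an assumption because the README does not describe that case.

```typescript
interface ResolveUriOptions {
  mongoUrl?: string;
}

async function resolveMongoUri(
  options: ResolveUriOptions,
  env: Record<string, string | undefined>,
  startInMemory: () => Promise<{ uri: string }>,
): Promise<string> {
  if (options.mongoUrl) return options.mongoUrl; // 1. explicit option
  if (env.MONGO_URL) return env.MONGO_URL; // 2. environment variable
  if (env.NODE_ENV === "development") return (await startInMemory()).uri; // 3. in-memory server
  // Undocumented case: this sketch fails loudly rather than guessing a default.
  throw new Error("No MongoDB URI: pass mongoUrl or set MONGO_URL");
}
```

Because MONGO_URL is checked first, setting NODE_ENV=development does not replace a configured database; the in-memory server is only a fallback.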

remotes.unchained

remotes.unchained.default(options?): UnchainedAPI

Creates an Unchained API client. All requests use Bearer admin:<secret> authentication with a 5-minute default timeout (15 minutes for streaming uploads).

const api = remotes.unchained.default({
  unchainedEndpoint: "https://engine.example.com",
  unchainedSecret: "my-secret",
  customHeaders: { "X-Custom": "value" },
});
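The authentication scheme and timeouts described above can be sketched as a header builder plus two constants. unchainedHeaders is a hypothetical helper, and the merge order (custom headers first, so they cannot override Authorization) is an assumption about the SDK.

```typescript
function unchainedHeaders(secret: string, customHeaders: Record<string, string> = {}): Record<string, string> {
  return {
    ...customHeaders,
    // Placed last so a custom header cannot replace the admin credentials.
    Authorization: `Bearer admin:${secret}`,
  };
}

// Documented timeouts, in milliseconds.
const REQUEST_TIMEOUT_MS = 5 * 60 * 1000; // regular requests
const STREAM_TIMEOUT_MS = 15 * 60 * 1000; // submitEventsAsStream uploads

const requestHeaders = unchainedHeaders("my-secret", { "X-Custom": "value" });
// With Node.js 18+, a request could then be sent as
// fetch(url, { headers: requestHeaders, signal: AbortSignal.timeout(REQUEST_TIMEOUT_MS) }).
```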

UnchainedAPI methods:

| Method | Description |
| --- | --- |
| submitEventsAsStream(fileName, submitOptions?) | Stream a temp file to the /bulk-import endpoint (15-min timeout) |
| submitEvents(body, submitOptions?) | POST a JSON body to the /bulk-import endpoint |
| allocateWork(workType) | Allocate the next available work item from the queue via GraphQL. Returns the work object, null if none is available, or false on connection failure. |
| finishWork(workId, { success, result, error? }) | Report work completion or failure back to the engine via GraphQL |
| findDeadWork(workType) | Find work items with ALLOCATED status for the current worker (zombie detection) |
| addExternalWork(workType, input) | Add a new work item to the queue via GraphQL |
| submitErrorMessage(title, content) | Send an error report as a MESSAGE work item |
| unchainedFetch(path, params, fetchOptions) | Low-level HTTP wrapper that adds the auth headers. params are appended as URL search params. |

remotes.cockpit

Cockpit CMS client for fetching content.

remotes.cockpit.default(options): CockpitAPI
const cockpit = remotes.cockpit.default({
  endpoint: "https://cockpit.example.com",
  apiKey: "my-api-key",
});

CockpitAPI methods:

| Method | Description |
| --- | --- |
| fetchEntries(entity, locale?, { filter? }) | Fetch entries from a Cockpit collection. Locale defaults to "default" (mapped from "de"). |
| findLocaleTranslation(projectName, locale?) | Fetch localization data for a Lokalize project. Locale defaults to "de". |
| fetchPageById(page, id, params?) | Fetch a specific page by ID. |

Utilities
normalizePrice(textualPrice?: string | number): number

Converts a price value (string or number) to an integer in the smallest currency unit (cents). Handles thousands separators (commas, apostrophes, backticks) in string inputs.

import { normalizePrice } from "@unchainedshop/connector-sdk";

normalizePrice("2,299.00"); // 229900
normalizePrice(22.99);       // 2299
normalizePrice(undefined);   // NaN
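Converting to cents is a multiply-and-round, and the rounding matters because binary floating point cannot represent most decimal prices exactly. The sketch below reproduces the documented examples under an assumed rule (strip commas, apostrophes and backticks, then parse as a decimal); normalizePriceSketch is a hypothetical name, and the SDK's handling of other formats may differ. Under this rule a comma used as a decimal separator, as in "22,99", would be read as 2299.

```typescript
function normalizePriceSketch(textualPrice?: string | number): number {
  if (textualPrice === undefined) return NaN;
  const numeric =
    typeof textualPrice === "number"
      ? textualPrice
      : Number.parseFloat(textualPrice.replace(/[,'`]/g, "")); // strip thousands separators
  // Round to whole cents: 1.15 * 100 evaluates to 114.99999999999999 in floating point.
  return Math.round(numeric * 100);
}
```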

Types

All types are exported for use in consumer code:

import type {
  BulkImportEvent,
  HashedBulkImportEvent,
  BulkImportPayload,
  BulkImportEntity,
  BulkImportOperation,
  SubmitOptions,
  ConnectorOptions,
  JournalEntry,
  RunFromExternalWorkOptions,
  UnchainedAPI,
  Db,
  MongoClient,
} from "@unchainedshop/connector-sdk";

BulkImportEvent<P>
interface BulkImportEvent<P extends BulkImportPayload = BulkImportPayload> {
  entity: BulkImportEntity;
  operation: BulkImportOperation;
  payload: P;
}

BulkImportPayload
interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}

BulkImportEntity

One of "PRODUCT", "ASSORTMENT", "FILTER", "ENROLLMENT", "ORDER", "USER", "QUOTATION", or any custom string.

BulkImportOperation

"CREATE" | "UPDATE" | "REMOVE"

HashedBulkImportEvent<P>

Extends BulkImportEvent with a hash: string field used for deduplication.
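The generic parameter P lets consumer code type its payloads precisely. The sketch re-declares the documented shapes locally so it runs on its own (in real code, import them from the SDK). ProductPayload is hypothetical, and spelling "any custom string" as (string & {}), which keeps editor autocompletion for the listed entities, is an assumption about how the SDK declares the type.

```typescript
type BulkImportEntity =
  | "PRODUCT" | "ASSORTMENT" | "FILTER" | "ENROLLMENT" | "ORDER" | "USER" | "QUOTATION"
  | (string & {});
type BulkImportOperation = "CREATE" | "UPDATE" | "REMOVE";

interface BulkImportPayload {
  _id: string;
  [key: string]: unknown;
}

interface BulkImportEvent<P extends BulkImportPayload = BulkImportPayload> {
  entity: BulkImportEntity;
  operation: BulkImportOperation;
  payload: P;
}

type HashedBulkImportEvent<P extends BulkImportPayload = BulkImportPayload> = BulkImportEvent<P> & { hash: string };

// A hypothetical payload type: the compiler now checks title and price on every event.
interface ProductPayload extends BulkImportPayload {
  title: string;
  price: number;
}

const productEvent: BulkImportEvent<ProductPayload> = {
  entity: "PRODUCT",
  operation: "UPDATE",
  payload: { _id: "p1", title: "Chair", price: 12900 },
};
const hashedEvent: HashedBulkImportEvent<ProductPayload> = { ...productEvent, hash: "abc123" };
```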


MongoDB Collections

The connector uses two internal collections in the staging database:

| Collection | Purpose |
| --- | --- |
| journal | Tracks each ETL run with status, timestamps, and event IDs. Indexed on { status: 1, started: -1 }. |
| unchained_submitted_events | Records every event submitted to Unchained with its payload hash for deduplication. Indexed on { emitted: 1, _id: 1 }, { payload._id: 1, entity: 1 }, { payload._id: 1 }, and { entity: 1 }. |

Extracted data is stored in user-defined collections (the collectionName parameter to extract()).


Complete Example: Work-Queue-Driven Connector

import {
  Connector,
  runFromExternalWork,
  remotes,
} from "@unchainedshop/connector-sdk";
import type { BulkImportEvent } from "@unchainedshop/connector-sdk";

// Initialize an Unchained API client for the work queue
const unchainedAPI = remotes.unchained.default();

async function syncProducts() {
  const connector = new Connector();
  // Tracks the current ETL stage so a failure is reported against the right one
  let stage: "EXTRACT" | "TRANSFORM" | "LOAD" = "EXTRACT";

  try {
    await connector.init();

    // Use the journal's "since" for incremental fetches
    const since = connector.journalEntry.since;

    // Extract (fetchProductsUpdatedSince is your own API client)
    await connector.extract(
      () => fetchProductsUpdatedSince(since),
      "products",
      { replace: true },
    );

    // Transform
    stage = "TRANSFORM";
    const events: BulkImportEvent[] = [];
    const products = await connector.db
      .collection("products")
      .find()
      .toArray();

    for (const product of products) {
      events.push({
        entity: "PRODUCT",
        operation: "UPDATE",
        payload: {
          _id: product.externalId,
          title: product.name,
          price: product.price,
        },
      });
    }

    // Prepare (deduplicate + emit deletes)
    const prepared = connector.prepareEvents(events, {
      emitDeletes: "PRODUCT",
    });

    // Load (prepareEvents is lazy, so deduplication runs during load)
    stage = "LOAD";
    const ids = await connector.load(prepared);
    await connector.reportSuccess(ids);

    return connector.journalEntry;
  } catch (e) {
    await connector.reportFailure(stage);
    throw e;
  } finally {
    await connector.dispose();
  }
}

// Poll the work queue in a loop
while (true) {
  await runFromExternalWork(syncProducts, {
    workType: "SYNC_PRODUCTS",
    unchainedAPI,
  });
  await new Promise((resolve) => setTimeout(resolve, 60_000));
}

License

ISC