npm.io
1.0.8 • Published yesterday

@asaidimu/runtime

Licence
MIT
Version
1.0.8
Deps
9
Size
514 kB
Vulns
0
Weekly
0

@asaidimu/runtime

A workflow runtime for executing pipelines built on @asaidimu/utils-pipeline. Routes bus events to workflow triggers, manages run lifecycle (invoke, pause, resume, abort), and provides execution-mode concurrency control.

Installation

npm install @asaidimu/runtime

Quick Start

import { WorkflowRuntime } from "@asaidimu/runtime";
import { createEventBus } from "@core/events";
import { StoreRegistry } from "@core/store";

const bus = createEventBus();
const storeRegistry = new StoreRegistry();

const runtime = new WorkflowRuntime({ bus, storeRegistry });

await runtime.register(myWorkflow, {
  mode: { type: "transient", concurrency: 10 },
  onPrepare: async (ctx) => {
    /* set up state */
  },
  onComplete: async (result) => {
    /* handle completion */
  },
});

API

WorkflowRuntime
Constructor
new WorkflowRuntime(options: WorkflowRuntimeOptions)
Option Type Description
bus EventBus Event bus for dispatching and subscribing to workflow events
storeRegistry StoreRegistry Registry for creating workflow state stores
timelineStore TimelineStore Optional store for recording execution timelines. When set, all log messages emitted by pipeline steps via pcxt.logger are automatically captured as timeline events with source "logger".
services ServiceDefinition[] Optional global singleton services injected into all pipeline runs
env Record<string, string | undefined> Optional global environment variables (defaults to process.env). Per-workflow overrides can be set on the Workflow definition.
static createTestTimelineStore(database?)

Creates a TimelineStore backed by IndexedDB, useful in test harnesses.

const timelineStore =
  await WorkflowRuntime.createTestTimelineStore("my-test-db");
register(workflow, options)

Registers a workflow with the runtime. Wires up triggers to bus events and execution-mode enforcement. If the workflow declares services, a scoped child container is created so those services are visible only to runs of that workflow. If the workflow declares env, those variables override the global env layer for runs of this workflow.

runtime.register(workflow, {
  mode: { type: "transient", concurrency: 10, capacity: 100 },
  onPrepare: async (ctx) => { ... },
  onComplete: async (result) => { ... },
  onResume: async (ctx) => { ... },
  onCleanup: async () => { ... },
  onDispatch: (result) => { ... },
});

Hook order on completion:

  1. onComplete(result) — fires immediately when the run finishes
  2. Internal bookkeeping (settle, onRunEnded, registry.prune)
  3. onCleanup() — fires after internal cleanup is done

This ordering ensures the UI is notified before any cleanup runs, and onCleanup runs after internal state is settled (safe to deregister the workflow).

deregister(workflowId)

Removes a workflow, releases all bus subscriptions, and disposes the workflow's scoped service container.

hasWorkflow(workflowId)

Returns true if the workflow is registered.

listWorkflows()

Returns an array of registered workflow IDs.

invoke(workflowId, triggerId, event)

Directly executes a pipeline, bypassing the bus and execution context. Returns a promise that resolves when the run completes or pauses.

const result = await runtime.invoke("my-workflow", "my-trigger", event);

if (result.ok) {
  if (result.value.status === "paused") {
    const runId = result.metadata!.runId;
    // Resume later via runtime.resume(runId, patch)
  }
}
resume(runId, patch?)

Resumes a paused run with an optional state patch. Handles:

  • Pause-window expiry (reconstructs context from checkpoint)
  • Watch drain — if the resumed run re-pauses, checks for buffered events
  • Service injection for reconstructed contexts
signal(runId, patch)

Writes a state patch directly into a running run's context.

abort(runId)

Aborts a running or paused run.

registry(workflowId)

Returns the PipelineRegistry for a workflow, or undefined.

watch(workflowId, runId)

Returns RunInfo for a specific run, or undefined.

listAllRuns()

Returns an array of all runs across all registered workflows.

stop()

Gracefully stops the runtime. Deregisters all workflows and unsubscribes from abort/signal events.

Execution Modes

Mode Description
transient Concurrency-limited. Drops when queue hits capacity.
serialized FIFO queue, one run at a time. Uses Serializer.
singleton_loop At most one active run. On busy: drop, signal, or replace.
exclusive At most one active run. On busy: reject or queue_single (latest-wins).

Pause / Resume

Pipelines can pause at a stage boundary. The runtime:

  1. Buffers matching events before the pause (pre-pause window) and between resume cycles (inter-resume buffer).
  2. On pause, immediately drains any buffered events via resume().
  3. If the queue is empty, the run "parks" — the next matching bus event triggers resume automatically.

The PauseService is available to steps as services.__pause_service__ and provides register(), cancel(), and condition-based event matching with AND semantics across operators: ==, !=, >, >=, <, <=, exists.

Services

Global services

Services available to every pipeline run can be injected at construction. They are registered as singletons on the runtime's internal ArtifactContainer.

new WorkflowRuntime({
  bus,
  storeRegistry,
  services: [
    {
      id: "my-api",
      factory: () => new ApiClient(), // context is available but unused
    },
    {
      id: "my-context",
      factory: (ctx) => new RequestContext(ctx.runId),
      // ctx has state(), use(), select(), onCleanup(), etc.
    },
  ],
});

The factory receives a full ArtifactFactoryContext allowing service factories to use ctx.use(), ctx.select(), and other artifact container features for dependency injection.

ServiceDefinition has no scope field — global services are always "singleton".

Workflow-level services

Workflows can declare their own services with three scope options:

Scope Lifetime Container Artifact scope
"workflow" As long as the workflow is registered Scoped child of the global container "singleton"
"run" For the duration of a single pipeline run Run's context container (per-run) "singleton"
"transient" Per-resolution (new instance each time) Run's context container (per-run) "transient"

"workflow" (default) services are registered in a scoped child container, isolating them to runs of that workflow. They cannot access run state via select() because their factory runs against the scoped container's store.

"run" and "transient" services are registered directly on each run's context container. Their factories resolve against the run's state store, so they can use deps.select() to read pipeline state and react to state changes.

const workflow = {
  id: "my-workflow",
  label: "My Workflow",
  triggers: { ... },
  pipelines: { ... },
  services: [
    // workflow-scoped: shared across all runs, no run state access
    { id: "config", scope: "workflow", factory: () => loadConfig() },

    // run-scoped: singleton per run, can access run state
    {
      id: "userProfile",
      scope: "run",
      factory: async (ctx) => {
        const userId = await ctx.use((deps) =>
          deps.select((s: any) => s.userId),
        );
        return fetchProfile(userId);
      },
    },

    // transient: new instance per resolution, can access run state
    { id: "requestId", scope: "transient", factory: () => crypto.randomUUID() },
  ],
};
Reserved service IDs

The following service IDs are reserved and cannot be redefined:

ID Description
__pause_service__ Built-in PauseService for pause/resume workflows
__env__ Built-in environment variable service for layered env access

Attempting to redefine either throws a SystemError with code DUPLICATE_KEY.


Environment variables (__env__)

The runtime provides a built-in __env__ service for reading environment variables. It supports two layers:

  • Global — set via WorkflowRuntimeOptions.env (falls back to process.env)
  • Per-workflow — set via Workflow.env, overrides the global layer for that workflow
const runtime = new WorkflowRuntime({
  bus,
  storeRegistry,
  env: { DATABASE_URL: "postgres://..." }, // global default
});

const workflow = {
  id: "my-workflow",
  env: { DATABASE_URL: "postgres://override..." }, // per-workflow override
  // ...
};

Steps access env vars via deps.require("__env__") and call .get():

action: async (ctx) => {
  const env = await ctx.use((deps) => deps.require("__env__"));
  const dbUrl = env.get("DATABASE_URL"); // workflow env wins, then global
};

Resolution order:

  1. Per-workflow env layer (if set on Workflow.env)
  2. Global env layer (WorkflowRuntimeOptions.env or process.env)
  3. undefined
EnvService API
interface EnvService {
  get(key: string): string | undefined;
}

Only .get() is exposed — plain property access (env.KEY) is not supported. The service is read-only.

Do's and Don'ts of Services
Do
  • Do use "run"-scoped services when the factory needs to read pipeline state via select() — they resolve against the run's state store.
  • Do use "workflow"-scoped services for shared configuration, API clients, or connections that are identical across all runs of a workflow.
  • Do use __env__ for reading configuration — it correctly resolves per-workflow overrides over the global defaults.
  • Do keep service factories pure where possible — prefer injecting state via select() over side-effectful construction.
  • Do use ctx.onCleanup() inside factories to release resources (close sockets, free handles) when the run ends.
Don't
  • Don't attempt to register a service with ID __pause_service__ or __env__ — both are reserved and throw DUPLICATE_KEY.
  • Don't use "workflow"-scoped services for per-run state — they are singletons shared across all runs and cannot access run-specific state via select().
  • Don't cache per-run data in "workflow"-scoped or global singletons — it will leak across runs.
  • Don't mutate the env service or treat it as a plain object — always use .get("KEY").
  • Don't assume process.env is available — the runtime may be configured with an explicit env object that does not include it.
Service resolution

Steps resolve services via ctx.use():

action: async (ctx) => {
  const svc = await ctx.use((deps) => deps.require("my-api"));
};

The ctx.use() callback receives a UseDependencyContext with resolve(), require(), and select().

Container Architecture

The runtime uses an ArtifactContainer (from @core/artifacts) as its internal service container:

Global service container (ArtifactContainer)
  ├── Global singleton services
  ├── __pause_service__ (built-in PauseService)
  ├── __env__ (unscoped fallback — rarely hit)
  └── Scoped containers per workflow
       ├── Workflow-scoped services ("workflow" scope)
       └── Run context extends with scoped __env__ (global + per-workflow layers)

When a pipeline run starts, the run's context container is extended with:

  1. The workflow's scoped container (for "workflow"-scoped services)
  2. The global service container (for global services and __pause_service__)
  3. "run"-scoped and "transient" services are registered directly on the run's container, so their factories resolve against the run's state store.

On deregister(), the workflow's scoped container is disposed, cleaning up all workflow-level artifacts.

Container capabilities

The underlying ArtifactContainer provides:

Feature Description
register(template) Register an artifact with scope (singleton / transient), lazy/eager loading, timeouts, retries, param key, and serialization opt-in
resolve(key) Resolve an artifact, falling back through parent containers
require(key) Like resolve but returns the instance directly, throws on error
peek(key) Synchronous non-resolving lookup
invalidate(key) Invalidate a cached artifact, optionally force rebuild
extend(parent) Add a parent container for fallback resolution
scope(name) Create a namespaced ScopedContainer
watch(key) Create an observer that fires on change
on(event) Subscribe to lifecycle events (build:start, build:complete, build:error, artifact:invalidated, artifact:disposed, artifact:registered, stream:emit, container:dispose)
export() Serialize singleton artifacts for persistence
from(bundle) Static factory: create container from store + optional bundle + templates
debugInfo() Snapshot of all artifacts' status, dependencies, and dependents

Internal Architecture

Bus events → dispatch index → ExecutionContext.accept() → spawnRun()
                                                              │
                     ┌────────────────────────────────────────┤
                     │                                        │
               pipeline runs                              returns runId
                     │                                        │
             ┌───────┴───────┐                          ExecutionContext
             │               │                          updates inFlight
          paused         completed                      / activeRunId
             │               │
     drainWatchQueue     onComplete() → UI notified
             │               │
        resume()         bookkeeping
             │               │
     ┌───────┴───────┐   onCleanup()
     │               │
   re-paused      completed → onComplete()

Testing

npm test
npm run test:watch