@asaidimu/runtime
A workflow runtime for executing pipelines built on @asaidimu/utils-pipeline. Routes bus events to workflow triggers, manages run lifecycle (invoke, pause, resume, abort), and provides execution-mode concurrency control.
Installation
npm install @asaidimu/runtime
Quick Start
import { WorkflowRuntime } from "@asaidimu/runtime";
import { createEventBus } from "@core/events";
import { StoreRegistry } from "@core/store";
const bus = createEventBus();
const storeRegistry = new StoreRegistry();
const runtime = new WorkflowRuntime({ bus, storeRegistry });
await runtime.register(myWorkflow, {
  mode: { type: "transient", concurrency: 10 },
  onPrepare: async (ctx) => {
    /* set up state */
  },
  onComplete: async (result) => {
    /* handle completion */
  },
});
API
WorkflowRuntime
Constructor
new WorkflowRuntime(options: WorkflowRuntimeOptions)
| Option | Type | Description |
|---|---|---|
| bus | EventBus | Event bus for dispatching and subscribing to workflow events |
| storeRegistry | StoreRegistry | Registry for creating workflow state stores |
| timelineStore | TimelineStore | Optional store for recording execution timelines. When set, all log messages emitted by pipeline steps via pcxt.logger are automatically captured as timeline events with source "logger". |
| services | ServiceDefinition[] | Optional global singleton services injected into all pipeline runs |
| env | Record<string, string \| undefined> | Optional global environment variables (defaults to process.env). Per-workflow overrides can be set on the Workflow definition. |
static createTestTimelineStore(database?)
Creates a TimelineStore backed by IndexedDB, useful in test harnesses.
const timelineStore =
await WorkflowRuntime.createTestTimelineStore("my-test-db");
register(workflow, options)
Registers a workflow with the runtime. Wires up triggers to bus events and execution-mode enforcement. If the workflow declares services, a scoped child container is created so those services are visible only to runs of that workflow. If the workflow declares env, those variables override the global env layer for runs of this workflow.
runtime.register(workflow, {
  mode: { type: "transient", concurrency: 10, capacity: 100 },
  onPrepare: async (ctx) => { ... },
  onComplete: async (result) => { ... },
  onResume: async (ctx) => { ... },
  onCleanup: async () => { ... },
  onDispatch: (result) => { ... },
});
Hook order on completion:
- onComplete(result): fires immediately when the run finishes
- Internal bookkeeping (settle, onRunEnded, registry.prune)
- onCleanup(): fires after internal cleanup is done
This ordering ensures the UI is notified before any cleanup runs, and onCleanup runs after internal state is settled (safe to deregister the workflow).
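The ordering can be illustrated with a small self-contained sketch. `finishRun`, `CompletionHooks`, and the `trace` array are hypothetical stand-ins, not the runtime's internals; the bookkeeping step is reduced to a single trace entry.

```typescript
// Hypothetical stand-in for the runtime's completion path (illustration only).
interface CompletionHooks<R> {
  onComplete?: (result: R) => Promise<void> | void;
  onCleanup?: () => Promise<void> | void;
}

async function finishRun<R>(
  result: R,
  hooks: CompletionHooks<R>,
  trace: string[],
): Promise<void> {
  // 1. Notify the caller first (for example, so a UI can update).
  await hooks.onComplete?.(result);
  // 2. Internal bookkeeping. The real steps (settle, onRunEnded, registry.prune)
  //    are represented here by one trace entry.
  trace.push("bookkeeping");
  // 3. Cleanup runs last, once internal state is settled.
  await hooks.onCleanup?.();
}

const trace: string[] = [];
const finished = finishRun(
  { status: "completed" },
  {
    onComplete: async () => { trace.push("onComplete"); },
    onCleanup: async () => { trace.push("onCleanup"); },
  },
  trace,
).then(() => trace.join(" -> "));
// finished resolves to "onComplete -> bookkeeping -> onCleanup"
```

Because `onCleanup` is awaited only after bookkeeping in this sketch, a cleanup hook that deregisters the workflow would not race with internal state updates.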
deregister(workflowId)
Removes a workflow, releases all bus subscriptions, and disposes the workflow's scoped service container.
hasWorkflow(workflowId)
Returns true if the workflow is registered.
listWorkflows()
Returns an array of registered workflow IDs.
invoke(workflowId, triggerId, event)
Directly executes a pipeline, bypassing the bus and execution context. Returns a promise that resolves when the run completes or pauses.
const result = await runtime.invoke("my-workflow", "my-trigger", event);
if (result.ok) {
  if (result.value.status === "paused") {
    const runId = result.metadata!.runId;
    // Resume later via runtime.resume(runId, patch)
  }
}
resume(runId, patch?)
Resumes a paused run with an optional state patch. Handles:
- Pause-window expiry (reconstructs context from checkpoint)
- Watch drain — if the resumed run re-pauses, checks for buffered events
- Service injection for reconstructed contexts
signal(runId, patch)
Writes a state patch directly into a running run's context.
abort(runId)
Aborts a running or paused run.
registry(workflowId)
Returns the PipelineRegistry for a workflow, or undefined.
watch(workflowId, runId)
Returns RunInfo for a specific run, or undefined.
listAllRuns()
Returns an array of all runs across all registered workflows.
stop()
Gracefully stops the runtime. Deregisters all workflows and unsubscribes from abort/signal events.
Execution Modes
| Mode | Description |
|---|---|
| transient | Concurrency-limited. Drops when queue hits capacity. |
| serialized | FIFO queue, one run at a time. Uses Serializer. |
| singleton_loop | At most one active run. On busy: drop, signal, or replace. |
| exclusive | At most one active run. On busy: reject or queue_single (latest-wins). |
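As a rough model of the transient mode, the sketch below admits events up to a concurrency limit, queues the overflow, and drops once the queue is full. `TransientGate` is a hypothetical class, and it assumes `capacity` counts queued events only (not in-flight runs); the runtime's actual accounting may differ.

```typescript
// Hypothetical admission gate modelling "transient" mode (illustration only).
type Admission = "run" | "queued" | "dropped";

class TransientGate {
  private inFlight = 0;
  private queued = 0;

  constructor(
    private readonly concurrency: number,
    private readonly capacity: number, // assumption: max queued events
  ) {}

  // Decide what happens to an incoming event.
  accept(): Admission {
    if (this.inFlight < this.concurrency) {
      this.inFlight++;
      return "run";
    }
    if (this.queued < this.capacity) {
      this.queued++;
      return "queued";
    }
    return "dropped";
  }

  // Called when a run ends; returns true if a queued event was promoted.
  release(): boolean {
    if (this.inFlight > 0) this.inFlight--;
    if (this.queued > 0) {
      this.queued--;
      this.inFlight++;
      return true;
    }
    return false;
  }
}

const gate = new TransientGate(2, 1);
const decisions = [gate.accept(), gate.accept(), gate.accept(), gate.accept()];
// decisions is ["run", "run", "queued", "dropped"]
```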
Pause / Resume
Pipelines can pause at a stage boundary. The runtime:
- Buffers matching events before the pause (pre-pause window) and between resume cycles (inter-resume buffer).
- On pause, immediately drains any buffered events via resume().
- If the queue is empty, the run "parks": the next matching bus event triggers resume automatically.
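The buffer, drain, and park behaviour described in the list can be modelled roughly as follows. `WatchQueue` and its method names are hypothetical, and the sketch tracks a single run with synchronous callbacks, which the real runtime does not necessarily do.

```typescript
// Hypothetical model of one run's buffered events (illustration only).
class WatchQueue<E> {
  private buffered: E[] = [];
  private parked = false;

  constructor(private readonly resume: (event: E) => void) {}

  // A matching bus event arrived.
  onMatchingEvent(event: E): void {
    if (this.parked) {
      // A parked run resumes as soon as the next matching event shows up.
      this.parked = false;
      this.resume(event);
    } else {
      // Otherwise the event waits for the next pause.
      this.buffered.push(event);
    }
  }

  // The run reached a pause point.
  onPause(): void {
    const next = this.buffered.shift();
    if (next !== undefined) {
      this.resume(next); // drain immediately
    } else {
      this.parked = true; // nothing buffered: park
    }
  }

  isParked(): boolean {
    return this.parked;
  }
}

const resumedWith: string[] = [];
const queue = new WatchQueue<string>((e) => { resumedWith.push(e); });

queue.onMatchingEvent("approval:1"); // arrives before the pause, so it is buffered
queue.onPause();                     // drains the buffered event
queue.onPause();                     // nothing buffered, so the run parks
queue.onMatchingEvent("approval:2"); // resumes the parked run
// resumedWith is ["approval:1", "approval:2"]
```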
The PauseService is available to steps as services.__pause_service__ and provides register(), cancel(), and condition-based event matching with AND semantics across operators: ==, !=, >, >=, <, <=, exists.
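A minimal sketch of AND-combined condition matching over those operators is shown below. The `Condition` shape (a top-level `field`, an `op`, and an optional `value`), strict equality for `==`, and number-only ordering comparisons are all assumptions made for illustration; they are not taken from the PauseService API.

```typescript
type Operator = "==" | "!=" | ">" | ">=" | "<" | "<=" | "exists";

// Hypothetical condition shape.
interface Condition {
  field: string;   // top-level key of the event payload
  op: Operator;
  value?: unknown; // ignored for "exists"
}

function evaluate(actual: unknown, op: Operator, expected: unknown): boolean {
  switch (op) {
    case "exists":
      return actual !== undefined && actual !== null;
    case "==":
      return actual === expected;
    case "!=":
      return actual !== expected;
    default: {
      // Ordering operators: this sketch only compares numbers.
      if (typeof actual !== "number" || typeof expected !== "number") return false;
      if (op === ">") return actual > expected;
      if (op === ">=") return actual >= expected;
      if (op === "<") return actual < expected;
      return actual <= expected;
    }
  }
}

// AND semantics: every condition must hold.
function matchesAll(payload: Record<string, unknown>, conditions: Condition[]): boolean {
  return conditions.every((c) => evaluate(payload[c.field], c.op, c.value));
}

const conditions: Condition[] = [
  { field: "orderId", op: "==", value: "A-1" },
  { field: "amount", op: ">=", value: 100 },
  { field: "approvedBy", op: "exists" },
];

const hit = matchesAll({ orderId: "A-1", amount: 250, approvedBy: "sam" }, conditions);
const miss = matchesAll({ orderId: "A-1", amount: 250 }, conditions); // approvedBy missing
```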
Services
Global services
Services available to every pipeline run can be injected at construction. They are registered as singletons on the runtime's internal ArtifactContainer.
new WorkflowRuntime({
  bus,
  storeRegistry,
  services: [
    {
      id: "my-api",
      factory: () => new ApiClient(), // context is available but unused
    },
    {
      id: "my-context",
      factory: (ctx) => new RequestContext(ctx.runId),
      // ctx has state(), use(), select(), onCleanup(), etc.
    },
  ],
});
The factory receives a full ArtifactFactoryContext allowing service factories to use ctx.use(), ctx.select(), and other artifact container features for dependency injection.
ServiceDefinition has no scope field — global services are always "singleton".
Workflow-level services
Workflows can declare their own services with three scope options:
| Scope | Lifetime | Container | Artifact scope |
|---|---|---|---|
| "workflow" | As long as the workflow is registered | Scoped child of the global container | "singleton" |
| "run" | For the duration of a single pipeline run | Run's context container (per-run) | "singleton" |
| "transient" | Per-resolution (new instance each time) | Run's context container (per-run) | "transient" |
"workflow" (default) services are registered in a scoped child container, isolating them to runs of that workflow. They cannot access run state via select() because their factory runs against the scoped container's store.
"run" and "transient" services are registered directly on each run's context container. Their factories resolve against the run's state store, so they can use deps.select() to read pipeline state and react to state changes.
const workflow = {
  id: "my-workflow",
  label: "My Workflow",
  triggers: { ... },
  pipelines: { ... },
  services: [
    // workflow-scoped: shared across all runs, no run state access
    { id: "config", scope: "workflow", factory: () => loadConfig() },
    // run-scoped: singleton per run, can access run state
    {
      id: "userProfile",
      scope: "run",
      factory: async (ctx) => {
        const userId = await ctx.use((deps) =>
          deps.select((s: any) => s.userId),
        );
        return fetchProfile(userId);
      },
    },
    // transient: new instance per resolution, can access run state
    { id: "requestId", scope: "transient", factory: () => crypto.randomUUID() },
  ],
};
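The three lifetimes can be demonstrated with a toy container. `MiniContainer` and `startRun` are hypothetical and far simpler than the real ArtifactContainer; the sketch only shows which resolutions share an instance.

```typescript
type ArtifactScope = "singleton" | "transient";

// Hypothetical toy container (illustration only).
class MiniContainer {
  private factories = new Map<string, { scope: ArtifactScope; factory: () => unknown }>();
  private cache = new Map<string, unknown>();

  register(id: string, scope: ArtifactScope, factory: () => unknown): void {
    this.factories.set(id, { scope, factory });
  }

  resolve(id: string): unknown {
    const entry = this.factories.get(id);
    if (!entry) throw new Error(`unknown service: ${id}`);
    // Transient artifacts are rebuilt on every resolution.
    if (entry.scope === "transient") return entry.factory();
    // Singleton artifacts are built once per container and then cached.
    if (!this.cache.has(id)) this.cache.set(id, entry.factory());
    return this.cache.get(id);
  }
}

let built = 0;

// "workflow" scope: one container for as long as the workflow is registered.
const workflowContainer = new MiniContainer();
workflowContainer.register("config", "singleton", () => ({ build: ++built }));

// "run" and "transient" scopes: registered on a fresh container per run.
function startRun(): MiniContainer {
  const run = new MiniContainer();
  run.register("userProfile", "singleton", () => ({ build: ++built }));
  run.register("requestId", "transient", () => ({ build: ++built }));
  return run;
}

const runA = startRun();
const runB = startRun();
```

In this model, `workflowContainer.resolve("config")` always returns the same object, `userProfile` is shared within a run but differs between `runA` and `runB`, and `requestId` is new on every call.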
Reserved service IDs
The following service IDs are reserved and cannot be redefined:
| ID | Description |
|---|---|
| __pause_service__ | Built-in PauseService for pause/resume workflows |
| __env__ | Built-in environment variable service for layered env access |
Attempting to redefine either throws a SystemError with code DUPLICATE_KEY.
Environment variables (__env__)
The runtime provides a built-in __env__ service for reading environment variables. It supports two layers:
- Global: set via WorkflowRuntimeOptions.env (falls back to process.env)
- Per-workflow: set via Workflow.env, overrides the global layer for that workflow
const runtime = new WorkflowRuntime({
  bus,
  storeRegistry,
  env: { DATABASE_URL: "postgres://..." }, // global default
});
const workflow = {
  id: "my-workflow",
  env: { DATABASE_URL: "postgres://override..." }, // per-workflow override
  // ...
};
Steps access env vars via deps.require("__env__") and call .get():
action: async (ctx) => {
  const env = await ctx.use((deps) => deps.require("__env__"));
  const dbUrl = env.get("DATABASE_URL"); // workflow env wins, then global
};
Resolution order:
- Per-workflow env layer (if set on Workflow.env)
- Global env layer (WorkflowRuntimeOptions.env or process.env)
- undefined
EnvService API
interface EnvService {
  get(key: string): string | undefined;
}
Only .get() is exposed — plain property access (env.KEY) is not supported. The service is read-only.
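The layered lookup can be sketched as a function that builds a read-only `EnvService` from two plain objects. `createLayeredEnv` and `EnvLayer` are hypothetical names, and treating an `undefined` value in the workflow layer as "not set" is an assumption; the runtime's own implementation may differ.

```typescript
interface EnvService {
  get(key: string): string | undefined;
}

type EnvLayer = Record<string, string | undefined>;

// Hypothetical factory (illustration only).
function createLayeredEnv(globalEnv: EnvLayer, workflowEnv?: EnvLayer): EnvService {
  return {
    get(key: string): string | undefined {
      // 1. The per-workflow layer wins when it defines the key.
      const override = workflowEnv?.[key];
      if (override !== undefined) return override;
      // 2. Otherwise fall back to the global layer, which may yield undefined.
      return globalEnv[key];
    },
  };
}

const env = createLayeredEnv(
  { DATABASE_URL: "postgres://global", LOG_LEVEL: "info" },
  { DATABASE_URL: "postgres://override" },
);

const dbUrl = env.get("DATABASE_URL"); // "postgres://override"
const logLevel = env.get("LOG_LEVEL"); // "info"
const missing = env.get("MISSING");    // undefined
```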
Do's and Don'ts of Services
Do
- Do use "run"-scoped services when the factory needs to read pipeline state via select(): they resolve against the run's state store.
- Do use "workflow"-scoped services for shared configuration, API clients, or connections that are identical across all runs of a workflow.
- Do use __env__ for reading configuration: it correctly resolves per-workflow overrides over the global defaults.
- Do keep service factories pure where possible: prefer injecting state via select() over side-effectful construction.
- Do use ctx.onCleanup() inside factories to release resources (close sockets, free handles) when the run ends.
Don't
- Don't attempt to register a service with ID __pause_service__ or __env__: both are reserved and throw DUPLICATE_KEY.
- Don't use "workflow"-scoped services for per-run state: they are singletons shared across all runs and cannot access run-specific state via select().
- Don't cache per-run data in "workflow"-scoped or global singletons: it will leak across runs.
- Don't mutate the env service or treat it as a plain object: always use .get("KEY").
- Don't assume process.env is available: the runtime may be configured with an explicit env object that does not include it.
Service resolution
Steps resolve services via ctx.use():
action: async (ctx) => {
  const svc = await ctx.use((deps) => deps.require("my-api"));
};
The ctx.use() callback receives a UseDependencyContext with resolve(), require(), and select().
Container Architecture
The runtime uses an ArtifactContainer (from @core/artifacts) as its internal service container:
Global service container (ArtifactContainer)
├── Global singleton services
├── __pause_service__ (built-in PauseService)
├── __env__ (unscoped fallback — rarely hit)
└── Scoped containers per workflow
    ├── Workflow-scoped services ("workflow" scope)
    └── Run context extends with scoped __env__ (global + per-workflow layers)
When a pipeline run starts, the run's context container is extended with:
- The workflow's scoped container (for "workflow"-scoped services)
- The global service container (for global services and __pause_service__)
"run"-scoped and "transient" services are registered directly on the run's container, so their factories resolve against the run's state store.
On deregister(), the workflow's scoped container is disposed, cleaning up all workflow-level artifacts.
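The fallback chain can be pictured with a toy container that checks its own entries first and then each parent in turn. `LayeredContainer` is hypothetical, and the parent order used here (workflow container before global container) is an assumption, not documented behaviour.

```typescript
// Hypothetical container with parent fallback (illustration only).
class LayeredContainer {
  private readonly local = new Map<string, unknown>();
  private readonly parents: LayeredContainer[] = [];

  set(key: string, value: unknown): this {
    this.local.set(key, value);
    return this;
  }

  // Add a parent that is consulted when a key is not found locally.
  extend(parent: LayeredContainer): this {
    this.parents.push(parent);
    return this;
  }

  resolve(key: string): unknown {
    if (this.local.has(key)) return this.local.get(key);
    for (const parent of this.parents) {
      const found = parent.resolve(key);
      if (found !== undefined) return found;
    }
    return undefined;
  }
}

const globalContainer = new LayeredContainer().set("my-api", "global api client");
const workflowScoped = new LayeredContainer()
  .set("config", "workflow config")
  .extend(globalContainer);
const runContainer = new LayeredContainer()
  .set("userProfile", "run-scoped profile")
  .extend(workflowScoped)
  .extend(globalContainer);

const fromRun = runContainer.resolve("userProfile"); // found locally
const fromWorkflow = runContainer.resolve("config"); // found in the workflow container
const fromGlobal = runContainer.resolve("my-api");   // found in the global container
const isolated = globalContainer.resolve("config");  // undefined: not visible globally
```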
Container capabilities
The underlying ArtifactContainer provides:
| Feature | Description |
|---|---|
| register(template) | Register an artifact with scope (singleton / transient), lazy/eager loading, timeouts, retries, param key, and serialization opt-in |
| resolve(key) | Resolve an artifact, falling back through parent containers |
| require(key) | Like resolve but returns the instance directly, throws on error |
| peek(key) | Synchronous non-resolving lookup |
| invalidate(key) | Invalidate a cached artifact, optionally force rebuild |
| extend(parent) | Add a parent container for fallback resolution |
| scope(name) | Create a namespaced ScopedContainer |
| watch(key) | Create an observer that fires on change |
| on(event) | Subscribe to lifecycle events (build:start, build:complete, build:error, artifact:invalidated, artifact:disposed, artifact:registered, stream:emit, container:dispose) |
| export() | Serialize singleton artifacts for persistence |
| from(bundle) | Static factory: create container from store + optional bundle + templates |
| debugInfo() | Snapshot of all artifacts' status, dependencies, and dependents |
Internal Architecture
Bus events → dispatch index → ExecutionContext.accept() → spawnRun()
                                                              │
                     ┌────────────────────────────────────────┤
                     │                                        │
               pipeline runs                            returns runId
                     │                                        │
             ┌───────┴───────┐                        ExecutionContext
             │               │                        updates inFlight
          paused         completed                      / activeRunId
             │               │
      drainWatchQueue  onComplete() → UI notified
             │               │
         resume()       bookkeeping
             │               │
     ┌───────┴───────┐  onCleanup()
     │               │
 re-paused       completed → onComplete()
Testing
npm test
npm run test:watch