@bemedev/pipe-machine
Features
- 2-Step Builder - Define input parameters and starting context directly via an initializer function in createPipe, then provide implementations in .define().
- Type Inference - Automatic inference of inputs, context, and steps from your initializer function and configuration. No manual type specifications needed.
- Strongly-Typed Pipes - Full TypeScript type inference and validation throughout the pipeline.
- Named-Step Support - Create named pipeline steps for better code clarity and partial overrides.
- Async Support - Handle both synchronous and asynchronous function pipelines seamlessly.
- Duplicate Key Support - Reuse an action name to run the same function multiple times in the pipeline.
Installation
npm install @bemedev/pipe-machine
# or
pnpm add @bemedev/pipe-machine
Quick Start
import { createPipe } from '@bemedev/pipe-machine';

const pipe = createPipe((x: number) => ({ value: x }), 'double', 'add10')
  .define({
    actions: {
      double: ctx => ({ value: ctx.value * 2 }),
      add10: ctx => ({ value: ctx.value + 10 }),
    },
  })
  .build(ctx => ctx.value);

pipe(5); // 20
The 2-Step Flow
Step 1 — createPipe(initializer, ...configs)
Declares the entry point function (which determines the inputs and initial context) and the ordered list of named steps or configurations in the pipeline.
const builder = createPipe(
  (input: string) => ({ text: input, count: 0 }),
  'parse',
  'validate',
);
Step 2 — .define(impl)
Provides the function implementations for actions, guards, and delays. Types are automatically inferred from the initializer function and the configuration array.
const runner = builder.define({
  actions: {
    parse: ctx => ({ ...ctx, count: parseInt(ctx.text, 10) }),
    validate: ctx => ({ ...ctx, count: Math.abs(ctx.count) }),
  },
});

runner('-42'); // { text: '-42', count: 42 }
Advanced Usage
Duplicate keys
Repeat a key name to run that function more than once:
const fn = createPipe(
  (x: number) => ({ value: x }),
  'add1',
  'double',
  'add1',
  'double',
  'add1',
)
  .define({
    actions: {
      add1: ctx => ({ value: ctx.value + 1 }),
      double: ctx => ({ value: ctx.value * 2 }),
    },
  })
  .build(ctx => ctx.value);

fn(2); // ((((2+1)*2)+1)*2)+1 = 15
Every action receives and returns the same context type, the one returned by the initializer. Its signature is (ctx: Context) => Context | Promise<Context>.
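To make the repeated-key behaviour concrete, here is a minimal standalone sketch of how an ordered list of step names could be resolved against a map of actions. It does not use the package; `PipeCtx`, `StepAction`, `stepActions`, `stepOrder`, and `runSteps` are hypothetical names chosen for illustration, and the library's internals may well differ.

```typescript
// Hypothetical model of a step runner; not the library's actual implementation.
type PipeCtx = { value: number };
type StepAction = (ctx: PipeCtx) => PipeCtx;

const stepActions: Record<'add1' | 'double', StepAction> = {
  add1: ctx => ({ value: ctx.value + 1 }),
  double: ctx => ({ value: ctx.value * 2 }),
};

// The ordered step list may name the same action several times.
const stepOrder: Array<keyof typeof stepActions> = [
  'add1',
  'double',
  'add1',
  'double',
  'add1',
];

// Look up each step name in turn and feed the previous context into it.
function runSteps(initial: PipeCtx): PipeCtx {
  return stepOrder.reduce<PipeCtx>(
    (ctx, name) => stepActions[name](ctx),
    initial,
  );
}

const duplicateResult = runSteps({ value: 2 });
console.log(duplicateResult.value); // 15
```

Because every action maps the context type to itself, any action can follow any other, which is what makes repeating a key safe in this model.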
Multi-argument first step
The initializer function can accept any number of parameters, which defines the inputs of the completed pipeline:
const fn = createPipe(
  (a: number, b: number) => ({ value: Math.hypot(a, b) }),
  'double',
)
  .define({
    actions: {
      double: ctx => ({ value: ctx.value * 2 }),
    },
  })
  .build(ctx => ctx.value);

fn(3, 4); // 10
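The parameter forwarding described here can be expressed in plain TypeScript with a generic rest-parameter tuple. The sketch below is an illustration only: `withInitializer` is a hypothetical helper, not part of the package, and it handles a single step to keep the typing visible.

```typescript
// Hypothetical helper: forwards the initializer's parameter list to the
// returned function, so callers see exactly the initializer's inputs.
function withInitializer<Args extends unknown[], C>(
  initializer: (...args: Args) => C,
  step: (ctx: C) => C,
): (...args: Args) => C {
  return (...args: Args) => step(initializer(...args));
}

const hypotDoubled = withInitializer(
  (a: number, b: number) => ({ value: Math.hypot(a, b) }),
  ctx => ({ value: ctx.value * 2 }),
);

// hypotDoubled is inferred as (a: number, b: number) => { value: number }
const hypotResult = hypotDoubled(3, 4);
console.log(hypotResult.value); // 10
```

Calling `hypotDoubled(3)` or `hypotDoubled('3', 4)` would be rejected at compile time, since `Args` is inferred from the initializer's own signature.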
Async pipelines
If the initializer function or any step action returns a Promise, the
entire pipeline becomes async (returns a Promise):
const fn = createPipe(
  async (url: string) => ({
    text: await (await fetch(url)).text(),
    value: 0,
  }),
  'parse',
)
  .define({
    actions: {
      parse: ctx => ({ ...ctx, value: parseInt(ctx.text, 10) }),
    },
  })
  .build(ctx => ctx.value);

await fn('https://example.com/value'); // number
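One way to get "synchronous unless something is async" behaviour is to chain with `.then` only once a Promise has appeared. The following sketch shows that idea in isolation. `AsyncCtx`, `AsyncStep`, `chainStep`, and `runChain` are hypothetical names, and this is an assumption about how such a pipeline could work rather than a description of the package's code.

```typescript
type AsyncCtx = { value: number };
type AsyncStep = (ctx: AsyncCtx) => AsyncCtx | Promise<AsyncCtx>;

// Hypothetical helper: call the next step directly for plain values,
// or schedule it with `.then` once the running value is a Promise.
function chainStep(
  current: AsyncCtx | Promise<AsyncCtx>,
  next: AsyncStep,
): AsyncCtx | Promise<AsyncCtx> {
  return current instanceof Promise ? current.then(next) : next(current);
}

function runChain(
  steps: AsyncStep[],
  initial: AsyncCtx,
): AsyncCtx | Promise<AsyncCtx> {
  return steps.reduce<AsyncCtx | Promise<AsyncCtx>>(chainStep, initial);
}

const addOne: AsyncStep = ctx => ({ value: ctx.value + 1 });
const addOneLater: AsyncStep = async ctx => ({ value: ctx.value + 1 });

// All steps synchronous: the result is a plain object.
const syncOut = runChain([addOne, addOne], { value: 0 });

// One async step: every later step is chained onto a Promise.
const asyncOut = runChain([addOne, addOneLater, addOne], { value: 0 });

console.log(syncOut instanceof Promise); // false
console.log(asyncOut instanceof Promise); // true
```

In this model a single async step is enough to make the final result a Promise, which matches the behaviour described above.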
Partial overrides
After defining a pipeline, create variants by calling .define() again with only the steps you want to override:
const base = createPipe(
  (x: number) => ({ value: x }),
  'add1',
  'double',
).define({
  actions: {
    add1: ctx => ({ value: ctx.value + 1 }),
    double: ctx => ({ value: ctx.value * 2 }),
  },
});

base(5); // { value: 12 }

const tripled = base.define({
  actions: {
    double: ctx => ({ value: ctx.value * 3 }),
  },
});

tripled(5); // { value: 18 }
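The override behaviour can be modelled as a non-mutating merge of action maps. The sketch below is self-contained and does not use the package. `OverrideCtx`, `OverrideActions`, `OverridablePipeline`, and `makePipeline` are hypothetical, and the fixed add1-then-double order is hard-coded for brevity.

```typescript
type OverrideCtx = { value: number };
type OverrideActions = {
  add1: (ctx: OverrideCtx) => OverrideCtx;
  double: (ctx: OverrideCtx) => OverrideCtx;
};

// A callable that also exposes `define` for creating variants.
interface OverridablePipeline {
  (x: number): OverrideCtx;
  define(overrides: Partial<OverrideActions>): OverridablePipeline;
}

// Hypothetical factory; the step order (add1, then double) is fixed here.
function makePipeline(actions: OverrideActions): OverridablePipeline {
  const run = (x: number): OverrideCtx =>
    actions.double(actions.add1({ value: x }));
  return Object.assign(run, {
    // Spread into a fresh object so the original action map is untouched.
    define: (overrides: Partial<OverrideActions>): OverridablePipeline =>
      makePipeline({ ...actions, ...overrides }),
  });
}

const basePipe = makePipeline({
  add1: ctx => ({ value: ctx.value + 1 }),
  double: ctx => ({ value: ctx.value * 2 }),
});
const tripledPipe = basePipe.define({
  double: ctx => ({ value: ctx.value * 3 }),
});

console.log(basePipe(5).value); // 12
console.log(tripledPipe(5).value); // 18
```

Because each variant gets its own copy of the action map, `basePipe` keeps returning 12 after `tripledPipe` is created.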
API
createPipe(initializer: (...params) => Context, ...configs: Config[]): MachineTyped
Creates a pipeline builder. The initializer function defines the
parameter inputs and initial context. The configs are an ordered sequence
of configurations, which can be plain actions (strings/Describers),
conditional branches (Condition[]), or delayed actions (Delayed).
.define(impl: MachineDefineInput): MachinePipeline
Provides implementations for the required actions, guards, and delays. Returns the completed, callable pipeline.
pipeline(...params): Context | Promise<Context>
Calls the composed pipeline. Returns a Promise if the initializer or any
action/delay is async, otherwise returns synchronously.
pipeline.define(overrides: Partial<MachineDefineInput>): MachinePipeline
Creates a new pipeline with some actions, guards, or delays replaced. The original pipeline is unchanged.
pipeline.build<T>(select: (ctx: Context) => T): (...params) => T
Transforms the output of the pipeline using a selector function. Returns a Promise-based function if the pipeline is async.
Exported Types
| Type | Description |
|---|---|
| MachineTyped | Returned by createPipe(...) |
| MachinePipeline | Completed callable pipeline |
| MachineDefineInput | Shape of the .define(impl) argument |
| Describer | Step key type: string or { name: string; description: string } |
| FromDescriber<D> | Extracts the string key name from a Describer |
| Config | Configuration object shape for guards and delays |
| Condition | Guard condition type |
| Delayed | Delayed action configuration with delay timing |
| GuardConfig | Shape for guard configuration |
| ExtractActions<C> | Extracts action names from a config |
| ExtractGuards<C> | Extracts guard names from a config |
| ExtractDelays<C> | Extracts delay names from a config |
| SoA | Struct-of-Arrays utility type |
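As a rough illustration of the Describer and FromDescriber<D> entries, the sketch below declares local stand-ins based only on the descriptions in the table. `LocalDescriber`, `LocalFromDescriber`, and `describerName` are hypothetical, and the package's real definitions may differ.

```typescript
// Local stand-ins modelled on the table's descriptions, not the real exports.
type LocalDescriber = string | { name: string; description: string };

// Type-level extraction of the key name from either form.
type LocalFromDescriber<D extends LocalDescriber> = D extends {
  name: infer N;
}
  ? N
  : D;

// Runtime counterpart: read the key name from either form.
function describerName(step: LocalDescriber): string {
  return typeof step === 'string' ? step : step.name;
}

const stepList: LocalDescriber[] = [
  'parse',
  { name: 'validate', description: 'Normalise the parsed count' },
];
const stepNames = stepList.map(describerName);
console.log(stepNames.join(',')); // parse,validate

// Both forms resolve to a string-literal key at the type level.
const parseKey: LocalFromDescriber<'parse'> = 'parse';
const validateKey: LocalFromDescriber<{
  name: 'validate';
  description: string;
}> = 'validate';
console.log(parseKey, validateKey);
```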
Licence
MIT
CHANGE_LOG
Read CHANGELOG.md for more details about the changes.
Author
chlbri (bri_lvi@icloud.com)