0.0.17 • Published 7 months ago
@datarster/workflow-engine v0.0.17
Workflow Engine
Overview
@datarster/workflow-engine is a powerful and extensible workflow management library for Node.js. It enables the execution of workflows using a JSON-based workflow definition. This engine supports sequential, parallel, conditional, looping, switch-case, and event-based workflows, with optional rollback functionality in case of failure.
Features
- Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
- Event-driven execution with workflow resumption capabilities.
- Integration with Redis for state management and workflow caching.
- MongoDB logging for workflow execution history.
- Rollback support for failed steps with custom rollback workers.
- Dynamic worker registration with timeout support.
- Switch/case logic for conditional execution.
- Flexible configuration for Redis and MongoDB connections.
Installation
npm install @datarster/workflow-engineUsage
Import and Initialize the Engine
import { WorkflowEngine } from "@datarster/workflow-engine";
const config = {
redis: { host: "localhost", port: 6379 },
useLogging: true,
mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};
const engine = new WorkflowEngine();
await engine.init(config);Define a Workflow
const workflow = {
name: "Order Processing",
steps: [
{
id: "1",
type: "task",
name: "Validate Order",
worker: "validateOrder",
workerTimeout: 30000,
rollback: {
id: "1_rollback",
name: "Rollback Validation",
type: "task",
worker: "validateOrderRollback"
}
},
{ id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
{ id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
]
};
await engine.registerWorkflow("order_processing", workflow);Register Workers
engine.registerWorker("validateOrder", async (data) => {
console.log("Validating order", data);
// Your validation logic here
});
engine.registerWorker("validateOrderRollback", async (data) => {
console.log("Rolling back order validation", data);
// Your rollback logic here
});
engine.registerWorker("processPayment", async (data) => {
console.log("Processing payment", data);
// Your payment logic here
});
engine.registerWorker("shipOrder", async (data) => {
console.log("Shipping order", data);
// Your shipping logic here
});Execute the Workflow
const executionId = await engine.executeWorkflow({
workflowId: "order_processing",
inputData: { orderId: 12345 },
callbackOptions: {
onStepComplete: (step, result) => {
console.log(`Step ${step.name} completed:`, result);
},
onError: (step, error) => {
console.error(`Step ${step.name} failed:`, error);
}
}
});
console.log("Workflow execution started with ID:", executionId);Resume Workflow with Event
// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
approved: true,
approver: "manager@example.com"
});Fetch Workflow Logs
const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);Supported Workflow Step Types
| Step Type | Description |
|---|---|
task | Executes a registered worker function. |
decision | Evaluates a condition and executes a true/false branch. |
parallel | Runs multiple steps in parallel. |
loop | Repeats execution based on a condition. |
wait | Pauses execution until an event is received. |
execute_then_wait | Executes a task and then waits for an external event. |
switch | Evaluates a field value and executes matching case. |
commit | Marks a workflow step as a commit step. |
Advanced Workflow Examples
Switch/Case Workflow
const switchWorkflow = {
name: "Order Status Processing",
steps: [
{
id: "1",
type: "switch",
name: "Process Based on Status",
switch_cases: [
{
id: "case1",
field: "status",
value: "COMMITTED",
switch_case_steps: [
{ id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
]
},
{
id: "case2",
field: "status",
value: "CANCELLED",
switch_case_steps: [
{ id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
]
}
],
default_case: {
id: "default",
type: "task",
name: "Handle Default",
worker: "handleDefault"
}
}
]
};Parallel Execution
const parallelWorkflow = {
name: "Parallel Processing",
steps: [
{
id: "1",
type: "parallel",
name: "Process Multiple Tasks",
parallel_steps: [
{ id: "1.1", type: "task", name: "Task A", worker: "workerA" },
{ id: "1.2", type: "task", name: "Task B", worker: "workerB" },
{ id: "1.3", type: "task", name: "Task C", worker: "workerC" }
]
}
]
};Event-Driven Workflow
const eventWorkflow = {
name: "Event-Driven Process",
steps: [
{ id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
{
id: "2",
type: "wait",
name: "Wait for Approval",
event_name: "approval_received"
},
{ id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
]
};Configuration Options
Redis Configuration
const redisConfig = {
host: "localhost",
port: 6379,
// OR provide your own Redis client
redisClient: existingRedisClient
};MongoDB Configuration
const mongoConfig = {
uri: "mongodb://localhost:27017",
database: "workflowDB",
collection: "logs",
// OR provide your own MongoDB client
mongoClient: existingMongoClient
};Error Handling and Rollback
- If a task fails, the workflow engine triggers rollback steps if they are defined.
- Rollback steps are executed in reverse order (LIFO - Last In, First Out).
- Errors during rollback steps are logged but do not halt the rollback process.
- Each step can have its own rollback logic defined in the
rollbackproperty.
State Management
- Redis: Used for caching workflow execution state and registered workflows.
- MongoDB: Stores detailed workflow execution history and logs (optional).
- In-Memory: Falls back to in-memory storage if Redis is not configured.
TypeScript Support
The library is written in TypeScript and provides full type definitions:
import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";Contributing
Contributions are welcome! Feel free to submit issues or pull requests on GitHub.
License
MIT
0.0.17
7 months ago
0.0.16
7 months ago
0.0.15
8 months ago
0.0.14
8 months ago
0.0.13
8 months ago
0.0.12
8 months ago
0.0.11
9 months ago
0.0.10
9 months ago
0.0.9
9 months ago
0.0.8
10 months ago
0.0.7
10 months ago
0.0.6
10 months ago
0.0.4
11 months ago
0.0.3
11 months ago
0.0.2
11 months ago
1.0.1
11 months ago
1.0.0
11 months ago