@datarster/workflow-engine v0.0.17
Workflow Engine
Overview
@datarster/workflow-engine is an extensible workflow management library for Node.js. It executes workflows described by JSON-based definitions and supports sequential, parallel, conditional, looping, switch-case, and event-based flows, with optional rollback when a step fails.
Features
- Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
- Event-driven execution with workflow resumption capabilities.
- Integration with Redis for state management and workflow caching.
- MongoDB logging for workflow execution history.
- Rollback support for failed steps with custom rollback workers.
- Dynamic worker registration with timeout support.
- Switch/case logic for conditional execution.
- Flexible configuration for Redis and MongoDB connections.
Installation
npm install @datarster/workflow-engine
Usage
Import and Initialize the Engine
import { WorkflowEngine } from "@datarster/workflow-engine";
const config = {
  redis: { host: "localhost", port: 6379 },
  useLogging: true,
  mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};
const engine = new WorkflowEngine();
await engine.init(config);
Define a Workflow
const workflow = {
  name: "Order Processing",
  steps: [
    {
      id: "1",
      type: "task",
      name: "Validate Order",
      worker: "validateOrder",
      workerTimeout: 30000,
      rollback: {
        id: "1_rollback",
        name: "Rollback Validation",
        type: "task",
        worker: "validateOrderRollback"
      }
    },
    { id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
    { id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
  ]
};
await engine.registerWorkflow("order_processing", workflow);
Register Workers
engine.registerWorker("validateOrder", async (data) => {
  console.log("Validating order", data);
  // Your validation logic here
});
engine.registerWorker("validateOrderRollback", async (data) => {
  console.log("Rolling back order validation", data);
  // Your rollback logic here
});
engine.registerWorker("processPayment", async (data) => {
  console.log("Processing payment", data);
  // Your payment logic here
});
engine.registerWorker("shipOrder", async (data) => {
  console.log("Shipping order", data);
  // Your shipping logic here
});
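The workerTimeout value in the workflow definition suggests that each worker call is bounded in time. The engine's internal mechanism is not documented here, so the following is only a minimal sketch of one common way to enforce such a limit, using Promise.race. The helper name runWithTimeout is hypothetical and is not part of @datarster/workflow-engine.

```javascript
// Hypothetical helper, NOT part of @datarster/workflow-engine.
// Rejects if the worker does not settle within timeoutMs milliseconds.
function runWithTimeout(worker, data, timeoutMs) {
  let timer;
  const timeout = new Promise((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Worker timed out after ${timeoutMs} ms`)),
      timeoutMs
    );
  });
  // Wrapping the call in a promise chain also turns synchronous throws into rejections.
  const work = Promise.resolve().then(() => worker(data));
  // Whichever promise settles first decides the outcome; the timer is always cleared.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}
```

Note that a timeout built this way only stops waiting for the worker; it does not cancel work that is already in flight.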
Execute the Workflow
const executionId = await engine.executeWorkflow({
  workflowId: "order_processing",
  inputData: { orderId: 12345 },
  callbackOptions: {
    onStepComplete: (step, result) => {
      console.log(`Step ${step.name} completed:`, result);
    },
    onError: (step, error) => {
      console.error(`Step ${step.name} failed:`, error);
    }
  }
});
console.log("Workflow execution started with ID:", executionId);
Resume Workflow with Event
// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
  approved: true,
  approver: "manager@example.com"
});
Fetch Workflow Logs
const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);
Supported Workflow Step Types
Step Type | Description |
---|---|
task | Executes a registered worker function. |
decision | Evaluates a condition and executes a true/false branch. |
parallel | Runs multiple steps in parallel. |
loop | Repeats execution based on a condition. |
wait | Pauses execution until an event is received. |
execute_then_wait | Executes a task and then waits for an external event. |
switch | Evaluates a field value and executes the matching case, or the default case if none matches. |
commit | Marks a workflow step as a commit step. |
Advanced Workflow Examples
Switch/Case Workflow
const switchWorkflow = {
  name: "Order Status Processing",
  steps: [
    {
      id: "1",
      type: "switch",
      name: "Process Based on Status",
      switch_cases: [
        {
          id: "case1",
          field: "status",
          value: "COMMITTED",
          switch_case_steps: [
            { id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
          ]
        },
        {
          id: "case2",
          field: "status",
          value: "CANCELLED",
          switch_case_steps: [
            { id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
          ]
        }
      ],
      default_case: {
        id: "default",
        type: "task",
        name: "Handle Default",
        worker: "handleDefault"
      }
    }
  ]
};
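To make the switch semantics concrete, here is a minimal sketch of how a step shaped like the one above could be resolved against the input data. It assumes that the first case whose field strictly equals its value wins and that default_case runs only when no case matches. The function resolveSwitchSteps is hypothetical and the engine's actual matching rules may differ.

```javascript
// Hypothetical resolver, NOT the library's implementation.
// Returns the steps of the first matching case, the default case wrapped
// in an array when nothing matches, or an empty array when there is no default.
function resolveSwitchSteps(step, inputData) {
  const match = (step.switch_cases || []).find(
    (c) => inputData[c.field] === c.value // assumption: strict equality
  );
  if (match) return match.switch_case_steps;
  return step.default_case ? [step.default_case] : [];
}

// A trimmed-down copy of the switch step from the definition above.
const orderStatusStep = {
  type: "switch",
  switch_cases: [
    { field: "status", value: "COMMITTED", switch_case_steps: [{ id: "1.1", worker: "handleCommitted" }] },
    { field: "status", value: "CANCELLED", switch_case_steps: [{ id: "1.2", worker: "handleCancelled" }] }
  ],
  default_case: { id: "default", worker: "handleDefault" }
};

const chosen = resolveSwitchSteps(orderStatusStep, { status: "CANCELLED" });
console.log(chosen.map((s) => s.id)); // [ '1.2' ]
```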
Parallel Execution
const parallelWorkflow = {
  name: "Parallel Processing",
  steps: [
    {
      id: "1",
      type: "parallel",
      name: "Process Multiple Tasks",
      parallel_steps: [
        { id: "1.1", type: "task", name: "Task A", worker: "workerA" },
        { id: "1.2", type: "task", name: "Task B", worker: "workerB" },
        { id: "1.3", type: "task", name: "Task C", worker: "workerC" }
      ]
    }
  ]
};
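As an illustration of what running parallel_steps concurrently can look like, the sketch below starts every worker at once and waits for all of them with Promise.all. The function runParallel and the plain-object worker map are assumptions made for this example; how the engine itself schedules parallel steps or handles a partial failure is not specified here.

```javascript
// Hypothetical runner, NOT the library's implementation.
// Starts all workers concurrently and resolves to { stepId: result }.
async function runParallel(parallelSteps, workers, data) {
  const entries = await Promise.all(
    parallelSteps.map(async (step) => {
      const worker = workers[step.worker];
      if (!worker) throw new Error(`No worker registered for "${step.worker}"`);
      return [step.id, await worker(data)];
    })
  );
  return Object.fromEntries(entries);
}
```

Promise.all rejects as soon as any worker fails, while the other workers keep running in the background. Promise.allSettled is the usual alternative when every outcome needs to be collected before deciding what to do.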
Event-Driven Workflow
const eventWorkflow = {
  name: "Event-Driven Process",
  steps: [
    { id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
    {
      id: "2",
      type: "wait",
      name: "Wait for Approval",
      event_name: "approval_received"
    },
    { id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
  ]
};
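The wait-and-resume pattern can be pictured as a table of pending promises keyed by execution and event name. The sketch below is a purely in-process illustration of that idea. The EventGate class is hypothetical, and the real engine presumably persists waiting state (for example in Redis) rather than holding promises in memory.

```javascript
// Hypothetical in-process event gate, NOT the library's implementation.
class EventGate {
  constructor() {
    this.pending = new Map(); // "executionId:eventName" -> resolve function
  }

  // Called by a "wait" step: the returned promise settles when the event arrives.
  waitFor(executionId, eventName) {
    return new Promise((resolve) => {
      this.pending.set(`${executionId}:${eventName}`, resolve);
    });
  }

  // Called from outside: delivers the payload to the waiting step.
  // Returns false when nothing is waiting for that event.
  resume(executionId, eventName, payload) {
    const key = `${executionId}:${eventName}`;
    const resolve = this.pending.get(key);
    if (!resolve) return false;
    this.pending.delete(key);
    resolve(payload);
    return true;
  }
}
```

In this picture a wait step awaits gate.waitFor(executionId, "approval_received"), and an external caller later invokes gate.resume with the same event name to let the workflow continue.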
Configuration Options
Redis Configuration
const redisConfig = {
  host: "localhost",
  port: 6379
  // OR, instead of host and port, provide your own Redis client:
  // redisClient: existingRedisClient
};
MongoDB Configuration
const mongoConfig = {
  uri: "mongodb://localhost:27017",
  database: "workflowDB",
  collection: "logs"
  // OR, instead of a URI, provide your own MongoDB client:
  // mongoClient: existingMongoClient
};
Error Handling and Rollback
- If a task fails, the engine runs any rollback steps that have been defined.
- Rollback steps are executed in reverse order (LIFO: last in, first out).
- Errors during rollback steps are logged but do not halt the rollback process.
- Each step can define its own rollback logic in its rollback property.
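The rollback rules above can be sketched as a small sequential runner. This is an illustration only: runWithRollback and the plain-object worker map are hypothetical, and the sketch assumes that only steps which completed successfully are rolled back. Whether the engine also runs the failed step's own rollback is not stated here.

```javascript
// Hypothetical runner, NOT the library's implementation.
async function runWithRollback(steps, workers, data, log = console.error) {
  const completed = [];
  for (const step of steps) {
    try {
      await workers[step.worker](data);
      completed.push(step);
    } catch (error) {
      // Undo completed steps newest-first (LIFO).
      for (const done of [...completed].reverse()) {
        if (!done.rollback) continue; // rollback is optional per step
        try {
          await workers[done.rollback.worker](data);
        } catch (rollbackError) {
          // A failing rollback is logged, and the remaining rollbacks still run.
          log(`Rollback ${done.rollback.id} failed:`, rollbackError);
        }
      }
      throw error; // surface the original failure to the caller
    }
  }
}
```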
State Management
- Redis: Used for caching workflow execution state and registered workflows.
- MongoDB: Stores detailed workflow execution history and logs (optional).
- In-Memory: Falls back to in-memory storage if Redis is not configured.
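The in-memory fallback can be pictured as a store that uses a Redis client when one is supplied and a Map otherwise. The sketch below is a hedged illustration, not the engine's code. createStateStore, load, and save are hypothetical names, and the Redis branch assumes a client with promise-based get(key) and set(key, value) methods, as ioredis and node-redis provide.

```javascript
// Hypothetical factory, NOT the library's implementation.
function createStateStore(redisClient) {
  const memory = new Map();
  // Without a Redis client, fall back to a Map with the same get/set shape.
  const backend = redisClient || {
    get: async (key) => (memory.has(key) ? memory.get(key) : null),
    set: async (key, value) => { memory.set(key, value); }
  };
  return {
    // State is stored as JSON text so both backends behave the same way.
    async load(key) {
      const raw = await backend.get(key);
      return raw == null ? null : JSON.parse(raw);
    },
    async save(key, state) {
      await backend.set(key, JSON.stringify(state));
    }
  };
}
```

State kept in the Map is lost when the process exits, so the fallback suits local development and tests more than long-running waits.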
TypeScript Support
The library is written in TypeScript and provides full type definitions:
import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";
Contributing
Contributions are welcome! Feel free to submit issues or pull requests on GitHub.
License
MIT