0.0.17 • Published 4 months ago

@datarster/workflow-engine v0.0.17

Workflow Engine

Overview

@datarster/workflow-engine is an extensible workflow management library for Node.js. It executes workflows described by a JSON-based workflow definition and supports sequential, parallel, conditional, looping, switch-case, and event-based steps, with optional rollback when a step fails.

Features

  • Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
  • Event-driven execution with workflow resumption capabilities.
  • Integration with Redis for state management and workflow caching.
  • MongoDB logging for workflow execution history.
  • Rollback support for failed steps with custom rollback workers.
  • Dynamic worker registration with timeout support.
  • Switch/case logic for conditional execution.
  • Flexible configuration for Redis and MongoDB connections.

Installation

npm install @datarster/workflow-engine

Usage

Import and Initialize the Engine

import { WorkflowEngine } from "@datarster/workflow-engine";

const config = {
    redis: { host: "localhost", port: 6379 },
    useLogging: true,
    mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};

const engine = new WorkflowEngine();
await engine.init(config);

Define a Workflow

const workflow = {
    name: "Order Processing",
    steps: [
        { 
            id: "1", 
            type: "task", 
            name: "Validate Order", 
            worker: "validateOrder",
            workerTimeout: 30000,
            rollback: {
                id: "1_rollback",
                name: "Rollback Validation",
                type: "task",
                worker: "validateOrderRollback"
            }
        },
        { id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
        { id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
    ]
};

await engine.registerWorkflow("order_processing", workflow);

Register Workers

engine.registerWorker("validateOrder", async (data) => {
    console.log("Validating order", data);
    // Your validation logic here
});

engine.registerWorker("validateOrderRollback", async (data) => {
    console.log("Rolling back order validation", data);
    // Your rollback logic here
});

engine.registerWorker("processPayment", async (data) => {
    console.log("Processing payment", data);
    // Your payment logic here
});

engine.registerWorker("shipOrder", async (data) => {
    console.log("Shipping order", data);
    // Your shipping logic here
});
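
The workerTimeout value in the workflow definition implies that the engine stops waiting for a worker that runs too long. How the engine enforces this is not shown here, so the following is only a sketch of one common approach: racing the worker against a timer. The names WorkerFn and runWithTimeout are hypothetical and are not part of the package.

```typescript
// Hypothetical worker signature, modelled on the registerWorker callbacks above.
type WorkerFn<T = unknown> = (data: Record<string, unknown>) => Promise<T>;

// Runs a worker and rejects if it does not settle within timeoutMs.
// Illustration only; this is not the engine's actual implementation.
async function runWithTimeout<T>(
    worker: WorkerFn<T>,
    data: Record<string, unknown>,
    timeoutMs: number
): Promise<T> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
        timer = setTimeout(
            () => reject(new Error(`Worker timed out after ${timeoutMs} ms`)),
            timeoutMs
        );
    });
    try {
        // Whichever promise settles first decides the outcome.
        return await Promise.race([worker(data), timeout]);
    } finally {
        // Clear the timer so it cannot keep the process alive.
        clearTimeout(timer);
    }
}
```

Note that a race like this only stops waiting for the worker; the worker's own code keeps running unless it is cancelled by some other means.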

Execute the Workflow

const executionId = await engine.executeWorkflow({
    workflowId: "order_processing",
    inputData: { orderId: 12345 },
    callbackOptions: {
        onStepComplete: (step, result) => {
            console.log(`Step ${step.name} completed:`, result);
        },
        onError: (step, error) => {
            console.error(`Step ${step.name} failed:`, error);
        }
    }
});
console.log("Workflow execution started with ID:", executionId);

Resume Workflow with Event

// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
    approved: true,
    approver: "manager@example.com"
});

Fetch Workflow Logs

const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);

Supported Workflow Step Types

Step Type            Description
task                 Executes a registered worker function.
decision             Evaluates a condition and executes a true/false branch.
parallel             Runs multiple steps in parallel.
loop                 Repeats execution based on a condition.
wait                 Pauses execution until an event is received.
execute_then_wait    Executes a task and then waits for an external event.
switch               Evaluates a field value and executes the matching case.
commit               Marks a workflow step as a commit step.

Advanced Workflow Examples

Switch/Case Workflow

const switchWorkflow = {
    name: "Order Status Processing",
    steps: [
        {
            id: "1",
            type: "switch",
            name: "Process Based on Status",
            switch_cases: [
                {
                    id: "case1",
                    field: "status",
                    value: "COMMITTED",
                    switch_case_steps: [
                        { id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
                    ]
                },
                {
                    id: "case2", 
                    field: "status",
                    value: "CANCELLED",
                    switch_case_steps: [
                        { id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
                    ]
                }
            ],
            default_case: {
                id: "default",
                type: "task",
                name: "Handle Default",
                worker: "handleDefault"
            }
        }
    ]
};
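
To make the switch semantics concrete, here is a minimal sketch of how a matching case could be selected from the input data. It assumes strict equality on a top-level field and a fallback to default_case; the engine's real matching rules may differ. The function selectSwitchSteps and the interfaces below are hypothetical and only mirror the fields used in the example above.

```typescript
// Minimal shapes mirroring the fields used in the switch example above.
interface Step {
    id: string;
    type: string;
    name: string;
    worker?: string;
}

interface SwitchCase {
    id: string;
    field: string;
    value: unknown;
    switch_case_steps: Step[];
}

interface SwitchStep {
    switch_cases: SwitchCase[];
    default_case?: Step;
}

// Returns the steps of the first case whose field equals its value in the
// input data, or the default case when nothing matches.
// Assumption: strict equality on a top-level field.
function selectSwitchSteps(step: SwitchStep, data: Record<string, unknown>): Step[] {
    const match = step.switch_cases.find((c) => data[c.field] === c.value);
    if (match) return match.switch_case_steps;
    return step.default_case ? [step.default_case] : [];
}
```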

Parallel Execution

const parallelWorkflow = {
    name: "Parallel Processing",
    steps: [
        {
            id: "1",
            type: "parallel",
            name: "Process Multiple Tasks",
            parallel_steps: [
                { id: "1.1", type: "task", name: "Task A", worker: "workerA" },
                { id: "1.2", type: "task", name: "Task B", worker: "workerB" },
                { id: "1.3", type: "task", name: "Task C", worker: "workerC" }
            ]
        }
    ]
};
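
As a rough illustration of what running parallel_steps involves, the sketch below starts every worker at once and waits for all of them with Promise.all. The registry, the TaskStep shape, and runParallel are hypothetical stand-ins, and the fail-fast behaviour shown is an assumption: this README does not say whether the engine aborts on the first failure or waits for every step.

```typescript
type WorkerFn = (data: Record<string, unknown>) => Promise<unknown>;

// Hypothetical registry standing in for workers added via registerWorker.
const workers = new Map<string, WorkerFn>();

interface TaskStep {
    id: string;
    worker: string;
}

// Starts every step at once and resolves with results keyed by step id.
// Promise.all rejects as soon as any worker rejects (fail-fast).
async function runParallel(
    steps: TaskStep[],
    data: Record<string, unknown>
): Promise<Record<string, unknown>> {
    const results: Record<string, unknown> = {};
    await Promise.all(
        steps.map(async (step) => {
            const worker = workers.get(step.worker);
            if (!worker) throw new Error(`No worker registered as "${step.worker}"`);
            results[step.id] = await worker(data);
        })
    );
    return results;
}
```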

Event-Driven Workflow

const eventWorkflow = {
    name: "Event-Driven Process",
    steps: [
        { id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
        {
            id: "2",
            type: "wait",
            name: "Wait for Approval",
            event_name: "approval_received"
        },
        { id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
    ]
};
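
The wait step pauses an execution until the named event arrives. The sketch below shows one way such a pause and resume pair can be modelled inside a single process, using a promise that is resolved when the event is delivered. All names here (pending, waitForEvent, resume) are hypothetical; the engine itself presumably persists waiting state rather than holding it in memory like this.

```typescript
// Hypothetical in-process registry of paused executions, keyed by
// execution ID and event name. A real engine would persist this state
// so that a resume call can arrive in a different process.
type Resolver = (payload: unknown) => void;
const pending = new Map<string, Resolver>();

const keyFor = (executionId: string, eventName: string): string =>
    `${executionId}:${eventName}`;

// Called when execution reaches a wait step: parks until the event arrives.
function waitForEvent(executionId: string, eventName: string): Promise<unknown> {
    return new Promise<unknown>((resolve) => {
        pending.set(keyFor(executionId, eventName), resolve);
    });
}

// Delivers the event payload and lets the parked execution continue.
// Returns false if no execution was waiting for that event.
function resume(executionId: string, eventName: string, payload: unknown): boolean {
    const key = keyFor(executionId, eventName);
    const resolve = pending.get(key);
    if (!resolve) return false;
    pending.delete(key);
    resolve(payload);
    return true;
}
```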

Configuration Options

Redis Configuration

const redisConfig = {
    host: "localhost",
    port: 6379,
    // OR provide your own Redis client
    redisClient: existingRedisClient
};

MongoDB Configuration

const mongoConfig = {
    uri: "mongodb://localhost:27017",
    database: "workflowDB",
    collection: "logs",
    // OR provide your own MongoDB client
    mongoClient: existingMongoClient
};

Error Handling and Rollback

  • If a task fails, the workflow engine triggers rollback steps if they are defined.
  • Rollback steps are executed in reverse order (LIFO - Last In, First Out).
  • Errors during rollback steps are logged but do not halt the rollback process.
  • Each step can have its own rollback logic defined in the rollback property.
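
The rollback rules above can be sketched as follows. This is a simplified illustration, not the engine's code: RollbackStep and runWithRollback are hypothetical names, and the sketch assumes that only steps which already completed are rolled back (the engine may also roll back the failed step itself).

```typescript
interface RollbackStep {
    name: string;
    run: () => Promise<void>;
    rollback?: () => Promise<void>;
}

// Runs steps in order. If one fails, undoes the completed steps in
// reverse order and collects (rather than rethrows) rollback errors.
async function runWithRollback(
    steps: RollbackStep[]
): Promise<{ ok: boolean; rollbackErrors: Error[] }> {
    const completed: RollbackStep[] = [];
    try {
        for (const step of steps) {
            await step.run();
            completed.push(step);
        }
        return { ok: true, rollbackErrors: [] };
    } catch (failure) {
        const rollbackErrors: Error[] = [];
        // Last completed step is rolled back first (LIFO).
        for (const step of [...completed].reverse()) {
            if (!step.rollback) continue;
            try {
                await step.rollback();
            } catch (err) {
                // Record the error and continue with the remaining rollbacks.
                rollbackErrors.push(err instanceof Error ? err : new Error(String(err)));
            }
        }
        return { ok: false, rollbackErrors };
    }
}
```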

State Management

  • Redis: Used for caching workflow execution state and registered workflows.
  • MongoDB: Stores detailed workflow execution history and logs (optional).
  • In-Memory: Falls back to in-memory storage if Redis is not configured.
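
The Redis and in-memory fallback described above is commonly built as a small storage interface with two implementations. The sketch below shows that pattern under stated assumptions: StateStore, RedisLike, InMemoryStore, RedisStore, and createStateStore are all hypothetical names, and RedisLike covers only the get and set calls this example needs, not a full Redis client.

```typescript
// Hypothetical storage interface; the engine's real abstraction is not documented here.
interface StateStore {
    get(key: string): Promise<string | null>;
    set(key: string, value: string): Promise<void>;
}

// Minimal subset of a Redis client used by this sketch.
interface RedisLike {
    get(key: string): Promise<string | null>;
    set(key: string, value: string): Promise<unknown>;
}

class InMemoryStore implements StateStore {
    private readonly entries = new Map<string, string>();

    async get(key: string): Promise<string | null> {
        const value = this.entries.get(key);
        return value === undefined ? null : value;
    }

    async set(key: string, value: string): Promise<void> {
        this.entries.set(key, value);
    }
}

class RedisStore implements StateStore {
    constructor(private readonly client: RedisLike) {}

    get(key: string): Promise<string | null> {
        return this.client.get(key);
    }

    async set(key: string, value: string): Promise<void> {
        await this.client.set(key, value);
    }
}

// Uses Redis when a client is supplied, otherwise falls back to memory.
function createStateStore(redisClient?: RedisLike): StateStore {
    return redisClient ? new RedisStore(redisClient) : new InMemoryStore();
}
```

In-memory state is lost when the process exits, so a fallback like this suits local development more than production use.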

TypeScript Support

The library is written in TypeScript and provides full type definitions:

import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";

Contributing

Contributions are welcome! Feel free to submit issues or pull requests on GitHub.

License

MIT
