0.0.17 • Published 7 months ago

@datarster/workflow-engine v0.0.17

Weekly downloads
-
License
MIT
Repository
-
Last release
7 months ago

Workflow Engine

Overview

@datarster/workflow-engine is a powerful and extensible workflow management library for Node.js. It enables the execution of workflows using a JSON-based workflow definition. This engine supports sequential, parallel, conditional, looping, switch-case, and event-based workflows, with optional rollback functionality in case of failure.

Features

  • Supports various workflow step types: task, decision, parallel, loop, wait, commit, execute_then_wait, and switch.
  • Event-driven execution with workflow resumption capabilities.
  • Integration with Redis for state management and workflow caching.
  • MongoDB logging for workflow execution history.
  • Rollback support for failed steps with custom rollback workers.
  • Dynamic worker registration with timeout support.
  • Switch/case logic for conditional execution.
  • Flexible configuration for Redis and MongoDB connections.

Installation

npm install @datarster/workflow-engine

Usage

Import and Initialize the Engine

import { WorkflowEngine } from "@datarster/workflow-engine";

const config = {
    redis: { host: "localhost", port: 6379 },
    useLogging: true,
    mongo: { uri: "mongodb://localhost:27017", database: "workflowDB", collection: "logs" }
};

const engine = new WorkflowEngine();
await engine.init(config);

Define a Workflow

const workflow = {
    name: "Order Processing",
    steps: [
        { 
            id: "1", 
            type: "task", 
            name: "Validate Order", 
            worker: "validateOrder",
            workerTimeout: 30000,
            rollback: {
                id: "1_rollback",
                name: "Rollback Validation",
                type: "task",
                worker: "validateOrderRollback"
            }
        },
        { id: "2", type: "task", name: "Process Payment", worker: "processPayment" },
        { id: "3", type: "task", name: "Ship Order", worker: "shipOrder" }
    ]
};

await engine.registerWorkflow("order_processing", workflow);

Register Workers

engine.registerWorker("validateOrder", async (data) => {
    console.log("Validating order", data);
    // Your validation logic here
});

engine.registerWorker("validateOrderRollback", async (data) => {
    console.log("Rolling back order validation", data);
    // Your rollback logic here
});

engine.registerWorker("processPayment", async (data) => {
    console.log("Processing payment", data);
    // Your payment logic here
});

engine.registerWorker("shipOrder", async (data) => {
    console.log("Shipping order", data);
    // Your shipping logic here
});

Execute the Workflow

const executionId = await engine.executeWorkflow({
    workflowId: "order_processing",
    inputData: { orderId: 12345 },
    callbackOptions: {
        onStepComplete: (step, result) => {
            console.log(`Step ${step.name} completed:`, result);
        },
        onError: (step, error) => {
            console.error(`Step ${step.name} failed:`, error);
        }
    }
});
console.log("Workflow execution started with ID:", executionId);

Resume Workflow with Event

// Resume a waiting workflow with an event
await engine.resumeWorkflow(executionId, "external_approval_received", {
    approved: true,
    approver: "manager@example.com"
});

Fetch Workflow Logs

const log = await engine.fetchWorkflowLog(executionId);
console.log("Execution Log:", log);

Supported Workflow Step Types

Step TypeDescription
taskExecutes a registered worker function.
decisionEvaluates a condition and executes a true/false branch.
parallelRuns multiple steps in parallel.
loopRepeats execution based on a condition.
waitPauses execution until an event is received.
execute_then_waitExecutes a task and then waits for an external event.
switchEvaluates a field value and executes matching case.
commitMarks a workflow step as a commit step.

Advanced Workflow Examples

Switch/Case Workflow

const switchWorkflow = {
    name: "Order Status Processing",
    steps: [
        {
            id: "1",
            type: "switch",
            name: "Process Based on Status",
            switch_cases: [
                {
                    id: "case1",
                    field: "status",
                    value: "COMMITTED",
                    switch_case_steps: [
                        { id: "1.1", type: "task", name: "Handle Committed", worker: "handleCommitted" }
                    ]
                },
                {
                    id: "case2", 
                    field: "status",
                    value: "CANCELLED",
                    switch_case_steps: [
                        { id: "1.2", type: "task", name: "Handle Cancelled", worker: "handleCancelled" }
                    ]
                }
            ],
            default_case: {
                id: "default",
                type: "task",
                name: "Handle Default",
                worker: "handleDefault"
            }
        }
    ]
};

Parallel Execution

const parallelWorkflow = {
    name: "Parallel Processing",
    steps: [
        {
            id: "1",
            type: "parallel",
            name: "Process Multiple Tasks",
            parallel_steps: [
                { id: "1.1", type: "task", name: "Task A", worker: "workerA" },
                { id: "1.2", type: "task", name: "Task B", worker: "workerB" },
                { id: "1.3", type: "task", name: "Task C", worker: "workerC" }
            ]
        }
    ]
};

Event-Driven Workflow

const eventWorkflow = {
    name: "Event-Driven Process",
    steps: [
        { id: "1", type: "task", name: "Initial Task", worker: "initialWorker" },
        {
            id: "2",
            type: "wait",
            name: "Wait for Approval",
            event_name: "approval_received"
        },
        { id: "3", type: "task", name: "Final Task", worker: "finalWorker" }
    ]
};

Configuration Options

Redis Configuration

const redisConfig = {
    host: "localhost",
    port: 6379,
    // OR provide your own Redis client
    redisClient: existingRedisClient
};

MongoDB Configuration

const mongoConfig = {
    uri: "mongodb://localhost:27017",
    database: "workflowDB",
    collection: "logs",
    // OR provide your own MongoDB client
    mongoClient: existingMongoClient
};

Error Handling and Rollback

  • If a task fails, the workflow engine triggers rollback steps if they are defined.
  • Rollback steps are executed in reverse order (LIFO - Last In, First Out).
  • Errors during rollback steps are logged but do not halt the rollback process.
  • Each step can have its own rollback logic defined in the rollback property.

State Management

  • Redis: Used for caching workflow execution state and registered workflows.
  • MongoDB: Stores detailed workflow execution history and logs (optional).
  • In-Memory: Falls back to in-memory storage if Redis is not configured.

TypeScript Support

The library is written in TypeScript and provides full type definitions:

import { WorkflowEngine, WorkflowStepType, WorkflowDefinition } from "@datarster/workflow-engine";

Contributing

Contributions are welcome! Feel free to submit issues or pull requests on GitHub.

License

MIT

0.0.17

7 months ago

0.0.16

7 months ago

0.0.15

8 months ago

0.0.14

8 months ago

0.0.13

8 months ago

0.0.12

8 months ago

0.0.11

9 months ago

0.0.10

9 months ago

0.0.9

9 months ago

0.0.8

10 months ago

0.0.7

10 months ago

0.0.6

10 months ago

0.0.4

11 months ago

0.0.3

11 months ago

0.0.2

11 months ago

1.0.1

11 months ago

1.0.0

11 months ago