0.0.10 • Published 6 years ago

altflo v0.0.10

License
MIT

Make highly reliable workflows across microservices

(current version: 0.0.10, beta usage only)

Why?

This tool makes it simple to execute a complex sequence of tasks, where each task can optionally run in a separate process, thread, or microservice. It is simpler to configure than Amazon SWF, and less complex than a home-grown solution that passes messages between microservices over a message queue (Redis, NSQ.io, Amazon SQS) or via IPC.

How it works:

  1. Build and visualize Workflows - the glue that links together a sequence of separate processes, or Workers.
  2. Workflows can include branches and parallelized executions.
  3. Workflows consist of a series of Tasks. Each Worker completes one Task. Each initiation of a Workflow is a Job. Branching and parallelization create additional JobThreads, which reference the original Job.
  4. A message queue of your choice is used to communicate between processes.
  5. A database of your choice is used to track completion details of all Jobs, JobThreads, and Tasks.
  6. Workers only need access to message queue endpoints. Run Workers anywhere these endpoints are reachable (locally, in a cloud, etc.). Run multiple Workers for each Task for redundancy.
  7. Run WorkflowManager instances on infrastructure with access to both the message queue and the database. These processes ensure Jobs move through your Workflow appropriately.

Usage

npm install altflo
import altflo from 'altflo'
// OR
let altflo = require('altflo')

Creating a workflow (and visualizing it)

let altflo = require('altflo');
let WorkflowStep = altflo.WorkflowStep;
let WorkflowGraph = altflo.WorkflowGraph;
let GraphUtils = altflo.GraphUtils;

let step1 = new WorkflowStep('STEP_1', 'http://localhost/sample_queue_uri_1');
let step2 = new WorkflowStep('STEP_2', 'http://localhost/sample_queue_uri_2');
let step3 = new WorkflowStep('STEP_3', 'http://localhost/sample_queue_uri_3');
let step4 = new WorkflowStep('STEP_4', 'http://localhost/sample_queue_uri_4');
let step5 = new WorkflowStep('STEP_5', 'http://localhost/sample_queue_uri_5');
let step6 = new WorkflowStep('STEP_6', 'http://localhost/sample_queue_uri_6');
let step7 = new WorkflowStep('STEP_7', 'http://localhost/sample_queue_uri_7');
let step8 = new WorkflowStep('STEP_8', 'http://localhost/sample_queue_uri_8');

step7.addNextStep(step8);
step3.addNextSteps([step5, step6, step7]);
step2.addNextSteps([step3, step4]);
step1.addNextStep(step2);

// IN THIS EXAMPLE step1 IS THE ROOT NODE FOR OUR WORKFLOW GRAPH
let graph = new WorkflowGraph('SAMPLE_GRAPH_1', 'V1');
graph.loadWorkflowFromStepGraph(step1);

//OPTIONAL: SEE WHAT YOUR WORKFLOW LOOKS LIKE, USEFUL FOR COMPLEX BRANCHING SCENARIOS
console.log('graph structure: ' + GraphUtils.visualizeGraph(graph));

/*
OUTPUT OF THIS CODE IS:

graph structure:
(start): STEP_1
└─ STEP_2
   ├─ STEP_3
   │  ├─ STEP_5
   │  ├─ STEP_6
   │  └─ STEP_7
   │     └─ STEP_8
   └─ STEP_4
*/
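The tree layout above can also be reproduced without the library, which may help when reasoning about branching before wiring up real steps. The following is a minimal sketch, assuming plain objects with a `name` and a `next` array; the `step` and `renderTree` helpers are hypothetical and are not altflo's `WorkflowStep` or `GraphUtils` implementation.

```javascript
// Hypothetical stand-in for a workflow step: a name plus its child steps.
// This is NOT altflo's WorkflowStep; it only mirrors the shape of the example.
function step(name, next = []) {
  return { name, next };
}

// Render a step tree using the same box-drawing layout as the output above.
function renderTree(root) {
  const lines = ['(start): ' + root.name];
  const walk = (node, prefix) => {
    node.next.forEach((child, i) => {
      const last = i === node.next.length - 1;
      lines.push(prefix + (last ? '└─ ' : '├─ ') + child.name);
      // Children of the last sibling get blank padding, others get a rail.
      walk(child, prefix + (last ? '   ' : '│  '));
    });
  };
  walk(root, '');
  return lines.join('\n');
}

// Same shape as the SAMPLE_GRAPH_1 workflow built above.
const tree = step('STEP_1', [
  step('STEP_2', [
    step('STEP_3', [step('STEP_5'), step('STEP_6'), step('STEP_7', [step('STEP_8')])]),
    step('STEP_4'),
  ]),
]);

console.log(renderTree(tree));
```

Each branch point (STEP_2 and STEP_3 here) is where a workflow fans out into parallel paths.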

Running your workflow

This example leverages Amazon SQS and PostgreSQL:

// ADD THIS CODE IMMEDIATELY AFTER THE ABOVE CODE
let WorkflowManager = altflo.WorkflowManager;
let AmazonSQSBackend = altflo.AmazonSQSBackend;
let PostgreSQLBackend = altflo.PostgreSQLBackend;

let WORKFLOW_QUEUE_URI = 'http://localhost/workflow_manager_queue';
let manager = new WorkflowManager(graph, WORKFLOW_QUEUE_URI, new AmazonSQSBackend(), new PostgreSQLBackend());
manager.initialize();

// VOILA! NOW RUN SEPARATE WORKER PROCESSES FOR EACH STEP IN THE WORKFLOW. SEE BELOW.

Individual processes (Workers)

Run these anywhere. Process data, crunch numbers, wait for events, etc.

let altflo = require('altflo');
let Worker = altflo.Worker;
let AmazonSQSBackend = altflo.AmazonSQSBackend;
let workerQueue = 'http://localhost/sample_queue_uri_1';
let managerQueue = 'http://localhost/workflow_manager_queue';

function performWork(messageBody, messageMetaData){
    // YOUR TASK HERE
    console.log('DOING SOME TASK, PART OF THE WORKFLOW');

    // CALL ME WHEN TASK COMPLETE
    worker.resumeWorkflow(messageBody, messageMetaData);
}

let worker = new Worker(workerQueue, managerQueue, new AmazonSQSBackend(), performWork);
worker.initialize();

// CREATE ONE OF THESE WORKERS FOR EACH TASK IN THE WORKFLOW.
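If a task is asynchronous or can fail, it may be useful to resume the workflow only after the task has finished successfully. The following is a sketch under stated assumptions: `makePerformWork` is a hypothetical helper (not part of altflo), the only altflo call it relies on is `worker.resumeWorkflow(messageBody, messageMetaData)` from the example above, and a stand-in worker object is used so the snippet runs on its own. Whether altflo waits on a promise returned from the callback is not documented here, so the helper does not depend on it.

```javascript
// Hypothetical helper (not part of altflo): wraps an async task so the
// workflow is resumed only when the task finishes without throwing.
function makePerformWork(task, getWorker, onError = console.error) {
  return async function performWork(messageBody, messageMetaData) {
    try {
      await task(messageBody, messageMetaData);
    } catch (err) {
      // Do not resume on failure; the retry policy is left to the caller.
      onError(err);
      return false;
    }
    getWorker().resumeWorkflow(messageBody, messageMetaData);
    return true;
  };
}

// Stand-in worker for demonstration only. With altflo this would be the
// Worker instance created in the example above.
const resumed = [];
const fakeWorker = {
  resumeWorkflow: (body, meta) => resumed.push({ body, meta }),
};

const okTask = makePerformWork(async () => {}, () => fakeWorker);
const failingTask = makePerformWork(
  async () => { throw new Error('task failed'); },
  () => fakeWorker,
  () => {} // swallow the error in this demo
);
```

The `getWorker` callback exists because the Worker constructor takes the callback before the `worker` variable is assigned; passing `() => worker` defers the lookup until a message arrives.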

A full, functional example

See /examples/example1 for a simple configuration that uses Amazon SQS and PostgreSQL. The following steps will guide you through setting up and running this workflow:

Configure SQS

Create the following .fifo queues in console.aws.amazon.com/sqs

w1-step1.fifo
w1-step2.fifo
w1-step3.fifo
w1-step4.fifo
workflow1.fifo

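All queues should be set with Content-Based Deduplication turned on. NOTE: With this setting, SQS treats messages with identical bodies sent within the 5-minute deduplication interval as duplicates and does not deliver the repeats, so each message you send needs different contents.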
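With Content-Based Deduplication, SQS derives the deduplication ID from a SHA-256 hash of the message body. The sketch below mimics that idea locally to show why identical bodies collide and how a unique field avoids it. The `withUniqueId` helper and the `requestId` field name are assumptions for illustration; altflo does not require them.

```javascript
const { createHash, randomUUID } = require('node:crypto');

// Local approximation of content-based deduplication: hash the body text.
function bodyHash(body) {
  return createHash('sha256').update(body).digest('hex');
}

// Hypothetical helper: adds a unique field inside messageBody so that
// otherwise identical payloads produce different message bodies.
function withUniqueId(message) {
  return {
    ...message,
    messageBody: { ...message.messageBody, requestId: randomUUID() },
  };
}

const message = {
  workflowStep: 'INIT_WORKFLOW',
  messageBody: { sampleVar: 'same data' },
};

// Identical bodies hash to the same value, so SQS would see a duplicate.
const first = JSON.stringify(message);
const second = JSON.stringify(message);
const sameHash = bodyHash(first) === bodyHash(second);

// Bodies with a unique ID hash differently, so both would be delivered.
const third = JSON.stringify(withUniqueId(message));
const fourth = JSON.stringify(withUniqueId(message));
const differentHash = bodyHash(third) !== bodyHash(fourth);

console.log({ sameHash, differentHash });
```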

Configure PostgreSQL

You'll need the following tables:

Jobs: jobId (serial, primary key), startTime (timestamp), endTime (timestamp)

JobThreads: jobId (integer, foreign key), jobThreadId (serial, primary key), startTime (timestamp), endTime (timestamp), startStepName (varchar)

Tasks: jobId (integer, foreign key), jobThreadId (integer, foreign key), taskId (serial, primary key), startTime (timestamp), endTime (timestamp), stepName (varchar)
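One way to create these tables is to keep the DDL as strings and run them with the PostgreSQL client of your choice. The statements below are a sketch derived from the column lists above. The double-quoted identifiers follow the quoted table names used in the queries later in this README, while the quoted column names and the `REFERENCES` targets are assumptions, so adjust them if the library expects something different.

```javascript
// CREATE TABLE statements matching the column lists above.
// Identifiers are double-quoted because PostgreSQL folds unquoted
// names to lowercase. Foreign key targets are assumed, not confirmed.
const createTableStatements = [
  `CREATE TABLE "Jobs" (
    "jobId" serial PRIMARY KEY,
    "startTime" timestamp,
    "endTime" timestamp
  )`,
  `CREATE TABLE "JobThreads" (
    "jobThreadId" serial PRIMARY KEY,
    "jobId" integer REFERENCES "Jobs" ("jobId"),
    "startTime" timestamp,
    "endTime" timestamp,
    "startStepName" varchar
  )`,
  `CREATE TABLE "Tasks" (
    "taskId" serial PRIMARY KEY,
    "jobId" integer REFERENCES "Jobs" ("jobId"),
    "jobThreadId" integer REFERENCES "JobThreads" ("jobThreadId"),
    "startTime" timestamp,
    "endTime" timestamp,
    "stepName" varchar
  )`,
];

// Print a script that can be pasted into psql. The order matters:
// "Jobs" must exist before the tables that reference it.
console.log(createTableStatements.join(';\n\n') + ';');
```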

Set Node environment variables

Any time a WorkflowManager is run, it needs access to the database backend. If PostgreSQL is running locally, your configuration for the demo may be as follows:

# PGUSER = Your PostgreSQL username
# PGHOST = localhost if PostgreSQL is running locally
# PGDATABASE = Your database name
# PGPORT = Your PostgreSQL port
# PGPASSWORD = Your PostgreSQL password
export PGUSER='postgres'
export PGHOST='localhost'
export PGDATABASE='altflo-dev'
export PGPORT='5432'
export PGPASSWORD='yourpassword'

For this specific example, you'll also need to provide your Amazon account ID (see queueHandles.js)

export AWS_ACCOUNT_NUMBER='abc123' #Find this in your SQS .fifo queue URLs
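A missing variable usually surfaces later as a confusing connection error, so a small preflight check at the top of a manager script can help. This is a sketch only: `missingEnvVars` is a hypothetical helper, and the list simply repeats the variable names exported above.

```javascript
// Variables exported in the setup above.
const REQUIRED_ENV_VARS = [
  'PGUSER',
  'PGHOST',
  'PGDATABASE',
  'PGPORT',
  'PGPASSWORD',
  'AWS_ACCOUNT_NUMBER',
];

// Hypothetical helper: returns the names that are unset or blank.
function missingEnvVars(env = process.env, names = REQUIRED_ENV_VARS) {
  return names.filter((name) => !env[name] || env[name].trim() === '');
}

const missing = missingEnvVars();
if (missing.length > 0) {
  console.warn('Missing environment variables: ' + missing.join(', '));
}
```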

Run processes with a process manager

cd examples/example1
pm2 start workflowManagerExample.js
pm2 start worker1Example.js
pm2 start worker2Example.js
pm2 start worker3Example.js
pm2 start worker4Example.js

pm2 list # Confirm all processes are running OK

To run a job through your workflow:

Send a message to workflow1.fifo, either programmatically or through the SQS console, with the following JSON body:

{
    "workflowStep": "INIT_WORKFLOW",
    "messageBody": { 
        "sampleVar": "add any data you want in the messageBody"
    }
}
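When sending this message programmatically, note that SQS FIFO queues require a `MessageGroupId` on every message. The sketch below only builds the parameter object for a SendMessage call and does not call AWS. `buildStartJobParams`, the group name, and the region in the queue URL are assumptions for illustration; copy the real URL for workflow1.fifo from your SQS console.

```javascript
// Hypothetical helper: builds SQS SendMessage parameters that start a Job.
// QueueUrl, MessageGroupId and MessageBody are standard SendMessage fields.
function buildStartJobParams(queueUrl, payload, groupId = 'altflo-jobs') {
  return {
    QueueUrl: queueUrl,
    MessageGroupId: groupId, // required for .fifo queues
    MessageBody: JSON.stringify({
      workflowStep: 'INIT_WORKFLOW',
      messageBody: payload,
    }),
  };
}

// The region (us-east-1) is an assumption; the account number comes from
// the AWS_ACCOUNT_NUMBER variable described above.
const accountNumber = process.env.AWS_ACCOUNT_NUMBER || 'abc123';
const queueUrl =
  'https://sqs.us-east-1.amazonaws.com/' + accountNumber + '/workflow1.fifo';

const params = buildStartJobParams(queueUrl, {
  sampleVar: 'add any data you want in the messageBody',
});

console.log(JSON.stringify(params, null, 2));
```

The resulting object has the shape accepted by the AWS SDK's SQS `sendMessage` operation. Because Content-Based Deduplication is on, remember to vary the payload between sends.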

To check Job completion:

Query the Jobs, JobThreads, and Tasks tables in your database backend:

SELECT * FROM "Jobs";
SELECT * FROM "JobThreads";
SELECT * FROM "Tasks";

Planned Features:

  1. Unit tests
  2. More robust workers
  3. Recommended best-practices for retry & fail-over
  4. Conditional branching
  5. More comprehensive transaction logging
  6. Debug mode
  7. A browser admin component (D3 + React?) for visualizing workflows & job history.

Debatable Features:

  1. Merging of Workflow branches or parallel threads.
  2. Option to automatically create & manage message queue endpoints (on first run of a new workflow). Quicker setup for new users.

Contributing

Before implementing any code, please open an Issue and discuss. Fixes for bugs with open Issues are welcome, and implementations of new features will be merged if coordinated through an Issue.

Implementing additional backends is probably the easiest way to contribute, e.g.:

  • Redis (QueueBackend)
  • NSQ.io (QueueBackend)
  • MongoDB (DatabaseBackend)
  • MySQL (DatabaseBackend)
  • Google Cloud Pub/Sub (QueueBackend)
  • Azure Service Bus Queues (QueueBackend)

Questions?

Create an Issue for any usage questions!
