@pgflow/core v0.3.0
pgflow SQL Core
PostgreSQL-native workflow engine for defining, managing, and tracking DAG-based workflows directly in your database.
!NOTE This project and all its components are licensed under the Apache 2.0 license.
!WARNING This project uses Atlas to manage the schemas and migrations. See ATLAS.md for more details.
Overview
The pgflow SQL Core provides the data model, state machine, and transactional functions for workflow management. It treats workflows as Directed Acyclic Graphs (DAGs) of steps, each step being a simple state machine.
This package focuses on:
- Defining and storing workflow shapes
- Managing workflow state transitions
- Exposing transactional functions for workflow operations
- Providing two-phase APIs for reliable task polling and status updates
The actual execution of workflow tasks is handled by the Edge Worker, which calls back to the SQL Core to acknowledge task completion or failure.
Key Features
- Declarative Workflows: Define flows and steps via SQL tables
- Dependency Management: Explicit step dependencies with atomic transitions
- Configurable Behavior: Per-flow and per-step options for timeouts, retries, and delays
- Queue Integration: Built on pgmq for reliable task processing
- Transactional Guarantees: All state transitions are ACID-compliant
Architecture
Schema Design
Schema ERD diagram
The schema consists of two main categories of tables:
Static definition tables
- flows (just an identity for the workflow with some global options)
- steps (DAG nodes belonging to particular flows, with option overrides)
- deps (DAG edges between steps)
Runtime state tables
- runs (execution instances of flows)
- step_states (states of individual steps within a run)
- step_tasks (units of work for individual steps within a run, so we can have fanouts)
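For orientation, the static shape of a flow can be inspected with plain queries (a sketch; '<flow_slug>' is a placeholder and the flow_slug key columns are assumed from the ERD):

-- Static definition of a flow: identity, nodes, and edges
SELECT * FROM pgflow.flows WHERE flow_slug = '<flow_slug>';
SELECT * FROM pgflow.steps WHERE flow_slug = '<flow_slug>';
SELECT * FROM pgflow.deps  WHERE flow_slug = '<flow_slug>';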
Execution Model
The SQL Core handles the workflow lifecycle through these key operations:
- Definition: Workflows are defined using create_flow and add_step
- Instantiation: Workflow instances are started with start_flow, creating a new run
- Task Retrieval: The Edge Worker uses two-phase polling - first read_with_poll to reserve queue messages, then start_tasks to convert them to executable tasks
- State Transitions: When the Edge Worker reports back using complete_task or fail_task, the SQL Core handles state transitions and schedules dependent steps
Flow lifecycle diagram
Example flow and its life
Let's walk through creating and running a workflow that fetches a website, performs summarization and sentiment analysis in parallel steps, and saves the results to a database.
Defining a Workflow
Workflows are defined using two SQL functions: create_flow and add_step.
In this example, we'll create a workflow with:
- website as the entry point ("root step")
- sentiment and summary as parallel steps that depend on website
- saveToDb as the final step, depending on both parallel steps
-- Define workflow with parallel steps
SELECT pgflow.create_flow('analyze_website');
SELECT pgflow.add_step('analyze_website', 'website');
SELECT pgflow.add_step('analyze_website', 'sentiment', deps_slugs => ARRAY['website']);
SELECT pgflow.add_step('analyze_website', 'summary', deps_slugs => ARRAY['website']);
SELECT pgflow.add_step('analyze_website', 'saveToDb', deps_slugs => ARRAY['sentiment', 'summary']);

!WARNING You need to call add_step in topological order, which is enforced by foreign key constraints.

!NOTE You can have multiple "root steps" in a workflow. You can even create a root-steps-only workflow to process a single input in parallel, because at the end, all outputs from steps that do not have dependents ("final steps") are aggregated and saved as the run's output.
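For example, a root-steps-only flow might look like this (a sketch; the process_document flow and its step names are hypothetical, not part of the example above):

-- Hypothetical flow with two independent root steps and no deps:
-- both steps receive the run input, and both outputs end up in the run's output
SELECT pgflow.create_flow('process_document');
SELECT pgflow.add_step('process_document', 'extract_text');
SELECT pgflow.add_step('process_document', 'extract_metadata');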
Starting a Workflow Run
To start a workflow, call start_flow with a flow slug and input arguments:
SELECT * FROM pgflow.start_flow(
flow_slug => 'analyze_website',
input => '{"url": "https://example.com"}'::jsonb
);
-- run_id | flow_slug | status | input | output | remaining_steps
-- ------------+-----------------+---------+--------------------------------+--------+-----------------
-- <run uuid> | analyze_website | started | {"url": "https://example.com"} | [NULL] | 4

When a workflow starts:
- A new run record is created
- Initial states for all steps are created
- Root steps are marked as started
- Tasks are created for root steps
- Messages are enqueued on PGMQ for worker processing
!NOTE The input argument must be a valid JSONB value: string, number, boolean, array, object, or null.
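To see what start_flow just created, you can inspect the runtime tables directly (a sketch, assuming the runtime tables are keyed by run_id as shown in the ERD):

-- Expect 'website' to be started with one queued task, while
-- 'sentiment', 'summary' and 'saveToDb' still wait on their dependencies
SELECT * FROM pgflow.step_states WHERE run_id = '<run_uuid>';
SELECT * FROM pgflow.step_tasks  WHERE run_id = '<run_uuid>';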
Workflow Execution
Task Polling
The Edge Worker uses a two-phase approach to retrieve and start tasks:
Phase 1 - Reserve Messages:
SELECT * FROM pgflow.read_with_poll(
queue_name => 'analyze_website',
vt => 60, -- visibility timeout in seconds
qty => 5 -- maximum number of messages to fetch
);

Phase 2 - Start Tasks:
SELECT * FROM pgflow.start_tasks(
flow_slug => 'analyze_website',
msg_ids => ARRAY[101, 102, 103], -- message IDs from phase 1
worker_id => '550e8400-e29b-41d4-a716-446655440000'::uuid
);

How it works:
- read_with_poll reserves raw queue messages and hides them from other workers
- start_tasks finds matching step_tasks, increments the attempts counter, and builds task inputs
- Task metadata and input are returned to the worker for execution
This two-phase approach ensures tasks always exist before processing begins, eliminating race conditions that could occur with single-phase polling.
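Assuming read_with_poll returns pgmq-style message records with a msg_id column (an assumption; check the function's actual return type), the two phases can be chained in a single statement:

-- Sketch: reserve messages, then immediately convert them into executable tasks
WITH msgs AS (
  SELECT msg_id  -- msg_id column assumed from pgmq message records
  FROM pgflow.read_with_poll(
    queue_name => 'analyze_website',
    vt => 60,
    qty => 5
  )
)
SELECT *
FROM pgflow.start_tasks(
  flow_slug => 'analyze_website',
  msg_ids => (SELECT array_agg(msg_id) FROM msgs),
  worker_id => '550e8400-e29b-41d4-a716-446655440000'::uuid
);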
Task Completion
After successful processing, the worker acknowledges completion:
SELECT pgflow.complete_task(
run_id => '<run_uuid>',
step_slug => 'website',
task_index => 0, -- we will have multiple tasks for a step in the future
output => '{"content": "HTML content", "status": 200}'::jsonb
);

When a task completes:
- The task status is updated to 'completed' and the output is saved
- The message is archived in PGMQ
- The step state is updated to 'completed'
- Dependent steps with all dependencies completed are automatically started
- The run's remaining_steps counter is decremented
- If all steps are completed, the run is marked as completed with aggregated outputs
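After completing the website task above, sentiment and summary should have been started automatically; a quick way to check (a sketch, the status column name on step_states is an assumption):

-- Expect: website completed, sentiment and summary started, saveToDb still waiting
SELECT step_slug, status
FROM pgflow.step_states
WHERE run_id = '<run_uuid>'
ORDER BY step_slug;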
Error Handling
If a task fails, the worker acknowledges this using fail_task:
SELECT pgflow.fail_task(
run_id => '<run_uuid>',
step_slug => 'website',
task_index => 0,
error_message => 'Connection timeout when fetching URL'::text
);

The system handles failures by:
- Checking if retry attempts are available
- For available retries:
  - Keeping the task in 'queued' status
  - Applying exponential backoff via the message visibility timeout
  - Preventing processing until the visibility timeout expires
- When retries are exhausted:
  - Marking the task as 'failed'
  - Marking the step as 'failed'
  - Marking the run as 'failed'
  - Archiving the message in PGMQ
  - Notifying workers to abort pending tasks (future feature)
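To see where a failing step stands in its retry cycle, you can inspect its tasks (a sketch; the status and attempts_count column names are assumptions based on the behavior described above):

-- Shows how many attempts a task has used and whether it is still queued or has failed
SELECT step_slug, task_index, status, attempts_count
FROM pgflow.step_tasks
WHERE run_id = '<run_uuid>' AND step_slug = 'website';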
Retries and Timeouts
Retry behavior can be configured at both the flow and step level:
-- Flow-level defaults
SELECT pgflow.create_flow(
flow_slug => 'analyze_website',
max_attempts => 3, -- Maximum number of attempts (including the first attempt)
base_delay => 5, -- Base delay in seconds for exponential backoff
timeout => 60 -- Task timeout in seconds
);
-- Step-level overrides
SELECT pgflow.add_step(
flow_slug => 'analyze_website',
step_slug => 'sentiment',
deps_slugs => ARRAY['website']::text[],
max_attempts => 5, -- Override max attempts for this step
base_delay => 2, -- Override base delay for exponential backoff
timeout => 30 -- Override timeout for this step
);

The system applies exponential backoff for retries using the formula:
delay = base_delay * (2 ^ attempts_count)

Timeouts are enforced by setting the message visibility timeout to the step's timeout value plus a small buffer. If a worker doesn't acknowledge completion or failure within this period, the task becomes visible again and can be retried.
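For example, with the flow-level base_delay => 5 above, the formula yields the following delays (plain SQL arithmetic, not a pgflow function):

-- Backoff delays for base_delay = 5 and attempts_count 0 through 3
SELECT n AS attempts_count, 5 * (2 ^ n) AS delay_seconds
FROM generate_series(0, 3) AS n;
-- attempts_count | delay_seconds
-- 0              | 5
-- 1              | 10
-- 2              | 20
-- 3              | 40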
TypeScript Flow DSL
!NOTE TypeScript Flow DSL is a Work In Progress and is not ready yet!
Overview
While the SQL Core engine handles workflow definitions and state management, the primary way to define and work with your workflow logic is via the Flow DSL in TypeScript. This DSL offers a fluent API that makes it straightforward to outline the steps in your flow with full type safety.
Type Inference System
The most powerful feature of the Flow DSL is its automatic type inference system:
- You only need to annotate the initial Flow input type
- The return type of each step is automatically inferred from your handler function
- These return types become available in the payload of dependent steps
- The TypeScript compiler builds a complete type graph matching your workflow DAG
This means you get full IDE autocompletion and type checking throughout your workflow without manual type annotations.
Basic Example
Here's an example that matches our website analysis workflow:
// Provide a type for the input of the Flow
type Input = {
url: string;
};
const AnalyzeWebsite = new Flow<Input>({
slug: 'analyze_website',
maxAttempts: 3,
baseDelay: 5,
timeout: 10,
})
.step(
{ slug: 'website' },
async (input) => await scrapeWebsite(input.run.url)
)
.step(
{ slug: 'sentiment', dependsOn: ['website'], timeout: 30, maxAttempts: 5 },
async (input) => await analyzeSentiment(input.website.content)
)
.step(
{ slug: 'summary', dependsOn: ['website'] },
async (input) => await summarizeWithAI(input.website.content)
)
.step(
{ slug: 'saveToDb', dependsOn: ['sentiment', 'summary'] },
async (input) =>
(await saveToDb({
websiteUrl: input.run.url,
sentiment: input.sentiment.score,
summary: input.summary,
})).status
);

How Payload Types Are Built
The payload object for each step is constructed dynamically based on:
- The run property: Always contains the original workflow input
- Dependency outputs: Each dependency's output is available under a key matching the dependency's ID
- DAG structure: Only outputs from direct dependencies are included in the payload
This means your step handlers receive exactly the data they need, properly typed, without any manual type declarations beyond the initial Flow input type.
Benefits of Automatic Type Inference
- Refactoring safety: Change a step's output, and TypeScript will flag all dependent steps that need updates
- Discoverability: IDE autocompletion shows exactly what data is available in each step
- Error prevention: Catch typos and type mismatches at compile time, not runtime
- Documentation: The types themselves serve as living documentation of your workflow's data flow
Data Flow
Input and Output Handling
Handlers in pgflow must return JSON-serializable values that are captured and saved when complete_task is called. These outputs become available as inputs to dependent steps, allowing data to flow through your workflow pipeline.
When a step is executed, it receives an input object where:
- Each key is a step_slug of a completed dependency
- Each value is that step's output
- A special "run" key contains the original workflow input
Example: sentiment
When the sentiment step runs, it receives:
{
"run": { "url": "https://example.com" },
"website": { "content": "HTML content", "status": 200 }
}

Example: saveToDb
The saveToDb step depends on both sentiment and summary:
{
"run": { "url": "https://example.com" },
"sentiment": { "score": 0.85, "label": "positive" },
"summary": "This website discusses various topics related to technology and innovation."
}

Run Completion
When all steps in a run are completed, the run status is automatically updated to 'completed' and its output is set. The output is an aggregation of all the outputs from final steps (steps that have no dependents):
-- Example of a completed run with output
SELECT run_id, status, output FROM pgflow.runs WHERE run_id = '<run_uuid>';
-- run_id | status | output
-- ------------+-----------+-----------------------------------------------------
-- <run uuid> | completed | {"saveToDb": {"status": "success"}}