0.16.2 • Published 6 months ago

@flowcore/pathways v0.16.2

License
MIT
Repository
github
Last release
6 months ago

Flowcore Pathways

A TypeScript library for creating Flowcore Pathways that simplifies integration with the Flowcore platform. Flowcore Pathways helps you build event-driven applications with type-safe pathways for processing and producing events.

Installation

# Bun
bunx jsr add @flowcore/pathways

# Deno
deno add jsr:@flowcore/pathways

# npm / yarn
npx jsr add @flowcore/pathways

or using npm:

npm install @flowcore/pathways

or using yarn:

yarn add @flowcore/pathways

Getting Started

Here's a basic example to get you started with Flowcore Pathways:

import { z } from "zod"
import { PathwaysBuilder } from "@flowcore/pathways"

// Define your event schema
const userSchema = z.object({
  id: z.string(),
  name: z.string(),
  email: z.string(),
})

// Create a pathways builder
const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
})

// Register a pathway
pathways
  .register({
    flowType: "user",
    eventType: "created",
    schema: userSchema,
  })
  .handle("user/created", async (event) => {
    console.log(`Processing user created event: ${event.eventId}`)
    console.log(`User data:`, event.payload)

    // Process the event...

    // You can write to another pathway if needed
    // (this assumes a "notifications/sent" pathway has been registered as well)
    await pathways.write("notifications/sent", {
      data: {
        userId: event.payload.id,
        message: `Welcome ${event.payload.name}!`,
        channel: "email",
      },
    })
  })

Core Concepts

Flowcore Pathways is built around these core concepts:

  • PathwaysBuilder: The main entry point for creating and managing pathways
  • Pathways: Define event flows with schemas for type safety
  • Handlers: Process incoming events
  • Writers: Send events to pathways
  • Router: Direct incoming events to the appropriate pathway
  • Persistence: Store pathway state for reliable processing
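
To make the relationship between these concepts more concrete, here is a minimal in-memory sketch. It is not the library's implementation: `MiniPathways`, `pathwayKey`, and every method body are hypothetical stand-ins that only mirror the `flowType/eventType` key convention used in the examples in this README.

```typescript
// Hypothetical stand-in, NOT the @flowcore/pathways implementation.
type Handler = (event: { eventId: string; payload: unknown }) => Promise<void> | void

// Pathways are addressed by "<flowType>/<eventType>", for example "user/created".
function pathwayKey(flowType: string, eventType: string): string {
  return `${flowType}/${eventType}`
}

class MiniPathways {
  private handlers = new Map<string, Handler>()
  private nextId = 1

  // Register a pathway with a no-op handler until one is attached.
  register(flowType: string, eventType: string): this {
    const key = pathwayKey(flowType, eventType)
    if (!this.handlers.has(key)) this.handlers.set(key, () => {})
    return this
  }

  // Attach the handler that processes events for a registered pathway.
  handle(key: string, handler: Handler): this {
    if (!this.handlers.has(key)) throw new Error(`Pathway ${key} is not registered`)
    this.handlers.set(key, handler)
    return this
  }

  // Writer and router collapsed into one step: create an event and dispatch it.
  async write(key: string, payload: unknown): Promise<string> {
    const handler = this.handlers.get(key)
    if (!handler) throw new Error(`Pathway ${key} is not registered`)
    const eventId = `evt-${this.nextId++}`
    await handler({ eventId, payload })
    return eventId
  }
}
```

In the real library the writer sends events to the Flowcore platform and the router receives them back through a webhook; the sketch skips that round trip and leaves out schemas and persistence entirely.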

Usage

Creating a Pathways Builder

The PathwaysBuilder is the main configuration point for your pathways:

import { PathwaysBuilder } from "@flowcore/pathways"

const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
  pathwayTimeoutMs: 10000, // Optional, default is 10000 (10s)
  logger: customLogger, // Optional, defaults to NoopLogger
})

Registering Pathways

Register pathways with their schemas for type-safe event handling:

import { z } from "zod"

// Define your event schema
const orderSchema = z.object({
  orderId: z.string(),
  userId: z.string(),
  total: z.number(),
  items: z.array(
    z.object({
      id: z.string(),
      quantity: z.number(),
    }),
  ),
})

// Register pathway
pathways.register({
  flowType: "order",
  eventType: "placed",
  schema: orderSchema,
  writable: true, // Optional, default is true
  maxRetries: 3, // Optional, default is 3
  retryDelayMs: 500, // Optional, default is 500
})

Handling Events

Set up handlers to process events for specific pathways:

const pathwayKey = "order/placed"

pathways.handle(pathwayKey, async (event) => {
  console.log(`Processing order ${event.payload.orderId}`)

  // Access typed payload data
  const { userId, total, items } = event.payload

  // Your business logic here
  await updateInventory(items)
  await notifyUser(userId, total)
})

Writing Events

Send events to pathways:

// Basic write
const eventId = await pathways.write("order/placed", {
  data: {
    orderId: "ord-123",
    userId: "user-456",
    total: 99.99,
    items: [
      { id: "item-1", quantity: 2 },
    ],
  },
})

// Write with metadata
const eventId2 = await pathways.write("order/placed", {
  data: orderData,
  metadata: {
    correlationId: "corr-789",
    source: "checkout-service",
  },
})

// Fire-and-forget mode (doesn't wait for processing)
const eventId3 = await pathways.write("order/placed", {
  data: orderData,
  options: {
    fireAndForget: true,
  },
})

// Batch write multiple events
const eventIds = await pathways.write("order/placed", {
  batch: true,
  data: [orderData1, orderData2, orderData3],
})

// Batch write with metadata
const eventIds2 = await pathways.write("order/placed", {
  batch: true,
  data: [orderData1, orderData2],
  metadata: {
    source: "bulk-import",
  },
})
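
For large imports it may be useful to split the input into bounded chunks before issuing batch writes. The sketch below shows a generic `chunk` helper; the helper is hypothetical, and the chunk size of 100 in the usage comment is an arbitrary assumption, since this README does not state a batch-size limit.

```typescript
// Hypothetical helper: split an array into chunks of at most `size` items.
function chunk<T>(items: readonly T[], size: number): T[][] {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError("size must be a positive integer")
  }
  const chunks: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size))
  }
  return chunks
}

// Possible usage with the batch write shown above (chunk size is an assumption):
// for (const part of chunk(allOrders, 100)) {
//   await pathways.write("order/placed", { batch: true, data: part })
// }
```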

Error Handling

Handle errors in pathway processing:

// Error handler for a specific pathway
pathways.onError("order/placed", (error, event) => {
  console.error(`Error processing order ${event.payload.orderId}:`, error)
  reportToMonitoring(error, event)
})

// Global error handler for all pathways
pathways.onAnyError((error, event, pathway) => {
  console.error(`Error in pathway ${pathway}:`, error)
  reportToMonitoring(error, event, pathway)
})

Event Observability

Subscribe to events for observability at different stages:

// Before processing
pathways.subscribe("order/placed", (event) => {
  console.log(`About to process order ${event.payload.orderId}`)
}, "before")

// After processing
pathways.subscribe("order/placed", (event) => {
  console.log(`Finished processing order ${event.payload.orderId}`)
}, "after")

// At both stages
pathways.subscribe("order/placed", (event) => {
  console.log(`Event ${event.eventId} at ${new Date().toISOString()}`)
}, "all")
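
One possible use of the "before" and "after" stages is measuring how long each event takes to process. The following is a minimal sketch under that assumption; `DurationTracker` is a hypothetical helper and not part of the library.

```typescript
// Hypothetical helper: pair a "before" and an "after" callback to measure duration.
class DurationTracker {
  private startedAt = new Map<string, number>()
  private now: () => number

  // The clock is injectable so the tracker can be tested deterministically.
  constructor(now: () => number = Date.now) {
    this.now = now
  }

  start(eventId: string): void {
    this.startedAt.set(eventId, this.now())
  }

  // Returns elapsed milliseconds, or undefined if `start` was never called for this event.
  finish(eventId: string): number | undefined {
    const started = this.startedAt.get(eventId)
    if (started === undefined) return undefined
    this.startedAt.delete(eventId)
    return this.now() - started
  }
}

// Possible wiring with the subscribe calls shown above:
// const tracker = new DurationTracker()
// pathways.subscribe("order/placed", (event) => tracker.start(event.eventId), "before")
// pathways.subscribe("order/placed", (event) => {
//   console.log(`Took ${tracker.finish(event.eventId)} ms`)
// }, "after")
```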

Setting up a Router

The PathwayRouter routes incoming events to the appropriate pathway:

import { PathwayRouter } from "@flowcore/pathways"

// Create a router with a secret key for validation
const WEBHOOK_SECRET = "your-webhook-secret"
const router = new PathwayRouter(pathways, WEBHOOK_SECRET)

// Process an incoming event from a webhook
async function handleWebhook(req: Request) {
  const event = await req.json()
  const secret = req.headers.get("X-Webhook-Secret")

  try {
    // This validates the secret and routes to the right pathway
    await router.processEvent(event, secret)
    return new Response("Event processed", { status: 200 })
  } catch (error) {
    console.error("Error processing event:", error)
    return new Response("Error processing event", { status: 500 })
  }
}

HTTP Server Integration

Integrate with Deno's HTTP server:

// Deno.serve is built into current Deno releases, so no import is needed
// (the older `serve` export from std/http/server.ts is deprecated)
Deno.serve({ port: 3000 }, (req: Request) => {
  const url = new URL(req.url)

  if (req.method === "POST" && url.pathname === "/webhook") {
    return handleWebhook(req)
  }

  return new Response("Not found", { status: 404 })
})

Persistence Options

Flowcore Pathways supports different persistence options to track processed events and ensure exactly-once processing.

Default In-Memory KV Store (Development)

By default, Flowcore Pathways uses an internal in-memory KV store for persistence:

// The default persistence is used automatically, no explicit setup required
const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
})

The internal store uses the appropriate KV adapter for your environment (Bun, Node, or Deno), but note that this state is not persistent across application restarts and should be used primarily for development.
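
To illustrate the idea of tracking processed events in memory, here is a minimal sketch of a store with a time-to-live. It is an assumption-laden illustration, not the library's internal store: the class name `InMemoryProcessedStore`, its method names, and the default TTL are all hypothetical.

```typescript
// Hypothetical sketch of an in-memory "processed events" store with a TTL.
class InMemoryProcessedStore {
  private expiresAt = new Map<string, number>()
  private ttlMs: number
  private now: () => number

  // ttlMs default is an arbitrary assumption; the clock is injectable for testing.
  constructor(ttlMs: number = 300000, now: () => number = Date.now) {
    this.ttlMs = ttlMs
    this.now = now
  }

  isProcessed(eventId: string): boolean {
    const expiry = this.expiresAt.get(eventId)
    if (expiry === undefined) return false
    if (expiry <= this.now()) {
      this.expiresAt.delete(eventId) // entry expired, forget it
      return false
    }
    return true
  }

  setProcessed(eventId: string): void {
    this.expiresAt.set(eventId, this.now() + this.ttlMs)
  }
}
```

Because the map lives in process memory, everything in it disappears on restart, which is why a store like this only suits development.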

PostgreSQL Persistence (Production)

For production environments, you can use PostgreSQL for reliable and scalable persistence:

import { createPostgresPathwayState } from "@flowcore/pathways"

// Create a PostgreSQL state handler
const postgresState = createPostgresPathwayState({
  host: "localhost",
  port: 5432,
  user: "postgres",
  password: "postgres",
  database: "pathway_db",
  tableName: "pathway_state", // Optional, defaults to "pathway_state"
  ttlMs: 300000, // Optional, defaults to 5 minutes (300000ms)
  ssl: false, // Optional, defaults to false
})

// Use PostgreSQL for pathway state
pathways.withPathwayState(postgresState)

The PostgreSQL implementation:

  • Automatically creates the necessary table if it doesn't exist
  • Includes TTL-based automatic cleanup of processed events
  • Creates appropriate indexes for performance

Advanced Usage

Auditing

Enable auditing to track events:

// Set up auditing
pathways
  .withAudit((path, event) => {
    console.log(`Audit: ${path} event ${event.eventId}`)
    logToAuditSystem(path, event)
  })
  .withUserResolver(async () => {
    // Get the current user ID from context
    return {
      entityId: "user-123",
      entityType: "user",
    }
  })

Custom Loggers

Create a custom logger:

import { Logger } from "@flowcore/pathways"

class MyCustomLogger implements Logger {
  debug(message: string, context?: Record<string, unknown>): void {
    console.debug(`[DEBUG] ${message}`, context)
  }

  info(message: string, context?: Record<string, unknown>): void {
    console.info(`[INFO] ${message}`, context)
  }

  warn(message: string, context?: Record<string, unknown>): void {
    console.warn(`[WARN] ${message}`, context)
  }

  error(message: string, error?: Error, context?: Record<string, unknown>): void {
    console.error(`[ERROR] ${message}`, error, context)
  }
}

// Use custom logger
const pathways = new PathwaysBuilder({
  // ...other config
  logger: new MyCustomLogger(),
})
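
A logger with this shape can also be wrapped, for example to suppress low-severity messages in production. The sketch below is a hedged illustration: `LoggerLike` is a local copy of the four method signatures shown above (so the example is self-contained), and `LevelFilterLogger` and `Level` are hypothetical names that the library does not provide.

```typescript
// Local copy of the method shapes shown above, so the sketch is self-contained.
interface LoggerLike {
  debug(message: string, context?: Record<string, unknown>): void
  info(message: string, context?: Record<string, unknown>): void
  warn(message: string, context?: Record<string, unknown>): void
  error(message: string, error?: Error, context?: Record<string, unknown>): void
}

type Level = "debug" | "info" | "warn" | "error"
const LEVEL_ORDER: Record<Level, number> = { debug: 0, info: 1, warn: 2, error: 3 }

// Hypothetical wrapper: forwards only messages at or above `minLevel`.
class LevelFilterLogger implements LoggerLike {
  private inner: LoggerLike
  private minLevel: Level

  constructor(inner: LoggerLike, minLevel: Level) {
    this.inner = inner
    this.minLevel = minLevel
  }

  private enabled(level: Level): boolean {
    return LEVEL_ORDER[level] >= LEVEL_ORDER[this.minLevel]
  }

  debug(message: string, context?: Record<string, unknown>): void {
    if (this.enabled("debug")) this.inner.debug(message, context)
  }

  info(message: string, context?: Record<string, unknown>): void {
    if (this.enabled("info")) this.inner.info(message, context)
  }

  warn(message: string, context?: Record<string, unknown>): void {
    if (this.enabled("warn")) this.inner.warn(message, context)
  }

  error(message: string, error?: Error, context?: Record<string, unknown>): void {
    if (this.enabled("error")) this.inner.error(message, error, context)
  }
}
```

An instance such as `new LevelFilterLogger(new MyCustomLogger(), "warn")` could then be passed as the `logger` option, assuming the library accepts any object with these four methods.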

Retry Mechanisms

Configure retry behavior for pathways:

// Global timeout for pathway processing
const pathways = new PathwaysBuilder({
  // ...other config
  pathwayTimeoutMs: 15000, // 15 seconds
})

// Per-pathway retry configuration
pathways.register({
  flowType: "payment",
  eventType: "process",
  schema: paymentSchema,
  maxRetries: 5, // Retry up to 5 times
  retryDelayMs: 1000, // 1 second between retries
})
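
The following sketch illustrates what "retry up to N times with a fixed delay" can look like in plain TypeScript. It is not the library's retry code: `withRetries` is a hypothetical helper, and it assumes one initial attempt followed by up to `maxRetries` retries, which this README does not confirm.

```typescript
// Hypothetical helper illustrating fixed-delay retries; not the library's implementation.
async function withRetries<T>(
  task: () => Promise<T>,
  maxRetries: number,
  retryDelayMs: number,
): Promise<T> {
  let lastError: unknown
  // Assumption: one initial attempt plus up to `maxRetries` retries.
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await task()
    } catch (error) {
      lastError = error
      if (attempt < maxRetries) {
        // Wait a fixed delay before the next attempt.
        await new Promise<void>((resolve) => setTimeout(resolve, retryDelayMs))
      }
    }
  }
  // All attempts failed: surface the last error to the caller.
  throw lastError
}
```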

Session Pathways

The SessionPathwayBuilder provides a way to associate session IDs with pathway operations, making it easier to track and manage user sessions in your application.

Setting Up Session Support

To use session-specific functionality, first configure your PathwaysBuilder with session support:

import { PathwaysBuilder } from "@flowcore/pathways"

// Configure the builder with session support
const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
  enableSessionUserResolvers: true, // Enable session-specific resolvers
})

Creating Session Pathways

Create a session-specific pathway wrapper:

import { SessionPathwayBuilder } from "@flowcore/pathways"

// Create a session with an auto-generated session ID
const session = new SessionPathwayBuilder(pathways)
const sessionId = session.getSessionId() // Get the auto-generated ID

// Or create a session with a specific session ID
const customSession = new SessionPathwayBuilder(pathways, "user-session-123")

Session-Specific User Resolvers

You can register different user resolvers for different sessions, allowing you to associate users with specific sessions:

// Register a user resolver for a specific session
pathways.withSessionUserResolver("user-session-123", async () => {
  // Return the user ID for this session
  return {
    entityId: "user-456",
    entityType: "user",
  }
})

// Alternative: Register directly through the session instance
session.withUserResolver(async () => {
  return {
    entityId: "key-789",
    entityType: "key",
  }
})

Writing Events with Session Context

Events written through a session builder automatically include the session ID:

// Write an event with session context
await session.write("order/placed", {
  data: {
    orderId: "ord-123",
    userId: "user-456",
    total: 99.99,
    items: [{ id: "item-1", quantity: 2 }],
  },
})

// You can override the session ID for a specific write
await session.write("order/placed", {
  data: orderData,
  options: { sessionId: "different-session" },
})

// Batch write events with session context
await session.write("user/actions", {
  batch: true,
  data: [actionData1, actionData2, actionData3],
})

Session ID in Audit Events

When auditing is enabled, the session ID is included in the audit metadata:

// Enable auditing
pathways.withAudit((path, event) => {
  console.log(`Audit: ${path} event ${event.eventId}`)
  // The session ID will be included in event metadata
})

// Now when writing events through a session
await session.write("order/placed", { data: orderData })
// The session ID is automatically included in the audit metadata

File Pathways

File pathways provide a specialized way to handle file uploads and processing in your Flowcore applications. They automatically handle file type detection, binary content processing, and provide a structured approach to file management.

Registering File Pathways

Register a file pathway by setting the isFilePathway flag to true:

import { z } from "zod"

// Define additional properties schema for your file
const documentSchema = z.object({
  documentType: z.enum(["invoice", "receipt", "contract"]),
  department: z.string(),
  metadata: z.record(z.string()).optional(),
})

// Register a file pathway
pathways.register({
  flowType: "document",
  eventType: "uploaded",
  schema: documentSchema, // Additional properties beyond the file itself
  isFilePathway: true, // This marks it as a file pathway
  writable: true,
})

Writing Files to Pathways

File pathways use a special input format that includes file content and metadata:

import { readFile } from "node:fs/promises"

// Read file content (as Buffer for Node.js/Bun, Uint8Array for Deno)
const fileContent = await readFile("./invoice.pdf")

// Write a file to a pathway
const eventId = await pathways.write("document/uploaded", {
  data: {
    fileId: "file-123", // Unique identifier for the file
    fileName: "invoice-2024.pdf", // Original filename
    fileContent: fileContent, // File content as Buffer/Uint8Array
    // Additional properties defined in your schema
    documentType: "invoice",
    department: "finance",
    metadata: {
      customer: "ACME Corp",
      amount: "1500.00",
    },
  },
})

File Input Schema

File pathways automatically include these required fields:

// Built-in file fields (automatically added)
interface FileInput {
  fileId: string // Unique identifier for the file
  fileName: string // Original filename with extension
  fileContent: Buffer | Uint8Array // Binary file content
  // ... your additional schema properties
}

File Event Schema

When processed, file events include automatic file type detection:

// Built-in file event fields (automatically added to your schema)
interface FileEvent {
  fileId: string // Unique identifier for the file
  fileName: string // Original filename
  fileType: string // MIME type (automatically detected)
  fileContent: Blob // File content as Blob
  // ... your additional schema properties
}
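
This README does not say how the file type is detected. One common approach is to inspect the leading "magic bytes" of the content, sketched below for three formats. `sniffMimeType` is a hypothetical function for illustration only and may differ from what the library actually does.

```typescript
// Hypothetical sketch: guess a MIME type from the first bytes of a file.
function sniffMimeType(bytes: Uint8Array): string {
  // True when the content begins with the given byte signature.
  const startsWith = (signature: number[]): boolean =>
    bytes.length >= signature.length && signature.every((b, i) => bytes[i] === b)

  if (startsWith([0x25, 0x50, 0x44, 0x46])) return "application/pdf" // "%PDF"
  if (startsWith([0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a])) return "image/png"
  if (startsWith([0xff, 0xd8, 0xff])) return "image/jpeg"
  return "application/octet-stream" // fallback for unknown content
}
```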

Handling File Events

Handle file events just like regular events, but with access to file-specific properties:

pathways.handle("document/uploaded", async (event) => {
  const { fileId, fileName, fileType, fileContent, documentType, department } = event.payload

  console.log(`Processing file: ${fileName} (${fileType})`)
  console.log(`Document type: ${documentType}, Department: ${department}`)

  // Process the file content
  if (fileType === "application/pdf") {
    await processPDFDocument(fileContent, event.payload.metadata)
  } else if (fileType.startsWith("image/")) {
    await processImageFile(fileContent, documentType)
  }

  // Store file metadata
  await storeFileMetadata({
    fileId,
    fileName,
    fileType,
    documentType,
    department,
    processedAt: new Date(),
  })
})

File Pathway Limitations

File pathways have some specific limitations:

// ❌ Batch writes are NOT supported for file pathways
// This will throw an error:
await pathways.write("document/uploaded", {
  batch: true, // Error: Batch is not possible for file pathways
  data: [fileData1, fileData2],
})

// ✅ Write files individually instead:
for (const fileData of fileDataArray) {
  await pathways.write("document/uploaded", { data: fileData })
}
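
When writing many files individually, you may want one failed upload not to abort the rest. The sketch below collects a per-item outcome instead of throwing on the first error. `writeEach` and `WriteOutcome` are hypothetical names, and the sketch assumes the write function resolves to a string event ID.

```typescript
// Hypothetical helper: write items one at a time and collect per-item outcomes.
type WriteOutcome =
  | { ok: true; index: number; eventId: string }
  | { ok: false; index: number; error: unknown }

async function writeEach<T>(
  items: readonly T[],
  writeOne: (item: T) => Promise<string>,
): Promise<WriteOutcome[]> {
  const outcomes: WriteOutcome[] = []
  for (const [index, item] of items.entries()) {
    try {
      const eventId = await writeOne(item)
      outcomes.push({ ok: true, index, eventId })
    } catch (error) {
      // Record the failure and continue with the next item.
      outcomes.push({ ok: false, index, error })
    }
  }
  return outcomes
}

// Possible usage (assumes pathways.write resolves to a string event ID):
// const outcomes = await writeEach(fileDataArray, (fileData) =>
//   pathways.write("document/uploaded", { data: fileData }))
```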

Complete File Pathway Example

Here's a complete example of setting up and using file pathways:

import { PathwaysBuilder } from "@flowcore/pathways"
import { z } from "zod"
import { readFile } from "node:fs/promises"

// Define schema for additional file properties
const documentSchema = z.object({
  documentType: z.enum(["invoice", "receipt", "contract", "report"]),
  department: z.string(),
  tags: z.array(z.string()).optional(),
  metadata: z.record(z.string()).optional(),
})

const pathways = new PathwaysBuilder({
  baseUrl: "https://api.flowcore.io",
  tenant: "your-tenant",
  dataCore: "your-data-core",
  apiKey: "your-api-key",
})

// Register file pathway
pathways
  .register({
    flowType: "document",
    eventType: "uploaded",
    schema: documentSchema,
    isFilePathway: true,
  })
  .handle("document/uploaded", async (event) => {
    const { fileId, fileName, fileType, documentType, department } = event.payload

    console.log(`Processing ${documentType} from ${department}: ${fileName}`)

    // File type-specific processing
    switch (fileType) {
      case "application/pdf":
        await extractPDFText(event.payload.fileContent)
        break
      case "image/jpeg":
      case "image/png":
        await extractImageMetadata(event.payload.fileContent)
        break
      default:
        console.log(`Unsupported file type: ${fileType}`)
    }

    // Trigger downstream processing
    await pathways.write("document/processed", {
      data: {
        fileId,
        fileName,
        documentType,
        department,
        processedAt: new Date().toISOString(),
        status: "completed",
      },
    })
  })

// Upload a file
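async function uploadDocument(
  filePath: string,
  // Use the schema's enum type so invalid document types are rejected at compile time
  documentType: z.infer<typeof documentSchema>["documentType"],
  department: string,
) {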
  const fileContent = await readFile(filePath)
  const fileName = filePath.split("/").pop() || "unknown"

  return await pathways.write("document/uploaded", {
    data: {
      fileId: `doc-${Date.now()}`,
      fileName,
      fileContent,
      documentType,
      department,
      tags: ["automated-upload"],
      metadata: {
        uploadedAt: new Date().toISOString(),
        source: "api",
      },
    },
  })
}

// Usage
await uploadDocument("./invoice.pdf", "invoice", "finance")

API Reference

For a complete API reference, please see the API documentation.
