npm.io
1.1.1 • Published 6d ago

catqueue

Licence
ISC
Version
1.1.1
Deps
1
Size
18 kB
Vulns
0
Weekly
0

catqueue

A Redis-free, PostgreSQL-native job queue for Node.js — built for developers who already have Postgres and don't want to manage another service (Redis).

npm install catqueue

Why catqueue?

Most job queues need Redis. catqueue doesn't. If you're already running PostgreSQL, you have everything you need.

Feature catqueue BullMQ
Broker required PostgreSQL only Redis required
Idempotency keys Built-in
Per-attempt error log JSON array
Dead letter + replay
Atomic job locking SELECT FOR UPDATE SKIP LOCKED Redis SETNX
Crash recovery
TypeScript support Full types

Quick Start

1. Run the migration

Create the jobs table in your PostgreSQL database. Find 001_init.sql in the migrations/ folder inside the package:

node_modules/catqueue/migrations/001_init.sql

Run it against your database:

psql YOUR_CONNECTION_STRING -f node_modules/catqueue/migrations/001_init.sql

Or paste it directly into your database's SQL editor (Neon, Supabase, etc).

2. Use it
import { CatQueue } from "catqueue";

const queue = new CatQueue({
  connectionString: process.env.DATABASE_URL!,
});

// Register handlers
queue.register("send-email", async (payload) => {
  console.log("Sending email to", payload.to);
  // await mailer.send(payload);
});

queue.register("resize-image", async (payload) => {
  console.log("Resizing image", payload.url);
});

// Start the worker (polls every 2s by default)
queue.start();

// Enqueue jobs from anywhere in your app
const jobId = await queue.enqueue("send-email", {
  to: "user@example.com",
  subject: "Welcome!",
});

console.log("Job enqueued:", jobId);

API Reference

new CatQueue(config)
const queue = new CatQueue({
  connectionString: string,  // required — your PostgreSQL connection string
  pollInterval?: number,     // ms between polls, default: 2000
  lockDuration?: number,     // seconds a job stays locked, default: 30
});

queue.enqueue(jobName, payload, options?)

Adds a job to the queue. Returns the job ID.

const jobId = await queue.enqueue("send-email", { to: "user@example.com" });

// With options
const jobId = await queue.enqueue(
  "send-email",
  { to: "user@example.com" },
  {
    priority: 1, // 1 = urgent, 5 = low, default: 3
    maxAttempts: 3, // default: 5
    runAt: new Date(Date.now() + 5000), // delay by 5 seconds
    idempotencyKey: "welcome-email-user-123", // prevent duplicate jobs
  },
);

Idempotency keys — if you enqueue a job with an idempotencyKey that already exists, the second call is silently ignored. Useful for preventing duplicate emails, charges, etc.


queue.register(jobName, handler)

Registers a handler function for a job type. Must be called before queue.start().

queue.register<{ to: string; subject: string }>(
  "send-email",
  async (payload) => {
    // payload is fully typed
    await mailer.send({ to: payload.to, subject: payload.subject });
  },
);

queue.start()

Starts the worker polling loop. Each poll:

  1. Recovers any stuck PROCESSING jobs with expired locks
  2. Atomically claims the next PENDING job using SELECT FOR UPDATE SKIP LOCKED
  3. Runs the registered handler
  4. On success → marks job COMPLETED
  5. On failure → retries with exponential backoff (2^attemptCount seconds)
  6. After maxAttempts → marks job DEAD
queue.start();

queue.stop()

Gracefully stops the worker and closes the database connection.

process.on("SIGINT", async () => {
  await queue.stop();
  process.exit(0);
});

Job Lifecycle

PENDING → PROCESSING → COMPLETED
                ↓
            (on failure)
                ↓
           attemptCount++
           runAt = now + 2^n seconds
                ↓
           back to PENDING
                ↓
    (after maxAttempts exceeded)
                ↓
             DEAD

Dead jobs can be replayed by resetting them to PENDING in your application logic.


Error Logging

Every failed attempt is appended to the job's error_log column as a JSON array:

[
  { "attempt": 1, "error": "Connection timeout", "at": "2026-06-26T10:00:00Z" },
  { "attempt": 2, "error": "Connection timeout", "at": "2026-06-26T10:00:02Z" }
]

Retry Schedule (Exponential Backoff)

Attempt Retry after
1 2 seconds
2 4 seconds
3 8 seconds
4 16 seconds
5 (dead)

TypeScript

catqueue ships with full TypeScript types. Use generics for typed payloads:

interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

queue.register<EmailPayload>("send-email", async (payload) => {
  // payload.to, payload.subject, payload.body are all typed
});

await queue.enqueue<EmailPayload>("send-email", {
  to: "user@example.com",
  subject: "Hello",
  body: "Welcome!",
});

Requirements

  • Node.js 18+
  • PostgreSQL 13+ (uses gen_random_uuid() and SKIP LOCKED)

License

MIT