catqueue
A Redis-free, PostgreSQL-native job queue for Node.js, built for developers who already run Postgres and don't want to operate Redis as a second service.
npm install catqueue
Why catqueue?
Most job queues need Redis. catqueue doesn't. If you're already running PostgreSQL, you have everything you need.
| Feature | catqueue | BullMQ |
|---|---|---|
| Broker required | PostgreSQL only | Redis required |
| Idempotency keys | Built-in | |
| Per-attempt error log | JSON array | |
| Dead letter + replay | Yes (DEAD state, replay by resetting to PENDING) | |
| Atomic job locking | SELECT FOR UPDATE SKIP LOCKED | Redis SETNX |
| Crash recovery | Yes (expired locks are recovered) | |
| TypeScript support | Full types | |
Quick Start
1. Run the migration
Create the jobs table in your PostgreSQL database. Find 001_init.sql in the migrations/ folder inside the package:
node_modules/catqueue/migrations/001_init.sql
Run it against your database:
psql YOUR_CONNECTION_STRING -f node_modules/catqueue/migrations/001_init.sql
Or paste its contents directly into your database's SQL editor (Neon, Supabase, etc.).
2. Use it
import { CatQueue } from "catqueue";

const queue = new CatQueue({
  connectionString: process.env.DATABASE_URL!,
});

// Register handlers
queue.register("send-email", async (payload) => {
  console.log("Sending email to", payload.to);
  // await mailer.send(payload);
});

queue.register("resize-image", async (payload) => {
  console.log("Resizing image", payload.url);
});

// Start the worker (polls every 2s by default)
queue.start();

// Enqueue jobs from anywhere in your app
const jobId = await queue.enqueue("send-email", {
  to: "user@example.com",
  subject: "Welcome!",
});
console.log("Job enqueued:", jobId);
API Reference
new CatQueue(config)
const queue = new CatQueue({
  connectionString: string, // required: your PostgreSQL connection string
  pollInterval?: number,    // ms between polls, default: 2000
  lockDuration?: number,    // seconds a job stays locked, default: 30
});
queue.enqueue(jobName, payload, options?)
Adds a job to the queue. Returns the job ID.
const jobId = await queue.enqueue("send-email", { to: "user@example.com" });

// With options
const urgentJobId = await queue.enqueue(
  "send-email",
  { to: "user@example.com" },
  {
    priority: 1, // 1 = urgent, 5 = low, default: 3
    maxAttempts: 3, // default: 5
    runAt: new Date(Date.now() + 5000), // delay by 5 seconds
    idempotencyKey: "welcome-email-user-123", // prevent duplicate jobs
  },
);
Idempotency keys — if you enqueue a job with an idempotencyKey that already exists, the second call is silently ignored. Useful for preventing duplicate emails, charges, etc.
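An idempotency key only helps if the same logical job always produces the same key. As a rough sketch, the helper below derives one from the job name plus the fields that identify the job. `idempotencyKeyFor` is a hypothetical helper, not part of catqueue; it uses only Node's built-in `node:crypto` module.

```typescript
import { createHash } from "node:crypto";

// Hypothetical helper (not part of catqueue): derive a stable idempotency key
// from the job name and the fields that identify "the same" job.
function idempotencyKeyFor(
  jobName: string,
  identity: Record<string, string | number>,
): string {
  // Sort keys so that property order does not change the resulting key.
  const canonical = Object.keys(identity)
    .sort()
    .map((k) => `${k}=${String(identity[k])}`)
    .join("&");
  const digest = createHash("sha256")
    .update(`${jobName}|${canonical}`)
    .digest("hex");
  // A 16-character prefix keeps the key short; use the full digest if preferred.
  return `${jobName}-${digest.slice(0, 16)}`;
}

const keyA = idempotencyKeyFor("send-email", { userId: 123, template: "welcome" });
const keyB = idempotencyKeyFor("send-email", { template: "welcome", userId: 123 });
console.log(keyA === keyB); // true: same identity, same key
```

The result can be passed as the idempotencyKey option when enqueueing. Include only fields that define the job's identity; adding a timestamp, for example, would make every key unique and defeat deduplication.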
queue.register(jobName, handler)
Registers a handler function for a job type. Must be called before queue.start().
queue.register<{ to: string; subject: string }>(
  "send-email",
  async (payload) => {
    // payload is fully typed
    // mailer is your own mail client; it is not provided by catqueue
    await mailer.send({ to: payload.to, subject: payload.subject });
  },
);
queue.start()
Starts the worker polling loop. Each poll:
- Recovers any stuck PROCESSING jobs with expired locks
- Atomically claims the next PENDING job using SELECT FOR UPDATE SKIP LOCKED
- Runs the registered handler
- On success → marks job COMPLETED
- On failure → retries with exponential backoff (2^attemptCount seconds)
- After maxAttempts → marks job DEAD
queue.start();
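To illustrate the claim step, here is a sketch of what a SELECT FOR UPDATE SKIP LOCKED claim query commonly looks like. This is not catqueue's actual query: the column names (`status`, `priority`, `run_at`, `locked_until`) and the ordering are assumptions made for illustration, and `buildClaimQuery` is a hypothetical helper that returns the `{ text, values }` shape accepted by node-postgres.

```typescript
// Assumed schema: table "jobs" with columns id, status, priority, run_at,
// locked_until. These names are illustrative guesses, not taken from 001_init.sql.
const CLAIM_NEXT_JOB_SQL = `
  UPDATE jobs
     SET status = 'PROCESSING',
         locked_until = now() + make_interval(secs => $1)
   WHERE id = (
           SELECT id
             FROM jobs
            WHERE status = 'PENDING'
              AND run_at <= now()
            ORDER BY priority ASC, run_at ASC
            LIMIT 1
              FOR UPDATE SKIP LOCKED
         )
  RETURNING *`;

// Hypothetical helper: pairs the SQL with its single parameter, the lock
// duration in seconds (30 by default, per the config above).
function buildClaimQuery(lockDurationSeconds: number): {
  text: string;
  values: number[];
} {
  return { text: CLAIM_NEXT_JOB_SQL, values: [lockDurationSeconds] };
}

const claimQuery = buildClaimQuery(30);
console.log(claimQuery.values); // [ 30 ]
```

SKIP LOCKED makes the inner SELECT pass over rows that another transaction has already locked instead of waiting for them, so two workers polling at the same moment pick different jobs rather than blocking each other.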
queue.stop()
Gracefully stops the worker and closes the database connection.
process.on("SIGINT", async () => {
  await queue.stop();
  process.exit(0);
});
Job Lifecycle
PENDING → PROCESSING → COMPLETED
               ↓
          (on failure)
               ↓
         attemptCount++
    runAt = now + 2^n seconds
               ↓
         back to PENDING
               ↓
  (after maxAttempts exceeded)
               ↓
             DEAD
Dead jobs can be replayed by resetting them to PENDING in your application logic.
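The transitions above can be modelled as two pure functions. This is an in-memory sketch, not catqueue's implementation: the `JobState` shape is hypothetical, a job is assumed to become DEAD once attemptCount reaches maxAttempts, and resetting attemptCount to 0 on replay is one possible policy rather than documented behavior.

```typescript
type JobStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "DEAD";

// Hypothetical in-memory view of a job row.
interface JobState {
  status: JobStatus;
  attemptCount: number;
  maxAttempts: number;
  runAt: Date;
}

// Apply a failed attempt: bump the counter, then either schedule a retry
// (back to PENDING with a delayed runAt) or mark the job DEAD.
function afterFailure(job: JobState, now: Date): JobState {
  const attemptCount = job.attemptCount + 1;
  if (attemptCount >= job.maxAttempts) {
    return { ...job, status: "DEAD", attemptCount };
  }
  const delayMs = 2 ** attemptCount * 1000; // 2^n seconds
  return {
    ...job,
    status: "PENDING",
    attemptCount,
    runAt: new Date(now.getTime() + delayMs),
  };
}

// Replay a DEAD job by resetting it to PENDING. Resetting attemptCount
// is an assumption; jobs in any other state are returned unchanged.
function replay(job: JobState, now: Date): JobState {
  if (job.status !== "DEAD") return job;
  return { ...job, status: "PENDING", attemptCount: 0, runAt: now };
}

const exhausted: JobState = {
  status: "PROCESSING",
  attemptCount: 4,
  maxAttempts: 5,
  runAt: new Date(),
};
const dead = afterFailure(exhausted, new Date());
console.log(dead.status); // "DEAD"
console.log(replay(dead, new Date()).status); // "PENDING"
```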
Error Logging
Every failed attempt is appended to the job's error_log column as a JSON array:
[
  { "attempt": 1, "error": "Connection timeout", "at": "2026-06-26T10:00:00Z" },
  { "attempt": 2, "error": "Connection timeout", "at": "2026-06-26T10:00:02Z" }
]
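When reading error_log back in application code, a type for its entries is handy. The sketch below mirrors the JSON shape shown above and shows how such an entry could be built from a thrown value. `ErrorLogEntry` and `appendError` are hypothetical names, not catqueue exports, and `toISOString()` includes milliseconds, so its timestamps look slightly different from the example above.

```typescript
// Shape of one error_log entry, mirroring the JSON example above.
interface ErrorLogEntry {
  attempt: number;
  error: string;
  at: string; // ISO 8601 timestamp
}

// Hypothetical helper: return a new log with one entry appended.
// Handlers can throw non-Error values, so fall back to String().
function appendError(
  log: ErrorLogEntry[],
  attempt: number,
  thrown: unknown,
  at: Date,
): ErrorLogEntry[] {
  const message = thrown instanceof Error ? thrown.message : String(thrown);
  return [...log, { attempt, error: message, at: at.toISOString() }];
}

const exampleLog = appendError(
  [],
  1,
  new Error("Connection timeout"),
  new Date("2026-06-26T10:00:00Z"),
);
console.log(JSON.stringify(exampleLog));
```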
Retry Schedule (Exponential Backoff)
| Attempt | Retry after |
|---|---|
| 1 | 2 seconds |
| 2 | 4 seconds |
| 3 | 8 seconds |
| 4 | 16 seconds |
| 5 (dead) | — |
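As a worked example of the table, the sketch below adds up the delays to estimate how long a permanently failing job waits between attempts before it is marked dead. The function names are hypothetical, and handler run time is not counted.

```typescript
// Delay in seconds before the retry that follows a given failed attempt
// (1-based), using the 2^attemptCount rule from the table above.
function retryDelaySeconds(attempt: number): number {
  return 2 ** attempt;
}

// Total time spent waiting between attempts before a job is marked DEAD,
// assuming the final attempt (attempt === maxAttempts) is not retried.
function totalBackoffSeconds(maxAttempts: number): number {
  let total = 0;
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    total += retryDelaySeconds(attempt);
  }
  return total;
}

console.log(totalBackoffSeconds(5)); // 30 (2 + 4 + 8 + 16)
console.log(totalBackoffSeconds(3)); // 6  (2 + 4)
```

With the default maxAttempts of 5, a job that fails on every attempt is dead roughly 30 seconds after its first failure, plus handler run time and polling latency.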
TypeScript
catqueue ships with full TypeScript types. Use generics for typed payloads:
interface EmailPayload {
  to: string;
  subject: string;
  body: string;
}

queue.register<EmailPayload>("send-email", async (payload) => {
  // payload.to, payload.subject, payload.body are all typed
});

await queue.enqueue<EmailPayload>("send-email", {
  to: "user@example.com",
  subject: "Hello",
  body: "Welcome!",
});
Requirements
- Node.js 18+
- PostgreSQL 13+ (uses gen_random_uuid() and SKIP LOCKED)
License
MIT