cuckmq v0.4.3
A lightweight, configurable job queue backed by Postgres, offering an alternative to the Redis-backed bullmq.
Core Features:
- Built-in type safety.
- Flexible concurrency semantics.
- Repeating/scheduled jobs.
- Retryable jobs.
- Robust event system.
A First Look
import process from "process"
import { Context, JobDefinition, Orchestrator, Worker } from "cuckmq"
import { Pool } from "pg"
// Create the central "context" object
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
const context = new Context({ pool, schema: "_job" })
// Define a "PING" job that echo's its received payload
const pingJob = new JobDefinition({
name: "PING",
context: context,
workFunction: async (payload : { message : string }, metadata: { jobId : string }) => {
console.log(`${metadata.jobId} - ${payload.message}`)
}
})
// Ensure job definition metadata is deployed to the database
await pingJob.deploy()
// Kick off an orchestrator and some workers
const orchestrator = new Orchestrator({ context })
const worker1 = new Worker({ context })
const worker2 = new Worker({ context })
for(let i = 0; i < 1000; i += 1) {
await pingJob.enqueue({ message: `Hello: ${i}` })
}
// Listen for a SIGINT and gracefully shutdown
process.on("SIGINT", async () => {
await orchestrator.stop()
await worker1.stop()
await worker2.stop()
await pool.end()
})
Installation & Setup
Install the package with:
yarn add cuckmq
Next, ensure the database is "prepared" to host a cuckmq deployment by applying cuckmq's idempotent migrations to a specified Postgres schema (preventing naming collisions with your business-logic tables). This can be done as follows:
import process from "process"
import { generateMigrationSql } from "cuckmq"
import { Pool } from "pg"
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
const schema = "_cuckmq"
for(const sqlFragment of generateMigrationSql(schema)) {
await pool.query(sqlFragment)
}
await pool.end()
Getting Started
Once the above is done, we can get started with writing some code!
Context
Our first step is to define a Context object. This context will be passed to every cuckmq object that we create and contains shared config and a reference to an underlying database connection pool. Ensure the schema passed into the context matches the one used during the migration!
import process from "process"
import { Context } from "cuckmq"
import { Pool } from "pg"
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
const context = new Context({ pool, schema: "_cuckmq" })
Job Definitions
Now that we have a functional context, we can build out the remaining cuckmq machinery. Next up are JobDefinition objects. These are essentially definitions of the jobs that we would like cuckmq to run for us. They look like this:
import { JobDefinition } from "cuckmq"
type JobParams = { foo : string }
const jobDefinition = new JobDefinition<JobParams>({
name: "MY_JOB",
context: context,
workFunction: async (payload : JobParams, metadata: { jobId : string }) => {
console.log(payload.foo)
}
})
A job definition must be constructed with a context (see above), a name that is unique across all other job definitions, and a work function that describes the work to be done.
We must ensure our job definitions are deployed to the database before we use them, otherwise we will encounter an error. This can be done by running:
await jobDefinition.deploy()
Enqueuing work is just as simple:
await jobDefinition.enqueue({ foo: "hello" })
Workers
We must now create workers to actually perform the work defined by each job definition:
import { Worker } from "cuckmq"
import { context } from "./context"
const worker = new Worker({ context })
To increase concurrency, we can simply instantiate more worker objects. All workers share the same context and thus the same underlying connection pool, so adding more workers incurs relatively little overhead.
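As a minimal sketch (the worker count and shutdown wiring here are illustrative choices, not requirements):
import process from "process"
import { Worker } from "cuckmq"
import { context } from "./context"
// Spin up a small pool of workers sharing one context/connection pool
const workers = Array.from({ length: 4 }, () => new Worker({ context }))
// Stop them all together on shutdown
process.on("SIGINT", () => Promise.all(workers.map((worker) => worker.stop())))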
The Orchestrator
Although worker daemons handle the meat-and-potatoes of processing outstanding jobs, each cuckmq deployment should also run an orchestrator (only one is needed, but running more won't cause issues). An orchestrator is primarily responsible for scheduling repeating jobs and performing regular maintenance on persisted job metadata. Although an orchestrator isn't strictly required, it is highly recommended that you run one (even if you have no scheduled/repeating jobs) to keep the volume of persisted job metadata at a reasonable, stable level:
import { Orchestrator } from "cuckmq"
import { context } from "./context"
const orchestrator = new Orchestrator({ context })
Shutting Down
It is imperative that we shut down the daemons gracefully. Although cuckmq will ultimately recover from any shutdown event (even a hard crash), it may temporarily be locked out of processing certain jobs if the daemons are not given time to shut down gracefully. The recommended approach is to capture SIGINT/SIGTERM and then await Daemon#stop() on all instantiated daemons before exiting.
import process from "process"
import { Orchestrator, Worker } from "cuckmq"
import { context } from "./context"
const worker = new Worker({ context })
const orchestrator = new Orchestrator({ context })
process.on("SIGINT", () => Promise.all([
worker.stop(),
orchestrator.stop()
]))
Advanced Topics
You now have the basic knowledge required to go forth and process background jobs with cuckmq. However, cuckmq also offers a number of powerful advanced features, covered below!
Repeating jobs
We can set a job definition to be regularly enqueued by the orchestrator by simply including a repeatSecs : number parameter in the job definition constructor:
import { JobDefinition } from "cuckmq"
import { context } from "./context"
const jobDefinition = new JobDefinition({
name: "MY_REPEATING_JOB",
context: context,
repeatSecs: 60,
workFunction: async (payload, metadata: { jobId : string }) => {
console.log("hello")
}
})
N.B. Repeating jobs must have work functions that take empty objects as the payload. This constraint is enforced by the typechecker.
By default, the orchestrator polls for repeating jobs every 5 seconds. This value can be adjusted by setting jobSchedulePollSecs on the orchestrator constructor.
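For example, a minimal sketch (the 1-second value is an arbitrary choice for illustration):
import { Orchestrator } from "cuckmq"
import { context } from "./context"
// Poll for due repeating jobs every second instead of the default 5 seconds
const orchestrator = new Orchestrator({ context, jobSchedulePollSecs: 1 })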
Delayed Jobs
We can defer execution of a job by at least some period of time (an upper bound can't be guaranteed, as it depends on the length of the job queue). This is done by specifying a delaySecs in the optional job definition enqueue params:
await jobDefinition.enqueue(payload, { delaySecs: 30 })
Retries
By default, a job will only be attempted once. This behaviour can be adjusted using numAttempts : number to specify the number of times a job should be attempted, and timeoutSecs : number to specify how long workers should wait before retrying failed jobs. Both of these parameters are specified in the job definition constructor:
const jobDefinition = new JobDefinition({
name: "MY_REPEATING_JOB",
numAttempts: 3,
timeoutSecs: 60 * 5,
context: context,
repeatSecs: 60,
workFunction: async (payload, metadata: { jobId : string }) => {
console.log("hello")
}
})
Maintenance
Jobs that have either been successfully processed, or have failed and exhausted their processing attempts, are "finalized". Finalized jobs remain in the database for a period of time specified by jobPostFinalizeDeleteSecs on the context constructor (the default is 24 hours). After this time, the orchestrator permanently deletes them from the database, keeping job metadata storage requirements from growing unchecked.
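As a minimal sketch of tuning this (the 1-hour value is an arbitrary choice for illustration):
import process from "process"
import { Context } from "cuckmq"
import { Pool } from "pg"
const pool = new Pool({ connectionString: process.env.DATABASE_URL })
// Delete finalized jobs after one hour instead of the default 24 hours
const context = new Context({ pool, schema: "_cuckmq", jobPostFinalizeDeleteSecs: 60 * 60 })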
Channels
Channels are a powerful concept that lets us partition workers so they process different sets of jobs. For example, we may have low priority and high priority jobs. We may want to provide an exclusive set of workers that only operate on the high priority jobs, ensuring a large backlog of low priority jobs doesn't prevent high priority jobs from being processed in a timely fashion.
Job definitions can specify a channel over which to publish their jobs using the channel: string parameter in the constructor (by default, the channel is set to _default).
Conversely, workers can specify a list of channels from which they will take jobs to process using the channels: string[] parameter in their constructor. If the parameter isn't specified, workers will take jobs from all channels.
All together, using channels in the way described above looks something like this:
import { JobDefinition, Worker } from "cuckmq"
import { context } from "./context"
const highPriorityJob = new JobDefinition({
name: "HIGH_PRIORITY_JOB",
channel: "HIGH_PRIORITY",
context: context,
workFunction: async (payload, metadata: { jobId : string }) => {
// Do work
}
})
const lowPriorityJob = new JobDefinition({
name: "LOW_PRIORITY_JOB",
context: context,
workFunction: async (payload, metadata: { jobId : string }) => {
// Do work
}
})
const workers : Worker[] = [
new Worker({ context, channels: ["HIGH_PRIORITY"] }),
new Worker({ context }),
new Worker({ context }),
]
The above setup ensures that all 3 workers process high priority jobs when they are available, while one worker is reserved exclusively for high priority jobs.
Job Groups
The final advanced feature to discuss is job groups. Each job is assigned a job group when enqueued. By default, this is a random uuidv4; however, these job groups can be overridden:
await jobDefinition.enqueue(payload, { jobGroup: "MY_CUSTOM_JOB_GROUP" })
Each job group has its own global lock. Every time a worker wishes to process a job, it must acquire the exclusive lock of the job's job group. Once the worker finishes processing the job (regardless of whether processing was successful or not), the lock is released.
As mentioned above, when daemons are shut down gracefully, job group locks will always be released. In the event of a hard crash or power-off event, these locks may get stuck in a locked state, preventing certain jobs from being processed.
To guard against this, we can specify a jobGroupUnlockSecs on the context constructor. This specifies a fallback upper limit on the amount of time a job group can remain locked. Thus, in the event of a crash where the locks aren't released, we must wait for jobGroupUnlockSecs before jobs of the locked job group will be processed again.
When setting this value (the default is 3 hours), we must ensure it is high enough that a job group doesn't become unlocked while a particularly long-running job is still being processed. Were this to occur, another job of the same job group (or perhaps even the same job!) could begin processing concurrently.
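As an aside, the lock semantics above make custom job groups a handy way to serialise related work. A minimal sketch, where userSyncJob and the group naming scheme are hypothetical:
import { userSyncJob } from "./jobs"
// These jobs share a job group, so a worker must hold that group's exclusive
// lock to process any of them; jobs in the same group never run concurrently.
await userSyncJob.enqueue({ userId: "123" }, { jobGroup: "USER_SYNC:123" })
await userSyncJob.enqueue({ userId: "123" }, { jobGroup: "USER_SYNC:123" })
await userSyncJob.enqueue({ userId: "456" }, { jobGroup: "USER_SYNC:456" })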
Events
Cuckmq publishes a rich array of events. These can be subscribed to by calling Context#addEventHandler and passing in a custom event handler. The total set of events emitted is:
Key | Description |
---|---|
JOB_DEFINITION_DEPLOY | A JobDefinition has been deployed to the database |
JOB_DEFINITION_JOB_ENQUEUE | A job has been enqueued and persisted in the database |
WORKER_JOB_DEQUEUE | A worker has dequeued a job for processing |
WORKER_JOB_ORPHAN | A dequeued job did not find a corresponding JobDefinition registered with the Context |
WORKER_JOB_EXPIRE | A dequeued job has run out of attempts and will now be finalized |
WORKER_JOB_RUN | A job has started to run on a worker |
WORKER_JOB_RUN_ERROR | A job threw an error during processing |
WORKER_JOB_RUN_SUCCESS | A job was successfully processed and was subsequently finalized |
ORCHESTRATOR_JOB_DELETE | A finalized job was deleted by the orchestrator |
ORCHESTRATOR_JOB_GROUP_DELETE | An unreferenced job group was deleted |
ORCHESTRATOR_JOB_SCHEDULE | A repeating job is due to be enqueued |
ORCHESTRATOR_JOB_SCHEDULE_ORPHAN | A repeating job was due to be enqueued but there was no corresponding JobDefinition registered with the Context |
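As a minimal sketch of wiring up a handler for the events above (the exact shape of the event object passed to the handler is an assumption here; consult the package's type definitions for the real signature):
import { context } from "./context"
// Log every event emitted by this cuckmq deployment.
// NOTE: the event argument's shape is assumed for illustration purposes.
context.addEventHandler((event) => {
console.log("cuckmq event:", event)
})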