0.8.0 • Published 1 year ago

nats-jobs v0.8.0

Weekly downloads
-
License
ISC
Repository
github
Last release
1 year ago

NATS Jobs

Background job processing using NATS JetStream for distributing work.

See examples directory for more examples.

Companion libraries

Usage

This library uses debug. To enable:

DEBUG=nats-jobs node myfile.js

msgpackr is the recommended way to encode complex data structures since it's fast, efficient, and can handle serializing and unserializing dates.

Processing jobs

import { JsMsg } from 'nats'
import { setTimeout } from 'node:timers/promises'
import { expBackoff, jobProcessor } from 'nats-jobs'

const def = {
  stream: 'ORDERS',
  backoff: expBackoff(1000),
  async perform(msg: JsMsg) {
    console.log(`Started ${msg.info.streamSequence}`)
    console.log(msg.data.toString())
    // Simulate work
    await setTimeout(5000)
    console.log(`Completed ${msg.info.streamSequence}`)
  },
}

const processor = await jobProcessor()
const myJob = processor.start(jobDef)
// Gracefully handle shutdown
const shutDown = async () => {
  await myJob.stop()
  process.exit(0)
}
process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)

To gracefully shutdown mutliple jobs and close the NATS connection call stop from the object returned by jobProcessor.

const processor = await jobProcessor()
const jobs = [
  processor.start(jobDef1),
  processor.start(jobDef2),
  processor.start(jobDef3),
]
const shutDown = async () => {
  // Shuts down jobs 1, 2, and 3 and closes the NATS connection
  await processor.stop()
  process.exit(0)
}

process.on('SIGTERM', shutDown)
process.on('SIGINT', shutDown)

Publish via NATS

nats pub ORDERS someText

Strategies

Short-lived jobs For short-lived jobs like sending an email you shouldn't need to enable autoExtendTimeout. Processing of a message should happen in less than the 10 second ack_wait default.

Long-lived jobs Long-lived jobs should use a short ack_wait with autoExtendAckTimeout set to true. You may also want to use the timeout option as a sanity check. A timeout call abort on the abort signal just like calling stop. However, you can differntiate by checking signal.reason which will be set to timeout for a timeout and stop when stop is called. This allows the job control of how it wants to handle this situation with two likely scenarios:

(1) Finish processing current record and exit. (2) Register an onabort callback that logs an error and calls process.exit().

Testing

Run the following in the /docker directory to start up NATS.

docker compose up
0.8.0

1 year ago

0.5.0

2 years ago

0.7.0

2 years ago

0.6.0

2 years ago

0.4.0

2 years ago

0.1.0

2 years ago

0.3.0

2 years ago

0.0.3

2 years ago

0.2.0

2 years ago

0.0.2

2 years ago

0.0.5

2 years ago

0.0.4

2 years ago

0.0.6

2 years ago

0.0.1

2 years ago