8.4.0 • Published 1 year ago

p-mongo-queue v8.4.0

Weekly downloads
Last release
1 year ago


A light-weight way to create queues using MongoDB. Forked from mhassan1/mongodb-queue-up, which itself is a fork of chilts/mongodb-queue. This fork enforces modern code standards and replaces previous callback-based approaches with promises.


import { MongoClient } from 'mongodb'
import { PMongoQueue } from 'p-mongo-queue'

const url = 'mongodb://localhost:27017/'
const client = new MongoClient(url)
await client.connect()

// Create a queue
const db = client.db('test')
const queue = PMongoQueue(db, 'my-queue')

// One time operation to create the required indexes
await queue.createIndexes()

// Add a message to a queue
await queue.add('Hello, World!')

// Get a message from the queue
const msg = await queue.get()

console.log(`msg.id = ${msg.id}`)
console.log(`msg.ack = ${msg.ack}`)
console.log(`msg.payload = ${msg.payload}`) // 'Hello, World!'
console.log(`msg.tries = ${msg.tries}`)

// Remove a message from the queue
await queue.ping(msg.ack)

// Process all messages from the queue, two at a time
queue.process(2, (msg) => {
  console.log(`msg.payload = ${msg.payload}`)
  // At the end of this function the message is automatically removed from the queue.

Messages are never removed from the queue, even if processed. To remove messages that have successfully completed:

await queue.clean()


PMongoQueue(db, name[, options])

  • db MongoDB client
  • name Queue name
  • options Options for the queue

Creates a queue instance.

db should be a v4 compatible client. name is used for both the queue name and collection name.


The default values are shown after each option key.

    // Another queue instance to place dead items on after `maxRetries` has been reached
    deadQueue: undefined,

    // Maximum retries before stopping processing of an item
    maxRetries: 5,

    // Delay in seconds before processing an item
    delay: 0,

    // How long before a message should be returned to the queue
    visibility: 30


Queue.add(payload[, options]): Promise<string | string[]>

Add items to the queue. payload can either be a single value, or an array of values. Options are identical to queue options.

The return value is an ID for each item provided. This can be used for tracking the item between functions.

// add one item
await queue.add('Hello, World!')

// add multiple items
await queue.add(['A', 'B', 'C'])

// add an object
await queue.add({ a: 'A', b: 'B' })

// delay visibility
await queue.add('Hello', { delay: 120 })

get([options]): Promise<QueueMessage | undefined>

Add items to the queue. payload can either be a single value, or an array of values. Options are identical to queue options. QueueMessage is a representation of the message on the queue.

// get one item
const message = await queue.get()

// override default queue setting
const message = await queue.get({ visibility: 10 })

ack(ack): Promise<string>

Marks an item as complete. ack should be the msg.ack value.

const msg = await queue.get()
await queue.ack(msg.ack)
// message removed from queue

ping(ack, options): Promise<string>

Extends the processing time of an item. ack should be the msg.ack value. Options are identical to queue options.

const msg = await queue.get()
await queue.ack(msg.ack)
// message removed from queue

Queue Message

A queue message is a representation of a message on the queue.

    ack: string,
    id: string,
    payload: any,
    tries: number


Queue.process([parallelism=1], processor): void

Parallelism is an optimal parameter to specify the number of simultaneous jobs to be processed. Processor is a function to be called to process each item. The function should take in as a single parameter a QueueMessage.

queue.process((msg) => {

Queue.stop(): Promise<void>

Stops the queue from processing new items. Existing items will continue to be processed. The promise resolves when all existing items have been processed.

Queue.start(): Promise<void>

Starts the queue processing new items. The promise resolves when the first new item has been processed, or no items are currently available.



Queue.createIndexes(): Promise<string>

Create indexes on the queue collection, required for fast use of queue. Returns the indexName generated.

Queue.clean(): Promise<void>

Removes all processed items from the queue.

Queue.size(): Promise<number>

Returns the total number of messages that are waiting in the queue.

Queue.inFlight(): Promise<number>

Returns the total number of messages that are currently in flight. ie. that have been received but not yet acked.

Queue.done(): Promise<number>

Returns the total number of messages that have been processed correctly in the queue.

Queue.total(): Promise<number>

Returns the total number of messages that has ever been in the queue, including all current messages.


Initially licensed by chilts under MIT (https://chilts.mit-license.org/2014/), then licensed by mhassan1 under MIT. New changes released under the MIT license with Crown Copyright.


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago


1 year ago