nest-bull v0.9.0
Description
This is a Bull module for Nest 6.
Installation
$ npm i --save nest-bull bull
$ npm i --save-dev @types/bull
Quick Start
import {Body, Controller, Get, Module, Param, Post} from '@nestjs/common';
import {DoneCallback, Job, Queue} from 'bull';
import {BullModule, InjectQueue} from 'nest-bull';
@Controller()
export class AppController {
constructor(
@InjectQueue('store') readonly queue: Queue,
) {}
@Post()
async addJob( @Body() value: any ) {
const job: Job = await this.queue.add(value);
return job.id;
}
@Get(':id')
async getJob( @Param('id') id: string ) {
return await this.queue.getJob(id);
}
}
@Module({
imports: [
BullModule.register({
name: 'store',
options: {
redis: {
port: 6379,
},
},
processors: [
(job: Job, done: DoneCallback) => { done(null, job.data); },
],
}),
],
controllers: [
AppController,
],
})
export class ApplicationModule {}
Decorators
This module provides some decorators that will help you to set up your queue listeners.
@Processor()
The @Processor() class decorator is mandatory if you plan to use this package's decorators.
It accepts an optional QueueDecoratorOptions argument:
export interface QueueDecoratorOptions {
name?: string; // Name of the queue
}
@Process()
The @Process() method decorator flags a method as a processing function for the queued jobs.
It accepts an optional QueueProcessDecoratorOptions argument:
export interface QueueProcessDecoratorOptions {
name?: string; // Name of the job
concurrency?: number; // Concurrency of the job
}
Whenever a job matching the configured name (if any) is queued, it will be processed by the decorated method.
The decorated method is expected to have the following signature: (job: Job, done?: DoneCallback): any.
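The two handler shapes allowed by that signature (returning the result, or reporting it through the callback) can be sketched without Bull or a Redis connection. The types `JobLike` and `DoneLike` and the helper `runHandler` below are hypothetical stand-ins introduced for illustration; they are not part of nest-bull or Bull, and the dispatch logic is a simplification rather than the library's actual implementation.

```typescript
// Hypothetical stand-ins for Bull's Job and DoneCallback types (assumptions,
// kept minimal so the sketch runs without Bull or Redis).
interface JobLike<T> {
  id: string;
  name: string;
  data: T;
}
type DoneLike = (error: Error | null, value?: unknown) => void;

// A handler either returns its result (optionally as a promise) ...
type ReturningHandler<T> = (job: JobLike<T>) => unknown;
// ... or reports it through the `done` callback.
type CallbackHandler<T> = (job: JobLike<T>, done: DoneLike) => void;

// Hypothetical helper: runs either style and always yields a promise.
// It tells the two styles apart by the declared parameter count.
function runHandler<T>(
  handler: ReturningHandler<T> | CallbackHandler<T>,
  job: JobLike<T>,
): Promise<unknown> {
  if (handler.length >= 2) {
    return new Promise((resolve, reject) => {
      (handler as CallbackHandler<T>)(job, (error, value) =>
        error ? reject(error) : resolve(value),
      );
    });
  }
  // Deferring the call turns a synchronous throw into a rejected promise.
  return Promise.resolve().then(() => (handler as ReturningHandler<T>)(job));
}

// Return-style handler: the returned value is the job result.
const twice: ReturningHandler<number> = (job) => job.data * 2;

// Callback-style handler: the result is passed as the second argument of `done`.
const thrice: CallbackHandler<number> = (job, done) => done(null, job.data * 3);
```

With this sketch, `runHandler(twice, job)` and `runHandler(thrice, job)` both resolve to the handler's result, which is why a decorated method may use whichever style is more convenient.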
@OnQueueEvent()
The @OnQueueEvent() method decorator flags a method as an event listener for the related queue.
It requires a BullQueueEvent argument:
export type BullQueueEvent =
| 'error'
| 'waiting'
| 'active'
| 'stalled'
| 'progress'
| 'completed'
| 'failed'
| 'paused'
| 'resumed'
| 'cleaned'
| 'drained'
| 'removed'
| 'global:error'
| 'global:waiting'
| 'global:active'
| 'global:stalled'
| 'global:progress'
| 'global:completed'
| 'global:failed'
| 'global:paused'
| 'global:resumed'
| 'global:cleaned'
| 'global:drained'
| 'global:removed';
You can also use the BullQueueEvents and BullQueueGlobalEvents enums.
Fortunately, there is a shorthand decorator for each of the Bull events:
@OnQueueError()
@OnQueueWaiting()
@OnQueueActive()
@OnQueueStalled()
@OnQueueProgress()
@OnQueueCompleted()
@OnQueueFailed()
@OnQueuePaused()
@OnQueueResumed()
@OnQueueCleaned()
@OnQueueDrained()
@OnQueueRemoved()
@OnGlobalQueueError()
@OnGlobalQueueWaiting()
@OnGlobalQueueActive()
@OnGlobalQueueStalled()
@OnGlobalQueueProgress()
@OnGlobalQueueCompleted()
@OnGlobalQueueFailed()
@OnGlobalQueuePaused()
@OnGlobalQueueResumed()
@OnGlobalQueueCleaned()
@OnGlobalQueueDrained()
@OnGlobalQueueRemoved()
If you need more details about those events, head straight to Bull's reference doc.
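The shorthand decorator names follow a regular pattern derived from the event names. The sketch below makes that pattern explicit; `localEvents` and `shorthandName` are hypothetical names introduced here for illustration and are not exported by nest-bull.

```typescript
// Event names without the `global:` prefix, copied from the BullQueueEvent union.
const localEvents = [
  'error', 'waiting', 'active', 'stalled', 'progress', 'completed',
  'failed', 'paused', 'resumed', 'cleaned', 'drained', 'removed',
] as const;

type LocalEvent = (typeof localEvents)[number];
// Every local event has a counterpart prefixed with `global:`.
type GlobalEvent = `global:${LocalEvent}`;

// Hypothetical helper: derives the shorthand decorator name for an event,
// e.g. 'completed' becomes 'OnQueueCompleted'.
function shorthandName(event: LocalEvent | GlobalEvent): string {
  const prefix = 'global:';
  const isGlobal = event.startsWith(prefix);
  const base = isGlobal ? event.slice(prefix.length) : event;
  const capitalized = base.charAt(0).toUpperCase() + base.slice(1);
  return `On${isGlobal ? 'Global' : ''}Queue${capitalized}`;
}

// Names of the twelve non-global shorthand decorators, in list order.
const localShorthands = localEvents.map(shorthandName);
```

Under this convention, `@OnQueueEvent('global:failed')` and `@OnGlobalQueueFailed()` refer to the same event.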
Example
The following example shows how this package's decorators are meant to be used.
import {Logger} from '@nestjs/common';
import {Processor, Process, OnQueueActive, OnQueueEvent, BullQueueEvents} from 'nest-bull';
import {NumberService} from './number.service';
import {Job, DoneCallback} from 'bull';
@Processor()
export class MyQueue {
private readonly logger = new Logger('MyQueue');
constructor(private readonly service: NumberService) {}
@Process({ name: 'twice' })
processTwice(job: Job<number>) {
return this.service.twice(job.data);
}
@Process({ name: 'thrice' })
processThrice(job: Job<number>, callback: DoneCallback) {
callback(null, this.service.thrice(job.data));
}
@OnQueueActive()
onActive(job: Job) {
this.logger.log(
`Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
);
}
@OnQueueEvent(BullQueueEvents.COMPLETED)
onCompleted(job: Job) {
this.logger.log(
`Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
);
}
}
Separate processes
This module allows you to run your job handlers in forked processes.
To do so, add the filesystem path of one or more files exporting your processor function to the processors property of the BullModule options.
You can read more on this subject in Bull's documentation.
Please note that, because your function is executed in a forked process, Nest's DI won't be available.
Example
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from 'nest-bull';
import { join } from 'path';
@Module({
imports: [
BullModule.register({
processors: [ join(__dirname, 'processor.ts') ]
})
]
})
export class AppModule {}
// processor.ts
import { Job, DoneCallback } from 'bull';
export default function(job: Job, cb: DoneCallback) {
cb(null, 'It works');
}
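Because the Nest container does not exist in the forked process, any dependency the processor needs has to be created by hand inside the processor file. The following is a minimal sketch of that idea, assuming a plain `NumberService` class that stands in for an injectable service; `JobLike` and `DoneLike` are hypothetical stand-ins for Bull's `Job` and `DoneCallback` so the snippet runs on its own.

```typescript
// processor.ts (sketch)

// Hypothetical stand-ins for Bull's Job and DoneCallback types.
interface JobLike<T> {
  data: T;
}
type DoneLike = (error: Error | null, value?: unknown) => void;

// Hypothetical plain class; in the main process this would be a Nest provider.
class NumberService {
  twice(n: number): number {
    return n * 2;
  }
}

// No injection is available here, so the dependency is instantiated manually.
const service = new NumberService();

export default function processor(job: JobLike<number>, done: DoneLike): void {
  let result: number;
  try {
    result = service.twice(job.data);
  } catch (error) {
    // Report failures through the callback instead of letting them escape.
    done(error instanceof Error ? error : new Error(String(error)));
    return;
  }
  done(null, result);
}
```

Keeping such services free of Nest-specific dependencies makes it possible to reuse the same class both as a provider in the main process and as a hand-built object in the fork.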
People
- Author - Frederic Woelffel
- Website - https://nestjs.com