0.9.0 • Published 5 years ago

nest-bull v0.9.0

Weekly downloads
1,514
License
MIT
Repository
github
Last release
5 years ago

Master branch build Dev branch build Code Climate maintainability Code Climate coverage npm NPM npm peer dependency version npm peer dependency version

Description

This is a Bull module for Nest 6.

Installation

$ npm i --save nest-bull bull
$ npm i --save-dev @types/bull

Quick Start

import {Body, Controller, Get, Module, Param, Post} from '@nestjs/common';
import {DoneCallback, Job, Queue} from 'bull';
import {BullModule, InjectQueue} from 'nest-bull';

@Controller()
export class AppController {

  constructor(
    @InjectQueue('store') readonly queue: Queue,
  ) {}

  @Post()
  async addJob( @Body() value: any ) {
    const job: Job = await this.queue.add(value);
    return job.id;
  }

  @Get(':id')
  async getJob( @Param('id') id: string ) {
    return await this.queue.getJob(id);
  }
}

@Module({
  imports: [
    BullModule.register({
      name: 'store',
      options: {
        redis: {
          port: 6379,
        },
      },
      processors: [
        (job: Job, done: DoneCallback) => { done(null, job.data); },
      ],
    }),
  ],
  controllers: [
    AppController,
  ],
})
export class ApplicationModule {}

Decorators

This module provides some decorators that will help you to set up your queue listeners.

@Processor()

The @Processor() class decorator is mandatory if you plan to use this package's decorators.

It accepts an optional QueueDecoratorOptions argument:

export interface QueueDecoratorOptions {
  name?: string; // Name of the queue
}

@Process()

The @Process() method decorator flags a method as a processing function for the queued jobs.

It accepts an optional QueueProcessDecoratorOptions argument:

export interface QueueProcessDecoratorOptions {
  name?: string; // Name of the job
  concurrency?: number; // Concurrency of the job
}

Whenever a job matching the configured name (if any) is queued, it will be processed by the decorated method. Such method is expected to have the following signature (job: Job, done?: DoneCallback): any;

@OnQueueEvent()

The OnQueueEvent() method decorator flags a method as an event listener for the related queue.

It requires a BullQueueEvent argument:

export type BullQueueEvent =
  | 'error'
  | 'waiting'
  | 'active'
  | 'stalled'
  | 'progress'
  | 'completed'
  | 'failed'
  | 'paused'
  | 'resumed'
  | 'cleaned'
  | 'drained'
  | 'removed'
  | 'global:error'
  | 'global:waiting'
  | 'global:active'
  | 'global:stalled'
  | 'global:progress'
  | 'global:completed'
  | 'global:failed'
  | 'global:paused'
  | 'global:resumed'
  | 'global:cleaned'
  | 'global:drained'
  | 'global:removed';

You can also use the BullQueueEvents and BullQueueGlobalEvents enums.

Fortunately, there is a shorthand decorator for each of the Bull events:

  • @OnQueueError()
  • @OnQueueWaiting()
  • @OnQueueActive()
  • @OnQueueStalled()
  • @OnQueueProgress()
  • @OnQueueCompleted()
  • @OnQueueFailed()
  • @OnQueuePaused()
  • @OnQueueResumed()
  • @OnQueueCleaned()
  • @OnQueueDrained()
  • @OnQueueRemoved()
  • @OnGlobalQueueError()
  • @OnGlobalQueueWaiting()
  • @OnGlobalQueueActive()
  • @OnGlobalQueueStalled()
  • @OnGlobalQueueProgress()
  • @OnGlobalQueueCompleted()
  • @OnGlobalQueueFailed()
  • @OnGlobalQueuePaused()
  • @OnGlobalQueueResumed()
  • @OnGlobalQueueCleaned()
  • @OnGlobalQueueDrained()
  • @OnGlobalQueueRemoved()

If you need more details about those events, head straight to Bull's reference doc.

Example

Here is a pretty self-explanatory example on how this package's decorators should be used.

import {Processor, Process, OnQueueActive, OnQueueEvent, BullQueueEvents} from '../../lib';
import {NumberService} from './number.service';
import {Job, DoneCallback} from 'bull';

@Processor()
export class MyQueue {
  private readonly logger = new Logger('MyQueue');

  constructor(private readonly service: NumberService) {}

  @Process({ name: 'twice' })
  processTwice(job: Job<number>) {
    return this.service.twice(job.data);
  }

  @Process({ name: 'thrice' })
  processThrice(job: Job<number>, callback: DoneCallback) {
    callback(null, this.service.thrice(job.data));
  }

  @OnQueueActive()
  onActive(job: Job) {
    this.logger.log(
      `Processing job ${job.id} of type ${job.name} with data ${job.data}...`,
    );
  }

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) {
    this.logger.log(
      `Completed job ${job.id} of type ${job.name} with result ${job.returnvalue}`,
    );
  }
}

Separate processes

This module allows you to run your job handlers in fork processes. To do so, add the filesystem path to a file (or more) exporting your processor function to the processors property of the BullModule options. You can read more on this subject in Bull's documentation.

Please note that, your function being executed in a fork, Nestjs' DI won't be available.

Example

// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from 'nest-bull';
import { join } from 'path';

@Module({
  imports: [
    BullModule.register({
      processors: [ join(__dirname, 'processor.ts') ]
    }) 
  ]
})
export class AppModule {}
// processor.ts
import { Job, DoneCallback } from 'bull';

export default function(job: Job, cb: DoneCallback) {
  cb(null, 'It works');
}

People

0.9.0

5 years ago

0.8.4

5 years ago

0.8.3

5 years ago

0.8.2

5 years ago

0.8.1

5 years ago

0.8.0

6 years ago

0.7.0

6 years ago

0.6.2

6 years ago

0.6.1

6 years ago

0.6.0

6 years ago

0.5.0

6 years ago

0.4.0

6 years ago

0.3.1

6 years ago

0.3.0

6 years ago

0.2.5

6 years ago

0.2.4

6 years ago

0.2.3

6 years ago

0.2.2

7 years ago

0.2.1

7 years ago

0.2.0

7 years ago

0.1.0

7 years ago