1.3.0 • Published 5 years ago

nest-rabbit-tasks v1.3.0

Weekly downloads
2
License
MIT
Repository
github
Last release
5 years ago

nest-rabbit-tasks

nest-rabbit-tasks is a TaskQueue based upon RabbitMQ for NestJS.

NPM

David npm bundle size

CircleCI

Quality Gate Status Maintainability Rating Reliability Rating Security Rating

rabbit

Motivation

By working hard on abstracting NestJS injection and RabbitMQ connection, we wanted to make it easy to do background tasks in NestJS.

nest-bull is a reference in this domain: it use bull and redis to provide the task queue. Moreover nest-bull and bull are the state of art about how TaskQueue complexity can be hidden and abstracted. With them, building a queue is actually fun.

However, complex system may need other technologies than redis: RabbitMQ provides more features, such as exchange (to allow task routing).

We needed it on a project, so here it is made available for you.

Disclaimer

This is beta quality software. We just started it so it has probably a few bugs.

We are actively working on it so it may break without notice / API may change.

:warning: Use with caution, be ready to pin a version and make some modification on the code :warning:

Contributors ✨

Thanks goes to these wonderful people (emoji key):

This project follows the all-contributors specification. Contributions of any kind welcome!

Get Started

Declare the module:

import { Module } from '@nestjs/common';
import { NestRabbitTasksModule } from 'nest-rabbit-tasks';

// Import the worker.
import { EmailWorker } from './worker/email.worker';
// And import some services.
import { SMTPService } from './service/smtp.service';

@Module({
  imports: [
    // Note that there is also a `registerAsync` method
    // that allows the parameters to depend on an injected configService.
    // You can have a look at the interface to see what options it takes.
    NestRabbitTasksModule.registerSync({
      // That is not the name of RabbitMQ queue
      // but an unique reference that identify the queue and the worker
      // and allow us to link the conf here and the worker implementation there
      reference: 'worker-email-queue',
      // Queue or exchange (but exchange are not implemented yet ^^).
      entityType: 'queue',
      // AMQP connection/channel wide options.
      amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
      // Module option that dictate the worker behavior
      globalOptions: {
        // By default RabbitMQ create queue if the queue doesn't exist
        // By setting `{ immutableInfrastructure: true }` will throw if the queue does not exist
        // Note that this is not yet implemented yet but will be pretty soon
        immutableInfrastructure: true,
        // How many messages the Worker should take.
        // Increasing it increase the throughput but decrease the consistency
        prefetchSize: 1,
      },
      // Definition of the queue and queue specific options
      // such as `single-active-consumer`
      entity: { queueName: 'worker.queue.1', queueOptions: {} },
      // Worker used to handle the AMQP message
      worker: EmailWorker,
    }),
  ],
  // Here you should import the worker (EmailWorker)
  // and other service you need
  providers: [EmailWorker, SMTPService],
})
export class AppModule {}

// ---

// in main.js
import { NestFactory } from '@nestjs/core';

async function bootstrap() {
  // Start the module like always
  const app = await NestFactory.create(AppModule);
  // And launch it.
  // Having an HTTP port is useful to configure an health-check route
  await app.listen(3000);
}

and create a worker:

import { RabbitWorker, RabbitTasksWorker, RabbitTasksWorker } from 'nest-rabbit-tasks';

// You can specify what the Event looks like.
// By default it is any.
interface Event {
  name: string;
  recipient: string[];
  cc?: string[];
  bcc?: string[];
  content: string;
}

@RabbitTasksWorker({ reference: 'worker-email-queue' })
export class EmailWorker extends RabbitWorker<Event> {
  // Please do dependency injection
  // and inject what need for the worker to work
  public constructor(private readonly smtpService: SMTPService) {}

  // This method is mandatory
  public async handleMessage(data: Event, message: RabbitTasksWorker<Event, void>) {
    if (data.name === 'send-mail') {
      await this.smtpService.sendEmail(data.recipient, data.cc, data.bcc, data.content);
      // Acknowledge the message to remove it from the queue
      message.ack();
    } else {
      log.error('unknown event');
      if (message.getHeader('x-retries') < 3) {
        // Non acknowledge the message but with retry
        // to requeue it and process it later
        message.nack(true);
      } else {
        // Non acknowledge the message without retry
        // to remove it from the queue
        message.nack(false);
      }
    }
  }
}

Advanced usage

Async configuration

Why: your configuration may be dynamic, depends on env variable or API calls.

@Module({
  imports: [
    NestRabbitTasksModule.registerAsync({
      // The reference must be static and unique
      reference: 'toto',
      // Same for the entity type
      entityType: 'queue',
      // The handler is static too
      // (but that is not a mandatory constraint in the code, let me know if you have usages that)
      // (... requires it to be dynamic. I just found it made more sense like this in my use cases.)
      worker: TestWorker,
      // The rest of the options are dynamic
      // (We only provide `useFactory` for now but `useExisting` and `useClass` can be easily implemented)
      useFactory: async (configService: ConfigService) => {
        const queueName = await configService.getQueueName();
        return {
          entity: { queueName },
          amqpOptions: { connectionUrl: 'amqp://localhost:5672' },
          globalOptions: { immutableInfrastructure: true, prefetchSize: 1 },
        };
      },
      // For it to work you have to import a module, eg. a config module
      // that export a service, eg. a config service
      // and inject the configService, so `useFactory` can resolve it
      imports: [ConfigModule],
      inject: [ConfigService],
    }),
  ],
  providers: [TestWorker],
})
export class AppModule {}

Road-map

  • implement the immutableInfrastructure mode
  • connect Rabbit logger to nest Logger and improve debug logs
  • properly check that config is correct and report error if not
  • implement async configuration (registerAsync)
  • prevent Haredo deps to leak
  • implement @OnEvent(rabbitEventName: string) to decorate a method of the Worker that will be call when rabbitEventName is emitted in the queue
  • work on quality (unit tests, E2E tests)
  • improve the doc (registerAsync)

  • implement an Exchange class (so users can publish to exchange using this)

1.3.0

5 years ago

1.2.6

5 years ago

1.2.5

5 years ago

1.2.4

5 years ago

1.2.3

5 years ago

1.2.2

5 years ago

1.2.1

5 years ago

1.2.0

5 years ago

1.1.0

5 years ago

1.0.0

5 years ago

0.2.6

5 years ago

0.2.5

5 years ago

0.2.4

5 years ago

0.2.3

5 years ago

0.2.2

5 years ago

0.2.1

5 years ago

0.2.0

5 years ago

0.1.3

5 years ago

0.1.1

5 years ago

0.1.0

5 years ago