0.2.0 • Published 9 months ago

nest-nsq-transport v0.2.0

Weekly downloads
-
License
MIT
Repository
github
Last release
9 months ago

nest-nsq-transport

The most basic and unopinionated implementation of NSQ transport for NestJS microservices.

finish() is called automatically when no errors are thrown while handling, otherwise requeue() is called.



install

npm i nest-nsq-transport nsqjs
npm i -D @types/nsqjs

configure

setup server:

import { Message } from 'nsqjs';
import { NSQStrategy, NSQStrategyOptions, NSQPattern } from 'nest-nsq-transport';

NestFactory.createMicroservice(
  AppModule,
  {
    strategy: new NSQStrategy(<NSQStrategyOptions>{
      // Optional deserializer, please see implementation in the sources.
      //
      // Default deserializer converts message data from Buffer to utf8 string.
      // Could be replaced with the built-in JSONDeserializer or a custom one.
      deserializer: new MyDeserializer();

      // optional default options that will be passed to all nsqjs.Reader's of
      // each channel. It's possible to skip this field and describe options
      // for each channel separately with `channels` field
      defaultChannelOptions: {
        nsqdTCPAddresses: [process.env.NSQD_TCP_ADDR],
        // or
        lookupdHTTPAddresses: [process.env.LOOKUPD_HTTP_ADDR],

        // and all the other options, see here:
        // https://github.com/dudleycarr/nsqjs#new-readertopic-channel-options

        // with one extra option:
        // as `requeue` is called automatically on error, there is no way to
        // pass parameters, but it's possible to provide either default args:
        requeueParams: [1, true],
        // or a callback
        requeueParams: (message: Message, pattern: NSQPattern) => [1, true],
      },

      // Optional map of topic -> channel -> options
      channels: {
        'my-topic': {
          'my-channel': {
            // this will be merged with the defaultChannelOptions
          },
        },
      },
    });
  },
)

setup client:

import { ClientsModule } from '@nestjs/microservices';
import { NSQClient, NSQClientOpitons } from 'nest-nsq-transport';

export const clientToken = Symbol();

@Module({
  imports: [
    ClientsModule.register([
      {
        name: clientToken,
        customClass: NSQClient,
        options: <NSQClientOpitons>{
          // Optional serializer, please see implementation in the sources.
          // 
          // Default serializer converts emitted data to a string and pass it as
          // a Buffer to nsqjs.Writer.publish method. Could be replaced with the
          // built-in JSONSerializer or a custom one.
          serializer: new MySerializer(),

          // Connections options:
          // ether full nsqd tcp address
          nsqdURL: process.env.NSQD_TCP_ADDR, // in `host:port` pattern
          // or host and port separately
          nsqdHost: process.env.NSQD_HOST,
          nsqdPort: Number(process.env.NSQD_PORT),

          // Other options from nsqjs.ConnectionConfigOptions, see here:
          // https://github.com/dudleycarr/nsqjs#new-writernsqdhost-nsqdport-options
          // ...
        },
      },
    ]),
  ],
})
class AppModule {}

usage

import { ClientProxy, EventPattern, Payload, Ctx } from '@nestjs/microservices';
import { NSQContext } from 'nest-nsq-transport';

@Controller()
class TestController {
  constructor(
    // Token that passed to ClientsModule.register
    @Inject(clientToken) private readonly client: ClientProxy
  ) {}

  // To get only payload no decorators needed
  @EventPattern('my-topic/my-channel')
  async handle(payload: MyType) {
    //
  }

  // Decorators are required to get context
  @EventPattern('my-topic/my-channel-two')
  async handle(
    @Payload() payload: MyType,
    @Ctx() ctx: NSQContext
  ) {
    // 
  }

  async emit() {
    // This is an example how to emit data. Serializer which converts data to
    // Buffer have to be provided when client is registered. There are two
    // built-in serializers: JSONSerializer and StringSerializer
    await this.client.emit('my-topic', data as MyType).toPromise();
  }

  // Also if `maxAttempts` is set in NSQStrategyOptions message could be
  // discarded. It's possible to subscribe on discarded messages in a separate
  // handler adding `/discard` suffix to pattern. If `maxAttempts` is set and
  // such separate discard-handler is not provided then discarded message will
  // be sent to the original handler again. But it's always possible to check
  // current attempt with `ctx.message.attempts`, which starts from 1.
  @EventPattern('my-topic/my-channel/discard')
  async handle(
    @Payload() payload: MyType,
    @Ctx() ctx: NSQContext
  ) {
    await this.client.emit(
      `${ctx.pattern.topic}-${ctx.pattern.channel}-dead-letter`,
      payload,
    ).toPromise()
  }
}
0.2.0-alpha.060dc4

9 months ago

0.1.2-alpha.f8aa03

9 months ago

0.2.0-alpha.640b17

9 months ago

0.2.0

9 months ago

0.1.2

1 year ago

0.1.1

1 year ago

0.1.0

1 year ago