2.0.0 • Published 1 year ago

nestjs-nsq-transporter v2.0.0

License
MIT
Repository
github
Last release
1 year ago

Nestjs Transporter For NSQ

Overview

This library provides a NestJS transporter that uses NSQ as its message broker. A transporter can be thought of as an event-driven framework:

  • As a message producer, I can send messages to different NSQ topics by emitting events named after those topics.
  • As a message consumer, I can receive messages by subscribing to those emitted events.

For more details on the overall architecture, please refer to this article.
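The producer/consumer split described above can be sketched with a tiny in-memory bus. This is only an illustration of the event-dispatching idea: `InMemoryBus`, `subscribe` and the `received` array are hypothetical names, and nothing here talks to NSQ or uses this library.

```typescript
// Illustrative only: an in-memory stand-in for the broker, not the NSQ client.
type Handler = (payload: unknown) => void;

class InMemoryBus {
  private readonly handlers = new Map<string, Handler[]>();

  // Consumer side: register a handler for a topic.
  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  // Producer side: fire-and-forget; no reply is returned to the caller.
  emit(topic: string, payload: unknown): void {
    for (const handler of this.handlers.get(topic) ?? []) {
      handler(payload);
    }
  }
}

const bus = new InMemoryBus();
const received: unknown[] = [];
bus.subscribe('topic1', (payload) => received.push(payload));
bus.emit('topic1', { foo: 1 });
bus.emit('topic2', { ignored: true }); // no subscriber, so nothing is delivered
```

Only the `topic1` event reaches the handler, and `emit` returns nothing to the producer.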

Restriction

This library does not implement the request-response communication style; it currently supports only the event-dispatching style (in NestJS terms, `emit` with `@EventPattern`, not `send` with `@MessagePattern`).

Produce a message

Add the nsq client provider in your module's definition:

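import { Module } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { ClientNsq, NsqOptions } from 'nestjs-nsq-transporter';
import { SomeService } from './some.service'; // hypothetical path, adjust to your project

// See "Definition of NsqOptions" for the available fields.
const options: NsqOptions = {
  lookupdHTTPAddresses: ['http://localhost:4161'],
};

@Module({
  providers: [
    {
      provide: 'NSQ_CLIENT',
      useFactory: (): ClientProxy => new ClientNsq(options),
    },
    SomeService,
  ],
})
export class SomeModule {}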

Then you can inject the ClientNsq instance and use it to emit the message event:

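import { Inject, Injectable } from '@nestjs/common';
import { ClientNsq } from 'nestjs-nsq-transporter';

@Injectable()
export class SomeService {
  // 'NSQ_CLIENT' must match the token used in the module's provider definition.
  constructor(@Inject('NSQ_CLIENT') private readonly nsqProducer: ClientNsq) {}

  sendMessage(topic: string, msg: any) {
    return this.nsqProducer.emit(topic, msg);
  }
}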

Note: The second argument of emit (msg) cannot contain arbitrary keys, because two keys are reserved: meta and options. meta is used by the default OutboundEventSerializer to attach extra information; options sets the options for producing the NSQ message. See the sample below.

    this.nsqProducer.emit(topic, {
      foo: 1,
      bar: 2,
      meta: { component: 'XYZ' },
      options: { retry: { retries: 3 } },
    });
    // The `emit` above sends an NSQ message with the payload below, using a retry strategy of 3 retries:
    // { data: { foo: 1, bar: 2 }, meta: { component: 'XYZ' } }
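To make the handling of the reserved keys concrete, here is a minimal sketch of how a message object could be split into the payload and the producing options. `splitEmitMessage`, `EmitMessage` and `SplitResult` are hypothetical names used for illustration; this is not the library's OutboundEventSerializer, only a model of the behaviour described in the note above.

```typescript
// Hypothetical helper illustrating the documented behaviour;
// it is not part of nestjs-nsq-transporter.
interface EmitMessage {
  meta?: Record<string, unknown>;
  options?: Record<string, unknown>;
  [key: string]: unknown;
}

interface SplitResult {
  body: { data: Record<string, unknown>; meta?: Record<string, unknown> };
  options: Record<string, unknown> | undefined;
}

function splitEmitMessage(msg: EmitMessage): SplitResult {
  // Pull out the two reserved keys; every other key becomes part of `data`.
  const { meta, options, ...data } = msg;
  const body: SplitResult['body'] = meta === undefined ? { data } : { data, meta };
  return { body, options };
}

const split = splitEmitMessage({
  foo: 1,
  bar: 2,
  meta: { component: 'XYZ' },
  options: { retry: { retries: 3 } },
});
// split.body    -> { data: { foo: 1, bar: 2 }, meta: { component: 'XYZ' } }
// split.options -> { retry: { retries: 3 } }
```

The practical consequence is that a business field named `meta` or `options` would not end up inside `data`, so such fields are better nested under a different key.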

Receive a message

Connect to the nsq microservice in your app and specify the nsq options as follows:

import { MicroserviceOptions } from '@nestjs/microservices';
import { ServerNsq } from 'nestjs-nsq-transporter';

// `app` is your Nest application instance; `options` is an NsqOptions object.
app.connectMicroservice<MicroserviceOptions>({
  strategy: new ServerNsq(options),
});

Then use @EventPattern to mark a method as the handler for messages matching a specific event pattern:

import { Controller } from '@nestjs/common';
import { EventPattern, Ctx, Payload } from '@nestjs/microservices';
import { NsqContext } from 'nestjs-nsq-transporter'; // assumed to be exported by this package

@Controller()
export class AppController {

  @EventPattern({
    topic: 'topic1',
    channel: 'channel1',
    options: { // optional
      maxAttempts: 3
    }
  })
  messageHandlerForTopic1(@Payload() payload: any, @Ctx() context: NsqContext) {
    // Handle messages
    // Note: if you throw an error from this handler, prefer `RpcException` so that nsq-transporter logs it explicitly.
  }
}
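The `maxAttempts` option used in the handler above is described in the options table as the number of times a message is given to the handler before it is handed to the discard handler. The following sketch simulates one reading of that description. `deliverWithMaxAttempts` is a hypothetical function; it does not connect to NSQ and is not the transporter's implementation.

```typescript
// Hypothetical simulation of the documented maxAttempts/discard behaviour.
type MessageHandler = (body: string, attempt: number) => void;

function deliverWithMaxAttempts(
  body: string,
  handler: MessageHandler,
  maxAttempts: number,
  discardHandler: (body: string) => void,
): 'finished' | 'discarded' {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      handler(body, attempt);
      return 'finished'; // the handler succeeded, so the message is finished
    } catch {
      // A real consumer would requeue here and wait for redelivery.
    }
  }
  discardHandler(body); // attempts exhausted, hand over to the discard handler
  return 'discarded';
}

const attemptsSeen: number[] = [];
const discarded: string[] = [];
const outcome = deliverWithMaxAttempts(
  'hello',
  (_body, attempt) => {
    attemptsSeen.push(attempt);
    throw new Error('handler failed');
  },
  3,
  (body) => discarded.push(body),
);
```

With a handler that always throws and `maxAttempts` set to 3, the handler is tried three times and the message then goes to the discard handler.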

Definition of NsqOptions

The options passed to either ServerNsq or ClientNsq are of type NsqOptions. The available fields are as follows:

| Field Name | Is Required | Type | Description | Example |
| --- | --- | --- | --- | --- |
| lookupdHTTPAddresses | No | string[] | HTTP address list for nsq lookupds | ['http://localhost:4161'] |
| strategy | No | 'round_robin' or 'fan_out' | Message sending strategy | round_robin |
| discardHandler | No | (arg: any) => void | Handler function to process when a message is discarded | (arg: any) => console.log(arg) |
| maxInFlight | No | number | The maximum number of messages to process at once | 1 |
| requeueDelay | No | number | The default amount of time (milliseconds) a requeued message should be delayed before being dispatched by nsqd | 90000 |
| lookupdPollInterval | No | number | The frequency in seconds for querying lookupd instances | 60 |
| maxAttempts | No | number | The number of times a given message will be attempted (given to the MESSAGE handler) before it is handed to the DISCARD handler and then automatically finished | 3 |
| serializer | No | Serializer in @nestjs/microservices | An instance of the Serializer class, which provides a serialize method to serialize the outbound message | serialize(value: any) => value |
| deserializer | No | ConsumerDeserializer in @nestjs/microservices | An instance of the ConsumerDeserializer class, which provides a deserialize method to deserialize the inbound message | deserialize(value: any) => value |
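As a reading aid, the fields above can be written out as a TypeScript shape with the example values filled in. `NsqOptionsSketch` is a hypothetical interface that only mirrors the table; the `NsqOptions` type actually exported by the library may differ, and the serializer and deserializer fields are left out because their types come from @nestjs/microservices.

```typescript
// A sketch that mirrors the option table; not the library's exported type.
interface NsqOptionsSketch {
  lookupdHTTPAddresses?: string[];
  strategy?: 'round_robin' | 'fan_out';
  discardHandler?: (arg: any) => void;
  maxInFlight?: number;
  requeueDelay?: number; // milliseconds
  lookupdPollInterval?: number; // seconds
  maxAttempts?: number;
}

// Collects discarded messages so they can be inspected later.
const discardedArgs: unknown[] = [];

const exampleOptions: NsqOptionsSketch = {
  lookupdHTTPAddresses: ['http://localhost:4161'],
  strategy: 'round_robin',
  discardHandler: (arg: any) => {
    discardedArgs.push(arg);
  },
  maxInFlight: 1,
  requeueDelay: 90000,
  lookupdPollInterval: 60,
  maxAttempts: 3,
};
```

Every field is optional, so an object with only `lookupdHTTPAddresses` set also satisfies this shape.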