0.1.0 • Published 11 months ago

@chance-get-yours/nestjs-nats-jetstream-microservice v0.1.0

License
MIT

NATS JetStream Custom Transporter for NestJS microservice

Goals of this library

Install

Library

npm i @nestjs/microservices
npm i nats
npm i @chance-get-yours/nestjs-nats-jetstream-microservice

Server

A NATS server can run locally:

docker run -d --name nats -p 4222:4222 -p 6222:6222 -p 8222:8222 nats --jetstream -m 8222

Alternatively, use Synadia NGS for a quick setup.

CLI

The NATS CLI covers all needs from the command line:

brew tap nats-io/nats-tools
brew install nats-io/nats-tools/nats

Or download the official release.

Configuration

Streams

Streams are 'message stores': each stream defines how messages are stored and what the retention limits (duration, size, interest) are. Streams consume normal NATS subjects; any message published on those subjects is captured in the defined storage system. You can do a normal publish to the subject for unacknowledged delivery, but it is better to use the JetStream publish calls, because the JetStream server replies with an acknowledgement that the message was successfully stored.

Subjects can be matched using NATS wildcard syntax: * matches exactly one token and > matches one or more trailing tokens.

Configuration options can be set using the library:
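To make the wildcard rules concrete, here is a minimal sketch of a subject matcher. It is a toy illustration only: the function name subjectMatches is hypothetical and is not part of this library or of the NATS client, and the server performs this matching itself.

```typescript
// Toy matcher for NATS subject wildcards (hypothetical helper, not a library API).
// "*" matches exactly one token; ">" matches one or more trailing tokens.
function subjectMatches(pattern: string, subject: string): boolean {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" is only valid as the last token and needs at least one token to match
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject is shorter than the pattern
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  // Without a trailing ">", both must have the same number of tokens
  return p.length === s.length;
}
```

With this sketch, the stream subject booking.> would accept booking.42.room-booked-event.v1, while the narrower filter booking.*.room-booked-event.> would reject booking.42.room-cancelled-event.v1.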

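import * as os from "os";
import { NestFactory } from "@nestjs/core";
import { CustomStrategy } from "@nestjs/microservices";
import { FastifyAdapter, NestFastifyApplication } from "@nestjs/platform-fastify";
import { StreamConfig } from "nats";
import { NatsJetStreamServer } from "@chance-get-yours/nestjs-nats-jetstream-microservice";
// Assumed path to the application module
import { HotelBookingModule } from "./hotel-booking.module";

const bootstrap = async () => {
  const options: CustomStrategy = {
    strategy: new NatsJetStreamServer({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        // Include the hostname in the client name so connections do not overlap
        name: `nats-connection.${os.hostname()}`,
      },
      // Streams are created if they do not exist.
      // All streams listed here must be available for the service to work.
      assertStreams: [
        {
          name: "booking",
          description: "Booking domain with all its events",
          subjects: ["booking.>"],
        } as Partial<StreamConfig>,
      ],
    }),
  };

  // Hybrid microservice and web application
  const app = await NestFactory.create<NestFastifyApplication>(
    HotelBookingModule,
    new FastifyAdapter()
  );
  const microService = app.connectMicroservice(options);
  await microService.listen();
  return app;
};
bootstrap();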

Consumer options

A consumer is a stateful view of a stream. It acts as an interface for clients to consume a subset of the messages stored in a stream, and it keeps track of which messages were delivered to and acknowledged by clients. Unlike core NATS, which provides an at-most-once delivery guarantee, a consumer can provide an at-least-once delivery guarantee.

Configuration options can be set through the library using the decorator. Alternatively, create the consumer with the CLI and reference the named consumer in the decorator.
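The bookkeeping behind at-least-once delivery can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions: ToyConsumer and its methods are hypothetical names, the real state lives on the JetStream server, and redelivery there is driven by an acknowledgement timeout rather than an explicit call.

```typescript
// Toy in-memory model of consumer state (hypothetical, not the JetStream implementation).
type StoredMessage = { seq: number; data: string };

class ToyConsumer {
  private readonly stream: readonly StoredMessage[];
  private cursor = 0; // index of the next never-delivered message
  private readonly unacked = new Map<number, StoredMessage>(); // delivered, not yet acknowledged

  constructor(stream: readonly StoredMessage[]) {
    this.stream = stream;
  }

  // Deliver the next new message and remember it until it is acknowledged.
  deliverNext(): StoredMessage | undefined {
    const msg = this.stream[this.cursor];
    if (msg === undefined) return undefined;
    this.cursor += 1;
    this.unacked.set(msg.seq, msg);
    return msg;
  }

  // The client confirms that a message was processed.
  ack(seq: number): void {
    this.unacked.delete(seq);
  }

  // Stand-in for the ack timeout: every unacknowledged message is handed out again.
  redeliverUnacked(): StoredMessage[] {
    return Array.from(this.unacked.values());
  }
}
```

A message that is delivered but never acknowledged keeps coming back from redeliverUnacked(), which is why handlers under at-least-once delivery should tolerate seeing the same message more than once.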

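import { Controller } from "@nestjs/common";
import { Ctx, EventPattern, Payload } from "@nestjs/microservices";
// NatsJetStreamContext and ConsumeOptions are assumed to be exported by this library
import {
  ConsumeOptions,
  NatsJetStreamContext,
} from "@chance-get-yours/nestjs-nats-jetstream-microservice";

@Controller()
export class BotNatsController {
  constructor(private scheduleCleaning: ScheduleCleaningCommandHandler) {}

  // The consumer is created if it does not exist,
  // and updated if it already exists
  @EventPattern("ConsumerName", {
    description: "Trigger cleaning side effect when room is booked",
    filter_subject: "booking.*.room-booked-event.>",
    deliver_to: "cleanupInbox",
    durable: "cleanupStack",
    manual_ack: true,
  } as ConsumeOptions)
  async cleanup(
    @Payload() event: RoomBookedEvent,
    @Ctx() context: NatsJetStreamContext
  ) {
    // Do the work, then acknowledge the message manually
    context.message.ack();
  }
}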

Publishing events

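import { ConflictException, Injectable, Module } from "@nestjs/common";
import { PubAck } from "nats";
// NatsJetStreamTransport and NatsJetStreamClient are assumed to be exported by this library
import {
  NatsJetStreamClient,
  NatsJetStreamTransport,
} from "@chance-get-yours/nestjs-nats-jetstream-microservice";

@Module({
  imports: [
    NatsJetStreamTransport.register({
      connectionOptions: {
        servers: "127.0.0.1:4222",
        name: "hotel-booking-publisher",
      },
    }),
  ],
  controllers: [BotNatsController],
  providers: [],
})
export class HotelBookingModule {}

@Injectable()
export class BookRoomCommandHandler {
  constructor(private client: NatsJetStreamClient) {}

  async handle(command: BookRoomCommand) {
    // isoDate, source and correlationId are not defined in this excerpt;
    // they are expected to come from the surrounding application code.
    // CloudEvent-like syntax here, but nothing is mandatory
    const event = new RoomBookedEvent(
      { ...command.data, date: isoDate.toISOString() },
      source,
      correlationId
    );
    const uniqueBookingSlug = `booked-${correlationId}`;
    // Return the promise so that the ConflictException reaches the caller
    return this.client
      .publish(
        'my.super.subject',
        event,
        // Deduplication trick: the unique booking slug is used as the message ID.
        // The duplicate window is configured on the stream (default: 2 minutes).
        { msgID: uniqueBookingSlug }
      )
      .then((res: PubAck) => {
        if (!res.duplicate) {
          return res;
        }
        throw new ConflictException('MsgID already exists error');
      });
  }
}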
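The deduplication behaviour relied on above can be illustrated with a toy model of a duplicate window. This is a sketch under stated assumptions, not the JetStream implementation: ToyDedupStream and ToyPubAck are hypothetical names, time is passed in explicitly as milliseconds, and the 2-minute default simply mirrors the default mentioned in the comment above.

```typescript
// Toy model of a stream's duplicate window (hypothetical, not the JetStream implementation).
type ToyPubAck = { seq: number; duplicate: boolean };

class ToyDedupStream {
  private readonly windowMs: number;
  private readonly seen = new Map<string, { seq: number; at: number }>();
  private lastSeq = 0;

  constructor(windowMs: number = 2 * 60 * 1000) {
    this.windowMs = windowMs;
  }

  // Store a message unless the same msgID was already stored inside the window.
  publish(msgID: string, nowMs: number): ToyPubAck {
    const prior = this.seen.get(msgID);
    if (prior !== undefined && nowMs - prior.at < this.windowMs) {
      // Same ID within the window: nothing new is stored
      return { seq: prior.seq, duplicate: true };
    }
    this.lastSeq += 1;
    this.seen.set(msgID, { seq: this.lastSeq, at: nowMs });
    return { seq: this.lastSeq, duplicate: false };
  }
}
```

In this model, publishing the same ID twice within the window yields an acknowledgement flagged as a duplicate, which is the condition the handler above turns into a ConflictException. Once the window has elapsed, the same ID is stored again, so the window length bounds how long the uniqueness check holds.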