1.1.1 • Published 9 months ago

nestjs-redis-stream-microservice v1.1.1



Installation

# npm
npm install --save nestjs-redis-stream-microservice
# yarn
yarn add nestjs-redis-stream-microservice

Note: This library is based on the nestjs-redis-streams package.

Introduction

Problems with the Default NestJS Redis Pub/Sub Transporter

See: https://github.com/nestjs/nest/issues/3960

  • The official Redis transporter provided by NestJS is built on Redis Pub/Sub, so it is not suitable when the subscribing server runs as multiple distributed instances: every instance receives every message.
  • This library implements a custom transporter based on Redis Streams so that messages are not processed more than once and so that it is possible to tell whether a message was received.
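The difference between the two delivery models can be illustrated with a small in-memory sketch. It does not connect to Redis and is not the library's implementation; `FakePubSub` and `FakeConsumerGroup` are hypothetical names, and round-robin dispatch is a simplification of how a Redis Streams consumer group hands each entry to a single consumer.

```typescript
// In-memory illustration only; no Redis connection is made.
type MessageCallback = (msg: string) => void;

// Pub/Sub semantics: every subscriber receives every published message.
class FakePubSub {
  private subscribers: MessageCallback[] = [];
  subscribe(callback: MessageCallback): void {
    this.subscribers.push(callback);
  }
  publish(msg: string): void {
    this.subscribers.forEach((callback) => callback(msg));
  }
}

// Consumer-group semantics: each message goes to exactly one group member.
// Round-robin is a simplification; Redis gives an entry to whichever
// consumer of the group reads it first.
class FakeConsumerGroup {
  private consumers: MessageCallback[] = [];
  private next = 0;
  join(callback: MessageCallback): void {
    this.consumers.push(callback);
  }
  add(msg: string): void {
    if (this.consumers.length === 0) return;
    const callback = this.consumers[this.next % this.consumers.length];
    this.next += 1;
    callback(msg);
  }
}

// Two replicas of the same service, counting how many messages each handled.
const pubSubCounts = [0, 0];
const pubSub = new FakePubSub();
pubSub.subscribe(() => { pubSubCounts[0] += 1; });
pubSub.subscribe(() => { pubSubCounts[1] += 1; });

const groupCounts = [0, 0];
const consumerGroup = new FakeConsumerGroup();
consumerGroup.join(() => { groupCounts[0] += 1; });
consumerGroup.join(() => { groupCounts[1] += 1; });

['order-1', 'order-2'].forEach((msg) => {
  pubSub.publish(msg);
  consumerGroup.add(msg);
});

console.log(pubSubCounts); // 2 and 2: both replicas handled both messages
console.log(groupCounts); // 1 and 1: each message was handled once
```

With Pub/Sub, adding replicas multiplies the work done per message; with a consumer group, replicas share the work.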

What I wanted to improve over the library I referenced

  • Response data had to be formatted in a specific way
  • Message handlers had to be configured with a library-specific decorator
  • Users needed to know Redis Stream concepts to set it up

Structure & Concept

The library implements the NestJS Custom Transporter, which can be used in a NestJS-based MSA distributed environment.

  • Server Mode Support
    • Server mode is used by the responder server, shown on the right side of the image.
    • It processes requests from requesters, builds responses, and returns them.
    • e.g. a NestJS microservice app
  • Client Mode Support
    • Client mode is used by the requester server, shown on the left side of the image.
    • It can send a message to the responder and receive a response, or send a message without waiting for one (fire and forget).
    • e.g. a NestJS API gateway app or another microservice app

In short, it uses Redis Streams to implement the Custom Transporter interface provided by NestJS.

Usage

Client Mode (Requestor App)

(1) Register the client provider

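// Any module that needs to send messages
import { Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
// Assumes the module is exported from the package root
import { RedisStreamClientModule } from 'nestjs-redis-stream-microservice';
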
@Module({
  imports: [
    // sync mode
    RedisStreamClientModule.register({
      connection: {
        host: '127.0.0.1',
        port: 6388,
        password: 'beobwoo',
      },
    }),

    // async mode
    RedisStreamClientModule.registerAsync({
      useFactory: (configService: ConfigService) => ({
        connection: {
          host: configService.get('HOST'),
          port: configService.get('PORT'),
          password: configService.get('PASSWORD'),
        },
      }),
      inject: [ConfigService],
    }),
  ],
})
export class AppModule {}

Note: In a future version, client mode will be changed so that the client can only be registered once, globally, in the root module.

(2) Enable shutdown hooks in main.ts

// main.ts
import { NestFactory } from '@nestjs/core';

async function bootstrap() {
  const app = await NestFactory.create(AppModule);
  // NOTE: Shutdown hooks must be enabled so that the Redis stream consumer is deleted automatically.
  // If you do not enable them, you must delete the consumer from the Redis stream manually.
  app.enableShutdownHooks();
  await app.listen(3000);
}
bootstrap();
  • To avoid wasting Redis resources, the library automatically cleans up Redis Stream resources it no longer needs when the server shuts down.
  • You must call enableShutdownHooks() for this cleanup to work correctly.
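As a rough illustration of why the hook matters, the sketch below shows what a cleanup step of this kind might look like. It is not the library's code: `StreamAdmin`, `ConsumerCleanup`, and the consumer name `consumer-a` are hypothetical, and an in-memory stand-in replaces Redis. The only Redis detail assumed is the real command `XGROUP DELCONSUMER`, which removes a consumer from a group. On a termination signal, NestJS invokes a provider's `onApplicationShutdown()` only if `app.enableShutdownHooks()` was called.

```typescript
// Hypothetical minimal view of a Redis client: only the one command used here.
interface StreamAdmin {
  // Mirrors the Redis command: XGROUP DELCONSUMER <stream> <group> <consumer>
  deleteConsumer(stream: string, group: string, consumer: string): Promise<number>;
}

// Hypothetical cleanup helper with a NestJS-style shutdown hook.
class ConsumerCleanup {
  private readonly admin: StreamAdmin;
  private readonly group: string;
  private readonly consumer: string;
  private readonly streams: string[];

  constructor(admin: StreamAdmin, group: string, consumer: string, streams: string[]) {
    this.admin = admin;
    this.group = group;
    this.consumer = consumer;
    this.streams = streams;
  }

  // Removes this instance's consumer from every stream it was reading.
  async onApplicationShutdown(): Promise<void> {
    for (const stream of this.streams) {
      await this.admin.deleteConsumer(stream, this.group, this.consumer);
    }
  }
}

// In-memory stand-in so the sketch runs without Redis.
const deleted: string[] = [];
const fakeAdmin: StreamAdmin = {
  deleteConsumer: (stream, group, consumer) => {
    deleted.push(stream + '/' + group + '/' + consumer);
    return Promise.resolve(0);
  },
};

const cleanup = new ConsumerCleanup(fakeAdmin, 'my-server', 'consumer-a', ['stream-1', 'stream-2']);
const done = cleanup.onApplicationShutdown().then(() => deleted);
done.then((entries) => console.log(entries));
```

Without the hook, consumers registered by stopped instances would stay in the group until someone removes them by hand.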

(3) Send the message to the server using the client

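// your service or controller
import { ClientProxy } from '@nestjs/microservices';
// Assumes the decorator is exported from the package root
import { InjectRedisStreamClient } from 'nestjs-redis-stream-microservice';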

constructor(
  @InjectRedisStreamClient()
  private readonly client: ClientProxy,
) {}

Once you have injected the client instance registered in step 1, you can use it to send messages to the server. Depending on your needs, a message can be sent in two ways.

(3-1) Fire and Forget

Note: Emits a message without waiting for a response.

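import { Body, Controller, Post } from '@nestjs/common';

@Controller()
export class Requestor1Controller {
  constructor(
    @InjectRedisStreamClient()
    private readonly client: ClientProxy,
  ) {}

  @Post()
  emit(@Body() data: unknown) {
    // emit() publishes the message without waiting for a reply
    this.client.emit('stream:name:hear', data);
    return 'Emit Success';
  }
}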

(3-2) Send a message and get a response

import { Body, Controller, Post } from '@nestjs/common';
import { lastValueFrom } from 'rxjs';

@Controller()
export class Requestor1Controller {
  constructor(
    @InjectRedisStreamClient()
    private readonly client: ClientProxy,
  ) {}

  @Post()
  async send(@Body() data: unknown) {
    const response$ = this.client.send('stream:name:hear', data);
    const response = await lastValueFrom(response$); // wait for the last value emitted by the observable
    return JSON.stringify(response);
  }
}
  • Internally, the client creates its own response stream.
  • Even if the requesting server is deployed as multiple instances, the response is returned only to the exact instance that sent the request.

    Note: A UUID v4 is used to identify each requester, so the probability of a collision is extremely low.
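The routing idea can be sketched in memory as follows. This is an illustration under stated assumptions, not the library's internals: the `response:` stream naming, the `replyTo` and `correlationId` fields, and the `Requester` class are all hypothetical, and fixed instance ids stand in for the UUID v4 values so the demo stays readable.

```typescript
// In-memory stand-in for Redis streams: stream name -> list of entries.
type StreamEntry = { correlationId: string; replyTo?: string; body: string };
const streamStore: Record<string, StreamEntry[]> = {};

function appendEntry(stream: string, entry: StreamEntry): void {
  (streamStore[stream] = streamStore[stream] || []).push(entry);
}

// Each requester instance owns one response stream named after its unique id.
class Requester {
  responseStream: string;
  private seq = 0;

  constructor(instanceId: string) {
    this.responseStream = 'response:' + instanceId; // hypothetical naming scheme
  }

  // Writes a request that tells the responder where to send the reply.
  send(requestStream: string, body: string): string {
    this.seq += 1;
    const correlationId = this.responseStream + '#' + this.seq;
    appendEntry(requestStream, { correlationId, replyTo: this.responseStream, body });
    return correlationId;
  }

  received(): StreamEntry[] {
    return streamStore[this.responseStream] || [];
  }
}

// The responder writes each reply to the stream named in the request.
function respondAll(requestStream: string): void {
  (streamStore[requestStream] || []).forEach((req) => {
    if (req.replyTo) {
      appendEntry(req.replyTo, { correlationId: req.correlationId, body: 'echo:' + req.body });
    }
  });
}

const gatewayA = new Requester('instance-a');
const gatewayB = new Requester('instance-b');
const idA = gatewayA.send('stream-1', 'hello from A');
gatewayB.send('stream-1', 'hello from B');
respondAll('stream-1');

console.log(gatewayA.received()); // only the reply to A's request
console.log(gatewayB.received()); // only the reply to B's request
```

Because every instance reads only its own response stream, replicas of the same requester never see each other's replies.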

Server Mode (Receiver App)

(1) Enable shutdown hooks and set the transport strategy in main.ts

// main.ts
async function bootstrap() {
  const app = await NestFactory.create(Responder1Module);

  app.connectMicroservice({
    strategy: new RedisStreamServer({
      connection: {
        host: '127.0.0.1',
        port: 6388,
        password: 'beobwoo',
      },
      option: {
        // The logical group that should receive the messages.
        // NOTE: Use the same group name for all instances of a distributed server that provide the same functionality.
        consumerGroup: 'my-server',
      },
    }),
  });
  // NOTE: Shutdown hooks must be enabled so that the Redis stream consumer is deleted automatically.
  // If you do not enable them, you must delete the consumer from the Redis stream manually.
  app.enableShutdownHooks();

  await app.startAllMicroservices();
  await app.listen(3080);
}
bootstrap();

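(2) Handle the messages coming from a Requestor

import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { of } from 'rxjs';

@Controller()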
export class WelcomeEmailController {
  @MessagePattern('stream-1')
  async returnObservable(@Payload() data: any) {
    return of([1, 2, 3]);
  }

  @MessagePattern('stream-2')
  async returnObject(@Payload() data: any) {
    return {
      status: 'good',
    };
  }

  @MessagePattern('stream-3')
  async returnPrimitiveType(@Payload() data: any) {
    return true;
  }
}

Note: Support for reading request metadata with @Ctx() is planned for the next version.

  • Handlers are free to return data in any form, including an Observable, an object, or a primitive value; no fixed response format is required.
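One way a transporter could accept all three return shapes is to normalize them into a single payload before replying. The sketch below is an assumption about the general technique, not the library's code: `SubscribableLike`, `toPayload`, and `fakeOf` are hypothetical, and `fakeOf` is a tiny stand-in for the `of` function from rxjs so that the block runs without dependencies. Like `lastValueFrom`, it keeps the last emitted value.

```typescript
// Minimal structural type for "something like an RxJS Observable" (hypothetical).
interface SubscribableLike<T> {
  subscribe(observer: { next: (value: T) => void; complete: () => void }): unknown;
}

function isSubscribable(value: unknown): value is SubscribableLike<unknown> {
  return (
    typeof value === 'object' &&
    value !== null &&
    typeof (value as { subscribe?: unknown }).subscribe === 'function'
  );
}

// Normalizes a handler result (plain value, Promise, or observable-like)
// into a Promise of the payload to send back.
async function toPayload(result: unknown): Promise<unknown> {
  const value = await result; // unwraps a Promise, passes plain values through
  if (!isSubscribable(value)) {
    return value;
  }
  return new Promise((resolve) => {
    let last: unknown;
    value.subscribe({
      next: (v) => { last = v; },
      complete: () => resolve(last), // reply with the last emitted value
    });
  });
}

// Stand-in for rxjs `of`: emits one value, then completes.
function fakeOf<T>(value: T): SubscribableLike<T> {
  return {
    subscribe: (observer) => {
      observer.next(value);
      observer.complete();
      return undefined;
    },
  };
}

// The three return shapes from the controller example above.
const results = Promise.all([
  toPayload(Promise.resolve(fakeOf([1, 2, 3]))), // async handler returning an observable
  toPayload(Promise.resolve({ status: 'good' })), // async handler returning an object
  toPayload(Promise.resolve(true)), // async handler returning a primitive
]);

results.then((payloads) => console.log(JSON.stringify(payloads)));
// prints [[1,2,3],{"status":"good"},true]
```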

Test

The Git repository includes an e2e test suite that verifies correct behavior in a distributed environment. Follow the steps below to run the tests.

Note: The npm package contains only the necessary source code.

Unit Test

yarn test

E2E Test

Set Up

yarn test:set-up

# start Requestor 1
yarn start:dev requestor-1
# start Requestor 2
yarn start:dev requestor-2
# start Responder 1
yarn start:dev responder-1
# start Responder 2
yarn start:dev responder-2

Start e2e Test

yarn test:e2e

Stay in touch

Author/Developer - KIMBEOBWOO

License

nestjs-redis-stream-microservice is MIT licensed.