1.1.3 • Published 3 years ago

@fpt-smart-cloud/nestjs-kafka v1.1.3

Weekly downloads
-
License
MIT
Repository
github
Last release
3 years ago

Description

Kafka module for Nest.

Installation

$ npm i --save  @fpt-smart-cloud/nestjs-kafka

Synchronous Module Initialization

Register the KafkaModule synchronously with the register() method:

@Module({
  imports: [
    // register() receives an array of { name, options } entries.
    KafkaModule.register([
      {
        // The same string is used as the injection token in @Inject('HERO_SERVICE').
        name: 'HERO_SERVICE',
        options: {
          // Client settings; see https://kafka.js.org/docs/configuration
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          // Consumer settings; see https://kafka.js.org/docs/consuming#options
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

Asynchronous Module Initialization

Register the KafkaModule asynchronously with the registerAsync() method:

import { ConfigModule, ConfigService } from '@nestjs/config';

@Module({
  imports: [
    ConfigModule.forRoot(),
    // The first argument lists the client names being registered; the factory
    // below returns one { name, options } entry for 'HERO_SERVICE'.
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      // ConfigService is supplied as the factory argument via `inject` below,
      // so it is read from the parameter (there is no `this` in an arrow function).
      useFactory: async (configService: ConfigService) => {
        const broker = configService.get('broker');
        return [
          {
            name: 'HERO_SERVICE',
            options: {
              // Client settings; see https://kafka.js.org/docs/configuration
              client: {
                clientId: 'hero',
                brokers: [broker],
              },
              // Consumer settings; see https://kafka.js.org/docs/consuming#options
              consumer: {
                groupId: 'hero-consumer',
              },
            },
          },
        ];
      },
      inject: [ConfigService],
    }),
  ]
  ...
})

Asynchronous Module Initialization using config service

import { KafkaModuleOption, KafkaOptionsFactory } from '../kafka';
import { KAFKA_INTEGRATION_SERVICE } from '../common/constants';

/**
 * Options factory for the Kafka module. Pass this class as `useClass` to
 * `KafkaModule.registerAsync()`.
 */
export class KafkaConfigService implements KafkaOptionsFactory {
  /**
   * Builds the list of Kafka client definitions to register.
   *
   * NOTE(review): the method name is spelled `creat…` (sic); it is kept as-is
   * because it presumably has to match the `KafkaOptionsFactory` interface —
   * confirm against the library.
   *
   * @returns one entry for the 'HERO_SERVICE' client.
   */
  creatKafkaModuleOptions(): KafkaModuleOption[] {
    // Placeholder values so the example is self-contained; replace them with
    // values from your own configuration source.
    const clientId = 'hero';
    const brokers = ['localhost:9092'];

    return [
      {
        name: 'HERO_SERVICE',
        options: {
          client: {
            clientId,
            brokers,
          },
          consumer: {
            // The consumer group reuses the client id.
            groupId: clientId,
          },
        },
      },
    ];
  }
}

import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { AppController } from './app.controller';
import { AppService } from './app.service';
import { KafkaModule } from '@fpt-smart-cloud/nestjs-kafka';
// KafkaConfigService is the KafkaOptionsFactory implementation; import it
// from the file where you defined it.

/**
 * Root application module. Registers the 'HERO_SERVICE' Kafka client
 * asynchronously, delegating option creation to KafkaConfigService.
 */
@Module({
  imports: [
    KafkaModule.registerAsync(['HERO_SERVICE'], {
      imports: [ConfigModule],
      useClass: KafkaConfigService,
    }),
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule {}

The full list of settings, and where each one is documented:

| Config | Options |
| --- | --- |
| client | https://kafka.js.org/docs/configuration |
| consumer | https://kafka.js.org/docs/consuming#options |
| producer | https://kafka.js.org/docs/producing#options |
| serializer | |
| deserializer | |
| consumeFromBeginning | true/false |

Subscribing

Subscribe to a topic to receive messages.

/**
 * Example consumer for the 'hero.kill.dragon' topic. It uses the KafkaService
 * instance injected under the 'HERO_SERVICE' token.
 */
export class Consumer {
  constructor(
    @Inject('HERO_SERVICE') private client: KafkaService
  ) {}

  /**
   * Passes the topic name and this instance to the client's
   * subscribeToResponseOf().
   * NOTE(review): presumably called through Nest's OnModuleInit lifecycle
   * hook, although the class does not declare `implements OnModuleInit` —
   * confirm.
   */
  onModuleInit(): void {
    this.client.subscribeToResponseOf('hero.kill.dragon', this)
  }

  /**
   * Method decorated with @SubscribeTo for 'hero.kill.dragon'. Its argument is
   * decorated with @Payload() and typed as KafkaMessage; the body is elided
   * in this example.
   */
  @SubscribeTo('hero.kill.dragon')
  async getWorld(@Payload() data: KafkaMessage): Promise<void> {
    ...
  }

}

Producing

Send messages back to Kafka.

const TOPIC_NAME = 'hero.kill.dragon';

/**
 * Example producer that publishes to TOPIC_NAME through the KafkaService
 * injected under the 'HERO_SERVICE' token.
 */
export class Producer {
  constructor(@Inject('HERO_SERVICE') private client: KafkaService) {}

  /**
   * Sends a single message with key '1' to TOPIC_NAME.
   *
   * @param message value of the message; defaults to 'Hello world'.
   * @returns whatever the client's send() resolves to.
   */
  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const record = {
      topic: TOPIC_NAME,
      messages: [{ key: '1', value: message }],
    };

    return await this.client.send(record);
  }
}

Support

Nest is an MIT-licensed open source project. It can grow thanks to its sponsors and the support of its amazing backers. If you'd like to join them, please read more here.

Stay in touch

License

Nest is MIT licensed.