0.0.3 • Published 2 years ago

@claudeseo/nest-kafka v0.0.3

Weekly downloads
-
License
Unlicense
Repository
github
Last release
2 years ago

nest-kafka

npm.io npm-version Build Status Release date License: Unlicense

Description

Simple integration of Kafka.js with Nest.js

Module Initialization

Register the Kafka module with the registerAsync method:

import { ConfigModule, ConfigService } from '@nestjs/config';

/**
 * Sample module that wires KafkaModule in through `registerAsync`.
 *
 * ConfigModule is registered as global, and ConfigService is injected into
 * the factory that produces the Kafka settings.
 */
@Module({
  imports: [
    ConfigModule.forRoot({ isGlobal: true }),
    KafkaModule.registerAsync({
      inject: [ConfigService],
      useFactory: async (configService: ConfigService) => ({
        // One handler invocation per message ('batch' is the other accepted value).
        consume_method: 'each',
        options: {
          client: {
            // Broker list comes from the 'kafka.broker' config key;
            // ['localhost:9092'] is passed as the default value.
            brokers: configService.get<string[]>('kafka.broker', [
              'localhost:9092',
            ]),
            clientId: 'kafka-sample',
          },
          consumer: { groupId: 'kafka-sample' },
          producer: { allowAutoTopicCreation: false },
          subscribe: { fromBeginning: false },
          // Auto-commit is switched off in this sample.
          run: { autoCommit: false },
        },
      }),
    }),
  ],
})
export class SomeModule {}

Configure

| Config | Options |
| --- | --- |
| consume_method | 'each' or 'batch' |
| options | |
| options.client | https://kafka.js.org/docs/configuration |
| options.consumer | https://kafka.js.org/docs/consuming#options |
| options.producer | https://kafka.js.org/docs/producing#options |
| options.run | |
| options.run.autoCommit | true or false |
| options.subscribe | https://kafka.js.org/docs/consuming#frombeginning |
| options.subscribe.fromBeginning | true or false |
| options.connect | |
| options.connect.consumer | true or false |
| options.connect.producer | true or false |

Consumer

// Topic used both for the subscription call and for the handler decorator.
const TOPIC_NAME = 'test';

/**
 * Example consumer controller.
 *
 * On module init it passes itself to `subscribeOf` for TOPIC_NAME; the
 * method decorated with `@KafkaSubscribeTo(TOPIC_NAME)` then receives the
 * payloads.
 */
@Controller()
export class AppController implements OnModuleInit {
  constructor(@InjectKafka() private readonly kafkaService: KafkaService) {}

  /** Subscribes this controller instance to TOPIC_NAME. */
  onModuleInit() {
    this.kafkaService.subscribeOf(TOPIC_NAME, this);
  }

  /**
   * Logs the incoming payload and then commits its offset explicitly.
   *
   * NOTE(review): KafkaJS's own `commitOffsets` expects the offset of the
   * next message to read (last processed + 1); confirm whether
   * `commitOffset` applies that adjustment internally.
   */
  @KafkaSubscribeTo(TOPIC_NAME)
  async handler(payload: KafkaEachPayload): Promise<void> {
    console.log(payload);

    const { topic, partition, message } = payload;
    await this.kafkaService.commitOffset(topic, partition, message.offset);
  }
}

Producer

/**
 * Example producer service: publishes a single fixed message through the
 * injected KafkaService.
 */
@Injectable()
export class AppService {
  constructor(@InjectKafka() private readonly kafkaService: KafkaService) {}

  /** Sends one keyless 'Hello World' message to the 'test2' topic. */
  async sendMessage(): Promise<void> {
    const record = {
      topic: 'test2',
      messages: [{ key: null, value: 'Hello World' }],
    };

    await this.kafkaService.sendMessage(record);
  }
}