1.1.2 • Published 19 days ago

@buka/nestjs-kafka v1.1.2

License
MIT

@buka/nestjs-kafka


Warning!!! This package has been deprecated. NestJS microservices now fully implement this functionality, so there is no reason to maintain this package.

This is a NestJS module built on KafkaJS. It supports multiple connections and fits the NestJS coding style.

Usage

Import KafkaModule.forRoot into AppModule:

// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule } from "@buka/nestjs-kafka";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
  ],
})
export class AppModule {}

KafkaConsumer

Create a provider named AppConsumer that consumes messages:

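// app.consumer.js
import { Injectable } from "@nestjs/common";
import { KafkaConsumer, KafkaConsume, KafkaMessage } from "@buka/nestjs-kafka";

@Injectable()
@KafkaConsumer()
export class AppConsumer {
  // Receives the raw message as a string
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    // do something
    console.log(message);
  }

  // Receives the message parsed as JSON; each handler needs its own method name
  @KafkaConsume("other-topic", { json: true })
  async finishOtherTask(
    @KafkaMessage() message: Record<string, any>
  ): Promise<void> {
    // do something
    console.log(message);
  }
}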
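
The `json: true` option on the second handler presumably tells the module to parse the message body before it reaches the handler. The package's decoding logic is not shown in this README, so the following is only a minimal sketch of that idea. `decodeValue` is a hypothetical helper, not part of @buka/nestjs-kafka. It relies on the fact that KafkaJS delivers message values as a Buffer (a Uint8Array subclass) or null.

```typescript
// Hypothetical helper: illustrates what a `json: true` option could do.
// Not part of @buka/nestjs-kafka.
function decodeValue(value: Uint8Array | null, json: boolean): unknown {
  if (value === null) {
    // Kafka allows null values (tombstones); pass them through unchanged.
    return null;
  }
  const text = new TextDecoder("utf-8").decode(value);
  return json ? JSON.parse(text) : text;
}

const raw = new TextEncoder().encode('{"taskId": 42}');
console.log(decodeValue(raw, false)); // the raw JSON text
console.log(decodeValue(raw, true)); // the parsed object
```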

AppConsumer and AppService can be merged into one provider, but writing them separately will make the code clearer.

Then add AppConsumer to the providers of AppModule:

import { Module } from "@nestjs/common";
import { AppConsumer } from "./app.consumer";

@Module({
  imports: [
    /* ... */
  ],
  providers: [AppConsumer],
})
export class AppModule {}

KafkaProducer

KafkaProducer connects on module init and disconnects on module destroy. To use it, import KafkaModule.forProducer(options) into AppModule:

// app.module.js
import { Module } from "@nestjs/common";
import { KafkaModule, Partitioners } from "@buka/nestjs-kafka";
import { AppService } from "./app.service";

@Module({
  imports: [
    KafkaModule.forRoot({
      name: "my-kafka",
      groupId: "my-group-id",
      clientId: "my-client-id",
      brokers: ["my_kafka_host:9092"],
    }),
    KafkaModule.forProducer({
      name: "my-kafka",
      createPartitioner: Partitioners.LegacyPartitioner,
    }),
  ],
  providers: [AppService],
})
export class AppModule {}

The options of .forProducer are exactly the same as the options of kafka.producer in KafkaJS.

Inject KafkaProducer into your AppService:

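// app.service.js
// Assumes InjectKafkaProducer and KafkaProducer are exported from the package root
import { Injectable } from "@nestjs/common";
import { InjectKafkaProducer, KafkaProducer } from "@buka/nestjs-kafka";

@Injectable()
export class AppService {
  constructor(
    @InjectKafkaProducer('my-kafka')
    private readonly producer: KafkaProducer
  ) {}

  async sendMessage(): Promise<void> {
    // Await the send so that delivery errors surface to the caller
    await this.producer.send({
      topic: 'kafka-topic',
      messages: [{ value: 'Hello Kafka' }],
    });
  }
}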

The .send function of KafkaProducer is exactly the same as the producer's .send function in KafkaJS.
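
Because .send takes the same record as KafkaJS, structured payloads have to be serialized before sending. The following is a minimal sketch, assuming JSON payloads. `toRecord` is a hypothetical helper, and the local `Message` and `ProducerRecord` interfaces only mirror the subset of the KafkaJS types used here so that the snippet is self-contained.

```typescript
// Local mirrors of a subset of the KafkaJS types (assumption: only these
// fields are needed for the example).
interface Message {
  key?: string;
  value: string | null;
}
interface ProducerRecord {
  topic: string;
  messages: Message[];
}

// Hypothetical helper: serializes each payload to JSON and uses its id as
// the message key, so records with the same id share a partition.
function toRecord<T extends { id: string }>(
  topic: string,
  payloads: T[]
): ProducerRecord {
  return {
    topic,
    messages: payloads.map((p) => ({ key: p.id, value: JSON.stringify(p) })),
  };
}

const record = toRecord("kafka-topic", [{ id: "task-1", done: false }]);
console.log(record.messages[0].value);
```

The resulting object could then be passed to the producer, for example `await this.producer.send(record)`.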

KafkaService

With KafkaService, you can create consumers and producers just as you would with plain KafkaJS.

// app.service.js
import { Injectable, OnModuleDestroy, OnModuleInit } from "@nestjs/common";
import { Consumer, Producer } from "kafkajs";
import { KafkaService } from "@buka/nestjs-kafka";

@Injectable()
export class AppService implements OnModuleInit, OnModuleDestroy {
  producer!: Producer;
  consumer!: Consumer;

  constructor(private readonly kafka: KafkaService) {}

  async onModuleInit(): Promise<void> {
    this.producer = this.kafka.producer();
    await this.producer.connect();

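    this.consumer = this.kafka.consumer({
      groupId: "my-group-id",
    });
    // KafkaJS consumers need an explicit connect() before run()
    await this.consumer.connect();

    await this.consumer.subscribe({ topic: "kafka-topic" });
    await this.consumer.run({
      eachMessage: async (context) => {
        // do something
      },
    });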
  }

  async onModuleDestroy(): Promise<void> {
    await this.producer.disconnect();
    await this.consumer.disconnect();
  }
}
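
The `context` passed to eachMessage is the KafkaJS payload, which carries the topic, the partition, and the message itself. Below is a sketch of what a handler body might do with it, assuming UTF-8 text values. `describeMessage` is a hypothetical helper, and `EachMessageLike` mirrors only the fields of the KafkaJS payload used here.

```typescript
// Local mirror of the payload fields used below (assumption: a subset of
// the KafkaJS eachMessage payload).
interface EachMessageLike {
  topic: string;
  partition: number;
  message: { offset: string; value: Uint8Array | null };
}

// Hypothetical helper: formats one consumed message for logging.
function describeMessage({ topic, partition, message }: EachMessageLike): string {
  const body =
    message.value === null ? "<null>" : new TextDecoder().decode(message.value);
  return `${topic}[${partition}]@${message.offset}: ${body}`;
}

console.log(
  describeMessage({
    topic: "kafka-topic",
    partition: 0,
    message: { offset: "7", value: new TextEncoder().encode("Hello Kafka") },
  })
); // prints "kafka-topic[0]@7: Hello Kafka"
```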

Q&A

KafkaConsumer not working with CreateRequestContext of mikro-orm

If the CreateRequestContext decorator is applied in the wrong order, it can break any other method decorator on the same method, not only those from @buka/nestjs-kafka.

import { Injectable } from "@nestjs/common";
import { KafkaConsumer, KafkaConsume, KafkaMessage } from "@buka/nestjs-kafka";
import { CreateRequestContext } from "@mikro-orm/mysql";

// app.consumer.js
@Injectable()
@KafkaConsumer()
export class AppConsumer {
  @CreateRequestContext()
  // !! KafkaConsume decorator will not work !!
  @KafkaConsume("my-topic")
  async finishTask(@KafkaMessage() message: string): Promise<void> {
    console.log(message);
  }
}
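
One plausible explanation, stated here as an assumption rather than a description of either library's internals: method decorators are applied bottom-up, and a decorator such as CreateRequestContext replaces the method with a wrapper function. If KafkaConsume marks the function it receives (for example via metadata), a wrapper installed afterwards no longer carries that mark. The sketch below reproduces the effect with two hypothetical stand-ins, `markConsumer` and `wrapInContext`, applied by hand instead of with decorator syntax.

```typescript
type Handler = (message: string) => Promise<void>;

// Stand-in for metadata storage (what reflection metadata would hold).
const topics = new WeakMap<Handler, string>();

// Stand-in for @KafkaConsume: tags the function it receives with a topic.
function markConsumer(topic: string) {
  return (fn: Handler): Handler => {
    topics.set(fn, topic);
    return fn;
  };
}

// Stand-in for @CreateRequestContext: returns a new wrapper function.
function wrapInContext(fn: Handler): Handler {
  return async (message) => {
    // ... a request context would be created here ...
    await fn(message);
  };
}

const finishTask: Handler = async (message) => console.log(message);

// Decorators run bottom-up, so "@CreateRequestContext above @KafkaConsume"
// means the function is marked first and wrapped second.
const broken = wrapInContext(markConsumer("my-topic")(finishTask));
// "@KafkaConsume above @CreateRequestContext": wrapped first, marked second.
const working = markConsumer("my-topic")(wrapInContext(finishTask));

console.log(topics.get(broken)); // undefined: the mark stayed on the inner function
console.log(topics.get(working)); // "my-topic"
```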

There are two solutions:

  1. Recommended: split the logic into two methods:

    @Injectable()
    @KafkaConsumer()
    export class AppConsumer {
      @KafkaConsume("my-topic")
      async consumeMessage(@KafkaMessage() message: string): Promise<void> {
        // ... filter and format message
        await this.finishTask(JSON.parse(message));
      }

      @CreateRequestContext()
      async finishTask(task: Task): Promise<void> {
        // do something
        console.log(task);
      }
    }

  2. Keep CreateRequestContext as the last decorator, directly above the method:

    @Injectable()
    @KafkaConsumer()
    export class AppConsumer {
      @KafkaConsume("my-topic")
      // use CreateRequestContext as the last decorator
      @CreateRequestContext()
      async finishTask(@KafkaMessage() message: string): Promise<void> {
        // do something
        console.log(message);
      }
    }