1.0.6 • Published 1 year ago

nestjs-kafka v1.0.6

Weekly downloads
1
License
ISC
Repository
github
Last release
1 year ago

NestJs Kafka Client

Description

A NestJS - KafkaJs Wrapper, wrapping on KafkaJS

Installation

npm install nestjs-kafka

Add it to the NestJS app.module.ts or any module

import { KafkaModule } from 'nestjs-kafka';

const serviceConfig = {
  clientConfig:  {
    clientId: 'go1-node-app',     // consumer client id
    brokers: ['localhost:9092'] // kafka broker address
  },
  consumerConfig: { groupId: "something" } // consumer group id
};

@Module({
    imports: [KafkaModule.forRoot(serviceConfig)],
    controllers: [],
    providers: [],
})
export class Module {}

How to sendMessage

import {KafkaService, KafkaPayload} from "nestjs-kafka";

@Injectable()
export class TaskKafkaProductService{
    constructor(private readonly kafkaService: KafkaService) {}

    public async sendPushTask(kafkaTaskDto: KafkaTaskDto): Promise<any> {
        const message: KafkaTaskDto = kafkaTaskDto;
        const payload: KafkaPayload = {
            messageId: '' + new Date().valueOf(),
            body: message,
            messageType: TASK_PUSH_INFO,
            topicName: TASK_PUSH_INFO,
        };
        this.kafkaService.sendMessage('test-kafka', payload);
    }
}

How to Subscribe Message

import {KafkaPayload, AbstractKafkaConsumer} from "nestjs-kafka";

@Injectable()
export class TaskKafkaConsumerService extends AbstractKafkaConsumer {

    constructor() {
        super();
    }
    // register topic
    protected registerTopic(): any {
        this.addTopic('task.push.info');
        this.addTopic('test-group');
    }

    @SubscribeTo('task.push.info')
    taskSubscriber(payload: string ): any {
        const data: KafkaPayload = JSON.parse(payload);
    }

    /**
     * When application or container scale up &
     * consumer group id is same for application
     * @param payload
     */
    @SubscribeToFixedGroup('test-group')
    helloSubscriberToFixedGroup(payload: KafkaPayload): any {}
}