0.1.1 • Published 2 months ago

@tawk.to/nestjs-batch-kafka v0.1.1

Weekly downloads
-
License
MIT
Repository
github
Last release
2 months ago

Description

Process and Publish Kafka message by batch in NestJS. Cross compatible with ServerKafka and ClientKafka from @nestjs/microservices package.

Installation

$ npm i --save @tawkto/nestjs-batch-kafka

Overview

To use the batch kafka consumer, initialize BatchKafkaServer in your main.ts file by connecting the microservice to your app.

const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
	// The config is the same as the KafkaOptions from the @nestjs/microservices package
	strategy: new KafkaBatchServer({
		client: {
			brokers: ['localhost:52800', 'localhost:52801'],
        },
        consumer: {
          groupId: 'test',
          heartbeatInterval: 5000,
          sessionTimeout: 30000,
        },
        run: {
          autoCommitInterval: 5000,
          autoCommitThreshold: 100,
          partitionsConsumedConcurrently: 4,
        },
	})
})

Then you can start consuming the events in batches as follow

@BatchProcessor('test')
  async test(
    @Payload() data: any[],
    @Ctx() context: KafkaBatchContext,
  ) {
    const heartbeat = context.getHeartbeat();
    const resolveOffset = context.getResolveOffset();
    const commitOffsetsIfNecessary = context.getCommitOffsetsIfNecessary();

    await heartbeat();

    for (const message of data) {
      console.log(message);
    }

    resolveOffset(context.getMessages().at(-1).offset);
    console.log("Batch resolved");

    await heartbeat();
    await commitOffsetsIfNecessary();
  }

Context

The KafkaBatchContext object provides the necessary components from kafkajs's EachBatchPayload:

Client

The KafkaBatchClient is exactly the same as the KafkaClient from the @nestjs/microservices package, except that client.send method is removed from the client as batch messages should not be used for request-response communication. On top of that, KafkaBatchClient also have the capability to publish batch messages or publish to multiple topics just like in kafkajs.

@Module({
	imports: [
		ClientsModule.register([{
			name: 'KAFKA_BATCH_CLIENT',
			customClass: KafkaBatchClient,
			options: {
				client: {
				brokers: ['localhost:52800', 'localhost:52801'],
				},
				consumer: {
				groupId: 'test',
				heartbeatInterval: 5000,
				sessionTimeout: 30000,
				},
			},
		}]),
	],
})
export class AppModule {}

Then you can inject and use the KafkaBatchClient in your service as follow

@Injectable()
export class AppService {
	constructor(
		@Inject('KAFKA_BATCH_CLIENT')
		private kafkaClient: KafkaBatchClient,
	) {}

	async eventToBatch() {
		this.kafkaClient.emit('test', { example: 'data'});
	}

	async publishBatch() {
		// equivalent to kafkajs producer.send
		this.kafkaClient.emitBatch('test', [{
			example: 'data1'
		}, {
			example: 'data2'
		}])
	}

	async publishBatchTopics() {
		// will publish to two topics, topic1 and topic2
		// equivalent to kafkajs producer.publishBatch
		this.kafkaClient.emitBatchTopics([{
			pattern: 'topic1',
			data: [{ example: 'data11' }, { example: 'data12' }]
		}, {
			pattern: 'topic2',
			data: [{ example: 'data21' }, { example: 'data22' }]
		}])
	}
}

Calling send with the KafkaBatchClient will result in an error.

this.kafkaClient.send('send', { data: 'data'}); // Error
0.1.1

2 months ago

0.1.0

2 months ago

0.0.1

4 months ago