0.0.4 • Published 10 months ago
tuna-nest-kafka v0.0.4
NestJS + KafkaJS
Integration of KafkaJS with NestJS to build event driven microservices.
Setup
Import and add the TunaNestKafkaModule
to the imports array of the module for which you would like to use Kafka.
Synchronous Module Initialization
Register the TunaNestKafkaModule
synchronously with the register()
method:
@Module({
imports: [
TunaNestKafkaModule.register([
{
name: 'HERO_SERVICE',
options: {
config: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
Synchronous Module Initialization (Multiple Instances)
Register multiple TunaNestKafkaModule instances
synchronously with the registerMultiple()
method:
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot(),
TunaNestKafkaModule.registerMultiple([
{
name: 'tuna-app',
config: {
brokers: ['localhost:9092'],
clientId: 'tuna-client-kafka-1',
},
consumer: {
groupId: 'tuna-nest-kafka-grp1',
retryNumber: 1,
},
},
{
name: 'tuna-app-2',
config: {
brokers: ['localhost:9092'],
clientId: 'tuna-client-kafka-2',
},
consumer: {
groupId: 'tuna-nest-kafka-grp3',
retryNumber: 1,
},
},
]),
]
...
})
Asynchronous Module Initialization
Register the TunaNestKafkaModule
asynchronously with the registerAsync()
method:
@Module({
imports: [
TunaNestKafkaModule.registerAsync({
useFactory: (configService: ConfigService): TunaKafkaOption => {
return {
config: {
brokers: configService.get('BROKER_URL'),
clientId: configService.get('CLIENT_ID'),
},
consumer: {
groupId: configService.get('GROUP_ID'),
},
};
},
inject: [ConfigService],
}),
]
...
})
Asynchronous Module Initialization
Register the TunaNestKafkaModule
asynchronously with the registerMultipleAsync()
method:
import { ConfigModule, ConfigService } from '@nestjs/config';
@Module({
imports: [
ConfigModule.forRoot(),
TunaNestKafkaModule.registerMultipleAsync({
useFactory: (configService: ConfigService): TunaKafkaOptions[] => {
return [
{
name: 'app1',
config: {
brokers: configService.get('BROKER_URL'),
clientId: configService.get('CLIENT_ID'),
},
consumer: {
groupId: configService.get('GROUP_ID'),
},
},
];
},
inject: [ConfigService],
}),
]
...
})
The full list of settings can be found at the following links:
Config | Options |
---|---|
client | https://kafka.js.org/docs/configuration |
consumer | https://kafka.js.org/docs/consuming#options |
producer | https://kafka.js.org/docs/producing#options |
serializer | |
deserializer | |
consumeFromBeginning | true/false |
Producing
Send messages back to Kafka.
// Topic that the example producer publishes to.
const TOPIC_NAME = 'hero.kill.dragon';

/**
 * Example producer that publishes messages through the Kafka client
 * registered under the 'HERO_SERVICE' injection token.
 */
export class Producer {
  constructor(@Inject('HERO_SERVICE') private client: KafkaService) {}

  /**
   * Publishes a single record with key '1' to TOPIC_NAME.
   *
   * @param message - Value of the record; defaults to 'Hello world'.
   * @returns Whatever the client's `send` call resolves to.
   */
  async post(message: string = 'Hello world'): Promise<RecordMetadata[]> {
    const record = { key: '1', value: message };
    return this.client.send({ topic: TOPIC_NAME, messages: [record] });
  }
}
Consumer
/**
 * Example service whose methods are subscribed to Kafka topics through
 * the @KafkaConsumer decorator.
 */
@Injectable()
export class AppService {
  getHello(): string {
    return 'Hello World!';
  }

  // No options are passed here, so this consumer uses the common configuration.
  @KafkaConsumer('tuna_consumer_1')
  con1(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    // Throwing makes the handler fail; the function is retried after a failure.
    throw new Error('Hello World! Error: ');
  }

  @KafkaConsumer('tuna_consumer_2', {
    // Custom consumer group for this handler.
    groupId: 'tuna-nest-kafka-grp2',
    // Number of retries.
    retryNumber: 2,
    // Selects a specific consumer instance when multiple instances are registered.
    instanceName: 'tuna-app-2',
  })
  consumer2(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    sleep(10000);
  }

  @KafkaConsumer('tuna_consumer_3', {
    // Without an instanceName, this consumer belongs to the default instance,
    // or to the first instance when multiple instances are registered.
    groupId: 'tuna-nest-kafka-grp1',
  })
  consumer3(payload: any, context: KafkaInfo) {
    console.log('Message ==> ', payload, 'Context ', context);
    sleep(10000);
  }
}