1.0.1 • Published 11 months ago

ngft-kafka-nestjs v1.0.1

NGFT Kafka-NestJS

Get started

Before you can use the kafka-nestjs module, you need to install it. Make sure that you have access to the Google Artifact Registry.

Install the module:

npm i @ngft/kafka-nestjs

Example / Lib Tester App

see ./example

Module setup

In your AppModule (or any other module where you want to use it), import the KafkaModule.

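import { Module } from '@nestjs/common';
// Assumption: KafkaModule is exported from the package root.
import { KafkaModule } from '@ngft/kafka-nestjs';

@Module({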
  imports: [
    // ..
    KafkaModule.forRoot(/* configurations */),
    // ..
  ],
  // ...
})
export class AppModule {}

The KafkaModule expects these configurations to run correctly:

  • General Kafka Config
  • Schema Registry Config
  • Consumer Kafka Config
  • Producer Kafka Config

Required Configuration

General Kafka Config

see interface KafkaConfig

Property    Description
brokers     An array of brokers you want to connect to
isActive    Enable/disable the whole Kafka service
ssl         Whether you want to use TLS encryption
...         All valid kafka.js KafkaConfig attributes

Schema Registry Config

see interface KafkaRegistryConfig

Property    Description
host        Schema registry URI

Consumer Kafka Config

see interface KafkaConsumerConfig

Property         Description
isActive         Enable/disable event consumption
groupId          Service group identifier
debug            Verbose mode to output every event received
fromBeginning    Starts consuming from the beginning of the topic
isBlocking       Retries an event in case of an exception
...              All valid kafka.js ConsumerConfig attributes

Producer Kafka Config

see interface KafkaProducerConfig

Property    Description
isActive    Enable/disable event publishing
debug       Verbose mode to output every event published
...         All valid kafka.js ProducerConfig attributes

Hint: Alternatively, you can pass your existing kafkaConfig as a configuration argument to KafkaModule:

@Module({
  imports: [
    // ..
    KafkaModule.forRoot(
      kafkaConfig, 
      registryConfig,
      kafkaProducerConfig, 
      kafkaConsumerConfig
    ),
    // ..
  ],
  // ..
})
export class AppModule {}
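
As a rough illustration, the four configuration objects could be assembled from environment variables as sketched below. The interfaces are local stand-ins that model only the properties listed in the tables above, and the environment variable names (KAFKA_BROKERS, KAFKA_SSL, and so on) as well as the loadKafkaSettings helper are hypothetical, not part of the library.

```typescript
// Local stand-ins for the library's config interfaces (assumption:
// only the documented properties are modelled; `brokers` is the
// property name used by kafka.js KafkaConfig).
interface KafkaConfigSketch { brokers: string[]; isActive: boolean; ssl: boolean; }
interface RegistryConfigSketch { host: string; }
interface ProducerConfigSketch { isActive: boolean; debug: boolean; }
interface ConsumerConfigSketch {
  isActive: boolean;
  groupId: string;
  debug: boolean;
  fromBeginning: boolean;
  isBlocking: boolean;
}

type Env = Record<string, string | undefined>;

// Parse a 'true'/'false' string, falling back when the variable is unset.
const flag = (value: string | undefined, fallback: boolean): boolean =>
  value === undefined ? fallback : value === 'true';

// Hypothetical helper: builds all four config objects from an env-like map.
function loadKafkaSettings(env: Env) {
  const kafkaConfig: KafkaConfigSketch = {
    brokers: (env['KAFKA_BROKERS'] ?? 'localhost:9092')
      .split(',')
      .map((broker) => broker.trim())
      .filter((broker) => broker.length > 0),
    isActive: flag(env['KAFKA_ACTIVE'], true),
    ssl: flag(env['KAFKA_SSL'], false),
  };
  const registryConfig: RegistryConfigSketch = {
    host: env['KAFKA_REGISTRY_HOST'] ?? 'http://localhost:8081',
  };
  const kafkaProducerConfig: ProducerConfigSketch = {
    isActive: flag(env['KAFKA_PRODUCER_ACTIVE'], true),
    debug: flag(env['KAFKA_DEBUG'], false),
  };
  const kafkaConsumerConfig: ConsumerConfigSketch = {
    isActive: flag(env['KAFKA_CONSUMER_ACTIVE'], true),
    groupId: env['KAFKA_GROUP_ID'] ?? 'my-service',
    debug: flag(env['KAFKA_DEBUG'], false),
    fromBeginning: flag(env['KAFKA_FROM_BEGINNING'], false),
    isBlocking: flag(env['KAFKA_BLOCKING'], false),
  };
  return { kafkaConfig, registryConfig, kafkaProducerConfig, kafkaConsumerConfig };
}
```

In a NestJS app the result of loadKafkaSettings(process.env) could then be passed to KafkaModule.forRoot in the argument order shown above.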

Usage

Event Consumption

  • Extend the consumer class with AbstractKafkaHandler
  • Call super() in the class constructor
  • Apply the annotation @KafkaEventHandler to a method
    • @KafkaEventHandler registers the method for all specified Kafka event types
    • @KafkaEventHandler can be used multiple times on the same method
    • The first argument is the desired topic
    • The second argument is the desired event type
  • The method will be called with the following parameters, depending on the event type:
    • ngft (schema-based): NgftMessageHandlerPayload<T extends KafkaPayload>

@Injectable()
export class AircraftConsumer extends AbstractKafkaHandler {
  constructor() {
    super()
  }

  @KafkaEventHandler(kafkaTopic.aircraft, AIRCRAFT_CREATED)
  @KafkaEventHandler(kafkaTopic.aircraft, AIRCRAFT_UPDATED)
  async onAircraftUpsert(
    data: KafkaEvent<AircraftUpsert>,
    metadata: KafkaMetadata,
    rawMessage: RawKafkaMessage,
  ): Promise<void> {
    console.log(data)
  }

  @KafkaEventHandler(/event.*/, /.*/)
  async catchAllNgftEvents(
    data: KafkaEvent<AircraftUpsert>,
    // schema based events will also provide the schemaId in metadata
    metadata: SchemaBasedKafkaMetadata,
    rawMessage: RawKafkaMessage,
  ): Promise<void> {
    console.log(data)
  }
}
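
The example above registers handlers with both exact strings and regular expressions. The following sketch shows one way such topic and event-type matching could work. It is not the library's implementation; HandlerRegistry, matches and the topic and event names used in the usage note are hypothetical and only illustrate the idea.

```typescript
type Pattern = string | RegExp;
type Handler = (data: unknown) => void;

interface Registration {
  topic: Pattern;
  eventType: Pattern;
  handler: Handler;
}

// A string pattern must match exactly; a RegExp is tested against the value.
const matches = (pattern: Pattern, value: string): boolean =>
  typeof pattern === 'string' ? pattern === value : pattern.test(value);

// Hypothetical registry: stores (topic, eventType, handler) triples and
// calls every handler whose two patterns match an incoming event.
class HandlerRegistry {
  private readonly registrations: Registration[] = [];

  register(topic: Pattern, eventType: Pattern, handler: Handler): void {
    this.registrations.push({ topic, eventType, handler });
  }

  // Returns the number of handlers that were invoked.
  dispatch(topic: string, eventType: string, data: unknown): number {
    let called = 0;
    for (const registration of this.registrations) {
      if (matches(registration.topic, topic) && matches(registration.eventType, eventType)) {
        registration.handler(data);
        called += 1;
      }
    }
    return called;
  }
}
```

With this sketch, registering one handler for ('event.aircraft', 'AIRCRAFT_CREATED') and a catch-all for (/event.*/, /.*/) means an 'AIRCRAFT_CREATED' event on 'event.aircraft' reaches both handlers, while any other event on a topic matching /event.*/ reaches only the catch-all.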

Event Publishing

@Injectable()
export class Service {
  constructor(
    private readonly producer: KafkaProducerService,
  ) {
    const dummydata = {
      id: 123,
      name: 'Hi there',
      createdAt: new Date(),
    };
    const event = {
      meta: {
        topicKey: 'event.my-shiny-topic',
        eventType: 'my-event-type',
        createdAt: dummydata.createdAt.toISOString(),
        eventKey: 'my-event-identifier',
      },
      data: dummydata,
    } as KafkaPublishableMessage;

    this.producer.sendMessage(event);
  }
}
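
To keep the envelope construction testable and out of the constructor, the event could be built by a small helper, as in the sketch below. PublishableMessageSketch is a local stand-in that mirrors only the fields used in the example above (the real KafkaPublishableMessage type may contain more), and buildEvent is a hypothetical helper, not part of the library.

```typescript
// Local stand-in mirroring the fields used in the publishing example
// (assumption: the real KafkaPublishableMessage may differ).
interface PublishableMessageSketch<T> {
  meta: {
    topicKey: string;
    eventType: string;
    createdAt: string; // ISO 8601 timestamp
    eventKey: string;
  };
  data: T;
}

// Hypothetical helper: wraps a payload in the meta/data envelope.
// `now` is injectable so the timestamp is deterministic in tests.
function buildEvent<T>(
  topicKey: string,
  eventType: string,
  eventKey: string,
  data: T,
  now: Date = new Date(),
): PublishableMessageSketch<T> {
  return {
    meta: { topicKey, eventType, createdAt: now.toISOString(), eventKey },
    data,
  };
}
```

A service method could then call buildEvent('event.my-shiny-topic', 'my-event-type', 'my-event-identifier', payload) and hand the result to this.producer.sendMessage.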

Release Cycle

kafka-nestjs is released automatically using semantic-release. Whenever a commit is merged into main, a new version is released according to the detected changes.
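
How changes are detected depends on the repository's semantic-release configuration, which is not shown here. Assuming the default commit analyzer with Angular-style commit messages, the mapping from commit message to release type looks roughly like the simplified sketch below. releaseTypeFor is a hypothetical illustration, not semantic-release code.

```typescript
type ReleaseType = 'major' | 'minor' | 'patch' | null;

// Simplified sketch of the default rules (assumption: Angular-style
// commits): a "BREAKING CHANGE:" note triggers a major release, `feat`
// a minor release, `fix` and `perf` a patch release. Anything else
// triggers no release.
function releaseTypeFor(commitMessage: string): ReleaseType {
  if (/^BREAKING CHANGE:/m.test(commitMessage)) {
    return 'major';
  }
  const header = commitMessage.split('\n')[0] ?? '';
  // Header shape: type(optional scope): subject
  const match = /^(\w+)(\([^)]*\))?:/.exec(header);
  const type = match?.[1];
  if (type === 'feat') return 'minor';
  if (type === 'fix' || type === 'perf') return 'patch';
  return null;
}
```

Under these assumptions, a commit such as "fix(consumer): handle empty payload" would lead to a patch release, while "docs: update readme" alone would not trigger a release.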

1.0.1

11 months ago

1.0.0

12 months ago