0.5.0 • Published 3 years ago

@imec/digital-twin-kafka-utils v0.5.0

Weekly downloads
23
License
-
Repository
-
Last release
3 years ago

Build Status

Kafka Connection Manager

Description

Kafka manager is a wrapper around Kafkajs for the Digital Twin project. It's meant to be available publicly from NPM.

How to update this package

First of all you need to be a collaborator of the npm package. Ask david.vermeir@imec.be or ismail.kutlu@imec.be to get access.

  1. Create a branch
  2. Update code
  3. Run yarn build
  4. Run run npm version <patch,minor,major>
  5. Commit changes and the version update
  6. Open PR
  7. Merge PR
  8. Checkout to master
  9. Run npm publish on master

How to use it

Example to create and initialize an instance

import KafkaManager, { ProducerRecord, KafkaMessage } from ".";

interface RoadData{
	name: string,
	age: number
}

interface HobbitData{
	name: string,
	address: string
}

export default class Example {
  kafka!: KafkaManager

	async init() {
    this.kafka = new KafkaManager({ brokers: ['kafka-server:9092']});
    const producerConfig = {} // producer config from kafkajs, optional can be left out
    const kafkaProducer = await this.kafka.createProducer(producerConfig);
    const consumerConfig = {groupId: "hello"};
    const kafkaConsumer = await this.kafka.createConsumer(consumerConfig);
    const topic = 'traffic-loop';

		const record: ProducerRecord = {
			topic,
			messages: [
        {
          headers: {'sequenceId': '2'},
          key: 'remove road',
          value: {name: 'my awesome name', age: 124}
        },
        {
          headers: {'sequenceId': '2'},
          key: 'add hobbit',
          value: {name: 'hobbit', address: 'Esgaroth'}
        }
      ]
	  }

    setInterval(() => {
      console.log('publishing');
      this.kafka.publish(kafkaProducer, record);
    }, 5000)

    function eachBatchFn(messages: KafkaMessage<RoadData|HobbitData>[]) {
      messages.forEach(element => {
        switch (element.key) {
        case "add hobbit":
          console.log("Handle the hobbit message");
          break;
        case "remove road":
          console.log("Handle the road message");
          break;
        default:
          console.log("Handle unexpected default");
        }
      });
      console.log("batch messages", messages)
    }

    function eachMessageFn(message: KafkaMessage<RoadData|HobbitData>) {
      console.log("messages", message)
	    switch (message.key) {
        case "add hobbit":
			    console.log("Handle the hobbit message");
          break;
        case "remove road":
			    console.log("Handle the road message");
          break;
        default:
          console.log("Handle unexpected default");
        }
		}

    // Subscribing to a topic can be done using the two methods below. Note that you cannot have `eachMessage` and `eachBatch` in the same subscription! This will cause kafkajs to only run one of the two.

    // Use for each message subscription
    this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
      subscribeTopic: {topic},
      eachMessage: eachMessageFn
    });

    // Use batch subscription
    this.kafka.subscribe<RoadData|HobbitData>(kafkaConsumer, {
      subscribeTopic: {topic},
      eachBatch: eachBatchFn,
    });
	}
}
0.5.0

3 years ago

0.4.3

4 years ago

0.4.2

5 years ago

0.4.1

5 years ago

0.4.0

5 years ago

0.3.0

6 years ago

0.2.1

6 years ago

0.2.3

6 years ago

0.2.2

6 years ago

0.2.0

6 years ago

0.1.2

6 years ago

0.1.1

6 years ago

0.1.0

6 years ago