cloudevents-kafka v0.2.1
CloudEvents JS SDK Kafka
Kafka transport extension for the CloudEvents JS SDK
Description
Allows serialising and deserialising CloudEvents for the Kafka protocol.
- Based on Kafka Protocol Binding for CloudEvents - Version 1.0.1
- Supports partitioning
- Tested with KafkaJS, but will probably work with any other client.
- Currently supports only structured mode
- Supports CloudEvent versions 0.3, 1.0
- Works correctly with the official CloudEvents JavaScript SDK starting from v5.
- Has zero dependencies.
- Strict TypeScript usage
Installation
npm install cloudevents cloudevents-kafka kafkajs
# or
yarn add cloudevents cloudevents-kafka kafkajs
Note: The examples use kafkajs, but you can use any other client library.
Usage
Strict CloudEvent
The default CloudEvent constructor does not strictly check the input object. To enable strict mode, this library provides two classes.
import { Version, CloudEvent } from 'cloudevents'
import {CloudEventStrict, CloudEventStrictV1} from "cloudevents-kafka"
// Will throw runtime exceptions about missing `id` field
const ce = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})
// Will show typescript error during compilation about missing `id` field
const ces = new CloudEventStrict({
  specversion: Version.V1,
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})
// Will show typescript error during compilation about missing `id` field
const cev1 = new CloudEventStrictV1({
  source: 'some-source',
  // id: 'some-id',
  type: 'message.send'
})
Receiving Events
A received message that is a valid KafkaMessage will be deserialized as a CloudEvent.
import { EachMessagePayload, Kafka } from 'kafkajs'
import * as CeKafka from "cloudevents-kafka"
const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['kafka:9092']
})
const consumer = kafka.consumer({ groupId: 'test-group' })
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
await consumer.run({
  eachMessage: async ({ message }: EachMessagePayload) => {
    const receivedEvent = CeKafka.deserialize(message)
    console.log(receivedEvent) // will be a valid CloudEvent
  }
})
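If a topic may also contain messages that are not valid CloudEvents, you can guard the handler. A minimal sketch, assuming CeKafka.deserialize throws on input it cannot interpret as a CloudEvent:
await consumer.run({
  eachMessage: async ({ message }: EachMessagePayload) => {
    try {
      const receivedEvent = CeKafka.deserialize(message)
      console.log(receivedEvent.id, receivedEvent.type)
    } catch (err) {
      // Not a CloudEvent (or malformed): skip, log or route it elsewhere as needed
      console.warn('Skipping non-CloudEvent message', err)
    }
  }
})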
Emitting Events
A CloudEvent will be serialised as a KafkaMessage object containing key, value, headers and timestamp fields, which you can send using any client.
import { Version } from 'cloudevents'
import { Kafka } from 'kafkajs'
import * as CeKafka from "cloudevents-kafka"
const { CloudEventStrict } = CeKafka
const kafka = new Kafka({
  clientId: 'test-app',
  brokers: ['kafka:9092']
})
const producer = kafka.producer()
await producer.connect()
const ce = new CloudEventStrict({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id',
  type: 'message.send'
})
const message = CeKafka.structured(ce)
await producer.send({
  topic: 'test-topic',
  messages: [
    message,
  ],
})
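The object returned by CeKafka.structured is a plain message, so it can be inspected before sending. A small sketch; the field names follow the description above, and the assumption that key stays unset without a partitionkey is explained in the next section:
const serialized = CeKafka.structured(ce)
console.log(serialized.value)     // the whole event, serialised in structured mode
console.log(serialized.headers)   // transport headers set by the library
console.log(serialized.timestamp)
console.log(serialized.key)       // expected to be unset here, since the event has no partitionkey (see below)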
Partition Key
To define the key property of the Kafka message, add a partitionkey field to your event.
const ce = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id',
  type: 'message.send',
  partitionkey: 'some-partitionkey'
})
const message = CeKafka.structured(ce)
console.log(message.key) // some-partitionkey
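Events that share the same partitionkey get the same Kafka message key, so with KafkaJS's default partitioner they should land on the same partition and keep their relative order. A short sketch under that assumption, reusing the producer from the emitting example and a hypothetical shared key:
const first = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id-1',
  type: 'message.send',
  partitionkey: 'order-123' // hypothetical key shared by related events
})
const second = new CloudEvent({
  specversion: Version.V1,
  source: 'some-source',
  id: 'some-id-2',
  type: 'message.send',
  partitionkey: 'order-123'
})
// Same key, so the default partitioner routes both messages to the same partition
await producer.send({
  topic: 'test-topic',
  messages: [CeKafka.structured(first), CeKafka.structured(second)]
})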
Development
First, add an alias in /etc/hosts:
127.0.0.1 localhost kafka
Start kafka
docker-compose up kafka
Install dependencies
yarn
Run tests
yarn test