avrokado v0.5.3
Avrokado
:avocado: A Kafka client and Avro (de)serializer library
Table of Contents
Installation
To install, use your favourite dependency manager.
The package name is avrokado.
npm i avrokado --save
yarn add avrokado --exactUsage
For examples, please refer to the examples folder.
SchemaRegistry
This will fetch the key and value schemas for a topicName.
Class Signature
new SchemaRegistry (
endpoint: string,
topics: ReadonlyArray<string> | string,
version: number | 'latest' | 'all'
) => SchemaRegistry;Where:
- endpoint: Endpoint for your Schema Registry;
- topics: Name of the topics (
Array) or topic (String) you want to retrieve the schemas for; - version: It can be either:
- A
number, which will then force the function to only fetch that version; all, which means it will fetchallversions of the schemas;latest, which will fetch only thelatestschema versions.
- A
Fields
- schemas: Object containing the loaded schemas.
Methods
load
async load() => Promise<void>;The load method will load all the schemas selected to memory, and can be accessed through the schemas field from the instanced class.
Best Practices
It is recommended to load the schemas BEFORE creating your Consumer or Producer.
AvroConsumer
This will create a consumer stream using node-rdkafka.
Please check their DOCUMENTATION since most of the options are from this library.
Class Signature
new AvroConsumer(
conf: Object,
topicConf: Object,
schemas: TopicsSchemas
) => AvroConsumer;Where:
- consumerConfiguration:
librdkafka's consumer-specific configuration; - defaultTopicConfiguration:
librdkafka's default topic configuration; - streamOpts:
librdkafka's read stream options; - schemas: An object with all
keyandvalueschemas (return fromloadSchemas).
Returns a AvroConsumer, which extends from Readable stream.
Fields
- stream: This is a
ConsumerStreamobject fromnode-rdkafka, which has another fieldconsumerfor theKafkaConsumeritself (yes it's ugly).
Events Emitted
| Event name | Trigger/Description |
|---|---|
avro | Whenever a message is parsed with Avro |
ready | When the Consumer Stream is created |
event.error | Wraps ConsumerStream.consumer's event.error event |
And any other event emitted by a ConsumerStream from node-rdkafka.
API
Specifically for avro event emitted, it should be expected a AvroMessage type, which contains:
| Variable | Description |
|---|---|
value | The raw value buffer |
key | The raw key buffer |
size | Size in bytes of the raw message |
topic | Name of the topic |
offset | Offset in which the message is |
partition | Partition from the topic |
timestamp | When the message was retrieved |
valueSchemaId | Schema ID for the value |
keySchemaId | Schema ID for the key |
parsedValue | Avro-deserialized value (from value) |
parsedKey | Avro-deserialized key (from key) |
Notes
- To use the
KafkaConsumermethods, for now you will need to doAvroConsumer.stream.consumer.
AvroProducer
This will create a producer using node-rdkafka.
Please check their DOCUMENTATION since most of the options are from this library.
Class Signature
new AvroProducer(
conf: Object,
topicConf: Object,
schemas: TopicsSchemas
) => AvroProducer;Where:
- conf:
librdkafka's producer-specific configuration; - topicConf?:
librdkafka's default topic configuration; - schemas: An object with all
keyandvalueschemas (return fromloadSchemas).
Returns a AvroProducer, which extends from Producer.
Methods
connect
connect(
metadataOption: Object = {}
) => Promise<true | Error>;The connect method will connect to the Kafka broker and await until a connection is successfully made or an error is thrown.
produce
produce(
topic: string,
partition?: number,
message?: unknown,
key?: unknown,
sendRaw?: boolean,
timestamp?: number,
opaque?: unknown
) => void;The produce method will produce a message to Kafka. If sendRaw is set to true, the message WILL NOT be avro encoded.
disconnect
disconnect(
timeout: number = 5000
) => Promise<true | Error>;The disconnect method will disconnect from the Kafka broker and await until it is gracefully interrupted.
Tests
- Install
Docker; - Install
docker-compose; - Start up the images with
docker-compose up -dand make sure zookeeper, kafka and schema-registry are all running; - Run
npm run testoryarn test.
TODO
- Improve in-code documentation.