Knafka
Overview
Wrapper around node-rdkafka for use in Knotel back-end services.
Usage
The module exports a single class Knafka:
const Knafka = require('...')
const knafka = new Knafka({
'metadata.broker.list': '',
'sasl.password': '',
'sasl.username': '',
'group.id': '',
'client.id': '',
'security.protocol': '',
'sasl.mechanisms': ''
})

The constructor takes in the global config. Several properties are set automatically from environment variables (in non-local/dev environments) if they are not set in the constructor config:
- metadata.broker.list from ENV KAFKA_BOOTSTRAP_SERVERS (comma-separated)
- sasl.password from ENV KAFKA_SASL_PASSWORD
- sasl.username from ENV KAFKA_SASL_USERNAME
- client.id from ENV APP_NAME
- security.protocol to SASL_SSL
- sasl.mechanisms to SCRAM-SHA-256
If the environment variable ENV (defaults to dev) is either dev or local, nothing is set automatically and the user can set whatever they want.
The property group.id should be set if you are going to create consumers; it defines the consumer group across which message consumption is balanced.
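For example, in a non-local environment the constructor call can be reduced to the consumer group, with the rest picked up from the environment. A minimal sketch, assuming the module resolves as knafka (the original example elides the require path) and the variables below are exported:

    // Assumes ENV is e.g. 'production' and these variables are set in the environment:
    //   KAFKA_BOOTSTRAP_SERVERS=broker-1:9092,broker-2:9092
    //   KAFKA_SASL_USERNAME=svc-user
    //   KAFKA_SASL_PASSWORD=secret
    //   APP_NAME=my-service
    const Knafka = require('knafka') // module path is an assumption

    const knafka = new Knafka({
      // group.id is only needed if this instance will create consumers
      'group.id': 'my-service-consumers'
    })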
Logging
Logging is controlled by env LOG_LEVEL and defaults to info.
If the env variable KAFKA_DEBUG is set to true, the code will set up logging for the following Kafka-related events:
- ready status
- disconnects
- event errors
- errors
- delivery reports
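The node-rdkafka instances returned by the factory methods below are regular event emitters, so you can also attach your own handlers for these events instead of relying on the built-in logging. A rough sketch (the producer name is arbitrary and the handler bodies are placeholders; the event names are standard node-rdkafka events):

    // Sketch: custom handlers for the events that KAFKA_DEBUG logging covers
    async function setupProducerLogging (knafka) {
      const producer = await knafka.createProducer('logging-example')
      producer.on('ready', () => console.log('producer ready'))
      producer.on('disconnected', () => console.warn('producer disconnected'))
      producer.on('event.error', (err) => console.error('kafka event error', err))
      // delivery reports are only emitted if a delivery callback is enabled in the producer config
      producer.on('delivery-report', (err, report) => console.log('delivery report', err || report))
      return producer
    }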
The instance exposes two main methods:
- createConsumer
- createProducer
These can be used to create as many consumers/producers as needed from the same factory (with the provided config).
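For instance, one Knafka instance can hand out several independently named clients. A minimal sketch (the names and the config override are illustrative):

    // Sketch: two producers from the same factory, one with a per-instance config override
    async function createClients (knafka) {
      const plainProducer = await knafka.createProducer('orders-producer')
      const gzipProducer = await knafka.createProducer('events-producer', {
        'compression.codec': 'gzip' // example of a producer-specific override
      })
      return { plainProducer, gzipProducer }
    }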
Plain Producer
try {
const producer = await knafka.createProducer(name, configOverride, topicConfig)
producer.produce(topicName, -1, Buffer.from(...))
} catch (e) { ... }

The returned object is an instance of the node-rdkafka Producer.
- name is the unique identifier for this producer
- configOverride is global config that overrides the config set in the Knafka constructor for this particular instance (or sets some producer-specific config)
- topicConfig is the topic config
- topicName is the name of the topic to publish to
More details on the produce method can be found in the node-rdkafka Producer documentation.
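A slightly fuller sketch of the same flow (the topic name, key and payload are made up for illustration; the argument order follows the node-rdkafka produce signature):

    // Sketch: produce a single JSON message
    async function produceExample (knafka) {
      const producer = await knafka.createProducer('example-producer')
      producer.produce(
        'knotel-local-topic-a',                          // topic (assumed to exist)
        null,                                            // partition: null lets librdkafka pick one
        Buffer.from(JSON.stringify({ hello: 'world' })), // message payload as a Buffer
        'optional-key',                                  // message key
        Date.now()                                       // timestamp
      )
    }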
Plain Consumer
try {
const consumer = await knafka.createConsumer(name, topics, configOverride, topicConfig)
consumer.on('data', (data) => { ... })
} catch (e) { ... }

The returned object is an instance of the node-rdkafka KafkaConsumer.
- name is the unique identifier for this consumer
- topics is the list of topics this instance consumes
- configOverride is global config that overrides the config set in the Knafka constructor for this particular instance (or sets some consumer-specific config)
- topicConfig is the topic config
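A slightly fuller sketch (the topic names match the local example below; the message fields are the standard node-rdkafka message shape, and group.id is assumed to be set in the Knafka constructor):

    // Sketch: consume from two topics and log every message
    async function consumeExample (knafka) {
      const consumer = await knafka.createConsumer(
        'example-consumer',
        ['knotel-local-topic-a', 'knotel-local-topic-b']
      )
      consumer.on('data', (message) => {
        // message.value is a Buffer; topic/partition/offset identify the record
        console.log(message.topic, message.partition, message.offset, message.value.toString())
      })
    }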
Local setup
To set up a local run of Confluent Kafka:
cd examples/
docker-compose up -d --build

... and follow from step 2: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#step-2-create-ak-topics ... to create topics (there is no need for other steps).
To stop docker:
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")

Examples:
Cloudkarafka
Cloudkarafka example in examples/cloud_karafka.js:
- set USER_NAME, TOPIC (default gets created automatically by the cloudkarafka service) and KNAFKA_CONFIG
- run
node ./examples/cloud_karafka.js
Local
The local example is in examples/local.js.
Follow the local setup guide above and create two topics:
- knotel-local-topic-a
- knotel-local-topic-b
... on http://localhost:9021/
Then:
node ./examples/local.js

TODO
- tests (require Kafka instance in CI)