knafka v0.1.0
Knafka
Overview
Wrapper around node-rdkafka for the usage in Knotel back-end services.
Usage
The module exports a single class Knafka
:
const Knafka = require('...')
const knafka = new Knafka({
'metadata.broker.list': '',
'sasl.password': '',
'sasl.username': '',
'group.id': '',
'client.id': '',
'security.protocol': '',
'sasl.mechanisms': ''
})
Takes in global config. Several properties are set automatically set from env variables (in non-local/dev environments) if not set in the constructor config:
metadata.broker.list
rom ENVKAFKA_BOOTSTRAP_SERVERS
(comma-separated)sasl.password
rom ENVKAFKA_SASL_PASSWORD
sasl.username
from ENVKAFKA_SASL_USERNAME
client.id
from ENVAPP_NAME
security.protocol
toSASL_SSL
sasl.mechanisms
toSCRAM-SHA-256
If environment variable ENV
(defaults to dev
) is either dev
or local
nothing is set and user can set what wants.
The property group.id
is to be set if you will initiate consumers to encompass the group of consumers that are balanced to consume messages.
Logging
Logging is controlled by env LOG_LEVEL
and defaults to info
.
If the ENV KAFKA_DEBUG
is set to true
the code will setup logging for kafka-related:
- ready status
- disconnects
- event errors
- errors
- delivery reports
The instance exposes two main methods:
createConsumer
createProducer
Which can be used to create as many consumers/producers as one wants from the same factory (with the config provided)
Plain Producer
try {
const producer = await knafka.createProducer(name, configOverride, topicConfig)
producer.produce(topicName, -1, Buffer.from(...))
} catch (e) { ... }
Returned is an instance of node-rdkafka Producer.
name
is the unique identifier for this producerconfigOverride
is global config that overrides the config set in the Knafka constructor for this particular instance (or sets some producer-specific config)topicConfig
if topic configtopicName
name of the topic to publish
More details on the produce method in node-rdkafka Producer
Plain Consumer
try {
const consumer = await knafka.createConsumer(name, topics, configOverride, topicConfig)
consumer.on('data', (data) => { ... })
} catch (e) { ... }
Returned is an instance of node-rdkafka KafkaConsumer.
name
is the unique identifier for this producertopics
is the list of topics this instance consumesconfigOverride
is global config that overrides the config set in the Knafka constructor for this particular instance (or sets some consumer-specific config)topicConfig
if topic config
Local setup
For setting up local run of the Confluent Kafka:
cd examples/
docker-compose up -d --build
... and follow from step 2: https://docs.confluent.io/current/quickstart/ce-docker-quickstart.html#step-2-create-ak-topics ... to create topics (there is no need for other steps)
To stop docker:
docker container stop $(docker container ls -a -q -f "label=io.confluent.docker")
Examples:
Cloudkarafka
cloudkarafka example in exmaples/cloud_karafka.js
:
- set
USER_NAME
,TOPIC
(default gets created automatically by thecloudkarafka
service) andKNAFKA_CONFIG
- run
node ./examples/cloud_karafka.js
Local
Local example run in examples/local.js
Follow the guide to setup local and create two topics:
knotel-local-topic-a
knotel-local-topic-b
... on http://localhost:9021/
Then:
node ./examples/local.js
TODO
- tests (require Kafka instance in CI)
5 years ago