1.1.4 • Published 6 months ago

kafka-ts v1.1.4

License
MIT
Repository
github
KafkaTS

KafkaTS is an Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with the Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.

Supported Kafka versions: 3.6 and later

Installation

npm install kafka-ts

Quick start

Create a Kafka client

import { createKafkaClient } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});

Consuming messages

const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});

Producing messages

export const producer = kafka.createProducer();

await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);

Low-level API

import { API } from 'kafka-ts';

const cluster = kafka.createCluster();
await cluster.connect();

// Fetch cluster metadata to find the controller node.
const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});

// Send the CreateTopics request to the controller node.
await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});

await cluster.disconnect();

Graceful shutdown

process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch and disconnects
    await producer.close();
});
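
The handler above can be generalised so that shutdown runs at most once, whichever signal arrives. The following is a minimal sketch that assumes only that the consumer and producer expose an async close(), as shown above. The Closable interface and the createShutdown helper are hypothetical names and are not part of KafkaTS.

```typescript
// Hypothetical minimal shape: anything with an async close(), such as the
// consumer and producer shown above.
interface Closable {
    close(): Promise<unknown>;
}

// Builds a shutdown function that closes the resources in order, at most once.
// Repeated calls return the same pending promise.
function createShutdown(resources: Closable[]): () => Promise<void> {
    let pending: Promise<void> | undefined;
    return () => {
        if (!pending) {
            pending = (async () => {
                for (const resource of resources) {
                    await resource.close();
                }
            })();
        }
        return pending;
    };
}

// Usage sketch: close the consumer first, then the producer.
// const shutdown = createShutdown([consumer, producer]);
// process.once('SIGTERM', shutdown);
// process.once('SIGINT', shutdown);
```

Closing the consumer before the producer keeps the producer available while the last batch is still being processed, which matters if the batch handler publishes messages.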

See the examples for more detailed usage.

Logging

By default, KafkaTS logs using a JSON logger. The logger can be replaced globally by calling the setLogger method (see src/utils/logger.ts).

Retries

By default, KafkaTS retries onBatch up to 5 times with an exponential backoff delay (see src/utils/retrier.ts). If the retries are exhausted, the consumer is restarted.

To skip failed messages or implement a DLQ-like mechanism, override the retrier option in startConsumer() and run your own logic in onFailure.

For example, to simply skip the failing messages:

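import { createExponentialBackoffRetrier } from 'kafka-ts';

await kafka.startConsumer({
    // ...
    // A no-op onFailure skips the batch once the retries are exhausted.
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }),
});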
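
To make the retry behaviour concrete, here is a standalone sketch of exponential backoff with an onFailure hook. It does not reproduce src/utils/retrier.ts: retryWithBackoff, backoffDelayMs, the option names, and the delay values are all assumptions chosen for illustration.

```typescript
// Standalone illustration only: names, defaults, and delays are assumptions,
// not the options of KafkaTS's own retrier.
interface RetryOptions {
    retries: number; // retries allowed after the first attempt
    initialDelayMs: number; // delay before the first retry
    maxDelayMs: number; // upper bound for any single delay
    onFailure: (error: unknown) => void | Promise<void>; // runs once retries are exhausted
    sleep?: (ms: number) => Promise<void>; // injectable so tests need not wait
}

// Delay before retry number `attempt` (0-based): initial * 2^attempt, capped.
function backoffDelayMs(attempt: number, initialDelayMs: number, maxDelayMs: number): number {
    return Math.min(maxDelayMs, initialDelayMs * 2 ** attempt);
}

async function retryWithBackoff(task: () => Promise<void>, options: RetryOptions): Promise<void> {
    const sleep =
        options.sleep ?? ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
    for (let attempt = 0; ; attempt++) {
        try {
            await task();
            return;
        } catch (error) {
            if (attempt >= options.retries) {
                // Retries exhausted. A no-op handler skips the batch, a handler
                // that rethrows propagates the failure, and a DLQ-style handler
                // would publish the failed messages elsewhere.
                await options.onFailure(error);
                return;
            }
            await sleep(backoffDelayMs(attempt, options.initialDelayMs, options.maxDelayMs));
        }
    }
}
```

With retries set to 5 and an initial delay of 100 ms capped at 1000 ms, the waits between attempts in this sketch are 100, 200, 400, 800, and 1000 ms.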

Partitioning

By default, messages are partitioned by message key, or round-robin if the key is null or undefined. The partition can be overridden with the partition property of the message. You can also override the default partitioner per producer instance: kafka.createProducer({ partitioner: customPartitioner }).

A simple example of partitioning messages by the value of the x-partition-key message header:

import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';

const myPartitioner: Partitioner = (context) => {
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};

const producer = kafka.createProducer({ partitioner: myPartitioner });

await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);
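
The configuration reference describes the default partitioner as Java-compatible. In the Java client, keyed records are assigned by hashing the key bytes with 32-bit murmur2, clearing the sign bit, and taking the result modulo the partition count. The sketch below illustrates that scheme in isolation. It is not taken from the KafkaTS source, the function names are hypothetical, and whether defaultPartitioner matches it bit for bit should be checked against the library.

```typescript
// 32-bit murmur2 as used by Kafka's Java client (seed 0x9747b28c).
// Math.imul keeps every multiplication within signed 32-bit arithmetic.
function murmur2(data: Uint8Array): number {
    const m = 0x5bd1e995;
    const length = data.length;
    let h = 0x9747b28c ^ length;

    // Mix the input four bytes at a time (little-endian).
    const blocks = length >> 2;
    for (let i = 0; i < blocks; i++) {
        const i4 = i * 4;
        let k = data[i4] | (data[i4 + 1] << 8) | (data[i4 + 2] << 16) | (data[i4 + 3] << 24);
        k = Math.imul(k, m);
        k ^= k >>> 24;
        k = Math.imul(k, m);
        h = Math.imul(h, m) ^ k;
    }

    // Mix the remaining one to three bytes, if any.
    const tail = length & ~3;
    const rest = length & 3;
    if (rest === 3) h ^= data[tail + 2] << 16;
    if (rest >= 2) h ^= data[tail + 1] << 8;
    if (rest >= 1) {
        h ^= data[tail];
        h = Math.imul(h, m);
    }

    // Final avalanche.
    h ^= h >>> 13;
    h = Math.imul(h, m);
    h ^= h >>> 15;
    return h | 0;
}

// Keyed messages: clear the sign bit, then take the modulo.
function partitionForKey(key: string, partitionCount: number): number {
    const hash = murmur2(new TextEncoder().encode(key));
    return (hash & 0x7fffffff) % partitionCount;
}

// Messages without a key: cycle through the partitions.
function createRoundRobin(partitionCount: number): () => number {
    let next = 0;
    return () => next++ % partitionCount;
}
```

Because the hash depends only on the key bytes and the partition count, every producer that uses the same scheme sends a given key to the same partition, as long as the number of partitions does not change.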

Motivation

The existing low-level libraries (e.g. node-rdkafka) are bindings to librdkafka, which does not give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features.

New features compared to kafkajs

  • Static consumer membership - Rebalancing during rolling deployments causes delays. Setting groupInstanceId in addition to groupId lets a restarted consumer avoid a rebalance and keep consuming the partitions in its existing assignment.
  • Consuming messages without consumer groups - When you don't need the consumer to track partition offsets, you can create a consumer without a groupId and always start consuming either from the beginning or from the latest partition offset.
  • Low-level API requests - You can communicate directly with the Kafka cluster using the Kafka protocol API.
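
The first two features come down to which options are passed to startConsumer(). The sketch below builds both variants against a small local ConsumerOptions type so that it compiles on its own. The option names come from the startConsumer() table in this README, while the helper functions are hypothetical. Selecting the start position of a groupless consumer with fromTimestamp is an assumption based on that table.

```typescript
// Minimal local type so the sketch is self-contained. Only the option names
// are taken from the startConsumer() table; the helpers are hypothetical.
interface ConsumerOptions {
    topics: string[];
    groupId?: string;
    groupInstanceId?: string;
    fromTimestamp?: bigint;
    onBatch: (messages: unknown[]) => void | Promise<void>;
}

// Static membership: a groupInstanceId that stays the same across restarts
// (for example a stable pod or host name) identifies the returning member.
function withStaticMembership(
    base: ConsumerOptions,
    groupId: string,
    instanceName: string,
): ConsumerOptions {
    return { ...base, groupId, groupInstanceId: instanceName };
}

// Groupless consumer: no groupId, so no offsets are tracked. The start
// position is chosen on every start (-2n = earliest, -1n = latest).
function withoutGroup(base: ConsumerOptions, from: 'earliest' | 'latest'): ConsumerOptions {
    const { groupId: _groupId, groupInstanceId: _groupInstanceId, ...rest } = base;
    return { ...rest, fromTimestamp: from === 'earliest' ? -2n : -1n };
}
```

Either result would then be passed to kafka.startConsumer() in place of a hand-written options object.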

Configuration

createKafkaClient()

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| clientId | string | false | null | The client id used for all requests. |
| bootstrapServers | TcpSocketConnectOpts[] | true | | List of kafka brokers for initial cluster discovery. |
| sasl | SASLProvider | false | | SASL provider |
| ssl | TLSSocketOptions | false | | SSL configuration. |
| requestTimeout | number | false | 60000 | Request timeout in milliseconds. |

Supported SASL mechanisms

  • PLAIN: saslPlain({ username, password })
  • SCRAM-SHA-256: saslScramSha256({ username, password })
  • SCRAM-SHA-512: saslScramSha512({ username, password })

Custom SASL mechanisms can be implemented following the SASLProvider interface. See src/auth for examples.

kafka.startConsumer()

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| topics | string[] | true | | List of topics to subscribe to |
| groupId | string | false | null | Consumer group id |
| groupInstanceId | string | false | null | Consumer group instance id |
| rackId | string | false | null | Rack id |
| isolationLevel | IsolationLevel | false | IsolationLevel.READ_UNCOMMITTED | Isolation level |
| sessionTimeoutMs | number | false | 30000 | Session timeout in milliseconds |
| rebalanceTimeoutMs | number | false | 60000 | Rebalance timeout in milliseconds |
| maxWaitMs | number | false | 5000 | Fetch long poll timeout in milliseconds |
| minBytes | number | false | 1 | Minimum number of bytes to wait for before returning a fetch response |
| maxBytes | number | false | 1_048_576 | Maximum number of bytes to return in the fetch response |
| partitionMaxBytes | number | false | 1_048_576 | Maximum number of bytes to return per partition in the fetch response |
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| fromTimestamp | bigint | false | -1 | Start consuming messages from timestamp (-1 = latest offsets, -2 = earliest offsets) |
| onBatch | (batch: Message[]) => Promise | true | | Callback executed when a batch of messages is received |

kafka.createProducer()

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist |
| partitioner | Partitioner | false | defaultPartitioner | Custom partitioner function. By default, it uses a default java-compatible partitioner. |

producer.send(messages: Message[], options?: { acks?: -1 | 1 })

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| topic | string | true | | Topic to send the message to |
| partition | number | false | null | Partition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin |
| timestamp | bigint | false | null | Message timestamp in milliseconds |
| key | Buffer \| null | false | null | Message key |
| value | Buffer \| null | true | | Message value |
| headers | Record<string, string> | false | null | Message headers |