1.1.4 • Published 5 months ago

kafka-ts v1.1.4

Weekly downloads
-
License
MIT
Repository
github
Last release
5 months ago

KafkaTS

KafkaTS is a Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with the Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.

Supported Kafka versions: 3.6 and later

Installation

npm install kafka-ts

Quick start

Create kafka client

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});

Consuming messages

const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});

Producing messages

export const producer = kafka.createProducer();

await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);

Low-level API

const cluster = kafka.createCluster();
await cluster.connect();

const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});

await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});

await cluster.disconnect();

Graceful shutdown

process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch and disconnects
    await producer.close();
});

See the examples for more detailed examples.

Logging

By default KafkaTS logs out using a JSON logger. This can be globally replaced by calling setLogger method (see src/utils/logger.ts)

Retries

By default KafkaTS retries onBatch using an exponential backoff delay up to 5 times (see src/utils/retrier.ts). In case of failure the consumer is restarted.

In case you want to skip failed messages or implement a DLQ-like mechanism, you can overwrite retrier on startConsumer() and execute your own logic onFailure.

Example if you simply want to skip the failing messages:

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }),
});

Partitioning

By default, messages are partitioned by message key or round-robin if the key is null or undefined. Partition can be overwritten by partition property in the message. You can also override the default partitioner per producer instance kafka.createProducer({ partitioner: customPartitioner }).

A simple example how to partition messages by the value in message header x-partition-key:

import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';

const myPartitioner: Partitioner = (context) => {
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};

const producer = kafka.createProducer({ partitioner: myPartitioner });

await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);

Motivation

The existing low-level libraries (e.g. node-rdkafka) are bindings on librdkafka, which doesn't give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features.

New features compared to kafkajs

  • Static consumer membership - Rebalancing during rolling deployments causes delays. Using groupInstanceId in addition to groupId can avoid rebalancing and continue consuming partitions in the existing assignment.
  • Consuming messages without consumer groups - When you don't need the consumer to track the partition offsets, you can simply create a consumer without groupId and always either start consuming messages from the beginning or from the latest partition offset.
  • Low-level API requests - It's possible to communicate directly with the Kafka cluster using the kafka api protocol.

Configuration

createKafkaClient()

NameTypeRequiredDefaultDescription
clientIdstringfalsenullThe client id used for all requests.
bootstrapServersTcpSocketConnectOpts[]trueList of kafka brokers for initial cluster discovery.
saslSASLProviderfalseSASL provider
sslTLSSocketOptionsfalseSSL configuration.
requestTimeoutnumberfalse60000Request timeout in milliseconds.

Supported SASL mechanisms

  • PLAIN: saslPlain({ username, password })
  • SCRAM-SHA-256: saslScramSha256({ username, password })
  • SCRAM-SHA-512: saslScramSha512({ username, password })

Custom SASL mechanisms can be implemented following the SASLProvider interface. See src/auth for examples.

kafka.startConsumer()

NameTypeRequiredDefaultDescription
topicsstring[]trueList of topics to subscribe to
groupIdstringfalsenullConsumer group id
groupInstanceIdstringfalsenullConsumer group instance id
rackIdstringfalsenullRack id
isolationLevelIsolationLevelfalseIsolationLevel.READ_UNCOMMITTEDIsolation level
sessionTimeoutMsnumberfalse30000Session timeout in milliseconds
rebalanceTimeoutMsnumberfalse60000Rebalance timeout in milliseconds
maxWaitMsnumberfalse5000Fetch long poll timeout in milliseconds
minBytesnumberfalse1Minimum number of bytes to wait for before returning a fetch response
maxBytesnumberfalse1_048_576Maximum number of bytes to return in the fetch response
partitionMaxBytesnumberfalse1_048_576Maximum number of bytes to return per partition in the fetch response
allowTopicAutoCreationbooleanfalsefalseAllow kafka to auto-create topic when it doesn't exist
fromTimestampbigintfalse-1Start consuming messages from timestamp (-1 = latest offsets, -2 = earliest offsets)
onBatch(batch: Message[]) => PromisetrueCallback executed when a batch of messages is received

kafka.createProducer()

NameTypeRequiredDefaultDescription
allowTopicAutoCreationbooleanfalsefalseAllow kafka to auto-create topic when it doesn't exist
partitionerPartitionerfalsedefaultPartitionerCustom partitioner function. By default, it uses a default java-compatible partitioner.

producer.send(messages: Message[], options?: { acks?: -1 | 1 })

NameTypeRequiredDefaultDescription
topicstringtrueTopic to send the message to
partitionnumberfalsenullPartition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin
timestampbigintfalsenullMessage timestamp in milliseconds
keyBuffer | nullfalsenullMessage key
valueBuffer | nulltrueMessage value
headersRecord<string, string>falsenullMessage headers
1.0.2

8 months ago

1.0.1

8 months ago

1.0.0

8 months ago

1.0.3

8 months ago

0.0.6-beta.8

9 months ago

0.0.6-beta.7

9 months ago

0.0.6-beta.6

9 months ago

0.0.6-beta.5

9 months ago

0.0.6-beta.4

9 months ago

0.0.6-beta.3

9 months ago

0.0.6-beta.2

9 months ago

0.0.9-beta.0

9 months ago

0.0.6-beta.1

9 months ago

0.0.6-beta.0

9 months ago

1.1.2-beta.2

7 months ago

1.1.3-beta.0

5 months ago

1.1.2-beta.1

7 months ago

1.1.2-beta.4

7 months ago

1.1.2-beta.3

7 months ago

0.0.9-0

5 months ago

0.0.17-beta.3

8 months ago

1.1.2-beta.0

7 months ago

1.1.1

8 months ago

1.1.0

8 months ago

0.0.17-beta.0

8 months ago

0.0.17-beta.1

8 months ago

0.0.17-beta.2

8 months ago

1.1.4

5 months ago

1.1.3

5 months ago

1.1.2

7 months ago

0.0.16

9 months ago

0.0.17

8 months ago

0.0.13

9 months ago

0.0.14

9 months ago

0.0.8

9 months ago

0.0.5

9 months ago

0.0.6

9 months ago

0.0.4

9 months ago

0.0.1-beta.6

9 months ago

0.0.3

9 months ago

0.0.2

9 months ago

0.0.1-beta.4

9 months ago

0.0.1-beta.3

9 months ago

0.0.1-beta.2

9 months ago

0.0.1-beta.1

9 months ago

0.0.1-beta.0

9 months ago

0.0.3-beta

9 months ago

0.0.2-beta

9 months ago

0.0.1-beta

9 months ago

0.3.0

6 years ago

0.2.0

6 years ago

0.1.0

6 years ago

0.0.12

6 years ago

0.0.11

6 years ago

0.0.10

6 years ago

0.0.9

6 years ago

0.0.7

6 years ago

0.0.1

6 years ago