# KafkaTS
KafkaTS is an Apache Kafka client library for Node.js. It provides both a low-level API for communicating directly with an Apache Kafka cluster and high-level APIs for publishing and subscribing to Kafka topics.
Supported Kafka versions: 3.6 and later
## Installation
```sh
npm install kafka-ts
```

## Quick start
### Create Kafka client
```ts
import { createKafkaClient } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'localhost', port: 9092 }],
});
```

### Consuming messages

```ts
const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    topics: ['my-topic'],
    onBatch: (messages) => {
        console.log(messages);
    },
});
```

### Producing messages

```ts
export const producer = kafka.createProducer();
await producer.send([{ topic: 'my-topic', key: 'key', value: 'value' }]);
```

## Low-level API
```ts
import { API } from 'kafka-ts';

const cluster = kafka.createCluster();
await cluster.connect();
const { controllerId } = await cluster.sendRequest(API.METADATA, {
    allowTopicAutoCreation: false,
    includeTopicAuthorizedOperations: false,
    topics: [],
});
await cluster.sendRequestToNode(controllerId)(API.CREATE_TOPICS, {
    validateOnly: false,
    timeoutMs: 10_000,
    topics: [
        {
            name: 'my-topic',
            numPartitions: 10,
            replicationFactor: 3,
            assignments: [],
            configs: [],
        },
    ],
});
await cluster.disconnect();
```

## Graceful shutdown

```ts
process.once('SIGTERM', async () => {
    await consumer.close(); // waits for the consumer to finish processing the last batch and disconnects
    await producer.close();
});
```

See the examples for more detailed usage.
## Logging
By default, KafkaTS logs using a JSON logger. It can be replaced globally by calling the `setLogger` method (see `src/utils/logger.ts`).
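For illustration, a minimal sketch of replacing the logger; the leveled-method shape below is an assumption, so check the `Logger` type in `src/utils/logger.ts` for the real interface:

```ts
import { setLogger } from 'kafka-ts';

// Assumed logger shape for illustration; match the actual Logger type
// defined in src/utils/logger.ts.
setLogger({
    debug: (message: unknown, ...args: unknown[]) => console.debug(message, ...args),
    info: (message: unknown, ...args: unknown[]) => console.info(message, ...args),
    warn: (message: unknown, ...args: unknown[]) => console.warn(message, ...args),
    error: (message: unknown, ...args: unknown[]) => console.error(message, ...args),
});
```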
## Retries
By default, KafkaTS retries `onBatch` using an exponential backoff delay, up to 5 times (see `src/utils/retrier.ts`). If the batch still fails, the consumer is restarted.
If you want to skip failed messages or implement a DLQ-like mechanism, you can override the retrier in `startConsumer()` and execute your own logic in `onFailure` (see the DLQ sketch after the example below).
For example, to simply skip the failing messages:
```ts
await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({ onFailure: () => {} }),
});
```
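A hedged sketch of a DLQ-like `onFailure`: the error argument is an assumption here (check `src/utils/retrier.ts` for the actual `onFailure` signature), and `my-topic.dlq` is a hypothetical topic name:

```ts
import { createExponentialBackoffRetrier } from 'kafka-ts';

const dlqProducer = kafka.createProducer();

await kafka.startConsumer({
    // ...
    retrier: createExponentialBackoffRetrier({
        // Assumption: onFailure receives the error that exhausted the retries.
        onFailure: async (error) => {
            // Record the failure in a hypothetical DLQ topic instead of restarting.
            await dlqProducer.send([
                { topic: 'my-topic.dlq', value: JSON.stringify({ error: String(error) }) },
            ]);
        },
    }),
});
```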
## Partitioning

By default, messages are partitioned by the message key, or round-robin when the key is null or undefined. The partition can be overridden with the `partition` property in the message. You can also override the default partitioner per producer instance: `kafka.createProducer({ partitioner: customPartitioner })`.
A simple example of how to partition messages by the value of the `x-partition-key` message header:
```ts
import type { Partitioner } from 'kafka-ts';
import { defaultPartitioner } from 'kafka-ts';
const myPartitioner: Partitioner = (context) => {
    const partition = defaultPartitioner(context);
    return (message) => partition({ ...message, key: message.headers?.['x-partition-key'] });
};
const producer = kafka.createProducer({ partitioner: myPartitioner });
await producer.send([{ topic: 'my-topic', value: 'value', headers: { 'x-partition-key': '123' } }]);
```

## Motivation
The existing low-level libraries (e.g. node-rdkafka) are bindings to librdkafka, which doesn't give enough control over the consumer logic. The existing high-level libraries (e.g. kafkajs) are missing a few crucial features.
### New features compared to kafkajs
- **Static consumer membership** - Rebalancing during rolling deployments causes delays. Using `groupInstanceId` in addition to `groupId` can avoid rebalancing and continue consuming partitions in the existing assignment (see the sketch after this list).
- **Consuming messages without consumer groups** - When you don't need the consumer to track partition offsets, you can simply create a consumer without a `groupId` and always start consuming messages either from the beginning or from the latest partition offset.
- **Low-level API requests** - It's possible to communicate directly with the Kafka cluster using the Kafka API protocol.
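A minimal sketch of static membership, using options from the configuration tables below; the `groupInstanceId` value is a hypothetical stable per-instance identifier (e.g. a pod name):

```ts
const consumer = await kafka.startConsumer({
    groupId: 'my-consumer-group',
    // Hypothetical stable identifier: each replica must keep the same value
    // across restarts so the broker preserves its partition assignment.
    groupInstanceId: process.env.POD_NAME ?? 'my-app-0',
    topics: ['my-topic'],
    onBatch: async (messages) => {
        console.log(messages);
    },
});
```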
 
## Configuration
### createKafkaClient()
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| clientId | string | false | null | The client id used for all requests. | 
| bootstrapServers | TcpSocketConnectOpts[] | true | | List of kafka brokers for initial cluster discovery. |
| sasl | SASLProvider | false | | SASL provider. |
| ssl | TLSSocketOptions | false | | SSL configuration. |
| requestTimeout | number | false | 60000 | Request timeout in milliseconds. | 
#### Supported SASL mechanisms
- PLAIN: `saslPlain({ username, password })`
- SCRAM-SHA-256: `saslScramSha256({ username, password })`
- SCRAM-SHA-512: `saslScramSha512({ username, password })`
Custom SASL mechanisms can be implemented following the `SASLProvider` interface. See `src/auth` for examples.
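For illustration, a client configured with SCRAM-SHA-512 over TLS, assuming the SASL helpers are exported from the package root; the broker address and credentials are placeholders:

```ts
import { createKafkaClient, saslScramSha512 } from 'kafka-ts';

export const kafka = createKafkaClient({
    clientId: 'my-app',
    bootstrapServers: [{ host: 'broker-1.example.com', port: 9093 }], // placeholder broker
    sasl: saslScramSha512({ username: 'my-user', password: 'my-password' }), // placeholder credentials
    ssl: {}, // TLSSocketOptions, e.g. { ca: [...] } for a private CA
});
```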
### kafka.startConsumer()
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| topics | string[] | true | | List of topics to subscribe to |
| groupId | string | false | null | Consumer group id | 
| groupInstanceId | string | false | null | Consumer group instance id | 
| rackId | string | false | null | Rack id | 
| isolationLevel | IsolationLevel | false | IsolationLevel.READ_UNCOMMITTED | Isolation level | 
| sessionTimeoutMs | number | false | 30000 | Session timeout in milliseconds | 
| rebalanceTimeoutMs | number | false | 60000 | Rebalance timeout in milliseconds | 
| maxWaitMs | number | false | 5000 | Fetch long poll timeout in milliseconds | 
| minBytes | number | false | 1 | Minimum number of bytes to wait for before returning a fetch response | 
| maxBytes | number | false | 1_048_576 | Maximum number of bytes to return in the fetch response | 
| partitionMaxBytes | number | false | 1_048_576 | Maximum number of bytes to return per partition in the fetch response | 
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist | 
| fromTimestamp | bigint | false | -1 | Start consuming messages from timestamp (-1 = latest offsets, -2 = earliest offsets) | 
| onBatch | (batch: Message[]) => Promise<void> | true | | Callback executed when a batch of messages is received |
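A sketch of a group-less consumer that starts from the earliest offsets, using only options from the table above:

```ts
// Without a groupId, offsets aren't tracked, so pick the starting point explicitly.
const reader = await kafka.startConsumer({
    topics: ['my-topic'],
    fromTimestamp: -2n, // -2 = earliest offsets, -1 = latest (the default)
    onBatch: async (messages) => {
        console.log(messages);
    },
});
```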
### kafka.createProducer()
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| allowTopicAutoCreation | boolean | false | false | Allow kafka to auto-create topic when it doesn't exist | 
| partitioner | Partitioner | false | defaultPartitioner | Custom partitioner function. The default partitioner is compatible with the Java client's partitioner. |
### producer.send(messages: Message[], options?: { acks?: -1 | 1 })

Each message supports the following fields:
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| topic | string | true | | Topic to send the message to |
| partition | number | false | null | Partition to send the message to. By default partitioned by key. If key is also missing, partition is assigned round-robin | 
| timestamp | bigint | false | null | Message timestamp in milliseconds | 
| key | Buffer \| null | false | null | Message key |
| value | Buffer \| null | true | | Message value |
| headers | Record<string, string> | false | null | Message headers | 
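For illustration, a fully specified message sent with `acks: 1`; the header name and values are placeholders:

```ts
await producer.send(
    [
        {
            topic: 'my-topic',
            partition: 0, // bypasses the partitioner for this message
            timestamp: BigInt(Date.now()),
            key: Buffer.from('key'),
            value: Buffer.from('value'),
            headers: { 'trace-id': 'abc123' }, // placeholder header
        },
    ],
    // acks: 1 waits for the partition leader only; -1 waits for all in-sync replicas.
    { acks: 1 },
);
```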