# kafka-broker
Easily compose and manage your Kafka resources in one place.
A wrapper around KafkaJS, heavily inspired by Rascal.
## Table of contents
- Install
- Concepts
- Configuration
- Publishing
- Consuming
- Encoding
- Schema registry
- Typescript
- Dead letter
- Shutdown
- Advanced configuration
- Running tests
- License
## Install
```sh
yarn add @vpriem/kafka-broker
# or
npm install @vpriem/kafka-broker
```

## Concepts
This library uses two concepts: Publications and Subscriptions.
A Publication is a named configuration to produce messages to a certain topic with a specific producer and options.
A Subscription is a named configuration to consume messages from certain topics with a specific consumer group and options.
## Configuration
In order to publish or consume messages, you first need to configure your broker:
```ts
import { Broker } from '@vpriem/kafka-broker';

const broker = new Broker({
namespace: 'my-service',
config: {
brokers: [process.env.KAFKA_BROKER as string],
},
publications: {
'to-my-topic': 'my-long-topic-name',
},
subscriptions: {
'from-my-topic': 'my-long-topic-name',
},
});
```

This will create a default producer to produce messages to the topic named `my-long-topic-name`,
and a consumer with the consumer group `my-service.from-my-topic` to consume from that topic.
Connections are lazy and are only established on the first publish or subscribe.
This is equivalent to:
```ts
const broker = new Broker({
namespace: 'my-service',
config: {
clientId: 'my-service',
brokers: [process.env.KAFKA_BROKER as string],
},
producers: {
default: {},
},
publications: {
'to-my-topic': {
topic: 'my-long-topic-name',
producer: 'default',
},
},
subscriptions: {
'from-my-topic': {
topics: ['my-long-topic-name'],
consumer: {
groupId: 'my-service.from-my-topic',
},
},
},
});
```

## Publishing
In order to publish messages to a topic you need to refer to its publication name:
```ts
await broker.publish('to-my-topic', { value: 'my-message' });

// Or multiple messages:
await broker.publish('to-my-topic', [
    { value: 'my-message' },
    { value: 'my-message' },
]);
```

This will publish to `my-long-topic-name` using the default producer.
### Multiple topics
You can publish simultaneously to multiple topics:
```ts
new Broker({
// ...
publications: {
'to-multiple-topic': {
topic: ['my-first-topic', 'my-second-topic'],
},
},
});

await broker.publish('to-multiple-topic', { value: 'my-message' });
```

This will publish to `my-first-topic` and `my-second-topic` in a batch.
### Batching
You can configure the producers to automatically batch messages.
This will delay sending messages so that more messages can be sent into a single request.
This can significantly reduce the number of requests made to the cluster, and improve performance overall.
```ts
new Broker({
// ...
producers: {
default: {
batch: {
size: 150,
lingerMs: 100,
}
},
},
});
```

- `size`: maximum number of messages to batch in one request
- `lingerMs`: maximum duration to fill the batch
If `size` is reached, the batch request will be sent immediately regardless of `lingerMs`.
Otherwise, the producer will wait up to `lingerMs` to send the batch request.

⚠️ Be aware that a large `size` might increase memory usage and a high `lingerMs` might increase latency.
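For illustration, a minimal sketch of how batching plays out in practice, assuming `publish` resolves once the batch containing the message has been sent:

```ts
// With the producer configured above (size: 150, lingerMs: 100),
// these publishes fall into the same linger window and are grouped
// into a single produce request instead of three separate ones:
await Promise.all([
    broker.publish('to-my-topic', { value: 'message-1' }),
    broker.publish('to-my-topic', { value: 'message-2' }),
    broker.publish('to-my-topic', { value: 'message-3' }),
]);
```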
Because the batch request can contain multiple topics, the `acks`, `compression` and `timeout` options have to be provided at the batch level. This also means that any config provided at the publication level will be discarded:
```ts
import { CompressionTypes } from 'kafkajs';

new Broker({
// ...
producers: {
default: {
batch: {
size: 150,
lingerMs: 100,
acks: 1,
compression: CompressionTypes.GZIP,
}
},
},
publications: {
'to-my-topic': {
topic: 'my-long-topic-name',
config: { acks: 0 }, // this will be ignored
},
},
});
```

## Consuming
In order to start consuming messages from a topic you need to subscribe and run the subscription:
```ts
await broker
.subscription('from-my-topic')
.on('message', (value, message, topic, partition) => {
// ...
})
    .run();
```

This will consume messages from `my-long-topic-name` using the consumer group `my-service.from-my-topic`.
Or you can just run all subscriptions:
```ts
await broker
.subscriptionList()
.on('message', (value) => {
// Consume from all registered topics
})
.on('message.my-long-topic-name', (value) => {
// Or from "my-long-topic-name" only
})
    .run();
```

### Handlers
Handlers are a different way to consume messages: small, lambda-like functions that can help you structure your code better by splitting it into small files:
```ts
import { Broker, Handler } from '@vpriem/kafka-broker';

const handler: Handler = async (value) => {
await myAsyncOperation(value);
};
const broker = new Broker({
// ...
subscriptions: {
'from-my-topic': {
topics: ['my-long-topic-name'],
handler,
},
},
});

await broker.subscription('from-my-topic').run();
```

### Consumer group
Consumer groups are important and need to be unique across applications.
If no `groupId` is specified in the subscription configuration, the name is auto-generated as `[namespace].[subscription]`:
```ts
const broker = new Broker({
namespace: 'my-service',
// ...
subscriptions: {
'from-my-topic': 'my-long-topic-name',
},
});
```

This will create and use the consumer group `my-service.from-my-topic` for the topic `my-long-topic-name`.
### Parallelism
⚠️ Experimental ⚠️
`kafka-broker` is built on top of the KafkaJS `eachMessage` API,
which consumes one message at a time.
Sometimes you might want to speed up consumption in special cases where you don't care about message order.
The library comes with two built-in parallelism modes built on top of the KafkaJS `eachBatch` API:
- `all-at-once`: all messages of a batch are consumed at once, in parallel.
- `by-partition-key`: same as `all-at-once`, except that all messages with the same partition key are consumed in series, one after the other. Messages without a partition key are grouped together.
Be aware that these two modes can affect your workload.
```ts
const broker = new Broker({
    namespace: 'my-service',
    // ...
    subscriptions: {
        'from-my-topic': {
            topics: ['my-long-topic-name'],
            parallelism: 'by-partition-key',
        },
    },
});
```

### Error handling
KafkaJS will restart the consumer on errors that are considered "retriable" (see `restartOnFailure`) but not on errors considered "non-retriable".
The broker instance will emit those "non-retriable" errors as `error` events.
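For example, a minimal sketch of listening for those errors, assuming the broker exposes the standard Node.js `EventEmitter` interface as the `on('message')` examples above suggest:

```ts
broker.on('error', (error) => {
    // A "non-retriable" error means the consumer has stopped;
    // log it and decide whether to exit or restart the process
    console.error('Non-retriable consumer error:', error);
    process.exit(1);
});
```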
## Encoding

### JSON
Published objects are automatically encoded to JSON.
```ts
await broker.publish('to-my-topic', { value: { id: 1 } });
```

A `content-type: application/json` header is added to the message headers
in order to automatically decode messages as objects on the consumer side:
```ts
await broker
.subscription('from-my-topic')
.on('message', (value) => {
console.log(value.id); // Print 1
})
    .run();
```

### AVRO
Not supported yet.
### Plain text
For string messages, a `content-type: text/plain` header is added to the message headers,
and the value is automatically decoded as a string on the consumer side:
```ts
await broker.publish('to-my-topic', { value: 'my-value' });

await broker
.subscription('from-my-topic')
.on('message', (value) => {
console.log(value); // Print "my-value"
})
    .run();
```

### Enforce contentType
In some cases you might have messages produced by other applications without the `content-type` header being set.
You can enforce the decoding on your side by specifying the `contentType` in the subscription configuration:
```ts
const broker = new Broker({
// ...
subscriptions: {
'from-json-topic': {
topics: ['my-long-json-topic-name'],
contentType: 'application/json',
},
},
});
```

## Schema registry
Schema registry is supported and can be configured as follows:
```ts
const broker = new Broker({
// ...
schemaRegistry: process.env.SCHEMA_REGISTRY_URL as string,
// or
schemaRegistry: {
host: process.env.SCHEMA_REGISTRY_URL as string,
options: {
/* SchemaRegistryOptions */
},
},
});
```

For the full configuration please refer to `@kafkajs/confluent-schema-registry`.
Producers need to specify the schema registry id or subject/version in the publication config:
```ts
const broker = new Broker({
// ...
publications: {
'to-my-topic': {
topic: 'my-long-topic-name',
schema: 1, // equivalent to { id: 1 }
},
// or
'to-my-topic': {
topic: 'my-long-topic-name',
schema: 'my-subject', // equivalent to { subject: 'my-subject', version: 'latest' }
},
},
});
```

By doing this, a `content-type: application/schema-registry` header will be added to the message,
in order to automatically decode messages using the schema registry on the consumer side.
Accessing the registry:
```ts
const registry = broker.schemaRegistry();
await registry?.register(/* Schema */);
```
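For example, a hedged sketch of registering an AVRO schema through the underlying `@kafkajs/confluent-schema-registry` client (the schema and subject below are made up for illustration):

```ts
import { SchemaType } from '@kafkajs/confluent-schema-registry';

const { id } = await registry!.register(
    {
        type: SchemaType.AVRO,
        schema: JSON.stringify({
            type: 'record',
            name: 'MyEvent',
            namespace: 'my.service',
            fields: [{ name: 'id', type: 'int' }],
        }),
    },
    { subject: 'my-subject' } // subject to register the schema under
);
```

The returned `id` can then be used as the `schema` option of a publication.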
## Typescript

TypeScript generics are supported for more type safety:
```ts
interface MyEvent {
id: number;
}

await broker
    .subscription('from-my-topic')
    .on<MyEvent>('message', ({ id }) => {
        console.log(id); // Print 1
    })
    .run();

// or with a handler
const MyHandler: Handler<MyEvent> = async ({ id }) => {
    console.log(id); // Print 1
};

await broker.publish<MyEvent>('to-my-topic', { value: { id: 1 } });
```

## Dead letter
Messages that could not be processed due to an error can be sent to a dead-letter topic to be analysed later:
```ts
const broker = new Broker({
// ...
publications: {
'to-my-topic': 'my-long-topic-name',
'to-my-topic-dlx': 'my-long-topic-name-dlx',
},
subscriptions: {
'from-my-topic': {
topics: ['my-long-topic-name'],
deadLetter: 'to-my-topic-dlx',
},
},
});
```

Note that the `deadLetter` is the publication name and not the topic itself.
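Dead-lettered messages can then be consumed like any other topic. A sketch, assuming a hypothetical `from-my-topic-dlx` subscription pointing at `my-long-topic-name-dlx` has been added to the configuration above:

```ts
await broker
    .subscription('from-my-topic-dlx')
    .on('message', (value) => {
        // Inspect or log failed messages for later analysis
        console.error('Dead-lettered message:', value);
    })
    .run();
```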
## Shutdown
It is important to shut down the broker to disconnect all producers and consumers:
```ts
await broker.shutdown();
```
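For example, a minimal sketch of wiring this into process signal handlers for a graceful shutdown (the choice of signals is an illustration, not part of the library):

```ts
const shutdown = async (): Promise<void> => {
    await broker.shutdown(); // disconnect all producers and consumers
    process.exit(0);
};

process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
```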
## Advanced configuration

### Using defaults
```ts
const broker = new Broker({
    // ...
    defaults: {
        producer: {
            /* KafkaProducerConfig to be applied to all producers */
        },
        consumer: {
            /* KafkaConsumerConfig to be applied to all consumers */
        },
    },
});
```

Topic alias:
```ts
const broker = new Broker({
// ...
subscriptions: {
'from-all-topics': [
{ topic: 'my-long-topic-name-1', alias: 'my-topic1' },
{ topic: 'my-long-topic-name-2', alias: 'my-topic2' },
],
},
});
await broker
.subscription('from-all-topics')
.on('message', (value) => {
// Consume from "my-long-topic-name-1" and "my-long-topic-name-2"
})
.on('message.my-topic1', (value) => {
// Consume from "my-long-topic-name-1" only
})
.on('message.my-topic2', (value) => {
// Consume from "my-long-topic-name-2" only
})
    .run();
```

Or with handlers:
```ts
const broker = new Broker({
// ...
subscriptions: {
'from-all-topics': [
{
topic: 'my-long-topic-name-1',
handler: async (value) => {
// Consume from "my-long-topic-name-1" only
},
},
{
topic: 'my-long-topic-name-2',
handler: async (value) => {
// Consume from "my-long-topic-name-2" only
},
},
],
},
});

await broker.subscriptionList().run();
```

### Multiple producers
You can define multiple producers, configure them differently and reuse them across publications:
```ts
const broker = new Broker({
// ...
producers: {
'producer-1': {
/* KafkaProducerConfig */
},
'producer-2': {
/* KafkaProducerConfig */
},
},
publications: {
'to-my-topic-1': {
topic: 'my-long-topic-name-1',
producer: 'producer-1',
},
'to-my-topic-2': {
topic: 'my-long-topic-name-2',
producer: 'producer-2',
},
},
});
// This will use "producer-1"
await broker.publish('to-my-topic-1', { value: 'my-message-to-topic-1' });
// This will use "producer-2"
await broker.publish('to-my-topic-2', { value: 'my-message-to-topic-2' });
```

### Multiple brokers
You can also build a `Broker` from resources coming from multiple Kafka instances:
```ts
const broker = new Broker({
namespace: 'my-service',
brokers: {
public: {
config: {
brokers: [process.env.KAFKA_PUBLIC_BROKER as string],
},
publications: {
'my-topic': 'my-long-topic-name',
},
subscriptions: {
'my-topic': 'my-long-topic-name',
},
},
private: {
config: {
brokers: [process.env.KAFKA_PRIVATE_BROKER as string],
},
publications: {
'my-topic': 'my-long-topic-name',
},
subscriptions: {
'my-topic': 'my-long-topic-name',
},
},
},
});
await broker
.subscription('public/my-topic')
.on('message', (value) => {
// Consume from public only
})
.run();
await broker
.subscription('private/my-topic')
.on('message', (value) => {
// Consume from private only
})
.run();
await broker
.subscriptionList()
.on('message', (value) => {
console.log(value); // Consume from public and private
})
.run();
await broker.publish('public/my-topic', { value: 'my-public-message' });
await broker.publish('private/my-topic', { value: 'my-private-message' });
```

### Full configuration
```ts
const broker = new Broker({
namespace: 'my-service',
defaults: {
// optional
producer: {
/* KafkaProducerConfig */
}, // optional
consumer: {
/* KafkaConsumerConfig */
}, // optional
},
config: {
/* KafkaConfig */
},
schemaRegistry: {
// optional
host: 'http://localhost:8081',
options: {
/* SchemaRegistryOptions */
}, // optional
},
producers: {
// optional
[name]: {
/* KafkaProducerConfig */
}, // optional
},
publications: {
[name]: 'my-topic',
[name]: {
topic: 'my-topic',
producer: 'my-producer', // optional, default to "default"
config: {
/* ProducerRecord */
}, // optional
messageConfig: {
/* MessageConfig */
}, // optional
schemaId: 1, // optional
},
},
subscriptions: {
[name]: 'my-topic',
[name]: [
'my-topic',
{
topic: 'my-topic',
alias: 'my-topic-alias', // optional
handler: () => {}, // optional
},
],
[name]: {
topics: [
'my-topic',
{
topic: 'my-topic',
alias: 'my-topic-alias', // optional
handler: () => {}, // optional
},
],
consumer: {
/* ConsumerConfig */
}, // optional
runConfig: {
/* RunConfig */
}, // optional
handler: () => {}, // optional
contentType: 'application/json', // optional
},
},
});
```

## Running tests
```sh
yarn install
yarn k up
yarn k test
```

## License