no-kafka-slim v0.1.1
no-kafka-slim is an Apache Kafka 0.9 client for Node.js with support for the new unified consumer API. This module is a direct fork of oleksiyk/kafka, but with the Snappy dependency removed (due to various problems on Windows).
All methods return a promise.
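Since every call yields a promise, errors surface through rejection; a minimal sketch, where client stands in for any producer or consumer instance from the sections below:
client.init()
    .then(function () {
        // connection established, safe to send or subscribe
    })
    .catch(function (err) {
        console.error('init failed:', err);
    });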
Usage
- download and install Kafka
- create your test topic:
kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic kafka-test-topic --partitions 3 --replication-factor 1
- install no-kafka-slim
npm install no-kafka-slim
Producer
Example:
var Kafka = require('no-kafka-slim');
var producer = new Kafka.Producer();
return producer.init().then(function () {
    return producer.send({
        topic: 'kafka-test-topic',
        partition: 0,
        message: {
            value: 'Hello!'
        }
    });
})
.then(function (result) {
    /*
    [ { topic: 'kafka-test-topic', partition: 0, offset: 353 } ]
    */
});
Send and retry on failure (2 total attempts with a 100ms delay between them):
return producer.send(messages, {
    retries: {
        attempts: 2,
        delay: 100
    }
});
Accumulate messages into a single batch until their total size is >= 1024 bytes or a 100ms timeout expires (overrides the Producer constructor options):
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    }
});
Please note that if you pass different options to the send() method, these messages will be grouped into separate batches:
// will be sent in batch 1
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    },
    codec: Kafka.COMPRESSION_GZIP
});
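For contrast, a second send() with different options (here, without a codec) goes into its own batch; this sketch reuses the hypothetical messages variable from the examples above:
// will be sent in batch 2 (options differ from batch 1)
producer.send(messages, {
    batch: {
        size: 1024,
        maxWait: 100
    }
});
Producer options: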
- requiredAcks - require acknowledgements for the produce request. If it is 0 the server will not send any response. If it is 1 (default), the server will wait until the data is written to the local log before sending a response. If it is -1 the server will block until the message is committed by all in-sync replicas before sending a response. For any number > 1 the server will block waiting for this number of acknowledgements to occur (but the server will never wait for more acknowledgements than there are in-sync replicas).
- timeout - timeout in ms for the produce request
- clientId - ID of this client, defaults to 'no-kafka-client'
- connectionString - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- partitioner - function used to determine the topic partition for a message. If the message already specifies a partition, the partitioner won't be used. The partitioner function receives 3 arguments: the topic name, an array with topic partitions, and the message (useful to partition by key, etc.). partitioner can be sync or async (return a Promise); see the sketch after this list.
- retries - controls the number of attempts and the delay between them when a produce request fails
  - attempts - number of total attempts to send the message, defaults to 3
  - delay - delay in ms between retries, defaults to 1000
- codec - compression codec, one of Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_GZIP
- batch - control batching (grouping) of requests
  - size - group messages together into a single batch until their total size exceeds this value, defaults to 16384 bytes. Set to 0 to disable batching.
  - maxWait - send grouped messages after this amount of milliseconds expires even if their total size doesn't exceed batch.size yet, defaults to 10ms. Set to 0 to disable batching.
- asyncCompression - boolean, use asynchronous compression instead of synchronous, defaults to false
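For example, a key-hash partitioner sketch using the signature described above; our assumptions: the returned value is the partition number, messages may carry an optional key, and partitions are numbered 0..N-1 (the default for newly created topics):
var producer = new Kafka.Producer({
    partitioner: function (topic, partitions, message) {
        // hash the (assumed optional) message key onto a partition number
        var key = message.key ? message.key.toString() : '';
        var hash = 0, i;
        for (i = 0; i < key.length; i += 1) {
            hash = ((hash * 31) + key.charCodeAt(i)) | 0; // keep it a 32-bit int
        }
        return Math.abs(hash) % partitions.length; // assumes partitions 0..N-1
    }
});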
SimpleConsumer
Manually specify topic, partition and offset when subscribing. Suitable for simple use cases.
Example:
var consumer = new Kafka.SimpleConsumer();
// data handler function can return a Promise
var dataHandler = function (messageSet, topic, partition) {
    messageSet.forEach(function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
    });
};

return consumer.init().then(function () {
    // Subscribe to partitions 0 and 1 of the topic:
    return consumer.subscribe('kafka-test-topic', [0, 1], dataHandler);
});
Subscribe (or change subscription) to a specific offset and limit the maximum received MessageSet size:
consumer.subscribe('kafka-test-topic', 0, {offset: 20, maxBytes: 30}, dataHandler)
Subscribe to the latest or earliest offsets in the topic/partition:
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.LATEST_OFFSET}, dataHandler)
consumer.subscribe('kafka-test-topic', 0, {time: Kafka.EARLIEST_OFFSET}, dataHandler)
Subscribe to all partitions in a topic:
consumer.subscribe('kafka-test-topic', dataHandler)
Commit offset(s) (with V0, Kafka saves these commits to Zookeeper):
consumer.commitOffset([
    {
        topic: 'kafka-test-topic',
        partition: 0,
        offset: 1
    },
    {
        topic: 'kafka-test-topic',
        partition: 1,
        offset: 2
    }
])
Fetch committed offset(s):
consumer.fetchOffset([
    {
        topic: 'kafka-test-topic',
        partition: 0
    },
    {
        topic: 'kafka-test-topic',
        partition: 1
    }
]).then(function (result) {
    /*
    [ { topic: 'kafka-test-topic',
        partition: 1,
        offset: 2,
        metadata: null,
        error: null },
      { topic: 'kafka-test-topic',
        partition: 0,
        offset: 1,
        metadata: null,
        error: null } ]
    */
});
SimpleConsumer options
- groupId - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0'
- timeout - timeout for fetch requests, defaults to 100ms
- idleTimeout - timeout between fetch calls, defaults to 1000ms
- minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
- maxBytes - maximum size of messages in a fetch response
- clientId - ID of this client, defaults to 'no-kafka-client'
- connectionString - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
- asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
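For instance, a SimpleConsumer tuned with a few of these options (the values are purely illustrative):
var consumer = new Kafka.SimpleConsumer({
    groupId: 'my-group',               // where committed offsets are stored
    idleTimeout: 100,                  // poll more often than the 1000ms default
    maxBytes: 1024 * 1024,             // cap each fetch response at 1 MiB
    connectionString: '127.0.0.1:9092'
});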
GroupConsumer (new unified consumer API)
Specify an assignment strategy (or use the no-kafka built-in consistent or round robin assignment strategy) and subscribe by specifying only topics. The elected group leader will automatically assign partitions among all group members.
Example:
var Promise = require('bluebird');
var consumer = new Kafka.GroupConsumer();
var dataHandler = function (messageSet, topic, partition) {
    return Promise.map(messageSet, function (m) {
        console.log(topic, partition, m.offset, m.message.value.toString('utf8'));
        // commit offset
        return consumer.commitOffset({topic: topic, partition: partition, offset: m.offset, metadata: 'optional'});
    });
};

var strategies = [{
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
}];

consumer.init(strategies); // all done, now wait for messages in dataHandler
Assignment strategies
no-kafka provides three built-in strategies:
- Kafka.WeightedRoundRobinAssignment - weighted round robin assignment (based on wrr-pool)
- Kafka.ConsistentAssignment - based on a consistent hash ring; provides consistent assignment across consumers in a group based on the supplied metadata.id and metadata.weight options
- Kafka.RoundRobinAssignment - simple assignment strategy (default)
Using Kafka.WeightedRoundRobinAssignment:
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        weight: 4
    },
    fn: Kafka.WeightedRoundRobinAssignment,
    handler: dataHandler
};
// consumer.init(strategies)....
Using Kafka.ConsistentAssignment:
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    metadata: {
        id: process.argv[2] || 'consumer_1',
        weight: 50
    },
    fn: Kafka.ConsistentAssignment,
    handler: dataHandler
};
// consumer.init(strategies)....
Note that each consumer in a group should have its own, consistent metadata.id.
Using Kafka.RoundRobinAssignment (default in no-kafka):
var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    handler: dataHandler
};
// consumer.init(strategies)....
You can also write your own assignment strategy function and provide it as the fn option of the strategy item, as in the sketch below.
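A hypothetical custom strategy is sketched here. The exact arguments and return shape of fn are defined by the module's built-in strategies (check the no-kafka source for the real contract); the shapes assumed below (a per-topic map of members and partitions in, an array of {topic, partition, memberId} assignments out) are for illustration only:
// assumption: fn receives a map keyed by topic, each entry listing the
// group members and the topic's partitions, and returns an array of
// { topic, partition, memberId } assignments; verify this shape against
// the no-kafka source before relying on it
var assignEverythingToFirstMember = function (subscriptions) {
    var result = [];
    Object.keys(subscriptions).forEach(function (topic) {
        var sub = subscriptions[topic];
        sub.partitions.forEach(function (partition) {
            result.push({
                topic: topic,
                partition: partition,
                memberId: sub.members[0].id
            });
        });
    });
    return result;
};

var strategies = {
    strategy: 'TestStrategy',
    subscriptions: ['kafka-test-topic'],
    fn: assignEverythingToFirstMember,
    handler: dataHandler
};
// consumer.init(strategies)....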
GroupConsumer options
- groupId - group ID for committing and fetching offsets. Defaults to 'no-kafka-group-v0.9'
- timeout - timeout for fetch requests, defaults to 100ms
- idleTimeout - timeout between fetch calls, defaults to 1000ms
- minBytes - minimum number of bytes to wait for from Kafka before returning the fetch call, defaults to 1 byte
- maxBytes - maximum size of messages in a fetch response
- clientId - ID of this client, defaults to 'no-kafka-client'
- connectionString - comma delimited list of initial brokers, defaults to '127.0.0.1:9092'
- sessionTimeout - session timeout in ms, min 6000, max 30000, defaults to 15000
- heartbeatTimeout - delay between heartbeat requests in ms, defaults to 1000
- retentionTime - offset retention time in ms, defaults to 1 day (24 * 3600 * 1000)
- startingOffset - starting position (time) when there is no committed offset, defaults to Kafka.LATEST_OFFSET
- recoveryOffset - recovery position (time) which will be used to recover the subscription in case of an OffsetOutOfRange error, defaults to Kafka.LATEST_OFFSET
- asyncCompression - boolean, use asynchronous decompression instead of synchronous, defaults to false
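For example, tightening the group liveness settings (values are illustrative only):
var consumer = new Kafka.GroupConsumer({
    groupId: 'my-group',
    sessionTimeout: 8000,                  // within the allowed 6000-30000 ms range
    heartbeatTimeout: 500,                 // heartbeat twice as often as the default
    startingOffset: Kafka.EARLIEST_OFFSET  // replay from the beginning when the group has no committed offset
});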
GroupAdmin (consumer groups API)
Offers two methods:
- listGroups - list existing consumer groups
- describeGroup - describe an existing group by its id
Example:
var admin = new Kafka.GroupAdmin();
return admin.init().then(function () {
    return admin.listGroups().then(function (groups) {
        // [ { groupId: 'no-kafka-admin-test-group', protocolType: 'consumer' } ]
        return admin.describeGroup('no-kafka-admin-test-group').then(function (group) {
            /*
            { error: null,
              groupId: 'no-kafka-admin-test-group',
              state: 'Stable',
              protocolType: 'consumer',
              protocol: 'TestStrategy',
              members:
               [ { memberId: 'group-consumer-82646843-b4b8-4e91-94c9-b4708c8b05e8',
                   clientId: 'group-consumer',
                   clientHost: '/192.168.1.4',
                   version: 0,
                   subscriptions: [ 'kafka-test-topic' ],
                   metadata: <Buffer 63 6f 6e 73 75 6d 65 72 2d 6d 65 74 61 64 61 74 61>,
                   memberAssignment:
                    { _blength: 44,
                      version: 0,
                      partitionAssignment:
                       [ { topic: 'kafka-test-topic',
                           partitions: [ 0, 1, 2 ] } ],
                      metadata: null } } ] }
            */
        });
    });
});
Compression
no-kafka supports Gzip compression.
Enable compression in Producer:
var Kafka = require('no-kafka-slim');
var producer = new Kafka.Producer({
    clientId: 'producer',
    codec: Kafka.COMPRESSION_GZIP // Kafka.COMPRESSION_NONE, Kafka.COMPRESSION_GZIP
});
Alternatively, just send some messages with a specified compression codec (overrides the codec set in the constructor):
return producer.send({
    topic: 'kafka-test-topic',
    partition: 0,
    message: { value: 'p00' }
}, { codec: Kafka.COMPRESSION_GZIP })
By default no-kafka will use synchronous compression and decompression (synchronous Gzip is not available in node < 0.11).
Enable async compression/decompression with the asyncCompression option:
Producer:
var producer = new Kafka.Producer({
    clientId: 'producer',
    asyncCompression: true,
    codec: Kafka.COMPRESSION_GZIP
});
Consumer:
var consumer = new Kafka.SimpleConsumer({
    idleTimeout: 100,
    clientId: 'simple-consumer',
    asyncCompression: true
});
Logging
You can differentiate messages from several instances of producer/consumer by providing a unique clientId in the options:
var consumer1 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-1'
});
var consumer2 = new Kafka.GroupConsumer({
    clientId: 'group-consumer-2'
});
=>
2016-01-12T07:41:57.884Z INFO group-consumer-1 ....
2016-01-12T07:41:57.884Z INFO group-consumer-2 ....
Change the logging level:
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logLevel: 1 // 0 - nothing, 1 - just errors, 2 - +warnings, 3 - +info, 4 - +debug, 5 - +trace
    }
});
Send log messages to Logstash server(s) via UDP:
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logstash: {
            enabled: true,
            connectionString: '10.0.1.1:9999,10.0.1.2:9999',
            app: 'myApp-kafka-consumer'
        }
    }
});
You can overwrite the function that outputs messages to stdout/stderr:
var consumer = new Kafka.GroupConsumer({
    clientId: 'group-consumer',
    logger: {
        logFunction: console.log
    }
});