kafka-zookeeper v0.0.9
node-kafka-zookeeper
A high-level client library in Node.js for the Apache Kafka project with Zookeeper integration
Kafka is a persistent, efficient, distributed publish/subscribe messaging system.
Prozess is a Kafka client library used for low-level access from node-kafka-zookeeper.
###Consumer example:
A Zookeeper
object handles broker enumeration and offset storage
var Zookeeper = require('kafka-zookeeper').Zookeeper;
var zk = new Zookeeper({
host: 'kafka00.lan',
port: 2181
});
var onMessages = function(messages, error, cb) {
if (error) return console.error(error);
console.log('Received %d messages', messages.length);
// true - (Acknowledge) Update Zk offsets and continue consuming
// false - (Fail) Resend the same batch in 5 seconds so I don't
// have to put it somewhere. TODO: configure wait
cb(true);
}
// Start consuming
// TODO: Support message filter function argument
zk.consumeTopic('MessageHeaders', 'dcrouse', onMessages);
// Stop consuming
// TODO: Implement
###Utility examples:
The Zookeeper
object also exposes some utility functions - used internally and useful for testing
var _ = require('underscore');
var Zookeeper = require('../lib/Zookeeper');
var zk = new Zookeeper({
host: 'localhost',
port: 2181
});
var topic = 'KafkaTopic', group = 'ConsumerGroup';
// Retrieve all consumer offsets for topic/group
var onConsumerOffsets = function(offsets, error) {
if (error) return console.error('onConsumerOffsets', error);
console.log('Offsets', offsets);
};
zk.getConsumerOffsets(topic, group, onConsumerOffsets);
// Initialize consumer offsets
var onInitializeConsumerOffsets = function(error) {
if (error) return console.error('onInitializeConsumerOffsets', error);
console.log('Consumer offsets initialized');
};
zk.initializeConsumerOffsets(topic, group, onInitializeConsumerOffsets);
###Installation:
npm install kafka-zookeeper
###Checkout the code and run the tests:
git clone https://github.com/devoncrouse/node-kafka-zookeeper.git
cd node-kafka-zookeeper; make test-cov && open coverage.html
###Kafka Compatability matrix:
Versions taken from http://incubator.apache.org/kafka/downloads.html