0.2.8 • Published 8 years ago

kafka-java-bridge v0.2.8

kafka-java-bridge


Node.js wrapper for the Java Kafka 0.8 client API.

Motivation

We needed a production-quality Kafka 0.8 client implementation for Node.js. Please see the Performance and stability section for details.

Installation

  1. Make sure you have Java 7 or higher installed.
  2. Run npm install kafka-java-bridge

Consumer Example

var HLConsumer = require("kafka-java-bridge").HLConsumer;

var consumerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    groupId: "example-consumer-group-id",
    topics: ["example-topic1","example-topic2"],
    getMetadata: true
};

var consumer = new HLConsumer(consumerOptions);

consumer.start(function (err) {
    if (err) {
        console.log("Error occurred when starting consumer. err:", err);
    } else {
        console.log("Started consumer successfully");
    }
});

consumer.on("message", function (msg, metadata) {
    console.log("On message. message:", msg);
    console.log("On message. metadata:", JSON.stringify(metadata));
});

consumer.on("error", function (err) {
    console.log("On error. err:", err);
});

process.on('SIGINT', function() {
    consumer.stop(function(){
        console.log("consumer stopped");
        // Timeout to allow logs to print
        setTimeout(function(){
            process.exit();
        } , 300);
    });
});

Producer Example

var StringProducer = require('kafka-java-bridge').StringProducer;
var BinaryProducer = require('kafka-java-bridge').BinaryProducer;

var stringProducer = new StringProducer({bootstrapServers: "broker1:9092, broker2:9092"});
var binaryProducer = new BinaryProducer({zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka"});

const buf = Buffer.from([0x0, 0x1, 0x2, 0x3, 0x4]); // new Buffer(...) is deprecated
binaryProducer.send("myBinaryTopic", buf, function(err, msgMetadata){
    console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});
stringProducer.send("myStringTopic", "testString", function(err, msgMetadata){
    console.log('send msg cb. err = ' + err + '. metadata = ' + JSON.stringify(msgMetadata));
});

process.on('SIGINT', function() {
    stringProducer.close(function(err){
        binaryProducer.close(function(err) {
            process.exit();
        });
    });
});

Performance and stability

Performance

Libraries compared:

  • kafka-java-bridge, this package.
  • kafka-node, the available High Level Consumer for Kafka 0.8.

  1. Representative CPU consumption (lower is better) when processing the same message rate (~11K messages per second).

[Image 1]

Library name | CPU% average
kafka-java-bridge | 11.76
kafka-node | 73

  2. Consumer comparison (number of messages). Tested on an Amazon AWS EC2 instance with 16 GB RAM and 4 cores. Metrics were measured with New Relic.

Library name | RPM avg | Network avg | CPU/System avg
kafka-java-bridge | 947K | 300 Mb/s | 6.2%
kafka-node | 87.5K | 75 Mb/s | 11.2%

[Images: kafka-java-bridge RPM, kafka-node RPM]
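
To put the reported figures in relative terms, the ratios can be worked out directly from the numbers above. The sketch below only does that arithmetic; the variable names are illustrative and not part of either library.

```javascript
// Figures copied from the two comparison tables above.
var cpuAtEqualLoad = { "kafka-java-bridge": 11.76, "kafka-node": 73 }; // CPU% average at ~11K msg/s
var rpmAverage = { "kafka-java-bridge": 947000, "kafka-node": 87500 }; // RPM avg (947K vs 87.5K)

// How much more CPU kafka-node used for the same message rate.
var cpuRatio = cpuAtEqualLoad["kafka-node"] / cpuAtEqualLoad["kafka-java-bridge"];

// How much higher the kafka-java-bridge RPM average was.
var throughputRatio = rpmAverage["kafka-java-bridge"] / rpmAverage["kafka-node"];

console.log("kafka-node CPU relative to kafka-java-bridge: " + cpuRatio.toFixed(1) + "x");
console.log("kafka-java-bridge RPM relative to kafka-node: " + throughputRatio.toFixed(1) + "x");
```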

Stability

Kafka-java-bridge wraps Confluent's official Java High Level Consumer.

While testing kafka-node we encountered multiple issues such as:

Those issues, together with the inadequate performance results, were the trigger for developing this library.

API

HLConsumer(options)

A consumer object fetches messages from Kafka topics. Each consumer can consume messages from multiple topics.

var consumerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    groupId: "example-consumer-group-id",
    topic: "example-topic",
    serverPort: 3042, // Optional
    threadCount: 1, // Optional
    properties: {"rebalance.max.retries": "3"} // Optional
};

Option name | Mandatory | Type | Default value | Description
zookeeperUrl | Yes | String | undefined | Zookeeper connection string.
groupId | Yes | String | undefined | Kafka consumer groupId. From the Kafka documentation: groupId is a string that uniquely identifies the group of consumer processes to which this consumer belongs. By setting the same group id, multiple processes indicate that they are all part of the same consumer group.
topic | No | String | undefined | Kafka topic name.
getMetadata | No | Boolean | false | If true, message metadata (topic, partition, offset) is provided with each message. Use false for better performance.
topics | Yes | Array of String | undefined | Array of Kafka topic names.
serverPort | No | Number | 3042 | Internal server port used to transfer messages from the Java thread to the Node.js thread.
threadCount | No | Number | 1 | The threading model revolves around the number of partitions in your topic, and there are some very specific rules. For more information, see: kafka consumer groups.
properties | No | Object | undefined | Property names can be found in the following table: high level consumer properties.
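
The mandatory options can be checked before a consumer is constructed. The sketch below is a hypothetical helper, not part of kafka-java-bridge. It assumes that providing either topic or topics is sufficient, since the examples in this README use one or the other.

```javascript
// Hypothetical helper (not part of kafka-java-bridge): returns a list of problems,
// or an empty array when the options look complete.
function validateConsumerOptions(options) {
    var problems = [];

    // zookeeperUrl and groupId are listed as mandatory Strings.
    ["zookeeperUrl", "groupId"].forEach(function (name) {
        if (typeof options[name] !== "string" || options[name].length === 0) {
            problems.push(name + " must be a non-empty string");
        }
    });

    // Assumption: one of `topic` (String) or `topics` (Array of String) is enough.
    var hasTopic = typeof options.topic === "string" && options.topic.length > 0;
    var hasTopics = Array.isArray(options.topics) && options.topics.length > 0;
    if (!hasTopic && !hasTopics) {
        problems.push("either topic (String) or topics (Array of String) must be set");
    }

    return problems;
}

// groupId is missing here, so exactly one problem is reported.
var problems = validateConsumerOptions({
    zookeeperUrl: "zookeeper1:2181/kafka",
    topics: ["example-topic1"]
});
console.log(problems);
```

A caller would typically refuse to construct the HLConsumer when the returned array is not empty.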

Events emitted by the HLConsumer:

  • message: this event is emitted when a message is consumed from kafka.
  • error: this event is emitted when an error occurs while consuming messages.
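
When getMetadata is enabled, the metadata passed to the message handler can be used for simple bookkeeping. The sketch below is a hypothetical helper that tracks the highest offset seen per topic and partition. It assumes the metadata object exposes fields named topic, partition and offset, with a numeric offset; those field names are inferred from the option description and are not confirmed here.

```javascript
// Hypothetical helper: remembers the highest offset seen for each topic/partition pair.
function recordOffset(latestOffsets, metadata) {
    var key = metadata.topic + ":" + metadata.partition;
    if (!(key in latestOffsets) || metadata.offset > latestOffsets[key]) {
        latestOffsets[key] = metadata.offset;
    }
    return latestOffsets;
}

var latestOffsets = {};

// In real use this would be called from
//   consumer.on("message", function (msg, metadata) { recordOffset(latestOffsets, metadata); });
// The literal objects below stand in for that metadata.
recordOffset(latestOffsets, { topic: "example-topic1", partition: 0, offset: 41 });
recordOffset(latestOffsets, { topic: "example-topic1", partition: 0, offset: 42 });
recordOffset(latestOffsets, { topic: "example-topic2", partition: 3, offset: 7 });

console.log(latestOffsets);
```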

hlConsumer.start(cb)

Start consuming messages from the configured Kafka topic(s).

cb - callback is called when the consumer is started.

If the callback is called with err, the consumer failed to start.

hlConsumer.stop(cb)

Stop consuming messages.

cb - callback is called when the consumer is stopped.

message and error events can still be emitted until the stop callback is called.
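
Because events may still arrive while the consumer is stopping, handlers are best left attached until the stop callback fires. The sketch below illustrates that ordering with a fake consumer built on Node's EventEmitter, so it runs without Kafka. makeFakeConsumer is a stand-in for illustration only and does not reflect how HLConsumer is implemented.

```javascript
var EventEmitter = require("events").EventEmitter;

// Stand-in for an HLConsumer: emits one in-flight message before confirming the stop.
function makeFakeConsumer() {
    var fake = new EventEmitter();
    fake.stop = function (cb) {
        fake.emit("message", "late message"); // still in flight when stop was requested
        cb();
    };
    return fake;
}

var handled = [];
var stopped = false;
var consumer = makeFakeConsumer();

consumer.on("message", function (msg) {
    handled.push(msg);
});

consumer.stop(function () {
    // Only now is it safe to detach handlers and release resources.
    stopped = true;
    consumer.removeAllListeners("message");
});

console.log("handled:", handled, "stopped:", stopped);
```

Detaching the handler before calling stop would have dropped the late message in this sketch.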

StringProducer(options) / BinaryProducer(options)

A producer object sends messages to Kafka. The topic is specified with each message, so one producer can send messages to multiple topics.

StringProducer should be used to send string messages. BinaryProducer should be used to send binary messages.

var producerOptions = {
    zookeeperUrl: "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181/kafka",
    properties: {"client.id": "kafka-java-bridge"} // Optional
};

or

var producerOptions = {
    // Broker host:port pairs (not the Zookeeper address)
    bootstrapServers: "kafka:9092,kafka2:9092,kafka3:9092",
    properties: {"client.id": "kafka-java-bridge"} // Optional
};

Option name | Mandatory | Type | Default value | Description
bootstrapServers | No | String | undefined | Kafka broker connection string.
zookeeperUrl | No | String | undefined | Zookeeper connection string. If provided, the broker list will be retrieved from the standard path.
properties | No | Object | undefined | Property names can be found in the following table: high level producer properties.
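
Since StringProducer is meant for string messages and BinaryProducer for binary ones, an application that sends both kinds can route each message by type. The sketch below is a hypothetical helper, not part of kafka-java-bridge. Plain objects stand in for real producer instances so it runs without Kafka.

```javascript
// Hypothetical helper: choose which producer should handle a given message.
function pickProducer(msg, stringProducer, binaryProducer) {
    if (Buffer.isBuffer(msg)) {
        return binaryProducer;
    }
    if (typeof msg === "string") {
        return stringProducer;
    }
    throw new TypeError("message must be a String or a Buffer");
}

// Stand-ins for `new StringProducer(...)` and `new BinaryProducer(...)`.
var fakeStringProducer = { name: "string" };
var fakeBinaryProducer = { name: "binary" };

console.log(pickProducer("testString", fakeStringProducer, fakeBinaryProducer).name);
console.log(pickProducer(Buffer.from([0x0, 0x1]), fakeStringProducer, fakeBinaryProducer).name);
```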

producer.send(topic, msg, cb)

topic - target topic name (String).

msg - message to be sent to Kafka (String or Buffer).

cb - callback called once the message is sent, with err on failure or the message metadata on success.

producer.sendWithKey(topic, msg, key, cb)

topic - target topic name (String).

msg - message to be sent to Kafka (String or Buffer).

key - Kafka message key (String or Buffer).

cb - callback called once the message is sent, with err on failure or the message metadata on success.

producer.sendWithKeyAndPartition(topic, msg, key, partition, cb)

topic - target topic name (String).

msg - message to be sent to Kafka (String or Buffer).

key - Kafka message key (String or Buffer).

partition - target partition (Integer).

cb - callback called once the message is sent, with err on failure or the message metadata on success.
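
The three send variants differ only in how much routing information they take. The sketch below shows one way to dispatch to the most specific variant. sendRecord and the record shape are hypothetical, and the fake producer only records which method was called so the sketch runs without Kafka.

```javascript
// Hypothetical helper: call the most specific send method for the fields provided.
// Assumption: when `partition` is set, `key` is set as well.
function sendRecord(producer, record, cb) {
    if (record.partition !== undefined) {
        producer.sendWithKeyAndPartition(record.topic, record.msg, record.key, record.partition, cb);
    } else if (record.key !== undefined) {
        producer.sendWithKey(record.topic, record.msg, record.key, cb);
    } else {
        producer.send(record.topic, record.msg, cb);
    }
}

// Fake producer with the same method names and argument order as documented above.
var calls = [];
var fakeProducer = {
    send: function (topic, msg, cb) { calls.push("send"); cb(null, {}); },
    sendWithKey: function (topic, msg, key, cb) { calls.push("sendWithKey"); cb(null, {}); },
    sendWithKeyAndPartition: function (topic, msg, key, partition, cb) {
        calls.push("sendWithKeyAndPartition");
        cb(null, {});
    }
};

function noop() {}

sendRecord(fakeProducer, { topic: "myStringTopic", msg: "a" }, noop);
sendRecord(fakeProducer, { topic: "myStringTopic", msg: "b", key: "k" }, noop);
sendRecord(fakeProducer, { topic: "myStringTopic", msg: "c", key: "k", partition: 0 }, noop);

console.log(calls);
```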

Adding Your Own Jars To Classpath

If you wish to add jars to the classpath, it can be done by placing them at:

{app root path}/kafka-java-bridge/java/lib/yourjar.jar

Java Tier Logging

By default, logging in the underlying Java tier is disabled.

To enable Java tier logging, place your own log4j.properties file at:

{app root path}/kafka-java-bridge/log4j/log4j.properties

Troubleshooting

In case of installation failure, you may want to take a look at the installation and troubleshooting sections of our dependency, the java npm package.

If you are working on a Windows machine, you may want to look at windows-build-tools for native code compilation issues.

Sources

License

MIT
