exp-kafka-listener

Simple stream-based Kafka listener based on node-rdkafka. Calculates metrics on lag and group consumption rate.

API

Exposes a single function that returns an object used for streaming messages and consuming.

const kafka = require("exp-kafka-listener");
const listener = kafka.listen(options, groupId, topics);
const readStream = listener.readStream;

See examples below for more info.

Options

  • host: Comma-separated list of Kafka hosts (see the example object below).
  • username: If set, SASL/PLAIN authentication will be used when connecting.
  • password: Password for SASL authentication.
  • autoCommit: Automatically commit messages every 5 seconds, default false.
  • fetchSize: Kafka fetch size, default 500.
  • fromOffset: Kafka start offset, default "latest".
  • statsInterval: The rate at which statistics are reported (in ms), default 30000.
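
For reference, a fully populated options object might look like this sketch; the host names and credentials are placeholders:

const options = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200", // comma-separated broker list
  username: "my-user",                           // enables SASL/PLAIN when set
  password: "my-password",
  autoCommit: false,                             // commit manually via listener.commit(msg)
  fetchSize: 500,
  fromOffset: "latest",
  statsInterval: 30000                           // report stats every 30 seconds
};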

Events

The object returned from "listen" is an event emitter that emits the following events:

  • 'ready': Emitted once the listener has successfully connected to the Kafka cluster.
  • 'stats': Emitted on a regular interval; supplies an object with the following properties (see the sketch after this list):
    • lag: Total lag for the consumer group
    • messageRate: Message consumption rate for the consumer group (will be negative if producers are faster than consumers)
    • error: Set if an error occurred while the stats were calculated
    • time: Timestamp of when the stats were generated
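
A minimal sketch of wiring up these events; the host, group id and topic names are placeholders:

const kafka = require("exp-kafka-listener");

const listener = kafka.listen({host: "mykafkahost-1:9200"}, "my-group-id", ["my-topic"]);

listener.on("ready", () => {
  console.log("Connected to the kafka cluster");
});

listener.on("stats", ({lag, messageRate, error, time}) => {
  if (error) return console.error("Stats calculation failed", error);
  console.log(time, "lag:", lag, "message rate:", messageRate);
});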

Examples

Manual commits and streams

Use this if you want to be sure that all messages are processed before being committed. Any in-flight messages will be re-sent in case of a process crash/restart. Back-pressure is handled by Node.js streams, so the fetch rate is adjusted to the consumption rate.

const kafka = require("exp-kafka-listener");
const through = require("through2");
const {pipeline} = require("stream");

const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: false
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);

const msgHandler = through.obj(function (msg, _encoding, done) {
  const payload = msg.value;
  someAsyncOperation(payload, (err) => {
    this.push(msg); // forward the message so it can be committed downstream
    done(err);
  });
});

const commitHandler = through.obj((msg, _encoding, done) => {
  listener.commit(msg);
  done();
});

pipeline(listener.readStream, msgHandler, commitHandler, (err) => {
  throw err || "Stream ended"; // Stream should never end.
});

Autocommit and streams

Use this if you don't care about losing a few in-flight messages during restarts. Messages will be automatically committed every five seconds. Back-pressure is handled by Node.js streams, so the fetch rate is adjusted to the consumption rate. Therefore the number of in-flight messages is usually low.

const kafka = require("exp-kafka-listener");
const through = require("through2");
const {pipeline} = require("stream");

const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: true
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);

const msgHandler = through.obj(function (msg, _encoding, done) {
  const payload = msg.value;
  someAsyncOperation(payload, (err) => {
    this.push(msg);
    done(err);
  });
});

pipeline(listener.readStream, msgHandler, (err) => {
  throw err || "Stream ended"; // Stream should never end.
});

Autocommit scenario ignoring backpressure

The simplest and fastest way of consuming messages. However, back-pressure is not dealt with, so if consumption is slow many messages may be left in-flight and they will likely not be redelivered in case of crashes/restarts.

const kafka = require("exp-kafka-listener");

const kafkaOptions = {
  host: "mykafkahost-1:9200,mykafkahost-2:9200",
  autoCommit: true
};

const listener = kafka.listen(kafkaOptions, "my-group-id", ["my-topic"]);
listener.readStream.on("data", (msg) => {
  // .. go to town
});

Further reading

  • Node.js streams
  • node-rdkafka
