0.3.5 • Published 8 years ago

kafka-node-topic-consumer v0.3.5

kafka-node-topic-consumer

A wrapper around kafka-node's HighLevelConsumer that adds error handling and concurrency control for message processing via fastq (a lightweight, fast queue modeled after async's queue).

Installing

npm install --save kafka-node kafka-node-topic-consumer

Purpose

There are two main motivations for this module:

  1. There are known issues with the high-level consumer API in Kafka 0.8. Rebalancing problems often occur when a consumer starts too soon after a failure, or too close in time to another member of the same group. To alleviate this, the TopicConsumer self-heals: when the underlying HighLevelConsumer emits an error, it first attempts to close the existing consumer, then removes it and schedules a rebuild at a random time in the near future (30-90 seconds). The rebuild process retries indefinitely: if a rebuild fails, the healing process starts again.
  2. Although Kafka guarantees ordering within a partition, kafka-node's HighLevelConsumer behaves like a firehose, emitting messages as soon as they arrive regardless of how fast the application can process them. To control this, the TopicConsumer implements an in-memory queue that processes a single batch of messages at a time. As soon as the underlying consumer emits the first message of a newly received batch, the TopicConsumer pauses the consumer and pushes all messages into the queue. Once the last message has been processed, it resumes consuming.
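The two behaviours described above can be illustrated with a minimal sketch. It is not the module's actual implementation: randomDelayMs and createBatchGate are hypothetical names, the queue is a plain array rather than fastq, and source stands in for any object that exposes pause() and resume().

```javascript
// Hypothetical helper: pick a random delay (in milliseconds) between two
// bounds, inclusive, e.g. to spread out consumer rebuilds.
function randomDelayMs(minMs, maxMs) {
  return minMs + Math.floor(Math.random() * (maxMs - minMs + 1));
}

// Hypothetical helper: pause the source on the first message of a batch,
// process queued messages one at a time, then resume once the queue is empty.
function createBatchGate(source, worker) {
  const pending = [];
  let draining = false;

  async function drain() {
    draining = true;
    source.pause();
    while (pending.length > 0) {
      const msg = pending.shift();
      try {
        await worker(msg);
      } catch (err) {
        // In this sketch a failing message does not block the rest of the batch.
        console.error('worker failed', err);
      }
    }
    draining = false;
    source.resume();
  }

  // Call this for every message the source emits. The first message of a
  // batch starts the drain and returns its promise; later ones only enqueue.
  return function onMessage(msg) {
    pending.push(msg);
    return draining ? undefined : drain();
  };
}
```

With a real consumer, onMessage would be attached to its message event, and randomDelayMs(30000, 90000) would give a delay in the 30-90 second window mentioned above.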

Getting Started

import TopicConsumer from 'kafka-node-topic-consumer';

// create a new TopicConsumer
const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: { groupId: 'my-consumer-group' },
  topic: 'my-topic',
});

consumer.registerWorker((msg) => {
  console.log(msg);
  return Promise.resolve();
});

consumer.on('message:error', (err, msg) => {
  console.error(err, msg);
});

consumer.connect()

API

constructor(options) => TopicConsumer

instantiate a new topic consumer

Params
name | type | description
options | Object | constructor options
options.concurrency | Number | number of tasks to be processed at any given time, default is 1
options.consumer | Object | consumer options
options.consumer.groupId | String | consumer group id
options.host | String | zookeeper connection string
options.parse | Function | a function (raw) => Promise for parsing raw kafka messages before they are pushed into the queue. The default parse function will attempt to parse the raw message's value attribute as utf-8 stringified json and add it as the parsedValue attribute on the message
options.rebuild | Object | rebuild configuration
options.rebuild.closing | Object | valid retry options for closing failed consumers
options.rebuild.maxDelay | Number, String | the maximum time to wait before rebuilding, default is 2m
options.rebuild.minDelay | Number, String | the minimum time to wait before rebuilding, default is 35s
options.topic | String, Object | topic name or payload
options.validate | Function | a function (parsed) => Promise for validating queue messages. Messages that fail validation will not be processed by workers
Example
import Bluebird from 'bluebird';
import joi from 'joi';
import TopicConsumer from 'kafka-node-topic-consumer';

const consumer = new TopicConsumer({
  host: process.env.ZOOKEEPER_HOST,
  consumer: {
    groupId: 'my-group-id'
  },
  topic: 'my-topic',
  parse(raw) {
    return Bluebird.try(() => {
      return JSON.parse(raw.value.toString('utf8'));
    });
  },
  validate(parsed) {
    const schema = joi.object({
      id: joi.string().guid().required(),
      action: joi.string().valid('create', 'destroy', 'update').required(),
      data: joi.object().required(),
    });
    const result = joi.validate(parsed, schema);
    if (result.error) {
      return Promise.reject(result.error);
    }
    return Promise.resolve(result.value);
  },
});
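
For comparison with the custom parse function above, here is a rough sketch of a parse function in the spirit of the default behaviour described in the options table. The name parseJsonValue is hypothetical. The sketch returns a copy of the message instead of modifying it, and it rejects on malformed JSON; whether the module's own default does either is not stated here, so treat both as assumptions.

```javascript
// Sketch only: decode raw.value as UTF-8 JSON and expose it as parsedValue.
// Assumption: raw.value is a Buffer or a string containing JSON text.
function parseJsonValue(raw) {
  return new Promise((resolve) => {
    const text = Buffer.isBuffer(raw.value)
      ? raw.value.toString('utf8')
      : String(raw.value);
    // JSON.parse throws on malformed input; inside a Promise executor
    // that throw turns into a rejection of the returned promise.
    const parsedValue = JSON.parse(text);
    // Return a shallow copy so the original message object is left untouched.
    resolve(Object.assign({}, raw, { parsedValue }));
  });
}
```

A function like this could be passed as options.parse, since it follows the (raw) => Promise shape listed in the table.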

connect(done) => Promise

Wait for a new consumer to register

Params
name | type | description
done | Function | optional callback
Example
consumer.connect(err => {});

consumer.connect()
.then(() => {})
.catch(err => {});

consumer

the underlying HighLevelConsumer instance


queue

the underlying queue instance


getStatus() => Object

get current status

Returns
{
  "consumer": {
    "groupId": "my-consumer-group",
    "initialized": false,
    "ready": true,
    "closing": false,
    "paused": false,
    "rebalancing": false,
    "topicPayloads": [
      {
        "topic": "my-topic",
        "partition": "6",
        "offset": 39,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "7",
        "offset": 19,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "8",
        "offset": 16,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "9",
        "offset": 28,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "10",
        "offset": 14,
        "maxBytes": 1048576,
        "metadata": "m"
      },
      {
        "topic": "my-topic",
        "partition": "11",
        "offset": 33,
        "maxBytes": 1048576,
        "metadata": "m"
      }
    ]
  },
  "queue": {
    "idle": true,
    "length": 0
  },
  "status": "up"
}
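
As an illustration of how the status object might be consumed, the sketch below derives a simple health flag and a per-partition offset summary from it. The helper names isHealthy and describeOffsets are hypothetical, and the rule that "up", ready and not closing together mean healthy is an assumption based only on the sample output above.

```javascript
// Sketch: treat the consumer as healthy when the overall status is "up",
// the underlying consumer is ready, and it is not in the middle of closing.
function isHealthy(status) {
  const consumer = status.consumer || {};
  return status.status === 'up'
    && consumer.ready === true
    && consumer.closing !== true;
}

// Sketch: summarize each topic payload as "topic[partition]@offset".
function describeOffsets(status) {
  const payloads = (status.consumer && status.consumer.topicPayloads) || [];
  return payloads.map((p) => `${p.topic}[${p.partition}]@${p.offset}`);
}
```

Both helpers take the object returned by getStatus(), for example isHealthy(consumer.getStatus()) inside a health-check endpoint.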

registerWorker(worker)

register a new worker function

Params
name | type | description
worker | Function | a function worker(parsed) => Promise that is passed every (valid) message for processing
Example
consumer.registerWorker(parsed => {
  return Promise.resolve();
});

Events

The TopicConsumer extends the EventEmitter class and emits the following lifecycle events:

event | description
consumer:closing-error | fired (err) when all attempts to close a failed consumer have failed
consumer:commit-error | fired (err) when an error is encountered committing offsets
consumer:connecting | fired when a new consumer instance is waiting to connect/register
consumer:error | fired (err) anytime the underlying consumer emits an error
consumer:offset-out-of-range | fired when underlying consumer encounters an OffsetOutOfRangeError
consumer:pausing | fired when first message is pushed into queue and underlying consumer is paused
consumer:rebuild-initiated | fired when the rebuild process has been initiated
consumer:rebuild-scheduled | fired (delayInSeconds) when the rebuild has been scheduled
consumer:rebuild-started | fired when the rebuild has started
consumer:resuming | fired when last task in queue has been processed and underlying consumer is resuming
consumer:starting | fired after a new consumer has registered and is beginning to fetch messages
message:processing | fired (parsed) when the queue has started processing a message
message:skipped | fired (parsed, reason) when a message fails validation
message:success | fired (parsed, results) when a message has been successfully processed
message:error | fired (err, parsed) when a worker rejects
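
One way to observe these events during development is to attach a single logging listener to each of them. The sketch below is only an illustration: attachLogging and LIFECYCLE_EVENTS are hypothetical names, and the function works with any object that exposes an EventEmitter-style on(name, listener) method.

```javascript
// Event names copied from the list above.
const LIFECYCLE_EVENTS = [
  'consumer:closing-error',
  'consumer:commit-error',
  'consumer:connecting',
  'consumer:error',
  'consumer:offset-out-of-range',
  'consumer:pausing',
  'consumer:rebuild-initiated',
  'consumer:rebuild-scheduled',
  'consumer:rebuild-started',
  'consumer:resuming',
  'consumer:starting',
  'message:processing',
  'message:skipped',
  'message:success',
  'message:error',
];

// Sketch: forward every lifecycle event, with its arguments, to a log function.
function attachLogging(emitter, log = console.log) {
  LIFECYCLE_EVENTS.forEach((name) => {
    emitter.on(name, (...args) => log(name, ...args));
  });
  return emitter;
}
```

Calling attachLogging(consumer) before consumer.connect() would print each event name followed by whatever arguments it was emitted with.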

Testing

Requires docker 1.8+ and docker-compose 1.12+

docker-compose up

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request

License

Copyright (c) 2016 Gaia

Licensed under the MIT license
