kafka-consumer-manager v5.0.1

License: MIT

This package simplifies common Kafka consumer usage:

  • Supports multiple consumer managers via instance creation
  • Supports autoCommit: false with throttling, by queueing messages per partition and handling one message at a time per partition (concurrency level equals the number of partitions)
  • Provides an API for a consumer offset out-of-sync check, verifying that the partition offsets are synced to ZooKeeper
  • Accepts a promise-returning function with the business logic each consumed message should go through
  • Accepts a promise-returning function that decides when to pause and when to resume consuming
  • Provides an API for sending a message back to the topic (usually for retries)
  • Prometheus integration for reporting message-handling statistics and the offset diff between ZooKeeper and the consumer group

Install

npm install --save kafka-consumer-manager

API

How to use

let KafkaConsumerManager = require('kafka-consumer-manager');

let configuration = {
    KafkaUrl: "localhost:9092",
    GroupId: "some-group-id",
    KafkaConnectionTimeout: 10000,
    KafkaRequestTimeout: 30000,
    KafkaOffsetDiffThreshold: 3,
    Topics: ["TOPIC-A", "TOPIC-B"],
    ResumePauseIntervalMs: 30000,
    ResumePauseCheckFunction: (consumer) => {
        return shouldPauseConsuming(consumer);
    },
    MessageFunction: (msg) => { return handleMessage(msg); },
    MaxMessagesInMemory: 100,
    ResumeMaxMessagesRatio: 0.25,
    CreateProducer: false,
    StartOffset: "earliest"
};

(async () => {
    let kafkaConsumerManager = new KafkaConsumerManager();
    await kafkaConsumerManager.init(configuration);
})();
Configuration
  • KafkaUrl URL of Kafka.
  • GroupId Defines the Consumer Group this process is consuming on behalf of.
  • KafkaConnectionTimeout Max wait time for kafka to connect. (default: 10000ms)
  • KafkaRequestTimeout Max wait time for a kafka request. (default: 30000ms)
  • KafkaOffsetDiffThreshold Tolerance for how far the consumer's partition offset may drift from the real offset; the health check uses this value to reject when the offset is out of sync.
  • Topics Array of topics that should be consumed.
  • ResumePauseIntervalMs Interval at which to run ResumePauseCheckFunction (Optional).
  • ResumePauseCheckFunction Promise-returning function that should always resolve. If it resolves with true the consumer is resumed; if false, it is paused (Mandatory if ResumePauseIntervalMs is provided). The function accepts one param (consumer).
  • MessageFunction Promise-returning function that should always resolve. It is applied to each consumed message and accepts one param (message). Don't mutate the original message; doing so may cause unstable behaviour in the getLastMessage function.
  • ErrorMessageFunction(message, err) Optional; a function called when MessageFunction rejects.

  • FetchMaxBytes The maximum number of bytes to include in the message set for a partition. This helps bound the size of the response (Default 1024^2).
  • WriteBackDelay Delay produced messages by the given number of ms (Optional).
  • AutoCommit Boolean. If false, the consumer queues messages from each partition in a dedicated queue, handles them in order, and commits the offset once each message is done.
  • LoggerName String; the value of the consumer_name field of the internal logger. If empty, the field is omitted.
  • CreateProducer Boolean. If true, a Producer instance is created (Default true).
  • ExposePrometheusMetrics Boolean. If true, Prometheus metrics are collected and registered in prom-client. Please note that prom-client is a peer dependency, meaning it has to exist in your application.
  • PrometheusHistogramBuckets Optional; array of doubles defining the bucket sizes (in seconds) for kafka_request_duration_seconds_bucket, into which your service's message-processing time is recorded. Default values: 0.001, 0.003, 0.005, 0.015, 0.03, 0.05, 0.1, 0.15, 0.2, 0.3, 0.4, 0.5.
  • ConsumerGroupOffsetCheckerInterval Optional; the interval in ms at which the consumer group offset diff is checked. Default is 5000.
  • StartOffset Optional; a string specifying the offset from which the consumer starts reading messages. Default is 'latest'.
AutoCommit: true settings
  • MaxMessagesInMemory If set, the consumer pauses after holding this number of messages in memory; call the finishedHandlingMessage function to lower the counter (Optional).
  • ResumeMaxMessagesRatio If set, a paused consumer resumes only when CurrentMessagesInMemory < MaxMessagesInMemory * ResumeMaxMessagesRatio; the ratio should be below 1 (Optional). For example, with MaxMessagesInMemory: 100 and ResumeMaxMessagesRatio: 0.25, consuming resumes once fewer than 25 messages remain in memory.
AutoCommit: false settings
  • ThrottlingThreshold If the consumer holds more messages than this value it pauses, and it resumes consuming once the count drops below the threshold.
  • ThrottlingCheckIntervalMs The interval in ms at which to check whether the message count is above or below the threshold.
  • CommitEachMessage Boolean. If false, the commit happens every AutoCommitIntervalMs instead of per message (Default true).
  • AutoCommitIntervalMs The interval in ms between commits to the broker; relevant only if CommitEachMessage is false (Default 5000).
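For illustration, a throttled AutoCommit: false setup could combine these settings as in the sketch below. The values and the handleMessage placeholder are examples for this sketch, not package defaults:

```javascript
// Illustrative AutoCommit: false configuration.
// Values are examples chosen for this sketch, not package defaults;
// handleMessage is a placeholder for your own business logic.
let throttledConfiguration = {
    KafkaUrl: "localhost:9092",
    GroupId: "some-group-id",
    Topics: ["TOPIC-A"],
    KafkaOffsetDiffThreshold: 3,
    AutoCommit: false,               // queue per partition, commit after handling
    ThrottlingThreshold: 500,        // pause once more than 500 messages are queued
    ThrottlingCheckIntervalMs: 1000, // re-check the queue size every second
    CommitEachMessage: false,        // batch commits instead of committing per message
    AutoCommitIntervalMs: 5000,      // commit every 5 seconds
    MessageFunction: (msg) => handleMessage(msg)
};
```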

await kafka-consumer-manager.init(configuration)

Initializes the consumer and the producer. Make sure to pass the full configuration object, otherwise you will get exceptions.

The function returns a Promise.

kafka-consumer-manager.validateOffsetsAreSynced()

Runs a check that the partition offsets are synced and moving as expected, by comparing consumer progress with the offsets stored in ZooKeeper.

The function returns a Promise.
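This check fits naturally into a service health probe. A minimal sketch, where `manager` stands in for an initialized kafka-consumer-manager instance and the `{ healthy, reason }` return shape is an assumption of this example, not part of the package:

```javascript
// Sketch: wrapping validateOffsetsAreSynced() as a health probe.
// `manager` stands in for an initialized kafka-consumer-manager instance;
// the { healthy, reason } shape is an assumption of this example.
async function checkKafkaHealth(manager) {
    try {
        await manager.validateOffsetsAreSynced();
        return { healthy: true };
    } catch (err) {
        // The offset drifted further than KafkaOffsetDiffThreshold allows
        return { healthy: false, reason: err.message };
    }
}
```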

await kafka-consumer-manager.closeConnection()

Closes the connection to Kafka. Returns a Promise.
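A common use is a graceful shutdown: stop pulling new messages first, then disconnect. A sketch, assuming an initialized manager instance and a SIGTERM-driven deployment (the wiring is an assumption of this example):

```javascript
// Sketch: graceful shutdown — pause consuming first, then close the connection.
// `manager` stands in for an initialized kafka-consumer-manager instance;
// the SIGTERM wiring is an assumption about your deployment environment.
async function shutdown(manager) {
    manager.pause();                 // stop pulling new messages
    await manager.closeConnection(); // then disconnect from Kafka
}

function registerGracefulShutdown(manager) {
    process.once('SIGTERM', async () => {
        await shutdown(manager);
        process.exit(0);
    });
}
```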

kafka-consumer-manager.pause()

Pauses the consuming of new messages.

kafka-consumer-manager.resume()

Resumes the consuming of new messages.

kafka-consumer-manager.send(message, topic)

Sends a message back to a topic. Returns a Promise.
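Since send is usually used for retries, the sketch below re-publishes a failed message with a retry counter embedded in the payload. The counter convention and the `manager` name are assumptions of this example, not something the package provides:

```javascript
// Sketch: re-publishing a failed message for a later retry attempt.
// `manager` stands in for an initialized kafka-consumer-manager instance;
// the `retries` counter embedded in the payload is a convention of this
// example, not something the package provides.
async function retryMessage(manager, message, topic, maxRetries = 3) {
    let payload = JSON.parse(message.value);
    let retries = (payload.retries || 0) + 1;
    if (retries > maxRetries) {
        // Give up; e.g. route to a dead-letter topic instead
        return false;
    }
    await manager.send(JSON.stringify(Object.assign({}, payload, { retries })), topic);
    return true;
}
```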

kafka-consumer-manager.finishedHandlingMessage()

Decreases the counter of how many messages are currently being processed by the service. Used together with the ResumeMaxMessagesRatio and MaxMessagesInMemory configuration params. Only relevant for autoCommit: true.
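A typical pattern is to call finishedHandlingMessage from inside MessageFunction once the business logic completes. A sketch, where `manager` and `processBusinessLogic` are illustrative names:

```javascript
// Sketch: pairing MessageFunction with finishedHandlingMessage so the
// in-memory counter is decremented once processing completes (autoCommit: true).
// `manager` and `processBusinessLogic` are illustrative names.
function buildMessageFunction(manager, processBusinessLogic) {
    return async (message) => {
        try {
            await processBusinessLogic(message);
        } finally {
            // Always decrement, even when processing fails;
            // otherwise the consumer may stay paused forever.
            manager.finishedHandlingMessage();
        }
    };
}
```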

kafka-consumer-manager.getLastMessage()

Gets the last message that the consumer received. Don't mutate the original message; doing so may cause unstable behaviour in the MessageFunction.

kafka-consumer-manager.on(eventName, eventHandler)

Listens to the chosen consumer events.
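A sketch of registering a handler. The 'error' event name here is an assumption based on typical Node.js consumer implementations; check which events your underlying consumer actually emits:

```javascript
// Sketch: registering a consumer event handler. The 'error' event name is an
// assumption based on typical Node.js consumer implementations; check which
// events your underlying consumer actually emits.
function registerErrorLogging(manager, log) {
    manager.on('error', (err) => {
        log(`kafka consumer error: ${err.message}`);
    });
}
```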

Running Tests

Using mocha and nyc

npm test