kafka-executor v1.0.13 • MIT License • last published 6 years ago

kafka-executor

Listens to Kafka topics and runs asynchronous jobs against each message, committing the corresponding message offset only after every job has completed successfully.


  • Installation
  • Usage
  • Documentation

Features

  • Simple API
  • Ensures that every job completes successfully before a message is committed
  • Retry strategy for jobs that fail
  • Graceful shutdown

Installation

Install with yarn or npm:

```sh
yarn add kafka-executor
# or
npm install kafka-executor --save
```

Usage

Basic

```ts
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
    brokerList: '0.0.0.0:9092,0.0.0.0:9091',
    topics: ['topic1', 'topic2'],
    groupId: 'groupId',
});

executor.init();

executor.addJob('myJobId', new Job((kafkaMessage) => {
    console.log(kafkaMessage);
    return Promise.resolve();
}));
```
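A job handler is just a function from a message to a Promise: resolving marks the job as done, rejecting marks it as failed so the offset is not committed and the retry strategy kicks in. A minimal sketch of such a handler (the `Msg` shape below is assumed from the kafkaMessage section of this README, not imported from the library):

```ts
// Minimal message shape assumed from the kafkaMessage section of this README.
interface Msg { value: Buffer; topic: string; offset: number; }

// Reject on empty payloads so the offset is not committed and the
// configured retry strategy takes over.
function handleMessage(msg: Msg): Promise<void> {
  if (msg.value.length === 0) {
    return Promise.reject(new Error(`empty payload at ${msg.topic}@${msg.offset}`));
  }
  console.log(msg.value.toString('utf8'));
  return Promise.resolve();
}
```

Such a handler could then be wrapped in a `Job` and registered with `addJob` as above.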

Documentation

Job

```ts
import { Job } from 'kafka-executor';

new Job(() => Promise.resolve(), {
    maxRetries?: number,
    retryDelay?: number | ((retryNumber: number) => number),
    shouldRetry?: boolean | ((err: Error) => boolean),
})
```
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| maxRetries: number | no | 3 | How many times to retry a job before it fails |
| retryDelay: number \| (retryIndex) => number | no | 60000 ms | The delay between retries, in ms |
| shouldRetry: boolean \| (error) => boolean | no | true | Determines whether a job should be retried on failure |
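Since retryDelay accepts a function of the retry index, a backoff strategy can be supplied instead of a fixed delay. A minimal sketch (the exponential-backoff helper is our own, not part of the library, and we assume the retry index starts at 0):

```ts
// Hypothetical backoff helper for the retryDelay option: doubles the
// delay on every retry, capped at capMs.
function exponentialBackoff(baseMs: number, capMs: number) {
  return (retryIndex: number): number =>
    Math.min(baseMs * 2 ** retryIndex, capMs);
}

// 1s, 2s, 4s, ... capped at one minute.
const retryDelay = exponentialBackoff(1000, 60000);

// The function could then be passed in the Job options:
// new Job(handler, { maxRetries: 5, retryDelay });
```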

KafkaExecutor

Options

```ts
import KafkaExecutor from 'kafka-executor';

new KafkaExecutor({
    brokerList: string;
    groupId: string;
    topics: string[];
    connectionTimeout?: number;
    checkInterval?: number;
    batchSize?: number;
    errorHandler?: (err: Error[], message: KafkaMessage, commit: Function) => void;
    logger?: (message: string, type: LogType, code?: string) => void;
    maxRetries?: number;
    retryDelay?: number;
    consumer?: object;
})
```
| Name | Required | Default | Description |
| --- | --- | --- | --- |
| brokerList: string | yes | - | Initial list of brokers as a CSV list of broker host or host:port pairs |
| topics: string[] | yes | - | The topics the consumer will listen to |
| groupId: string | yes | - | Client group id string. All clients sharing the same group.id belong to the same group |
| checkInterval: number | no | 2000 | How long (ms) to wait before checking for new messages during idle periods |
| batchSize: number | no | 1 | How many messages to process concurrently. Tune this according to your error tolerance |
| errorHandler: (errors, kafkaMessage, commit: Function) => void | no | exit(1) | A function responsible for handling job errors. By default the process exits with code 1 |
| logger: (message: string, type: 'info'\|'warn'\|'error', code) => void | no | console | A function responsible for logging |
| consumer: object | no | - | Options passed to the underlying consumer; see the node-rdkafka configuration options |
| maxRetries: number | no | 3 | Global default for all jobs |
| retryDelay: number | no | 60000 ms | Global default for all jobs |
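The errorHandler receives the failed jobs' errors, the offending message, and a commit callback, so the default fatal exit can be replaced with, for example, skip-and-commit so one poison message cannot stall a partition. A sketch, assuming the error objects carry the jobId field described in the error section below (the interfaces here are local stand-ins, not library imports):

```ts
// Minimal shapes matching this README's error and kafkaMessage descriptions
// (local assumptions, not imports from the library).
interface JobError extends Error { jobId: string; status?: string; }
interface KafkaMessage { topic: string; offset: number; partition: number; }

// Hypothetical handler: log every failure, then commit the message anyway
// so processing can move past it. Returns the failed job ids.
function skipAndCommit(errs: JobError[], message: KafkaMessage, commit: () => void): string[] {
  const failed = errs.map((e) => e.jobId);
  console.error(`jobs [${failed.join(', ')}] failed for ${message.topic}@${message.offset}`);
  commit();
  return failed;
}

// Could then be passed as: new KafkaExecutor({ ..., errorHandler: skipAndCommit })
```

Whether committing past a failed message is acceptable depends on your delivery guarantees; the default exit-with-code-1 behaviour is the safer choice when every message must be processed.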

Functions

```ts
import KafkaExecutor, { Job } from 'kafka-executor';

const executor = new KafkaExecutor({
    brokerList: '0.0.0.0:9092',
    groupId: 'group',
    topics: ['topic'],
});

executor.addJob('myJobId', new Job(...));
executor.init();
executor.removeJob('myJobId');
executor.on('event', Function);
executor.shutdown();
```
| Name | Description |
| --- | --- |
| init: () => Promise | Initializes kafka-executor and connects the consumer to Kafka |
| addJob: (jobId: string, job: Job) => void | Adds a job to the processing flow |
| removeJob: (jobId: string) => void | Removes a job |
| on: (event: string, listener: Function) => void | Listens to the various events emitted by kafka-executor and node-rdkafka |
| shutdown: () => Promise | Shuts down gracefully, ensuring that pending jobs finish before exit |
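shutdown() pairs naturally with process signals. A sketch of wiring it to SIGTERM/SIGINT, assuming only that shutdown() resolves once pending jobs have drained (the `Stoppable` interface is our own stand-in for the executor):

```ts
// Anything with a shutdown(): Promise — e.g. a KafkaExecutor instance.
interface Stoppable { shutdown(): Promise<unknown>; }

// Wire SIGTERM/SIGINT so in-flight jobs finish before the process exits.
// Returns the handler so it can also be invoked directly.
function onSignalShutdown(
  executor: Stoppable,
  exit: (code: number) => void = process.exit,
): () => void {
  const stop = () => {
    executor.shutdown().then(() => exit(0), () => exit(1));
  };
  process.once('SIGTERM', stop);
  process.once('SIGINT', stop);
  return stop;
}
```

With this in place, a container orchestrator's SIGTERM lets pending jobs complete instead of dropping them mid-flight.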

Events

| Event | Arguments | Description |
| --- | --- | --- |
| message.received | kafkaMessage[] | Fires when the consumer receives messages |
| message.committed | kafkaMessage | Fires when the consumer commits a message |
| processing.error | kafkaMessage, error | Fires when one or more jobs fail |
| shutdown | - | Fires when kafka-executor shuts down |

node-rdkafka events

| Event | Description |
| --- | --- |
| data | When using the Standard API, consumed messages are emitted in this event |
| disconnected | Emitted when the broker disconnects. Only emitted when .disconnect is called; otherwise the wrapper will always try to reconnect |
| ready | Emitted when the consumer is ready to read messages |
| event | Emitted when librdkafka reports an event (if you opted in via the event_cb option) |
| event.log | Emitted when logging events occur (if you opted in for logging via the event_cb option). You will need to set a value for debug if you want information sent |
| event.stats | Emitted when librdkafka reports stats (if you opted in by setting statistics.interval.ms to a non-zero value) |
| event.throttle | Emitted when librdkafka reports throttling |

kafkaMessage

```ts
{
    value: Buffer,
    size: number,
    topic: string,
    offset: number,
    partition: number,
    key: string,
    timestamp: number
}
```
| Name | Type | Description |
| --- | --- | --- |
| value | Buffer | Message contents as a Buffer |
| size | number | Size of the message, in bytes |
| topic | string | Topic the message comes from |
| offset | number | Offset the message was read from |
| partition | number | Partition the message was on |
| key | string | Key of the message, if present |
| timestamp | number | Timestamp of message creation |
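Since value arrives as a Buffer, a job typically decodes it before use. A sketch of a tolerant JSON decoder (our own helper, not library API) that returns null on malformed input instead of throwing inside the job:

```ts
// Decode a kafkaMessage value Buffer as JSON; return null on bad input
// so the caller can decide whether to fail the job.
function decodeValue<T = unknown>(value: Buffer): T | null {
  try {
    return JSON.parse(value.toString('utf8')) as T;
  } catch {
    return null;
  }
}
```

A handler can then treat `null` as a poison message and choose whether to reject (and retry) or skip it.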

error

```ts
{
    ...Error,
    jobId: string,
    status?: string,
}
```
| Name | Type | Description |
| --- | --- | --- |
| jobId | string | The id of the failed job |
| status | string | The HTTP status, if present |

codes

| Name | Description |
| --- | --- |
| kafkaError | Log produced by Kafka |
| connectionError | Log produced when trying to connect to Kafka |
| jobFailed | Log produced by a failed job |
| jobRetry | Log produced when a job retries |
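The logger hook can route entries by type and code; for instance, routine jobRetry entries can be silenced while everything else is forwarded. A sketch, with the `(message, type, code)` signature taken from the options table above and everything else our own assumption:

```ts
type LogType = 'info' | 'warn' | 'error';

// Hypothetical logger factory: drops routine jobRetry entries,
// forwards the rest to the given sink (console.log by default).
function quietLogger(sink: (line: string) => void = console.log) {
  return (message: string, type: LogType, code?: string): void => {
    if (code === 'jobRetry') return; // retries are expected; skip the noise
    sink(`[${type}]${code ? ` (${code})` : ''} ${message}`);
  };
}

// Could then be passed as: new KafkaExecutor({ ..., logger: quietLogger() })
```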