0.1.0 • Published 9 years ago

express-kafka-producer v0.1.0

Middleware to send Node.js + Express requests directly to Kafka

Add express-kafka-producer to your package.json


Check the example app for a working example of how to use the middleware

Running kafka

A docker-compose configuration is provided to make testing easier


Use as a middleware in your express app

var express = require('express');
var expressProducer = require('express-kafka-producer');

var app = express();

var kafka = {
  producer: {
    topic: 'my-node-topic'
  }
};

app.get('/', expressProducer(kafka), function(req, res) { });


The options available are

  • verbose (boolean): Enables logging for the Kafka client, the messages being published, and errors. All log lines start with the string > KAFKA MIDDLEWARE. It should be false in production
  • client (object or kafka-node.Client): Settings for the Kafka server connection. If a kafka-node.Client object is provided, it is used instead of creating a new connection.
      • url (string): Kafka server URL. Default value
      • client_id (string): Kafka client_id. Default value kafka-node-produce
  • producer (object): Producer options
      • topic (string, required): Topic to send messages to
      • attributes (string): Compression attribute. Check the kafka-node docs for possible values
      • settings (object): Values used to handle producer acks and publish timeouts. Check kafka-node's HighLevelProducer for possible values
      • partition (string): Partition to send messages to
      • partitioner (Partitioner): Partitioner object to be used (described below). The default for keyed messages is lib/lib/default-partitioner.js, which maps a key to a fixed partition number as long as the number of partitions never changes, making log compaction easier to use.

Partitioner (object):

      • partition (function(key, numberOfPartitions) -> Number): Function on the Partitioner object that is called before a message is published. It should return an absolute (non-negative) Number to be used as the partition number (so it should NOT be greater than the numberOfPartitions parameter received)
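
As a rough illustration of the Partitioner contract described above, the sketch below hashes the key and maps the result onto the available partitions. The name hashPartitioner and the hash itself are assumptions made for this example; this is not the library's default partitioner.

```javascript
// Hypothetical custom partitioner; the name and the hash are illustrative only.
var hashPartitioner = {
  // Called with the message key and the current number of partitions.
  partition: function (key, numberOfPartitions) {
    var text = String(key);
    var hash = 0;
    for (var i = 0; i < text.length; i++) {
      // Simple rolling hash of the key characters, truncated to 32 bits.
      hash = ((hash * 31) + text.charCodeAt(i)) | 0;
    }
    // Math.abs keeps the result non-negative; the modulo keeps it
    // strictly below numberOfPartitions.
    return Math.abs(hash) % numberOfPartitions;
  }
};

// Assumed usage: pass the object through the producer options.
var kafka = {
  producer: {
    topic: 'my-node-topic',
    partitioner: hashPartitioner
  }
};
```

Because the result depends only on the key and the partition count, the same key keeps landing on the same partition as long as the number of partitions does not change.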
  • key (function): If provided, this function (function(req, res, callback) {}) is called to generate a key for the message before it is sent to the Kafka topic. The callback is expected to be called as callback(error, key), where key is a string
  • message (function): If provided, this function (function(req, res, callback) {}) is called to generate a custom message payload to be sent to the Kafka topic. The callback is expected to be called as callback(error, message), where message is an object or a string. If not provided, the message will contain only some keys from the Express request object

  • messages (object): Options for the message generated automatically from the Express request object

      • whitelist (array of string): Keys to be added to the message before it is sent to Kafka. Possible values are listed in the Express docs
      • blacklist (array of string): Keys to be excluded from the message before it is sent to Kafka. Possible values are listed in the Express docs
  • error (function): If provided, this function (function(err, req, res, callback) {}) is called when any error happens inside the middleware: while sending the message to Kafka, while generating the message object, or when a custom callback calls back with an error object. If not provided, errors are ignored and the next middlewares in the stack are called normally.

  • parse_to_json (boolean): The Kafka client needs messages (and the key, if provided) to be sent as strings. Set this option to apply JSON.stringify to keys and messages automatically

  • batch (object): Message batching options. If enabled, each payload sent to Kafka will contain an array of messages

      • enabled (boolean): Whether message batching is enabled
      • payload (number): Maximum number of messages that will be piled up before being sent to Kafka. Default value is 1000.
      • timeout (number): Time (in ms) between message flushes to Kafka. This ensures that messages are sent to Kafka after a certain period of time even if the number of messages received is smaller than the payload. Default value is 500ms
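
The callback-style options (key, message, error) and the batch settings listed above could be combined roughly as follows. This is a sketch under assumptions: the function names, the x-user-id header, and the fields picked from the request are made up for illustration, and calling callback() with no arguments in the error handler is a guess at how to hand control back to the middleware.

```javascript
// Hypothetical key generator: derive the message key from a request header.
function keyFromRequest(req, res, callback) {
  // The header name 'x-user-id' is an assumption made for this example.
  var userId = req.headers['x-user-id'];
  callback(null, userId ? String(userId) : 'anonymous');
}

// Hypothetical payload generator: pick a few fields from the Express request.
function messageFromRequest(req, res, callback) {
  callback(null, {
    method: req.method,
    path: req.path,
    query: req.query
  });
}

// Hypothetical error handler: log the error, then call back.
// What the middleware does after callback() is an assumption here.
function onKafkaError(err, req, res, callback) {
  console.error('kafka middleware error:', err.message);
  callback();
}

var options = {
  verbose: false,
  producer: { topic: 'my-node-topic' },
  key: keyFromRequest,
  message: messageFromRequest,
  error: onKafkaError,
  parse_to_json: true, // the payload above is an object, so stringify it
  batch: {
    enabled: true,
    payload: 1000,     // flush after this many messages
    timeout: 500       // or after this many milliseconds
  }
};

// Assumed usage, mirroring the example earlier in this README:
// app.get('/', expressProducer(options), function (req, res) { res.end(); });
```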


$ npm install
$ npm run test



Example App:

  • Add example app docs
  • Use node-foreman
  • Add example of a worker consuming data from kafka to show data properly being added
  • Some kind of performance test