1.0.1 • Published 9 years ago

qiot-io-kafka v1.0.1

License: MIT

qiot-io-kafka

A Node.js library for consuming messages from and publishing messages to Kafka in the QIOT-IO platform.

Consume messages

The consumer fetches JSON messages from a given topic, parses each received JSON payload, and invokes a callback function with the resulting object.
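The parse-then-invoke step described above can be sketched roughly as follows. This is only an illustration: `handleRawMessage` and its `onError` parameter are hypothetical names, not part of the qiot-io-kafka API, and skipping malformed messages is an assumption about reasonable behavior rather than something the library documents.

```javascript
// Hypothetical helper (not part of qiot-io-kafka): parse one raw Kafka
// message value as JSON and pass the resulting object to a callback.
function handleRawMessage(rawValue, onPayload, onError) {
  let payload;
  try {
    // String() also covers the case where the value arrives as a Buffer.
    payload = JSON.parse(String(rawValue));
  } catch (err) {
    // Assumption: malformed JSON is reported and skipped, not fatal.
    if (onError) onError(err, rawValue);
    return false;
  }
  onPayload(payload);
  return true;
}

// Usage: one valid message and one malformed message.
const received = [];
handleRawMessage('{"city":"Paris"}', (payload) => received.push(payload));
handleRawMessage(
  'not json',
  (payload) => received.push(payload),
  (err) => console.error('Skipped malformed message:', err.message)
);
console.log(received.length); // only the valid message reached the callback
```

Keeping the parse step separate from the callback means a single bad payload does not interrupt consumption of the rest of the topic.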

Features:

  • Let ZooKeeper manage topic offsets
  • Read from multiple partitions
  • Handle OffsetOutOfRange events
  • Handle LeaderNotAvailable errors
  • Retry when a connection error occurs
  • Retry when a message push fails
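The retry behavior listed above is not specified in detail here, so the following is a generic sketch of one common approach: retrying with capped exponential backoff. The names `backoffDelay` and `withRetry`, the default limits, and the backoff policy itself are all assumptions for illustration, not the library's actual implementation.

```javascript
// Assumption: capped exponential backoff. The policy that qiot-io-kafka
// really uses is not documented in this README.
function backoffDelay(attempt, baseMs = 100, maxMs = 5000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: run `operation` until it succeeds or the retry
// budget is exhausted, waiting longer after each failure.
async function withRetry(operation, { retries = 5, baseMs = 100, maxMs = 5000 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (err) {
      lastError = err;
      if (attempt === retries) break;
      const delay = backoffDelay(attempt, baseMs, maxMs);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}

// Usage: an operation that fails twice before succeeding.
let calls = 0;
withRetry(() => {
  calls += 1;
  if (calls < 3) throw new Error('connection refused');
  return 'connected';
}, { baseMs: 10 }).then((result) => console.log(result, 'after', calls, 'attempts'));
```

Capping the delay keeps a long outage from pushing the wait between attempts arbitrarily high.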

When fromBeginning is true, the consumer fetches messages from the beginning of the topic, unless it has read from the topic before. In that case it resumes from the latest offset read in each of the topic's partitions.

When fromBeginning is false, the consumer fetches the current offset of each partition, commits those offsets, and starts consuming from there, unless it has read from the topic before.
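The offset rules above can be summarized per partition as a small decision function. This is a sketch under stated assumptions: `resolveStartOffset` and the field names `committedOffset`, `earliestOffset`, and `latestOffset` are hypothetical, and the library's internal logic may differ.

```javascript
// Hypothetical helper (not part of qiot-io-kafka): choose where to start
// consuming in one partition.
function resolveStartOffset({ fromBeginning, committedOffset, earliestOffset, latestOffset }) {
  // The group has read this partition before: always resume from there,
  // regardless of fromBeginning.
  if (committedOffset !== undefined && committedOffset !== null) {
    return committedOffset;
  }
  // First read: start at the beginning, or at the partition's current offset.
  return fromBeginning ? earliestOffset : latestOffset;
}

const partition = { earliestOffset: 0, latestOffset: 120 };

// New consumer group, fromBeginning: true
console.log(resolveStartOffset({ fromBeginning: true, ...partition }));
// New consumer group, fromBeginning: false
console.log(resolveStartOffset({ fromBeginning: false, ...partition }));
// Group that previously read up to offset 42: fromBeginning is ignored
console.log(resolveStartOffset({ fromBeginning: true, committedOffset: 42, ...partition }));
```

In this reading, fromBeginning only matters the first time a consumer group reads a topic.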

Consumer usage

const KafkaClient = require('qiot-io-kafka');

const client = new KafkaClient({
  groupId: 'consumer-group-0',
  zookeeperUrl: 'localhost',
  fromBeginning: false
});

client.consumer('some-topic', payload => {
  console.log('Message received:', payload);
});

// Close client on SIGINT
process.on('SIGINT', () => {
  client.close().then(() => process.exit());
});

Produce messages

KafkaClient is just a tiny wrapper over the HighLevelProducer API that lets you cycle over partitions and send a JavaScript object, which is converted to a JSON string.
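As a rough illustration of cycling over partitions while serializing objects to JSON, consider the sketch below. The `createPayloadBuilder` helper is hypothetical, the fixed `partitionCount` is an assumption, and the `{ topic, partition, messages }` payload shape is modeled on the format commonly used with kafka-node producers rather than confirmed from this library's source.

```javascript
// Hypothetical helper (not part of qiot-io-kafka): assign messages to
// partitions in round-robin order and serialize each object to JSON.
function createPayloadBuilder(partitionCount) {
  let next = 0; // next partition to use, remembered across calls
  return function buildPayloads(topic, messages) {
    return messages.map((message) => {
      const partition = next;
      next = (next + 1) % partitionCount;
      // Assumed payload shape: { topic, partition, messages }.
      return { topic, partition, messages: JSON.stringify(message) };
    });
  };
}

// Usage: three messages spread over two partitions.
const buildPayloads = createPayloadBuilder(2);
const payloads = buildPayloads('some-topic', [
  { city: 'Paris' },
  { city: 'Rome' },
  { city: 'Oslo' }
]);
console.log(payloads);
```

Because the counter lives in the closure, consecutive calls continue the rotation instead of always starting at partition 0.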

Producer usage

const KafkaClient = require('qiot-io-kafka');

const client = new KafkaClient({
  zookeeperUrl: 'localhost',
});

client.getKafkaProducer().on('ready', () => {
  client.produce('some-topic', [{ city: 'Paris' }, { city: 'Rome' }])
    .then((result) => console.log('Published', result));
});

// Close client on SIGINT
process.on('SIGINT', () => {
  client.close().then(() => process.exit());
});

Tests

Run tests with:

$ npm run test