0.0.4 • Published 9 years ago

kinesiserapy v0.0.4

Weekly downloads
-
License
LGPL-3.0+
Repository
-
Last release
9 years ago

Kinesiserapy: a AWS Kinesis event stream interface

This module provides an AWS Kinesis event stream consumer and emitter implementations.

Configuration

AWS credentials and a stream name must be provided to connect to Kinesis. The credentials are passed in an object that must have the following keys

  • AWS_ACCESS_KEY_ID
  • AWS_SECRET_ACCESS_KEY
  • AWS_REGION

The stream name is simply a string.

Emit an event

var kinesiserapy = require('kinesiserapy');

var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";
var emitter = new kinesiserapy.KEmitter(stream,kinesisAuth);

emitter.emit(
    {
        "key": "value"
    },
    function(err) {
        if (err) {
            console.error(err);
        } else {
            console.log('event emitted');
        }
    });

Consuming from a stream

The easiest way to consumer from a stream is to list the shards and spawn a new process to consume from each shard.

var cluster = require('cluster');
var kinesiserapy = require('kinesiserapy');

var kinesisAuth = {
    AWS_ACCESS_KEY_ID: "the key id",
    AWS_SECRET_ACCESS_KEY: "the access key",
    AWS_REGION: "the region"
};
var stream = "my-stream";

if (cluster.isMaster) {
	var kinfo = new kinesiserapy.KInfo(stream, kinesisAuth);
	kinfo.listShards(function(err, shardIds) {
		if (err) {
			console.error(err);
		} else {
			shardIds.forEach(function (shardId, index, shardIds) {
				cluster.fork({stream: stream, shardId: shardId});
			});
		}
	});
} else if (cluster.isWorker) {
	var kConsumer = new kinesiserapy.KConsumer(
		process.env.stream,
		process.env.shardId,
		lambda,
		function(err) {
			console.error(err)
		},
		kinesisAuth);
	kConsumer.consume();
}

function lambda(data) {
    console.log(data);
}

API

kinesiserapy exposes three objects:

  • KInfo to obtain information about the stream * listShards(callback) where callback is a fn(err, shardIds). The callback function is called with err null and shardIds being an array of string. If an error occurred, err is not null.
  • KConsumer to consume from a shard
    • consume() starts an asynchronous consumer that loops
  • KEmitter to emit an event
    • emit(obj, cb) emits the object obj and calls the callback cb after the event is emitted.
      The callback has the signature cb(err, data). err is null unless an error occurred. data contains two keys: SequenceNumber: the sequence number of the emitted event ShardId: the shard on which the event was emitted