0.0.4 • Published 9 years ago
kinesiserapy v0.0.4
Kinesiserapy: an AWS Kinesis event stream interface
This module provides AWS Kinesis event stream consumer and emitter implementations.
Configuration
AWS credentials and a stream name must be provided to connect to Kinesis. The credentials are passed in an object that must have the following keys:
- AWS_ACCESS_KEY_ID
- AWS_SECRET_ACCESS_KEY
- AWS_REGION
The stream name is simply a string.
Emit an event
// Example: publish a single event to a Kinesis stream.
var kinesiserapy = require('kinesiserapy');

// Credentials object; it must carry all three of these keys.
var kinesisAuth = {
  AWS_ACCESS_KEY_ID: "the key id",
  AWS_SECRET_ACCESS_KEY: "the access key",
  AWS_REGION: "the region"
};
var stream = "my-stream";

// Completion handler for emit(): logs the error if there is one,
// otherwise logs a confirmation message.
function onEmitted(err) {
  if (err) {
    console.error(err);
    return;
  }
  console.log('event emitted');
}

var emitter = new kinesiserapy.KEmitter(stream, kinesisAuth);
emitter.emit({ "key": "value" }, onEmitted);
Consuming from a stream
The easiest way to consume from a stream is to list the shards and spawn a new process to consume from each shard.
// Example: consume a stream by running one worker process per shard.
var cluster = require('cluster');
var kinesiserapy = require('kinesiserapy');

// Credentials object; it must carry all three of these keys.
var kinesisAuth = {
  AWS_ACCESS_KEY_ID: "the key id",
  AWS_SECRET_ACCESS_KEY: "the access key",
  AWS_REGION: "the region"
};
var stream = "my-stream";

// Handler passed to the consumer; prints whatever it is given.
function lambda(data) {
  console.log(data);
}

// Error handler passed to the consumer.
function onConsumerError(err) {
  console.error(err);
}

// Forks one worker for the given shard. The stream name and shard id are
// passed as the fork environment and read back via process.env in the worker.
function forkWorkerFor(shardId) {
  cluster.fork({stream: stream, shardId: shardId});
}

// listShards() callback: logs the error, or forks a worker per shard id.
function onShardsListed(err, shardIds) {
  if (err) {
    console.error(err);
    return;
  }
  shardIds.forEach(forkWorkerFor);
}

if (cluster.isMaster) {
  // Master process: discover the shards and fan out.
  var kinfo = new kinesiserapy.KInfo(stream, kinesisAuth);
  kinfo.listShards(onShardsListed);
} else if (cluster.isWorker) {
  // Worker process: consume the single shard assigned through the environment.
  var kConsumer = new kinesiserapy.KConsumer(
    process.env.stream,
    process.env.shardId,
    lambda,
    onConsumerError,
    kinesisAuth
  );
  kConsumer.consume();
}
API
kinesiserapy exposes three objects:
- KInfo to obtain information about the stream
- listShards(callback) where callback is a fn(err, shardIds). On success, the callback is called with err set to null and shardIds set to an array of strings. If an error occurred, err is not null.
- KConsumer to consume from a shard
- consume() starts an asynchronous consumer that loops
- KEmitter to emit an event
- emit(obj, cb) emits the object obj and calls the callback cb after the event is emitted.
The callback has the signature cb(err, data). err is null unless an error occurred. data contains two keys: SequenceNumber (the sequence number of the emitted event) and ShardId (the shard on which the event was emitted).
- emit(obj, cb) emits the object obj and calls the callback cb after the event is emitted.