# Hapi-Kafka (v0.0.4)
## Launch Kafka Queue
```sh
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=localhost --env ADVERTISED_PORT=9092 --env KAFKA_CREATE_TOPICS=test:1:1 vicentehidalgo/kafka:0.0.1
```
**Important:** logs are stored at `/tmp/kafka-logs`.
### Parameters
- `ADVERTISED_HOST`: the external IP for the container, e.g. ``docker-machine ip `docker-machine active` ``
- `ADVERTISED_PORT`: the external port for Kafka, e.g. `9092`
- `KAFKA_CREATE_TOPICS`: topics to create, in the form `<topic>:<n_partitions>:<repl_factor>`
- `ZK_CHROOT`: the ZooKeeper chroot used by Kafka (without the `/` prefix), e.g. `kafka`
- `LOG_RETENTION_HOURS`: the minimum age of a log file, in hours, before it is eligible for deletion (default is `168`, i.e. one week)
- `LOG_RETENTION_BYTES`: the size at which segments are pruned from the log (default is `1073741824`, i.e. 1 GB)
- `NUM_PARTITIONS`: the default number of log partitions per topic
This plugin shares a common Kafka connection across the whole Hapi server.

This version is intended for use with hapi v11.x.x.
## Registering the plugin
Options:
- `zkHost` (Producer, Consumer): the ZooKeeper host.
- `host` (Consumer): the host to send the messages to.
- `topics` (Consumer): the list of topics to be consumed.
- `groupId` (Consumer): the consumer group ID.
- `id` (Consumer): the consumer ID.
- `encoding` (Consumer): the encoding to use.
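For orientation, a minimal registration for hapi v11 might look like the sketch below. The option values are placeholders borrowed from the full example at the end of this document, so adjust them to your environment.

```js
var Hapi = require('hapi');

var server = new Hapi.Server();
server.connection({port: 3000});

// Minimal sketch (hapi v11 style): register hapi-kafka against a local ZooKeeper.
// All option values below are placeholders; see the full example further down.
server.register({
    register: require('hapi-kafka'),
    options: {
        host: 'http://localhost:3000/msg',   // "the host to send the messages to" (see options above)
        zkHost: 'localhost:2181/',
        topics: [{topic: 'topic1'}],
        groupId: 'kafka-node-group',
        id: 'my-consumer-id-1',
        encoding: 'utf8'
    }
}, function (err) {
    if (err) {
        throw err;
    }
    server.start(function () {
        console.log('Server running at', server.info.uri);
    });
});
```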
## Using the plugin
Two objects are exposed by this plugin:

- `client`: a Kafka client connection object that is connected to the Kafka instance
- `library`: the Kafka module used by this plugin
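For instance, once the server has started you can reach the shared connection through `server.plugins['hapi-kafka']` (or `request.server.plugins['hapi-kafka']` inside a route handler). The sketch below assumes the exposed `library` is `kafka-node` and that a topic named `topic1` already exists; both are assumptions, not guarantees of this plugin's API.

```js
// Sketch: publish a message through the shared connection.
// Assumes `library` is kafka-node and that `topic1` exists (both assumptions).
var plugin = server.plugins['hapi-kafka'];
var Producer = plugin.library.Producer;
var producer = new Producer(plugin.client);   // reuse the shared client connection

producer.on('ready', function () {
    producer.send([{topic: 'topic1', messages: ['hello from hapi']}], function (err, result) {
        if (err) {
            console.error('Kafka send failed:', err);
        } else {
            console.log('Kafka send result:', result);
        }
    });
});

producer.on('error', function (err) {
    console.error('Kafka producer error:', err);
});
```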
## Example
```js
var Hapi = require('hapi');
var config = require('./config/environment');
// App-specific helper used to share the Kafka producer across modules
// (assumed to exist in the application; adjust the path to your project).
var GlobalModule = require('./components/global-module');

var server;

var setOptions = function () {
    var opts = {};
    opts.routes = {prefix: config.routes.prefix};
    return opts;
};

var init = function () {
    return new Promise((resolve, reject) => {
        // Create a server with a host and port
        server = new Hapi.Server();
        server.connection({port: config.port, routes: {cors: true}});

        // Register the plugins and start the application
        server.register(
            [
                {register: require('./routes')},
                {register: require('hapi-mongodb'), options: config.mongoSettings},
                {
                    register: require('hapi-kafka'),
                    options: {
                        host: 'http://' + config.ip + ':' + config.port + config.routes.prefix + '/msg',
                        zkHost: 'localhost:2181/',
                        topics: [{topic: 'topic1'}],
                        groupId: 'kafka-node-group',
                        id: 'my-consumer-id-1',
                        encoding: 'utf8'
                    }
                },
                {register: require('inert')},
                {register: require('vision')}
            ],
            setOptions(),
            function (err) {
                if (err) {
                    return reject(err);
                }
                server.start(function (err) {
                    if (err) {
                        return reject(err);
                    }
                    // Share the Kafka producer exposed by the plugin with the rest of the app
                    GlobalModule.setConfigValue('producer', server.plugins['hapi-kafka'].producer);
                    return resolve(server);
                });
            }
        );
    });
};

// Stop the hapi server (hapi v11 callback style)
var stopServer = function () {
    return new Promise((resolve, reject) => {
        server.stop(function (err) {
            return err ? reject(err) : resolve();
        });
    });
};

exports.init = init;
exports.stopServer = stopServer;
```
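In the example above, the `host` option points back at the application's own `<prefix>/msg` route, which suggests that messages consumed from the subscribed topics are forwarded there as HTTP requests. The handler below is only a hypothetical sketch of such a route (the exact method and payload shape are not documented here), written as a hapi v11 plugin like the `./routes` module in the example:

```js
// Hypothetical './routes' plugin receiving messages forwarded by the consumer.
// Method and payload shape are assumptions; inspect what actually arrives at /msg.
exports.register = function (server, options, next) {
    server.route({
        method: 'POST',
        path: '/msg',
        handler: function (request, reply) {
            console.log('Message received from Kafka consumer:', request.payload);
            return reply({received: true});
        }
    });
    next();
};

exports.register.attributes = {
    name: 'routes'
};
```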