0.0.26 • Published 2 years ago
cda-utils v0.0.26
CDA UTILS
Installation
$ npm install
Usage of Activity Stream
const { KafkaService } = require("cda-utils");
const topics = [YOUR_KAFKA_PRODUCER];
const getProducerConfig = () => {
let conf = {
clientId: `your-client-id`,
brokers: ['hostnames'],
producerProperties: {
transactionalId: "unique-sample-transaction-id",
maxInFlightRequests: 1,
idempotent: true,
},//this config is preffred by kafkajs https://kafka.js.org/docs/transactions
}
if (
process.env.KAFKA_SASL_USERNAME ||
global.Config.broker.kafka.sasl_username
) {
conf = {
...conf,
"sasl.username":
process.env.KAFKA_SASL_USERNAME ||
global.Config.broker.kafka.sasl_username,
"sasl.password":
process.env.KAFKA_SASL_PASSWORD ||
global.Config.broker.kafka.sasl_password,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "PLAIN",
};
}
return conf;
};
const getConsumerConfig = () => {
let conf = {
clientId: `your-client-id`,
brokers: ['hostnames'],
consumerProperties: {
groupId: `your-group-id`,
readUncommited: false, //only for producer using transactions
},
}
if (
process.env.KAFKA_SASL_USERNAME ||
global.Config.broker.kafka.sasl_username
) {
conf = {
...conf,
"sasl.username":
process.env.KAFKA_SASL_USERNAME ||
global.Config.broker.kafka.sasl_username,
"sasl.password":
process.env.KAFKA_SASL_PASSWORD ||
global.Config.broker.kafka.sasl_password,
"security.protocol": "SASL_PLAINTEXT",
"sasl.mechanisms": "PLAIN",
};
}
return conf;
};
const Kafka = new KafkaService(getProducerConfig());
const kafkaEventDelegator = async (messageReceived) => {
try {
const message = messageReceived.content;
if (message.eventName === "test") {
console.log("test kafka event");
return false; //this will mark message as processed
}
} catch (e) {
return false; //this will push the message for DLQ
console.log(error.response.data);
}
// Do something with the message
};
const kafkaConsumerConfig = {
prefetch: 10,
...getConsumerConfig(),
};
let index= 0;
async function testPushToQueue() {
await Kafka.pushToQueue(
"dev-test",
[
{
eventName: "test",
data: {
val: `hello world ${index} `,
index: 2,
},
},
],
{
"replication-factor": 1,
}
);
await Kafka.commitTransaction(); //commit or abort this so transaction can reach the end state
index++;
}
async function init() {
Kafka.listenQueue(topics, kafkaEventDelegator, kafkaConsumerConfig);
setInterval(testPushToQueue, 10000);
}
try {
init();
} catch (error) {
console.log(error);
}
Usage of Kafka Broker
const cda_utils = require('cda-utils');
const Kafka_Broker = cda_utils.Broker.Kafka;
Mandatory Config for cda_utils
global.Config = {};
global.Config.env = '';
global.Config.cda_utils = {};
global.Config.cda_utils.queueName = 'skuad-events';
global.Config.cda_utils.broker = {};
global.Config.cda_utils.broker.maxQueueSize = '100'
global.Config.cda_utils.broker.kafka = {
brokers: 'localhost:9092',
commitTimeInterval: '1',
maxParallelHandles: '10',
messageProcessingTimeoutMS: '1'
};
};
0.0.26
2 years ago
0.0.25
2 years ago
0.0.24
2 years ago
0.0.23
2 years ago
0.0.22
2 years ago
0.0.21
2 years ago
0.0.20
2 years ago
0.0.19
2 years ago
0.0.18
2 years ago
0.0.17
2 years ago
0.0.16
2 years ago
0.0.15
2 years ago
0.0.14
2 years ago
0.0.13
2 years ago
0.0.12
2 years ago
0.0.11
2 years ago
0.0.10
2 years ago
0.0.9
2 years ago
0.0.8
2 years ago
0.0.7
2 years ago
2.0.7
2 years ago
2.0.6
2 years ago
2.0.5
2 years ago
2.0.4
2 years ago
2.0.3
2 years ago
2.0.2
2 years ago
2.0.1
2 years ago
2.0.0
2 years ago