0.0.26 • Published 2 years ago

cda-utils v0.0.26

Weekly downloads
-
License
ISC
Repository
-
Last release
2 years ago

CDA UTILS

Installation

$ npm install cda-utils

Usage of Activity Stream

// KafkaService is the named export from cda-utils that this example uses.
const { KafkaService } = require("cda-utils");

// Passed as the first argument to Kafka.listenQueue() further down.
// NOTE(review): YOUR_KAFKA_PRODUCER is a placeholder and must be replaced before
// running — presumably with the topic name(s) to consume; confirm.
const topics = [YOUR_KAFKA_PRODUCER];
/**
 * Builds the configuration object handed to `KafkaService` for the producer side.
 *
 * SASL settings are appended only when a username is found, either in the
 * KAFKA_SASL_USERNAME environment variable or in
 * `global.Config.broker.kafka.sasl_username` (environment wins).
 *
 * @returns {object} The base config, plus four SASL keys when a username is configured.
 */
const getProducerConfig = () => {
  const baseConfig = {
    clientId: `your-client-id`,
    brokers: ['hostnames'],
    // Settings recommended by kafkajs for transactional producers:
    // https://kafka.js.org/docs/transactions
    producerProperties: {
      transactionalId: "unique-sample-transaction-id",
      maxInFlightRequests: 1,
      idempotent: true,
    },
  };

  const saslUsername =
    process.env.KAFKA_SASL_USERNAME || global.Config.broker.kafka.sasl_username;
  if (!saslUsername) {
    // No credentials configured: connect without SASL.
    return baseConfig;
  }

  const saslPassword =
    process.env.KAFKA_SASL_PASSWORD || global.Config.broker.kafka.sasl_password;

  return {
    ...baseConfig,
    "sasl.username": saslUsername,
    "sasl.password": saslPassword,
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanisms": "PLAIN",
  };
};

/**
 * Builds the configuration object for the consumer side (spread into
 * `kafkaConsumerConfig` in this example).
 *
 * SASL settings are appended only when a username is found, either in the
 * KAFKA_SASL_USERNAME environment variable or in
 * `global.Config.broker.kafka.sasl_username` (environment wins).
 *
 * @returns {object} The base config, plus four SASL keys when a username is configured.
 */
const getConsumerConfig = () => {
  const baseConfig = {
    clientId: `your-client-id`,
    brokers: ['hostnames'],
    consumerProperties: {
      groupId: `your-group-id`,
      // Only relevant when the producer uses transactions.
      // NOTE(review): kafkajs spells this consumer option `readUncommitted`;
      // confirm which spelling KafkaService expects.
      readUncommited: false,
    },
  };

  const saslUsername =
    process.env.KAFKA_SASL_USERNAME || global.Config.broker.kafka.sasl_username;
  if (!saslUsername) {
    // No credentials configured: connect without SASL.
    return baseConfig;
  }

  const saslPassword =
    process.env.KAFKA_SASL_PASSWORD || global.Config.broker.kafka.sasl_password;

  return {
    ...baseConfig,
    "sasl.username": saslUsername,
    "sasl.password": saslPassword,
    "security.protocol": "SASL_PLAINTEXT",
    "sasl.mechanisms": "PLAIN",
  };
};

// KafkaService instance built from the producer config. The rest of this example
// uses this one instance for listenQueue, pushToQueue and commitTransaction.
const Kafka = new KafkaService(getProducerConfig());
/**
 * Per-message handler passed to `Kafka.listenQueue`.
 *
 * Reads `messageReceived.content` and dispatches on its `eventName`.
 *
 * NOTE(review): both the success branch and the failure branch return `false`,
 * yet their comments claim opposite outcomes (processed vs. DLQ). The return
 * contract is defined by KafkaService and is not visible here, so the values are
 * left unchanged — confirm which one is intended.
 *
 * @param {{ content: { eventName: string } }} messageReceived - Consumed message wrapper.
 * @returns {Promise<boolean|undefined>} `false` for "test" events and on failure;
 *   `undefined` for every other event.
 */
const kafkaEventDelegator = async (messageReceived) => {
  try {
    const message = messageReceived.content;
    if (message.eventName === "test") {
      console.log("test kafka event");
      return false; // this will mark message as processed
    }
  } catch (e) {
    // Log BEFORE returning, otherwise the statement is unreachable. Prefer the
    // HTTP-style payload when the error carries one, else log the error itself,
    // so logging can never throw from inside the catch block.
    console.log(e && e.response ? e.response.data : e);
    return false; // this will push the message for DLQ
  }

  // Do something with the message
};
// Options passed to Kafka.listenQueue: a prefetch of 10 layered under whatever
// getConsumerConfig() returns (keys from getConsumerConfig() win on collision).
const kafkaConsumerConfig = Object.assign({ prefetch: 10 }, getConsumerConfig());
// Number of completed pushes so far; embedded in each outgoing test payload.
let index = 0;

/**
 * Publishes one "test" event to the "dev-test" topic, commits the transaction,
 * and then advances the push counter.
 *
 * @returns {Promise<void>} Settles once the push and the commit have both completed.
 */
async function testPushToQueue() {
  const topicName = "dev-test";
  const events = [
    {
      eventName: "test",
      data: {
        val: `hello world ${index} `,
        index: 2,
      },
    },
  ];
  const topicOptions = { "replication-factor": 1 };

  await Kafka.pushToQueue(topicName, events, topicOptions);
  // Commit (or abort) so the transaction reaches an end state.
  await Kafka.commitTransaction();
  index += 1;
}
/**
 * Starts the example: begins consuming `topics` with `kafkaEventDelegator`, then
 * schedules `testPushToQueue` to run every 10 seconds.
 *
 * NOTE(review): the result of `Kafka.listenQueue` is not awaited, as in the
 * original. If it returns a promise, its rejection is not observed here — confirm
 * against KafkaService before adding an `await`.
 *
 * @returns {Promise<void>} Rejects if `Kafka.listenQueue` or `setInterval` throws synchronously.
 */
async function init() {
  Kafka.listenQueue(topics, kafkaEventDelegator, kafkaConsumerConfig);
  setInterval(() => {
    // testPushToQueue is async: handle its rejection so a failed push is logged
    // instead of surfacing as an unhandled promise rejection.
    testPushToQueue().catch((error) => {
      console.log(error);
    });
  }, 10000);
}

// init() is async, so failures arrive as a promise rejection; a synchronous
// try/catch around the call can never observe them.
init().catch((error) => {
  console.log(error);
});

Usage of Kafka Broker

// Load the package and take the Kafka member of its Broker export.
const cda_utils = require('cda-utils');
const Kafka_Broker = cda_utils.Broker.Kafka;

Mandatory Config for cda_utils

// Global configuration for cda_utils. All values are strings, exactly as in the
// original example. This replaces any existing `global.Config` object.
global.Config = {
    env: '',
    cda_utils: {
        queueName: 'skuad-events',
        broker: {
            maxQueueSize: '100',
            kafka: {
                brokers: 'localhost:9092',
                commitTimeInterval: '1',
                maxParallelHandles: '10',
                messageProcessingTimeoutMS: '1',
            },
        },
    },
};
0.0.26

2 years ago

0.0.25

2 years ago

0.0.24

2 years ago

0.0.23

2 years ago

0.0.22

2 years ago

0.0.21

2 years ago

0.0.20

2 years ago

0.0.19

2 years ago

0.0.18

2 years ago

0.0.17

2 years ago

0.0.16

2 years ago

0.0.15

2 years ago

0.0.14

2 years ago

0.0.13

2 years ago

0.0.12

2 years ago

0.0.11

2 years ago

0.0.10

2 years ago

0.0.9

2 years ago

0.0.8

2 years ago

0.0.7

2 years ago

2.0.7

2 years ago

2.0.6

2 years ago

2.0.5

2 years ago

2.0.4

2 years ago

2.0.3

2 years ago

2.0.2

2 years ago

2.0.1

2 years ago

2.0.0

2 years ago