1.2.8 • Published 6 years ago

datastreamer v1.2.8

Weekly downloads
90
License
-
Repository
-
Last release
6 years ago

DataStreamer

Data streamer for streaming analytics test platforms. Takes batch CSV,JSON data and streams to targets (Kafka,HTTP).

Installation

npm.io

DataStreamer at NPM (version 1.2.7)

Install with NPM -> npm install datastreamer

Details of DataStreamer

DataStreamer(configName,lineListener,pauseListener,resumeListener,streamListener,extraFields);

Parameters

  • configName: Name or path of the config file in JSON format, without the extension (if your config file is 'config.json', configName will be 'config')

  • lineListener: Callback function which will be triggered each time a line is read from the file. This function takes these parameters:

    • fileStream: File stream for resume, pause and close the stream.

    • fieldNames: Field names of the data. You can add extra fields by passing their names as an array to the extraFields parameter of the DataStreamer constructor.

    • fieldValues: Field values of data. You can add values of extra fields to this array

    • jsonGenerator: JSON generator for the data schema that was given via the config file's dataSchema attribute.

    function lineListener(fileStream,fieldNames,fieldValues,jsonGenerator) { /* your implementation */ }
  • pauseListener: Callback function which will be triggered when stream paused.
    function pauseListener() { /* your implementation */ }
  • resumeListener: Callback function which will be triggered when stream resumed.
    function resumeListener() { /* your implementation */ }
  • streamListener: Callback function which will be triggered when data is streamed to Kafka. This function takes kafkaBuffer and fileStream as parameters:

    • kafkaBuffer: File stream writes data to this buffer to send data to Kafka and DataStreamer reads this buffer and sends to Kafka.
    • fileStream: File stream for resume, pause and close the stream.
    function streamListener(kafkaBuffer,fileStream) { /* your implementation */ }

Usage of DataStreamer

// Load the streamer and set up the shared example state.
const DataStreamer = require('datastreamer');
const begin = Date.now();

// queue: records whose virtual send time has not arrived yet.
// timestamp: wall-clock base the virtual timestamps are built from.
// tx_id: running transaction counter, also used as a per-record offset.
const vars = {
    queue: [],
    timestamp: begin,
    tx_id: 1
};

// Config file name without the .json extension (the file itself must be
// JSON). The pause/resume/stream listeners are unused in this example,
// and two extra fields are appended to every record's field list.
const dataStreamer = new DataStreamer("paysim-config",
                                      lineListener,
                                      null,   // pauseListener
                                      null,   // resumeListener
                                      null,   // streamListener
                                      ["timestamp", "tx_id"]);
                                    
/**
 * Called for every line read from the data file. Appends the two extra
 * fields (virtual timestamp and transaction id) to the record, then
 * either sends it to Kafka immediately (when its virtual timestamp is
 * already in the past) or parks it in vars["queue"] for
 * checkSendingTime to flush later.
 *
 * @param {Object}   fileStream    Stream handle (pause/resume/close) — unused here.
 * @param {string[]} fieldNames    Column names, including the extra fields.
 * @param {Array}    fieldValues   Column values for this line (mutated: extra fields appended).
 * @param {Object}   jsonGenerator Generator matching the config's dataSchema.
 */
function lineListener(fileStream, fieldNames, fieldValues, jsonGenerator) {
    // Compute the virtual timestamp once; the original duplicated the
    // vars["timestamp"] + vars["tx_id"] expression in two places.
    var timestamp = vars["timestamp"] + vars["tx_id"];

    fieldValues.push(timestamp);
    fieldValues.push(vars["tx_id"]);

    // The branches below used identical generateJSON calls; build the
    // payload once.
    var data = jsonGenerator.generateJSON(fieldNames, fieldValues);

    if (timestamp < Date.now()) {
        // Virtual time already passed: send right away.
        dataStreamer.pushToKafka(data);
    } else {
        // Not due yet: queue for the periodic flusher.
        vars["queue"].push({ "timestamp": timestamp, "data": data });
    }

    ++vars["tx_id"];
}

/**
 * Periodically flushes queued records whose virtual timestamp has
 * arrived, in queue order, then reschedules itself on a 1 ms timer.
 *
 * Fixes over the original: drops the dead `i >= 0` loop guard and
 * replaces the splice-inside-loop pattern (O(n^2) worst case, with
 * manual `--i` index compensation) with a single pass that rebuilds
 * the queue from the still-pending entries.
 */
function checkSendingTime() {
    var currentTime = Date.now();
    var stillPending = [];

    for (var i = 0; i < vars["queue"].length; ++i) {
        var entry = vars["queue"][i];
        if (entry["timestamp"] <= currentTime) {
            dataStreamer.pushToKafka(entry["data"]);
        } else {
            stillPending.push(entry);
        }
    }
    vars["queue"] = stillPending;

    setTimeout(checkSendingTime, 1);
}

// Begin reading the data file, and start the flush loop that delivers
// queued records once their virtual timestamps come due.
dataStreamer.startStream();
checkSendingTime();

Configuration of DataStreamer

Config for Kafka target

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "kafka",
    "config": {
	    "topic": "nyc-fraud.poc",
	    "connectionString": "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181",
	    "clientId": "nyc-fraud",
	    "zkOptions": {
	      "sessionTimeout": 30000,
	      "spinDelay": 1000,
	      "retries": 10
	    }
	}
  }
}

Config for HTTP target

{
  "filename": "nyc-fraud.json",
  "dataSchema": "integer,string,double,string,double,double,string,double,double,integer,integer",
  "chunkSize": 1000,
  "triggerInterval": 200,
  "loggerEnabled": false,
  "target": {
    "type": "http",
    "config": {
      "hostname": "127.0.0.1",
      "port": 12345,
      "method": "POST",
      "path": "/nyc-fraud",
      "headers": {
        "contentType": "application/json"
      }
    }   
  }
}

Configuration attributes:

  • filename: Name or path of the data

  • dataSchema: Data types of data's columns

  • target: Configuration of stream target

  • chunkSize: Number of records written per triggerInterval

  • triggerInterval: Period of writing data

  • loggerEnabled: Logging flag for log to file or console

  • loggerType: Type of logger which can be file or console

  • logFilename: If logger type is file, logFilename will be used by logger to create a log file with name logFilename.

1.2.8

6 years ago

1.2.7

6 years ago

1.2.6

6 years ago

1.2.5

6 years ago

1.2.4

6 years ago

1.2.3

6 years ago

1.2.2

6 years ago

1.2.1

6 years ago

1.2.0

6 years ago

1.1.9

6 years ago

1.1.8

6 years ago

1.1.7

6 years ago

1.1.6

6 years ago

1.1.5

6 years ago

1.1.4

6 years ago

1.1.3

6 years ago

1.1.2

6 years ago

1.1.1

6 years ago

1.1.0

6 years ago

1.0.9

6 years ago

1.0.8

6 years ago

1.0.7

6 years ago

1.0.6

6 years ago

1.0.5

6 years ago

1.0.4

6 years ago

1.0.3

6 years ago

1.0.2

6 years ago

1.0.1

6 years ago

1.0.0

6 years ago

0.0.0

6 years ago