5.3.2-beta.0 • Published 3 years ago

eventor-kafka-elastic-bridge v5.3.2-beta.0

Weekly downloads
-
License
-
Repository
-
Last release
3 years ago

eventor-kafka-elastic-bridge

The Eventor Kafka-Elastic-Bridge is an interface from the HIT Kafka infrastructure (Eventor) to DTC's ElasticSearch cluster - this allows the data that is pumped into the cluster to be searchable and certain Kafka messages can be traced / documented.

Configuration

As with all Eventor bridges, the config (specified in config.json) has three parts:

{
  "sourceConfig": "...",
  "sinkConfig": "...",
  "transformConfig": "..."
}

Examples of configurations can be found in the eventor-bridge-shared repo.

sourceConfig

The sourceConfig section configures the Kafka connection string and related properties. See the node-rdkafka page on configuration.

sinkConfig

The sinkConfig configures the ElasticSearch connection string and its options. The common module makes use of the ElasticSearch Node.js client.

There is a defaultIndex field that should be specified, and this will be the index that all messages will be sent to, barring any additional indices specified in the CustomEsIndexer mentioned below. On startup, the sink will detect and create any non-existent indices, with settings configured in the indexSettings and indexMappings field of the sinkConfig. Each index will have its own entry under the relevant sections, and the body will conform to the ElasticSearch configuration standards. Additional indices can be set under the transformConfig.

transformConfig

The current transform class used is the KafkaToElasticTransform class. Users can change this to whichever transform they need. Check the documentation here.

The transformConfig contains a map of additional index names to the settings, if desired:

{
  ...
  "transformConfig": {
    "myExtraIndex": {
      "indexSettings": { ... },
      "indexMappings": { ... }
    },
    "myOtherIndex": {
      "indexSettings": { ... },
      "indexMappings": { ... }
    },
    ...
  }
}

Configuring a Custom Transformation

in the KafkaToElasticTransform class, the user is provided with the ability to inject their own, custom, transformer code. Exposed via the base class CustomEsIndexer, this interface has one abstract transform() function that can perform any other transformations the user may need between Kafka and ElasticSearch. To use this feature, extend the base class and implement the transform() function as desired, and pass it an instance of it to the constructor of the KafkaToElasticTransform. This class also is able to manage any additional indices that messages will be posted to in Elasticsearch.

N.B.: the KafkaToElasticTransform transform function by default does not touch the message coming out of Kafka, except for parsing the message body buffer into JSON. The KafkaToElasticTransform.prepareForIndex() function takes the resulting body and builds a proper ES client payload.

Error Handling The custom transformer should handle its own errors, but the calling code is nested inside a try...catch which will lob any error during the transform step up the call-stack.

Exposed Methods

When constructing the KafkaToElasticTransform, note that the constructor requires a defaultIndexName parameter. This is exposed on the ElasticSink object via a getter, since that class already contains the configuration for the default index.

The ElasticSink class also has exposed a static function which constructs the master list of indices which will be used. By convention, the first element of the list is the defaultIndex, and additional indices are appended.