0.0.22 • Published 4 months ago

@awesomeniko/kafka-trail v0.0.22

Weekly downloads
-
License
MIT
Repository
github
Last release
4 months ago

Workflow

How to create enemy

  1. Choose what enemy type will be used for base. scenes/enemies
  2. Create specific enemy resource (goblin / bug / something else) in resources/enemies

How to spawn enemy

  1. Place spawner as level child. scenes/spawner.
  2. Configure spawner:
    • Type:
      • Outside viewport - spawning outside player viewport. Distance controls by Min Distance / Max Distance
      • Area - spawning enemies inside RectangleShape2D of CollisionShape2D
      • Marker - spawning enemies on exactly position of Marker2D
    • Mode:
      • SIMULTANEOUS, entries = [ {goblin,3}, {orc,1} ] → spawns 3 goblins + 1 orc every interval.
      • RANDOM_UNIFORM, total_count = 2, entries = [goblin, orc, slime] → spawns any 2 enemies chosen uniformly each tick.
      • RANDOM_WEIGHTED, total_count = 5, entries = [ {wolf, weight=5}, {bear, weight=1} ] → about 5:1 wolves to bears each tick.

Kafka-trail - MessageQueue Library

A Node.js library for managing message queues with Kafka, designed to simplify creating, using, and managing Kafka topics with producers and consumers.

Based on Kafkajs


Features

  • Fully in typescript
  • Branded types
  • Connect to Kafka brokers easily.
  • Create or use existing Kafka topics with specified partitions.
  • Initialize the message queue with minimal setup.
  • Setup consumer handlers
  • Compressing (see)
  • Supports custom encoders/decoders.

Installation

Install the library using npm or Yarn:

npm install kafka-trail

Or with Yarn:

yarn add kafka-trail

Usage

Here’s an example of how to use the kafka-trail library in your project.

If you want only producer:

// Define your Kafka broker URLs
import { 
  KTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Start producer
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
})

// Create topic fn
const TestExampleTopic = KTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Use publishSingleMessage method to publish message
const payload = TestExampleTopic({
  fieldForPayload: 1,
}, {
  messageKey: KafkaMessageKey.NULL, //If you don't want to specify message key
})

await messageQueue.publishSingleMessage(payload)

If you want consumer only:

import type  { pino } from "pino";

import { 
  KTHandler, 
  KTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

// Another dependency example
class DatabaseClass {
  #client: string
  constructor () {
    this.#client = 'test-client'
  }

  getClient() {
    return this.#client
  }
}

const dbClass = new DatabaseClass()

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue({
  // If you want pass context available in handler
  ctx: () => {
    return {
      dbClass,
    }
  },
});

export const TestExampleTopic = KTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, ctx: {logger: pino.Logger, dbClass: typeof dbClass}) => {
    // Ts will show you right type for `payload` variable from `TestExampleTopic`
    // Ctx passed from KTMessageQueue({ctx: () => {...}})

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const logger = ctx.logger.child({
      payload: data.fieldForPayload,
    })

    logger.info(dbClass.getClient())

    return Promise.resolve()
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id', 
    batchConsuming: true // default false
  },
})

For both consumer and producer:

import { 
  KTHandler, 
  KTTopic, 
  KafkaClientId, 
  KafkaMessageKey, 
  KafkaTopicName, 
  KTMessageQueue 
} from "kafka-trail";

const kafkaBrokerUrls = ["localhost:19092"];

// Create a MessageQueue instance
const messageQueue = new KTMessageQueue();

// Create topic fn
const TestExampleTopic = KTTopic<{
  fieldForPayload: number
}>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works is batchConsuming = true
})

// Required, because inside handler we are going to publish data
await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
  },
})

// Create or use topic
await messageQueue.initTopics([
  TestExampleTopic,
])

// Create topic handler
const testExampleTopicHandler = KTHandler({
  topic: TestExampleTopic,
  run: async (payload, _, publisher, { heartbeat, partition, lastOffset, resolveOffset }) => { // resolveOffset available for batchConsuming = true only
    // Ts will show you right type for `payload` variable from `TestExampleTopic`

    const [data] = payload

    if (!data) {
      return Promise.resolve()
    }

    const newPayload = TestExampleTopic({
      fieldForPayload: data.fieldForPayload + 1,
    }, {
      messageKey: KafkaMessageKey.NULL,
    })

    await publisher.publishSingleMessage(newPayload)
  },
})

messageQueue.registerHandlers([
  testExampleTopicHandler,
])

// Start consumer
await messageQueue.initConsumer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    consumerGroupId: 'consumer-group-id', 
    batchConsuming: true // default false
  },
})

Destroying all will help you perform graceful shutdown

const messageQueue = new MessageQueue();

process.on("SIGINT", async () => {
  await messageQueue.destroyAll()
});

process.on("SIGTERM", async () => {
  await messageQueue.destroyAll()
});

Configurations

Compression codec

By default, lib using LZ4 codec to compress and decompress data. You can override it, by passing via KTKafkaSettings type. Be careful - producer and consumer should have same codec. Ref docs. Example:

import { KTMessageQueue } from "kafka-trail";
import { CompressionTypes } from "kafkajs";
import lz4 from "lz4";

// Instanciate messageQueue
const kafkaBrokerUrls = ["localhost:19092"];

const messageQueue = new KTMessageQueue();

await messageQueue.initProducer({
  kafkaSettings: {
    brokerUrls: kafkaBrokerUrls,
    clientId: KafkaClientId.fromString('hostname'),
    connectionTimeout: 30_000,
    compressionCodec: {
      codecType: CompressionTypes.LZ4,
      codecFn: {
        compress(encoder: Buffer) {
          return lz4.encode(encoder);
        },

        decompress<T>(buffer: Buffer) {
          return lz4.decode(buffer) as T;
        },
      },
    },
  },
})

Data encoding / decoding

You can provide custom encoders / decoders for sending / receiving data. Example:

type MyModel = {
  fieldForPayload: number
}

const TestExampleTopic = KTTopic<MyModel>({
  topic: KafkaTopicName.fromString('test.example'),
  numPartitions: 1,
  batchMessageSizeToConsume: 10, // Works is batchConsuming = true
}, {
  encode: (data) => {
    return JSON.stringify(data)
  },
  decode: (data: string | Buffer) => {
    if (Buffer.isBuffer(data)) {
      data = data.toString()
    }

    return JSON.parse(data) as MyModel
  },
})

Contributing

Contributions are welcome! If you’d like to improve this library:

  1. Fork the repository.
  2. Create a new branch.
  3. Make your changes and submit a pull request.

License

This library is open-source and licensed under the MIT License.

0.0.22

4 months ago

0.0.21

4 months ago

0.0.20

4 months ago

0.0.19

5 months ago

0.0.18

5 months ago

0.0.17

5 months ago

0.0.16

5 months ago

0.0.15

6 months ago

0.0.14

6 months ago

0.0.13

6 months ago

0.0.12

6 months ago

0.0.11

6 months ago

0.0.10

6 months ago

0.0.9

6 months ago

0.0.8

6 months ago

0.0.7

6 months ago

0.0.6

6 months ago

0.0.5

6 months ago

0.0.4

6 months ago

0.0.3

6 months ago

0.0.2

6 months ago

0.0.1

6 months ago