0.0.2 • Published 10 months ago

@blued-core-oversea/kafka-client v0.0.2

Weekly downloads
-
License
ISC
Repository
-
Last release
10 months ago

基于 kafka-node 的一层封装。

npm i @blued-core-oversea/kafka-client
import { KafkaClient } from '../index'

// Configuration handed to KafkaClient: one entry per named cluster.
// NOTE(review): the value looks like a config-store path that resolves to the
// cluster's bootstrap servers — confirm against the KafkaClient implementation.
const kafka = {
  oversea: '/blued/backend/ukafka/oversea_app/bootstrap',
}

// Single KafkaClient instance shared by every call in this example.
const kafkaClient = new KafkaClient(kafka)

/**
 * Returns the client registered under the `oversea` key.
 * `getClient` is invoked on every call; nothing is cached at this level.
 */
export const liveLogKafkaClient = () => {
  return kafkaClient.getClient('oversea')
}

/**
 * Builds one sample report-log payload. All fixtures share the same body and
 * differ only in `timestamp`; key order is kept stable so the serialized JSON
 * is identical to a hand-written literal.
 */
const buildTestPayload = (timestamp: number) => ({
  timestamp,
  data: {
    uid: 90000024,
    extra: {
      event: 15,
      targetUid: 90000024,
      liveId: 0,
      taskLevel: '4',
      taskId: '8',
    },
  },
  app: 2,
})

// Sample payloads used by the example sends below.
const testData = buildTestPayload(1629460128)
const testData2 = buildTestPayload(1629460144)
const testData3 = buildTestPayload(1629460155)
const testData4 = buildTestPayload(1629460166)
const testData5 = buildTestPayload(1629460177)
/**
 * Picks a pseudo-random partition index for a topic.
 *
 * Draws an integer from `Math.floor(Math.random() * size)` and returns it when
 * it is at least 1; every other outcome (a draw of 0, or a non-numeric result)
 * is replaced by 1. As a consequence 0 is never returned, and 1 is returned
 * even when `size` is 0 or 1.
 *
 * NOTE(review): skipping index 0 may be deliberate — confirm with the topic's
 * partition layout before changing it.
 *
 * @param size Upper bound (exclusive) for the random draw.
 * @returns A partition index that is never 0.
 */
function getPartition(size: number) {
  const drawn = Math.floor(Math.random() * size)
  if (drawn >= 1) {
    return drawn
  }
  return 1
}

/**
 * Returns a promise that settles once a `setTimeout` of `ms` milliseconds
 * fires. The promise resolves without a value.
 *
 * @param ms Delay passed to `setTimeout`, in milliseconds.
 */
export const sleep = (ms: number) =>
  new Promise(done => {
    setTimeout(done, ms)
  })

/**
 * Example driver: publishes the sample payloads to the `report-log` topic
 * through each of the client's four send methods.
 *
 * Sequence: one un-awaited `send` immediately, a 10 s pause, then `send`,
 * awaited `sendAsync`, `primarySend`, awaited `primarySendAsync`, and a final
 * 10 s pause. Every call picks its partition with `getPartition(3)`.
 */
const main = async () => {
  // Wrapper API call, issued right away and not awaited.
  // NOTE(review): judging by the log line below, this exercises sending before
  // the underlying client is ready — confirm how the wrapper handles that.
  liveLogKafkaClient().send(
    'report-log',
    JSON.stringify(testData5),
    {
      key: '',
      partition: getPartition(3),
      attributes: 0,
    }
  )
  console.log('--------------before ready')

  // Pause 10 s before the remaining sends.
  await sleep(10000)
  console.log('---------------start')
  // Wrapper API call (not awaited).
  liveLogKafkaClient().send(
    'report-log',
    JSON.stringify(testData),
    {
      key: '',
      partition: getPartition(3),
      attributes: 0,
    }
  )

  // Wrapper API call, async mode: awaited before continuing.
  await liveLogKafkaClient().sendAsync(
    'report-log',
    JSON.stringify(testData3),
    {
      key: '',
      partition: getPartition(3),
      attributes: 0,
    }
  )

  // Native call: takes an array of payload objects that carry the topic and
  // messages themselves.
  // NOTE(review): presumably kafka-node's produce-request shape — confirm.
  liveLogKafkaClient().primarySend([{
    topic: 'report-log',
    key: '',
    partition: getPartition(3),
    attributes: 0,
    messages: JSON.stringify(testData2),
  }])

  // Native call, async mode: same payload shape, awaited before continuing.
  await liveLogKafkaClient().primarySendAsync([{
    topic: 'report-log',
    key: '',
    partition: getPartition(3),
    attributes: 0,
    messages: JSON.stringify(testData4),
  }])

  console.log('---------------over')
  // Pause another 10 s before resolving.
  // NOTE(review): presumably gives the un-awaited sends time to complete — confirm.
  await sleep(10000)
}

// Logs a marker once the example run has completed.
const reportSuccess = () => {
  console.log('success')
}

// Logs whatever was thrown by the run (or by the success handler).
const reportFailure = (error: unknown) => {
  console.log(error)
}

// Entry point: run the example and report its outcome.
main().then(reportSuccess).catch(reportFailure)