1.0.1 • Published 3 years ago

egg-yanxin-kafkajs v1.0.1

Weekly downloads
-
License
MIT
Repository
-
Last release
3 years ago

egg-yanxin-kafkajs

用于 egg.js 的 Kafka 消息队列插件,基于 kafkajs 封装。kafkajs 的详细文档请参见:https://kafka.js.org

依赖说明

依赖的 egg 版本

| enn-egg-kafka 版本 | egg 1.x |
| --- | --- |
| 1.x | 😁 |
| 0.x | ❌ |

依赖的插件

  • kafkajs

开启插件

// config/plugin.js
exports.ennEggKafka = {
  enable: true,
  package: 'kafkajs',
};

使用场景

  • kafkajs模块封装的eggjs插件

配置

  • 配置 config/config.default.js
// Basic configuration
// `client`, `consumer` and `producer` hold the options for each part of the
// plugin; apart from `fetchLogger`, the fields are the ones described at
// https://kafka.js.org/docs/getting-started
config.ennEggKafka = {
    client: {
        clientId: 'my-app',
        brokers: ['1.1.1.1:9092'],
    },
    consumer: {
        groupId: 'group-test',
        fetchLogger: false, // whether to log consumed messages (field added by this plugin)
    },
    producer: {
        allowAutoTopicCreation: true,
    },
};


// SASL_SSL configuration for the Alibaba Cloud Kafka service, for reference only
// NOTE(review): `path` is used below but its require is not shown in this snippet.
config.ennEggKafka = {
    client: {
        clientId: 'my-app',
        // Multiple broker addresses may be listed
        brokers: ['1.1.1.1:9093'],
        ssl: {
            rejectUnauthorized: false,
            // Alibaba Cloud CA file
            // NOTE(review): this passes a file path; the kafkajs SSL docs pass the
            // certificate contents (e.g. fs.readFileSync(...)). Confirm whether the
            // plugin reads the file itself before relying on this form.
            ca: [path.join(__dirname, './config/ca-cert')],
        },
        sasl: {
            mechanism: 'plain',
            username: 'your username',
            password: 'your password',
        },
    },
    consumer: {
        groupId: 'group-test',
        fetchLogger: false, // whether to log consumed messages
    },
    producer: {
        allowAutoTopicCreation: true,
    }
}

其中 fetchLogger 为本插件新增的字段,其他配置字段详见 kafkajs 文档:https://kafka.js.org/docs/getting-started

使用

// Produce a message: send one key/value message to the topic 'test_topic'
await app.kafka.producer.sendMessage('test_topic', [{ key: 'key2111', value: 'hey hey!1111' }])

// Consume messages one at a time
await app.kafka.consumer.initSubscribe({ topics: ['test_topic'] });
app.kafka.consumer.run({
    // Invoked once for every message received from the subscribed topics.
    eachMessage: async ({ topic, partition, message }) => {
      // A message may carry no key (or no value), so only call toString()
      // when the field is actually present instead of throwing on null.
      const data = {
        partition,
        topic,
        key: message.key == null ? null : message.key.toString(),
        value: message.value == null ? null : message.value.toString(),
        headers: message.headers,
        timestamp: message.timestamp,
        offset: message.offset,
      };
      if (message.extra) {
        data.extra = message.extra.toString();
      }
      // Log the decoded record built above rather than the raw message.
      console.log(data);
    },
});
  
// Consume messages in batches
// NOTE(review): the single-message example subscribes with
// initSubscribe({ topics: [...] }) while this one calls initConsumer({ topic });
// confirm which method and argument shape the plugin actually exposes.
await app.kafka.consumer.initConsumer({ topic: 'test_topic' });
app.kafka.consumer.run({
    eachBatchAutoResolve: true,
    // Invoked once per fetched batch; all handler arguments are listed for
    // reference even though only some are used below.
    eachBatch: async ({
      batch,
      resolveOffset,
      heartbeat,
      commitOffsetsIfNecessary,
      uncommittedOffsets,
      isRunning,
      isStale,
    }) => {
        // Decoded records of this batch, collected in processing order.
        const datas = [];
        for (const message of batch.messages) {
            // key/value may be absent, so guard before calling toString().
            const data = {
                topic: batch.topic,
                partition: batch.partition,
                highWatermark: batch.highWatermark,
                message: {
                    offset: message.offset,
                    key: message.key == null ? null : message.key.toString(),
                    value: message.value == null ? null : message.value.toString(),
                    headers: message.headers,
                },
            };
            if (message.extra) {
                data.extra = message.extra.toString();
            }
            datas.push(data);
            // Mark this offset as processed, then send a heartbeat before
            // moving on to the next message.
            resolveOffset(message.offset);
            await heartbeat();
        }
        // Log the decoded batch instead of the raw messages.
        console.log(datas);
    },
});

License

MIT