1.0.1 • Published 5 months ago

etl_pod_nest v1.0.1

Weekly downloads
-
License
UNLICENSED
Repository
-
Last release
5 months ago

一,ETL监控系统架构

op6s52.png

二,etl pod 节点设计哲学

1,ETL的过程

uvhk1a.png

2,关键逻辑称呼

  • 数据输入:input
  • 数据输出:output
  • 数据流类型:tag

一次数据流包括:数据流种类,输入 和 输出

3,ETL pod 接入方式推荐

  1. 给ETL的数据分类

    比如flaw,panel, sheet, ....

  2. 找出数据流入, 和数据流出的部分

  3. 接入 输入 和 输入的监控信息

三,使用方法

1,下载包npm i etl_pod_nest

2,在app.module.ts中注册

import { EtlPodModule } from 'etl_pod_nest';

EtlPodModule.forRoot({
    // ETL 节点名称【对应监控系统中的ETL节点名称】
    etlPodName: 'pol_data_extractor',
    // ETL 监控中心服务IP
    masterPodIp: '127.0.0.1',
    // ETL 监控中心服务ws 端口【默认4001】
    masterPodPort: 4001,
    // ETL 描述
    desc: 'pol 数据抽取',
    // 数据流列表
    tags: {
        // 数据流标识key
        doff: {
            // 源节点名称
            sourcePod: 'kafka',
            // 目标节点名称
            targetPod: 'mysql',
            // 数据流功能描述
            desc: 'doff卷数据同步通道',
        },
        flaw: {
            sourcePod: 'kafka',
            targetPod: 'mysql',
            desc: 'flaw缺陷数据 同步通道',
        },
        other: {
            sourcePod: 'kafka',
            targetPod: 'redis',
            desc: '其他数据同步通道',
        },
    },
}),

3,在业务中 将ETL信息到 监控节点

(1)使用装饰器

使用装饰器,默认会拦截函数的执行时间

tag: 数据流类型

// 数据输入
@DataInput({ tag: 'other' })
async getData(data) {
    // 数据流入,清洗
    const data: any = ...;
    return data.mock.slice(0, count);
}

// 数据输出
@DataOutput({ tag: 'other' })
  async uploadFlawHasIndex(data) {
      try {
          // 数据上传
          await 。。。
          return data;
      } catch (error) {
          console.log(error);
      }
  }

上传自定义监控信息

函数 返回值实现指定接口 CustomMonitor

import { CustomMonitor } from 'src/etl-pod/etl.pod.class';
import { DataInput } from 'src/main';
// 数据输入
@DataInput({tag: 'flaw'})
function dataInput(): CustomMonitor {
    return {
        // 其他业务信息
        a: 1,
        // 自定义监控信息
        monitorData: {
            id: {
                value: 1,
                // 描述
                desc: 'asdas',
                // 单位
                suffix: '%',
                // 是否要形成折线图
                charts: true,
            },
        },
    };
}

(2)使用注入Sender发送

import { Injectable } from '@nestjs/common';
import { DataInput, DataOutput, Sender, EtlSender } from 'etl_pod_nest';

@Injectable()
export class DemoService {

    // 注入Sender
    @Sender()
    private etlSender: EtlSender;

    constructor( ) {}

    async getData2() {
       
        let startTime = Date.now();
        let count = Math.floor(Math.random() * 1000);
        const data: any =  ...;
        const curData = data.mock.slice(0, count);
        // 自定义 发送ETL信息到监控节点
        this.etlSender.send({
            tag: 'doff',
            // 输入 | 输出 , input | output
            type: 'input',
            timeStart: startTime,
            timeEnd: Date.now(),
            // 自定义的监控信息
            customMsg: {
                curId: {
                    value: curData[curData.length - 1]?.uid,
                    desc: '当前doffId',
                    suffix: '',
                    charts: true,
                },
                curFactory: {
                    value: curData[curData.length - 1]?.factory,
                    desc: '当前工厂',
                    suffix: '',
                },
                counts: {
                    value: curData.length,
                    desc: '同步数量',
                    charts: true,
                    suffix: '个',
                },
            },
        });

        return data.mock;
    }
}