1.0.51 • Published 21 days ago

egg-plugin-grpc-server v1.0.51

Weekly downloads
2
License
MIT
Repository
github
Last release
21 days ago

egg-grpc-server

NPM version build status Test coverage David deps Known Vulnerabilities npm download

Install

npm install egg-plugin-grpc-server --save

Usage

// {app_root}/config/plugin.js
exports.grpcServer = {
  enable: true,
  package: 'egg-plugin-grpc-server',
};

Configuration

// {app_root}/config/config.default.js
exports.grpcServer = {
  port: 50051,             // grpc监听端口
  host: '127.0.0.1',       // 监听地址
  timeOut: 5000,           // 超时时间
  protoDir: 'app/proto',  // proto文件所在文件夹
  grpcDir: 'app/grpc'     // 接口实现所在文件夹
  errorHandle(error) {     // 全局统一错误处理
    // TODO
    // this 为ctx,接受error参数
  }
};

see config/config.default.js for more detail.

Example

启动

  • 插件在app上挂载了GrpcServer的class,需要在项目启动时实例化并调用start方法
  • 启动方式一:直接在app.js实例化并启动不推荐(在多进程模式下会报错)
// ${app_root}/app.js
module.exports = app => {
  const grpcServer = new app.GrpcServer(app);
  grpcServer.start();
  Reflect.defineProperty(app, 'grpcServer', { value: grpcServer });
};
  • 启动方式二:使用cluster-client管理启动
    • agent上创建leader,管理work中server实例
// ${app_root}/app.js
module.exports = app => {
  const registryClient = app.cluster(app.RealClient, { isBroadcast: false }).create({})

  app.beforeStart(async () => {
    // worker 订阅启动grpc 服务消息
    registryClient.subscribe({
      dataId: 'grpc.server.leader.attach'
    }, val => {
      app.logger.info(`worker subscribe [grpc.server.leader.attach]: ${val}`)
      if (val === process.pid) {
        const grpcServer = new app.GrpcServer(app)
        grpcServer.start()
        Reflect.defineProperty(app, 'grpcServer', { value: grpcServer })
        // 向leader发送start消息
        registryClient.publish({
          dataId: 'grpc.server.worker.start',
          publishData: process.pid
        })
      }
    })

    // 向leader发送ready消息
    registryClient.publish({
      dataId: 'grpc.server.worker.ready',
      publishData: process.pid
    })
  })

  // 退出时关闭 grpc server 并 像 leader 发送 close 信号
  app.beforeClose(async () => {
    await app.grpcServer.close()
    // 向leader发送ready消息
    registryClient.publish({
      dataId: 'grpc.server.worker.close',
      publishData: process.pid
    })
  })

  process.on('disconnect', async () => {
    await app.grpcServer.close()
    registryClient.publish({
      dataId: 'grpc.server.worker.close',
      publishData: process.pid
    })
  })

  process.on('uncaughtException', async err => {
    await app.grpcServer.close()
    registryClient.publish({
      dataId: 'grpc.server.worker.close',
      publishData: process.pid
    })
    app.logger.error(err)
  })
}
// ${app_root}/agent.js
module.exports = agent => {
  const registryClient = agent.cluster(agent.RealClient, { isBroadcast: false }).create({})

  agent.beforeStart(async () => {
    await registryClient.ready()
    const works = new Map()

    // 订阅 worker ready消息,并记录
    registryClient.subscribe({
      dataId: 'grpc.server.worker.ready'
    }, val => {
      agent.logger.info(`leader subscribe [grpc.server.worker.ready]: ${val}`)
      works.set(val, false)
      let isStarted = false
      works.forEach((value, key) => {
        if (value) isStarted = true
      })
      if (!isStarted) {
        registryClient.publish({
          dataId: 'grpc.server.leader.attach',
          publishData: [...works.keys()][0]
        })
      }
    })

    // 订阅 worker grpc server启动消息
    registryClient.subscribe({
      dataId: 'grpc.server.worker.start'
    }, val => {
      agent.logger.info(`leader subscribe [grpc.server.worker.start]: ${val}`)
      works.set(val, true)
    })

    // 订阅worker关闭消息
    registryClient.subscribe({
      dataId: 'grpc.server.worker.close'
    }, val => {
      agent.logger.info(`leader subscribe [grpc.server.worker.close]: ${val}`)
      agent.logger.info(`leader current works: ${works}`)
      works.delete(val)
      registryClient.publish({
        dataId: 'grpc.server.leader.attach',
        publishData: [...works.keys()][0]
      })
    })
  })

  process.on('uncaughtException', err => {
    agent.logger.error(err)
  })
}
  • proto/xxx.proto service中的的接口名 grpc/xxx.js 中的接口名 二者应当同名(grpc/xxx.js文件名可不大写)

proto 文件,可通过config.dir配置目录,默认app/proto

// {app_root}/app/protos/hello.proto
syntax = "proto3";

package egg.node;

service Hello {
    rpc sayHello (HelloReq) returns (HelloResp) {};
    rpc buf (stream BufRequest) returns (stream BufResp) {};
}

message HelloReq {
    string name = 1;
    int32 group = 2;
}

message HelloResp {
    int32 code = 1;
    string message = 2;
}

message BufRequest {
    string name = 1;
}

message BufResp {
    string message = 1;
    int32 code = 2;
}

接口实现,thiseggContext,接受一个参数为请求的call(grpc中的call,call.request为请求参数)

// {app_root}/app/grpc/hello.js
'use strict';

exports.sayHello = async function(call) {
  const { request } = call;

  function sleep(time) {
    return new Promise(reslove => {
      setTimeout(() => {
        reslove();
      }, time);
    });
  }

  await sleep(1000);

  return {
    code: 200,
    message: 'hello ' + request.name + ', you are in ' + request.group,
  };
};

exports.buf = function(call) {

  call.on('data', data => {
    console.log('service recive data' + JSON.stringify(data));
    call.write('hello' + data.name);
  });
  call.on('end', () => {
    call.end();
  });
};

单元测试

  • unittest环境中app挂载了grpcServerTest方法用于测试
const { assert, app } = require('egg-mock/bootstrap');
const path = require('path');
const PROTO_PATH_HELLO = path.join(__dirname, '../app/proto/hello.proto');
const grpc = require('grpc');
describe('test/grpc-server.test.js', () => {
  let test;
  before(async () => {
    await app.ready();
    test = app.grpcServerTest
  });
  it('should visit hello by grpcServer.hello', async () => {
    const res = await test.call(app, {
      proto: PROTO_PATH_HELLO,
      implement: 'sayHello',
      service: 'Hello', // 默认第一个service
      data: { name: 'leo', group: 1 },
    });
    assert(res.code === 200);
  });
  it('should visit by stream test', () => {
    const call = test.call(app, {
      proto: PROTO_PATH_HELLO,
      implement: 'buf',
    });
    call.write({ name: 'leo' });
    call.end();
    call.on('data', function(data) {
      console.log('buf-resp-data', data);
    });
    call.on('end', function() {
      // TODO
    });
  });
  

Questions & Suggestions

Please open an issue here.

License

MIT

1.0.51

21 days ago

1.0.50

4 months ago

1.0.49

6 years ago

1.0.48

6 years ago

1.0.47

6 years ago

1.0.46

6 years ago

1.0.45

6 years ago

1.0.44

6 years ago

1.0.43

6 years ago

1.0.42

6 years ago

1.0.41

6 years ago

1.0.40

6 years ago

1.0.39

6 years ago

1.0.38

6 years ago

1.0.37

6 years ago

1.0.36

6 years ago

1.0.35

6 years ago

1.0.34

6 years ago

1.0.33

6 years ago

1.0.32

6 years ago

1.0.31

6 years ago

1.0.3

6 years ago

1.0.2

6 years ago

1.0.1

6 years ago

1.0.0

6 years ago