3.6.18 • Published 6 months ago

@lml_taf/taf-rpc v3.6.18

Weekly downloads
-
License
UNLICENSED
Repository
-
Last release
6 months ago

00 - 安装

taf-rpc模块发布在公司内部NPM镜像,NPM镜像参数设置的方法请参考文档。

设置好NPM参数之后,使用如下命令安装模块:

$ npm install taf-rpc

01 - taf-rpc简介

taf-rpc是TAF4NodeJS项目底层的RPC调用框架,提供了一个多服务器进程间进行RPC调用的基础设施。简单来说我们可以用这个模块做这些事情:

  • 使用jce2node将Jce文件翻译成客户端代理类代码后,供客户端调用任意的TAF服务。

  • 使用jce2node将Jce文件翻译成服务端代码后,可以实现标准的TAF服务,该服务可被任意使用JCE/WUP/TAF协议的客户端直接调用。

  • 远程日志、染色日志、属性上报、告警上报、tafnode与服务通信等框架内服务。

  • 创建自定义通信协议的客户端代理类(比如使用JSON格式的协议)。

  • 创建自定义通信协议的服务端(比如使用JSON格式的协议)。

  • 模块:taf-registry,功能:根据名字到主控查询该服务可用的IP列表。

taf-rpc分为客户端和服务器端两个部分。 客户端部分提供了rpc代理生成,消息路由和网络通讯等功能。 服务器端提供了远程服务暴露,请求派发,网络通讯等功能。

02 - 关于协议、Jce文件以及翻译工具jce2node的说明

在深入学习taf-rpc的相关知识之前,我们先理清JCE协议WUP协议TAF协议三者之间的关系:

  • JCE协议是一种数据编解码规则,它将整形、枚举值、字符串、序列、字典、自定义结构体等数据类型按照一定的规则编码到二进制数据流中。对端接收到二进制数据流之后,按照相应的规则反序列化可得到原始数值。

  • JCE协议使用一种叫做TAG的整型值(unsigned char)来标识变量,比如某个变量A的TAG值为100(该值由开发者自定义) ,我们将变量值编码的同时,也将该TAG值编码进去。对端需要读取变量A的数值时,就到数据流中寻找TAG值为100的数据段,找到后按规则读出数据部分即是变量A的数值。

  • JCE协议的定位是一套编码规则。JCE协议序列化之后的数据不仅可以进行网络传输,同时还可以存储到数据库或者DCache中。

  • WUP协议是JCE协议的上层封装,定位为通信协议。它使用变量名作为变量的关键字,编码时,客户端将变量名打包到数据流中;解码时,对端根据变量名寻找对应的数据区,然后根据数据类型对该数据区进行反序列化得到原始数值。

  • WUP协议内置一个JCE的Map类型,该Map的关键字就是变量名,Map的值是将变量的数据值经过JCE序列化的二进制数据。

  • WUP协议封装的数据包可以直接发送给TAF服务端,而服务端可以直接反序列化得到原始值。

  • TAF协议是对BasePacket使用JCE协议封装的通信协议。结构体包含比如请求序列号、协议类型、RPC参数序列化之后二进制数据等重要信息。

由Jce文件生成客户端或者服务端的代码通常有如下三种方法:

jce2node工具简介:

学习Jce文件的编写方法之后,我们就可以自己来定义通信描述文件,然后使用jce2node的不同命令行选项生成不同的代码文件:

$ jce2node Protocol.jce

上述命令将忽略interface描述段,只转换文件中定义的“常量”、“枚举值”、“结构体”等数据类型,供开发者当不使用TAF框架作为调用工具时的编解码库文件。生成的文件名称为“ProtocolJce.js”。

$ jce2node Protocol.jce --client

上述命令不仅转换文件中定义的“常量”、“枚举值”、“结构体”等数据类型,同时将interface的描述段翻译成RPC调用框架。生成的文件名称为“ProtocolProxy.js”,该文件供调用方使用。开发者引入该文件之后,可以直接调用服务端的服务。具体的使用方法请参考“npm install taf-rpc”模块的说明文档。

$ jce2node Protocol.jce --server

上述命令不仅转换文件中定义的“常量”、“枚举值”、“结构体”等数据类型,同时将interface的描述段翻译成服务端的接口文件。生成的文件名称为“Protocol.js”以及“ProtocolImp.js”,开发者不要改动“Protocol.js”,只需要继续完善“ProtocolImp.js”,实现文件中具体的函数,即可作为TAF服务端提供服务。具体的使用方法请参考“npm install taf-rpc”模块的说明文档。

jce2node支持的命令行参数及其作用:

选项作用
--taf-lib-path=\<DIRECTORY>指定taf-stream模块的路径,默认使用NodeJS的目录。
--with-taf是否允许“taf”作为命名空间(因为taf这个命名空间主要用于框架服务的jce文件定义)。
--dir=\<DIRECTORY>生成文件的输出目录。
--relative限定所有的Jce文件都在当前目录寻找。
--jceBase=\<DIRECTORY>指定Jce文件的搜索目录。
--r转换嵌套的Jce文件(比如在A.jce中包含了B.jce和C.jce,使用该参数,在翻译A.jce的同时,也将B.jce和C.jce翻译成JS代码。
--client生成客户端的调用类代码。
--server生成服务端的框架代码。

03 - taf-rpc示例和开发步骤

文档看不下去了,马上动手实测!

D:\
Demo > ls
目录: D:\
Demo
Mode
LastWriteTime
Length
Name
---------------------------
  -a-- - 2015 / 1 / 16
15
:
33
1294
client.js
- a-- - 2015 / 1 / 15
9
:
40
558
NodeJsComm.jce
- a-- - 2015 / 1 / 15
9
:
38
7991
NodeJsComm.js
- a-- - 2015 / 1 / 16
15
:
26
891
NodeJsCommImp.js
- a-- - 2015 / 1 / 15
9
:
38
8234
NodeJsCommProxy.js
- a-- - 2015 / 1 / 16
15
:
39
395
server.js

第二步,安装taf-rpc模块。

$ npm install taf-rpc

第三步,在一个命令行窗口中启动服务端。

$ node server.js

第四步,在另外一个命令行窗口中启动客户端。

$ node client.js

使用taf-rpc模块的开发步骤

第一步,编写jce文件,定义客户端与服务端通信用到的常量、枚举值、结构体、函数等通信协议。我们使用如下jce文件作为示例:

一般而言Jce文件通常由服务端开发制定、维护和提供。

module TRom
{
    struct User_t
    {
        0 optional int id = 0;
        1 optional int score = 0;
        2 optional string name = "";
    };

    struct Result_t
    {
        0 optional int id = 0;
        1 optional int iLevel = 0;
    };

    interface NodeJsComm
    {
        int test();

        int getall(User_t stUser, out Result_t stResult);

        int getUsrName(string sUsrName, out string sValue1, out string sValue2);

        int secRequest(vector<byte> binRequest, out vector<byte> binResponse);
    };
};

将上述内容保存为:NodeJsComm.jce。

第二步,根据jce文件生成客户端的调用代码

$ jce2node --client NodeJsComm.jce

第三步,客户端程序

//STEP01 引入系统模块以及工具生成的代码
var Taf = require("taf-rpc").client;
var TRom = require("./NodeJsCommProxy.js").TRom;

//STEP02 初始化TAF客户端
//       该步骤非必选项,后续文档将介绍[taf-rpc].client.initialize函数在什么情况下需要调用以及它做了哪些工作
//       initialize函数只需调用一次,初始化之后全局可用
//       在演示程序中我们不需要使用过多的特性,所以先将其注释
//Taf.initialize("./config.conf");

//STEP03 生成服务端调用代理类实例
var prx = Taf.stringToProxy(TRom.NodeJsCommProxy, "TRom.NodeJsTestServer.NodeJsCommObj@tcp -h 127.0.0.1 -p 14002 -t 60000");

//STEP04 客户端调用采用Promise机制进行回调,这里编写成功以及失败的回调函数
var success = function (result) {
  console.log("result.response.costtime:", result.response.costtime);
  console.log("result.response.return:", result.response.return);
  console.log("result.response.arguments.stResult:", result.response.arguments.stResult);
}

var error = function (result) {
  console.log("result.response.costtime:", result.response.costtime);
  console.log("result.response.error.code:", result.response.error.code);
  console.log("result.response.error.message:", result.response.error.message);
}

//STEP05 初始化接口参数,开始调用RPC接口
var stUser = new TRom.User_t();
stUser.name = "tencent-mig";

prx.getall(stUser).then(success, error).done();

将上述代码保存为client.js,使用如下命令即可调用服务端。

$ node client.js result.response.costtime: 7 result.response.return: 200 result.response.arguments.stResult: { id: 10000, iLevel: 10001 }

如果我们只是调用方,写到这里已经足矣。按照刚才的示例,拿到相应Jce文件我们就可以调用C++的TAF服务、Java的TAF服务或者NodeJS的TAF服务。

stringToProxy函数定义:

stringToProxy(proxyHandle, objName, setName, options);

参数说明:

  • proxyHandle:代理句柄。
  • objName: 被调服务是名字。
  • setName:set分区,可以不填,也可以填"",默认为空。
  • options: 选项,这里的选项主要有:
    keepAlive: 保持长连接标识,true:保持长连接;false:不保持长连接。
    heartInterval:长连接心跳时间,单位ms,跟keepAlive一起使用,建议20000。

例如:

Taf.stringToProxy(TRom.NodeJsCommProxy, "TRom.NodeJsTestServer.NodeJsCommObj", "", {keepAlive: true, heartInterval: 20000});

第四步,实现一个NodeJS版本的TAF服务。

首先,完形填空。完成Jce文件中定义的RPC函数,实现自己的业务逻辑。

jce2node的--erver 选项将Jce文件生成服务端的代码。使用该选项翻译工具不仅转换文件中定义的“常量”、“枚举值”、“结构体”等数据类型,同时将interface描述段翻译成服务端的接口文件。主要生成两个文件,比如在当前例子中会生成NodeJsComm.jsNodeJsCommImp.js。开发者 不需要也尽量不要 改动NodeJsComm.js ,该文件主要实现了结构体编解码、函数参数编解码、函数分发等功能。NodeJsCommImp.js继承与NodeJsComm.js ,该文件主要供开发者填补定义的RPC函数,实现业务逻辑。

var TRom = require('./NodeJsComm.js').TRom;
module.exports.TRom = TRom;

TRom.NodeJsCommImp.prototype.initialize = function () {
  //TODO::

}

TRom.NodeJsCommImp.prototype.test = function (current) {
  //TODO::

}

TRom.NodeJsCommImp.prototype.getall = function (current, stUser, stResult) {
  //TODO::
  //初始时,每个RPC函数都为空,需要开发者自己完形填空,补齐这里缺失的业务逻辑。
  //补齐业务逻辑之后,开发者调用current的sendResponse函数,返回数据给调用方。
  //需要注意:每个函数的sendResponse都是不一样的,它的参数与当前函数的 返回值 和 出参 相对应。
  //         如果当前函数有返回值,那么current.sendResponse的第一个参数应该是该返回。示例中当前函数的返回值为int类型,我们返回200作为示例。
  //         解决返回值的问题之后,我们按顺序写入当前的出参即可。参数的编解码和网络传输由框架解决。

  stResult.id = 10000;
  stResult.iLevel = 10001;

  current.sendResponse(200, stResult);
}

TRom.NodeJsCommImp.prototype.getUsrName = function (current, sUsrName, sValue1, sValue2) {
  //TODO::

}

TRom.NodeJsCommImp.prototype.secRequest = function (current, binRequest, binResponse) {
  //TODO::

}

接下来,创建一个服务入口文件。它主要负责读取配置文件、配置端口、设置协议解析器、启动服务等等工作。

var Taf = require("taf-rpc").server;
var TRom = require("./NodeJsCommImp.js").TRom;

var svr = Taf.createServer(TRom.NodeJsCommImp);
svr.start({
  name: "TRom.NodeJsTestServer.NodeJsCommObjAdapetr",
  servant: "TRom.NodeJsTestServer.NodeJsCommObj",
  endpoint: "tcp -h 127.0.0.1 -p 14002 -t 10000",
  protocol: "taf",
  maxconns: 200000
});

console.log("server started.");

将上述代码保存为server.js,使用如下命令启动。

$ node server.js

server started.

04 - 客户端的初始化函数taf-rpc.client.initialize

在演示代码中我们提到initialize不一定要显示调用,我们用其他方式同样可以设置我们需要的参数。

首先我们看下配置文件的格式和必要参数:

<taf>
    <application>
        <client>
            locator = taf.tafregistry.QueryObj@tcp -h 172.27.208.171 -p 17890 ##定义主控地址
            async-invoke-timeout=60000										  ##异步调用的超时时间(ms)
        </client>
    </application>
</taf>

这个配置文件正是由tafnode生成的,我们主要使用"taf.application.client.locator"和" taf.application.client.async-invoke-timeout"这个两个配置项。

什么情况下可以不用调用initialize函数?

如果我们在生成服务端代理时,每个服务端都使用直连的模式,也就是在stringToProxy中指定IP地址就可以不用初始化了。

除了使用配置文件设置这两个参数之外,我们可以调用taf-rpc.client对外暴露的函数进行设置:

var Taf  = require("taf-rpc").client;

Taf.set("locator", "taf.tafregistry.QueryObj@tcp -h 172.27.208.171 -p 17890");
Taf.set("timeout", 60000);

上述的调用方法,与使用initialize+配置文件的方式等价。

05 - TAF服务的创建方法

taf-rpc有三种方法创建一个标准的TAF服务:

第一种,使用tafnode生成的配置文件。

使用这种方法与TAF4C++的使用方式一样。

首先需要我们在TAF运维管理平台配置服务的Obj,然后在启动程序时由tafnode生成包含监听端口的配置文件,然后服务框架再依赖该配置绑定端口+启动服务。

tafnode生成的配置文件类似与如下:

<taf>
	<application>
		enableset=n
		setdivision=NULL
		<server>
			node=taf.tafnode.ServerObj@tcp -h 10.209.15.37 -p 19386 -t 60000
			app=TRom
			server=NodeJsTestServer
			localip=10.209.15.37
			netthread=2
			local=tcp -h 127.0.0.1 -p 10002 -t 3000
			basepath=/usr/local/app/taf/tafnode/data/MTT.NodeJSTest/bin/
			datapath=/usr/local/app/taf/tafnode/data/MTT.NodeJSTest/data/
			logpath=/usr/local/app/taf/app_log//
			logsize=15M
			config=taf.tafconfig.ConfigObj
			notify=taf.tafnotify.NotifyObj
			log=taf.taflog.LogObj
			deactivating-timeout=3000
			openthreadcontext=0
			threadcontextnum=10000
			threadcontextstack=32768
			closeout=0
			<TRom.NodeJsTestServer.NodeJsCommObjAdapter>
				allow
				endpoint=tcp -h 127.0.0.1 -p 14002 -t 60000
				handlegroup=TRom.NodeJsTestServer.NodeJsCommObjAdapter
				maxconns=200000
				protocol=taf
				queuecap=10000
				queuetimeout=60000
				servant=TRom.NodeJsTestServer.NodeJsCommObj
				shmcap=0
				shmkey=0
				threads=5
			</TRom.NodeJsTestServer.NodeJsCommObjAdapter>
		</server>
		<client>
			locator=taf.tafregistry.QueryObj@tcp -h 172.27.208.171 -p 17890:tcp -h 172.27.34.213 -p 17890
			refresh-endpoint-interval=60000
			stat=taf.tafstat.StatObj
			property=taf.tafproperty.PropertyObj
			report-interval=60000
			sample-rate=1000
			max-sample-count=100
			sendthread=1
			recvthread=1
			asyncthread=3
			modulename=TRom.NodeJsTestServer
			async-invoke-timeout=60000
			sync-invoke-timeout=3000
		</client>
	</application>
</taf>

我们使用该配置文件创建一个服务,代码如下:

//STEP01 引入关键模块
var Taf = require("taf-rpc");
var TRom = require("./NodeJsCommImp.js");

//STEP02 创建一个服务的实例
//注意这里的配置,在正式环境时,用 process.env.TAF_CONFIG 来表示配置文件的路径
//也就是:svr.initialize(process.env.TAF_CONFIG, function (server){ ... });
var svr = new Taf.server();
svr.initialize("./TRom.NodeJsTestServer.config.conf", function (server) {
  server.addServant(TRom.NodeJsCommImp, server.Application + "." + server.ServerName + ".NodeJsCommObj");
});

//STEP03 上步初始化服务之后,开始启动服务
svr.start();

第二种,显示化服务端信息

//STEP01 引入关键模块
var Taf  = require("taf-rpc").server;
var TRom = require("./NodeJsCommImp.js").TRom;

//STEP02 创建一个服务的实例
//注意这里的“endpoint”和“protocol”为必选项,格式必须如下示例相同
var svr  = Taf.createServer(TRom.NodeJsCommImp);
svr.start({
    name     : "TRom.NodeJsTestServer.AdminObjAdapetr",
    servant  : "TRom.NodeJsTestServer.AdminObj",
    endpoint : "tcp -h 127.0.0.1 -p 14002 -t 10000",
    maxconns : 200000,
    protocol : "taf"
});

console.log("server started.");

第三种,从tafnode生成的配置文件中,选取部分服务来启动

//STEP01 引入关键模块
var Taf   = require("taf-rpc");
var TRom  = require("./NodeJsCommImp.js");

Taf.server.getServant("./TRom.NodeJsTestServer.config.conf").forEach(function (config){
    var svr, map;
    map = {
        'TRom.NodeJsTestServer.NodeJsCommObj' : TRom.NodeJsCommImp
    };

    svr = Taf.server.createServer(map[config.servant]);
    svr.start(config);
});

06 - TAF客户端的实现原理

07 - TAF服务端的实现原理

08 - taf-rpc作为客户端调用第三方协议服务的示例

首先我们先定一个双方认可的通信协议,比如我们以Json格式作为通信协议,格式假定:

//客户端 --> 服务端 
{
  P_RequestId : 0, 					//本次调用的序列号
    P_FuncName
:
  'test'				//本次调用的函数名称
  P_Arguments : ['aa', 'bb'...
..]		//本次调用的函数参数
}

//客户端 <-- 服务端
{
  P_RequestId : 0, 					//本次调用的序列号
    P_FuncName
:
  'test'				//本次调用的函数名称
  P_Arguments : ['ee', 'ff'...
..]		//本次调用的返回参数
}

实现协议解析类:

//将文件保存为Protocol.js

var stream = function () {
  this._name = "json";
}

stream.prototype.__defineGetter__("name", function () {
  return this._name;
});

module.exports = stream;

/**
 * 根据传入数据进行打包的方法
 * @param request
 * request.iRequestId : 框架生成的请求序列号
 * request.sFuncName  : 函数名称
 * request.sBuffer  : 函数的参数列表
 */
stream.prototype.compose = function (data) {
  var str = JSON.stringify({
    P_RequestId: data.iRequestId,
    P_FuncName: data.sFuncName,
    P_Arguments: data.sBuffer
  });

  var len = 4 + Buffer.byteLength(str);
  var buf = new Buffer(len);
  buf.writeUInt32BE(len, 0);
  buf.write(str, 4);

  return buf;
}

/**
 *
 * 网络收取包之后,填入数据判断是否完整包
 * @param networkBuffer 网络buffer, NetworkBuffer类 @lml_taf/taf-rpc 下的NetworkBuffer
 * @param message: 如果有完整数据包, 返回完整数据包
 * @return true/false: 返回是否有完整数据包
 *
 * 当有一个完整的请求时,解包函数抛出事件,需按照如下格式补充事件的数据成员:
 *
 * {
 *     iRequestId :  0,	//本次请求的序列号
 *     sFuncName  : "",	//本次请求的函数名称
 *     sBuffer  : []	//本次请求的参数列表
 * }
 *
 */
stream.prototype.feed = function (networkBuffer, message) {

  if (networkBuffer.getBufferLength() < 4) {
    return false;
  }
  //有一个完整的包
  var Length = networkBuffer.readLength();

  if (Length > networkBuffer.getBufferLength()) {
    //包没有收全
    return false;
  }

  networkBuffer.moveHeader(4);

  var result = JSON.parse(networkBuffer.getBuffers(Length - 4).toString());
  message.iRequestId
:
  result.P_RequestId,
    message.sFuncName
:
  result.P_FuncName,
    message.origin.sBuffer
:
  result.P_Arguments

  networkBuffer.moveHeader(Length - 4);

  return true;

  // var BinBuffer = data;
  // if (this._data != undefined) {
  //     var temp = new Buffer(this._data.length + data.length);
  //     this._data.copy(temp, 0);
  //     data.copy(temp, this._data.length);
  //     this._data = undefined;
  //     BinBuffer = temp;
  // }

  // for (var pos = 0; pos < BinBuffer.length; ) {
  //     if (BinBuffer.length - pos < 4) {
  //         break;
  //     }
  //     var Length = BinBuffer.readUInt32BE(pos);
  //     if (pos + Length > BinBuffer.length) {
  //         break;
  //     }
  //     var result   = JSON.parse(BinBuffer.slice(pos + 4, pos + Length).toString());
  //     var request  =
  //     {
  //         iRequestId : result.P_RequestId,
  // 		sFuncName  : result.P_FuncName,
  //         Arguments  : result.P_Arguments
  //     };

  //     this.emit("message", request);
  //     pos += Length;
  // }

  // if (pos != BinBuffer.length) {
  //     this._data = new Buffer(BinBuffer.length - pos);
  //     BinBuffer.copy(this._data, 0, pos);
  // }
}

// /**
//  * 重置当前协议解析器
//  */
// stream.prototype.reset = function () {
//     delete this._data;
//     this._data = undefined;
// }

客户端使用该协议解析器,调用服务端的代码:

var Taf = require("taf-rpc").client;
var Protocol = require("./ProtocolClient.js");

var prx = Taf.stringToProxy(Taf.ServantProxy, "test@tcp -h 127.0.0.1 -p 12306 -t 60000");
prx.setProtocol(Protocol);
prx.rpc.createFunc("echo");

var success = function (result) {
  console.log("success");
  console.log("result.response.costtime:", result.response.costtime);
  console.log("result.response.arguments:", result.response.arguments);
}

var error = function (result) {
  console.log("error");
  console.log("result.response.error.code:", result.response.error.code);
  console.log("result.response.error.message:", result.response.error.message);
}

prx.rpc.echo("tencent", "mig", "abc").then(success, error);

09 - taf-rpc作为第三方协议服务的示例

首先实现RPC函数处理类,注意框架的分发逻辑:

A.如果客户端传来的函数名,是处理类的函数,那么框架有限调用对应函数

B.如果客户端传来的函数不是处理的函数,那么调用该处理类的 onDispatch函数,由该函数负责处理该请求

C.如果也没有onDispatch函数,则报错

//将该文件保存为:EchoHandle.js
var Handle = function () {

}

Handle.prototype.initialize = function () {
}
Handle.prototype.echo = function (current, v1, v2, v3) {
  console.log("EchoHandle.echo::", v1, v2, v3);

  current.sendResponse("TX", "TX-MIG");
}

Handle.prototype.onDispatch = function (v1, v2, v3) {
  console.log("EchoHandle.onDispatch::", v1, v2, v3);
}

module.exports = Handle;

服务端启动函数的代码示例:

var Taf = require("taf-rpc").server;
var Protocol = require("./ProtocolClient.js");
var Handle = require("./EchoHandle.js");

var svr = Taf.createServer(Handle);
svr.start({
  endpoint: "tcp -h 127.0.0.1 -p 12306 -t 10000",
  protocol: Protocol
});