sentry-fs v0.0.2
Sentry-fs是一个用于接收数据上传的任务节点, 当初始化一个上传任务后, 可以根据实际的传输需要, 分批进行数据片段的发送, 当最后数据包的累计字节数等于任务初始化时设定的总字节数时上传任务即完成, 数据片段将合并生成并保存为一个完整的文件. 目前上传可支持http1.1/https/http2/socket/socket.io/websocket/ftp/amqp/mqtt等协议;
上传的文件通过MD5唯一标识, 如果开通了下载插件(useDLoader), 则可通过MD5作为参数下载, 支持整个文件下载(/:md5)或者分段下载(/chunks/:md5)两种模式;
如果存在不同任务节点之间数据共享的需求, 可以启用节点插件(AsPeer), 它支持节点与节点之间的广播, 通过配置peer插件的参数, 任务数据将会在不同节点之间, 单向或者双向的进行传递, 可利用共享的任务元数据进行非本节点的数据下载;
日志分为系统日志和文件日志, 系统日志可查看当前节点的执行情况, 文件日志可查看以单个任务为单位的传输进展;
基本用法
new Sentry( conf ) // 设置配置参数, 实例化哨兵对象
.on(Event.CREATED, console.log) // 任务被创建时触发
.on(Event.RECEIVING, console.log) // 接收数据包时触发
.on(Event.COMPLETED, console.log) // 任务完成后触发
.on(Event.ERROR, console.log) // 发生错误时触发
.use( useDLoader ) // 启用下载插件
.use( useWebUI ) // 启用管理平台插件(暂未发布)
.use( asPeer ) // 启用节点服务插件
如何上传
HTTP:
初始化请求 POST: http://localhost:9090/init, body参数: { total, ...其他业务参数... }, total是一个必须字段, 用于标识此次上传的数据总字节数; 如果节点设置了需要访问Token, 则需要设置authorization; 该初始化请求会返回一个taskid字符串, 作为后续数据包传输的关键字;
数据包请求 POST: http://localhost:9090/data/:taskid/:packno/:packsize, RESTful请求tasked为初始化请求返回的关键字, packno为当前需要传输的数据包的编号, 该编号是一个从0开始的顺序数字, packsize为当前数据包的字节数; 该请求会返回一个taskid@packno格式的字符串;
Note上面的接口/init和/data可以通过actions配置项进行修改; 通过在配置参数中设置useHttps并设置CA证书,启用Https请求, 设置useHttp2启用Http2请求;
除了以上方法, 还可以使用安装包中提供的HttpClient实现快捷上传;
const token = getAccessToken({name:"geo-net-socket"}, {secret:"geo"})
const taskid = await HttpClient.init("http://localhost:9090/init", { total:32, name:"geo" }, token)
console.log("初始化成功,taskid=",taskid)
const code = await HttpClient.data(`http://localhost:9090/data`, taskid, 0, 32, Buffer.from("测试数据包1测试数据包1"), token)
console.log("发送数据包成功,code=",code)
FTP:
通过监控FTP上传目录来实现, 当配置中指定为allInOne时, 将参数、数据根据设定的分块参数, 写入一个文件内, 服务端监听到文件后, 根据配置参数进行解析, 如果发现文件的EOF标识, 则任务完成;
如果不是allInOne模式, 则通过两个文件来实现, 在actions中指定两种文件的扩展名; 一般分为init文件和data文件, init文件中包含了必要的初始化参数, data文件中为数据本体;
Amqp:
使用安装包中提供的MqClient.toAmqp或amqpSend方法发送消息到配置中指定的队列, 队列通过actions设置;
样例
const taskid = v4UUID()
const options = "amqp://geo:123456@localhost:5672"
MqClient.toAmqp(options, "geo-amqp-init", { name:"geo-amqp", total:32 }, { taskid })
MqClient.toAmqp(options, "geo-amqp-data", "测试数据包1测试数据包2", { taskid, packno: 0, pksize: 32 })
// 重复使用连接
// const client = await toLink<AMQP>(options)
// MqClient.amqpSend(client,"geo-amqp-init", { name:"geo-amqp", total:32 }, { taskid })
// MqClient.amqpSend(client,"geo-amqp-data", "测试数据包1测试数据包2", { taskid, packno: 0, pksize: 32 })
Mqtt:
使用安装包中提供的MqClient.toMqtt或mqttPublish方法发送消息到配置中指定的主题, 主题通过actions设置;
const taskid = v4UUID()
const options = "mqtt://geo:123456@localhost:1884"
MqClient.toMqtt(options, "geo-mqtt-init", { name:"geo-mqtt", total:32 }, taskid)
MqClient.toMqtt(options, "geo-mqtt-data", "测试数据包3测试数据包4", taskid, 0, 32)
// 重复使用连接
// const client = await toLink<MQTT>(options)
// MqClient.mqttPublish(client, "geo-mqtt-init", { name:"geo-mqtt", total:32 }, taskid)
// MqClient.mqttPublish(client, "geo-mqtt-data", "测试数据包3测试数据包4", taskid, 0, 32)
Socket:
通过安装包中提供的NetClient方法实现快捷上传, 动作Action可以通过actions设置;
const token = getAccessToken({name:"geo-net-socket"}, {secret:"geo"}, false)
const taskid = v4UUID()
const client = new net.Socket()
try{
await NetClient.connect(client, "127.0.0.1", 9000)
await NetClient.login(client, token)
const initBody = "total=32,name=geo"
const initHead = `action=geo-socket-init,taskid=${taskid},bodysize=${Buffer.byteLength(initBody)}`.padEnd(200," ")
NetClient.send(client, initHead + initBody)
const dataBody = "测试数据包1测试数据包2"
const dataHead = `action=geo-socket-data,taskid=${taskid},bodysize=${Buffer.byteLength(dataBody)},packno=0,pksize=${Buffer.byteLength(dataBody)}`.padEnd(200," ")
NetClient.send(client, dataHead + dataBody)
}catch(err){
console.log( err )
}
Socket.IO:
通过安装包中提供的IOClient方法实现快捷上传, 主题通过actions设置;
const token = getAccessToken({name:"geo-net-socket"}, {secret:"geo"}, false)
const taskid = v4UUID()
const client = io('https://localhost:9000', {
secure: true, // 指定使用安全的 HTTPS 协议
rejectUnauthorized: false, // 如果不验证证书,设置为 true
auth: { token }
})
try{
await IOClient.isAuthenticated(client)
IOClient.send(client, "geo-socketio-init", `taskid=${taskid},total=32,name=geo`)
const dataBody = "测试数据包1测试数据包2"
const dataHead = `taskid=${taskid},packno=0,pksize=${Buffer.byteLength(dataBody)}`.padEnd(200," ")
IOClient.send(client, "geo-socketio-data", dataHead + dataBody)
}catch(err){
console.log( err )
}
WebSocket:
通过安装包中提供的WSClient方法实现快捷上传, 动作Action可以通过actions设置;
const token = getAccessToken({name:"geo-net-socket"}, {secret:"geo"}, false)
const taskid = v4UUID()
const client = new WebSocket('https://localhost:9000', {
secure: true, // 指定使用安全的 HTTPS 协议
rejectUnauthorized: false, // 如果不验证证书,设置为 true
headers: { Authorization: token }
})
try{
await WSClient.isAuthenticated(client)
const initBody = "total=32,name=geo"
const initHead = `action=geo-ws-init,taskid=${taskid}`.padEnd(200," ")
WSClient.send(client, initHead + initBody)
const dataBody = "测试数据包1测试数据包1"
const dataHead = `action=geo-ws-data,taskid=${taskid},packno=0,pksize=${Buffer.byteLength(dataBody)}`.padEnd(200," ")
WSClient.send(client, dataHead + dataBody)
}catch(err){
console.log( err )
}
接口定义
// FTP服务配置
interface FtpConfig {
// 参数和数据是否在一个文件中, 当被设置时即被认为是该模式
allInOne?: {
target?: string; // 文件扩展名, 全部文件时设置“*”
whatIsEOF?: string; // 文件结束符
blocks?:{ // 按分块的先后顺序配置每个分块的参数, 包括分类标识, 编码, 分块的长度
tag?: "ini"|"dat"|"eof"; // 分类标识
encoding?: any; // 编码格式, UTF8 | ASCII 等, 默认utf8
len?: "flex"|number; // 字节数, 第一个和最后一个分块时可以设置为“flex”, 即剩余全部字节数
}[];
};
watchdir:string; // 需要监控的文件夹
intv?:number; // 每个多少秒检查一次, 非ALL-IN-ONE模式时设置
dura?:number; // 多少时间之后没有文件更新则认为文件已结束, 非ALL-IN-ONE模式时设置
Json4Init?: boolean, // 参数是否为JSON格式
SegSplits?: { // 非JSON格式时进行解析的设置
segs: ","|";"|"|"|"NEWLINE", // 键值对之间的分隔符, NEWLINE表示 /\r?\\n/
kv: string; // 键值之间的分隔符, 不能与键值对分隔符相同
}
}
/**
* 哨兵服务的配置参数
*/
interface SentryOptions {
logs?: {
systemlog: LogOpts; // 系统日志
footprint: FPOpts; // 文件日志
},
storage?: {
savepath?: string; // 文件存储路径
database?: string; // 数据库连接地址(redis)
archive?: {
enable?: boolean; // 是否将已经完成的任务进行归档
waits?: number; // 任务完成后等待多少秒之后进行归档处理
whereTo?: string; // 归档数据库文件的存放地址
filename?: string; // 归档数据库文件的名称
};
},
action?: {
init?: string; // 初始化标识符号, 默认为"init"; 对上传进行初始化, 一般是传递参数, 比如总大小, 用户、上传终端信息等, 生成任务; 当参数是一个任务编号时, 返回任务进度信息;
data?: string; // 数据包上传标识符号, 默认为"data"; 上传数据包, 将数据文件进行切割为数据包进行传输
},
packet?: { // 这个标签对于HTTP服务而言,设置与否无所谓; 主要是在socket通讯时解析数据包获取字段时需要识别; 只能是英文、数字、下划线
taskid?: string; // 任务编号, 用于标识当前数据包属于哪一个上传任务
packno?: string; // 数据包编号, 用于唯一标识当前数据包, 是一个数字, 明确当前数据包在所有数据包中的位置
pksize?: string; // 数据包大小, 用于表示数据包的字节数, 将累计所有数据包后与初始化任务的总大小进行比较, 确认任务已完成
},
thread?: {
filename?: string; // 线程文件名
checkIntv?: number; // 轮询时间间隔(秒), 默认1秒
taskLimit?: number; // 队列任务数量上限, 默认100
retryIntv?: number; // 当队列数量上限时, 当前任务等待指定时间后重新请求, 默认10秒
},
gate?: { // 默认端口9090
type?: GateType; // "http","socket","socketio","websocket","amqp","mqtt","ftp"
settings: HttpConfig|SocketConfig|IOSocketConfig|WebSocketConfig|FtpConfig|string; // 服务配置
actionTag?: string; // 在NetSocket或者WebSocket的时候需要指定识别数据包是初始化请求还是数据传输请求
argsInPacket: { // SocketIO或者WebSocket时需要设置; 需要拆解数据包进行分析参数部分和数据部分; NetSocket中已经通过包头包体进行了分解;
Json4Init?: boolean; // 是否初始化的参数是一个JSON
SegSplits?: { // 键值对时的配置
segs: ","|";"|"|"|"NEWLINE", // 键值对之间的分隔符, NEWLINE表示 /\r?\\n/
kv: string; // 键值之间的分隔符, 不能与键值对分隔符相同;
},
byteLen?: number; // SocketIO或者WebSocket的时候指定Packet数据包中参数部分的字节长度
}
},
plugins?: { // 插件配置
downloader?: HttpConfig; // 下载服务插件配置, 端口默认9091
webui?: HttpConfig; // 管理WebUI插件配置, 端口默认9092
peer?: HttpConfig & { // 节点服务配置, 端口默认9093
publicIP?: string; // 访问IP地址 (需要被外网访问时设置公网IP, 需要自动获取外网IP时不设置; 局域网设置局域网IP; 本机多节点设置127.0.0.1)
autoIpIfNonSet?: { // 如果没有设置publicIP, 则通过指定的URL获取JSON格式的回执, 指定字段获取IP地址
url: string; // 访问接口地址
field: string; // 获取IP的字段名称
},
broadcasts?: string[]; // 广播地址列表, 地址规则: 协议[可选]/IP地址[必须]/端口[可选]/访问令牌[可选]
policy?: { // 广播策略
trigger?: string; // 触发条件, 心跳模式: hearbeat:间隔时间(秒), 如heartbeat:5; 差分模式: onchanged当归档数据变化后
feedbackOnEat?: boolean; // 是否进行被动广播, 即对方广播给你后, 你也广播给对方, 即便没有在broadcasts中进行设置
encoding?: { // 编码规则: 广播时节点参数(args)、数据(data)进行编码、解码
src?: any; // 原始文本的编码格式, 默认utf8
encoded?: any; // 转换为传输文本时的编码格式, 默认base64
}
};
identify?: { // 节点Id的附加规则
allowPort?: boolean; // peerId创建时是否加入本机节点端口, 在同一台主机上设置不同的端口也会生成不同的节点
}
}
}
}
/**
* 支持的服务类型
*/
enum GateType {
Http = "http",
Socket = "socket",
SocketIO = "socketio",
WS = "websocket",
AMQP = "amqp",
MQTT = "mqtt",
FTP = "ftp",
}
/**
* 可监听事件
*/
enum Event {
CREATED = "__created__",
RECEIVING = "__receiving__",
COMPLETED = "__completed__",
ERROR = "__error__",
}
/**
* 数据哨兵
*/
class Sentry {
logger: Logger
service: any;
worker: Worker;
options: SentryOptions;
constructor(configFile:string);
constructor(options:SentryOptions);
/**
* 触发指定事件
* @param eventName
* @param args
*/
trigger(eventName:string, ...args:any[]): void ;
/**
* 监听指定事件
* @param event
* @param callback
* @returns
*/
on(event:Event, callback:(...args:any[])=>void): this;
/**
* 启用插件
* 当前仅支持插件服务的协议http1.1/http2/https
* @param usePlugin
* @returns
*/
use( usePlugin:Function ): this;
}
/**
* HTTP客户端上传工具
*/
namespace HttpClient {
function init(URL:string, params:any, token?:string): Promise<string>;
function data(URL:string, taskid:string, packno:number, packsize:number, buff:Buffer, token?:string): Promise<string>;
}
/**
* FTP客户端上传工具
*/
namespace FtpClient {
function file(savepath:string, filename:string, contents:Buffer|Buffer[], waits?:number): void;
}
/**
* MQ客户端上传工具
*/
namespace MqClient {
function amqpSend(client:AMQP, q:string, data:any, params:any): void;
function toAmqp(options:string, q:string, data:any, params:any): Promise<AMQP>;
function mqttPublish(client:MQTT, topic:string, data:any, taskid:string, packno?:number, packsize?:number): void;
function toMqtt(options:string, topic:string, data:any, taskid:string, packno?:number, packsize?:number): Promise<MQTT>;
}
/**
* TCP/Socket客户端上传工具
*/
namespace NetClient {
// 创建NetSocket连接
function connect(client:any, ip:string, port:number): Promise<void>;
// 身份验证
function login(client:any, token:string): Promise<void>;
// 发送消息
function send(client:any, data:Buffer): void;
}
/**
* Socket.IO客户端上传工具
*/
namespace IOClient {
// 是否已经身份验证通过
function isAuthenticated(socket:any, authTopic?:string): Promise<void>;
// 发送消息
function send(socket:any, topic:string, data:Buffer): void;
}
/**
* WebSocket客户端上传工具
*/
namespace WSClient {
// 是否已经身份验证通过
function isAuthenticated(socket:any): Promise<void>;
// 发送消息
function send(socket:any, data:Buffer): void;
}
/**
* 生成访问令牌
* @param payload 加密信息
* @param auth string | { secret:string } 密钥
* @param options {expiresIn, ...} 加密参数, 可选
* @param http 是否HTTP(http1.1/http2/https)服务的访问令牌, 默认true
* @returns
*/
function getAccessToken(payload:any, auth:string|{ secret:string; }): string;
function getAccessToken(payload:any, auth:string|{ secret:string; }, options:any): string;
function getAccessToken(payload:any, auth:string|{ secret:string; }, http:boolean): string;
function getAccessToken(payload:any, auth:string|{ secret:string; }, options:any, http:boolean): string;
一个HTTP节点的配置样例
{
"logs": {
"systemlog": {
"mode": "dev",
"path": "/node1/syslog"
},
"footprint": {
"path": "/node1/footlog",
"ext": ".fp",
"layout": "json"
}
},
"storage": {
"savepath": "/node1/dat",
"database": "redis://:123456@localhost:6379",
"archive": {
"enable": true,
"waits": 5,
"whereTo": "/node1",
"filename": "tasks.archived"
}
},
"action": {
"init": "init",
"data": "data"
},
"packet": {
"taskid": "taskid",
"packno": "packno",
"pksize": "pksize"
},
"gate": {
"type": "http",
"settings": {
"port": 9080,
"useHttps": false,
"useHttp2": false,
"allowHTTP1": true,
"credentials": {
"key": "/ca/ca.key",
"cert": "/ca/ca.pem"
},
"auth": {
"secret": "geo"
},
"cors": "*",
"urlencodedLimit": "10mb",
"public_path": "/public/html"
}
},
"plugins": {
"downloader": {
"port": 9081,
"auth": { "secret": "geo" }
},
"webui": {
"port": 9082,
"auth": { "secret": "geo" }
},
"peer": {
"publicIP": "127.0.0.1",
"port": 9083,
"auth": { "secret": "geo" },
"broadcasts": [],
"policy": {
"trigger": "onchanged",
"feedbackOnEat": true,
"encoding": {
"src": "utf8",
"encoded": "base64"
}
},
"identify": {
"allowPort": true
}
}
}
}