0.0.7 • Published 9 years ago

handhole v0.0.7

Weekly downloads
1
License
MIT
Repository
github
Last release
9 years ago

Handhole

Build Status

This is develop/maintenance tool for StreamAPI

Target

Make it eacy to use StreamAPI : StreamAPIをもっと簡単に

データがどこを流れている、どう流れているかわかりにくい点、 そしてデータを流し込んだり、パイプをつなぐ操作が煩雑なので、負荷を軽減をするために作りました。

Install npm install handhole

Use

var Handhole = require('handhole');

var hh = Handhole(stream);
var hh = HandHole([s1,s2]);

Methods

Pipe Controlle

methodexsampledescript
inserthh.insert([target], stream or array)指定したstreamの前に追加する
removehh.remove(target)指定したstreamを取り除く
pipehh.pipe([target], stream or array)指定したstreamの後ろに追加
unpipehh.unpipe(target)指定したstreamの後ろを切り離す
splithh.sprit(to,[from])指定したstreamから切り離す

Stream Module

methodexsampledescript
hopperhh.hopper([target])データを流し込み口を作る。指定があればtargetの前に追加する
flowMaterhh.flowMater([target])データ量(count,size)を計測する。終了時に他totalを返す
valvehh.valve([DPS],[target])DPS(data/sec)を調整する(0=stop, 0 > no limit)

Module for Dev

開発時に陥りがちな不具合を避ける。 開発時に終端までpipeを通してないことは多いので、とりあえず終端にモニターをつける

methodexsampledescript
garbagehh.garbage(target, [done])trash chunk data
garbageAllhh.garbageAll(target, [done])set garbage to all EndTerm

Information Methods

Get stream information

list

Return registered stream list

var hh = HandHole([s1,s2]);
var list = hh.list();
console.log(list);
// [
// 	{
// 		id:0,              // uniqueid
// 		name:"s1",         // name(from transform function name)
// 		obj:StreamObject,  // stream object
// 		next:[1]           // list pipes id
// 	},
// 	...
// ]

viewlist

Check stream list by looking

var hh = HandHole([s1,s2]);
var list = hh.viewlist();
console.log(list);
// [
// 	{
// 		id:0,       // unique id
// 		name:"s1",  // name(from transform function name)
// 		next:[1]    // list pipes id
// 	},
// 	{
// 		id:1,
// 		mame:"s2",
// 		next:[]     // have not next pipe
// 	}
// ]

term

Get stream by termination type.

var hh = HandHole([s1,s2]);

// alone
hh.Add(s3);

// loop
hh.Add([s4,s5]);
hh.pipe("s5", "s4")

var term = hh.term();
console.log(term);
// {
// 	start:[...],	// start term.     ex.s1
// 	end:[...],		// end term.       ex.s2
// 	alone:[...],	// alone streams.  ex.s3
// 	loop:[...],		// loop streams.   ex.[s4,s5]
// 	other:[...],	// others
// }

Controll Methods

Insert

targetの前にstreamを追加する

hh.insert([target], stream or array)

var t = [A,B,C];			// A-B-C 
var hh = HandHole(t);

// 指定なしなら先頭に追加
hh.insert(D);				// D-A-B-C

// 指定をすれば前のStreamとの間に追加
hh.insert("B",E)		// D-A-E-B-C

// readableの前には入れられない
var r = fs.createReadStream(filepath);
var hh = HandHole(r);
hh.insert(0, F);	// Error!!!

Pipe

targetの後ろにstreamを追加する

hh.pipe([target], stream or array)

var t = [A,B,C];			// A-B-C
var hh = HandHole(t);

// 指定なしなら最後尾に追加
hh.pipe(D);				// A-B-C-D

// 指定をすれば並列に接続
hh.pipe("B",E)    // A-B-C-D
                  //   |-E

// ループ構造は基本的には作れない
hh.pipe("C","A")  // Error! stream loop is very slow

// if want to use
// hh._loopflag = true // _loopflag = true
// hh.pipe("C","A") // OK

// writableの後には入れられない
var r = fs.createWriteStream(filepath);
var hh = HandHole(r);
hh.insert("D", F);	// Error!!!

Remove

targetを取り除く

hh.pipe([target])

var t = stream;			// A-B-C-D-Eという順番でつながっていると仮定
var hh = HandHole(t);

// 指定したstreamを取り除いて前後とつなげる
hh.remove("D");				// A-B-C-E

Unpipe

targetの下流を切り離す。

hh.unpipe(target)

var t = [A,B,C,D,E];     // A-B-C-D-E
var hh = HandHole(t);

// 指定したstreamを取り除いて前後とつなげる
hh.unpipe("C");       // A-B-C, D-Eができる

Split

split stream pipe by from-to

hh.unpipe(to, [from])

var t = [A,B,C,D,E];     // A-B-C-D-E
var hh = HandHole(t);

// split in front of target
hh.split("C");       // A-B, C-D-Eができる

// get between "to" and "from"
hh.split("B","D");	// A-E, B-C-D

Support Streams

Hopper

データを個別に導入するためのStream

var Handhole = require('handohole');

// 生成方法は二つ srreamobjを受け取るか
var hp = Handhole.hopper();

// 直接追加する
var hh = handhole(stream);
var hp = hh.hopper(1);	// insertと同じ動作を行う


// データはpushもしくはdataで追加できる。
hp.push("A");
hp.push("B");
hp.push("C");
hp.push("D");

hp.data(["A","B","C","D"]);	// data Methodなら配列で渡すこともできる

// close
hp.push(null);
hp.data(null);

Garbage / garbageAll()

閉じていないStreamを動作させるための終端Stream。
終了時にはcallbackを呼ぶ。
基本的にはgarbageAllで勝手にDuplexの終端を閉じるのでそちらを使う

var Handhole = require('handohole');

// 生成方法は二つ srreamobjを受け取るか
var hp = Handhole.hopper();

// 直接追加する
var hh = handhole(stream); // A-B-C-D

// Callbackを指定 A-B-C-D-[garbage]
hh.garbageAll(function(){
	// call by streamD.onfinish
	done();
});

hh.data(data);
hh.data(null);

FlowMater

Monitoring data flow call count and chunk size.

event

data

emiting on timer(default:500ms) return info latest dataflow.

total

emitting on finish. return info total dataflow.

handhole.flowMater([option])

// get Object from class
var fm = Handhole.flowMeter(option)

// or insert to instance
var hh = handhole(stream); 
var fm = hh.flowMater("B");	// insert

// option
option  = {
	timer:1000 // interval of emit flow event
}

// result
fm.on("flow", function (flow){
	console.log(flow.count); 	// count of call in interval
	console.log(flow.size); 	// size of total datasize in interval
})

Valve

valve is regurate data count per second.

Handhole.valve([option])

// get Object from class
var vl = Handhole.valve(option)

// or insert to instance
var hh = handhole(stream); 
var vl = hh.valve("B");	// insert

// option
option  = {
	valve:1000 // iDPS target
}

// change valve
vl.valve(50);

// result
vl.on("flow", function (flow){
	console.log(flow.count); 	// count of call in interval
	console.log(flow.size); 	// size of total datasize in interval
	console.log(flow.timer); 	// start datetime of controll span
	console.log(flow.wait); 	// buffering chunk count
	console.log(flow.valve); 	// now valve setting
})

Capture

Data output.

  • default : console.log(chunk)
  • file : output to file(string/buffer/JSON)
// get Object from class
var cp = Handhole.capture(option)

// or insert to instance
var hh = handhole(stream); 
var cp = hh.capture("B");	// insert

util func

Please read test/index.js

var getDatatype = HandHole.util.getDatatype;

Stacker

データブロックサイズ変更。 流れてきたデータを配列にためて出力する。

  • option.splitChar
    特定の文字列があったらstackした内容を書き出す。
    デフォルトは","
// get Object from class
var hp = handole.hopper();
var st = handhole.stacker({splitChar:","})
var hh = handhole([hp, st]);

for(var i=0; i< 50; i++){
	hp.push("D"+i);
}
// ["D0","D1"...]

hp.push(",")	// split

for(var i=0; i< 50; i++){
	hp.push("C"+i);
}
// ["D0","D1"...] , ["C0","C1"...] 

Conful

複数のパイプから一つの処理パイプに流すための合流pipe
すべてのpipeがcloseしたらconfulも閉じる。

var hp1 = HandHole.hopper();
var hp2 = HandHole.hopper();
var hp3 = HandHole.hopper();
var cnf = HandHole.conful([hp1, hp2]); // initial set pipe
cnf.conful(hp3);	// add pipe

turnstile

改札機 並列実行+timeout制御をする

var opt = {
	t: ff,	// function _transform
	max: 4,	// process max
	timeout: 800	// timeout[ms]
}
var tsl = HandHole.turnstile(opt);

tsl.on("timeout", function(chunk){
	// timeout chunk
})
0.0.7

9 years ago

0.0.6

9 years ago

0.0.5

9 years ago

0.0.4

9 years ago

0.0.3

9 years ago

0.0.2

9 years ago

0.0.1

9 years ago

0.0.0

9 years ago