0.5.2 • Published 9 years ago
flowpipe v0.5.2
flowpipe is installable via:
- npm:
npm install flowpipe --save
Promise Based Flowpipe: upper v0.5.0
- v0.5.0 is based on
PromiseObject, so not support previous version. - for using older version,
let flowpipe = require('flowpipe').older
Quick Start
'use strict';
const Flowpipe = require('flowpipe');
let myWork = (args)=> new Promise((resolve)=>{
args.data2.push(Math.random());
setTimeout(resolve, 5);
});
let flowpipe = Flowpipe.instance('MyWork');
flowpipe
// set arguments
.init((args)=> args.index = 0)
.init((args)=> args.data1 = [])
.init((args)=> args.data2 = [])
.init((args)=> args.data3 = [])
// set work, name as work-1
.then('work-1', (args)=> new Promise((resolve)=> {
args.data1.push(Math.random());
setTimeout(resolve, 5);
}))
// set work, set name as myWork
.then(myWork)
// set work, set auto created name
.then((args)=> new Promise((resolve)=> {
args.data3.push(Math.random());
setTimeout(resolve, 5);
}))
// loop back to work-1 if args.index < 100 and increase args.index
.loop('work-1', (args)=> ++args.index < 100)
// print something
.log((args)=> `data1 data2 data3 / ${args.data1.length} ${args.data2.length} ${args.data3.length}`)
// set parallel max thread
.maxThread(30)
// set parallel
.parallel((args)=> args.data1, (args, data, idx)=> new Promise((resolve)=> {
setTimeout(()=> {
resolve();
}, data * 1000);
}))
// print timestamp
.timestamp((total, premodule)=> `total: ${total}ms, parallel ${premodule}ms`)
// run
.run();Document
- Flowpipe.instance(name)
returnFlowpipe instance- such as
let myjob = require('flowpipe').instance('myJob')
- instance.init(setterFn)
- set instance's local variables
setterFn(args): must sync function, not async.- such as
instance.init((args)=> args.data = [])
- such as
- instance.log(printFn)
- print log
printFn(args): must have return string
- instance.timestamp(printFn)
- print timestamp
printFn(total, preModule): must have return string
- instance.then(name, work)
- add some work
- also,
then, add, pipe name(optional): work name for loop backwork: work function,
- instance.loop(workname, condition)
- also,
loop, for, loopback workname: target to loopbackcondition: must have return boolean
- also,
- instance.maxThread(number)
- set parallel max thread
- instance.parallel(which, work)
which: must have return arraywork: work function (args, data, idx)
Older Flowpipe: v0.4.0
Quick Start: v0.4.0

var flowpipe = require('flowpipe').older;
flowpipe
.init(function (next) {
// init before start
var page = 1;
next(null, page);
})
.pipe('start', function (next, page) {
// page is that previous next function's variable
setTimeout(function (err) {
// next is function that proceed next pipe, parallel or loopback
next(err, page);
}, 1000);
})
.pipe('list', function (next, page) {
console.log('start page: ' + page);
// preparing list for parallel
var list = [{no: 1}, {no: 2}, {no: 3}];
for (var i = 0; i < list.length; i++) {
list[i].page = page;
list[i].delay = 3 - i; // delay time in parallel process
}
// next before parallel, pass only second parameter to parallel process.
// second parameter must be array.
// other parameters pass to next pipe or loopback.
next(null, list, page);
})
.parallel('parallel process', function (next, data) {
// data is indicating each of list items.
setTimeout(function () {
data.title = 'title-' + data.no;
next(null, data);
}, data.delay * 1000);
})
.parallel('parallel process 2', function (next, data) {
// data is indicating each of list items.
setTimeout(function () {
data.title = 'title-' + data.no;
next(null, data);
}, data.delay * 1000);
})
.parallel('multi-thread', function (next, data) {
// this option {multiThread: true} makes your function async.
// in node.js's single thread, navie logic is processing by single thread.
// if you use this, all of logics are processing in multi thread.
// ***WARNING: use only local variables or parameters from previous pipe.
// do not use global, or functional objects from previous pipe.
for(var i=0;i<100000000;i++) ;
next(null, data);
}, {multiThread: true})
.pipe('pass-1', function (next, parallel, page) {
console.log('pass-1');
next(null, parallel, page);
})
.pipe('pass-else', function (next, parallel, page) {
console.log('pass-else');
next(null, parallel, page);
})
.loopback('pipe-list', function (loop, next, instance, parallel, page) {
// loopback(process_type-process_name, fn)
// - process_type: pipe, parallel
// - process_name: must defined
// - fn(loop, next, instance, others...)
// - loop(err, variables...): passing variable to target process
// - next(err, variables...): proceeding if loop ended
// - instance: maintainable variable in loop, object type {}
// - others: passed from before process
if (!instance.data) instance.data = [];
for (var i = 0; i < parallel.length; i++)
instance.data.push(parallel[i]);
if (page < 5) loop(null, page + 1);
else next(null, instance.data);
})
.pipe('finalize', function (next, data) {
next(null, data);
})
.end(function (err, test) {
// end(callback) or end()
// this must be declared, if not all function don't working.
// proceed in the end or occured error in process
})
.graph('./basic-example-graph.html');Documents: v0.4.0
- flowpipe.init(work)
- initialize variables
- params
work: function(next)next: function(err, arg1, arg2 ...)- callback for next work. must be execute this function in callback.
- flowpipe.pipe(name, work)
- params
name: current work's namework: function(next, arg1, arg2 ...)next: function(err, arg1, arg2 ...)args: previous work's results
- params
- flowpipe.parallel(name, work, opts)
- processing work in parallel.
- params
name: current work's namework: function(next, list_item, args ...)next: function(err, item)item: sync items to list
list_item: parameter in previous work's first variable. must be array in previous.args: previous work's results, not array
opts:- multiThread: default
false
- multiThread: default
- flowpipe.loopback(target, work)
- params
target: jump to,function-namework: function(loop, next, instance, args ...)loop: function(err, args ...), when loop continuenext: function(err, args ...), when loop endinstance: maintenance variable in loop
- params
- flowpipe.end(work)
- this function must be added in last.
- params
- work: function(err, args ...)
- flowpipe.graph(savePath)
- save graph
- must be proceed after end
0.5.2
9 years ago
0.5.1
9 years ago
0.5.0
9 years ago
0.4.0
10 years ago
0.3.6
10 years ago
0.3.5
10 years ago
0.3.4
10 years ago
0.3.3
10 years ago
0.3.2
10 years ago
0.3.1
10 years ago
0.3.0
10 years ago
0.2.2
10 years ago
0.2.1
10 years ago
0.2.0
10 years ago
0.1.3
10 years ago
0.1.2
10 years ago
0.1.1
10 years ago
0.1.0
10 years ago
0.0.3
10 years ago
0.0.2
10 years ago
0.0.1
10 years ago