1.0.3 • Published 3 years ago
@xfx/utils v1.0.3
Flows simple library
Concept
Flow
- flow consists of processes
and connections
Process
- process is an atomic operation containing in ports
and out ports
. Each process is based on Node
Node
- defenition of behavior for processes. Node can have props
, input ports
, out ports
Connection
- connections between ports of processes
Flow example
# main loop
# each process is defined as name_of_process(path/to/Node)
ticks(@xfx/time/Ticks) -> locker(@xfx/etc/Locks) -> tasks(@xfx/pg/ArrayQuery) -> dispatch(./Dummy)
# when task is started notify wait process about it
dispatch START -> wait(@xfx/etc/Wait)
# when task is done notify wait process about it
dispatch DONE -> DONE wait
# when wait process knows that all tasks are started, then it pushes message to ALLDONE port
wait ALLDONE -> NEXT tasks
# when done
tasks DONE -> UNLOCK locker
Each process can be initialized with args in flow definition, for example:
ticks(@xfx/time/Ticks, {"interval": 3000}) -> tasks(@xfx/pg/ArrayQuery, {"sql":"select * from some.table where p = $1", "values":[ 123 ]})
Or args can be defined outside
const data = {
ticks: {
interval: 5000
},
tasks: {
dsn: dsn1,
sql: `
select
*
from some.table
where
p = $1
limit $2
`,
values: message => [ 123, 12 ],
shiftBy: 2,
out: 'task'
}
}
And passed to flow builder
build(flow_def, data, (err, flow) => {
})
Node examples
Ticks node
props:
interval
inports:
start
stop
outports:
out
const Ticks = {
// props that must be initialized
props: {
interval: { default: 1000 }
},
// in ports named with underscore at start
_start(message, done) {
let interval = typeof this.props.interval === 'function' ? this.props.interval(message) : this.props.interval
this.send('out', { t: Date.now() }, done)
this.__timer = setInterval(_ => this.send('out', { t: Date.now() }), interval)
},
_stop(message, done) {
if (this.__timer) clearInterval(this.__timer)
done()
},
// out ports named with underscore at end
out_: 1
}
Locks node
const Locks = {
_in(message, done) {
if (this.locked) {
this.logger.info({ locked: this.locked }, this.name + '_in')
this.send('pass', message, done)
}
else {
this.locked = true
this.send('out', message, done)
}
},
out_: 1,
pass_: 1,
_unlock(message, done) {
this.logger.info(message, this.name + '_reset')
this.locked = false
done()
}
}
ArrayQuery node
props:
dsn
- connection stringsql
- string or function generator for sqlvalues
- array of function generator for valuesshiftBy
- count of parallel shifting in out portout
- datakey for each row pushed into message
inports:
in
next
outports:
out
errors
done
const ArrayQuery = {
props: {
dsn: { required: true },
sql: [String,Function],
values: [Array,Function],
shiftBy: { type: Number, default: 1 },
out: { default: 'item' }
},
out_: 1,
errors_: 1,
done_: 1,
// each node can have init function
init(done) {
this.__pool = getPool(this.props.dsn)
done(null, this)
},
_in(message, done) {
let sql = typeof this.props.sql === 'function' ? this.props.sql(message) : this.props.sql
let values = typeof this.props.values === 'function' ? this.props.values(message) : this.props.values
this.logger.info({ dsn: this.props.dsn, sql, values }, this._name)
this.__pool.query(sql, values, (err, res) => {
if (err) {
this.logger.error(err)
message.error = err
this.send('errors', message, done)
}
else {
this.__items = res.rows
this._next(message, done)
}
})
},
_next(message, done) {
let buf = this.__items.splice(0, this.props.shiftBy)
if (!buf.length) {
this.send('done', message)
}
else {
buf.forEach(item => {
this.send('out', { ...message, [ this.props.out ]: item })
})
}
done()
}
}
Experimental nodes definition
Define node as a function with some useful input arguments, for example:
env
- environment of flow, can containlogger
const Locks = (args, env) => ({
_in(message, done) {
if (this.locked) {
env.logger.info({ locked: this.locked }, this.name + '_in')
this.send('pass', message, done)
}
else {
this.locked = true
this.send('out', message, done)
}
},
out_: 1,
pass_: 1,
_unlock(message, done) {
env.logger.info(message, this.name + '_reset')
this.locked = false
done()
}
})
const Parallel = (args, env) => {
let res = {
_in(message, done) {
// do parallel on N outs
}
}
for (let i = 0; i < args.parallel; i++) res[ 'out' + i + '_' ] = 1
return res
}
Gramma
# comment
process_name1(path/to/Node) OUT_PORT_NAME -> IN_PORT_NAME process_name2(path/to/Node, { "a": 1, "b": 2 })
process_name3(path/to/Node)
process_name2 -> process_name3