1.0.3 • Published 3 years ago

@xfx/utils v1.0.3

Weekly downloads
Last release
3 years ago

Flows simple library


Flow - flow consists of processes and connections

Process - process is an atomic operation containing in ports and out ports. Each process is based on Node

Node - defenition of behavior for processes. Node can have props, input ports, out ports

Connection - connections between ports of processes

Flow example

# main loop
# each process is defined as name_of_process(path/to/Node)
ticks(@xfx/time/Ticks) -> locker(@xfx/etc/Locks) -> tasks(@xfx/pg/ArrayQuery) -> dispatch(./Dummy)

# when task is started notify wait process about it
dispatch START -> wait(@xfx/etc/Wait)

# when task is done notify wait process about it
dispatch DONE -> DONE wait

# when wait process knows that all tasks are started, then it pushes message to ALLDONE port
wait ALLDONE -> NEXT tasks

# when done
tasks DONE -> UNLOCK locker

Each process can be initialized with args in flow definition, for example:

ticks(@xfx/time/Ticks, {"interval": 3000}) -> tasks(@xfx/pg/ArrayQuery, {"sql":"select * from some.table where p = $1", "values":[ 123 ]})

Or args can be defined outside

const data = {
    ticks: {
        interval: 5000
    tasks: {
        dsn: dsn1,
        sql: `
            from some.table
                p = $1
            limit $2
        values: message => [ 123, 12 ],
        shiftBy: 2,
        out: 'task'

And passed to flow builder

build(flow_def, data, (err, flow) => {


Node examples

Ticks node


  • interval


  • start
  • stop


  • out
const Ticks = {
    // props that must be initialized 
    props: {
        interval: { default: 1000 }
    // in ports named with underscore at start
    _start(message, done) {
        let interval = typeof this.props.interval === 'function' ? this.props.interval(message) : this.props.interval
        this.send('out', { t: Date.now() }, done)
        this.__timer = setInterval(_ => this.send('out', { t: Date.now() }), interval)
    _stop(message, done) {
        if (this.__timer) clearInterval(this.__timer)
    // out ports named with underscore at end
    out_: 1

Locks node

const Locks = {
    _in(message, done) {
        if (this.locked) {
            this.logger.info({ locked: this.locked }, this.name + '_in')
            this.send('pass', message, done)
        else {
            this.locked = true
            this.send('out', message, done)
    out_: 1,
    pass_: 1,
    _unlock(message, done) {
        this.logger.info(message, this.name + '_reset')
        this.locked = false

ArrayQuery node


  • dsn - connection string
  • sql - string or function generator for sql
  • values - array of function generator for values
  • shiftBy - count of parallel shifting in out port
  • out - datakey for each row pushed into message


  • in
  • next


  • out
  • errors
  • done
const ArrayQuery = {
    props: {
        dsn: { required: true },
        sql: [String,Function],
        values: [Array,Function],
        shiftBy: { type: Number, default: 1 },
        out: { default: 'item' }
    out_: 1,
    errors_: 1,
    done_: 1,
    // each node can have init function
    init(done) {
        this.__pool = getPool(this.props.dsn)
        done(null, this)
    _in(message, done) {
        let sql = typeof this.props.sql === 'function' ? this.props.sql(message) : this.props.sql
        let values = typeof this.props.values === 'function' ? this.props.values(message) : this.props.values

        this.logger.info({ dsn: this.props.dsn, sql, values }, this._name)

        this.__pool.query(sql, values, (err, res) => {
            if (err) {
                message.error = err
                this.send('errors', message, done)
            else {
                this.__items = res.rows
                this._next(message, done)
    _next(message, done) {
        let buf = this.__items.splice(0, this.props.shiftBy)
        if (!buf.length) {
            this.send('done', message)
        else {
            buf.forEach(item => {
                this.send('out', { ...message, [ this.props.out ]: item })

Experimental nodes definition

Define node as a function with some useful input arguments, for example:

  • env - environment of flow, can contain logger
const Locks = (args, env) => ({
    _in(message, done) {
        if (this.locked) {
            env.logger.info({ locked: this.locked }, this.name + '_in')
            this.send('pass', message, done)
        else {
            this.locked = true
            this.send('out', message, done)
    out_: 1,
    pass_: 1,
    _unlock(message, done) {
        env.logger.info(message, this.name + '_reset')
        this.locked = false

const Parallel = (args, env) => {

    let res = {
        _in(message, done) {
            // do parallel on N outs 

    for (let i = 0; i < args.parallel; i++) res[ 'out' + i + '_' ] = 1

    return res


# comment
process_name1(path/to/Node) OUT_PORT_NAME -> IN_PORT_NAME process_name2(path/to/Node, { "a": 1, "b": 2 })


process_name2 -> process_name3