0.3.0-beta2.5 • Published 2 years ago

@swnb/pipe v0.3.0-beta2.5

Weekly downloads
-
License
ISC
Repository
github
Last release
2 years ago

Pipe

Pipe can connect both io and calculate work together, see example blow

install

  yarn add @swnb/pipe

example

import { Pipe } from './pipe'

// create pipe 1 with capacity 10
const p1 = Pipe.new<number>(10)
// create pipe 2 with capacity 10
const p2 = Pipe.new<number>(10)

setInterval(() => {
  p1.write(1) // p1 receive number 1 every 10 ms
}, 10)

// reducer1 is an accumulator , reducer can be Promise
const reducer1 = (acc: number, cur: number) => acc + cur

// p1 pipe to p2 with reducer1
// this create stream emit every 10 ms
p1.reduce(reducer1, 0).pipeTo(p2)

;(async () => {
  // loop read result
  for (;;) {
    const result = await p2.read()
    console.log(result.value) // print 1,2,3,4,5,6,.....
  }
})()

p1 create number 1 every 10 ms , p1 through reducer (acc,cur)=> acc+cur ,first value zero) pipe to p2

p2 read value 1,2,3,4,5,..... every 10 ms,

blow flow graph show what happen

flowchart LR
  pipe1[[pipe 1]]
  pipe2[[pipe 2]]
input([1,1,1,1,...]) --> pipe1 -. acc = acc + cur .->pipe2 --> output([1,2,3,4,...])

let's see another example

import { Pipe } from './pipe'

const p1 = Pipe.new<number>(10)
const p2 = Pipe.new<number>(10)
const p3 = Pipe.new<string>(10)
const p4 = Pipe.new<string[]>(10)

// reducer1 is accumulator
const reducer1 = (acc: number, cur: number) => acc + cur

// delay delay duration time then resolve promise
const delay = async (duration: number) => {
  await new Promise(res => {
    setTimeout(res, duration)
  })
}

// reducer2 delay 1 second then join each value into string use ',' 
const reducer2 = async (acc: string, cur: number) => {
  await delay(1000)
  const result = acc === '' ? `${cur}` : `${acc},${cur}`
  return result
}
// mapper3 split each value into string[] by ','
const mapper3 = (_: any, cur: string) => cur.split(',')

p1.connect(reducer1, 0).pipeTo(p2)
p2.connect(reducer2, '').pipeTo(p3)
p3.connect(mapper3, []).pipeTo(p4)

setInterval(() => {
  p1.write(1)
}, 10)

setTimeout(() => {
  p1.close()
}, 10000)
;(async () => {
  for (;;) {
    const result = await p4.read()
    console.log(result.value) // print [ '1' ],[ '1', '2' ],[ '1', '2', '3' ],[ '1', '2', '3', '4' ],......
  }
})()

blow flow graph show what happen

flowchart TD
  input([1,1,1,1,...])
  pipe1[[pipe 1]]
  pipe2[[pipe 2]]
  pipe3[[pipe 3]]
  pipe4[[pipe 4]]
  pipe5[[pipe 5]]
  pipe6[[pipe 6]]
  input .-> pipe1 -. ac = acc + cur .-> pipe2
  pipe2 -. multiple .-> pipe3 & pipe4
  pipe3 -. block 1 second .->pipe5
  pipe4 -. some .-> pipe6
0.3.0-beta2.5

2 years ago

0.3.0-beta2.4

2 years ago

0.3.0-beta2.3

2 years ago

0.3.0-beta2.2

2 years ago

0.3.0-beta2.1

2 years ago

0.3.0-beta2

2 years ago

0.3.0-beta1

2 years ago

0.2.0

2 years ago

0.1.0

2 years ago