billow v1.4.0
A stream pipeline based message processing framework.
Install
npm install billow
Usage
'use strict'
const { Billow, Flow, Droplet } = require('billow')
let billow = new Billow({ separator: '\r\n' })
let flowOne = new Flow({ events: ['error', 'dropletError'] })
let flowTwo = new Flow({ events: ['error', 'dropletError'] })
flowOne.on('error', console.error).on('dropletError', console.error)
flowTwo.on('error', console.error).on('dropletError', console.error)
flowOne.addDroplets([
  new Droplet({
    handler: async function (chunk, encoding) {
      return await Promise.resolve(`${chunk.toString()}==`)
    }
  }),
  new Droplet({
    handler: function (chunk, encoding) {
      console.log(chunk.toString())
    }
  })
])
flowTwo.addDroplets([
  new Droplet({
    handler: async function (chunk, encoding) {
      return await Promise.resolve(`${chunk.toString()}~~`)
    }
  }),
  new Droplet({
    handler: function (chunk, encoding) {
      console.log(chunk.toString())
    }
  })
])
billow.addFlow(flowOne).addFlow(flowTwo).write('billow!\r\nbillow!\r\n')
// billow!==
// billow!~~
// billow!==
// billow!~~
How it works
Billow's workflow is built on Flows, each of which is a chain of Droplets. Each Droplet is an implementation of a Node.js Transform stream whose handler may be an async function, so asynchronous processing logic can be written with async / await. A Flow is the abstract pipeline that connects those Droplets, so data moves through a Flow by stream piping, which keeps transmission fast and heap memory usage low. Because a Node.js Transform stream buffers its output when nothing downstream consumes it, every Flow is ultimately piped into a writable stream named blackHole, which prevents a potential resident memory leak.
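The buffering behaviour described above can be reproduced with Node.js core streams alone. The following is a minimal sketch, not billow's actual code: makePassThrough and makeBlackHole are hypothetical names, and the blackHole here is simply assumed to be a Writable that discards every chunk.

```javascript
'use strict'
const { Transform, Writable } = require('stream')

// Hypothetical helper: a Transform that forwards chunks unchanged.
function makePassThrough () {
  return new Transform({
    transform (chunk, encoding, callback) {
      callback(null, chunk)
    }
  })
}

// Hypothetical stand-in for blackHole: a Writable that discards everything.
function makeBlackHole () {
  return new Writable({
    write (chunk, encoding, callback) {
      callback()
    }
  })
}

// With no consumer, the output stays in the Transform's readable buffer.
const stalled = makePassThrough()
stalled.write('billow!')
console.log(stalled.readableLength) // 7 (bytes still buffered)

// Piped into the black hole, the same data is consumed and dropped.
const drained = makePassThrough()
drained.pipe(makeBlackHole())
drained.write('billow!')
setImmediate(() => console.log(drained.readableLength)) // 0
```

Without the final sink, every chunk a handler returns would stay in the last stream of the chain until something read it.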
        |             => flow one =>              |
        |   ( droplet(1) => droplet(2) => ... )   |
        |                                         |
data => |             => flow two =>              | => blackHole
        |   ( droplet(4) => droplet(3) => ... )   |
        |                                         |
        |            => other flows =>            |
API
Class: Billow
new Billow({ separator, highWaterMark })
- separator
String: The separator used to split the incoming buffer. If separator is set to null, billow passes the buffer directly to the flows. Defaults to '\r\n'.
- highWaterMark
Number: If provided, it is passed to the internal stream.Writable constructor.
Returns a new instance of Billow.
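To make the separator option concrete, here is a minimal sketch of separator-based splitting. It is an assumption about the general technique, not billow's implementation: createSplitter is a hypothetical helper, and carrying a trailing partial message over to the next call is a design choice of this sketch.

```javascript
'use strict'

// Hypothetical helper, not part of billow: returns a function that splits
// incoming data on `separator` and keeps any incomplete tail for later.
function createSplitter (separator = '\r\n') {
  const sep = separator === null ? null : Buffer.from(separator)
  let pending = Buffer.alloc(0)

  return function split (chunk) {
    const input = Buffer.from(chunk)
    // A null separator means the buffer is passed through untouched.
    if (sep === null) return [input]

    pending = Buffer.concat([pending, input])
    const messages = []
    let index
    while ((index = pending.indexOf(sep)) !== -1) {
      messages.push(pending.subarray(0, index))
      pending = pending.subarray(index + sep.length)
    }
    return messages
  }
}

const split = createSplitter('\r\n')
console.log(split('billow!\r\nbil').map(String)) // [ 'billow!' ]
console.log(split('low!\r\n').map(String)) // [ 'billow!' ]
```

The second call completes the message that the first call left unfinished, which is why a splitter of this kind has to hold state between writes.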
addFlow(flow)
- flow
Flow: An instance of Flow.
Registers a new flow with billow.
Class: Flow
new Flow({ events })
- events
Array<String>: The names of the events emitted by this flow's droplets that you want to listen for on the flow.
Returns a new instance of Flow.
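One way to read the events option is that the flow re-emits the listed events from each of its droplets. The sketch below illustrates that idea with plain EventEmitters. It is an assumption about the behaviour, not billow's code, and forwardEvents is a hypothetical helper.

```javascript
'use strict'
const { EventEmitter } = require('events')

// Hypothetical helper: re-emit each listed event from every child on the parent.
function forwardEvents (parent, children, events) {
  for (const child of children) {
    for (const name of events) {
      child.on(name, (...args) => parent.emit(name, ...args))
    }
  }
}

// Plain emitters stand in for a flow and its droplets.
const flow = new EventEmitter()
const droplets = [new EventEmitter(), new EventEmitter()]
forwardEvents(flow, droplets, ['dropletError'])

const seen = []
flow.on('dropletError', (err) => seen.push(err.message))
droplets[1].emit('dropletError', new Error('bad chunk'))
console.log(seen) // [ 'bad chunk' ]
```

With this arrangement a single listener on the flow covers every droplet, which matches how the usage example attaches its handlers to flowOne and flowTwo rather than to individual droplets.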
addDroplets(droplets)
- droplets
Droplet | Array<Droplet>: The droplets you want to add to the flow.
Registers new droplets with the flow.
Class: Droplet
new Droplet({ handler, highWaterMark })
- handler
async function (chunk, encoding) {}: The handler that processes the chunk. The value it returns is passed to the next droplet in the current flow.
- highWaterMark
Number: If provided, it is passed to the internal stream.Transform constructor.
Returns a new instance of Droplet.
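The handler contract can be approximated with a core Transform stream. This is a minimal sketch under stated assumptions, not billow's Droplet: makeDroplet is a hypothetical factory, and treating an undefined return value as "pass nothing on" is an assumption of this sketch.

```javascript
'use strict'
const { Transform } = require('stream')

// Hypothetical factory, not billow's Droplet: wraps a sync or async handler
// in a Transform and pushes whatever the handler returns downstream.
function makeDroplet (handler) {
  return new Transform({
    transform (chunk, encoding, callback) {
      Promise.resolve()
        .then(() => handler(chunk, encoding))
        .then(
          // An undefined result pushes nothing; any other value goes downstream.
          (result) => callback(null, result),
          // A thrown error or rejected promise surfaces as a stream 'error'.
          (err) => callback(err)
        )
    }
  })
}

const first = makeDroplet(async (chunk) => `${chunk.toString()}==`)
const second = makeDroplet((chunk) => { console.log(chunk.toString()) })
first.pipe(second)
first.write('billow!') // logs "billow!=="
```

Wrapping the handler call in a promise chain lets the same code path serve both plain functions and async functions.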