0.2.1 • Published 6 years ago
async-streamjs v0.2.1
async-streamjs
An async version of streamjs (https://github.com/winterbe/streamjs).
Install
npm i async-streamjs
Lazy load demo
import AsyncStream from 'async-stream'
import _ from 'lodash'
test('lazy query', async () => {
let seq = []
const totalCount = 12
const pageSize = 5
/**
 * Stand-in for a paged database query used by the demo.
 *
 * Records the call in `seq`, then returns the values belonging to page
 * `pageIdx`: consecutive integers starting at `pageIdx * pageSize`, at most
 * `pageSize` of them, and none at or beyond `totalCount`.
 *
 * @param {number} pageIdx - zero-based page index
 * @returns {Promise<number[]>} the values on that page (may be short or empty)
 */
async function fakeQueryDB(pageIdx) {
  seq.push(`query page ${pageIdx}`)
  const firstValue = pageIdx * pageSize
  const pageEnd = firstValue + pageSize
  const page = []
  // Walk the page's value range, dropping anything past the end of the data set.
  for (let value = firstValue; value < pageEnd; value += 1) {
    if (value < totalCount) {
      page.push(value)
    }
  }
  return page
}
// Becomes true once a page shorter than pageSize has been returned, so that
// later page indices skip the query entirely.
let done = false
// range() is called with its defaults (start = 0, end = null) and supplies the
// page indices. The expected sequence asserted after this pipeline shows each
// page being queried only when its values are about to be consumed.
await AsyncStream.range()
.map(async pageIdx => {
// Once the last page has been seen, yield an empty page without querying.
let arr = done ? [] : await fakeQueryDB(pageIdx)
// A short (or empty) page means there is no more data behind it.
done = arr.length < pageSize
return arr
})
// Stop the stream at the first empty page.
.takeWhile(arr => 0 < arr.length)
// Turn the stream of page arrays into a stream of individual values.
.flatMap(_.identity)
.forEach(v => seq.push(`value ${v}`))
expect(seq).toEqual([
"query page 0",
"value 0",
"value 1",
"value 2",
"value 3",
"value 4",
"query page 1",
"value 5",
"value 6",
"value 7",
"value 8",
"value 9",
"query page 2",
"value 10",
"value 11",
])
});
Message queue demo
import AsyncStream from 'async-stream'
import _ from 'lodash'
test('fifo queue', async () => {
// Build a stream from a callback-style producer and collect it into an array.
// Each resolve(value) call hands one value to the stream; resolve(null) marks
// end of data (see the bufferedExecutor note in the API section).
let targetArr = await AsyncStream.fromAsyncCallback(resolve => {
let srcArr0 = _.range(0, 5)
let srcArr1 = _.range(5, 10)
// First batch (0..4) is pushed synchronously, before any consumer has run.
for (let i of srcArr0) {
resolve(i)
}
// Second batch (5..9) arrives 500 ms later, followed by the end-of-data signal.
setTimeout(() => {
for (let i of srcArr1) {
resolve(i)
}
resolve(null)
}, 500)
// A second end-of-data signal fires at 1000 ms, after the one sent at 500 ms;
// the assertion following this statement still expects exactly 0..9.
setTimeout(() => {
resolve(null)
}, 1000)
}).toArray()
expect(targetArr).toEqual(_.range(0, 10))
});
API
static range(start = 0, end = null): AsyncStream
static fromIterable(iterable): AsyncStream
static fromAsyncCallback(bufferedExecutor): AsyncStream
async first(): any
async rest(): AsyncStream
async isEmpty(): bool
async forEach(asyncCallback): void
async toArray(): Array
async reduce(asyncReducer, init = undefined): any
restLazy(): AsyncStream
take(n): AsyncStream
drop(n): AsyncStream
takeWhile(asyncPredicate): AsyncStream
dropWhile(asyncPredicate): AsyncStream
filter(asyncPredicate): AsyncStream
map(asyncMapper): AsyncStream
chunk(size = 1): AsyncStream
concat(anotherAsyncStream): AsyncStream
flatMap(asyncMapper): AsyncStream
asyncCallback: async val => void
asyncPredicate: async val => boolean
asyncMapper: async val => any
asyncReducer: async (accumulate, val) => nextAccumulate
bufferedExecutor: resolveOnce => void. resolveOnce can be called multiple times; resolveOnce(null) signals end of data.
empty stream constant: import {EMPTY_STREAM} from 'async-stream'
LICENSE
MIT