# ballvalve
This is a collection of helpers for creating and using the ES2018 `AsyncIterable` type. It includes all the usual functional methods (`map`, `reduce`, `collect`, and so on), as well as adapters for building an `AsyncIterable` out of a push-based stream, and for reading discrete bytes from an `AsyncIterator<Uint8Array>`.
For example, to transform a nodejs buffer stream into a series of 4-byte little-endian u32 values, and sum them:
```ts
import * as stream from "node:stream";
import { asyncIter, byteReader } from "ballvalve";

// chop a stream into 4-byte chunks
async function* into4(stream: AsyncIterable<Uint8Array>) {
  const reader = byteReader(stream);
  while (true) {
    const chunk = await reader.read(4);
    if (chunk === undefined) return;
    if (chunk.length < 4) throw new Error("partial frame");
    yield chunk;
  }
}

// generate a sample stream of 8 bytes, encoding 0x4030201 and 0x5060708
const s = stream.Readable.from([
  Buffer.from("0102030408", "hex"),
  Buffer.from("0706", "hex"),
  Buffer.from("05", "hex"),
]);

// map each chunk into its u32 value, and sum them as we go.
const sum = await asyncIter(into4(s))
  .map(buffer => buffer.readUInt32LE(0))
  .reduce((sum, n) => sum + n, 0);
// 0x9090909
```

As another example, you can transform a nodejs stream into lines of text, split at linefeeds:
```ts
async function* lines(stream: AsyncIterable<Uint8Array>) {
  const reader = byteReader(stream);
  while (true) {
    const line = await reader.readUntil(10);
    if (line === undefined) return;
    yield line.toString("utf-8");
  }
}

const s = stream.Readable.from([ "hell", "o\nsa", "ilor\nxyzzy\n" ].map(s => Buffer.from(s)));
for await (const line of asyncIter(lines(s))) {
  console.log(line.trim());
}
// "hello", "sailor", "xyzzy"
```

## Building
```sh
npm install && npm test
```

## API
### `asyncIter<A>(iter: AsyncIterable<A> | AsyncIterator<A> | Iterable<A>): ExtendedAsyncIterable<A>`

Ensure that an object is async-iterable, by wrapping it in an object with a `[Symbol.asyncIterator]` method if necessary, and then wrap it in a fancy object with functional methods (a short usage sketch follows the list):
- `map<B>(f: (item: A) => (B | Promise<B>)): ExtendedAsyncIterable<B>`

  Transform items from one type to another, just like `map` on an array. The function may return a promise.

- `flatMap<B>(f: (item: A) => AsyncIterable<B>): ExtendedAsyncIterable<B>`

  Transform each item into an async-iterator of a possibly different type, and chain them together, just like `flatMap` on an array.

- `filter(f: (item: A) => boolean): ExtendedAsyncIterable<A>`

  Keep only the items where `f` returns true, just like `filter` on an array.

- `filterMap<B>(f: (item: A) => B | undefined): ExtendedAsyncIterable<B>`

  Transform items from one type to another, discarding any new items that are `undefined`.

- `find(f: (item: A) => boolean): Promise<A | undefined>`

  Return the first item where `f` returns true, or `undefined` if it never does. Stop reading from the iterator as soon as an item is found.

- `some(f: (item: A) => boolean): Promise<boolean>`

  Return true if `f` ever returns true for an item, just like `some` on an array. Stop reading from the iterator as soon as an item is found. Other languages call this function `any`.

- `every(f: (item: A) => boolean): Promise<boolean>`

  Return true if `f` returns true for every item until the iterator is exhausted, just like `every` on an array. Stop reading from the iterable as soon as any item "fails" the test. Other languages call this function `all`.

- `reduce<B>(f: (sum: B, item: A) => B, start: B): Promise<B>`

  Starting with the value `start`, accumulate by calling `f` on each element with the running total so far. The return value from `f` will be the running total passed to `f` for the next item. The final return value from `f` is the final result of `reduce`. This behaves just like `reduce` on an array, except that the starting value is required.

- `collect(): Promise<A[]>`

  Use `reduce` to collect all values into a single array. Return that array when the iterator is exhausted.

- `chain(iter: AsyncIterable<A>): ExtendedAsyncIterable<A>`

  Return a new async-iterable which contains each item from this iterable first, and then each element from `iter`. `iter` is not even read until this iterator is exhausted.

- `static chainAll<A>(iters: AsyncIterable<A>[]): ExtendedAsyncIterable<A>`

  Return a new async-iterable which contains items from each iterable, exhausting each one before moving to the next.

- `zip<B>(iter: AsyncIterable<B>): ExtendedAsyncIterable<[ A, B ]>`

  Return a new async-iterable which pairs each item from this iterable with an incoming item from `iter`. Each item-pair is provided only when each of the iterators has provided its next item. When either of the iterators is exhausted, the new iterator ends too.

- `merge<B>(...iter: AsyncIterable<B>[]): ExtendedAsyncIterable<A | B>`

  Return a new async-iterable which contains items from this iterable and each of `iter`, in the order that those items become available. The order of items may be unpredictable.

- `static mergeAll<A>(iterables: AsyncIterable<A>[]): ExtendedAsyncIterable<A>`

  Return a new async-iterable which contains items from each of `iterables`, in the order that items become available. The order of the items may be unpredictable.

- `enumerate(): ExtendedAsyncIterable<[ number, A ]>`

  Return a new async-iterable which pairs each item from this iterable with its index, starting from 0.

- `splitWhen(f: (item: A) => boolean): [ ExtendedAsyncIterable<A>, ExtendedAsyncIterable<A> ]`

  Return two new iterables:

    - The first returns items as long as `f` returns false.
    - The second contains the first item where `f` returns true, and all items that follow it.

  This splits the iterable into two consecutive iterables at an arbitrary point, doing the opposite of `chain`.

- `tee(count: number = 2): ExtendedAsyncIterable<A>[]`

  Return `count` new iterables, each of which generates the same items, using this iterable as a source. If any of the new iterables is left un-read, it will queue items into memory as the other iterables are used, so you may want to make sure that all of the new iterables have an active reader.

- `partition(f: (item: A) => boolean): [ ExtendedAsyncIterable<A>, ExtendedAsyncIterable<A> ]`

  Split this iterable in two: the first provides items where `f` returns true; the second provides items where `f` returns false. Like `tee`, if either of the returned iterables isn't actively processed, it will build up a queue of items, so make sure both iterables have an active reader.

- `takeWhile(f: (item: A) => boolean): ExtendedAsyncIterable<A>`

  Return a new iterable that provides items as long as `f` returns true. Once `f` returns false for any item, that item is dropped and the new iterable ends.

- `take(n: number): Promise<A[]>`

  Return a new iterable that provides up to `n` items from this iterable, then ends.

- `takeFor(msec: number): ExtendedAsyncIterable<A>`

  Return a new iterable that provides items from this iterable until `msec` milliseconds have passed, at which point it will abruptly end.

- `takeUntil(deadline: number): ExtendedAsyncIterable<A>`

  Return a new iterable that provides items from this iterable until `Date.now()` is greater than or equal to `deadline`, at which point it will abruptly end.

- `dropWhile(f: (item: A) => boolean): ExtendedAsyncIterable<A>`

  Return a new iterable that drops all items as long as `f` returns true. The first provided item will be the first where `f` returned false. Afterwards, all items are passed along.

- `drop(n: number): ExtendedAsyncIterable<A>`

  Return a new iterable that drops the first `n` items, and provides any items after that.

- `after(f: () => Promise<void>): ExtendedAsyncIterable<A>`

  Return a new iterable that calls `f` when it's exhausted, and waits for `f` to finish before ending.

- `withPromiseAfter(): [ ExtendedAsyncIterable<A>, Promise<void> ]`

  Return a new iterable and a promise. The promise is fulfilled when the iterator is exhausted.

- `toNodeStream(options: ReadableOptions = {}): Readable`

  Create and return a nodejs `Readable` stream that pulls from this iterable. This can be useful when dealing with older libraries.
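As a quick sketch of how these compose (using only methods documented above), applied to a plain array wrapped by `asyncIter`:

```ts
import { asyncIter } from "ballvalve";

// keep the even numbers, double them, and collect the results
const doubled = await asyncIter([ 1, 2, 3, 4, 5 ])
  .filter(n => n % 2 == 0)
  .map(n => n * 2)
  .collect();
// [ 4, 8 ]

// pair each item with its index
const indexed = await asyncIter([ "a", "b", "c" ]).enumerate().collect();
// [ [ 0, "a" ], [ 1, "b" ], [ 2, "c" ] ]
```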
### `byteReader(wrapped: AsyncIterable<Uint8Array>): ByteReader`

Wrap an `AsyncIterable<Uint8Array>` to provide byte-level read operations.
- `read(size: number): Promise<Uint8Array | undefined>`

  Read `size` bytes, waiting for data to arrive if necessary. If the stream ends before enough data is received, it will return as much as it can. If the stream has already ended, it will return `undefined`.

- `readUntil(byte: number): Promise<Uint8Array | undefined>`

  Read and buffer data from this stream until a specific byte is seen. Once it's seen, all buffered data up to & including the requested byte is returned. If the byte is never seen before the stream ends, it returns `undefined`.

- `readUntilMatch(test: (buffer: Uint8Array) => number): Promise<Uint8Array | undefined>`

  Read and buffer data from this stream until `test` returns an index indicating a successful match. If no match is found before the stream ends, it returns `undefined`.

  `test` is called with all currently buffered data, each time new data arrives. It should return -1 on an unsuccessful match. On success, it should return the index right after the end of the match (as if you were about to pass it to `slice`). (A short sketch of this contract follows the list.)

- `streamUntilMatch(test: (buffer: Uint8Array) => number, sizeHint: number = 1): ExtendedAsyncIterable<Uint8Array>`

  Generate a stream which passes through data from this stream until `test` returns an index indicating a successful match, or this stream ends. If a match was found, after the new stream ends, this (original) stream will resume at the start of the match. Otherwise, the original stream will end too.

  `test` is called on each buffer as it arrives from the underlying stream. It should return -1 on an unsuccessful match. On success, it should return the index of the start of the match.

  If `sizeHint` is greater than 1, each subsequent call to `test` will have `sizeHint - 1` bytes repeated from the end of the previous buffer, so that no `sizeHint`-length span of the stream will be broken up across calls to `test`. (This is used by `streamUntilBuffer`.)

- `streamUntilBuffer(match: Uint8Array): ExtendedAsyncIterable<Uint8Array>`

  Generate a stream which passes through data from this stream until an exact match for `match` is found, or this stream ends. If a match was found, after the new stream ends, this (original) stream will resume at the start of the match. Otherwise, the original stream will end too.

- `unread(buffer: Uint8Array)`

  "Unshift" a buffer back onto the head of the stream.

- `remainder(): Uint8Array | undefined`

  Return all currently buffered data without waiting for more, and clear the buffers. If nothing is buffered, it returns `undefined`.
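For example, here's a sketch of the `readUntilMatch` contract described above, used to split a stream on CRLF. The `crlfLines` helper is illustrative, not part of the library:

```ts
import * as stream from "node:stream";
import { asyncIter, byteReader } from "ballvalve";

// split a byte stream on CRLF, using readUntilMatch
async function* crlfLines(s: AsyncIterable<Uint8Array>) {
  const reader = byteReader(s);
  while (true) {
    // test returns -1 until "\r\n" is buffered, then the index just past it
    const line = await reader.readUntilMatch(buffer => {
      const i = Buffer.from(buffer).indexOf("\r\n");
      return i < 0 ? -1 : i + 2;
    });
    if (line === undefined) return;
    // drop the trailing CRLF and decode
    yield Buffer.from(line).subarray(0, line.length - 2).toString("utf-8");
  }
}

const src = stream.Readable.from([ Buffer.from("hello\r\nsailor\r\n") ]);
for await (const line of asyncIter(crlfLines(src))) {
  console.log(line);  // "hello", then "sailor"
}
```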
### `PushAsyncIterator<A>(pullNext?: () => (A | undefined))`

Adapt a streaming source into an `AsyncIterable`.

You may use this class in two different modes:
- pure push: Call `push(item)` as each item arrives, and don't set a `pullNext` callback. Incoming items are queued for the recipient.
- demand-based: Call `push()` with no argument when you think data is ready, and use `pullNext` to fetch the next item (or `undefined` if it was a false alarm and nothing is ready yet).
`StreamAsyncIterator` is a good example of a demand-based iterator. A pure push iterator has no flow control, but may sometimes be necessary when adapting sources that have no flow control, like an "rxjs" stream.

This class works by responding to `next()` with an unfulfilled promise. When you call `push`, if an item is available, it's posted into that promise. If not, it calls `pullNext` to see if an item is available that way.

When the stream is finished, call `end()` to signal to the recipient that the iterator is done. Any queued items (from `push(item)`) will be delivered first.

On error, call `error(error)` to cause all current and future `next()` calls to throw an exception.
- `push(item?: A)`
- `end()`
- `error(error: Error)`
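For example, here's a minimal sketch of pure-push mode, adapting a callback-style source. The `fromCallbacks` helper and the timer-based source are hypothetical, and the iterator is handed to `asyncIter` (which accepts any `AsyncIterator`) to get the usual functional methods:

```ts
import { asyncIter, PushAsyncIterator } from "ballvalve";

// adapt a callback-based source into an async-iterable (pure-push mode)
function fromCallbacks<A>(subscribe: (onItem: (item: A) => void, onEnd: () => void) => void) {
  const iter = new PushAsyncIterator<A>();
  subscribe(item => iter.push(item), () => iter.end());
  return asyncIter(iter);
}

// hypothetical source: emit 0, 1, 2 on a timer, then stop
const ticks = fromCallbacks<number>((onItem, onEnd) => {
  let n = 0;
  const timer = setInterval(() => {
    onItem(n++);
    if (n == 3) { clearInterval(timer); onEnd(); }
  }, 100);
});

for await (const n of ticks) console.log(n);  // 0, 1, 2
```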
### `StreamAsyncIterator(stream: stream.Readable, public size?: number)`

(Note: Recent nodejs releases implement streams as `AsyncIterable`, so this class is now deprecated, and provided only as an example of how to use `PushAsyncIterator`.)
Wrap a nodejs `Readable` stream into an `AsyncIterator`. The iterable will place the stream into "pull" mode (paused) and emit `Uint8Array` objects on demand until it reaches the end of the stream, or the stream emits an error.

`size` is an optional parameter to pass to the stream's `read()` method, if you want to try to read chunks of a specific size.
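For reference, a brief sketch of the deprecated wrapper next to the modern equivalent (the filename `data.bin` is just a placeholder):

```ts
import * as fs from "node:fs";
import { asyncIter, StreamAsyncIterator } from "ballvalve";

// old style: wrap the Readable explicitly, asking for 4 KB chunks when possible
const oldStyle = asyncIter(new StreamAsyncIterator(fs.createReadStream("data.bin"), 4096));

// recent nodejs: Readable streams are already async-iterable
const newStyle = asyncIter(fs.createReadStream("data.bin"));
```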
## License
Apache 2 (open-source) license, included in LICENSE.txt.
## Authors
@robey - Robey Pointer robeypointer@gmail.com