3.0.0 • Published 5 years ago

@jx/stream v3.0.0

License
MIT

Stream

Introduction

The JavaScript language now has async iterators and async generators, but it does not include a way to create async iterators from existing data sources.

This library is designed to fill that void by providing a single concrete type with a lot of similarities to the Promise type. It also borrows ideas from the observable proposal and allows any observer to also be used as the first argument to Stream.

NOTE: Like the Observable proposal, the goal of Stream is to have a minimal API surface, which is why Stream doesn't include any operators such as .map/.filter/etc. Those should be provided by some other library.

Examples

const interval = new Stream(stream => {
    setInterval(stream.yield, 1000)
})

for await (const _ of interval) {
    console.log("Tick")
}

function mediaChunks(mediaRecorder, stopWhen) {
    return new Stream(stream => {
        mediaRecorder.ondataavailable = ({ data }) => stream.yield(data)
        stopWhen(stream.return)
    })
}

const userVoice = await navigator.mediaDevices.getUserMedia({ audio: true })
// Invoke the supplied callback (stream.return) after 10 seconds
const stopWhen = callback => setTimeout(callback, 10000)

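const recorder = new MediaRecorder(userVoice)
// A MediaRecorder emits no data until started; the 1000 ms timeslice is an assumption
recorder.start(1000)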

for await (const chunk of mediaChunks(recorder, stopWhen)) {
    // Even if db.append is slow the rest of the chunks will still be queued
    await db.table(filename).append(chunk)
}

API

new Stream(initializer, { queue=new Queue() }={})

The Stream constructor requires a single parameter, the initializer, as its first argument. The initializer is called immediately with a StreamController object. It may optionally return a single function that will be called when cleanup starts, once the stream is complete.

Optionally, an options bag may be provided as a second argument. Currently the only available option is queue.
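As a minimal sketch of an initializer that returns a cleanup function, the helper below builds a ticking stream and clears its timer on cleanup. The name ticker is hypothetical, and Stream is taken as a parameter rather than imported because the package's export shape is not shown here.

```javascript
// Hypothetical helper: `Stream` is passed in by the caller rather than imported.
function ticker(Stream, ms) {
    return new Stream(stream => {
        // Put a timestamp into the stream on every tick.
        const id = setInterval(() => stream.yield(Date.now()), ms)
        // The returned function is the cleanup callback; it stops the timer.
        return () => clearInterval(id)
    })
}
```

Without the cleanup function the timer would keep firing after the consumer has stopped iterating.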

StreamOptions.queue

Optionally you can pass a custom queue object. It needs to conform to the following interface:

interface AbstractQueue<T> {
    isEmpty: boolean;
    enqueue: (item: T) => any;
    dequeue: () => T;
}

The only other invariant on these methods is that if .isEmpty is false then, until the current microtask ends, the next call to .dequeue must return an item. .dequeue will never be called if .isEmpty is true.
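For illustration, a custom queue matching the interface above might look like the following sketch. DropOldestQueue and its limit parameter are hypothetical names, not part of the library. The queue keeps at most limit items and discards the oldest one when full.

```javascript
// Hypothetical bounded queue: keeps the newest `limit` items.
class DropOldestQueue {
    constructor(limit) {
        this.limit = limit
        this.items = []
    }

    // A getter satisfies the `isEmpty: boolean` member of the interface.
    get isEmpty() {
        return this.items.length === 0
    }

    enqueue(item) {
        // Discard the oldest item if the queue is already full.
        if (this.items.length >= this.limit) this.items.shift()
        this.items.push(item)
    }

    dequeue() {
        return this.items.shift()
    }
}
```

Going by the constructor signature, such a queue would be passed as new Stream(initializer, { queue: new DropOldestQueue(100) }). A bounded queue like this trades completeness for a memory cap when the consumer is slower than the producer.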

stream.next()

This is the implementation of AsyncIterator.next. If an item is in the queue, it returns the first item from the queue. Otherwise it waits until the next item is available.

Note: Multiple calls to .next/.return are safe even without waiting for the previous one to resolve. The results will always be resolved in order.
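The ordering guarantee can be sketched with any async iterator. The example below uses a native async generator as a stand-in, since native generators also queue .next calls and resolve them in order. The names numbers and takeThreeConcurrently are hypothetical.

```javascript
// Stand-in source; a Stream instance could be passed to the helper instead.
async function* numbers() {
    yield 1
    yield 2
    yield 3
}

// Issue three .next() calls without awaiting in between,
// then collect the values in the order the calls were made.
async function takeThreeConcurrently(iterator) {
    const pending = [iterator.next(), iterator.next(), iterator.next()]
    const results = await Promise.all(pending)
    return results.map(result => result.value)
}
```

Calling takeThreeConcurrently(numbers()) resolves to [1, 2, 3], because each request is answered in the order it was issued.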

stream.return(value?)

This is the implementation of AsyncIterator.return. If a value is passed and the queue is not yet complete, that value will be the value in the IteratorResult.

Calling this method schedules cleanup to happen once all pending calls to .next are resolved.

If the cleanup callback returns a promise, the promise returned by .return will not resolve until that promise is resolved.

Note: If the queue still has items they will simply be ignored.

StreamController

The stream controller object is how you put values into the iterator. You can send values, throw an error or end the iterable at any point.

StreamController.yield(value)/StreamController.next(value)

The .yield method (and its alias .next) puts a value in the stream. If there are already calls to .next waiting, the value is sent to the first of them immediately. Otherwise the value is placed in the queue.
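The hand-off between producer and consumer can be sketched as below. This is an illustration of the idea only, not the library's implementation: MiniChannel is a hypothetical class that omits errors, completion, and cleanup.

```javascript
// Hypothetical, simplified model of the producer/consumer hand-off.
class MiniChannel {
    constructor() {
        this.queue = []    // values waiting for a consumer
        this.waiting = []  // resolve functions of consumers waiting for a value
    }

    // Producer side: deliver to the oldest waiting consumer, else enqueue.
    yield(value) {
        const resolve = this.waiting.shift()
        if (resolve) {
            resolve({ value, done: false })
        } else {
            this.queue.push(value)
        }
    }

    // Consumer side: take a queued value if there is one, else wait.
    next() {
        if (this.queue.length > 0) {
            return Promise.resolve({ value: this.queue.shift(), done: false })
        }
        return new Promise(resolve => this.waiting.push(resolve))
    }
}
```

Because values are either handed over or queued, a slow consumer does not lose values that arrive while it is busy.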

StreamController.throw/StreamController.error

The .throw method (and its alias .error) causes the stream to terminate with an error.

StreamController.return/StreamController.complete

The .return method (and its alias .complete) will cause the stream to become complete once the final items on the queue are consumed.

Default Queue

The default queue is a simple FIFO queue with no length limit.

State Overview

In order for the stream to clean up at an appropriate time, every stream object has a current state. This is similar to Promise objects, which have pending, fulfilled and rejected states.

Here's the full state diagram. It's just a teensy tiny bit more complicated than the Promise one:

[Figure: a diagram showing all the states of the stream]

Okay so it's a bit more complicated than the Promise one but it guarantees that all .next/.return calls will be appropriately fulfilled if they have been requested.
