4.1.0 • Published 2 years ago

observable-reader v4.1.0

Weekly downloads
-
License
-
Repository
github
Last release
2 years ago

Description

There's a lot of cases when you have an observable of Observable<T[]>/Observable<string>, and you need to read those chunks asynchronously as a single array/string with ability to have an intermediate buffer to prevent any kind of data leaks and be able to push data synchronously. This library allows you to do it.

Examples

xxxUntil methods

const _ = (async () => {
    const observable = new Observable(observer => {
        observer.next([1, 2, 3])
        observer.next([4, 5, 6])
        observer.next([7, 8, 9])
        observer.next([0])
        observer.complete()
    }).pipe(
        delay(1000),
        tap(console.log))
    const reader = new ObservableReader(observable)
    // when finds the sequence [4, 5], quits skipping // the second arg (4) is a limit
    // if sequence is empty, skips until the data ends
    console.log(await reader.skipUntil([4, 5], 4))
    const target = new Array(10)
    // reads until the sequence [7, 8]
    console.log(await reader.readUntil(target, [7, 8]))
    console.log(target)
    // takes data (returns an array of taken data) until the data ends
    console.log(await reader.take()) // await reader.takeUntil([]) does the same
})()

// Output:
// [ 1, 2, 3 ]
// [ 4, 5, 6 ]
// 4 // number of skipped items after .skipUntil
// [ 7, 8, 9 ]
// 4 // number of readed items after .readUntil
// [ 5, 6, 7, 8, <6 empty items> ] // the result of reading
// [ 0 ]
// [ 9, 0 ] // taken items after .takeUntil

IoBuffer

const buffer = new IoBuffer()
// actually, IoBuffer is a linked list of arrays and their boundaries
// IoBuffer never change consumed arrays
buffer
    .append(['one', 'two', 'three'])
    .append([true, false])
    .append([1, 2, 3], 1, 2) // setting limits
const target = new Array(10)
target.length = buffer.read(target, 0, 2)
console.log(target) // [ 'one', 'two' ]
buffer.skip(2)
console.log(buffer.take()) // [ false, 2 ]

Reader attaching/detaching

// creating detached reader
const reader = new ObservableReader()
reader.take(5).then(console.log)

const firstObservable = of([1], [2, 3])
const secondObservable = of(['one', 'two'], ['three'])

reader.attach(firstObservable)
// the reader detaches automatically
reader.attach(secondObservable)
console.log(reader.buffer.take())

// Output:
// [ 'three' ]
// [ 1, 2, 3, 'one', 'two' ]

ObserverWriter

const target = {
    next: chunk => console.log(chunk),
    error: error => console.log('error:', error),
    complete: () => console.log('complete')
}
// 5 is a flush treshold. When buffer length is greater than flush treshold, writer auto-flushes
new ObserverWriter(target, 5)
    .next([1, 2, 3])
    .flush()
    .next([4, 5, 6])
    .next([7, 8, 9])
    .error(123)
    // completes the writer whose class implements Observer<ArrayLike<any>>, but not the target
    // after observer writer completes, the writer detaches, and almost every method of the writer gets stubbed
    .complete()

const original = new ObserverWriter(target)
const mapped = original
    // now the mapped writer accepts strings and pushes parsed integers to the original writer
    // flush treshold 0 actually acts like auto-flush on every non-empty input
    // ObserverWriter<T> uses only ArrayLike<T> not ReadonlyArray<T>
    .mapBack(chunk => Array.from(chunk).map(item => parseInt(item)), 0)
    .next(['1', '2', '3'])
// we flushed the mapped writer, but we also need to flush the original writer,
// because the mapped writer flushed the data into the original buffer
original.flush()

// completes both the writer and the target
new ObserverWriter(target).complete(true)
4.1.0

2 years ago

4.0.0

2 years ago

3.2.0

2 years ago

3.1.0

2 years ago

3.0.0

2 years ago

2.0.0

2 years ago

1.0.1

2 years ago

1.0.0

2 years ago