1.0.6 • Published 8 months ago

exframe-stream v1.0.6

License: ISC
Repository: bitbucket

exframe-stream

A library for common stream patterns in the harmony / exframe environments

installation

npm install exframe-stream

usage

import { pipeline } from 'stream/promises';

import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';

const pool = new WorkerPool();

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  parallel(async (i) => {
    return i + 1;
  }, pool)
);

terms

term | description
streamable | A stream, iterator, async iterator, generator function, or async generator function
duplexable | A duplex stream or async generator function
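
To make the streamable definition concrete, the sketch below shows one way the different kinds could be normalized into something that `for await` can consume. The names `toAsyncIterable` and `collect` are hypothetical and are not part of exframe-stream; the library's own handling may well differ.

```javascript
// Hypothetical helper (not part of exframe-stream): normalizes a "streamable"
// into an async iterable that can be consumed with `for await`.
async function* toAsyncIterable(streamable) {
  // Generator functions and async generator functions must be called first.
  const source = typeof streamable === 'function' ? streamable() : streamable;

  // Inside an async generator, `yield*` accepts sync and async iterables alike,
  // which covers arrays, iterable iterators, generators, and Node.js Readables.
  yield* source;
}

// Hypothetical helper: gathers every item of a streamable into an array.
async function collect(streamable) {
  const items = [];
  for await (const item of toAsyncIterable(streamable)) {
    items.push(item);
  }
  return items;
}
```

With this in place, `collect([1, 2, 3])` and `collect(async function* () { yield 1; })` are consumed the same way.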

streams

Chain

Chain is an alternative to the experimental compose function that also incorporates some of the functionality of pipeline. It "chains" streams together into a Duplex while still maintaining references to the head and tail streams, which is useful when the interfaces of the head and the tail are important. Chain is also less aggressive than compose about turning streams into Duplexes when that is not necessary.

examples

import { chain } from 'exframe-stream';

const items = await chain(
  [1, 2, 3, 4, 5],
  async function* (source) {
    yield* source;
  }
).toArray();

Constructor

function chain(stream: Streamable, ...streams?: Streamable[], options?: PipelineOptions & FinishedOptions) => Stream

The head of the stream may be readable or writable. Every stream that follows another must be writable. The final stream may optionally be readable. If either the head or the tail ends up being returned, it will have the fields listed below added. Streamables that are not already streams will be duplexified.

Rules for What Stream is Returned

  • If there is just the one stream, return the one stream.
  • If the head is writable and the tail is readable, then return a duplex that exposes the head as the writable and the tail as the readable.
  • If the head is writable and the tail is not readable, then return the head.
  • Otherwise, return the tail.

field | type | description
stream | Streamable | See streamable
streams | Streamable[] | Set of streamables that will be chained together
options | PipelineOptions & FinishedOptions | See PipelineOptions, See FinishedOptions
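
The rules for which stream is returned can be sketched as a small decision function. This is only an illustration: `selectReturned` and the duck-typed `isWritable` / `isReadable` checks are assumptions made for the example, and the real chain returns actual streams rather than descriptor objects.

```javascript
// Duck-typed checks (an assumption for this sketch); real code would more
// likely inspect actual Node.js stream instances.
const isWritable = (stream) => typeof stream.write === 'function';
const isReadable = (stream) => typeof stream.read === 'function';

// Hypothetical helper: reports which stream the rules above would select.
function selectReturned(streams) {
  const head = streams[0];
  const tail = streams[streams.length - 1];

  // Rule 1: a single stream is returned as-is.
  if (streams.length === 1) return { kind: 'single', stream: head };
  // Rule 2: writable head + readable tail is exposed as a duplex pair.
  if (isWritable(head) && isReadable(tail)) {
    return { kind: 'duplex', writable: head, readable: tail };
  }
  // Rule 3: writable head, non-readable tail returns the head.
  if (isWritable(head)) return { kind: 'head', stream: head };
  // Rule 4: everything else returns the tail.
  return { kind: 'tail', stream: tail };
}
```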

Fields

head: Stream

The first stream of the chain.

tail: Stream

The last stream of the chain.

finished: Promise<void>

The promise returned by an already wired-up finished call for the chain. See finished.

Channel

The channel is a fixed-length buffer that provides ordered input and output. Reads block while the buffer is empty, and writes block while it is full. The Channel is a full Duplex stream.

examples

import { channel } from 'exframe-stream';

const chan = channel({ max: 10 });

(async () => {
  for await (const item of chan) { // will block while there are no items
    console.log(item);
  }
})();

for (let i = 0; i < 20; ++i) {
  await chan.send(i); // will block if the buffer is full
}

import { pipeline } from 'stream/promises';

import { channel } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  channel({ max: 10 }),
  async function (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

Constructor

function channel(options?: ChannelOptions & DuplexOptions) => ChannelStream & Duplex

field | type | description
options | ChannelOptions & DuplexOptions | See ChannelOptions, See DuplexOptions

type ChannelOptions

field | type | description
max | integer | default = 10; the maximum number of items in the channel's buffer
preRead | <T, R>(item: T) => Promise<R> | Called before pushing to the read buffer
preWrite | <T, R>(item: T, encoding) => Promise<R> | Called before storing in the item queue

Fields

items: T[]

The channel's queue. Not recommended to interact with this array.

async send(item: T) => Promise<void>

Enqueues the given item. Blocks if the channel is full.
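
The blocking behaviour of a channel can be illustrated without any stream machinery. The sketch below is a hypothetical stand-in, not the exframe-stream implementation: `BoundedChannel` and its `receive` method are names invented for the example, and unlike the real Channel it is not a Duplex.

```javascript
// Hypothetical stand-in for the channel semantics described above.
class BoundedChannel {
  constructor(max = 10) {
    this.max = max;
    this.items = [];
    this.waitingSenders = [];   // resolvers for sends blocked on a full buffer
    this.waitingReceivers = []; // resolvers for receives blocked on an empty buffer
  }

  async send(item) {
    // Block while the buffer is full; re-check after every wake-up.
    while (this.items.length >= this.max) {
      await new Promise((resolve) => this.waitingSenders.push(resolve));
    }
    this.items.push(item);
    // Wake one blocked receiver, if any.
    const receiver = this.waitingReceivers.shift();
    if (receiver) receiver();
  }

  async receive() {
    // Block while the buffer is empty; re-check after every wake-up.
    while (this.items.length === 0) {
      await new Promise((resolve) => this.waitingReceivers.push(resolve));
    }
    const item = this.items.shift(); // FIFO keeps output in input order
    // A slot was freed, so wake one blocked sender, if any.
    const sender = this.waitingSenders.shift();
    if (sender) sender();
    return item;
  }
}
```

A producer that calls `await chan.send(i)` in a loop will therefore never get more than `max` items ahead of the consumer.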

Demultiplex

Demultiplexes a source iterable or stream to one or more writables. The demultiplex stream currently terminates the pipeline at its level; however, using compose or some other technique, each writable can chain to any number of other streams or iterators. The demultiplex stream can handle both binary and object mode. Any writable that is also a readable is exposed in readableStreams on the DemultiplexStream, and a MultiplexStream can recombine them into a single stream if necessary. If any target becomes unable to take any more data, all targets are blocked from receiving additional data until the blocking target can resume.

examples

import { compose } from 'stream'; // experimental Node.js API
import { pipeline } from 'stream/promises';

import { demultiplex, pick } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  demultiplex(
    async function* (source) {
      for await (const item of source) {
        console.log(`log: ${item}`);
      }
    },
    compose(
      pick(item => item % 2 === 0),
      async function* (source) {
        for await (const item of source) {
          console.log(`even: ${item}`);
        }
      }
    ),
    { objectMode: true }
  )
);

Constructor

function demultiplex(...writables: (Writable|Function)[], options?: WritableOptions) => DemultiplexStream

Creates a DemultiplexStream, a Writable that takes any number of target Writables or async generator functions to process items.

field | type | description
writables | (Writable|Function)[] | Set of Writable streams or async generator functions
options | WritableOptions | See WritableOptions

Fields

readableStreams: Readable[]

The set of target writables that are also readable.
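
The shared backpressure described for Demultiplex, where one slow target holds back all of them, can be sketched with plain Node.js writables. `fanOut` is a hypothetical helper written for this illustration; it assumes each target follows the standard Writable contract (`write()` returns false when full and a `'drain'` event follows) and says nothing about how DemultiplexStream is actually implemented.

```javascript
// Hypothetical helper: writes every item to every target and pauses the whole
// fan-out whenever any target signals backpressure.
async function fanOut(source, targets) {
  for await (const item of source) {
    const blocked = [];
    for (const target of targets) {
      // write() returns false when the target's internal buffer is full.
      if (!target.write(item)) {
        blocked.push(new Promise((resolve) => target.once('drain', resolve)));
      }
    }
    // No target receives the next item until every blocked target has drained.
    await Promise.all(blocked);
  }
  // The source has ended, so end every target as well.
  for (const target of targets) target.end();
}
```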

FeedbackLoop

A mechanism that allows items to be pushed back to the start. Will continue until both the source and the repeated work have ended.

examples

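import { pipeline } from 'stream/promises';

import { chain, feedbackLoop, FeedbackState } from 'exframe-stream';

// add, multiply, subtract, and divide are not defined in this example; they are
// assumed to be helpers that each return a (value) => number function.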

await pipeline(
  [
    { tasks: [add(1), multiply(4), subtract(3), divide(2)], i: 0, value: 0 },
    { tasks: [add(2), multiply(4), divide(2)], i: 0, value: 0 },
    { tasks: [add(3), multiply(4)], i: 0, value: 0 }
  ],
  feedbackLoop(
    async function* (source) {
      for await (const context of source) {
        context.value = context.tasks[context.i++](context.value);
        yield context;
      }
    },
    async function (context) {
      return context.i < context.tasks.length
        ? FeedbackState.Repeat
        : FeedbackState.Complete;
    },
    chain(async function* (source) {
      for await (const context of source) {
        context.repeated = true;
        yield context;
      }
    })
  ),
  async function (source) {
    for await (const context of source) {
      console.log(context);
    }
  }
);

Constructor

function feedbackLoop(operationStream: Duplexable, feedback: async <T>(item: T) => Promise<FeedbackState>, feedbackStream?: Duplexable, options?: FeedbackLoopOptions) => FeedbackLoopStream

Creates a FeedbackLoopStream, a Duplex that takes an operation stream to process the items, a feedback filter to determine whether items should repeat, and an optional feedbackStream that processes items that are starting again.

field | type | description
operationStream | Duplexable | The stream to process over all items with. See duplexable
feedback | async <T>(item: T) => Promise<FeedbackState> | The filter to determine whether an item should repeat, complete, or be discarded
feedbackStream | Duplexable | Stream to process over items that are being repeated
options | ReadableOptions | See ReadableOptions
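
As a rough mental model of the feedback loop, the sketch below processes items from a queue and pushes repeated items back onto it. Everything here is an assumption made for illustration: `feedbackSketch` and the `State` values are hypothetical, the operation is a per-item function rather than a Duplexable, and the source is buffered up front, which a streaming implementation would not do.

```javascript
// Hypothetical stand-ins for the feedback states; the real FeedbackState
// values are not shown in this document.
const State = Object.freeze({ Repeat: 'repeat', Complete: 'complete' });

// Hypothetical, batch-oriented stand-in for the feedback loop described above.
async function* feedbackSketch(source, operation, feedback) {
  // Simplification: buffer the whole source before processing.
  const pending = [];
  for await (const item of source) pending.push(item);

  // Keep going until there is no work left, including repeated work.
  while (pending.length > 0) {
    const result = await operation(pending.shift());
    if ((await feedback(result)) === State.Repeat) {
      pending.push(result); // send the item back to the start
    } else {
      yield result; // completed items flow downstream
    }
  }
}
```

Items that need fewer passes complete earlier, so output order can differ from input order in this sketch.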

Multiplex

Multiplexes a set of source iterables or streams into a single readable. The stream will end when all sources are ended.

examples

import { Readable } from 'stream';
import { pipeline } from 'stream/promises';

import { multiplex } from 'exframe-stream';

await pipeline(
  multiplex(
    async function* () {
      for (let i = 0; i < 10; ++i) {
        yield i;
      }
    },
    [2, 4, 6, 8, 10],
    Readable.from([2]),
    { objectMode: true }
  ),
  async function (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

function multiplex(...readables: (Readable|Function)[], options?: ReadableOptions) => MultiplexStream

Creates a MultiplexStream, a Readable that takes any number of source Readables or async generator functions and emits their items.

field | type | description
readables | (Readable|Function)[] | Set of Readable streams or async generator functions
options | ReadableOptions | See ReadableOptions
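
The idea of merging several sources and ending only when all of them have ended can be sketched with plain async iteration. `mergeAll` is a hypothetical helper for this illustration, not the MultiplexStream implementation; it accepts iterables and async iterables only (generator functions would need to be called first), and items from different sources interleave in arrival order.

```javascript
// Hypothetical helper: merges several sync or async iterables into one async
// iterable, yielding items as they arrive and ending when all sources end.
async function* mergeAll(...sources) {
  const iterators = sources.map((source) =>
    source[Symbol.asyncIterator]
      ? source[Symbol.asyncIterator]()
      : source[Symbol.iterator]()
  );

  // One in-flight next() per source, tagged with the index of its source.
  const advance = (index) =>
    Promise.resolve(iterators[index].next()).then((result) => ({ index, result }));
  const inFlight = new Map(iterators.map((_, index) => [index, advance(index)]));

  while (inFlight.size > 0) {
    // Take whichever source produces a result first.
    const { index, result } = await Promise.race(inFlight.values());
    if (result.done) {
      inFlight.delete(index); // this source has ended
    } else {
      inFlight.set(index, advance(index)); // request the next item
      yield result.value;
    }
  }
}
```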

Parallel

Executes some operation over the incoming items and outputs the results in the correct order. Uses exframe-worker-pool to govern the concurrent execution. The parallel stream can only operate in object mode.

function parallel(operation: async <T, R>(T) => Promise<R>, pool?: WorkerPool, options?: DuplexOptions): ParallelStream

Creates a ParallelStream, which is a ChannelStream or a Duplex. Each incoming item is passed to the given operation, and the results are output in the order in which the items arrived. If operation returns undefined, the stream behaves essentially like a Writable rather than a Duplex.

examples

import { pipeline } from 'stream/promises';

import { parallel } from 'exframe-stream';
import { WorkerPool } from 'exframe-worker-pool';

const pool = new WorkerPool();

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  parallel(async (item) => {
    const result = await someRequest(item);

    return result;
  }, pool)
);

field | type | description
operation | async <T, R>(T) => Promise<R> | Mapping function that returns some output for every item
pool? | WorkerPool | The worker pool to use to govern the amount of concurrency that the parallel stream can use. If not set, then a pool with a max of 1 and overflow of 0 will be created.
options? | DuplexOptions | See DuplexOptions
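
Running work concurrently while still emitting results in input order can be sketched with a sliding window of promises. This is an illustration under stated assumptions: `orderedParallelMap` and its `limit` parameter are hypothetical, and a simple in-flight window stands in for exframe-worker-pool, whose API is not used here.

```javascript
// Hypothetical helper: maps items concurrently (at most `limit` in flight)
// while yielding results in the original input order.
async function* orderedParallelMap(source, operation, limit = 2) {
  const inFlight = []; // promises, kept in input order

  for await (const item of source) {
    inFlight.push(operation(item)); // start work immediately
    if (inFlight.length >= limit) {
      // Wait for the oldest result before accepting more input, so ordering
      // is preserved and no more than `limit` operations run at once.
      yield await inFlight.shift();
    }
  }

  // Drain whatever is still running once the source has ended.
  while (inFlight.length > 0) {
    yield await inFlight.shift();
  }
}
```

Even if a later item finishes first, it is held until every earlier item has been emitted.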

Pick

Special case of parallel that filters the stream for items matching the given predicate. Like parallel, it is only available in object mode.

examples

import { pipeline } from 'stream/promises';

import { pick } from 'exframe-stream';

await pipeline(
  [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
  pick(x => x % 2 === 0),
  async function* (source) {
    for await (const item of source) {
      console.log(item);
    }
  }
);

function pick(predicate: async <T>(T) => Promise<boolean>, options?: PickOptions&DuplexOptions): ParallelStream

Creates a ParallelStream that will filter the stream to some subset.

field | type | description
predicate | async <T>(T) => Promise<boolean> | Function that decides whether an item is kept in the stream
options? | PickOptions&DuplexOptions | See PickOptions, See DuplexOptions

type PickOptions

field | type | description
pool? | WorkerPool | The worker pool for the parallel stream.
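
The filtering that pick performs amounts to evaluating an async predicate per item and passing through only the matches. The sketch below shows that idea sequentially; `pickSketch` is a hypothetical name, and unlike the real pick it does not use a worker pool or run predicates concurrently.

```javascript
// Hypothetical helper: yields only the items for which the (possibly async)
// predicate resolves to a truthy value, preserving input order.
async function* pickSketch(source, predicate) {
  for await (const item of source) {
    if (await predicate(item)) {
      yield item;
    }
  }
}
```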