3.0.0 • Published 7 years ago

@motorcycle/stream v3.0.0

Weekly downloads
1
License
MIT
Repository
github
Last release
7 years ago

@motorcycle/stream -- 2.1.0

Functional and reactive event streams for Motorcycle.ts

Get it

yarn add @motorcycle/stream
# or
npm install --save @motorcycle/stream

API Documentation

All functions are curried!

ap\<A, B>(fns: Stream\<(value: A) => B>, values: Stream\<A>): Stream\<B>

Applies a stream of functions to the latest from a stream of values.

const count$ = scan(x => x + 1, 0, skip(1, periodic(100)))

const fn$ = now(x => x * x)

const stream = ap(fn$, count$)

observe(console.log, stream) // 0 // 4 // 9 // ...

</details>

<details>
  <summary>See the code</summary>

```typescript

export { ap } from '@most/core'

at\<A>(time: Time, value: A): Stream\<A>

Creates a stream that emits a value after a given amount of time.

</details>

<details>
  <summary>See the code</summary>

```typescript

export { at } from '@most/core'

at\<A>(time: number, value: A): Stream\<A>

Create a stream containing a single event at a specific time.

observe(console.log, at(1000, 'Hello')) // After 1 second // logs 'Hello'

</details>

<details>
  <summary>See the code</summary>

```typescript

export { at } from '@most/core'

awaitPromises\<A>(stream: Stream\<Promise\<A>>): Stream\<A>

Turn a stream of promises into a stream containing the promises' values. Note that order is always preserved, regardless of promise fulfillment order.

// ----1-------> const a = new Promise(resolve => setTimeout(resolve, 100, 1)) // ---------2--> const b = new Promise(resolve => setTimeout(resolve, 200, 2)) // --3---------> const c = new Promise(resolve => setTimeout(resolve, 50, 3))

// bc---a-------> const source = mergeArray( at(100, a), now(b), now(c) )

// -----1----23-> const stream = awaitPromises(source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { awaitPromises } from '@most/core'

chain\<A, B>(f: (value: A) => Stream\<B>, stream: Stream\<A>): Stream\<B>

Creates a new stream by applying a stream-returning function to every event value and merging them into the resulting stream.

const stream = chain(x => now(x * 2), now(1000))

observe(console.log, stream) // 2000

</details>

<details>
  <summary>See the code</summary>

```typescript

export { chain } from '@most/core'

combine\<A, B, C>(f: (a: A, b: B) => C, a$: Stream\<A>, b$: Stream\<B>): Stream\<C>

Apply a function to the most recent event from each stream when a new event arrives on any stream.

const a$ = merge(at(100, 100), at(200, 200)) const b$ = merge(at(200, 3000), at(250, 100))

const stream = combine(add, a$, b$)

observe(console.log, stream) // 3200 -- at time 200 as a result of add(200, 3000) // 300 -- at time 250 as a result of add(200, 100)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { combine } from '@most/core'

combineArray\<A, B, C>(f: (a: A, b: B) => C, streams: [Stream\<A>, Stream\<B>]): Stream\<C>

Applies a function to the most recent event from all streams when a new event arrives on any stream.

const a$ = now(1000) const b$ = now(2000) const c$ = merge(at(100, 1), at(200, 2))

const sum = (x, y, z) => x + y + z

const stream = combineArray(sum, [a$, b$, c$])

observe(console.log, stream) // 3001 -- at time 100 as result of sum(1000, 2000, 1) // 3002 -- at time 200 as result of sum(1000, 2000, 2)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { combineArray } from '@most/core'

combineObj\<Obj extends object>(obj: { [K in keyof Obj]: Stream\<Obj[K]> }): Stream\<Obj>

Takes an object of streams and returns a Stream of an object.

const obj = { a: now(1), b: now(2), c: now(3) }

const stream: Stream<{ a: number, b: number, c: number }> = combineObj(obj)

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Turns an object whose properties are streams into a single stream of
 * plain objects.
 *
 * The property names and the property streams are read once, up front, via
 * `keys` and `values`. Every time `combineArray` delivers a new set of latest
 * values, a fresh object is built by pairing the name at each position with
 * the value at the same position.
 *
 * NOTE(review): this relies on `keys(object)` and `values(object)` enumerating
 * properties in the same order -- confirm against the helpers' implementation.
 *
 * @param object An object whose every property is a stream.
 * @returns A stream of objects carrying the combined values under the same keys.
 */
export function combineObj<Obj extends object>(
  object: { readonly [K in keyof Obj]: Stream<Obj[K]> }
): Stream<Obj> {
  const propertyNames = keys(object)
  const propertyStreams = values(object) as Array<Stream<Obj[keyof Obj]>>

  // Rebuilds an object from the positional values handed over by combineArray.
  const toObject = (...latest: Array<Obj[keyof Obj]>): Obj => {
    const combined = {} as Obj
    let index = 0

    while (index < length(latest)) {
      combined[propertyNames[index]] = latest[index]
      index += 1
    }

    return combined
  }

  return combineArray(toObject, propertyStreams)
}

concatMap\<A, B>(f: (value: A) => Stream\<B>, stream: Stream\<A>): Stream\<B>

Creates a new stream by lazily applying a stream-returning function to each event value of a stream concatenating that stream's values to the resulting stream.

const source = // --104--101--108--108--111|

const f = (x: number) => now(String.fromCharCode(x))

const stream = concatMap(f, source)

observe(console.log, stream) // h // e // l // l // o

</details>

<details>
  <summary>See the code</summary>

```typescript

export { concatMap } from '@most/core'

constant\<A>(value: A, stream: Stream\<any>): Stream\<A>

Replace each event on a stream with a given value.

const stream = constant(100, periodic(1000))

observe(console.log, stream) // every 1 second logs 100

</details>

<details>
  <summary>See the code</summary>

```typescript

export { constant } from '@most/core'

continueWith\<A>(f: () => Stream\<A>, stream: Stream\<A>): Stream\<A>

Replace the end signal with a new stream returned by f. Note that f must return a stream.

// ----1------> const a = at(100, 1) // ----2------> const b = at(100, 2)

// ----1----2-> const stream = continueWith(() => b, a)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { continueWith } from '@most/core'

debounce\<A>(ms: number, stream: Stream\<A>): Stream\<A>

Wait for a burst of events to subside and keep only the last event in the burst.

const source = // abcd----abcd---> // -----d-------d-> const stream = debounce(2, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { debounce } from '@most/core'

delay\<A>(ms: number, stream: Stream\<A>): Stream\<A>

Timeshift a stream by a number of milliseconds.

const source = -1--2--3--4--5----> // ----1--2--3--4--5-> const stream = delay(3, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { delay } from '@most/core'

drain\<A>(stream: Stream\<A>): Promise\<void>

Activates a stream using a default scheduler instance from @most/scheduler, returning a promise of completion.

drain(stream) .then(() => console.log('complete'))

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Activates a stream by handing it, together with this module's `scheduler`,
 * to `runEffects`.
 *
 * @param stream The stream to activate.
 * @returns The promise returned by `runEffects`.
 */
export const drain = <A>(stream: Stream<A>): Promise<void> => {
  return runEffects(stream, scheduler)
}

during\<A>(signal: Stream\<Stream\<any>>, stream: Stream\<A>): Stream\<A>

Keep events that occur during a time window defined by a higher-order stream.

const source = // -1-2-3-4-5-6-7-8-> const signal = // ------s----------> const s = // --------x--> // -------4-5-6-7| const stream = during(signal, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { during } from '@most/core'

empty\<A>(): Stream\<A>

Create a stream containing no events, which ends immediately.

const stream = empty()

drain(stream) .then(() => console.log('complete'))

</details>

<details>
  <summary>See the code</summary>

```typescript

export { empty } from '@most/core'

filter\<A>(predicate: (value: A) => boolean, stream: Stream\<A>): Stream\<A>

Retain only events for which a predicate is truthy.

const source = // ---true---false---true---|

// resulting stream only contains truthy values const stream = filter(Boolean, source)

observe(console.log, stream) // true // true

</details>

<details>
  <summary>See the code</summary>

```typescript

export { filter } from '@most/core'

fromPromise\<A>(promise: Promise\<A>): Stream\<A>

Create a stream containing a promise's value.

const a = fromPromise(Promise.resolve(1)) const b = fromPromise(Promise.reject(new Error('failure')))

observe(console.log, a) .then(() => console.log('done')) .catch(err => console.error(err.message)) // 1 // done

observe(console.log, b) .then(() => console.log('done')) .catch(err => console.error(err.message)) // 'failure'

</details>

<details>
  <summary>See the code</summary>

```typescript

export { fromPromise } from '@most/core'

hold\<A>(stream: Stream\<A>): Stream\<A>

Deliver the most recently seen event to each new observer the instant it begins observing. A held stream is always multicast.

Given an input stream:

stream:    -a---b---c---d-\>

observers which begin observing at different times will see:

observer1: -a---b---c---d-\>
observer2:    a-b---c---d-\>
observer3:           c--d-\>

const doc = createDocumentDomSource(now(document))

// start holding on first subscription const click$ = hold(map(e => ({ x: e.clientX, y: e.clientY }), events('click', doc)))

// hold the latest event even before the first subscription drain(click$)

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Wraps a stream in a `Hold`, which remembers the most recent event value and
 * hands it to sinks that are added later.
 *
 * @param stream The stream whose latest value should be held.
 * @returns A new `Hold` stream around `stream`.
 */
export function hold<A>(stream: Stream<A>): Stream<A> {
  const held = new Hold<A>(stream)

  return held
}

/**
 * A `MulticastSource` that caches the last event value it has seen.
 *
 * The constructor is inherited unchanged from `MulticastSource`.
 */
class Hold<A> extends MulticastSource<A> implements Stream<A> {
  // Becomes true once at least one event has passed through `event`.
  private has: boolean
  // The value of the most recent event; only meaningful when `has` is true.
  private value: A
  // The scheduler passed to the most recent `run` call.
  private scheduler: Scheduler

  /**
   * Records the scheduler, then delegates to `MulticastSource.run`.
   */
  public run(sink: Sink<A>, scheduler: Scheduler) {
    this.scheduler = scheduler

    const disposable = super.run(sink, scheduler)

    return disposable
  }

  /**
   * Replays the held value (if any) to the incoming sink, stamped with the
   * scheduler's current time, and then delegates to `MulticastSource.add`.
   *
   * NOTE(review): presumably `run` has always been called before `add`, so
   * that `this.scheduler` is set -- confirm against `MulticastSource`.
   */
  public add(sink: Sink<A>) {
    if (this.has) {
      const time = this.scheduler.currentTime()

      sink.event(time, this.value)
    }

    return super.add(sink)
  }

  /**
   * Stores the event value as the held value before passing the event on to
   * `MulticastSource.event`.
   */
  public event(time: Time, value: A) {
    this.value = value
    this.has = true

    return super.event(time, value)
  }
}

last\<A>(stream: Stream\<A>): Stream\<A>

Returns a stream that will only emit its last value right before ending. If the stream does not end, then no events will ever occur. If the stream ends before emitting a value, no value will emit.

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Creates a stream that emits only the final value of `stream`, at the moment
 * `stream` ends. Nothing is emitted if `stream` ends without an event.
 *
 * @param stream The source stream.
 * @returns A stream containing at most one event: the last value of `stream`.
 */
export function last<A>(stream: Stream<A>): Stream<A> {
  const lastOnly: Stream<A> = new Last(stream)

  return lastOnly
}

/**
 * Stream that runs its source through a `LastSink`.
 */
class Last<A> implements Stream<A> {
  private source: Stream<A>

  constructor(source: Stream<A>) {
    this.source = source
  }

  public run(sink: Sink<A>, scheduler: Scheduler): Disposable {
    const lastSink = new LastSink(sink)

    return this.source.run(lastSink, scheduler)
  }
}

/**
 * Sink that swallows events, remembering only the latest value, and forwards
 * that value downstream when the source ends.
 */
class LastSink<A> implements Sink<A> {
  // True while a value is buffered and has not yet been flushed or discarded.
  private has: boolean
  // The most recently received value; only meaningful when `has` is true.
  private value: A
  private sink: Sink<A>

  constructor(sink: Sink<A>) {
    this.sink = sink
    this.has = false
  }

  /** Buffers the value; the event time is ignored and nothing is forwarded. */
  public event(_: Time, value: A) {
    this.value = value
    this.has = true
  }

  /** Discards any buffered value and forwards the error. */
  public error(time: Time, error: Error) {
    this.has = false
    this.sink.error(time, error)
  }

  /** Emits the buffered value (if any) at the end time, then forwards the end. */
  public end(time: Time) {
    const { sink } = this

    if (this.has) {
      sink.event(time, this.value)
    }

    this.has = false

    sink.end(time)
  }
}

loop\<A, B, C>(f: (accumulator: B, value: A) => { seed: B, value: C }, initial: B, stream: Stream\<A>): Stream\<C>

Accumulate results using a feedback loop that emits one value and feeds back another to be used in the next iteration.

It allows you to maintain and update a "state" while emitting a different value. In contrast, scan feeds back and produces the same value.

function pairwiseInterval (acc: number): { seed: number, value: [number, number] } { const seed = acc + 1 const value = [acc, seed]

return { seed, value } }

const stream = loop(pairwiseInterval, 0, periodic(100))

observe(console.log, stream) // 0, 1 // 1, 2 // 2, 3 // ....

</details>

<details>
  <summary>See the code</summary>

```typescript

export { loop } from '@most/core'

map\<A, B>(f: (value: A) => B, stream: Stream\<A>): Stream\<B>

Apply a function to each event value of a stream, returning a new stream containing the returned values.

const stream = map(x => x + 1, now(100))

observe(console.log, stream) // 101

</details>

<details>
  <summary>See the code</summary>

```typescript

export { map } from '@most/core'

mapList\<A, B>(f: (value: A, index: number) => B, sinksList$: Stream\<ArrayLike\<A>>): Stream\<ReadonlyArray\<B>>

Applies a function to all Sinks in a list of Sinks.

function Component(sources) { const { listOfData$ } = sources

const sinksList$: Stream<ReadonlyArray> = mapList( data => ChildComponent({ ...sources, data$: now(data) }), listOfData$, )

const childViews$: Stream<ReadonlyArray<Stream>> = mapList(({ view$ }) => view$, sinksList$)

... }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Call signatures of `mapList`: either both arguments at once, or the mapping
 * function first and the stream of lists later.
 */
export type MapList = {
  <A, B>(f: (value: A, index: number) => B, list$: Stream<ArrayLike<A>>): Stream<ReadonlyArray<B>>
  <A, B>(f: (value: A, index: number) => B): (
    list$: Stream<ArrayLike<A>>
  ) => Stream<ReadonlyArray<B>>
}

/**
 * Maps a function over every list emitted by a stream of lists.
 * This is `__mapList` wrapped with `curry2`.
 */
export const mapList: MapList = curry2(__mapList)

/**
 * Uncurried implementation: builds a list mapper with `mapArray(f)` and
 * applies it to each list emitted by `list$`.
 *
 * NOTE(review): `f` is typed here without the `index` parameter that
 * `MapList` advertises; whether an index is actually supplied depends on
 * `mapArray` -- confirm.
 */
function __mapList<A, B>(
  f: (value: A) => B,
  list$: Stream<ArrayLike<A>>
): Stream<ReadonlyArray<B>> {
  const mapEachList = mapArray(f)

  return map<ArrayLike<A>, ReadonlyArray<B>>(mapEachList, list$)
}

merge\<A>(a: Stream\<A>, b: Stream\<A>): Stream\<A>

Creates a new Stream containing events from both streams.

const stream = merge(at(1000, 'World'), at(100, 'Hello'))

observe(console.log, stream) // Hello -- at time 100 // World -- at time 1000

</details>

<details>
  <summary>See the code</summary>

```typescript

export { merge } from '@most/core'

mergeArray\<A>(stream: Array\<Stream\<A>>): Stream\<A>

Creates a new stream containing all events of underlying streams.

const stream = mergeArray([ at(100, 'foo'), at(300, 'baz'), at(200, 'bar'), ])

observe(console.log, stream) // foo -- at time 100 // bar -- at time 200 // baz -- at time 300

</details>

<details>
  <summary>See the code</summary>

```typescript

export { mergeArray } from '@most/core'

multicast\<A>(stream: Stream\<A>): Stream\<A>

Returns a stream equivalent to the original, but which can be shared more efficiently among multiple consumers.

// --1--2--3--4--5--6--7--8--> const source = // ...

// --1--2--3--4--5--6--7--8--> observe(console.log, source)

setTimeout(() => { // --------------1--2--3--4--5--6--7--8--> observe(console.log, source) }, 5)

const stream = multicast(source)

// --1--2--3--4--5--6--7--8--> observe(console.log, stream)

setTimeout(() => { // --------------5--6--7--8--> observe(console.log, stream) }, 5)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { multicast } from '@most/core'

never\<A>(): Stream\<A>

Create a stream containing no events, which never ends.

const stream = never()

drain(stream) // Returns a promise that never fulfills.

</details>

<details>
  <summary>See the code</summary>

```typescript

export { never } from '@most/core'

now\<A>(value: A): Stream\<A>

Create a stream containing a single event at time 0

const stream = now(1)

observe(console.log, stream) // 1

</details>

<details>
  <summary>See the code</summary>

```typescript

export { now } from '@most/core'

observe\<A>(f: (value: A) => any, stream: Stream\<A>): Promise\<void>

Activates a stream, calling a function f with each event value, and returns a Promise of completion.

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Call signatures of `observe`: fully applied, or with the callback supplied
 * first and the stream later.
 */
export interface Observe {
  <A>(f: (value: A) => any, stream: Stream<A>): Promise<void>
  <A>(f: (value: A) => any): (stream: Stream<A>) => Promise<void>
}

/**
 * Activates a stream, calling `f` with each event value.
 *
 * Implemented as `drain(tap(f, stream))`, wrapped with `curry2`.
 *
 * @returns The promise returned by `drain`.
 */
export const observe: Observe = curry2(function observeStream<A>(
  f: (value: A) => any,
  stream: Stream<A>
): Promise<void> {
  const tapped = tap(f, stream)

  return drain(tapped)
})

periodic(ms: number): Stream\<void>

Creates a stream that emits at time 0 and every n milliseconds after.

// void----void----void----void----> const stream = periodic(5)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { periodic } from '@most/core'

recoverWith\<A>((err: Error) => Stream\<A>, stream: Stream\<A>): Stream\<A>

Recover from a stream failure by calling a function to create a new stream.

// -1-2-3X-------> const a = // ... // -4-5-6--------> const b = // ...

// -1-2-3-4-5-6--> const stream = recoverWith(() => b, a)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { recoverWith } from '@most/core'

runEffects\<A>(stream: Stream\<A>, scheduler: Scheduler): Promise\<void>

Activate an event stream, and consume all its events.

const logStream = tap(console.log, stream)

runEffects(logStream, newDefaultScheduler()) .then(() => console.log('complete')) .catch(err => console.error(err))

</details>

<details>
  <summary>See the code</summary>

```typescript

export { runEffects } from '@most/core'

sample\<A, B, C>(f: (a: A, b: B) => C, sampler: Stream\<A>, stream: Stream\<B>): Stream\<C>

For each event in a sampler stream, apply a function to combine it with the most recent event in another stream. The resulting stream will contain the same number of events as the sampler stream.

s1: -1-----2-----3-> sampler: -1--2--3--4--5-> sample(sum, sampler, s1): -2--3--5--6--8->

</details>

<details>
  <summary>See the code</summary>

```typescript

export { sample } from '@most/core'

sampleWith\<A>(sampler: Stream\<any>, stream: Stream\<A>): Stream\<A>

Given each event occurrence from a sampler stream takes the latest value from the given stream.

function submit(dom: DomSource): Stream { const button = query('button', dom) const input = query('input', dom)

const click$ = events('click', button) const value$ = map(ev => ev.target.value, events('input', input))

return sampleWith(click$, value$) }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Call signatures of `sampleWith`: fully applied, or with the sampler supplied
 * first and the sampled stream later.
 */
export interface SampleWith {
  <A>(sampler: Stream<any>, stream: Stream<A>): Stream<A>
  <A>(sampler: Stream<any>): (stream: Stream<A>) => Stream<A>
  (sampler: Stream<any>): <A>(stream: Stream<A>) => Stream<A>
}

/**
 * Combining function handed to `sample`: ignores its first argument and
 * returns the second one untouched.
 */
function takeRight<A>(_: any, value: A): A {
  const kept = value

  return kept
}

/**
 * `sample` partially applied with `takeRight`, so the sampler's own value is
 * discarded and only the value from the sampled stream is kept.
 */
export const sampleWith = sample(takeRight) as SampleWith

scan\<A, B>(f: (seed: B, value: A) => B, initial: B, stream: Stream\<A>): Stream\<B>

Incrementally accumulate results, starting with the provided initial value.

// creates a stream that increments by 1 every 1000ms const count$ = scan(x => x + 1, 0, periodic(1000))

observe(console.log, count$)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { scan } from '@most/core'

scheduler (Scheduler)

A shared instance of the default scheduler from @most/scheduler

const stream = now(1)

const sink = { event(time: number, value: number) { ... }, error(time: number, err: Error) { ... }, end(time: number) { ... } }

const disposable = stream.run(sink, scheduler)

// later disposable.dispose()

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Module-level scheduler instance, created once by calling
 * `newDefaultScheduler()` when this module is evaluated.
 */
export const scheduler: Scheduler = newDefaultScheduler()

since\<A>(startSignal: Stream\<any>, stream: Stream\<A>): Stream\<A>

Discard all events in one stream until the first event occurs in another.

const source = // -1-2-3-4-5-6-7-8-> const start = // --------x--------> // ---------5-6-7-8-> const stream = since(start, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { since } from '@most/core'

skip\<A>(quantity: number, stream: Stream\<A>): Stream\<A>

Skip the first n number of events.

const source = // -1-2-3-4-5-6-7-8-9-10-> // -----------6-7-8-9-10-> const stream = skip(5, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { skip } from '@most/core'

skipAfter\<A>(predicate: (value: A) => boolean, stream: Stream\<A>): Stream\<A>

Discard all events after the first event for which predicate returns true.

const source = // --1-2-3-4-5-6-7-8-> // --1-2| const stream = skipAfter(even, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { skipAfter } from '@most/core'

skipRepeats\<A>(stream: Stream\<A>): Stream\<A>

Remove adjacent events that are equal in terms of value equality.

const a = { a: 1 } const b = Object.assign({}, a) const c = { c: 2 }

const source = // --a--b--a--c--> // --a--------c--> const stream = skipRepeats(source)

observe(console.log, stream) // { a: 1 } // { c: 2 }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Removes adjacent repeated events from a stream.
 *
 * Built by partially applying `skipRepeatsWith` with `equals` as the
 * comparison function.
 * NOTE(review): presumably `equals` compares by value rather than by
 * reference -- confirm against its definition.
 */
export const skipRepeats: SkipRepeats = skipRepeatsWith(equals)

/** Signature of `skipRepeats`: takes a stream and returns a stream of the same type. */
export type SkipRepeats = <A>(stream: Stream<A>) => Stream<A>

skipRepeatsWith\<A>(predicate: (a: A, b: A) => boolean, stream: Stream\<A>): Stream\<A>

Remove adjacent repeated events, using the provided equality function to compare adjacent events.

const equalsIgnoreCase = (a: string, b: string) => a.toLowerCase() === b.toLowerCase()

const stream = skipRepeatsWith(equalsIgnoreCase, source)

observe(console.log, stream) // a // b // c // D // e

</details>

<details>
  <summary>See the code</summary>

```typescript

export { skipRepeatsWith } from '@most/core'

skipWhile\<A>(predicate: (value: A) => boolean, stream: Stream\<A>): Stream\<A>

Discard all events until predicate returns false, and keep the rest.

const source = // -2-4-5-6-8-> // ----5-6-8-> const stream = skipWhile(even, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { skipWhile } from '@most/core'

slice\<A>(skip: number, take: number, stream: Stream\<A>): Stream\<A>

Keep only events in a range, where start \<= index \< end, and index is the ordinal index of an event in stream.

const source = // --1--2--3--4--5--6--7--8--9--10--> // --------3--4--5| const stream = slice(2, 3, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { slice } from '@most/core'

startWith\<A>(initialValue: A, stream: Stream\<A>): Stream\<A>

Prepends an event to a stream at time 0.

const stream = startWith('Hello', at(1000, 'world'))

observe(console.log, stream) // At time 0 logs 'Hello' // At time 1000 logs 'world'

</details>

<details>
  <summary>See the code</summary>

```typescript

export { startWith } from '@most/core'

state\<A, B>(f: (acc: A, value: B) => A, seed$: Stream\<A>, values$: Stream\<B>): Stream\<A>

Especially useful when keeping local state that also needs to be updated from an outside source.

export function ReorderableList(sources) { const { list$, dom } = sources const li = query('li', dom) const dragOver$ = dragOverEvent(li) const dragStart$ = dragstartEvent(li) const drop$ = dropEvent(li) const reducer$: Stream<(list: Array) => Array> = sample((to, from) => move(from, to), map(getKey, drop$), map(getKey, dragStart$)) const reorderedList$ = state((x, f) => f(x), list$, reducer$) // create all of our tags const childViews$ = mapList(listItem, reorderedList$) // create our view containing our tags const view$ = map(view, childViews$)

return { view$, preventDefault$: dragOver$, } }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Call signatures of `state`: fully applied, or partially applied with one or
 * two leading arguments.
 */
export interface State {
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>, values$: Stream<B>): Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A, seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  <A, B>(f: (accumulator: A, value: B) => A): {
    (seed$: Stream<A>, values$: Stream<B>): Stream<A>
    (seed$: Stream<A>): (values$: Stream<B>) => Stream<A>
  }
}

/**
 * Accumulates `values$` with `f`, starting from each seed emitted by `seed$`.
 * This is `__state` wrapped with `curry3`.
 */
export const state: State = curry3(__state)

/**
 * Uncurried implementation: for every seed emitted by `seed$`, starts a
 * `scan` of `values$` from that seed, and lets `switchMap` pick which of
 * those scans feeds the output.
 *
 * @param f Reducer combining the accumulated value with each new value.
 * @param seed$ Stream of seeds.
 * @param values$ Stream of values fed to the reducer.
 */
function __state<A, B>(
  f: (accumulator: A, value: B) => A,
  seed$: Stream<A>,
  values$: Stream<B>
): Stream<A> {
  const accumulateFrom = (seed: A) => scan(f, seed, values$)

  return switchMap(accumulateFrom, seed$)
}

switchCombine\<A>(streamList$: Stream\<Array\<Stream\<A>>>): Stream\<ReadonlyArray\<A>>

Flattens an array of streams into an array of values. Particularly useful when dealing with a list of children components.

function Component(sources) { const { listOfData$ } = sources

const childSinks$ = map( listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) })), listOfData$ )

const childViews$: Stream<ReadonlyArray> = switchCombine(mapSinks(sinks => sinks.view$, childSinks$))

const view$ = map(view, childViews$)

return { view$ } }

function view(childViews: ReadonlyArray): VNode { // ... }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Flattens a stream of stream-lists into a stream of value-lists.
 *
 * Each emitted list of streams is turned into one inner stream: an empty list
 * becomes `now([])`; any other list is combined with `combineArray`, which
 * collects the values into an array. `switchLatest` then flattens those inner
 * streams.
 *
 * @param streamList$ Stream emitting arrays of streams.
 * @returns Stream of arrays of values.
 */
export function switchCombine<A>(streamList$: Stream<Array<Stream<A>>>): Stream<ReadonlyArray<A>> {
  const combined$ = map(streams => {
    // Empty lists are special-cased to an immediate empty array.
    if (streams.length === 0) return now([])

    return combineArray((...items) => items, streams)
  }, streamList$)

  return switchLatest(combined$)
}

switchLatest\<A>(stream: Stream\<Stream\<A>>): Stream\<A>

Given a higher-order stream, return a new stream that adopts the behavior of (ie emits the events of) the most recent inner stream.

const A = // -1--2--3-----> const B = // -4--5--6-----> const C = // -7--8--9----->

// --A-----B-----C--------> const source = // ...

// ---1--2--4--5--7--8--9-> const stream = switchLatest(source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { switchLatest } from '@most/core'

switchMap\<A, B = A>(f: (a: A) => Stream\<B>, s: Stream\<A>): Stream\<B>

Applies a function, which returns a higher-order stream, to each event value of a stream and returns a new stream that adopts the behavior of (i.e., emits the events of) the most recent inner stream.

const a$ = now(1) const b$ = now(2) const f = (a: number) => scan((x, y) => x + y, a, b$) const s = skip(1, switchMap(f, a$))

observe(console.log, s) // 3

</details>

<details>
  <summary>See the code</summary>

```typescript

/** `switchMap` after its mapping function has been supplied. */
export interface SwitchMapArity1<A, B = A> {
  (s: Stream<A>): Stream<B>
}

/** Call signatures of `switchMap`: fully applied, or with only `f` supplied. */
export interface SwitchMapArity2 {
  <A, B = A>(f: (a: A) => Stream<B>, s: Stream<A>): Stream<B>

  <A, B = A>(f: (a: A) => Stream<B>): SwitchMapArity1<A, B>
}

/**
 * Maps each event of `s` to a stream with `f`, then flattens the resulting
 * stream of streams with `switchLatest`. Wrapped with `curry2`.
 */
export const switchMap: SwitchMapArity2 = curry2(function switchMap<A, B = A>(
  f: (a: A) => Stream<B>,
  s: Stream<A>
): Stream<B> {
  const inner$ = map(f, s)

  return switchLatest(inner$)
})

switchMerge\<A>(streams$: Stream\<Array\<Stream\<A>>>): Stream\<A>

Merges a list of streams into a single stream containing events from all of the streams. Particularly useful when dealing with a list of child components.

function Component(sources) { const { listOfData$ } = sources

const childSinks$ = map( listOfData => listOfData.map(data => ChildComponent({ ...sources, data$: now(data) })), listOfData$ )

const foo$ = switchMerge(mapSinks(sinks => sinks.foo$, childSinks$))

return { foo$ } }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Flattens a stream of stream-lists into one stream: each emitted list is
 * merged with `mergeArray`, and `switchLatest` flattens the merged streams.
 *
 * @param streams$ Stream emitting arrays of streams.
 * @returns The flattened stream produced by `switchLatest`.
 */
export function switchMerge<A>(streams$: Stream<Array<Stream<A>>>): Stream<A> {
  const merged$: Stream<Stream<A>> = map(mergeArray, streams$)

  return switchLatest(merged$)
}

switchSinkOr\<Sinks, K extends keyof Sinks>(or$: Sinks[K], sinkName: K, sinks$: Stream\<Sinks>): Sinks[K]

Flattens a stream of sinks into a single sink.

const switchSinkOrNever = switchSinkOr(never())

function Component(sources) { const { listOfItems$ } = sources

const sinks$ = map(items => SubComponent({ ...sources, items$: now(items) }), listOfItems$)

const history$ = switchSinkOrNever('history$', sinks$)

return { history$ } }

</details>

<details>
  <summary>See the code</summary>

```typescript

/**
 * Call signatures of `switchSinkOr`: fully applied, or partially applied with
 * one or two leading arguments.
 */
export interface SwitchSinkOr {
  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K,
    sinks$: Stream<Sinks>
  ): Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K],
    sinkName: K
  ): (sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K, sinks$: Stream<Sinks>) => Sinks[K]

  <Sinks extends { readonly [key: string]: Stream<any> }, K extends keyof Sinks = keyof Sinks>(
    or$: Sinks[K]
  ): (sinkName: K) => (sinks$: Stream<Sinks>) => Sinks[K]
}

/**
 * Flattens a stream of sinks objects into the single sink named `sinkName`.
 *
 * For every sinks object emitted by `sinks$`, the stream stored under
 * `sinkName` is selected, falling back to `or$` when that property is falsy.
 * `switchLatest` then flattens the selected streams. Wrapped with `curry3`.
 */
export const switchSinkOr: SwitchSinkOr = curry3<any, any, any, any>(function switchSinkOr<
  Sinks extends { readonly [key: string]: Stream<any> },
  K extends keyof Sinks = keyof Sinks
>(or$: Sinks[K], sinkName: K, sinks$: Stream<Sinks>): Sinks[K] {
  // Picks the named sink, or the fallback when it is missing.
  const pickSink = (sinks: Sinks) => sinks[sinkName] || or$

  return switchLatest(map(pickSink, sinks$))
})

take\<A>(quantity: number, stream: Stream\<A>): Stream\<A>

Take at most the first n events of a stream.

const source = // -1-2-3-4-5-6-7-8-9-10-> // -1-2-3| const stream = take(3, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { take } from '@most/core'

takeWhile\<A>(predicate: (value: A) => boolean, stream: Stream\<A>): Stream\<A>

Keep all events until predicate returns false, and discard the rest.

const source = // -2-4-5-6-8-> // -2-4-| const stream = takeWhile(even, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { takeWhile } from '@most/core'

tap\<A>(f: (value: A) => any, stream: Stream\<A>): Stream\<A>

Creates a new stream that upon each event performs a side-effect.

const logStream = tap(console.log, stream)

drain(logStream)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { tap } from '@most/core'

throttle\<A>(ms: number, stream: Stream\<A>): Stream\<A>

Limit the rate of events to at most one per a number of milliseconds.

In contrast to debounce, throttle simply drops events that occur "too often", whereas debounce waits for a "quiet period".

const source = // -abcd---abcd---> // -a-c----a-c----> const stream = throttle(2, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { throttle } from '@most/core'

throwError(err: Error): Stream\<never>

Create a stream in the error state. This can be useful for functions that need to return a stream, but need to signal an error.

const f = (x: Maybe): Stream => isNothing(x) ? throwError(new Error('cannot be given Nothing')) : now(fromJust(x))

const stream = chain(f, maybe$)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { throwError } from '@most/core'

until\<A>(endSignal: Stream\<any>, stream: Stream\<A>): Stream\<A>

Keep all events in one stream until the first event occurs in another.

const source = // --1-2-3-4-5-6-7-8-> const endSignal = // ---------z--------> // --1-2-3-4| const stream = until(endSignal, source)

</details>

<details>
  <summary>See the code</summary>

```typescript

export { until } from '@most/core'

withArrayValues\<A>(array: Array\<A>, stream: Stream\<any>): Stream\<A>

Creates a new stream by associating event times with values from an array. The resulting stream will end when all array values have been used or when the underlying stream ends.

const stream = withArrayValues([1, 2, 3], periodic(100))

observe(console.log, stream) // 1 -- time 0 // 2 -- time 100 // 3 -- time 200

</details>

<details>
  <summary>See the code</summary>

```typescript

export { withArrayValues } from '@most/core'

zip\<A, B, C>(f: (a: A, b: B) => C, a$: Stream\<A>, b$: Stream\<B>): Stream\<C>

Applies a function to corresponding pairs of events from the input streams.

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----| const b$ = // --------2--3--------4------| // // --------3,2--5,3--6,4| const stream = zip(tuple, a$, b$)

observe(console.log, stream) // 3, 2 // 5, 3 // 6, 4

</details>

<details>
  <summary>See the code</summary>

```typescript

export { zip } from '@most/core'

zipArray\<A, B, C>(f: (a: A, b: B) => C, streams: [Stream\<A>, Stream\<B>]): Stream\<C>

Applies a function to corresponding pairs of events from the input streams.

const tuple = (x, y) => [x, y]

const a$ = // --1----3-------5------6----| const b$ = // --------2--3--------4------| // // --------3,2--5,3--6,4| const stream = zipArray(tuple, [a$, b$])

observe(console.log, stream) // 3, 2 // 5, 3 // 6, 4

</details>

<details>
  <summary>See the code</summary>

```typescript

export { zipArray } from '@most/core'

zipArrayValues\<A, B, C>(f: (arrayValue: A, streamValue: B) => C, array: Array\<A>, stream: Stream\<B>): Stream\<C>

Creates a new stream by applying a function with a value at increasing index of an array and the latest event value from a stream. The resulting stream will end when all array values have been used or as soon as the underlying stream ends.

const f = (x, y) => x + y

const array = [100, 200] const stream = concat(now(1), now(2))

observe(console.log, zipArrayValues(f, array, stream)) // 101 // 202

</details>

<details>
  <summary>See the code</summary>

```typescript

export { zipArrayValues } from '@most/core'