1.2.0 • Published 4 years ago

plumbing-toolkit v1.2.0

Weekly downloads
3
License
MIT
Repository
github
Last release
4 years ago

Plumbing Toolkit

Build Status npm version

Pluggable and pluckable typesafe asynchronous streams.

A modular observable approach using plumber terminology to better conceptualize the data flow.

Examples

let c = 0;
pipe(fibonacci(20))
    .pipe( context(0) )
    .pipe( forEach(_ => _.context++) )
    .pipe( filter(_ => _.value % 2 == 0) )
    .pipe( map(_ => `${p.context}: ${p.value}`) )
    .pipe( consume(console.log) );

Prints out

3: 2
6: 8
9: 34
12: 144
15: 610
18: 2584

There is also a chaining method, so this example could also be written as

pipe(fibonacci(20)).chain(
    context(0),
    forEach(p => p.context++),
    filter(p => p.value % 2 == 0),
    map(p => `${p.context}: ${p.value}`),
    consume(console.log)
)

The chain method is also typed for up to 42 arguments, if that's not enough then it's also possible to chain the chains.

Installation

Simply via npm

$ npm i plumbing-toolkit

Concepts

Pipe

A pipe is the central object in plumbing-toolkit, it is created using a spring and can be piped into an operator or flushed into an outlet.

declare class Pipe<T> implements IPipe<T> {
    constructor(spring: Spring<T>);
    pipe<K, R = Pipe<K>>(operator: Operator<T, K, R>): ReturnType<Operator<T, K, R>>;
    flush(outlet: Outlet<T>): () => void;
    chain(...operators: Operator<any>[]): any; // The arguments are typed in IPipe<T>
}

A pipe can be created using the pipe() function

import { pipe, range } from "plumbing-toolkit";

const myPipe = pipe(range(0, 10)); // Pipe<number>

Spring

A spring is a function that takes an outlet and returns a pluck. When called, the spring emits values into the outlet until it is plucked using the pluck.

They typically have the format

export type Spring<T> = (outlet: Outlet<T>) => Pluck;

And usually come as closures like so

export function interval(ms: number): Spring<number> {
    return (outlet: Outlet<number>) => {
        let i = 0;
        const handle = setInterval(() => outlet.next(i++), ms);
        return () => clearInterval(handle); // return the pluck
    }
}

Operator

While springs provide data and pipes transport data, operators are the ones, that actually work (or operate) on the data, for example by filtering, mapping or transforming the data. An operator is usually a function that takes a pipe and returns a pipe.

They typically have the format

export type Operator<In, Out = In, Returns = IPipe<Out>> = (input: IPipe<In>) => Returns;

The most basic operator is an operator that returns a pipe of the same type, like the filter() operator.

However in plumbing-toolkit an operator is not limited to returning another pipe, even though that is the common usecase.

They are also used to provide endpoints for the pipes. For example the first() operator flushes the pipe it is passed into an outlet and then it returns the first value it receives and plucks the pipe.

const firstValue = await pipe(from(["foo", "bar", "baz"])).pipe(first())
console.log(firstValue); // foo

Outlet

An outlet is the receiving end of pipes or springs and usually provides three methods: next(), complete() and error(). Upon flush, the outlet receives values via next(), until the spring is completed at which point complete() is called. If an error occured anywhere in the chain, it will be passed to error().

When the spring or any other outlet above it in the chain is plucked, the outlet will also have the property "plucked" set to true.

export interface IOutlet<T> {
    next(value: T): MaybePromise<void>;
    error?(error: Error): MaybePromise<void>;
    complete?(): MaybePromise<void>;
    pluck?(): MaybePromise<void>;
    plucked?: boolean;
}

This is the outlet interface, it is implemented by the outlet class that provides guaranteed availability of each of those methods.

In day to day business you'll usually never have to manually instantiate an outlet as most usecases are already wrapped nicely in operators and outlet factory methods.

Pluck

A pluck is a simple function that plucks a spring, as in cancels it. It is returned by the spring and passed down the chain.

It can be accessed via flushing the pipe into an outlet or piping it into the consume operator

const myPipe = pipe(interval(1000)); 
const pluck = myPipe.flush(Outlet.to(console.log)) // or myPipe.pipe(consume(console.log))
setTimeout(pluck, 5000); // stop the interval after 5 seconds

In fact, the consume operator is implemented just like that

export function consume<T>(consumer: ConsumerFn<T>): Operator<T, T, Pluck>
{
    return input => input.flush( Outlet.to(consumer) );
}
1.2.2

4 years ago

1.2.1

4 years ago

1.2.0

5 years ago

1.1.0

5 years ago

1.0.5

5 years ago

1.0.4

5 years ago

1.0.3

5 years ago

1.0.2

5 years ago

0.7.1

5 years ago

0.7.0

5 years ago

0.6.5

6 years ago

0.6.4

6 years ago

0.6.1

6 years ago

0.6.0

6 years ago

0.5.0

6 years ago

0.4.0

6 years ago

0.3.0

6 years ago

0.2.0

6 years ago

0.1.5

6 years ago

0.1.4

6 years ago

0.1.3

6 years ago

0.1.2

6 years ago

0.1.1

6 years ago

0.1.0

6 years ago