3.1.0 • Published 3 years ago

@zuze/pubsub v3.1.0

Weekly downloads
2
License
MIT
Repository
github
Last release
3 years ago

@zuze/pubsub

npm version Coverage Status Build Status Bundle Phobia

What is this?

PubSub is one of the best known decoupling software architecture patterns - it allows different components to communicate with each other without knowing about each other.

What's special about this one?

Well, we really like the RxJS inspired operators and the super small dependency free package size.

Getting Started

Install it as a dependency in your JavaScript/Typescript project

npm install @zuze/pubsub
# or
yarn add @zuze/pubsub

Or just pull it in from the browser:

<script src="https://unpkg.com/@zuze/pubsub"></script>
<script>
    const { publish, subscribe } = pubsub();
    subscribe('someEvent', console.log);
    publish('someEvent', 'it works!'); // logs it works! in the console
</script>

API

  • pubsub<T extends Topics>(): EventBus<T>

Create an EventBus of type T.

import pubsub from '@zuze/pubsub';

type Topics = {
    user_event: {
        first_name: string;
        last_name: string;
        user_id: number;
    };
    post: {
        post_id: number;
        content: string;
    };
    comment: {
        comment_id: number;
        content: string;
        post_id: number;
    };
    nums: number;
};

const { subscribe, publish } = pubsub<Topics>();
  • publish: <R extends keyof T>(topic: R, data: T[R], sync?: boolean) => void

Broadcasts an entity to all subscribers on a given topic:

publish(
    'comment',
    {
        comment_id: 10,
        content: 'some comment',
        post_id: 27
    }
);

Take note of the optional third sync parameter. By default @zuze/pubsub broadcasts to all subscribers for a given topic asynchronously so as to prevent a single subscriber from potentially blocking any other subscriber. If you pass the third parameter as true, events will be broadcast synchronously.

  • subscribe: <R extends keyof T>(topic: R, subscriber: Subscriber<T[R]>) => Unsubscribe

Subscribes to a topic and returns an unsubscribe function.

e.g. 
const { subscribe, publish } = pubsub<Topics>();
const unsub = subscribe('comment', comment => console.log('got comment',comment));
publish(
    'comment',
    {
        comment_id: 10,
        content: 'some comment',
        post_id: 27
    }
);

/*
output:
got comment
    {
        comment_id: 10,
        content: 'some comment',
        post_id: 27
    }
*/

unsub(); // no further events will be logged

Subscribing with pipe and Operators

You could stop here and have an great, incredibly small PubSub utility. But subscribing with a pipe and operators takes your functional PubSub system to the next level.

import pubsub from '@zuze/pubsub';
import { createPipe, filter, subscriber } from '@zuze/pubsub/operators';

// creates a pipe that accepts operator functions that act on Topics['comment']
const pipe = createPipe<Topics['comment']>();
const { subscribe, publish } = pubsub<Topics>();

subscribe(
    'comment', 
    pipe(
        filter(({ comment_id }) => comment_id === 10),
        subscriber(({content}) => console.log(content))
    )
);

publish('comment',{ comment_id: 9, content: 'some comment' }); // no log
publish('comment',{ comment_id: 10, content: 'bye' }); // logs 'bye'

In the browser

<script src="https://unpkg.com/@zuze/pubsub"></script>
<script src="https://unpkg.com/@zuze/pubsub/pipe"></script>
<script>
    const { publish, subscribe } = pubsub();
    const { map, subscriber, pipe } = pubsubPipe;    
    subscribe(
        'someEvent', 
        pipe(
            map(r => `I said, ${r}`),
            subscriber(output => console.log(output))
        )
    );

    publish('someEvent','it works!'); // logs 'I said, it works!' in the console
</script>

Perhaps counterintuitively, due to the way a pipe is constructed from operator functions, nothing ever "comes out" of a pipe, then end result is always undefined. If you want to call some external function at any point in the pipe, use the subscriber operator (an alias of tap)

pipe and operators are stolen from inspired by the RxJS operator API, but without the power or complexity of schedulers or marble diagrams.

Using Operators

pipe<T>(...operators: OperatorFn<T,any>[])

When using typescript, you should create the pipe first using createPipe:

import { createPipe } from '@zuze/pubsub/operators';

// creates a pipe whose operator functions act on SomeType
const pipe = createPipe<SomeType>();
const myPipe = pipe(...operatorFunctions);

When using plain JavaScript, feel free to use pipe directly:

import { pipe } from '@zuze/pubsub/operators';
const myPipe = pipe(...operatorFunctions);

Creating Reusable Operators

An operator function accepts a single parameter - a unary function typically named next and returns a function that, after performing some operation, may or may not call next with a single argument.

So an operator creator is any function that returns an operator function.

While this might sound complicated, in reality it's just a bunch of fat arrows. Let's create one:

import { log } from '@zuze/pubsub/operators';

// greaterThan is an operator creator - a function that returns an operator.

// call next with the published event if it's greater than the user supplied number
const greaterThan = num => next => event => event > num && next(event);

// use like
const { subscribe, publish } = pubsub<Topics>();
const pipe = createPipe<Topics['number']>();

subscribe( 'nums', pipe( greaterThan(5), log() ));

publish('nums', 4); // no log
publish('nums', 8); // log's 8

const multiplyBy = num => next => event => next(event*num);

// use like
subscribe( 'nums', pipe( multiplyBy(4), log() ));

publish('nums',4); // logs 16

// chain your custom operators!
subscribe( 'nums', pipe( multiplyBy(4), greaterThan(5), log() ));

publish('nums', 1); // no log
publish('nums', 2); // logs 8

Async operators are trivial:

import pubsub from '@zuze/pubsub';
import { pipe, log } from '@zuze/pubsub/operators';

const { publish, subscribe } = pubsub();

// call next with the published event IF it's greater than the user supplied number
const callApiOperator = apiCall => next => async event => next(await apiCall(event));

subscribe(
    'user_ids',
    pipe(
        callApiOperator(
            async id => (await fetch(`https://jsonplaceholder.typicode.com/users/${id}`)).json()
        ),
        log()
    )
);

publish('user_ids', 10);

You can easily create operators that are simply a common reuse of other operators using pipe using pipeable. This is handy if you find yourself consistently reusing operators together and is used internally to create operators as well:

The below code is taken directly from bufferCount:

import { pipeable, stack, filter, map } from '@zuze/pubsub/operators';

const bufferCount = (size, every) => pipeable(
  stack(),
  filter(
    e => (!size || e.length >= size) && (!every || e.length % every === 0),
  ),
  map(e => e.slice(size * -1)),
);

Just for fun, creating bufferCount without using pipeable would look like this:

const bufferCount = (size,every) => next => pipe(
  stack(),
  filter(
    e => (!size || e.length >= size) && (!every || e.length % every === 0),
  ),
  map(e => e.slice(size * -1)),
  () => next    
);

List of Operators

We tried to stay as close as possible to RxJS operating naming, considering we're not in an observable environment. With that in mind, here's the list:

map<T,R>(mapper: (e: T) => R): R

Transforms the emitted event into a new value using a function before passing it to the next function:

const fn = pipe( map(e => e*2), log() ) )
fn(3); // logs 6

mapTo<R>(mapper: () => R): R

Transforms the emitted event into a new value before passing it to the next function

const fn = pipe( map('joe'), log() ) )
fn(3); // logs 'joe'

take<T>(num: number): T

Stops calling the next function after num

const fn = pipe( take(2), log() ) )
fn(3); // logs 3
fn(3); // logs 3
fn(3); // no log

takeUntil<T>(promise: Promise<void>): T

Calls the next function until the provided promise resolved

const when = new Promise(res => setTimeout(res,2000));
const fn = pipe( takeUntil(when), log() ) )
fn(3); // logs 3
fn(3); // logs 3
// 2 seconds pass
fn(3); // no log

startWith<T>(T | Promise<T> | () => (T | Promise<T>)): T

Immediately calls the next operator in the chain with the value:

const fn = pipe( filter(i => i > 2), startWith(7), log() ); // 7 is logged immediately
fn(1); // no log
fn(3); // logs 3


const fn = pipe ( filter(i => i > 2), startWith(() => new Promise(res => setTimeout(() => res(7))), log() );

single<T>():T

Alias of take(1)

skip<T>(num: number): T

Only starts calling the next function after num calls

const fn = pipe( skip(2), log() ) )
fn(3); // no log
fn(3); // no log
fn(3); // logs 3

skipUntil<T>(promise: Promise<void>): T

Prevents calling the next function until the provided promise resolved

const when = new Promise(res => setTimeout(res,2000));
const fn = pipe( takeUntil(when), log() ) )
fn(3); // no log
fn(3); // no log
// 2 seconds pass
fn(3); // logs 3

tap<T>(fn: (e: T) => void): T

Calls it's function argument with the emitted event and continues calling the next function in the pipe:

const fn = pipe( tap(e => console.log(e*2)), log() ) )
fn(3); // logs 6, then logs 3

subscriber<T>(fn: (e: T) => void): T

Alias of tap

log<T>(): T

Alias of tap(console.log)

every<T>(num: number): T

Only calls the subsequent function in the pipe when a modulo of num has been admitted.

const fn = pipe( every(2), log() ); // only proceed after every second event
fn(3); // no log
fn(3); // logs 3
fn(3); // no log
fn(3); // logs 3

count<T>(startAt?: number): [number,T]

Adds in the number of events that have been emitted when calling the next function in the chain:

const fn = pipe( count(), log() ); 
fn(3); // logs [0,3]
fn(3); // logs [1,3]
fn(3); // logs [2,3]
fn(3); // logs [3,3]

stack<T>(minSize?: number, maxSize?: number): T[]

Calls the next operator with an array of all previously emitted events. Will not call the next operator until the stack reaches minSize (defaults to 1). Will call the next operator with a maximum array length of maxSize (defaults to all).

const fn = pipe( stack(), log() ); 

fn(1); // logs [1]
fn(2); // logs [1,2]
fn(3); // logs [1,2,3]
fn(4); // logs [1,2,3,4]


const fn = pipe( stack(3), log() ); 

fn(1); // no log
fn(2); // no log
fn(3); // logs [1,2,3]
fn(4); // logs [1,2,3,4]

const fn = pipe( stack(3,2), log() ); 

fn(1); // no log
fn(2); // no log
fn(3); // logs [2,3]
fn(4); // logs [3,4]
fn(5); // logs [4,5]

bufferCount<T>(size?: number, every?: number): T[]

Calls the next operator when at least size events have been emitted with an array of size. If every is provided, will only call the next operator when the current call number%every is 0.

const fn = pipe( bufferCount(2,3), log() ); 

fn(1); // no log
fn(2); // no log
fn(3); // logs [2,3]
fn(4); // no log
fn(5); // no log
fn(6); // logs [5,6]
fn(7); // no log

pairwise<T>(): T[]

Calls the next operator with an array containing the most recent emitted event (last element) and the event emitted previous to this.

Note: This operator will NOT call the subsequent operator until at least 2 events have been emitted.

filter<T>(filterFn: (e: T) => boolean): T

The provided filter function controls whether to calling the next operator in the pipe depending on if it returns true or false

const fn = pipe( filter(e => e > 2), log() )
fn(1); // no log
fn(3); // logs 3

pluck<T,R>(...keys: string[]): R

Plucks value from the object the function was called with based on the given keys:

const fn = pipe ( pluck( 'comment', 'created_on' ), log() );
fn({
    comment: {
        created_on: 1606101991,
        content: 'hi'
    }
}) // logs 1606101991

distinct<T>(comparator?: (a: T, b: T) => boolean): R

Only calls subsequent operators if the value hasn't been previously emitted ever.

distinctKey<T>(key: string, comparator?: (a: T, b: T) => boolean): R

Only calls subsequent operators if the value at the given key hasn't been previously emitted ever.

distinctUntilChanged<T>(comparator?: (a: T, b: T) => boolean): R

Only calls subsequent operators if the value is different than the previous value.

distinctUntilKeyChanged<T>(key: string, comparator?: (a: T, b: T) => boolean): R

Only calls subsequent operators if the value at the given key is different than the previous value.

delay(by?: number)

Delays calling the next function in the pipe using by (milliseconds):

const fn = pipe( delay(100), log() ); 
fn(3); // asynchronous logs 3, 100 ms after emitted

debounce(by?: number, leading?: boolean = false)

Debounces calls to the subsequent operator given using by (milliseconds). If leading is true then the function will be called immediately on first invocation and subsequent calls within by will be ignored.

throttle(by?: number, leading?: boolean)

Throttles calls to the subsequent operator using by (milliseconds)

const fn = pipe( delay(100), log() ); 
fn(3); // asynchronous logs 3, 100 ms after emitted

Contributing

PRs are welcome!

License

MIT © akmjenkins

3.1.0

3 years ago

2.2.3

3 years ago

3.0.5

3 years ago

3.0.4

3 years ago

3.0.3

3 years ago

3.0.2

3 years ago

3.0.1

3 years ago

2.2.2

3 years ago

3.0.0

3 years ago

2.2.1

3 years ago

2.2.0

3 years ago

2.1.8

3 years ago

2.1.6

3 years ago

2.1.7

3 years ago

2.1.5

3 years ago

2.1.4

3 years ago

2.1.3

3 years ago

2.1.2

3 years ago

2.1.1

3 years ago

2.1.0

3 years ago

2.0.3

3 years ago

2.0.2

3 years ago

2.0.1

3 years ago

2.0.0

3 years ago

1.0.0

3 years ago