@akmjenkins/hgseukf v0.0.3
@zuze/pubsub
What is this?
PubSub is one of the best known decoupling software architecture patterns - it allows different components to communicate with each other without knowing about each other.
What's special about this one?
Well, we really like the RxJS inspired operators and the super small dependency free package size.
Getting Started
Install it as a dependency in your JavaScript/Typescript project
npm install @zuze/pubsub
# or
yarn add @zuze/pubsubOr just pull it in from the browser:
<script src="https://unpkg.com/@zuze/pubsub"></script>
<script>
const { publish, subscribe } = pubsub();
subscribe('someEvent', console.log);
publish('someEvent', 'it works!'); // logs it works! in the console
</script>API
pubsub<T extends Topics>(): EventBus<T>
Create an EventBus of type T.
import pubsub from '@zuze/pubsub';
type Topics = {
user_event: {
first_name: string;
last_name: string;
user_id: number;
};
post: {
post_id: number;
content: string;
};
comment: {
comment_id: number;
content: string;
post_id: number;
};
};
const { subscribe, publish } = pubsub<Topics>();publish: <R extends keyof T>(topic: R, data: T[R], sync?: boolean) => void
Broadcasts an entity to all subscribers on a given topic:
publish(
'comment',
{
comment_id: 10,
content: 'some comment',
post_id: 27
}
);Take note of the optional third sync parameter. By default @zuze/pubsub broadcasts to all subscribers for a given topic asynchronously so as to prevent a single subscriber from potentially blocking any other subscriber. If you pass the third parameter as true, events will be broadcast synchronously.
subscribe: <R extends keyof T>(topic: R, subscriber: Subscriber<T[R]>) => Unsubscribe
Subscribes to a topic and returns an unsubscribe function.
e.g.
const { subscribe, publish } = pubsub<Topics>();
const unsub = subscribe('comment', comment => console.log('got comment',comment));
publish(
'comment',
{
comment_id: 10,
content: 'some comment',
post_id: 27
}
);
/*
output:
got comment
{
comment_id: 10,
content: 'some comment',
post_id: 27
}
*/
unsub(); // no further events will be loggedSubscribing with pipe and Operators
You could stop here and have an great, incredibly small PubSub utility. But subscribing with pipe and operators takes your functional PubSub system to the next level.
import pubsub from '@zuze/pubsub';
import { pipe, filter, subscriber } from '@zuze/pubsub/operators';
const { subscribe, publish } = pubsub<Topics>();
subscribe(
'comment',
pipe(
filter(({ comment_id }) => comment_id === 10),
subscriber(({content}) => console.log(content))
)
);
publish('comment',{ comment_id: 9, content: 'some comment' }); // no log
publish('comment',{ comment_id: 10, content: 'bye' }); // logs 'bye'In the browser
<script src="https://unpkg.com/@zuze/pubsub"></script>
<script src="https://unpkg.com/@zuze/pubsub/operators"></script>
<script>
const { publish, subscribe } = pubsub();
const { map, subscriber, pipe } = pubsubOperators;
subscribe(
'someEvent',
pipe(
map(r => `I said, ${r}`),
subscriber(output => console.log(output))
)
);
publish('someEvent','it works!'); // logs 'I said, it works!' in the console
</script>pipe and operators are stolen from inspired by the RxJS operator API.
Using Operators
-pipe(...operators)
Creating Reusable Operators
Creating operators is acheived using createOperator function.
createOperator accepts a function that accepts any number of user-supplied arguments and returns a function that accepts a next callback, followed by a function that accepts the published event.
That sounds a bit complicated, but in reality, it's just a bunch of fat-arrows:
import { createOperator, log } from '@zuze/pubsub/operators';
// call next with the published event if it's greater than the user supplied number
const greaterThan = createOperator(num => next => event => event > num && next(event));
// use like
const { subscribe, publish } = pubsub<Topics>();
subscribe( 'nums', pipe( greaterThan(5), log() ));
publish('nums', 4); // no log
publish('nums', 8); // log's 8
const multiplyBy = createOperator(num => next => event => next(event*num));
// use like
subscribe( 'nums', pipe( multiplyBy(4), log() ));
publish('nums',4); // logs 16
// chain your custom operators!
subscribe( 'nums', pipe( multiplyBy(4), greaterThan(5), log() ));
publish('nums', 1); // no log
publish('nums', 2); // logs 8Async operators are trivial:
import pubsub from '@zuze/pubsub';
import { createOperator, pipe, log } from '@zuze/pubsub/operators';
const { publish, subscribe } = pubsub();
// call next with the published event IF it's greater than the user supplied number
const callApiOperator = createOperator(apiCall => next => async event => next(await apiCall(event)));
subscribe(
'user_ids',
pipe(
callApiOperator(
async id => (await fetch(`https://jsonplaceholder.typicode.com/users/${id}`)).json()
),
log()
)
);
publish('user_ids', 10);You can easily create operators that are simply a common reuse of other operators using pipe using createPipeOperator.
createPipeOperator accepts a callback that is called with the same user-supplied arguments (like createOperator) but it returns an array of operator functions. This is handy if you find yourself consistently reusing operators together and is used internally to create operators as well:
The below code is taken directly from bufferCount:
import { createPipeOperator, stack, filter, map } from '@zuze/pubsub/operators';
const bufferCount = createPipeOperator((size, every) => [
stack(),
filter(
e => (!size || e.length >= size) && (!every || e.length % every === 0),
),
map(e => e.slice(size * -1)),
]);Just for fun, creating buffer count using createOperator instead of createPipeOperator would look like this:
const bufferCount = createOperator((size,every) => next => pipe(
stack(),
filter(
e => (!size || e.length >= size) && (!every || e.length % every === 0),
),
map(e => e.slice(size * -1)),
() => next
))List of Operators
We tried to stay as close as possible to RxJS operating naming, considering we're not in an observable environment. With that in mind, here's the list:
map<T,R>(mapper: (e: T) => R): R
Transforms the emitted event into a new value using a function before passing it to the next function:
const fn = pipe( map(e => e*2), log() ) )
fn(3); // logs 6mapTo<R>(mapper: () => R): R
Transforms the emitted event into a new value before passing it to the next function
const fn = pipe( map('joe'), log() ) )
fn(3); // logs 'joe'take<T>(num: number): T
Stops calling the next function after num
const fn = pipe( take(2), log() ) )
fn(3); // logs 3
fn(3); // logs 3
fn(3); // no logsingle<T>():T
Alias of take(1)
skip<T>(num: number): T
Only starts calling the next function after num calls
const fn = pipe( skip(2), log() ) )
fn(3); // no log
fn(3); // no log
fn(3); // logs 3tap<T>(fn: (e: T) => void): T
Calls it's function argument with the emitted event and continues calling the next function in the pipe:
const fn = pipe( tap(e => console.log(e*2)), log() ) )
fn(3); // logs 6, then logs 3subscriber<T>(fn: (e: T) => void): T
Alias of tap
log<T>(): T
Alias of tap(console.log)
every<T>(num: number): T
Only calls the subsequent function in the pipe when a modulo of num has been admitted.
const fn = pipe( every(2), log() ); // only proceed after every second event
fn(3); // no log
fn(3); // logs 3
fn(3); // no log
fn(3); // logs 3count<T>(startAt?: number): [number,T]
Adds in the number of events that have been emitted when calling the next function in the chain:
const fn = pipe( count(), log() );
fn(3); // logs 0,3
fn(3); // logs 1,3
fn(3); // logs 2,3
fn(3); // logs 3,3stack<T>(minSize?: number, maxSize?: number): T[]
Calls the next operator with an array of all previously emitted events. Will not call the next operator until the stack reaches minSize (defaults to 1). Will call the next operator with a maximum array length of maxSize (defaults to all).
const fn = pipe( stack(), log() );
fn(1); // logs [1]
fn(2); // logs [1,2]
fn(3); // logs [1,2,3]
fn(4); // logs [1,2,3,4]
const fn = pipe( stack(3), log() );
fn(1); // no log
fn(2); // no log
fn(3); // logs [1,2,3]
fn(4); // logs [1,2,3,4]
const fn = pipe( stack(3,2), log() );
fn(1); // no log
fn(2); // no log
fn(3); // logs [2,3]
fn(4); // logs [3,4]
fn(5); // logs [4,5]bufferCount<T>(size?: number, every?: number): T[]
Calls the next operator when at least size events have been emitted with an array of size. If every is provided, will only call the next operator when the current call number%every is 0.
const fn = pipe( bufferCount(2,3), log() );
fn(1); // no log
fn(2); // no log
fn(3); // logs [2,3]
fn(4); // no log
fn(5); // no log
fn(6); // logs [5,6]
fn(7); // no logpairwise<T>(): T[]
Calls the next operator with an array containing the most recent emitted event (last element) and the event emitted previous to this.
Note: This operator will NOT call the subsequent operator until at least 2 events have been emitted.
filter<T>(filterFn: (e: T) => boolean): T
The provided filter function controls whether to calling the next operator in the pipe depending on if it returns true or false
const fn = pipe( filter(e => e > 2), log() )
fn(1); // no log
fn(3); // logs 3pluck<T,R>(...keys: string[]): R
Plucks value from the object the function was called with based on the given keys:
const fn = pipe ( pluck( 'comment', 'created_on' ), log() );
fn({
comment: {
created_on: 1606101991,
content: 'hi'
}
}) // logs 1606101991distinct<T>(comparator?: (a: T, b: T) => boolean): R
Only calls subsequent operators if the value hasn't been previously emitted ever.
distinctKey<T>(key: string, comparator?: (a: T, b: T) => boolean): R
Only calls subsequent operators if the value at the given key hasn't been previously emitted ever.
distinctUntilChanged<T>(comparator?: (a: T, b: T) => boolean): R
Only calls subsequent operators if the value is different than the previous value.
distinctUntilKeyChanged<T>(key: string, comparator?: (a: T, b: T) => boolean): R
Only calls subsequent operators if the value at the given key is different than the previous value.
delay(by?: number)
Delays calling the next function in the pipe using by (milliseconds):
const fn = pipe( delay(100), log() );
fn(3); // asynchronous logs 3, 100 ms after emitteddebounce(by?: number)
Debounces calls to the subsequent operator given using by (milliseconds).
throttle(by?: number, leading?: boolean)
Throttles calls to the subsequent operator using by (milliseconds)
const fn = pipe( delay(100), log() );
fn(3); // asynchronous logs 3, 100 ms after emittedContributing
PRs are welcome! Ideas include:
- TYPES FOR OPERATORS!
- Make the operators fully tree-shakeable. Right now they aren't.
- Include any RxJS operators that make sense in a non-observable environment
License
MIT © akmjenkins