rxjs-changefeeds v0.0.4
rxjs-changefeed
RxJS helpers to build things on top of changefeeds.
API docs: below. Example: demo, code
What are changefeeds?
A changefeed is a log of set <key> <value> and del <key> operations. This makes it possible to represent a collection as an observable in an efficient way.
Many modern databases come with built-in support for changefeeds: CouchDB, RethinkDB, MongoDB.
Change feed data model
type ChangeFeed<Key, Value> =
| ["initializing"]
| ["ready"]
| ["set", Key, Value]
| ["del", Key];
Example:
[
["initializing"], // initial data is loading (optional)
["set", "1", { id: "1", name: "Tom" }],
["set", "2", { id: "2", name: "Peter" }],
["ready"], // all initial data has loaded (optional)
["set", "1", { id: "1", name: "John" }], // Tom has changed name to John
["set", "3", { id: "3", name: "Merry" }], // new user Merry was created
["del", "2"] // Peter was deleted
];
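To make the data model concrete, here is a minimal sketch that replays the example log into a plain Map. The helper name applyChangeFeed is hypothetical and is not part of rxjs-changefeeds; it only illustrates how set and del records describe the state of a collection.

```typescript
// The ChangeFeed type mirrors the data model described above.
type ChangeFeed<Key, Value> =
  | ["initializing"]
  | ["ready"]
  | ["set", Key, Value]
  | ["del", Key];

// Hypothetical helper (not part of rxjs-changefeeds): replays a log of
// records into a Map holding the current state of the collection.
function applyChangeFeed<Key, Value>(
  records: ChangeFeed<Key, Value>[]
): Map<Key, Value> {
  const state = new Map<Key, Value>();
  for (const record of records) {
    if (record[0] === "set") {
      state.set(record[1], record[2]); // add or update
    } else if (record[0] === "del") {
      state.delete(record[1]); // remove
    }
    // "initializing" and "ready" carry no data and are skipped here
  }
  return state;
}

type User = { id: string; name: string };

const log: ChangeFeed<string, User>[] = [
  ["initializing"],
  ["set", "1", { id: "1", name: "Tom" }],
  ["set", "2", { id: "2", name: "Peter" }],
  ["ready"],
  ["set", "1", { id: "1", name: "John" }],
  ["set", "3", { id: "3", name: "Merry" }],
  ["del", "2"]
];

const users = applyChangeFeed(log);
users.forEach((user, key) => console.log(key, user.name));
// prints:
// 1 John
// 3 Merry
```

The operators documented below work on the stream of records directly, so a full copy of the collection like this is only needed where a snapshot is wanted.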
Value operations:
- set <key> <value>: value is added to the collection or updated
- del <key>: value is removed from the collection
Optional state operations:
initializing and ready do not affect behavior. They are useful when a loading indicator is wanted. This library does little with them other than passing them through.
API
RxJS Operators
The methods below return OperatorFunction<Input, Output>
To learn more, check the RxJS operators guide.
feedFilter( FilterFunction )
Filters changefeed values using FilterFunction.
- FilterFunction: (value: Value) => boolean or Observable<FilterFunction>
- Input: ChangeFeed<Key, Value>
- Output: ChangeFeed<Key, Value>
Transforms set events into del events when the old value passes the filter but the new value does not.
Can take an observable filter function. When the filter is an observable and it emits a new filter function, the new filter is applied to all values again and the resulting set/del events are re-emitted.
Example, using static filter function:
import { Subject } from "rxjs";
import { feedFilter } from "rxjs-changefeeds";
const filter = value => value < 2;
const input = new Subject();
const output = input.pipe(feedFilter(filter)).subscribe(console.log);
input.next(["set", "x", 1]);
// output: ["set","x",1]
input.next(["set", "y", 2]);
// no output
input.next(["set", "x", 2]);
// output: ["del","x"]
Example with an observable filter function. Notice how only the necessary updates are broadcast after the filter changes.
import { BehaviorSubject, Subject } from "rxjs";
import { feedFilter } from "rxjs-changefeeds";
const filter = new BehaviorSubject(value => value < 3);
const input = new Subject();
const result = input.pipe(feedFilter(filter));
result.subscribe(console.log);
input.next(["set", "one", 1]);
input.next(["set", "two", 2]);
input.next(["set", "three", 3]);
// output:
// ["set","one",1]
// ["set","two",2]
filter.next(value => value > 1);
// output:
// ["del","one"]
// ["set","three",3]
Note that when input observable completes, result also completes and filter is unsubscribed.
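The conversion of set into del described for feedFilter can be sketched without RxJS. This is not the library's implementation: filterRecords is a hypothetical helper that works on a plain array and a static predicate. It assumes that a del for a key which never passed the filter is dropped, which the text above does not state.

```typescript
type ChangeFeed<Key, Value> =
  | ["initializing"]
  | ["ready"]
  | ["set", Key, Value]
  | ["del", Key];

// Hypothetical helper (not the library's implementation): filters an
// array of records the way feedFilter is described to filter a stream.
function filterRecords<Key, Value>(
  records: ChangeFeed<Key, Value>[],
  predicate: (value: Value) => boolean
): ChangeFeed<Key, Value>[] {
  const passing = new Set<Key>(); // keys whose latest value passed the filter
  const output: ChangeFeed<Key, Value>[] = [];
  for (const record of records) {
    if (record[0] === "set") {
      const key = record[1];
      const value = record[2];
      if (predicate(value)) {
        passing.add(key);
        output.push(["set", key, value]);
      } else if (passing.delete(key)) {
        // old value passed, new value does not: set becomes del
        output.push(["del", key]);
      }
    } else if (record[0] === "del") {
      // assumption: del is only forwarded for keys that were visible
      if (passing.delete(record[1])) output.push(["del", record[1]]);
    } else {
      output.push(record); // state records pass through unchanged
    }
  }
  return output;
}

const filtered = filterRecords<string, number>(
  [
    ["set", "x", 1],
    ["set", "y", 2],
    ["set", "x", 2]
  ],
  value => value < 2
);
console.log(JSON.stringify(filtered));
// prints: [["set","x",1],["del","x"]]
```

The result matches the static filter example above: the second set for "x" no longer passes, so downstream consumers see a del instead.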
feedGroupBy( KeyFunction )
Groups a changefeed into separate change feeds using KeyFunction.
- KeyFunction: (Value, Key) => GroupKey
- Input: ChangeFeed<Key, Value>
- Output: ChangeFeed<GroupKey, Observable<Input>>
The output is a change feed where each key is a result of KeyFunction and each value is the initial changefeed filtered by that key.
Notes:
When a set event on an existing item in Input results in a different GroupKey than before, it triggers two events:
- a del event in the old group
- a set event in the new group
When there are no more items left in a group, the group's observable completes.
If initializing and ready are used, they are passed through to all groups. If ready was already received by the input when a new group is created, a ready event is emitted to the newly created group.
Example:
import { Subject } from "rxjs";
import { mergeMap, map, tap } from "rxjs/operators";
import { feedGroupBy } from "rxjs-changefeeds";
const input = new Subject();
const getKey = num => (num % 2 ? "odd" : "even");
const output = input.pipe(feedGroupBy(getKey));
output.subscribe({
next(record) {
const [op, groupKey, groupChanges$] = record;
console.log("outer:", op, groupKey);
if (op === "set") {
groupChanges$.subscribe({
next(record) {
console.log(`inner[${groupKey}]:`, record);
},
complete() {
console.log(`inner[${groupKey}]: complete.`);
}
});
}
},
complete() {
console.log("outer: complete.");
}
});
input.next(["set", "x", 1]);
// output:
// outer: set odd
// inner[odd]: ["set","x",1]
input.next(["set", "y", 2]);
// output:
// outer: set even
// inner[even]: ["set","y",2]
input.next(["set", "x", 2]);
// output:
// inner[odd]: ["del","x"]
// inner[odd]: complete.
// outer: del odd
// inner[even]: ["set","x",2]
input.complete();
// output:
// outer: complete.
// inner[even]: complete.
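The regrouping rule from the notes (a set that changes an item's GroupKey yields a del in the old group and a set in the new one) can be sketched as a plain function. routeToGroups is a hypothetical helper, not the library's implementation; it ignores state records and group completion and only shows which group receives which record.

```typescript
type ValueRecord<Key, Value> = ["set", Key, Value] | ["del", Key];
type Routed<Key, Value, GroupKey> = {
  group: GroupKey;
  record: ValueRecord<Key, Value>;
};

// Hypothetical helper (not part of rxjs-changefeeds): decides which
// group each record belongs to, following the notes above.
function routeToGroups<Key, Value, GroupKey>(
  records: ValueRecord<Key, Value>[],
  keyFunction: (value: Value, key: Key) => GroupKey
): Routed<Key, Value, GroupKey>[] {
  const groupOf = new Map<Key, GroupKey>(); // current group of every item
  const routed: Routed<Key, Value, GroupKey>[] = [];
  for (const record of records) {
    const key = record[1];
    const hadGroup = groupOf.has(key);
    const oldGroup = groupOf.get(key) as GroupKey;
    if (record[0] === "set") {
      const newGroup = keyFunction(record[2], key);
      if (hadGroup && oldGroup !== newGroup) {
        // the item moved: remove it from its old group first
        routed.push({ group: oldGroup, record: ["del", key] });
      }
      groupOf.set(key, newGroup);
      routed.push({ group: newGroup, record });
    } else if (hadGroup) {
      groupOf.delete(key);
      routed.push({ group: oldGroup, record });
    }
  }
  return routed;
}

const getGroupKey = (num: number) => (num % 2 ? "odd" : "even");
const routed = routeToGroups<string, number, string>(
  [
    ["set", "x", 1],
    ["set", "y", 2],
    ["set", "x", 2]
  ],
  getGroupKey
);
routed.forEach(r => console.log(`${r.group}:`, JSON.stringify(r.record)));
// prints:
// odd: ["set","x",1]
// even: ["set","y",2]
// odd: ["del","x"]
// even: ["set","x",2]
```

The inner records line up with the inner[odd] and inner[even] logs in the example above.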
feedSortedList( Comparator, Options? )
Transforms a changefeed into a sorted array where each item in the array is an Observable of the Value (with an additional key property).
- Comparator: used for sorting the array. (a: Value, b: Value) => number or Observable<Comparator>
- Input: ChangeFeed<Key, Value>
- Output: Array<Observable<Value> & {key: Key}>
- Options: optional object of parameters:
  - throttleTime: default 100. Throttles Output and re-sorting. Use null to disable throttling and trigger updates synchronously.
  - scheduler: default asyncScheduler, not used when throttleTime: null
Notes:
Output only fires events when the sort order is affected: when a new value arrives in the input, it is first compared against its siblings; if left <= value <= right still holds, re-sorting does not happen.
Example:
import { Subject, of } from "rxjs";
import { mergeMap, distinct } from "rxjs/operators";
import { feedSortedList } from "rxjs-changefeeds";
const input = new Subject();
const list$ = input.pipe(
feedSortedList((a, b) => a - b, {
// for demo purposes we disable throttling so that updates are synchronous
throttleTime: null
})
);
// Log sorted list:
list$.subscribe({
next(list) {
console.log("list:", list.map(value$ => value$.key));
},
complete() {
console.log("list: complete");
}
});
// Log inner values:
const flatValues$ = list$.pipe(
mergeMap(list => of(...list)),
distinct()
);
flatValues$.subscribe({
next(value$) {
value$.subscribe({
next(value) {
console.log(`inner[${value$.key}]:`, value);
},
complete() {
console.log(`inner[${value$.key}]: complete.`);
}
});
}
});
input.next(["set", "x", 1]);
// output:
// list: ["x"]
// inner[x]: 1
input.next(["set", "y", 2]);
// output:
// list: ["x","y"]
// inner[y]: 2
// If order does not change no list update is triggered:
input.next(["set", "y", 3]);
// output:
// inner[y]: 3
// deletion completes value observable and removes it from list:
input.next(["del", "x"]);
// output:
// list: ["y"]
// inner[x]: complete.
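The sibling check from the notes (left <= value <= right) can be sketched as a small function. staysInPlace is a hypothetical name and this is not the library's code; it only shows the comparison that decides whether an updated value can keep its position in the sorted array.

```typescript
// Hypothetical helper (not part of rxjs-changefeeds): returns true when
// replacing sorted[index] with `next` keeps the array sorted, i.e. when
// left <= next <= right holds for the neighbours that exist.
function staysInPlace<Value>(
  sorted: Value[],
  index: number,
  next: Value,
  compare: (a: Value, b: Value) => number
): boolean {
  const leftOk = index === 0 || compare(sorted[index - 1], next) <= 0;
  const rightOk =
    index === sorted.length - 1 || compare(next, sorted[index + 1]) <= 0;
  return leftOk && rightOk;
}

const byNumber = (a: number, b: number) => a - b;

// "y" changes from 2 to 3 in the list [1, 2]: order is unchanged
console.log(staysInPlace([1, 2], 1, 3, byNumber)); // prints: true

// "x" changes from 1 to 5 in the list [1, 2]: the list must be re-sorted
console.log(staysInPlace([1, 2], 0, 5, byNumber)); // prints: false
```

The first call corresponds to the step in the example above where "y" is set to 3 and no list update is logged.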
feedToKeyValueMap()
Flattens a change feed into a new Map()
- Input: ChangeFeed<Key, Value>
- Output: Map<Key, Value>
Example:
import { Subject } from "rxjs";
import { map } from "rxjs/operators";
import { feedToKeyValueMap } from "rxjs-changefeeds";
const input = new Subject();
input
.pipe(
feedToKeyValueMap(),
map(keyValues => Array.from(keyValues.entries()))
)
.subscribe(console.log);
input.next(["set", "x", 1]);
// output: [["x",1]]
input.next(["set", "y", 2]);
// output: [["x",1],["y",2]]
input.next(["set", "x", 3]);
// output: [["x",3],["y",2]]
input.next(["del", "x"]);
// output: [["y",2]]
Combiners
feedCombine(Inputs, ProjectFunction)
Merges multiple changefeeds into one based on key.
- Inputs: array of input change feeds [Observable<ChangeFeed<Key, Value1>>, ... , Observable<ChangeFeed<Key, ValueN>>]
- ProjectFunction: combiner producing values for the output changefeed (v1: Value1, ... , vN: ValueN) => Project
- Returns: ChangeFeed<Key, Project>
Notes:
ProjectFunction is called with the latest value of each change feed in the Inputs array. When a value is not available in one of the input change feeds, the corresponding valueN parameter is undefined.
If ProjectFunction returns undefined:
- no event is emitted, if ProjectFunction was not called for this key before or if the previous call resulted in null
- del <key> is emitted, if ProjectFunction had previously returned a value
Example:
import { Subject } from "rxjs";
import { feedCombine } from "rxjs-changefeeds";
const a$ = new Subject();
const b$ = new Subject();
const project = (a, b) => ({ a, b });
const result = feedCombine([a$, b$], project);
result.subscribe(console.log);
a$.next(["set", "x", "ax"]);
a$.next(["set", "y", "ay"]);
// output:
// ["set","x",{"a":"ax"}]
// ["set","y",{"a":"ay"}]
b$.next(["set", "x", "bx"]);
// ["set","x",{"a":"ax","b":"bx"}]
a$.next(["del", "x"]);
// ["set","x",{"b":"bx"}]
b$.next(["del", "x"]);
// ["del","x"]
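The rule for a ProjectFunction that returns undefined can be sketched as a single decision function. eventForProjection and its hadValue flag are hypothetical and not part of the library; the sketch assumes the caller tracks whether a value was previously emitted for the key.

```typescript
type ValueRecord<Key, Value> = ["set", Key, Value] | ["del", Key];

// Hypothetical helper (not part of rxjs-changefeeds): maps the result
// of a ProjectFunction call to the record that should be emitted.
function eventForProjection<Key, Project>(
  key: Key,
  projected: Project | undefined,
  hadValue: boolean // assumption: true if a value was emitted for this key before
): ValueRecord<Key, Project> | null {
  if (projected !== undefined) {
    return ["set", key, projected]; // a value was produced: emit set
  }
  // undefined result: del if something was emitted before, otherwise nothing
  return hadValue ? ["del", key] : null;
}

console.log(JSON.stringify(eventForProjection<string, string>("x", "ax", false)));
// prints: ["set","x","ax"]
console.log(JSON.stringify(eventForProjection<string, string>("x", undefined, true)));
// prints: ["del","x"]
console.log(eventForProjection<string, string>("x", undefined, false));
// prints: null
```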
Subjects
new ChangeFeedReplaySubject()
Useful when a hot changefeed needs to be converted into a cold changefeed.
Similar to ReplaySubject, but replays only the last record per key.
If an input item is deleted with del, it is removed from the internal state of ChangeFeedReplaySubject and won't trigger any events for subsequent subscribers.
import { ChangeFeedReplaySubject } from "rxjs-changefeeds";
const subject = new ChangeFeedReplaySubject();
subject.subscribe({
next(val) {
console.log("A:", val);
}
});
subject.next(["set", 1, "one"]);
subject.next(["set", 2, "two"]);
subject.next(["set", 1, "onePlus"]);
// output:
// A: ["set",1,"one"]
// A: ["set",2,"two"]
// A: ["set",1,"onePlus"]
subject.subscribe({
next(val) {
console.log("B:", val);
}
});
// output:
// B: ["set",1,"onePlus"]
// B: ["set",2,"two"]
subject.next(["del", 1]);
// output:
// A: ["del",1]
// B: ["del",1]
subject.subscribe({
next(val) {
console.log("C:", val);
}
});
// output:
// C: ["set",2,"two"]
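The replay behaviour can be pictured as a Map holding the last value per key. The class below is a hypothetical sketch of that bookkeeping only, without any RxJS subscription logic, and is not the implementation of ChangeFeedReplaySubject.

```typescript
type ValueRecord<Key, Value> = ["set", Key, Value] | ["del", Key];

// Hypothetical class (not part of rxjs-changefeeds): keeps only the
// last value per key, which is what a late subscriber would receive.
class LastRecordPerKey<Key, Value> {
  private latest = new Map<Key, Value>();

  push(record: ValueRecord<Key, Value>): void {
    if (record[0] === "set") {
      this.latest.set(record[1], record[2]); // overwrite the previous value
    } else {
      this.latest.delete(record[1]); // deleted keys are never replayed
    }
  }

  // Records a new subscriber would be sent on subscription.
  replay(): ["set", Key, Value][] {
    const records: ["set", Key, Value][] = [];
    this.latest.forEach((value, key) => records.push(["set", key, value]));
    return records;
  }
}

const buffer = new LastRecordPerKey<number, string>();
buffer.push(["set", 1, "one"]);
buffer.push(["set", 2, "two"]);
buffer.push(["set", 1, "onePlus"]);
console.log(JSON.stringify(buffer.replay()));
// prints: [["set",1,"onePlus"],["set",2,"two"]]

buffer.push(["del", 1]);
console.log(JSON.stringify(buffer.replay()));
// prints: [["set",2,"two"]]
```

The two replays correspond to what subscribers B and C receive in the example above.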