tream v0.1.4
Lightweight lazy streams in TypeScript
Data types
The first abstraction of lazy streams representation called Pull.
It's a function which must be called by consumer side to get a next value from the stream.
In response to get value request the producer side must send value or terminate stream.
The Pull function has two arguments: push and done, which can be used in this purposes.
Note: Implementation requires that the push and done both must be deferred.
Also the consumer side can cancel getting value using function which returned from Pull.
You cannot operate with streams when pull request is started but you can either wait for push response or cancel request immediatelly by undo operation.
In both cases you get the pull to make next requests.
The done response means the end of stream. The ended (or terminated) stream cannot be used to get next values.
Pull\
The Pull function defined as:
interface Pull<Type> {
(push: Push<Type>, done?: Done): Undo<Type>;
}In common words Pull type represents source or stream.
Push\
When stream as active (i.e. not ended) it can send values in response of Pull calls by calling Push function.
The Push function defined as:
interface Push<Type> {
(val: Type, pull: Pull<Type>): void;
}It sends to consumer the value and the next Pull which can be used by consumer to get the next value and so on.
Done
In case of ended (or terminated) stream it must send end-of-stream signal by calling Done function.
The Done defined as:
interface Done {
(): void;
}Undo\
The Undo function purposed to cancel pull request and defined as:
interface Undo<Type> {
(): Pull<Type>;
}The returned Pull function can be used to getting the next values from stream.
None
The value none of type None is used instead of special values like null or undefined in order to represent the lack of values. This technique allows work with special values (null and undefined) in streams like with any other ordinary values without exceptions.
Maybe\
The type Maybe is intended to represent values which may be lack. The Maybe type defined as:
type Maybe<Type> = Type | None;Simple usage example:
import { Maybe, none, some } from 'tream/stream';
let str: Maybe<string> = none;
if (some(str)) {
// do something with 'str'
console.log(str.length);
}Creating streams
The first thing which we would like to do in order to operate with streams is a creation of streams. Actually there is a much ways to do this.
Special streams
empty: The stream which has no values and ends immediately.never: The stream which not sends values and never ends.once(value): The stream which sends single value and ends.repeat(value): The stream which sends same values and never ends.
Generators
Generator is a function which returns some value each time when it called.
Also generator may return none to end the stream.
The particular case of generator is an iterator which gets an array and sends it values from first to last.
import { generate, iterate } from 'tream/stream';
const n = generate(1, s => [s * (s + 1), s]); // => 1, 2, 6, 42, 1806, ...
const v = iterate([1, 2, 3, 4, 5]); // => 1, 2, 3, 4, 5Channels
Channel is a pair of sink and stream which co-exists independently. In other words the sink can be used to send values and the stream will get all of it which is sended through channel. So the producer side doesn't need awaiting get value requests from consumer side.
import { channel, collect } from 'tream/stream';
const [[send, end], src] = channel<number>();
send(1);
send(2);
setTimeout(() => {
send(3);
end();
}, 150);
collect(src)(val => {
console.log(val); // => [1, 2, 3]
});Forks
Same stream cannot be used multiple times directly but a Fork abstraction makes it possible.
The Fork is a function which might been called to clone stream.
This function is a result of applying operation fork to the source stream.
import {iterate, fork, collect} from 'tream/stream';
const src = fork(iterate([1, 2, 3]));
const a = src();
const b = src();
const c = src();
collect(a)(val => { console.log(val); }); // => [1, 2, 3]
collect(b)(val => { console.log(val); }); // => [1, 2, 3]
collect(c)(val => { console.log(val); }); // => [1, 2, 3]Timers
TODO...
Request
Streaming algebra
To operate with streams in monadic style developed so called streaming algebra.
The streaming algebra includes the set of operations which takes a streams and creates new streams as result. Besides streams it also take functions and values which help control behavior of operations.
Map
The map operation is purposed to convert values using some function which take a value and returns new.
import {map, iterate} from 'tream/stream';
map(v => v * v,
iterate([1, 2, 3])); // => [1, 4, 9]
map(v => `a=${v}`,
iterate([1, 2, 3])); // => ["a=1", "a=2", "a=3"]Filter
The operation filter was designed to filter values in stream using some function which take a value and returns boolean.
import {filter, iterate} from 'tream/stream';
filter(v => v > 0,
iterate([-1, 0, 2, 0.1])); // => [2, 0.1]Filter Map
The operator filter_map combines filtering and mapping.
import {filter_map, iterate, none} from 'tream/stream';
filter_map(v => v > 0 ? v * 2 : none,
iterate([-1, 0, 2, 0.1])); // => [4, 0.2]To remove values from stream the function must return none.
Scan
The operator scan is a form of filter_map with state.
import {scan, iterate, none} from 'tream/stream';
scan(1, (s /* previous state */, v) =>
[s + 1 /* next state */, v > 0 ? v * s : none],
iterate([-1, 0, 2, 0.1])); // => [6, 0.4]Forward
import {repeat, iterate, forward} from 'tream/stream';
forward(() => repeat(4), iterate([1, 2, 3]));
// => [1, 2, 3, 4, 4, 4, 4, ...]Then
import {repeat, iterate, taken, then} from 'tream/stream';
then(v => taken(v, repeat(`${v}`)), iterate([1, 2, 3]));
// => ["1", "2", "2", "3", "3", "3"]Each
import {collect, id, map, taken, each} from 'tream/stream';
import {interval} from 'tream/timer';
collect(each(val => map(id(val), interval(15)),
count(taken(4, interval(50)))));
// => [1, 1, 1, 2, 2, 2, 3, 3, 3]Take
// TODOHead
// TODOSkip
// TODOTakeN
import {iterate, taken, collect} from 'tream/stream';
collect(taken(3, iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => [1, 2, 3]SkipN
import {iterate, skipn, collect} from 'tream/stream';
collect(skipn(2, iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => [3, 4, 5]Fold
import {iterate, fold, collect} from 'tream/stream';
collect(fold(0, (pre, val) => pre + val,
iterate([1, 2, 3, 4, 5])))
(val => { console.log(val); }); // => 15Collect
import {iterate, collect} from 'tream/stream';
collect(iterate([1, 2, 3]))
(val => { console.log(val); }); // => [1, 2, 3]Last
import {iterate, collect} from 'tream/stream';
last(iterate([1, 2, 3]))(val => { console.log(val); });
// => 3Select
import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, join([
map(id(100), interval(100)),
map(id(30), interval(30))
])))(val => { console.log(val); });
// => [30, 30, 30, 100, 30]Combine
import {id, map, combine, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, combine([
map(id(100), interval(100)),
map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[none, 30], [none, 30], [none, 30], [100, 30], [100, 30]]Join
Combine
import {id, map, join, taken, collect} from 'tream/stream';
import {interval} from 'tream/timer';
collect(taken(5, join([
map(id(100), interval(100)),
map(id(30), interval(30))
])))(val => { console.log(val); });
// => [[100, 30], [100, 30]]Chain
The operator chain concatenates streams from the first to the last.
When the first stream is ended, the second begins and so on.
import {once, empty, repeat, iterate, taken, chain, collect} from 'tream/stream';
collect(chain([
once(1),
iterate([2, 3]),
empty,
taken(3, repeat(4))
]))(val => { console.log(val); }); // => [1, 2, 3, 4, 4, 4]