xq v0.0.13
XQ - Reactive Promises
XQ is a hybrid between promises and reactive extensions. Its core is a Promises/A+ compliant promises implementation that can also function as a stream.
Think of it as a promise chain that you can push multiple values
down. .then == .map
Get it
Installing with NPM
`bash`
npm install -S xq
...
X = require 'xq'
### Download Source
```bash
git clone https://github.com/algesten/xq.git
Example
XQ as a Promise
X = require 'xq'
X(42).then (v) -> console.log v
#... 42
As a deferred
def = X.defer()
def.promise.then (v) ->
v * 2
.then (v) ->
console.log v
def.resolve 21
#... 42
XQ as an Event Stream
Looping over an array.
X([1,2,3,4]).forEach(v) ->
X(v * 2)
.then (v) ->
console.log v
#... 2
#... 4
#... 6
#... 8
Pushing values
def = X.defer()
def.promise.map (v) ->
v * 2
.then (v) ->
console.log v
def.push 11
def.push 21
def.push 22
#... 22
#... 42
#... 44
Deferred values and Event Streams
XQ deals with event streams. A stream can be thought of as a sequence of values, sometimes followed by an end-of-stream.
v1 - v2 - v3 - end
In XQ everything starts out as a non-ended stream.
def = X.defer()
def.promise.map((v) -> v*2).then (v) -> console.log v
def.push 2
def.push 4
def.push 8
2 - 4 - 8 -
Here we created an event stream and pushed 2
, 4
and 8
down the
stream. After running def.promise
will hold the value 8
. The
following .map
holds the value 16
(8*2) and the final .then
also
holds 16
. The stream is not ended, so we could continue pushing more
values to it. Each step would be executed for each new value pushed.
A resolved promise is an event + end
A promise is just a special stream that always is ended when the promise is resolved/rejected. The following examples are exactly equivalent.
X(2)
def = X.defer(); def.resolve(2); def.promise
def = X.defer(); def.push(2); def.end(); def.promise
OI .oi(f)
OI is a helper for passing multiple values through a promise/event chain. Read about OI.
API
Instantiation
- X(v) creates an instance resolved with value
v
. Equivalent todef.push(v); def.end()
. - X.reject(v) creates an instance that is rejected with value
v
. Equivalent todef.pushError(v); def.end()
. - X.resolver(f)
f
is synchronously called withresolve, reject
which are functions used to resolve/reject the promise.X.resolve((resolve, reject) -> ... resolve(42))
- X.binder(f)
f
is synchronously called withsink
,end
The sink function is used to sink events/errors. The signature of the sink function is(v, isErr) ->
.X.binder((sink) -> ... sink(42)... sink(err,true)
. The end function is to signal stream end.f
can optionally return an unsubscribe function which will be called when the event stream ends. - def = X.defer() creates a deferred value
def
. - def.resolve(v) to resolve the deferred with value
v
. - def.reject(e) to reject the deferred with reason
e
. - def.push(v) to push a value down the chain.
- def.pushError(e) to push an error down the chain.
- p = def.promise to get the promise from the deferred.
State
- p.isEnded() tells whether the stream has been ended.
- p.isPending() tells whether the promise is pending. Equivalent to
!p.isEnded()
. - p.isFulfilled() tells whether the promise is resolved.
- p.isRejected() tells whether the promise is rejected.
- p.onEnd(f) will call
f
when stream is ended. Returns self. - p.endOnError() makes stream stop on first encountered error. Returns self.
- p.stop() Immediately stops the stream. After this
isEnded()
will be true. Returns self.
Chaining
- p.then/map(fx,fe) attaches
fx
to receive value pushed down the chain. Optionally attachesfe
to receive errors. - p.fail/catch(fe) attaches
fe
to receive errors. - p.always/finally/fin(f) attaches
f
to receive both values and errors. The signature forf
is(v, isError) ->
where the second argument is a boolean telling whether the received value was an error. - p.serial(fx,fe) exactly like
then/map
but ensures only one argument is executed at a time. Additional events are buffered up and executed one by one. See section on everything being parallel. - p.once(fx) promise for the first event/value from a stream/promise. Automatcially ends when first value is received.
- p.settle(fx,fe) promise for the last event/value from a
stream/promise. will effectively block a stream to settle before
releasing. arguments like
.then
Arrays and Objects
- p.forEach/each(fx) attached
fx
to receive values. If the value is an array, it will invokefx
one by one. I.e.[a0,a1,a2]
will invokefx(a0)
,fx(a1)
,fx(a2)
- p.singly/oneByOne(fx) serialized version of
forEach
. Each value in the array is fed to the function only when the last value is finished. This mainly makes a difference for deferreds. See section explaining forEach and singly. - p.spread(fx) attaches
fx
to receive values. If the value is an array, the array will be destructured to arguments infx
. I.e.[a0,a1,a2]
will invokefx(a0, a1, a2)
. Non-array values will be invoked as first argument (f(v)
). - p.all(fx) attaches
fx
to receive resolved arrays/objects. If the value to be executed is an array of promises[p1,p2,...]
,fx
will only be invoked when all promises are resolved and will be receving an array with the first resolved values. For objects the function inspects each top level property (no deep inspection).{a:p1,b:p2,...}
will result in an object with the resolved values bound to the same keys. Any promise failing will abort and reject with the error of that promise. For streams it ensures there is a pushed value, it keeps the first one received regardless of there being more. - X.all(v) same as
X(v).all()
. - p.snapshot(fx) like
.all
, but uses current value instead of first. See section about the difference between all or snapshot. - X.snapshot same as
X(v).snapshot()
. - X.oi(f) chaining helper function with signature
(i,o)
. See oi doc.
Multiple
- X.merge(s1, s2, ...) merges the variable number of promises/streams to one. The resulting stream will end when all parts have ended.
Filtering
- p.filter(f) apply function
f
to each value. Iff
returns a truthy, the original value will be released down the chain. - p.find(f) exactly like
filter
. but only the first value is released down the chain, and step is closed.
Everything is parallel
Every operation in XQ is potentially executed in parallel (in a process.nextTick). For non-deferred values this is mostly never noticable.
The result of this operation will come out in the order of the array.
X([0,1,2]).forEach (v) -> v*2
There is however one situation with endOnError
where it may
matter. A somewhat contrived example.
# This doesn't work as expected!!!
X([0,1,2]).forEach (v) ->
throw new Error('fail') if v == 1
.endOnError()
.then (v) ->
#... will see 0 and 2
The user may expect the last .then
to never receive the 2. However
since all values are fed into forEach
in parallel, the error will
happen too late to stop the 2. To fix this use forEach().serial()
.
Parallel deferreds
When using deferreds the order is not guaranteed.
url1 = 'http://www.google.com/'
url2 = 'http://github.com/'
url3 = 'http://www.reddit.com/'
X([url1,url2,url3]).forEach(doRequest) # returns a promise for result
.map (result) ->
# ... ?
Depending on how slow the requests were, the .map
operation will
receive the result in any order. To fix it, we can use
forEach().serial()
which ensures that each url fed to doRequest will
return a fulfilled promise before the result is passed on to
.map
. This however means each requests will run serially.
Strategy for unwrapping deferreds
The principle for unwrapping deferreds is to unwrap on exit of each step.
X(X(42)).then((v) -> X(v)).then (v) -> #... look ma, v is still 42!
If we break down this sequence.
- Each
X()
is a step like all others. It can be thought of as.then (x) -> x
(a bit more involved since it handles errors). X()
wraps a deferredX(42)
.- On the exit of the outmost
X()
, the innerX(42)
is unwrapped to42
. 42
is therefor fed into the next.then
-step and invoked for the function(v) -> X(v)
.- That function once again wraps
v
(42) into aX(v)
which on the exit of that same.then
-step is unwrapped again back to42
- The last
.then
-step is therefore also just fed42
.
forEach has a serial pitfall
When using forEach
in combination with arrays of promises, there is
a potential pitfall. forEach
is also parallel and does not wait for
one deferred to finish before feeding the next, which means the
following code would execute doSomething
in parallell for the values
of the array.
# forEach does not work serially!
p1 = makePromise()
p2 = makePromise()
p3 = makePromise()
X([p1,p2,p3]).forEach (p) -> p.then(doSomething)...
.forEach().serial() is not serial
A mistaken attempt at fixing this would be to use serial
, as in
.forEach().serial (p) ->...
but this does not work. Having no
function to forEach
would be the equivalent to (x) -> x
and any
deferred would be unwrapped on the exit of that forEach
-step. This
means all deferred have been unwrapped in parallel already before the
invocation of .serial()
.
.singly() does things serially.
.singly()
(or alias .oneByOne()
)is a serialized version of
.forEach()
.
# singly is serial
p1 = makePromise()
p2 = makePromise()
p3 = makePromise()
X([p1,p2,p3]).singly (p) -> p.then(doSomething)... # is done one by one
It will queue up each value of the array to be executed one after
another. That means .singly (v) -> X(doSomething(v))
would wait with
feeding another value to the function until the previous has been
unwrapped. The same goes for the non-argument .singly()
.
.all or .snapshot
.all
takes the first value .snapshot
takes the current. This can
be illustrated in beautiful yet informative ascii art.
.all
stream1 a3 - a2 - a1 -> [ ]
stream2 b3 - b2 - b1 -> [ ]
stream3 c3 - c2 - [c1] ->
At this point .all
has not resolved, only value c1
in stream3 has
been. For the three streams moving into the .all
array, only the
first value will be used. Hence when the promise for .all
resolves,
we will get an array with the values [a1,b1,c1]
, the first three
values of the three streams.
For snapshot however:
.snapshot
stream1 a3 - [a2] - a1 ->
stream2 b3 - b2 - [b1] ->
stream3 [c3] c3 - c2 - c1 ->
At the point when all incoming streams have a value (stream2 being
the last), both stream1 and stream3 have taken other values. Hence
the snapshot when it resolves is the current
state [a2,b1,c3]
.
Interoperability with other .then-ables
XQ tries to play nice with other promise packages. It can both wrap and receive other promises.
X(Q(42)).then (v) -> ...42
X().then(-> Q.reject(42)).fail (v) -> ...42
Why a hybrid?
I like promises such as Q I also
like reactive extensions (FRP). However I don't like the API that
comes with libraries such as
RxJS,
Bacon.js etc. My biggest beef is with
something rx-people call flatMap
.
Comparison of Q and Bacon.js
Q(42).then((v) -> Q(2*v)).then (v) -> ...v is 84
Bacon.once(42).map((v) -> Bacon.once(2*v)).flatMap().onValue (v) -> ...v is 84
For promises we continue a chain with .then .then .then
. It doesn't
matter whether the returned value in a step is a promise for a value
Q(2*v)
or a non-deferred.
With regards to the second .then
, these two chains are equivalent.
.then(-> 4).then (v) ->
.then(-> Q(4)).then (v) ->
In the rx-world things are not so easy. As long as you just transform
simple (non-deferred) values, you keep using .map
, however if you
dare returning a deferred value (observable or event stream) you
probably want .flatMap
.
In XQ then
== map
, there is no difference and a deferred value is
just a special case of an event stream.
Another weirdness is the idea of lazy streams. It is very important to
end a rx-style chain with something that "subscribes"
(i.e. .onValue()
, .subscribe()
, .onError()
etc). It could be
argued that this makes a more obvious distinction between functions
and side effects, but I'm still not convinced. I really don't find
myself ever creating random streams that I end up not using (i.e. not
subscribing to). The point of laziness seems pedantic and unnecessary.