Closed rpominov closed 9 years ago
Have an idea of how to fix the mess with subscription methods:
{emit, error, end, emitEvent}
to {value, error, end, event}
Observable.prototype.observe()
that accepts an object with same shape as emitter
, and returns a dispose
function.observe()
called with a function as an argument, it automatically converted to {value: passedFn}
Before | After |
---|---|
obs.onValue(fn); obs.offValue(fn); |
let dispose = obs.observe(fn); dispose(); |
obs.onError(fn); obs.offError(fn); |
let dispose = obs.observe({error: fn}); dispose(); |
obs.onValue(fn1).onError(fn2); |
obs.observe({value: fn1, error: fn2}); |
In the field it might look something like this (code from here):
theResult.observe({
value(x) {
outputElement.innerHTML = x;
},
error(e) {
outputElement.innerHTML = `<span style="color:red">${e}</span>`;
}
});
Full disclosure: idea of same interface for observer and emitter borrowed from RxJS, returning function to unsubscribe — from Bacon, the method name — from Most
We should also look to where https://github.com/jhusain/asyncgenerator is moving. As I understand, current proposition for subscribing in there looks like this:
stream[@@observer](generator);
In other words:
stream[@@observer]({
next(x) {/* one can return `{done: true}` from here to unsubscribe */ ...},
throw(err) {...},
return(x) {...}
});
Note that it supports only one error, while Kefir's streams don't end on a error, and can produce many.
Updated: looks like discussion on observable interface moved to https://github.com/zenparsing/es-observable
For .valuesToErrors/errorsToValues
we can simply remove handler
argument and leave only default functionality — convert all values to errors (or other way around).
For custom converting one can use .flatMap/flatMapErrors
:
obs.valuesToErrors(x => {
return {convert: x < 0, error: 'Value must be 0+'};
});
↓
obs.flatMap(x => {
return x < 0 ? Kefir.constantError('Value must be 0+') : Kefir.constant(x);
});
Still don't like the method names though...
Update
Actually better if not remove handler
argument, but make it (value) => error
function — methods will converts all events, but user will be able to map value to error using this method.
From other perspective, why don't simply remove both those methods and be done? One can use .flatMap/flatMapErrors
for everything:
obs.valuesToErrors();
↓
obs.flatMap(Kefir.constantError);
obs.valuesToErrors(x => "bad x: " + x); // with simplified `handler`
↓
obs.flatMap(x => Kefir.constantError("bad x: " + x));
Perf differences:
A: s.valuesToErrors(), B: s.flatMap(Kefir.constantError)
----------------------------------------------------------------
Kefir A x 12,898,299 ops/sec ±2.00% (67 runs sampled)
Kefir B x 7,890,406 ops/sec ±2.38% (63 runs sampled)
-----------------------
Kefir A 1.00 Kefir B 0.61
A: s.valuesToErrors(fn), B: s.flatMap(fn)
----------------------------------------------------------------
Kefir A x 10,967,354 ops/sec ±2.13% (68 runs sampled)
Kefir B x 7,725,422 ops/sec ±2.06% (65 runs sampled)
-----------------------
Kefir A 1.00 Kefir B 0.70
Sometimes I rely on onValue
to be invoked immediately for initialization:
prop = K.combine(...).flatMap(...) [ .flatMap(...).flatMap(...) ]
...
a
prop.onValue(val => b)
c // assumes b has run and will break otherwise
But I have take care to track that prop
will have the right resulting type (ie. a property), because any change in how it's constructed can change this.
This makes me think that onValue
should be two differently named functions (or some equivalent design), where one ensures that the subscriber will be invoked immediately and one ensures that it is not. It is dangerous that properties and streams' onValue
behavior semantics are different.
So properties would have prop.observeProp(subscriber)
and streams would have stream.observeStream(subscriber)
, such that observeProp
differs from observeStream
in that it calls the subscriber with the current value right away. (Alternative names: run
and listen
)
This would help code readability versus onValue
and give an early error in case of type mismatch at runtime.
I think errors should be more like how they work in promises/A+ - the goal is to propagate an exception that occurs asynchronously somewhere to somewhere it can be acknowledged and handled. As such, error events should be an end state that propagates to every dependent stream and finally the observers and unsubscribes them.
At the same time, a stream library has the job of ensuring that the user's code runs continuously, so it would be bad to just unsubscribe observers.
So what if the default error handler for observers did this:
var observer = stream.observe({
value() { ... },
onError(err) {
console.error('Stream error:', err.stack || err)
observer.reset()
},
})
(.observe
now returns an object.)
The reset
function would (later) resubscribe the same observer. Since an error unsubscribes all observers at the same time this should reset the state of all streams where the error occurred, but observers which are not interested in bailing on error will resubscribe afterwards.
This solves the issue you detailed in #102.
All error manipulation functions like valuesToErrors
/errorsToValues
would not be needed.
@fixplz
Yeah, maybe to have two methods for subscribing (one that may call callback synchronously, and one that always calls callback asynchronously i.e., zalgo-safe) is a good idea.
This is also discussed on observable proposal for ES: https://github.com/zenparsing/es-observable/issues/25
Also note that not only properties may call callback synchronously, but streams as well: http://rpominov.github.io/kefir/#current-in-streams
I've been trying to use observe()
and find it to be clunky, because it can't be chained. Not really looking forward to on*
and off*
being dropped because of this. I'm new to FRP, so I'd guess this complaint indicates I'm doing something wrong.
The need to create a variable to "fork" a stream is bothersome; observe()
compounds the problem:
// contrived example
const s1 = Kefir.fromEvents(obj, 'foo');
s1.onValue(foo => {
console.log(`foo: ${foo}`);
})
.filter(foo => foo === 'bar')
.onValue(() => {
console.log('got bar');
})
.combine(s1.filter(foo => foo === 'baz')
.onValue(() => {
console.log('got baz');
}))
.onValue(combined => {
console.log(combined);
});
// observe-style
const s1 = Kefir.fromEvents(obj, 'foo');
const s2 = s1.filter(foo => foo === 'bar');
const s3 = s1.filter(foo => foo === 'baz');
const s4 = s2.combine(s3);
s1.observe(foo => {
console.log(`foo: ${foo}`);
});
s2.observe(() => {
console.log('got bar');
});
s3.observe(() => {
console.log('got baz');
});
s4.observe(combined => {
console.log(combined);
});
But again, there's probably an easier/better way to do this that I'm missing.
Tangentially, emitter.emitEvent({type: 'value', value: someValue})
is kind of tedious. This satisfies the "smaller API" goal, but not necessarily the "simpler" one.
I realize emitter.emit()
is not gone (yet), but perhaps keeping emitEvent()
alongside renamed emitValue()
(emit()
), emitEnd()
(end()
) and emitError()
(error()
) would be a reasonable compromise?
Hi, @boneskull .
Your example in observe-style actually seems much clearer to me. That's because pure relationship between streams and side effects are separated into two parts. This is exactly what I usually try to achieve: express as much of the program in the form of pure relationship of observables, and then add side effects to some of observables from that network.
With abstract example It's hard to understand why you feel a need to add observers in between chains of observables, so it's hard to give an advice how it could be done in more idiomatic FRP way. Maybe with a more real example I could give some advice.
emitter.value(value)
, emitter.error(error)
, emitter.end()
, and emitter.event(event)
are not going to be removed. This is stable API.
Version 3.0.0 probably won't happen very soon, but we can start collecting ideas for it already. In particular, I think the API can be made more solid and less hairy in some places.
I'm personally not very happy with following parts:
.valuesToErrors/errorsToValues
(method names and returning value shape).on*/.off*
.endOnError
probably should be.takeErrors(n)
to line up with.take(n)
Basic rules:
Example of such a change (already done): add
.last()
, but remove.reduce()
, so one can do.scan().last()
instead of reduce, but also can do just.last()
.Any ideas?
Related: #71, http://www.youtube.com/watch?v=4anAwXYqLG8