kefirjs / kefir

A Reactive Programming library for JavaScript
https://kefirjs.github.io/kefir/
MIT License

Atomic updates #97

Open iofjuupasli opened 9 years ago

iofjuupasli commented 9 years ago
var a = Kefir.sequentially(100, [1,2,3]);
var b = a.map(function(val){return val + 2});
var c = a.map(function(val){return val * 2});
var d = Kefir.combine([b,c], function(b, c){return [b,c]});
d.log()

Output:

[combine] <value> [3, 2]
[combine] <value> [4, 2]
[combine] <value> [4, 4]
[combine] <value> [5, 4]
[combine] <value> [5, 6]
[combine] <end>
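To see where the intermediate pairs come from, here is a minimal, self-contained push model in plain JavaScript. The helpers source, map and combine are hypothetical stand-ins, not Kefir's internals. The assumptions are that a notifies its subscribers synchronously (b before c) and that combine emits whenever any input changes once every input has a value.

```javascript
// Hypothetical push model (illustration only, not Kefir's internals).
function source() {
  const subs = [];
  return {
    subscribe(fn) { subs.push(fn); },
    emit(v) { for (const fn of subs) fn(v); }, // synchronous, in subscription order
  };
}

function map(parent, f) {
  const out = source();
  parent.subscribe(v => out.emit(f(v)));
  return out;
}

// Naive combine: emits on every input change once all inputs have a value.
function combine(inputs, f) {
  const out = source();
  const latest = inputs.map(() => undefined);
  const has = inputs.map(() => false);
  inputs.forEach((input, i) => {
    input.subscribe(v => {
      latest[i] = v;
      has[i] = true;
      if (has.every(Boolean)) out.emit(f(...latest));
    });
  });
  return out;
}

const a = source();
const b = map(a, v => v + 2);
const c = map(a, v => v * 2);
const d = combine([b, c], (x, y) => [x, y]);

const log = [];
d.subscribe(v => log.push(v));
[1, 2, 3].forEach(v => a.emit(v));
console.log(JSON.stringify(log)); // [[3,2],[4,2],[4,4],[5,4],[5,6]]
```

The pairs [4, 2] and [5, 4] appear because combine already sees the new value of b while c has not yet been updated for the same event from a.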

Maybe Flyd's implementation of atomic updates could help: https://github.com/paldepind/flyd/blob/master/flyd.js#L66-L101

rpominov commented 9 years ago

This is something we have wanted from the very beginning, but it is not an easy one to do.

First of all, it's a pretty hard problem to solve by itself. As I saw in Bacon, there were a number of attempts to do it properly before it was done. And even now atomic updates don't work with Bacon.Bus, as far as I know.

Secondly, it hurts performance, again in Bacon, if I understand it correctly.

I don't know much about Flyd, but I think it doesn't have a concept of activation, which simplifies things a lot, in particular the implementation of atomic updates. But I'll look into it more carefully later; perhaps we can indeed borrow something from Flyd.

Perhaps @raimohanska and @paldepind could correct my assumptions.


For now we have the following workarounds:

1) Add .debounce(0) after .combine: Kefir.combine([b, c], fn).debounce(0).

Caveats: it will cut out all events that arrive synchronously in a group (which is what we see with non-atomic updates, but that is not the only case), and events will be delayed by 0ms (emitted in the next tick).

2) Pass some of the sources as "passive": Kefir.combine([b], [c], fn).

Caveats: you need to know which observables have common sources and manually put them in different arrays.
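A sketch of what "passive" means, in a hypothetical eager push model (the helpers are illustrative, not Kefir's API): passive inputs update combine's latest value without triggering an emission. In this model the workaround only yields consistent pairs when the passive source receives the parent event before the active one; otherwise each pair carries the previous passive value. The sketch does not establish which order Kefir's activation actually produces.

```javascript
// Hypothetical eager push model (illustration only, not Kefir's API).
function source() {
  const subs = [];
  return {
    subscribe(fn) { subs.push(fn); },
    emit(v) { for (const fn of subs) fn(v); },
  };
}

function map(parent, f) {
  const out = source();
  parent.subscribe(v => out.emit(f(v))); // subscribes at creation time
  return out;
}

// Only active inputs trigger an emission; passive inputs just record their latest value.
function combine(active, passive, f) {
  const out = source();
  const inputs = [...active, ...passive];
  const latest = inputs.map(() => undefined);
  const has = inputs.map(() => false);
  inputs.forEach((input, i) => {
    input.subscribe(v => {
      latest[i] = v;
      has[i] = true;
      if (i < active.length && has.every(Boolean)) out.emit(f(...latest));
    });
  });
  return out;
}

// `passiveFirst` controls which map subscribes to `a` first, i.e. which one is notified first.
function run(passiveFirst) {
  const a = source();
  let b, c;
  if (passiveFirst) {
    c = map(a, v => v * 2);
    b = map(a, v => v + 2);
  } else {
    b = map(a, v => v + 2);
    c = map(a, v => v * 2);
  }
  const log = [];
  combine([b], [c], (x, y) => [x, y]).subscribe(v => log.push(v));
  [1, 2, 3].forEach(v => a.emit(v));
  return log;
}

console.log(JSON.stringify(run(true)));  // [[3,2],[4,4],[5,6]]
console.log(JSON.stringify(run(false))); // [[4,2],[5,4]]
```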

paldepind commented 9 years ago

Flyd doesn't have a concept of activation. What value is there in that?

Once a stream updates, Flyd does a topological sort of its dependency tree. That takes O(n) time (i.e. it's pretty fast), so it can potentially even lead to better performance, since it avoids doing a lot of redundant updates.

It's really not that hard, and I don't see how activation would make it harder. You could just skip inactive streams. But there may be something I'm missing.

rpominov commented 9 years ago

Flyd doesn't have a concept of activation. What value is there in that?

It allows automatic resource management.

For instance, when you create a stream like stream = Kefir.fromEvents(window, 'scroll').map(...).filter(...).take(...), nothing happens until you subscribe to the stream, even if there is a lot of scrolling happening.

Also, if a stream is inactive and there are no references to it, then it can be garbage collected.

We had a discussion about removing activation, but the current conclusion is that we need it, even though removing it could simplify everything a lot.

paldepind commented 9 years ago

Ah. It's laziness. I have deliberately not included that in Flyd. My conclusion is that it can give performance gains, but that any performance issues can be solved by other means, so it's not worth the complexity. But that's obviously just my own opinion.

In Flyd every stream has an array of its dependents. I imagine Kefir might have something similar. That forms a graph. Then I just do a simple topological sort of the graph and update every dependent stream in the order given by the sort. Skipping inactive streams in that process would not be too hard, would it?

rpominov commented 9 years ago

If every stream had a list of its dependents, the following wouldn't work:

if a stream is inactive and there is no references to it, then it can be garbage collected.

For instance, say we have b = a.map() and want to dispose of b. Currently, doing b = null is enough, but if a had a link to b as a dependent, that wouldn't work.

paldepind commented 9 years ago

How do you get updates from a stream to its dependents if you don't keep a reference to them in the stream? Assuming that both a and b are active.

rpominov commented 9 years ago

Well, that's why "if a stream is inactive ..." is in the equation :) The stream a has a link to b only when b is active.

So when we're no longer interested in a stream, we just need to unsubscribe from it and remove our reference to it. And if nobody else is interested either, the stream will be garbage collected.

rpominov commented 9 years ago

Hm, although you are right. We need links only to active dependents for atomic updates. Will think about it!

paldepind commented 9 years ago

So if b is active I can't do b = null? I.e. I'll have to remember whether or not it's active. And if I remember (or guess) incorrectly I've introduced a memory leak? That doesn't sound like a good thing to me.

Hm, although you are right. We need links only to active dependents for atomic updates. Will think about it!

Indeed! And based on that, doing a topological sort is quite easy (you do a depth-first search and add elements to a queue; there are many fine explanations on the web).
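A minimal sketch of the propagation discussed in this exchange, assuming a hypothetical Node class whose dependents array holds links to downstream nodes and whose active flag stands in for activation. It uses reverse post-order DFS to get the topological order. It is not Flyd's or Kefir's actual code.

```javascript
// Hypothetical dependency-graph node (illustration only, not Flyd's or Kefir's code).
class Node {
  constructor(parents = [], fn = null) {
    this.parents = parents;  // upstream nodes
    this.fn = fn;            // computes this node's value from its parents' values
    this.dependents = [];    // downstream nodes
    this.active = true;      // inactive nodes are skipped during propagation
    this.value = undefined;
    for (const p of parents) p.dependents.push(this);
  }
}

// Reverse post-order DFS: among the nodes reachable from `root` through
// active dependents, every node comes after all of its parents.
function topoOrder(root) {
  const visited = new Set();
  const postOrder = [];
  const visit = node => {
    visited.add(node);
    for (const dep of node.dependents) {
      if (dep.active && !visited.has(dep)) visit(dep);
    }
    postOrder.push(node);
  };
  visit(root);
  return postOrder.reverse();
}

// Set a root value, then recompute each affected node exactly once.
function set(root, value, onUpdate) {
  root.value = value;
  for (const node of topoOrder(root).slice(1)) {
    node.value = node.fn(...node.parents.map(p => p.value));
    onUpdate(node);
  }
}

const a = new Node();
const b = new Node([a], x => x + 2);
const c = new Node([a], x => x * 2);
const d = new Node([b, c], (x, y) => [x, y]);
const e = new Node([a], x => x * 100);
e.active = false; // nobody is subscribed to e

const log = [];
for (const v of [1, 2, 3]) {
  set(a, v, node => { if (node === d) log.push(node.value); });
}
console.log(JSON.stringify(log)); // [[3,2],[4,4],[5,6]]
console.log(e.value);             // undefined: e was never recomputed
```

Because d is recomputed only after both b and c, it never sees a half-updated pair. Building the order costs time linear in the nodes and edges reachable from the updated stream, and the inactive node e is simply never visited.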

rpominov commented 9 years ago

So if b is active I can't do b = null? I.e. I'll have to remember whether or not it's active. And if I remember (or guess) incorrectly I've introduced a memory leak? That doesn't sound like a good thing to me.

You don't have to think about whether it's active; you only need to care about your own subscription. A stream activates when it gets its first subscriber and deactivates when the last one is removed. So you only need to unsubscribe and remove your reference. If all interested parties do that, the stream will be garbage collected.
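A sketch of the activation rule described above, using a hypothetical Observable class (not Kefir's): a derived stream subscribes to its parent only when it gets its first subscriber and unsubscribes when its last one leaves, so the parent references the child only while the child is active. The fake `target` object stands in for something like window in Kefir.fromEvents.

```javascript
// Hypothetical Observable illustrating lazy activation (not Kefir's classes).
class Observable {
  constructor(onActivate) {
    this.onActivate = onActivate; // connects upstream and returns a teardown function
    this.subscribers = new Set();
    this.teardown = null;
  }
  subscribe(fn) {
    if (this.subscribers.has(fn)) return;
    this.subscribers.add(fn);
    // First subscriber: activate by connecting upstream.
    if (this.subscribers.size === 1) this.teardown = this.onActivate(v => this.emit(v));
  }
  unsubscribe(fn) {
    if (!this.subscribers.delete(fn)) return;
    // Last subscriber gone: deactivate, dropping the upstream link.
    if (this.subscribers.size === 0) {
      this.teardown();
      this.teardown = null;
    }
  }
  emit(v) {
    for (const fn of [...this.subscribers]) fn(v);
  }
  map(f) {
    return new Observable(emit => {
      const handler = v => emit(f(v));
      this.subscribe(handler); // the parent references the child only from now on
      return () => this.unsubscribe(handler);
    });
  }
}

// A fake event target standing in for something like `window`.
const target = { handlers: new Set() };
const fire = v => { for (const h of [...target.handlers]) h(v); };
const scroll = new Observable(emit => {
  target.handlers.add(emit);
  return () => target.handlers.delete(emit);
});

const doubled = scroll.map(x => x * 2);
console.log(target.handlers.size); // 0: nothing is connected until someone subscribes

const seen = [];
const onValue = v => seen.push(v);
doubled.subscribe(onValue);
console.log(target.handlers.size, scroll.subscribers.size); // 1 1
fire(5);

doubled.unsubscribe(onValue);
console.log(target.handlers.size, scroll.subscribers.size); // 0 0
fire(6); // nobody is connected, so this is ignored
console.log(JSON.stringify(seen)); // [10]
```

After the unsubscribe, nothing upstream references doubled, so dropping your own reference is enough for it to be garbage collected.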

paldepind commented 9 years ago

I get it now. It's a nice feature! If you do something like someSrc.map(fn).filter(fn), you won't have to worry about ending the intermediate stream created by map.

rpominov commented 9 years ago

Yep, but it comes at a high cost.

paldepind commented 9 years ago

I just updated my comment. I think the argument based on intermediate streams is the strongest one.

var myStream = someSrc.map(fn).filter(fn);
// later
myStream.end();
myStream = undefined;

Now I've ended only the stream created by filter but not the one created by map. But no one is listening to that stream so it will automatically be collected. That's nice!

rpominov commented 9 years ago

Yes, except you don't end myStream; you only subscribe to it and unsubscribe from it. It can also end by itself in response to the end of someSrc. For example, with someSrc = Kefir.later(1000, 1), it emits 1 after 1s and then ends.

var myStream = someSrc.map(fn).filter(fn);
myStream.onValue(fn);

// later
myStream.offValue(fn);
myStream = undefined;

Even if you could end a stream, you shouldn't, because there might be other interested parties.

Something like this also works:

// everything* will be collected after 5 events
someSrc.map(fn).filter(fn).take(5).onValue(fn);

// everything* will be collected after 5 seconds
someSrc.map(fn).filter(fn).takeUntilBy(Kefir.later(5000)).onValue(fn);

(*) except someSrc, if there are other references to it, etc.


But it's also good to have libraries without this feature, like yours, because the feature has issues. The semantic ones are the most annoying. And we can't have simple things like property.getValue(), for example.

paldepind commented 9 years ago

Yes, except you don't end myStream you only subscribe and unsubscribe to it. And it can end by itself in response of end of someSrc. For example someSrc = Kefir.later(1000, 1) — it emits 1 after 1s then ends.

Yes. I understand that a stream will end when its source does.

But you do things a bit differently in Kefir. In Flyd you end a stream in order to get it to unsubscribe from its parents.

But it also good to have libraries without this feature like yours, because it has issues. Those that are related to semantics are most annoying. And we can't have simple things like property.getValue() for example.

I'll have to take a look at those issues, because I'm actually seriously considering adding the behavior to Flyd. I don't think it would be that difficult. But Flyd currently does have something equivalent to property.getValue, and that's my biggest concern!

rpominov commented 9 years ago

I'll have to take a look at those issues

You can find some in #43, but it's kind of hard to read through. Basically, all stateful observables have issues. For instance, obs.take(5) won't count events while inactive, and a property won't update its current value while inactive, so we can't have property.getValue().
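A sketch of the take(5) issue under the same activation idea, with hypothetical helpers hotSource and lazyTake (single subscriber only, for brevity; not Kefir's implementation). Because the counting operator only listens to its source while it is active, the three events emitted before anyone subscribes are never counted.

```javascript
// Hypothetical hot source: emits whether or not anyone is listening.
function hotSource() {
  const listeners = new Set();
  return {
    emit(v) { for (const l of [...listeners]) l(v); },
    listen(l) {
      listeners.add(l);
      return () => listeners.delete(l); // disconnect
    },
  };
}

// Hypothetical lazy take(n): it connects to the source only when subscribed,
// so its counter starts at activation, not at creation.
function lazyTake(src, n) {
  let count = 0;
  return {
    subscribe(fn) {
      const stop = src.listen(v => {
        count++;
        fn(v);
        if (count === n) stop(); // like take's end: disconnect after n values
      });
    },
  };
}

const src = hotSource();
const firstFive = lazyTake(src, 5);
[1, 2, 3].forEach(v => src.emit(v)); // inactive: nobody listens, nothing is counted

const got = [];
firstFive.subscribe(v => got.push(v));
[4, 5, 6, 7, 8, 9].forEach(v => src.emit(v));
console.log(JSON.stringify(got)); // [4,5,6,7,8]
```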

paldepind commented 9 years ago

For instance obs.take(5) won't count events while inactive.

Auf! I can see that. That behavior is indeed a bit unexpected.

rpominov commented 9 years ago

Returning to the original topic of the issue: I've added the workarounds to my first comment.

rpominov commented 9 years ago

To implement atomic updates we should first define what we call "simultaneous events".

Let's start with the following definition: "When two events happen in response to one parent event, those events are considered simultaneous". There are at least two cases in which we can create such events.

The first one is pretty obvious, and I have no questions about it:

      a
     / \
    /   \
  map   map
  /       \
 b         c

b = a.map(f1)
c = a.map(f2)

Here the b and c streams will contain simultaneous events, and if we combine b and c, we want to treat each pair of those events as a single event.

The second case is when we use .flatten():

a ---- flatten ---- b

b = a.flatten(x => [x + 2, x * 2]);

Here b will also contain two events in response to each event from a, so formally it satisfies our definition. But depending on how you look at it, it might not seem right. At the very least, it might be unexpected and undesirable if .combine also eliminates this kind of simultaneous events.

A definition that doesn't include the .flatten case is: "When two events in two different observables happen in response to one parent event, those events are considered simultaneous".


From the implementation perspective, I think it would be easier to count .flatten as a case of simultaneous events and eliminate them as well.
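A sketch of one possible approach, assumed here purely for illustration: a root emission opens a "transaction", and combine defers its output until the transaction closes, emitting at most once per transaction. It handles the map/map case, and it shows why counting .flatten events as simultaneous comes for free: both flatten outputs belong to the same transaction, so only the last one survives. The helpers are hypothetical, not Kefir's or Bacon's implementation; nested combines and flush ordering, where real implementations get complicated, are not handled.

```javascript
// Hypothetical transaction model (not Kefir's or Bacon's implementation).
let inTransaction = false;
const pendingFlushes = new Set();

function source() {
  const subs = [];
  return {
    subscribe(fn) { subs.push(fn); },
    emit(v) { for (const fn of subs) fn(v); },
  };
}

// Emitting from a root opens a transaction; deferred combines flush when it closes.
function emitRoot(src, v) {
  inTransaction = true;
  try {
    src.emit(v);
  } finally {
    inTransaction = false;
  }
  const flushes = [...pendingFlushes];
  pendingFlushes.clear();
  for (const flush of flushes) flush();
}

function map(parent, f) {
  const out = source();
  parent.subscribe(v => out.emit(f(v)));
  return out;
}

// Emits several values in response to one parent event, like .flatten(fn).
function flatten(parent, f) {
  const out = source();
  parent.subscribe(v => { for (const x of f(v)) out.emit(x); });
  return out;
}

// Records input changes, but emits at most once per transaction.
function combine(inputs, f) {
  const out = source();
  const latest = inputs.map(() => undefined);
  const has = inputs.map(() => false);
  const flush = () => { if (has.every(Boolean)) out.emit(f(...latest)); };
  inputs.forEach((input, i) => {
    input.subscribe(v => {
      latest[i] = v;
      has[i] = true;
      if (inTransaction) pendingFlushes.add(flush); // a Set keeps one flush per combine
      else flush();
    });
  });
  return out;
}

const a = source();
const b = map(a, v => v + 2);
const c = map(a, v => v * 2);
const pairs = [];
combine([b, c], (x, y) => [x, y]).subscribe(v => pairs.push(v));

const fl = flatten(a, v => [v + 2, v * 2]);
const flatOut = [];
combine([fl], x => x).subscribe(v => flatOut.push(v));

[1, 2, 3].forEach(v => emitRoot(a, v));
console.log(JSON.stringify(pairs));   // [[3,2],[4,4],[5,6]]
console.log(JSON.stringify(flatOut)); // [2,4,6]: only the last flatten value per event survives
```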

Looking at how it works in Bacon, it seems to treat .flatten-like events differently depending on where in the tree flatten is inserted.

In this case it will be eliminated:

        A
       / \
      /   \
    map   map
    /       \
   /       flatten
  /           \
 B             C

But not in this one:

      A
      |
   flatten
      |
     / \
    /   \
  map   map
  /       \
 B         C

rpominov commented 9 years ago

Also, is there a problem when we merge two streams that contain simultaneous events? Should .merge also handle them specially, as .combine would if we implement atomic updates? What about the case when we use the result of .merge in .combine?

One problem I see with .merge is that the order in which simultaneous events appear in the resulting stream is not defined, and actually depends on the order in which the source streams activate (get their first subscribers).
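A sketch of that ordering problem, using hypothetical lazy helpers (not Kefir's API) in which a mapped stream subscribes to its parent only on activation. The same merge produces a different order depending on whether some other consumer activated c before the merge did.

```javascript
// Hypothetical helpers (illustration only, not Kefir's API).
function eagerSource() {
  const subs = [];
  return {
    subscribe(fn) { subs.push(fn); },
    emit(v) { for (const fn of [...subs]) fn(v); }, // notifies in subscription order
  };
}

// A mapped stream subscribes to its parent only on activation (first subscriber).
function lazyMap(parent, f) {
  const subs = [];
  return {
    subscribe(fn) {
      subs.push(fn);
      if (subs.length === 1) {
        parent.subscribe(v => { for (const s of [...subs]) s(f(v)); });
      }
    },
  };
}

// merge forwards values from every input, in whatever order they arrive.
function merge(inputs) {
  return {
    subscribe(fn) { for (const input of inputs) input.subscribe(fn); },
  };
}

function run(activateCFirst) {
  const a = eagerSource();
  const b = lazyMap(a, v => `b${v}`);
  const c = lazyMap(a, v => `c${v}`);
  if (activateCFirst) c.subscribe(() => {}); // another consumer activates c early
  const out = [];
  merge([b, c]).subscribe(v => out.push(v));
  a.emit(1); // b1 and c1 are simultaneous: both come from the same event of a
  return out;
}

console.log(run(false).join(" ")); // b1 c1
console.log(run(true).join(" "));  // c1 b1
```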

rpominov commented 8 years ago

Related: http://staltz.com/rx-glitches-arent-actually-a-problem.html

raimohanska commented 8 years ago

Those are nice workarounds for the glitch problem. I don't see them as a nice solution though.

IMAO glitch prevention on the library level is a nice feature, but has its obvious drawbacks too (at least in Bacon): additional complexity, global state and degraded performance.

rpominov commented 8 years ago

Yeah, I agree this is not a solution. For now I just don't know how best to approach this problem, and am just collecting all related information and opinions.

I'm just not sure it can be implemented 100% correctly in a dynamic FRP lib (the category to which Rx/Bacon/Most/Kefir belong according to Evan Czaplicki's classification https://www.youtube.com/watch?v=Agu6jipKfYw).

raimohanska commented 8 years ago

History has shown that a perfect 100% score is extremely hard to achieve in that field, you're right. Rather, it always seems just right until someone comes up with a failing case.

steve-taylor commented 3 years ago

How about adding a .squash() operator that buffers events emitted in the same tick and flushes them via queueMicrotask? It would emit events earlier than setTimeout does, and would be more expressive than .debounce(0).
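A minimal sketch of the proposed idea as a plain function, since .squash() is not an existing Kefir operator; the name, the signature, and delivering the batch as an array are all assumptions. The setTimeout call stands in for a timer-based delay such as the one a debounce(0) relies on, to show that the microtask flush runs first.

```javascript
// Hypothetical `squash` helper (not an existing Kefir operator): values pushed
// during the same synchronous run are buffered and delivered once, as an array,
// from a microtask.
function squash(onBatch) {
  let buffer = null;
  return function push(value) {
    if (buffer === null) {
      buffer = [];
      queueMicrotask(() => {
        const batch = buffer;
        buffer = null; // the next push starts a new batch
        onBatch(batch);
      });
    }
    buffer.push(value);
  };
}

const order = [];
setTimeout(() => order.push("timeout"), 0); // a timer-based delay, for comparison

const batches = [];
const push = squash(batch => {
  order.push("squash");
  batches.push(batch);
});
// Two synchronous emissions, like the glitchy pair [4, 2] then [4, 4]:
push([4, 2]);
push([4, 4]);

setTimeout(() => {
  console.log(JSON.stringify(batches)); // [[[4,2],[4,4]]]
  console.log(order.join(" "));         // squash timeout
}, 0);
```

A consumer that only wants debounce(0)-like behavior can keep batch[batch.length - 1], while one that needs every value still gets the whole group.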