Closed bbarker closed 2 years ago
@OlivierBlanvillain When you discuss the example of val out = rx1.dropIf(_ % 2 == 0)(-1).merge(rx2.dropIf(_ % 2 == 0)(-1)), are you assuming the dropIf rxs may be registered at different times than the merge rx? If so, I can easily see how this could give rise to issues, though in the example I pushed, where they are all registered at the same time, I do not see how it can be any worse than tail-shareable. Still, I guess we must always assume "non-temporal transparency", but I am just curious.
I just tried, and I couldn't make a trace for that example that would be worse than tail-shareable. I must have messed up at the time, which means it might compose better than I thought :smile:
are you assuming the dropIf rxs may be registered at different times than the merge rx?
That would be impossible, out is a "Rx expression" that can only be interpreted at a single point of time, as a whole. There is no way to "register" a sub expression of a Rx and use the result as part of a bigger "Rx expression": it would break RT.
Right, I get that different registration points can't exist for this particular expression contained in out; maybe it gets more complicated when we break down the expressions individually, which would allow certain time delays to happen between the registration of the various rxs, which is what I was thinking when I said non-temporally transparent (which I guess is just another way of saying not always-shareable).
There is no way to "register" a sub expression of a Rx and use the result as part of a bigger "Rx expression": it would break RT.
OK, I understand the sentiment but I think I'm still missing something. Isn't that essentially what we are doing with showing different registration points in these marble diagrams? I suppose one way to do that in code would be to have different threads, with different delays (artificially performed with Thread.sleep(x), but more likely due to user input or network latency in the wild).
That wouldn't be the only thing that falls apart if someone uses the current implementation from multiple threads; thankfully that's not an issue on JS :smile:
@OlivierBlanvillain Oh right, I proved to myself that the right way to do this in JS is probably using setTimeout. I added an example for merge in the latest commit - let me know what you think about the slight test restructuring. If that's all good, I can think of two things to do next:
I'm working on a proof (it doesn't look that hard). I just found an interesting example that shows why we need to wait for at least two emitted elements before sharing:
val numbers: Rx[Int]
val odd: Rx[Int] = numbers.dropIf(_ % 2 == 0)(5).dropRepeats
// numbers =>  0  0  3  4  5  7 ...
// odd1    => •5     3     5  7 ...
// odd2    =>          •5     7 ...
//                         ^ no 5 here for odd2
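The trace above can be reproduced without any Rx machinery. The following is a minimal sketch, not the library's implementation: it models an interpretation as the list of values seen from its registration point onwards, and the helpers observed, dropIf and dropRepeats are hypothetical list-based stand-ins for the real combinators. It assumes odd2 is registered while numbers holds 4, which is the only registration point consistent with the trace.

```scala
object DropRepeatsTrace {
  // Values seen by an interpretation registered when `numbers` is at index `start`:
  // the current value first, then every later update.
  def observed(numbers: List[Int], start: Int): List[Int] = numbers.drop(start)

  // List-based stand-in for dropIf(p)(fallback): values satisfying p are dropped,
  // except that a dropped first value is replaced by the fallback.
  def dropIf(in: List[Int])(p: Int => Boolean)(fallback: Int): List[Int] = in match {
    case Nil => Nil
    case head :: tail =>
      val first = if (p(head)) fallback else head
      first :: tail.filterNot(p)
  }

  // List-based stand-in for dropRepeats: skip a value equal to the last emitted one.
  def dropRepeats(in: List[Int]): List[Int] =
    in.foldLeft(List.empty[Int]) { (acc, x) =>
      if (acc.headOption.contains(x)) acc else x :: acc
    }.reverse

  def odd(numbers: List[Int], start: Int): List[Int] =
    dropRepeats(dropIf(observed(numbers, start))(_ % 2 == 0)(5))

  def main(args: Array[String]): Unit = {
    val numbers = List(0, 0, 3, 4, 5, 7)
    println(odd(numbers, start = 0)) // odd1
    println(odd(numbers, start = 3)) // odd2, registered while numbers holds 4
  }
}
```

In this model the fallback 5 sits in the internal state of odd2's dropRepeats, so the real 5 that follows is swallowed; the two interpretations only agree from the second emitted element on.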
(it doesn't look that hard)
I'm glad you think so! I wouldn't mind trying it but I'm sure it would be slow going for me anyway for more than one reason :smile:
That's certainly an interesting example, particularly since dropRepeats is always-shareable.
Here is what I have; it's not very polished, but I think it works. To summarize, tail-shareability (and the proof below) breaks for interpretations with internal state, which includes zip, dropRepeats and sampleOn. The way to see it is as follows: if this internal state holds an initial value, then the Rx won't be shareable. For instance, in the following, zp2 will hold an initial value 9 and use it in all its outputs, which makes it non-shareable:
val r1: Rx[Int]
val r2: Rx[Int]
val zp: Rx[(Int, Int)] = r1.zip(r2.keepIf(_ % 2 == 0)(9))
// r1 => 0 3 4 5 6 ...
// r2 => 0 1 ...
// zp1 => •(0,0) (3,0) (4,0) (5,0) (6,0) ...
// zp2 => •(0,9) (3,9) (4,9) (5,9) (6,9) ...
I think instead of tail-shareability we should use another property to decide when to switch into "shared" mode (a switch that needs to happen at runtime, after a variable number of emitted values). I'm thinking about something like the following: two interpretations i1 and i2 are shareable when the internal state of both i1 and i2 only depends on values emitted from their underlying Rx. This will never be true for folds, and in the above example it will be false for as long as r2 doesn't emit an even value.
Anyway, here is the proof I wrote yesterday. It's not super useful as is, given that it only covers map, merge and collect, but I think the structure could be reused with the more general property stated above.
Definition: a Rx is tail-shareable if any two interpretations i1 and i2 of that Rx are equivalent after they have emitted their first value. Two interpretations are equivalent if there is no observable difference between canceling i1 or canceling i2.
Lemma 1: Values emitted by interpretations of Rx composed of map, merge, collect and Vars are either 1. the first value of that interpretation or 2. triggered by an update from one of the Vars.
Proof: by structural induction on Rx with the above structure. Vars form the base case and Map, Merge and Collect the inductive case. We inspect the implementation:
The interpretation of Vars forwards to the underlying foreach, which implements what is stated in the Lemma.
case leaf: Var[A] =>
  leaf.foreach(effect)
Map interprets the underlying Rx with a pure function: holds by induction hypothesis
case Map(self, f) =>
  run(self)(x => effect(f(x)))
Merge discards the first value of the other Rx, then holds by induction hypothesis
case Merge(self, other) =>
  val c1 = run(self)(effect)
  var discard = true
  val c2 = run(other) { x =>
    if (discard) discard = false
    else effect(x)
  }
  Cancelable { () => c1.cancel; c2.cancel }
Collect: the first value is either fallback or comes from the underlying Rx, then it holds by induction hypothesis
case Collect(self, f, fallback) =>
  var first = true
  run(self) { a =>
    val out =
      if (f.isDefinedAt(a))
        effect(f(a))
      else if (first)
        effect(fallback)
      else ()
    first = false
    out
  }
Lemma 2: The interpretation of Rx composed of map, merge, collect and Vars is either pure (no var/mutation) or effectively pure after emitting the first value (it has two states, one to emit the first value and another one for all subsequent values).
Proof: by structural induction on Rx with the above structure. Vars form the base case and Map, Merge and Collect the inductive case. We inspect the implementation:
Vars: no var/mutation (one also has to look inside the implementation of foreach)
case leaf: Var[A] =>
  leaf.foreach(effect)
Map: no var/mutation
case Map(self, f) =>
  run(self)(x => effect(f(x)))
Merge: discard stays false after the first element
case Merge(self, other) =>
  val c1 = run(self)(effect)
  var discard = true
  val c2 = run(other) { x =>
    if (discard) discard = false
    else effect(x)
  }
  Cancelable { () => c1.cancel; c2.cancel }
Collect: first stays false after the first element
case Collect(self, f, fallback) =>
  var first = true
  run(self) { a =>
    val out =
      if (f.isDefinedAt(a))
        effect(f(a))
      else if (first)
        effect(fallback)
      else ()
    first = false
    out
  }
Theorem: Rx composed of map, merge, collect and Vars are tail-shareable.
Proof: Given two interpretations i1 and i2 of such Rx, consider any non-first value emitted by i1. By Lemma 1, this value is triggered by an update from one of the Vars, which by construction must necessarily be at leaf position. By Lemma 2, both interpretations are effectively pure, meaning that i2 will necessarily produce the same result as i1. In other words, i1 and i2 are equivalent, and the Rx is tail-shareable.
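The theorem can be checked on a concrete trace. The following is a minimal sketch, assuming a simplified interpreter in which every Rx carries Int (so that no type-parameterised pattern matching is needed); the Var class, the record helper and the demo values are hypothetical and only mirror the cases inspected in the lemmas.

```scala
import scala.collection.mutable.ListBuffer

object TailShareableDemo {
  final case class Cancelable(cancel: () => Unit)

  // Simplification (assumption): all values are Int.
  sealed trait Rx
  final class Var(private var value: Int) extends Rx {
    private val subscribers = ListBuffer.empty[Int => Unit]
    def :=(a: Int): Unit = { value = a; subscribers.toList.foreach(_(a)) }
    // Emits the current value immediately, then every later update.
    def foreach(effect: Int => Unit): Cancelable = {
      subscribers += effect
      effect(value)
      Cancelable(() => { subscribers -= effect; () })
    }
  }
  final case class Map(self: Rx, f: Int => Int) extends Rx
  final case class Merge(self: Rx, other: Rx) extends Rx
  final case class Collect(self: Rx, f: PartialFunction[Int, Int], fallback: Int) extends Rx

  def run(rx: Rx)(effect: Int => Unit): Cancelable = rx match {
    case leaf: Var => leaf.foreach(effect)
    case Map(self, f) => run(self)(x => effect(f(x)))
    case Merge(self, other) =>
      val c1 = run(self)(effect)
      var discard = true // only the first value of `other` is discarded
      val c2 = run(other) { x =>
        if (discard) discard = false
        else effect(x)
      }
      Cancelable(() => { c1.cancel(); c2.cancel() })
    case Collect(self, f, fallback) =>
      var first = true // the fallback can only be emitted as the first value
      run(self) { a =>
        if (f.isDefinedAt(a)) effect(f(a))
        else if (first) effect(fallback)
        first = false
      }
  }

  // Starts an interpretation and records everything it emits.
  def record(rx: Rx): ListBuffer[Int] = {
    val out = ListBuffer.empty[Int]
    run(rx)(x => { out += x; () })
    out
  }

  def demo(): (List[Int], List[Int]) = {
    val a = new Var(0)
    val b = new Var(10)
    val odds = Collect(a, { case x if x % 2 != 0 => x }, -1)
    val rx = Merge(Map(odds, _ * 2), b)
    val i1 = record(rx) // registered first
    a := 3
    val i2 = record(rx) // registered later
    a := 4
    b := 11
    a := 5
    (i1.toList, i2.toList)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

In this run i1 and i2 differ only in what they emit up to their own first value; every value after i2's first one is triggered by a Var update and is identical in both, which is what the theorem states.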
@OlivierBlanvillain proof looks good afaict; I read it on the way down but then got pretty busy/lazy with other things during the holiday :). Just posting a few of my notes here before continuing on:
dropIf/keepIf simply call collect, so we get them for free.
I think instead of tail-shareability we should use another property to decide when to switch into "shared" mode (a switch that needs to happen at runtime, after a variable number of emitted values). ...
Nice, for this, once the proof is finalized, this means we could simply check whether the given Rx still maintains/uses internal state and, if not, set it to sharing. One way to do this (not sure if it is the best way, or even if it works for sure) would be to change Sharing to be a trait, then have all Rxs that may eventually be shareable extend it, and use the members of Sharing appropriately. However, at the very least that might result in (non-trivial) code duplication and make things messier, so ideally we could instead do something like:
trait Stateful {
  // Assume state is in use initially, to be safe
  var usingState: Boolean = true
}
// Change Merge to extend Stateful and change Sharing to require a Stateful parameter
final case class Merge[A, B >: A](self: Rx[A], other: Rx[B]) extends Rx[B] with Stateful
final case class Sharing[A](self: Rx[A] with Stateful)
// Just added an if/else at the outer level of the existing Sharing implementation
case rx @ Sharing(self) =>
  if (self.usingState) run(self)(effect)
  else {
    if (!rx.isSharing) {
      rx.sharingCancelable = run(self)(rx.sharingMemo.:=)
    }
    val foreachCancelable = rx.sharingMemo.foreach(x => effect(x.asInstanceOf[A]))
    Cancelable { () =>
      foreachCancelable.cancel
      if (rx.sharingMemo.subscribers.isEmpty) {
        rx.sharingCancelable.cancel
        rx.sharingCancelable = Cancelable.empty
      }
    }
  }
// usingState lives on the Merge node itself, so it is accessed through rx
case rx @ Merge(self, other) =>
  val c1 = run(self)(effect)
  rx.usingState = true
  val c2 = run(other) { x =>
    if (rx.usingState) rx.usingState = false
    else effect(x)
  }
  Cancelable { () => c1.cancel; c2.cancel }
@OlivierBlanvillain
Here is a short sketch of the remaining cases, handled under a modified version of the proof where we only require that the Rx is eventually state-free ("effectively pure"); certainly this is far from formal, but if it looks good enough, maybe let's implement and clean up the proof later?
zip - Appears to be eventually shareable, depending on its underlying Rxs: in the implementation of Zip, v1 and v2 are stateful representations of the values in the underlying Rxs self and other: only after both self and other themselves have become eventually shareable does the zipped Rx become shareable.
dropRepeats is eventually shareable, see the following discussion. dropRepeats as late as possible.
sampleOn is similar to zip: currentA maintains state that reflects the value of the self Rx, and has no other intricacies, therefore SampleOn is shareable once both the self and other Rxs are shareable.
Regarding flatmap, as discussed it maintains no persistent state, but is not necessarily effectively pure as it depends not only on an underlying Rx, but on the emitted Rx type. So no guarantees can be made, though it is possible we may be able to optimize sharing if we can dynamically change it each time the shareability of the emitted Rx class changes. However, we'll probably want to think about how to implement that later, after implementing the initial optimization for the simpler cases.
@bbarker I agree on everything you said except the dropRepeats case. I have the intuition that as soon as the previous value held inside the dropRepeats is clean from any "initial propagation" value, then it becomes shareable. If you have a diagram in mind please write it down, it's easier to discuss with a diagram at hand :)
@OlivierBlanvillain I see what you mean now; though state is maintained, it doesn't affect the output value, other than potentially at initialization depending on the underlying Rx. It does affect when the Rx has an effect, but this will strictly depend on the underlying Rx, so it will be at the same time between shared Rx's.
@bbarker Yes, that's exactly it!
@OlivierBlanvillain One issue is that having sharing at every node might generate unwanted overhead for large Rx graphs; ideally, we could bypass the issue if the interpreter could find out when an Rx node in a graph has multiple dependent Rxs; but it looks like there is probably no way to do this without adding additional overhead to the Rx, so probably not worthwhile (at least in the first pass). Though the overhead might be minimal, something like subscribers for Var, but only a reference count:
trait Rx[+A] {
  //...
  var refCount = 0
  // Then each rx method can increment like:
  def collect[B](f: PartialFunction[A, B])(b: B): Rx[B] = { this.refCount += 1; Collect[A, B](this, f, b) }
}
However, I'm not exactly sure how to deal with decrements to refCount, which I guess would only happen whenever a particular Rx is gc'd.
I'm not sure I follow, could you give a concrete example?
@OlivierBlanvillain Something like this, where each dashed, black line (other than the first, which is a Var) is an ordinary Rx that can be shared (shared = blue circles). If the optimizer simply places them anywhere possible, then we wind up with something like the right figure, I think, where ideally, we only need something like the left.
I think the algorithm I have in mind would do the optimal thing in your example. One implementation would be to have a global mutable map from node to interpretation (modulo the shareability logic): whenever an edge is interpreted the global map is first checked for shareable instances; if none is found, then a new interpretation is created for that node and added to the map.
(| edges are rx nodes)
  var
   |
   |
  / \
 |   |
 |   |

  var
   |
   |x
  /
 |
 |y
run on the left leaf

(! are edges shared with previous interpretations)
   !x
    \
     |
     |z
run on the right leaf

!y
run on the left leaf

!z
run on the right leaf
This means that the runtime actually has a single "interpreted" copy of the graph:
  var
   |
   |x
  / \
 |   |
 |y  |z
Where x y and z have more than one callback attached to them.
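The global-map idea can be sketched as follows. This is only an illustration under stated assumptions, not the library's implementation: every value is an Int, only Var and Map nodes exist, the shareability check is ignored, and the names Shared, live and liveNodes are hypothetical. An entry is evicted from the map once its last callback is canceled.

```scala
import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object SharedInterpretations {
  final case class Cancelable(cancel: () => Unit)

  sealed trait Rx
  final class Var(private var value: Int) extends Rx {
    private val subscribers = ListBuffer.empty[Int => Unit]
    def :=(a: Int): Unit = { value = a; subscribers.toList.foreach(_(a)) }
    def foreach(effect: Int => Unit): Cancelable = {
      subscribers += effect
      effect(value)
      Cancelable(() => { subscribers -= effect; () })
    }
  }
  final case class Map(self: Rx, f: Int => Int) extends Rx

  // The single live interpretation of a node, with the callbacks attached to it.
  private final class Shared {
    val callbacks = ListBuffer.empty[Int => Unit]
    var last = 0 // overwritten as soon as the upstream emits its current value
    var upstream = Cancelable(() => ())
  }

  // Global mutable map from node to its live interpretation.
  private val live = mutable.Map.empty[Rx, Shared]
  def liveNodes: Int = live.size

  def run(rx: Rx)(effect: Int => Unit): Cancelable = rx match {
    case leaf: Var => leaf.foreach(effect)
    case node @ Map(self, f) =>
      val shared = live.get(node) match {
        case Some(existing) => existing // reuse: f is not evaluated again
        case None =>
          val created = new Shared
          live(node) = created
          created.upstream = run(self) { x =>
            created.last = f(x)
            created.callbacks.toList.foreach(_(created.last))
          }
          created
      }
      shared.callbacks += effect
      effect(shared.last) // replay the latest value to the new callback
      Cancelable { () =>
        shared.callbacks -= effect
        if (shared.callbacks.isEmpty) { // zero callbacks left: evict the entry
          shared.upstream.cancel()
          live.remove(node)
          ()
        }
      }
  }

  // var -> x -> (y, z): returns (evaluations of x, live nodes while running, live nodes after cancel).
  def demo(): (Int, Int, Int) = {
    var xEvaluations = 0
    val v = new Var(1)
    val x = Map(v, { a => xEvaluations += 1; a + 1 })
    val cy = run(Map(x, _ * 10))(_ => ())
    val cz = run(Map(x, _ * 100))(_ => ())
    v := 2
    val whileRunning = liveNodes
    cy.cancel(); cz.cancel()
    (xEvaluations, whileRunning, liveNodes)
  }
}
```

With both leaves running, x is evaluated once per value of the Var rather than once per leaf, and canceling both leaves empties the map again.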
@OlivierBlanvillain This sounds good to me in general, but would this be prone to memory leaks? I assume we'll have a mechanism for removing entries from the mutable map when there are zero callbacks, but more on that below:
Drilling down to the details a bit:
whenever an edge is interpreted the global map is first checked for shareable instances
There still needs to be a way to see if we even need to create a shareable instance, i.e., how does the first or second interpretation in the example above know that x (for instance) has more than one callback? What are callbacks in this case (I assume these are not something currently in monadic-rx that go by another name, unless we mean the effects)?
@bbarker This global mutable map thing is just the first idea that came to my mind, I didn't really think it through. Let me try to answer your questions (brainstorming):
but would this be prone to memory leaks
This should be fine, there is already a mechanism in place to deal with that, I don't see why it couldn't be extended to this sharing thing.
What are callbacks in this case
Yes, I'm talking about what I've called effect in the current implementation.
There still needs to be a way to see if we even need to create a shareable instance
Indeed this is something that I completely overlooked. The naive solution is to create a dummy shareable node for every actual node, but this might result in quite a bit of wasted resources. (I see that this is what you drew on your right hand side diagram, I get it now!)
An alternative that comes to my mind: keep track of the current callback inserted on every node (in a mutable field). Whenever a second traversal hits a node to be shared, create the sharing node on the fly, and "patch" the previous callback to go through that new sharing node instead.
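A minimal sketch of that alternative, assuming Int values and a hypothetical Node class with a mutable slot (none of these names exist in the current implementation): the node forwards directly to its single callback until a second callback is attached, at which point it switches to a fan-out that includes the previous callback.

```scala
object LazySharing {
  // Hypothetical states of the callback slot kept on each node.
  sealed trait Slot
  case object Unused extends Slot
  final case class Direct(callback: Int => Unit) extends Slot
  final case class Fanout(callbacks: Vector[Int => Unit]) extends Slot

  final class Node {
    private var slot: Slot = Unused

    def attach(callback: Int => Unit): Unit = {
      slot = slot match {
        case Unused => Direct(callback)
        // Second traversal: create the sharing node on the fly and
        // route the previously attached callback through it.
        case Direct(previous) => Fanout(Vector(previous, callback))
        case Fanout(all) => Fanout(all :+ callback)
      }
    }

    def emit(value: Int): Unit = slot match {
      case Unused => ()
      case Direct(callback) => callback(value)
      case Fanout(all) => all.foreach(_(value))
    }

    def isSharing: Boolean = slot match {
      case Fanout(_) => true
      case _ => false
    }
  }
}
```

A node with a single dependent never pays for a sharing node in this scheme; the cost is one extra mutable field per node.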
An alternative that comes to my mind:
@OlivierBlanvillain That sounds pretty solid to me; I think the refCount variable mentioned above would still be used in this case. I don't anticipate any use cases where the number of references would be large enough for integer overflow to occur; one never knows, but I think we can ignore that issue for now.
I have more questions, but at this point I think it might be more clear to try and code something up to see how it goes, so will try to do that soon.
This is a WIP, still need to (and appreciate any help or suggestions for):