Closed bbarker closed 2 years ago
@OlivierBlanvillain I just added a prototype for map, which induces some behavior changes in a couple of Rx tests, that still needs to be debugged (we seem to get extra rounds of execution sometimes).
Afterwards, I'll probably try to refactor the newly added logic and consolicate it somehow so each Rx case class doesn't have 15+ lines of code added.
@OlivierBlanvillain I believe I identified one problem in the current implementation in this branch, which can be seen from the test "Referential transparency with map" in this branch: the rx @ Map binding is using itself recursively, causing an explosion in the number of effects. I'll need better sleep before looking at this more, but I tried to remove it by creating MapProto
which is identical to the old Map
and using that as an internal Map
until it is time to share (however, it gets a StackOverflow and I'm not sure why as yet):
def run[A](rx: Rx[A])(effect: A => Unit): Cancelable = rx match {
case MapProto(self, f) =>
run(self)(x => effect(f(x)))
case Map(self: Rx[A], f) => {
var refCount = 0
var shareRxMaybe: Option[Rx[A]] = None
var mapInitRx: Rx[A] = null
var ccMapInit = Cancelable.empty
var ccSharing = Cancelable.empty
val ccMap = run(self) { x =>
refCount += 1
if (refCount < 2) {
mapInitRx = self.mapProto(x => f(x))
ccMapInit = run(mapInitRx)(identity => ())
}
else {
val shareRx = shareRxMaybe.getOrElse{
val srx: Rx[A] = mapInitRx.impure.sharing
shareRxMaybe = Some(srx)
srx
}
ccSharing = run(shareRx)(effect)
}
}
Cancelable { () => ccMapInit.cancel; ccSharing.cancel; ccMap.cancel }
}
@OlivierBlanvillain In the current working solution for Map, I don't have a good intuition for why Share[A]
has A contravariant, other than avoiding the "GADT Skolem" bug.
@OlivierBlanvillain ehm... I realized that I forgot to add the refCount checks in earlier in my message on gitter, which as largely the point. That is fixed now, but:
There are a few failing tests. I'm not sure if these are a problem; I believe the couple of extra counts in execution in some of the tests may not be a big deal, all things considered, but this may require more extensive analysis, as what we are trying to do here, ultimately, is reduce the number of executions, however, there seems to be some degree of overhead in setting this up (run the proto, or constructed the shared instance and run it). Maybe this can be further optimized.
The test here, I haven't had a chance yet to understand:
[info] - Optimisation: Applicative style is faster than monadic style. *** FAILED ***
[info] List(3, 5, 7) did not equal List(3, 5, 5, 5, 5, 7) (RxTests.scala:296)
@OlivierBlanvillain
I'm not sure exactly what you tried to do in the linked commit, but one way do reduce the overhead for non shared cases would be to register the first callback "as usual" (by that I mean directly interpreting the remainder of the tree standalone, without going thought sharingMemo), then, once we get a second callback, we would "hot swap" the first callback to go thought sharingMemo instead of being wired directly, and register the second callback on sharingMemo.
I believe I tried to do something like this here, which largely has code similar to the most recent commit in this PR, except for the highlighted section. Unfortunately, there is some unintended behavior.
If this is not what you mean by hot swap though, please let me know.
@bbarker I see, but I think this won't be enough. Looking at the code you linked it seems that this will keep the first two interpretation running "standalone" forever. So if you have 20 interpretations, the first 2 will stay on their own, unshared, and the next 18 will all share the same stream.
What I meant by "hot swap" is that, as soon as we detect that we can share we should cancel the initial "standalone" interpretations and to make a new shared one shared between all instance (by swapping some callbacks I would guess).
@OlivierBlanvillain I just pushed what matches your description, as closely as I can think to make it. Unfortunately there are some redundant executions, still. If you see anything obviously wrong with this approach, let me know.
@OlivierBlanvillain I pushed another update; this one is a little messy currently but interesting; if we look at the test "Referential transparency with map", we can clearly see that the asserts for b.impure.value
are working, but the final assert for listb
fails.
If we can't get either of these two solutions working, I propose that we effectively put each Rx inside of a container in the execution graph.
To handwave:
trait Rx { protected[Rx] var innerRx: InnerRx[A] }.
InnerRx
would be what is currently Rx
, and the new Rx
would be the wrapper. We've already demonstrated sharing works when used manually. With this approach, we could manually patch the Rx graph by changing what the innerRx
points to:
if (refCount > 2) {
this.innerRx = this.innerRx.sharing
}
Maybe there's a better way to patch the graph, like using an actual graph data structure of some sort to hold Rx's, but that seems like overkill.
As far as I can tell when refCount
becomes >2 you do not cancel the first interpretation of a Rx. This means that you will effectively have two interpretation running, the "standalone one" for the first interpretation, and a shared one for the following interpretation.
Sorry, that should have been refCount > 1 (as in the code).
On Tue, Mar 6, 2018 at 4:49 PM, Olivier Blanvillain < notifications@github.com> wrote:
As far as I can tell when refCount becomes >2 you do not cancel the first interpretation of a Rx. This means that you will effectively have two interpretation running, the "standalone one" for the first interpretation, and a shared one for the following interpretation.
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/OlivierBlanvillain/monadic-html/pull/94#issuecomment-370940966, or mute the thread https://github.com/notifications/unsubscribe-auth/AA37jsiwdoOC7CW69e8QmFL0_7XdPo6gks5tbwRwgaJpZM4R5_DC .
-- Brandon Barker brandon.barker@gmail.com
@OlivierBlanvillain Was your last comment about this code? :
case rx @ Map(self, f) => {
rx.refCount += 1
var cc = Cancelable.empty
cc = run(self){x =>
if (rx.refCount > 1) {
cc.cancel // ********* Cancelled here ******************
val sharedRx: Rx[A] with Share[A]= rx.sharedRx.getOrElse{
val srx = self.mapShare(x => f(x))
rx.sharedRx = Some(srx.asInstanceOf[Rx[Any]])
srx
}.asInstanceOf[Rx[A] with Share[A]]
cc = sharedRx.share(effect.asInstanceOf[Any => Unit])
}
else effect(f(x))
}
cc
}
I see now, I read it too fast, sorry. But still, if you execute the then branch of this if (rx.refCount > 1)
test on every element of the stream, once refCount
becomes >1
you will cancel and share again on every single emitted element?
@OlivierBlanvillain the cancel part is a problem, but the sharing should be cached in an Option
. But after you pointed that out, I see that the logical structure (where conditionals were inside of the run
) didn't make a lot of sense anyway; this seems to be working much better:
case rx @ Map(self, f) =>
rx.refCount += 1
var cc = Cancelable.empty
if (rx.refCount == 1) {
cc = run(self)(x => effect(f(x)))
}
else if (rx.refCount == 2) {
cc.cancel
}
if (rx.refCount > 1) {
val sharedRx: Rx[A] with Share[A]= rx.sharedRx.getOrElse{
val srx = self.mapShare(x => f(x))
rx.sharedRx = Some(srx.asInstanceOf[Rx[Any]])
srx
}.asInstanceOf[Rx[A] with Share[A]]
cc = sharedRx.share(effect.asInstanceOf[Any => Unit])
}
cc
I'll try to clean it up in a few hours and ping you then.
See the next if block?
On Mar 7, 2018 7:54 AM, "Olivier Blanvillain" notifications@github.com wrote:
I'm very confused by this snippet. Here if rx.refCount == 2 you would just do nothing and return an empty Cancelable?
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/OlivierBlanvillain/monadic-html/pull/94#issuecomment-371129416, or mute the thread https://github.com/notifications/unsubscribe-auth/AA37jnjnnEP1sAqBXZYJaYqYP9u4K9wjks5tb9iNgaJpZM4R5_DC .
I guess nested if blocks may be clearer in general, as long as nesting isn't deep.
On Mar 7, 2018 8:00 AM, "Brandon Barker" brandon.barker@gmail.com wrote:
See the next if block?
On Mar 7, 2018 7:54 AM, "Olivier Blanvillain" notifications@github.com wrote:
I'm very confused by this snippet. Here if rx.refCount == 2 you would just do nothing and return an empty Cancelable?
— You are receiving this because you were mentioned. Reply to this email directly, view it on GitHub https://github.com/OlivierBlanvillain/monadic-html/pull/94#issuecomment-371129416, or mute the thread https://github.com/notifications/unsubscribe-auth/AA37jnjnnEP1sAqBXZYJaYqYP9u4K9wjks5tb9iNgaJpZM4R5_DC .
@OlivierBlanvillain well, after I got a chance to sit down and see what was going on, I still don't know why we have one additional redundant execution per event. The good news is nothing is broken, and we aren't getting extra events per depdendent Rx, e.g., if you add a noshare3
in the "Verify sharing memoization and updates", there seems to be no additional penalty.
We seem to be close!
Let i1
and i2
be the first two interpretations of a Map. This is the trace of execution for each interpretation:
case rx @ Map(self, f) => // i1 & i2
rx.refCount += 1 // i1 & i2
var cc = Cancelable.empty // i1 & i2
if (rx.refCount == 1) { // i1 & i2
cc = run(self)(x => effect(f(x))) // i1
} //
else if (rx.refCount == 2) { // i1 & i2
cc.cancel // i2
} //
if (rx.refCount > 1) { // i2
val sharedRx: Rx[A] with Share[A]= rx.sharedRx.getOrElse{ // i2
val srx = self.mapShare(x => f(x)) // i2
rx.sharedRx = Some(srx.asInstanceOf[Rx[Any]]) // i2
srx // i2
}.asInstanceOf[Rx[A] with Share[A]] // i2
cc = sharedRx.share(effect.asInstanceOf[Any => Unit]) // i2
} //
cc // i1 & i2
So, to summarize:
cc_1
allocated a first time for i1
, set to the cancelable returned by run(self)(x => effect(f(x)))
and returned.
cc_2
is then allocated a second time, for i2
(note that this is not the same cc_1
as before, we are in another call of the run method!), cc.cancel
is then called (but this does nothing, given than at this point cc
still has its default value of Cancelable.empty
!)
cc_2
is then set to the output of sharedRx.share(effect)
At this point, neither cc_1
nor cc_2
where canceled, meaning that both interpretation are running on their own. This is what's causing the redundant execution per event.
Thanks for clarifying; I actually sort of realized the issue with not retrieving cc_1 shortly after and tried something like the following, but it breaks RT (everything else works):
case rx @ Map(self, f) =>
rx.refCount += 1
val cc = rx.ccShareRx.getOrElse{
val cctmp = run(self)(x => effect(f(x)))
rx.ccShareRx = Some(cctmp)
cctmp
}
if (rx.refCount == 2) {
cc.cancel
}
if (rx.refCount > 1) {
val sharedRx: Rx[A] with Share[A]= rx.sharedRx.getOrElse{
val srx = self.mapShare(x => f(x))
rx.sharedRx = Some(srx.asInstanceOf[Rx[Any]])
srx
}.asInstanceOf[Rx[A] with Share[A]]
rx.ccShareRx = Some(sharedRx.share(effect.asInstanceOf[Any => Unit]))
}
rx.ccShareRx.getOrElse{throw new RuntimeException("Logic error"); Cancelable.empty}
@OlivierBlanvillain More specifically, the following test fails:
test("Referential transparency with map") {
val a: Var[Int] = Var(0)
val b: Rx[Int] = a.map(identity)
var lista: List[Int] = Nil
val cca = a.impure.run(n => lista = lista :+ n)
var listb: List[Int] = Nil
val ccb = b.impure.run(n => listb = listb :+ n)
assert(b.impure.value == 0)
assert(a.map(identity).impure.value == 0)
a := 1
assert(b.impure.value == 1)
assert(a.map(identity).impure.value == 1)
a := 2
assert(b.impure.value == 2)
assert(a.map(identity).impure.value == 2)
a := 3
a := 4
assert(lista == List(0,1,2,3,4))
assert(listb == List(0,1,2,3,4)) //FIXME: this breaks with:
// List(0) did not equal List(0, 1, 2, 3, 4) (RxTests.scala:69)
cca.cancel
ccb.cancel
assert(a.isCold)
}
Since this is happening as a result of the non-RT api (run), I'm not yet convinced it is a problem.
Actually, looks like RT is still a problem, as this has the same resulting error
test("Referential transparency with map") {
val a: Var[Int] = Var(0)
val b: Rx[Int] = a.map(identity)
var lista: List[Int] = Nil
val aa = a.map{n => lista = lista :+ n; n}
//val cca = a.impure.run(n => lista = lista :+ n)
var listb: List[Int] = Nil
//val ccb = b.impure.run(n => listb = listb :+ n)
val bb = b.map{n => listb = listb :+ n; n}
val cc_aa_bb = aa.merge(bb).impure.run(_ => ())
assert(b.impure.value == 0)
assert(a.map(identity).impure.value == 0)
a := 1
assert(b.impure.value == 1)
assert(a.map(identity).impure.value == 1)
a := 2
assert(b.impure.value == 2)
assert(a.map(identity).impure.value == 2)
a := 3
a := 4
assert(lista == List(0,1,2,3,4))
assert(listb == List(0,1,2,3,4))
//cca.cancel
//ccb.cancel
cc_aa_bb.cancel
assert(a.isCold)
}
@OlivierBlanvillain Actually wait, it does still qualify as being RT - the reason it didn't work originally is because the function in the map is impure.