Closed victormoukhortov closed 9 years ago
Hey @victormoukhortov, thanks for the report. It seems like we probably have a hole in the disposer chain. I'll look into this and see what's going on. Thanks for the sample code as well--it should make it easy to reconstruct a simple test case.
@victormoukhortov FYI, I'm gonna be looking into this today. I hope to have a branch for you to try later today, so we can make sure that it solves your issue.
@victormoukhortov I just pushed the fix-scan-join-dispose branch with a candidate fix. Can you try it out? I used this test program, derived from your example, to verify. I still need to add a unit test for it.
@briancavalier Thanks! I'll try it out.
@briancavalier I'm afraid it's still not working. The updated program still does not unsubscribe from the inner subscriptions. I also think your test program is flawed, since your test case calls `take(1)`, which would seem to only return the value from `getSnapshot` without initializing the inner stream.
Any chance you can post your code somewhere? It'd be great if I could use it to debug since it seems like the problem is still happening there, but not in my test.
> which would seem to only return the value from `getSnapshot` without initializing the inner stream.
No items from getSnapshot will ever appear on the console, because each is flatMapped to a stream of updates. Only items from getUpdates will be logged. That's intentional, and is the same as your code above.
That said, maybe there is something different about my test and your original code. Can you spot any differences?
Also, I'm working on unit tests now to verify that both the outer and inner stream dispose functions are indeed called. As I finish the unit tests, they may shed some more light, too.
Just added unit tests for disposing the outer and inner streams. In the process, I found another dispose problem in join, fixed it, and added a unit test for it.
My code is pretty much verbatim what I posted. I'm trying to implement differential synchronization with STOMP over SockJS. Everything seems to be working smoothly, except that subscriptions to public topics seem to leak when the resulting streams are closed. I realize testing over a live WebSocket connection is impractical, but that's where the problem seems to be appearing. Your test case results in two closed streams?
> Your test case results in two closed streams?
Yeah. It logs to the console when a stream is disposed, and here's what I get on the console:
dispose snapshot
1
dispose update
So what's happening is:
1. `take(1)` inside `getSnapshot`
2. `1` via `scan`, which gets logged
3. `take(1)` at the outermost level, which limits the whole composite stream to producing a single value, ie the `1` that's logged to the console.
> except that subscriptions to public topics seem to leak when the resulting streams are closed
Hmmmm, I wonder if stompjs's `unsubscribe` isn't working the way we expect it to, or has a bug. How have you been able to determine that topic subscriptions are leaking?
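The sequence described above (inner `take(1)`, then `scan`, then the outermost `take(1)`) can be reproduced with a small toy push-stream model. This is a sketch under stated assumptions, not most.js's real implementation. `source`, `take`, `scan`, and `flatMap` are hypothetical stand-ins written for illustration. A stream is modeled as a function from a sink `{ event, end }` to a dispose function. `getSnapshot`/`getUpdates` are fake versions of the functions from the issue. Run in Node, it logs the same order as the console output above.

```javascript
// Toy push-stream model, for illustration only (not most.js internals): a
// stream is a function taking a sink { event, end } and returning a disposer.

// Manually driven source standing in for a STOMP subscription.
function source(name, log) {
  let sink = null;
  return {
    stream: s => {
      sink = s;
      return () => { if (sink !== null) { sink = null; log.push('dispose ' + name); } };
    },
    push: x => { if (sink) sink.event(x); },
  };
}

// take(n): forward n events, then dispose upstream and end.
function take(n, stream) {
  return sink => {
    let count = 0, done = false, dispose = null;
    const stop = () => { if (!done) { done = true; if (dispose) dispose(); } };
    dispose = stream({
      event(x) {
        if (done) return;
        count += 1;
        sink.event(x);
        if (count >= n) { stop(); sink.end(); }
      },
      end() { if (!done) { stop(); sink.end(); } },
    });
    if (done) dispose(); // finished while still subscribing
    return stop;
  };
}

// scan: emit each accumulated value (this toy version does not emit the seed).
function scan(f, seed, stream) {
  return sink => {
    let acc = seed;
    return stream({
      event(x) { acc = f(acc, x); sink.event(acc); },
      end() { sink.end(); },
    });
  };
}

// flatMap: start an inner stream per outer event. Disposing the result
// disposes the outer stream AND every live inner stream immediately.
function flatMap(f, outer) {
  return sink => {
    const inners = new Set(); // disposers of live inner streams
    let outerEnded = false;
    const endIfDone = () => { if (outerEnded && inners.size === 0) sink.end(); };
    const disposeOuter = outer({
      event(x) {
        let ended = false;
        let disposeInner = null;
        disposeInner = f(x)({
          event: y => sink.event(y),
          end: () => {
            ended = true;
            if (disposeInner) { inners.delete(disposeInner); disposeInner(); }
            endIfDone();
          },
        });
        if (ended) disposeInner(); // inner ended while subscribing
        else inners.add(disposeInner);
      },
      end() { outerEnded = true; endIfDone(); },
    });
    return () => {
      disposeOuter();
      for (const d of inners) d();
      inners.clear();
    };
  };
}

const log = [];
const snapshots = source('snapshot', log);
const updates = source('update', log);
// Hypothetical stand-ins for the issue's getSnapshot/getUpdates.
const getSnapshot = () => take(1, snapshots.stream);
const getUpdates = _snapshot => scan((n, _update) => n + 1, 0, updates.stream);

take(1, flatMap(getUpdates, getSnapshot()))({
  event: x => log.push(String(x)),
  end: () => {},
});
snapshots.push({ version: 1 }); // inner take(1) ends: "dispose snapshot"
updates.push('patch');          // scan emits 1; outer take(1) disposes the rest
console.log(log); // [ 'dispose snapshot', '1', 'dispose update' ]
```

The design choice that matters is in `flatMap`'s disposer. It tears down the outer stream and every live inner stream in one call, so there is no hole in the disposer chain for an inner subscription to leak through.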
Yeah, that makes sense. Your latest fix seems to be working a little better, though. `take(1)` seems to be working. `takeWhile(..)`, however, still doesn't seem to close nested streams. Also, there seems to be rather a long delay between receiving the message in the `observe` handler and MostJS actually closing the inner stream. Any thoughts?
Yep, there was a one-character typo in `takeWhile` that broke dispose. Fixed in 0395555, updated the PR.
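Here is a toy sketch of the contract `takeWhile` has to honor: when the predicate first fails, it must both dispose its upstream and end. This is not most.js's code. `source` and `takeWhile` are hypothetical helpers in a simple push model where a stream maps a sink `{ event, end }` to a dispose function.

```javascript
// Toy push-stream model for illustration only (NOT most.js's implementation).
function source(name, log) {
  let sink = null;
  return {
    stream: s => {
      sink = s;
      return () => { if (sink !== null) { sink = null; log.push('dispose ' + name); } };
    },
    push: x => { if (sink) sink.event(x); },
  };
}

// takeWhile: forward events while p holds; on the first failing event,
// dispose the upstream (releasing e.g. a topic subscription) and then end.
function takeWhile(p, stream) {
  return sink => {
    let done = false;
    let dispose = null;
    const stop = () => { if (!done) { done = true; if (dispose) dispose(); } };
    dispose = stream({
      event(x) {
        if (done) return;
        if (p(x)) { sink.event(x); return; }
        stop();
        sink.end();
      },
      end() { if (!done) { stop(); sink.end(); } },
    });
    if (done) dispose(); // predicate failed while still subscribing
    return stop;
  };
}

const log = [];
const topic = source('topic', log);
takeWhile(x => x < 3, topic.stream)({
  event: x => log.push(x),
  end: () => log.push('end'),
});
[1, 2, 3, 4].forEach(x => topic.push(x));
console.log(log); // [ 1, 2, 'dispose topic', 'end' ]
```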
> rather a long delay between receiving the message in the `observe` handler and MostJS actually closing the inner stream
Hmmm, I'm not seeing a delay in my test. The dispose function returned from the `create` callback is invoked very quickly. Quick question: What exactly do you mean by "closing the inner stream"? All most.js can do is call the disposer function (which it seems to be doing quickly in my test) ... maybe stompjs's unsubscribe function takes some time to complete? Is there any more info you can provide on the delay (maybe a timestamped log of some sort) that might help us track it down?
BTW, I really appreciate your helping to work through these issues, and all the feedback so far!
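One way to produce the timestamped log suggested above is to wrap whatever disposer the `create` callback returns. `timedDisposer` is a hypothetical helper, not part of most.js or stompjs. The `subscription.unsubscribe()` call in the comment is an assumption about the object returned by stompjs's `subscribe`. The clock is injectable so the sketch can be checked without real timers.

```javascript
// Hypothetical diagnostic helper (not part of most.js or stompjs): wraps a
// disposer so each call records when it was invoked and how long the wrapped
// cleanup took to return. `now` is injectable for testing.
function timedDisposer(name, dispose, log, now = Date.now) {
  return function (...args) {
    const start = now();
    log.push({ name, event: 'dispose called', at: start });
    const result = dispose.apply(this, args);
    const end = now();
    log.push({ name, event: 'dispose returned', at: end, tookMs: end - start });
    return result;
  };
}

// In a real publisher one might return something like (assumed stompjs usage):
//   return timedDisposer('topic', () => subscription.unsubscribe(), log);

// Self-contained check with a fake clock:
let clock = 100;
const log = [];
const dispose = timedDisposer('topic', () => { clock += 5; }, log, () => clock);
dispose();
console.log(log);
// [ { name: 'topic', event: 'dispose called', at: 100 },
//   { name: 'topic', event: 'dispose returned', at: 105, tookMs: 5 } ]
```

This only measures synchronous time spent inside the disposer. If `unsubscribe` just queues an UNSUBSCRIBE frame, any broker-side delay will not show up here. You would need to compare against the frames actually sent over the socket.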
@victormoukhortov I'm going to merge #45, since there are some really critical fixes in there. I'll edit the description so that it doesn't auto-close this issue, though. We can either leave this one open until we figure out the source of the delay you're seeing, or we can open a new issue specifically for that (whichever you want to do is cool w/me).
@briancavalier A rudimentary test seems to be working. I've still got a problem, but I think it's somewhere else. Thanks for all your help. Feel free to close this issue.
@victormoukhortov No problem at all. Like I said above, I really appreciate all the help and feedback. Being able to plug those dispose holes before most.js gets to 1.0 is great.
@briancavalier This still does not seem to be working correctly (0.9.0). What seems to be happening is that the inner stream is not properly disposed of, so no `UNSUBSCRIBE` message is sent over STOMP. As a result, when I receive a new message over this (now un-listened-to) topic, I get the following error:
Potentially unhandled rejection [3] TypeError: undefined is not a function
at Stream.disposeScan [as dispose] (return stream.dispose(t, x, state);)
at Stream.disposeCurrent [as dispose] (return s.stream.dispose(t, x, s.state);)
at ... (disposeInner when callback - return si.stream.dispose(t, x, i.state);)
at tryCatchReject
at runContinuation1
at Fulfilled.when
at Pending.run
at Scheduler._drain
at define.drain
Any thoughts?
Edit: It seems as though it's trying to dispose of the inner stream upon receiving a message there (on `/topic/resource`), whereas correct behaviour would be to dispose of it right away, since the outer stream provided the only event observers are interested in.
Hmmm, yes, I was just able to reproduce that error with a recent demo app I created. It looks like my fix (from #45) for scan's dispose handling isn't quite right. Reopening this while I investigate more.
@victormoukhortov I just pushed a potential fix to the fix-scan-dispose-state branch. The unit test for scan dispose was already passing, so clearly I need to beef up the test a bit. However, this change does still pass all existing tests and fix the problem in the demo app I mentioned above. So, it seems right.
It'd be great if you could try it, though, to verify. Meanwhile, I'll work on creating a unit test that exposes the issue, so that I can see the test turn green :)
@briancavalier Great! I'll try it out
I did two more tests with my demo app. I added a `take` here, to the inner stream, and it worked correctly: I see a second `UNSUBSCRIBE` as expected and no error. Then, I removed that `take` and added a `take` to the outer stream here, which also worked correctly: I again saw a second `UNSUBSCRIBE` as expected and no error.
Trying to devise a unit test for this is proving to be tricky, tho ...
@briancavalier It seems as though the inner stream is still waiting for a message before unsubscribing, though when it gets one it does seem to unsubscribe without error. Did you see the `UNSUBSCRIBE` before any message arrived at the topic?
I can see how unit tests would be tricky...
I just pushed another update to the fix-scan-dispose-state branch. I think it's getting close. Still no unit test, but I do have a simpler test program, derived from the demo app (but runnable in node), that shows the problem. Running it on master fails, while running it on the branch succeeds. So next, I'll try to turn that into a unit test.
> inner stream is still waiting for a message before unsubscribing
Hmmm, I'm not seeing any unexpected "wait before end" with `take`. This gist has a simple test program that seems to do what I'd expect: `take(0)` ends instantaneously upon observing, `take(1)` ends instantaneously after the first event (without waiting for a second), and similarly `take(2)` ends instantaneously after 2 events.
If you're seeing something weird, put together a smaller example and I'll take a look. Or maybe fork that gist and try to reproduce what you're seeing. Thanks!
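The `take(0)`/`take(1)`/`take(2)` behavior described above can be modeled with a toy push stream. This sketch is not most.js's implementation. `source`, `take`, and `run` are hypothetical. Events are pushed synchronously so the ordering is deterministic and easy to read.

```javascript
// Toy push-stream model for illustration only (NOT most.js's implementation).
function source(log) {
  let sink = null;
  return {
    stream: s => {
      sink = s;
      return () => { if (sink !== null) { sink = null; log.push('dispose'); } };
    },
    push: x => { if (sink) sink.event(x); },
  };
}

function take(n, stream) {
  return sink => {
    if (n <= 0) { sink.end(); return () => {}; } // take(0): ends as soon as it is observed
    let count = 0, done = false, dispose = null;
    const stop = () => { if (!done) { done = true; if (dispose) dispose(); } };
    dispose = stream({
      event(x) {
        if (done) return; // anything after the nth event is dropped
        count += 1;
        sink.event(x);
        if (count === n) { stop(); sink.end(); } // end now; don't wait for event n+1
      },
      end() { if (!done) { stop(); sink.end(); } },
    });
    if (done) dispose();
    return stop;
  };
}

// Interleave "push" markers with what take(n) emits, to show when it ends.
function run(n) {
  const log = [];
  const src = source(log);
  take(n, src.stream)({ event: x => log.push('event ' + x), end: () => log.push('end') });
  for (const x of [1, 2, 3]) {
    log.push('push ' + x);
    src.push(x);
  }
  return log;
}

console.log(run(0)); // [ 'end', 'push 1', 'push 2', 'push 3' ]
console.log(run(1)); // [ 'push 1', 'event 1', 'dispose', 'end', 'push 2', 'push 3' ]
console.log(run(2)); // [ 'push 1', 'event 1', 'push 2', 'event 2', 'dispose', 'end', 'push 3' ]
```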
Ah ha! I think this gist shows the issue with waiting. I'll open another issue for that.
Hmm.. is that a separate issue? It seems that the inner stream is still not being closed when it's supposed to be. In your second test, even when n is one, the inner stream persists until it receives a message..
Yeah, I think it's a separate issue. The issue is that `join()` (which is used to implement `flatMap`) is waiting for an inner stream to return its "current event" rather than simply disposing it and dropping it immediately.
Since this issue has primarily been dealing with dispose not happening at all, and so far that's been caused by bugs in `scan` and `take`, I'd like to open a new issue that specifically addresses `join`.
@victormoukhortov I should be able to finish the unit test and get this latest round of changes ready for a 0.9.1 patch release tomorrow.
@briancavalier which branch should I test it out on in the meantime? I guess it would be a merge of `fix-scan-dispose-state` and `dispose-inner-join-immediately`?
I just created a temporary v0.9.1 branch that has both merged into it. Give that a go :smiley:
Thanks for the branch. I'm afraid it's still not working however...
Do you mean that you're still seeing `take` waiting in a way that you think it shouldn't be?
Yeah, `take(1)` (on branch `v0.9.1`) still doesn't close the inner stream for me. I'll try to put together a test case tomorrow.
Ok, I've made a little progress on this in the v0.9.1 branch, but nothing pushed yet. I think you were right that the best thing is to use a single branch, since there seem to be at least 2 very closely related issues (more closely related than I thought). So, I'll probably abandon the two open PR branches in favor of v0.9.1, and continue working there.
I want to take a little time to do a thorough review of the dispose architecture. I'll post updates here as I work through that.
> I'll try to put together a test case tomorrow.
That'd be great. Thanks.
Hi @briancavalier, I've made that unit test. I've not run it but it should communicate the intent.
Edit: I did run it, it is indeed broken.
Really appreciate the unit test, @victormoukhortov!
In the v0.9.1 branch now, the dispose failure introduced in 0.9.0 has been fixed, along with a few other subtle cases. However, I backed out the change to make join abandon inner streams early--that will require a bit more work. So, my plan is to release 0.9.1 to address the dispose regression in 0.9.0, and then deal with join() in 0.9.2.
Also, here's a slight variant on your gist. When I run it with the v0.9.1 branch, it logs:
Dispose Outer
Dispose Inner
Inner 1
It logs "Dispose Outer" after 1 second, which is correct, but logs "Dispose Inner" after 10 seconds, which is incorrect due to the known issue with `join()` (which will be addressed in 0.9.2). It does dispose without an error (ie without the 0.9.0 error), and doesn't propagate "Inner 2", which is also correct.
This is working correctly, afaik, in the v0.10-merge branch
This is fixed as of 6bbe55e.
@victormoukhortov Running your unit test logs:
❯ time node experiments/dispose/outer-inner1.js
# at 1 second
Dispose Outer undefined
# at 11 seconds
Inner
Dispose Inner undefined
node experiments/dispose/outer-inner1.js 0.09s user 0.02s system 0% cpu 11.111 total
Notes:
- The `#` comments are annotations; they are not in the unit test output.
- `undefined` is the end signal value, ie what is passed to `end()`. If it was `end(123)`, it would have logged `123` instead of `undefined`.
The outer is disposed, as expected, at 1 second. However, ending the outer via `end` must allow inners that are still alive to continue, which is what happens. The `take` is being applied to the joined stream, which will only emit an item from the inner stream. That happens at 11 seconds (1 second to create the first inner stream, then that stream waits 10 seconds before emitting its first item). Running the test via the `time` command reports that the whole thing took 11.111 wall clock seconds.
The test also works if you comment out the `end` in the inner stream, since the `join.take` is limiting everything to 1 event. Additionally, if you insert more `add(...)`s into the inner stream, they will not produce more events, also due to the `join.take`. Note that it is impossible to prevent the developer from calling `add` many times synchronously, because JavaScript is run-to-completion. For example:
setTimeout(function() {
  add("Inner 1");
  add("Inner 2");
  add("Inner 3");
  add("Inner 4");
  // ... and so on
}, 10000);
However, the `join.take(1)` correctly prevents events beyond the first from propagating.
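Here is a toy sketch of why `take(1)` still behaves when a publisher calls `add` several times synchronously. `create` and `take` are hypothetical stand-ins, not most.js's own code. The `setTimeout` from the snippet above is dropped so the run is deterministic. In this toy, `add` is deliberately unguarded, so the guard has to live in `take`.

```javascript
// Toy sketch, NOT most.js's implementation: create(publisher) calls
// publisher(add) and uses its return value as the dispose function.
// add() is deliberately unguarded, so a publisher can call it many times.
function create(publisher) {
  return sink => publisher(x => sink.event(x));
}

// take(n) guards itself: once n events have passed, it ignores further adds.
function take(n, stream) {
  return sink => {
    let count = 0, done = false, dispose = null;
    const stop = () => { if (!done) { done = true; if (dispose) dispose(); } };
    dispose = stream({
      event(x) {
        if (done) return; // later synchronous adds land here and are dropped
        count += 1;
        sink.event(x);
        if (count === n) { stop(); sink.end(); }
      },
      end() { if (!done) { stop(); sink.end(); } },
    });
    if (done) dispose(); // limit reached before the publisher returned its disposer
    return stop;
  };
}

const seen = [];
let disposeCalls = 0;
const inner = create(add => {
  // Run-to-completion: nothing can stop these calls from being made;
  // take() can only ignore the ones past its limit.
  add('Inner 1');
  add('Inner 2');
  add('Inner 3');
  add('Inner 4');
  return () => { disposeCalls += 1; };
});

take(1, inner)({ event: x => seen.push(x), end: () => seen.push('end') });
console.log(seen, disposeCalls); // [ 'Inner 1', 'end' ] 1
```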
I think we're good. Closing, but please do try out the master branch :)
@briancavalier I'm not sure I understand why this inner stream should not be immediately disposed of as soon as the joined stream is limited via `take(1)`. What I would hope would happen would be that the `Dispose Inner` logging statement should be called after 1 second. If it correctly freed resources (i.e. cleared the timeout), then the `Inner` logging statement should never be called.
> I'm not sure I understand why this inner stream should not be immediately disposed of as soon as the joined stream is limited via `take(1)`
I hear you, higher order streams are tricky. The reason is that `take(1)` has been applied to the outer stream, meaning that the outer stream will end when 1 event has been observed on it (or perhaps: "escapes from it", or "propagates out of it"). The operations join and flatMap work by merging events from inner streams into the outer stream. The materialization of an inner stream is not an emitted event because it is unobservable. If the inner stream never emits any events, neither will the outer.
Any `join()`ed higher order stream with exactly 1 inner stream is indistinguishable from the inner stream. For example:
var alsoEmpty = most.of(most.empty()).join();
var alsoNever = most.of(most.never()).join();
var alsoDelayedByOneSecond = most.of(most.of('delayed').delay(1000)).join();
These are similar to the case from your unit test. In the unit test, once the inner stream has materialized (which doesn't cause the outer stream to emit an event, ie materialization !== event), it waits 10 seconds before emitting its first event. That first inner event is then emitted from the outer stream. So, a total of 11 seconds (1 + 10) pass before the event is emitted from the outer stream. Only at that point, when the first observable event has been emitted from the outer stream, has `take(1)` been satisfied.
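The "materialization is not an event" point can be made concrete with a small denotational model. This is an assumption for illustration, not how most.js works internally. A stream is an array of `[timeMs, value]` pairs, and inner-stream times are relative to the moment the inner stream is materialized (started). `of`, `join`, and `take` are hypothetical helpers in that model.

```javascript
// Denotational toy model (an illustration, not how most.js is implemented):
// a stream is an array of [timeMs, value] pairs sorted by time. For a
// higher-order stream the values are streams, whose times are relative to
// the moment each inner stream is materialized.
function of(value) {
  return [[0, value]];
}

// join: shift each inner stream's events by its materialization time, then
// merge. Materializing an inner stream is not itself an event of the result.
function join(higher) {
  const events = [];
  for (const [t0, inner] of higher) {
    for (const [t, v] of inner) events.push([t0 + t, v]);
  }
  return events.sort((a, b) => a[0] - b[0]);
}

function take(n, stream) {
  return stream.slice(0, n);
}

// Like most.of(most.of('delayed').delay(1000)).join(): same as the inner stream.
const delayed = [[1000, 'delayed']];
console.log(join(of(delayed))); // [ [ 1000, 'delayed' ] ]
console.log(join(of([])));      // [] (like alsoEmpty)

// The unit test: outer materializes inner at t = 1 s; inner emits 10 s later.
const inner = [[10000, 'Inner']];
const outer = [[1000, inner]];
console.log(take(1, join(outer))); // [ [ 11000, 'Inner' ] ]
```

End times and `never()` are left out of this model for brevity. The point is only that the joined stream has no event at t = 1 s, so `take(1)` cannot be satisfied before t = 11 s.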
The explicit `end()` invoked by the outer stream's publisher function is simply an indication that no additional inner streams will be materialized.
I verified the current behavior against other impls (RxJS and kefir) as well, and it's consistent.
> What I would hope would happen would be that the `Dispose Inner` logging statement should be called after 1 second
The particular arrangement of combinators in the test sets up a different behavior than that. By using a different arrangement, you can certainly get an "inner ends after 1 second" behavior. Here's an example (I know this is not what you're after, just illustrating):
var endsInAtMostOneSecondNoMatterWhat = join.takeUntil(most.of().delay(1000));
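Here is a sketch of why `takeUntil` gives the "at most one second" behavior. The model represents streams as `{ events, endAt }` with times in milliseconds. It and the `takeUntil` helper are assumptions for illustration, not most.js's API or internals. The `join` value below is assumed to end right after its single event.

```javascript
// Denotational toy model (illustration only, not most.js internals): a stream
// is { events: [[timeMs, value], ...], endAt: timeMs }, events sorted by time.
function takeUntil(signal, stream) {
  // In this sketch, a signal with no events never stops the stream.
  if (signal.events.length === 0) return stream;
  const stopAt = signal.events[0][0];
  return {
    // Assumption for this sketch: an event at exactly stopAt is dropped.
    events: stream.events.filter(([t]) => t < stopAt),
    endAt: Math.min(stream.endAt, stopAt),
  };
}

// `join` from the unit test: its only event is 'Inner' at t = 11 s.
const join = { events: [[11000, 'Inner']], endAt: 11000 };
// Like most.of().delay(1000): one event (undefined) at t = 1 s.
const oneSecond = { events: [[1000, undefined]], endAt: 1000 };

const limited = takeUntil(oneSecond, join);
console.log(limited); // { events: [], endAt: 1000 }
```

Because the result ends at t = 1 s, a library that disposes on end would dispose the inner stream at that point. Its pending timer could then be cleared before "Inner" is ever produced.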
I think the right thing is to think about the overall behavior you want, and then work toward the arrangement of combinators that represents that behavior. I'm betting that's how you approached it originally, as did I when I wrote that original demo program with the websockets--and I may not have gotten it right!
I've kind of lost sight of what your original goal was (sorry!) that led to the `take(1)` example, but I'd be happy to try to help work through it.
@briancavalier Thank you for your detailed reply. I've not run any tests yet, but for practical purposes, I'd be concerned that resources were not being freed as soon as possible. If a limited, joined stream had to wait for all of its component streams to fire an event before freeing their resources, then these resources might never be freed.
`join.take(1)` from my example should free the inner stream after 1 second because `join.take(1)` emits an event after one second (because `outerStream` emits an event after one second), at which point we know we are no longer interested in events from any of the component streams and can thus free resources held by them.
> `join.take(1)` from my example should free the inner stream after 1 second because `join.take(1)` emits an event after one second
This is the part that doesn't match the situation being set up by the code. The stream named `join` doesn't emit an event after 1 second. The stream named `outer` does, which causes the stream named `join` to materialize an inner stream after 1 second. The stream named `join` only emits an event 10 seconds later, when `inner` emits its first event.
Hmm.. so how would I write it to be as I described? So that `join` emits an event as soon as `outer` does?
One thing that may help is if we think of `join` differently (and more correctly). Mechanically speaking, inside the imperative implementation of most.js, there are inner and outer streams being managed by flatMap.
Mathematically speaking, however, there are not: the stream named `join` is a first-order stream containing the string "Inner" at time t = 11 seconds. Any higher-order-ness has been flattened away by `flatMap`.
> Hmm.. so how would I write it to be as I described?
I think it comes down to what is the larger goal. I'm still not quite sure I have a handle on that, but I'll try to answer the next question as best I can.
> So that `join` emits an event as soon as `outer` does?
In the code exactly as it is in the gist right now, there is no way to emit the string "Inner" in 1 second, because "Inner" doesn't even exist until time t = 11 seconds. If `inner` immediately emitted "Inner" without `setTimeout`, it would appear on the console at t = 1 second. Afaict, removing the `setTimeout` is the only way (barring time travel, of course! :) )
Another option would be to use `map` instead of `flatMap`. That will, however, emit the `inner` stream itself (ie the JavaScript object) as an event in `join` after 1 second, at which point `outer` will end and be disposed, and the `inner` stream will never have been started. That doesn't seem like what you're after, though.
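The `map` versus `flatMap` contrast can be sketched in a toy denotational model: streams as arrays of `[timeMs, value]`, with inner-stream times relative to materialization. This is an illustrative assumption, not most.js internals. `map`, `join`, `flatMap`, and `take` are hypothetical helpers. `'tick'` is a made-up outer value. `flatMap` is written as `join` after `map`, following the note that `join()` is used to implement `flatMap`.

```javascript
// Denotational toy model (illustration only, not most.js internals): a stream
// is an array of [timeMs, value]; inner streams' times are relative to when
// they are materialized.
const map = (f, stream) => stream.map(([t, v]) => [t, f(v)]);

const join = higher =>
  higher
    .flatMap(([t0, inner]) => inner.map(([t, v]) => [t0 + t, v]))
    .sort((a, b) => a[0] - b[0]);

// flatMap as join after map.
const flatMap = (f, stream) => join(map(f, stream));
const take = (n, stream) => stream.slice(0, n);

const inner = [[10000, 'Inner']]; // emits 10 s after it starts
const outer = [[1000, 'tick']];   // one event at t = 1 s

// map: the first event is the inner stream object itself, at t = 1 s.
const viaMap = take(1, map(() => inner, outer));
console.log(viaMap[0][0], viaMap[0][1] === inner); // 1000 true

// flatMap: the first event is 'Inner', at t = 11 s.
console.log(take(1, flatMap(() => inner, outer))); // [ [ 11000, 'Inner' ] ]
```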
It seems as though streams which are nested using `flatMap` are not closed out correctly. Code sample:
When running code such as:
I expect both streams which are created (`getSnapshot` and `getUpdates`) to be disposed of. Currently only the stream generated by `getSnapshot` is disposed of correctly, while the stream generated by `getUpdates` remains un-disposed.