Open CarlLeth opened 4 years ago
This also raises the question, should stream (actually not only with memory
) die if it has no more subscribers? If to follow your logic stream should be always alive and ready to be re-connected. Does hot
in your view means always alive
or always ready to raise from the dead
and continues to emit the way it had started in the beginning?
I have run into this as well. In my case I have helper methods that run expensive async code in the middle of a stream, using an approach similar to https://github.com/staltz/xstream/issues/2:
someStream
.map(s => xstream.fromPromise(SomeExpensiveAsyncFn))
.flatten()
.remember()
Every time I unsubscribe and resubscribe to someStream
, SomeExpensiveAsyncFn
is called. I would have expected .remember()
to cache this value so it wouldn't be necessary. This makes the async pattern far less useful, unfortunately, and the only way I can think of to fix this is to build a state machine around the stream to hold the last value, which isn't great.
The reason I haven't responded to this issue yet is because it's not obvious whether MemoryStreams should forget or not forget. There are use cases where you need the MemoryStream to never forget (this issue), and there are use cases where you need to do cleanup and "reset", otherwise you could create memory leaks by retaining those forever.
In your case, @matthewjamesadam, I think you can opt-in to never resetting the stream by forcing the stream to never complete.
someStream
+ .compose(dropCompletion)
.map(s => xstream.fromPromise(SomeExpensiveAsyncFn))
.flatten()
.remember()
+function dropCompletion(stream) {
+ return xs.merge(stream, xs.never())
+}
Thanks @staltz -- I can definitely appreciate that there is no single "right" behaviour here. Perhaps a parameter on .remember()
could specify which behaviour should be used on the stream?
Thanks for the suggestion -- unfortunately for some reason it doesn't seem to be working for me. My scenario is ultimately pretty complicated so something else in my stream might be causing this problem.
there are use cases where you need to do cleanup and "reset", otherwise you could create memory leaks by retaining those forever.
Would it retain the object forever, or just until the MemoryStream itself gets GCed? If you call .remember() on a stream that emits some expensive object, I think that stream possibly taking up as much memory as that expensive object is exactly what you'd expect. It's not surprising that a Promise of an Expensive Object would itself be expensive (once the promise completes) until that promise is disposed of. I use .remember() when I want something "promise-like": a state-holder whose state might not be available yet, with the added benefit that I can know when it changes.
Perhaps I'm using Streams differently than you expected, but I don't really encounter cases where closed streams are sticking around in memory forever.
I believe this is the most surprising and inconsistent behavior in xstream. The second bullet in the introduction to xstream says "Only "hot" streams", but subscribing to a closed MemoryStream can cause a deep replay of past events. That seems like distinctly "cold" behavior, and it has consequences. It means that the behavior of your program can completely change based on internal state that's outside of the abstraction of
Stream
.Scanning through the issues list, I see this over and over. Here's a quick list of issues either fully or partly caused by this:
309 is directly caused by this behavior.
283 and #282 are related to this and point out inconsistent behavior depending on whether a stream is closed or not
271 may be related to this ("need to change the way of terminating and resetting a stream")
240 points out a problem with this
239 says a workaround is to
compose(delay(0))
which sounds awfully familiar and points at the same underlying issue.startWith('')
and is remembered. This stream is not constantly listened to -- it's hooked up to a switch with flatten. Every time it's re-subscribed to, it resets the value of the state machine because it replaysstartWith()
. This is external code changing what's supposed to be internal state.The reason I choose xstream over other libraries is exactly that second bullet: only hot streams. That means I can think in only one direction: forward, and I know that later streams and listeners can't cause side-effects that back-propagate throughout my stream graph. But that's exactly what's happening here: the behavior of my streams can change based on what's happening ten modules downstream.
What's the other side of the argument? Why would you want to keep this behavior?
I've seen the discussion at https://github.com/ReactiveX/rxjs/issues/453, but it's filled with talk of ReplaySubjects and re-connectable observables and exactly the sort of thing I thought "only hot streams" meant we don't have to bother with.