raquo / Airstream

State propagation and event streams with mandatory ownership and no glitches

Streams with 'EventStream.fromFuture' drops result of futures if timing overlaps. #125

Closed roman-mibex-2 closed 4 months ago

roman-mibex-2 commented 4 months ago

Combining streams with EventStream.fromFuture behaves very unexpectedly once the futures start to overlap. Let's say I have an EventStream (for example, a stream of searches), I create a future for each event (like a fetch to a backend), and then I use .flatMap to get the results as a stream.

When the futures complete before the next event is triggered, everything runs as expected:

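  // Imports assumed for this snippet; the original post did not show them.
  import com.raquo.airstream.core.EventStream
  import scala.concurrent.Future
  import scala.concurrent.ExecutionContext.Implicits.global

  def fetchFromBackend(i: Int): Future[String] = {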
    val pr = scala.concurrent.Promise[String]()
    scala.scalajs.js.timers.setTimeout(10) {
      pr.success(s"future result for $i")
    }
    pr.future
  }
  val tt = EventStream
    .periodic(300)
    .flatMap(e =>
      EventStream.fromFuture(fetchFromBackend(e).map(r => {
        println(s"Future complete for $e"); r
      }))
    )
    .map(r => { println(s"Single entry stream complete ${r}"); r })

Then I get a new event for each completed future, as expected.

Future complete for 0
Single entry stream complete future result for 0
Future complete for 1
Single entry stream complete future result for 1
Future complete for 2
Single entry stream complete future result for 2

However, as soon as the future takes more time, the future results are dropped in the stream!

  def fetchFromBackend(i: Int): Future[String] = {
    val pr = scala.concurrent.Promise[String]()
    // The backend takes longer than the interval between new events.
    scala.scalajs.js.timers.setTimeout(400) {
      pr.success(s"future result for $i")
    }
    pr.future
  }
  val tt = EventStream
    .periodic(300)
    .flatMap(e =>
      EventStream.fromFuture(fetchFromBackend(e).map(r => {
        println(s"Future complete for $e"); r
      }))
    )
    .map(r => { println(s"Single entry stream complete ${r}"); r })

Now I get the first event, but all follow-up events are dropped! (Or buffered somewhere?)

Future complete for 0
Single entry stream complete future result for 0
Future complete for 1
Future complete for 2
Future complete for 3

I still expect the stream events to arrive, but they do not arrive anymore. The 'late' events are dropped. I would expect each future completion to still result in an event.

Are my expectations wrong? How do I get all future completions in the stream?

yurique commented 4 months ago

flatMap is a flatMapSwitch, which will switch to the inner stream (https://github.com/raquo/Airstream?tab=readme-ov-file#flatmapswitch)

so what happens is this:

Though I don't know why you see the first Single entry stream complete future result for 0 - I would expect the combined stream to never emit at all 🤔
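To make the switching behavior concrete, here is a toy timeline model in plain Scala. It is not Airstream code and makes no claim about how the library is implemented. It only assumes that outer event i fires at i * intervalMs, that its future completes latencyMs later, and that under switch semantics a result is delivered only if no newer outer event has fired in the meantime. The object and method names are made up for illustration.

```scala
// Toy timeline model, NOT Airstream code: it only mimics the timing in this issue.
object SwitchVsMerge {
  // Outer event i fires at i * intervalMs; its future completes latencyMs later.
  private def completionTime(i: Int, intervalMs: Int, latencyMs: Int): Int =
    i * intervalMs + latencyMs

  // Switch semantics: result i is delivered only if it completes before
  // outer event i + 1 fires (or if i is the last outer event).
  def deliveredWithSwitch(count: Int, intervalMs: Int, latencyMs: Int): List[Int] =
    (0 until count).toList.filter { i =>
      val isLast = i == count - 1
      isLast || completionTime(i, intervalMs, latencyMs) < (i + 1) * intervalMs
    }

  // Merge semantics: every inner stream stays subscribed, so all results arrive.
  def deliveredWithMerge(count: Int, intervalMs: Int, latencyMs: Int): List[Int] =
    (0 until count).toList

  def main(args: Array[String]): Unit = {
    println(deliveredWithSwitch(4, 300, 10))  // List(0, 1, 2, 3)
    println(deliveredWithSwitch(4, 300, 400)) // List(3)
    println(deliveredWithMerge(4, 300, 400))  // List(0, 1, 2, 3)
  }
}
```

With an unbounded source such as EventStream.periodic there is no last event, so under this model a 400 ms future against a 300 ms interval never gets delivered at all.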

yurique commented 4 months ago

( https://scribble.ninja/u/yurique/dlnorhlaqoplfxtozjxhibfvmqm )

yurique commented 4 months ago

It doesn't emit at all (as I would expect) in Laminar 17: https://scribble.ninja/u/yurique/czrgiiejskkirkcpqdfgzsznkwkn

raquo commented 4 months ago

What Yurique said. The flatMap method was renamed to flatMapSwitch in 17.0.0 to avoid confusion like this.

As for the solution, if your parent stream was something bounded, with a small number of events in its lifetime, you could use flatMapMerge. But if it's really EventStream.periodic, then the internal list of FutureStream-s compiled by that operator will grow unbounded over time, taking up memory for nothing.
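A rough sketch of the flatMapMerge variant, assuming Airstream 17 and a bounded source. The EventBus named searches and the simplified fetchFromBackend are hypothetical stand-ins, not code from the original post, and the exact import paths are my assumption.

```scala
import com.raquo.airstream.core.EventStream
import com.raquo.airstream.eventbus.EventBus
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object MergeSketch {
  // Hypothetical stand-in for the real backend call.
  def fetchFromBackend(i: Int): Future[String] =
    Future(s"future result for $i")

  // Hypothetical bounded source, e.g. fed by user-triggered searches.
  val searches = new EventBus[Int]

  // Each inner stream stays subscribed, so late results are not dropped.
  // Caveat from this thread: the operator keeps its inner streams around.
  val results: EventStream[String] =
    searches.events.flatMapMerge { i =>
      EventStream.fromFuture(fetchFromBackend(i))
    }
}
```

As with any Airstream stream, results only does work once something observes it, for example from a Laminar component.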

In principle we could have some version of a flatMap operator that works like flatMapMerge, but eventually unsubscribes from the inner FutureStream, so that streams that won't emit anymore could be removed from the internal list. In practice, Airstream observables don't have a concept of "completion" (see https://github.com/raquo/Airstream/issues/23), so we don't know when they're done emitting events. We know when the underlying scala.Future is done emitting events, but we don't have a direct integration of Future into Airstream flatMap operators. Maybe we should have it, but it seems like an ad-hoc solution.

I guess, in the absence of a proper completion feature, we could add some optional params to the flatMapMerge method, that would indicate when to stop listening to previous inner streams, e.g. after N events, after M seconds, etc. That would be kinda ugly, ad-hoc, and not an intuitive solution for this issue. A proper completion system would be better, but it's rather non-trivial to design, given Airstream semantics.

roman-mibex-2 commented 4 months ago

Thanks for the explanations =)

I will try flatMapMerge then. In my real case the source stream isn't EventStream.periodic but is driven by user input, which is bursty. So in practice the futures will catch up quickly once the burst ends.

raquo commented 4 months ago

@roman-mibex-2 Coolio, I just want to emphasize that the potential problem I had in mind is not so much the need for backpressure, it's that the futures won't be garbage-collected until the flatMapMerge stream itself is garbage collected. Even after the futures complete, flatMapMerge will keep holding on to them (for the reasons I mentioned).

If your flatMapMerge stream is defined locally in a Laminar component, it will probably be garbage collected when that component is unmounted (removed from the DOM). So in practice it's probably fine (but as a library we'll come up with a better way to deal with that eventually).

roman-mibex-2 commented 4 months ago

PS: flatMapMerge does work as expected =). It's good enough for now.

Hmm, yes, it will leak memory. I think for most places we use it this will be OK, as page refreshes etc. will clear the memory eventually.

Otherwise we'll have to think of a way to feed js.Promises/Futures into a stream in some other way.
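One possible shape for that, purely as a sketch and not something suggested by the maintainers in this thread: push each future's outcome into a shared EventBus, so that no inner stream is created per future. The names FutureFeedSketch, resultBus and feed are made up, and I have not measured the memory behavior.

```scala
import com.raquo.airstream.core.EventStream
import com.raquo.airstream.eventbus.EventBus
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object FutureFeedSketch {
  // Hypothetical bus that collects results from all futures.
  private val resultBus = new EventBus[String]

  val results: EventStream[String] = resultBus.events

  // Forward the future's outcome (success or failure) into the bus.
  def feed(future: Future[String]): Unit =
    future.onComplete(resultBus.writer.onTry)
}
```

Note that an EventBus does not buffer: a result that arrives while nothing observes results is simply lost.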

And in other places dropping the events with flatMapSwitch will be OK as well.