typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Topic#subscribe on closed topics has surprising (non-terminating) semantics #2558

Open lorandszakacs opened 3 years ago

lorandszakacs commented 3 years ago

Using fs2 3.0.6 and 3.1.0. Here's a Scastie. All code pasted in this issue is also in the Scastie.

Here is an example of a subscriber that subscribes to a topic after the topic has been closed:

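import cats.effect.IO
import cats.effect.std.Console
import fs2.Stream
import fs2.concurrent.Topic

def topicSubscribeClosedTopicNonTerminating(topic: Topic[IO, Int]): IO[Unit] = {
  // Subscriber that prints every element it receives from the topic.
  val subscriber1: Stream[IO, Unit] =
    topic
      .subscribe(maxQueued = 1)
      .evalMap(i => Console[IO].println(s"subscriber 1 received: $i"))

  for {
    // start only forks the fiber, so the subscription may be made after the close below.
    fib1 <- subscriber1.compile.drain.start
    _ <- Console[IO].println("closing topic")
    _ <- topic.close
    // Hangs if the subscriber subscribed after the topic was closed.
    _ <- fib1.join
  } yield ()
}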

This consistently fails to terminate, which is surprising given that other methods seem to signal operations on a closed topic more clearly. I can think of three possible fixes:

1) add a warning about this behaviour to the scaladoc of subscribe
2) have the method return Stream.empty
3) have the method return Stream.raiseError(...)
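Until one of these is chosen, the behaviour of option 2 can be approximated on the caller's side. The following is a minimal sketch, not a proposed fs2 change: `subscribeUntilClosed` is a hypothetical helper name, and it assumes an fs2 version in which `Topic#closed` is available (as in 3.1.0).

```scala
import cats.effect.IO
import fs2.Stream
import fs2.concurrent.Topic

// Hypothetical helper, not part of fs2: a subscription that is interrupted
// as soon as the topic is closed, including when it was closed beforehand.
def subscribeUntilClosed[A](topic: Topic[IO, A], maxQueued: Int): Stream[IO, A] =
  topic
    .subscribe(maxQueued)
    // topic.closed completes once the topic is closed; attempt adapts it to
    // the IO[Either[Throwable, Unit]] shape that interruptWhen expects.
    .interruptWhen(topic.closed.attempt)
```

One caveat with this approach: interruption may discard elements that were published before the close but not yet consumed, so it only suits subscribers that can tolerate losing the tail of the stream.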

Additionally, the Scastie contains the methods topicAwaitInStream*, which could be included in the scaladoc as an example of how to avoid the surprising semantics.
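The Scastie's helper methods are not reproduced in this thread, so the following is only a sketch of one way to avoid the problem, under the assumption that they rely on `Topic#subscribeAwait`. The name `subscribeThenClose` is hypothetical. The idea is to register the subscription before the topic is closed, so that closing the topic also ends the subscriber's stream.

```scala
import cats.effect.IO
import fs2.concurrent.Topic

// Sketch only: the subscription is registered when the Resource is acquired,
// that is, before topic.close runs, so closing the topic ends the stream.
def subscribeThenClose(topic: Topic[IO, Int]): IO[Unit] =
  topic.subscribeAwait(maxQueued = 1).use { subscription =>
    for {
      fib <- subscription
               .evalMap(i => IO.println(s"subscriber 1 received: $i"))
               .compile
               .drain
               .start
      _ <- IO.println("closing topic")
      _ <- topic.close
      // Terminates: the already registered subscription is closed with the topic.
      _ <- fib.join
    } yield ()
  }
```

Compared with the original example, the only change is that the subscription no longer races with `topic.close`.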

lorandszakacs commented 3 years ago

I'll happily tackle this myself if you can decide which fix you want for this.

SystemFw commented 3 years ago

Sorry, this completely flew under my radar, so I'm only looking at it now.