typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

Possibly weird behaviour with `Topic.subscribers` and `interruptWhen` #1286

Closed SystemFw closed 6 years ago

SystemFw commented 6 years ago

It's still quite a lot of code, but here's what I have:

import cats._, implicits._
import cats.effect._, concurrent._
import cats.effect.implicits._
import fs2._, fs2.concurrent._
import scala.concurrent.duration._

object Playground extends IOApp {
  def run(args: List[String]) = ExitCode.Success.pure[IO]

  implicit class Runner[A](s: Stream[IO, A]) {
    def yolo: Unit = s.compile.drain.unsafeRunSync
    def yoloV: Vector[A] = s.compile.toVector.unsafeRunSync
  }

  // put("hello").to[F]
  def put[A](a: A): IO[Unit] = IO(println(a))

  def prog1 =
    Stream
      .eval {
        (Topic[IO, String](""), Deferred[IO, Unit]).tupled
      }
      .flatMap {
        case (topic, stop) =>
          val kafkaConsumer = Stream
              .awakeEvery[IO](1.seconds)
              .map(_ => "message")
              .repeat
              .take(3) ++ Stream.eval(stop.complete(())).drain

          val subscribers = topic.subscribers.evalTap(put)
          val producer = kafkaConsumer.to(topic.publish)
          val consumer = Stream.sleep_[IO](200.millis) ++ topic
              .subscribe(10)
              .through(Sink.showLinesStdOut)
              .interruptWhen(stop.get.attempt)

          consumer concurrently Stream(
            producer,
            subscribers
          ).parJoinUnbounded
      }.yolo
  // scala> prog1
  // 0
  //
  // 1
  // message
  // message
  // message
  // 0

  // scala> prog1
  // 0
  //
  // 1
  // message
  // message
  // message
  // 0
  // 1

  // scala> prog1
  // 0
  //
  // 1
  // message
  // message
  // message
  // 0

  // scala> prog1
  // 0
  // 1
  //
  // message
  // message
  // message
  // 0
  // 1

  def prog2 =
    Stream.eval(Topic[IO, String]("")).flatMap {
      topic =>
      val subscribers = topic.subscribers.evalTap(put)
      val consumer = Stream.sleep_[IO](200.millis) ++ topic
        .subscribe(10)
        .interruptWhen(timer.sleep(2.seconds).attempt)

      Stream(
        consumer,
        subscribers
      ).parJoinUnbounded.interruptWhen(timer.sleep(3.seconds).attempt)
    }.yolo
  // scala> prog2
  // 0
  // 1
  // 1

  // scala> prog2
  // 0
  // 1
  // 0
  // 1

  // scala> prog2
  // 0
  // 1
  // 1

  // scala> prog2
  // 0
  // 1
  // 0
  // 1

  def prog3 =
    Stream.eval(Topic[IO, Option[String]]("".some)).flatMap { topic => 
      val kafkaConsumer = Stream
        .awakeEvery[IO](1.seconds)
        .map(_ => "message")
        .repeat
        .take(3)
        .noneTerminate

      val subscribers = topic.subscribers.evalTap(put)
      val producer = kafkaConsumer.to(topic.publish)
      val consumer = Stream.sleep_[IO](200.millis) ++ topic
        .subscribe(10)
        .unNoneTerminate
        .through(Sink.showLinesStdOut)

      consumer concurrently Stream(
        producer,
        subscribers
      ).parJoinUnbounded
    }.yolo

  // scala> prog3 
  // 0
  // 1
  //
  // message
  // message
  // message
  // 0
  // consistently works
}
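Unlike prog1 and prog2, prog3 ends the subscription in-band rather than by interruption. `noneTerminate` wraps every produced element in `Some` and appends a final `None`, and `unNoneTerminate` emits the wrapped values until the first `None` and then halts. As a rough illustration, here is a model of those two combinators on plain Scala `List`s. `NoneTerminationModel` and its methods are hypothetical names for this sketch, not fs2 API, and the model ignores effects and concurrency entirely:

```scala
// Hypothetical List-based model of fs2's noneTerminate / unNoneTerminate semantics.
// It only mirrors the element-level behaviour; there are no effects or concurrency here.
object NoneTerminationModel {
  // Like Stream#noneTerminate: wrap every element in Some and append a final None.
  def noneTerminate[A](xs: List[A]): List[Option[A]] =
    xs.map(Some(_)) :+ None

  // Like Stream#unNoneTerminate: emit the wrapped values up to the first None, then stop.
  def unNoneTerminate[A](xs: List[Option[A]]): List[A] =
    xs.takeWhile(_.isDefined).flatten
}
```

For example, `unNoneTerminate(List(Some(1), Some(2), None, Some(3)))` yields `List(1, 2)`: anything after the first `None` is never seen. Because the subscriber stream completes normally in this case, its cleanup runs along the ordinary completion path, which is consistent with prog3 behaving correctly in every run above.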
sebastianvoss commented 6 years ago

Not sure if it helps, but this also reproduces the problem.

import cats.effect._
import cats.implicits._
import fs2._
import fs2.concurrent.{SignallingRef, Topic}

import scala.concurrent.duration._

object Sample extends SampleApp

class SampleApp extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    val kafkaConsumer = Stream.awakeEvery[IO](1.seconds).map(_ => "message".some).repeat

    val stream: Stream[IO, Unit] = for {
      topic <- Stream.eval(Topic[IO, Option[String]]("".some))
      stop  <- Stream.eval(SignallingRef[IO, Boolean](false))
      subscriberCountLogger = topic.subscribers.map(n => s"$n client(s) connected").through(Sink.showLinesStdOut)
      subscriber            = kafkaConsumer.to(topic.publish)
      subscriber1           = topic.subscribe(5).evalTap[IO](message => IO { println(s"subscriber1: $message") })
      subscriber2           = topic.subscribe(5).evalTap[IO](message => IO { println(s"subscriber2: $message") }).unNoneTerminate.interruptWhen(stop)
      s <- subscriber
        .concurrently(subscriber1)
        .concurrently(subscriber2)
        .concurrently(subscriberCountLogger)
        .concurrently(Stream.sleep(3.seconds) ++ Stream.eval[IO, Unit] {
          for {
            //r <- stop.set(true)         // interruptWhen does NOT unsubscribe subscriber2
            r <- topic.publish1(None)     // unNoneTerminate does unsubscribe subscriber2
          } yield r
        })
    } yield s

    stream.compile.drain.map(_ => ExitCode.Success)
  }

}

Output when using interruptWhen:

0 client(s) connected
1 client(s) connected
2 client(s) connected
subscriber1: Some()
subscriber2: Some()
subscriber2: Some(message)
subscriber1: Some(message)
subscriber2: Some(message)
subscriber1: Some(message)
1 client(s) connected
2 client(s) connected              <-- this is odd
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
// now it stops as the queue of subscriber2 (which wasn't unsubscribed) is full

Output when using unNoneTerminate (i.e. `topic.publish1(None)`, the uncommented line in the snippet):

0 client(s) connected
1 client(s) connected
2 client(s) connected
subscriber1: Some()
subscriber2: Some()
subscriber2: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber2: Some(message)
subscriber1: None
subscriber2: None
1 client(s) connected
subscriber1: Some(message)
subscriber1: Some(message)
// correctly goes on forever
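The stall in the `interruptWhen` run follows from how a topic with bounded per-subscriber queues behaves: once a subscription's queue holds `maxQueued` elements, further publications semantically block until it is drained, so a subscriber that has stopped reading but is still registered eventually blocks everyone. The sketch below is a toy, single-threaded model of that effect, assuming a simplified topic (the hypothetical `ToyTopic` class) in which `publish` returns `false` where the real publisher would block. It is not fs2's `Topic` implementation.

```scala
import scala.collection.mutable

// Hypothetical toy topic: each subscriber owns a bounded buffer of size maxQueued.
// publish returns false instead of blocking when any registered buffer is full.
final class ToyTopic[A](maxQueued: Int) {
  private val buffers = mutable.Map.empty[Int, mutable.Queue[A]]
  private var nextId = 0

  def subscribe(): Int = { val id = nextId; nextId += 1; buffers(id) = mutable.Queue.empty[A]; id }
  def unsubscribe(id: Int): Unit = buffers -= id

  def publish(a: A): Boolean =
    if (buffers.values.exists(_.size >= maxQueued)) false
    else { buffers.values.foreach(_.enqueue(a)); true }

  // Removes and returns everything currently buffered for one subscriber.
  def drain(id: Int): List[A] = buffers.get(id) match {
    case Some(q) => val out = q.toList; q.clear(); out
    case None    => Nil
  }
}

object LeakedSubscriberDemo {
  // One subscriber keeps reading; the other has been interrupted and stopped reading.
  // Returns how many messages were published before the topic stalled (or `attempts`).
  def publishedBeforeStall(maxQueued: Int, attempts: Int, unsubscribeOnInterrupt: Boolean): Int = {
    val topic = new ToyTopic[String](maxQueued)
    val active = topic.subscribe()
    val interrupted = topic.subscribe()
    // Whether the interrupted subscriber is removed is exactly what the bug is about.
    if (unsubscribeOnInterrupt) topic.unsubscribe(interrupted)
    var published = 0
    var stalled = false
    while (!stalled && published < attempts) {
      if (topic.publish("message")) { published += 1; topic.drain(active) }
      else stalled = true
    }
    published
  }
}
```

With `maxQueued = 5` the leaked subscriber lets exactly five messages through before publishing stalls, while removing it on interruption lets publishing continue indefinitely. The exact counts in the real run above also depend on timing and on messages buffered before the interruption, so this only illustrates the mechanism.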
pchlupacek commented 6 years ago

Seems to me like some fault in the PubSub strategy when unsubscribing. It looks like the old state is somehow passed along on unsubscribe...

sebastianvoss commented 6 years ago

With 1.0.0-RC2 unsubscribing seems to work as expected.

Only `Topic.subscribers` now continuously emits the subscriber count (unlike 1.0.0, which only emits when the count changes). I added a filter to deal with this: `topic.subscribers.filterWithPrevious(_ != _)`.
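In fs2, `filterWithPrevious(f)` always emits the first element and then keeps each later element only when `f(previous, current)` holds, where `previous` is the last emitted element; with `_ != _` this drops consecutive repeats of the same subscriber count. A minimal `List`-based model of that behaviour follows. The `FilterWithPreviousModel` object is hypothetical and only mirrors the semantics, not the streaming implementation:

```scala
// Hypothetical List-based model of Stream#filterWithPrevious.
object FilterWithPreviousModel {
  // The first element is always kept; each later element is kept when
  // f(lastKept, current) holds, and then becomes the new lastKept.
  def filterWithPrevious[A](xs: List[A])(f: (A, A) => Boolean): List[A] = xs match {
    case Nil => Nil
    case head :: tail =>
      tail.foldLeft((head, Vector(head))) { case ((last, kept), a) =>
        if (f(last, a)) (a, kept :+ a) else (last, kept)
      }._2.toList
  }
}
```

For example, a count sequence `0, 0, 1, 1, 1, 2, 2, 1` reduces to `0, 1, 2, 1`, so only changes in the number of connected subscribers are reported.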

nikelin commented 6 years ago

Happens to me as well. I had to switch back to the non-PubSub implementation, as it seems that the subscribers list is not being cleaned up for some reason and the subscriber count just keeps growing.

In my case it is also a parent subscription with an `.interruptWhen` on it. The parent subscription completes correctly, but the subscriber entry remains in the PubSub.

P.S. Yesterday I checked the implementation myself, but the amount of indirection was just too high for me to understand where the problem is.

mpilquist commented 6 years ago

This sounds like the same root cause as described in #1293.

SystemFw commented 6 years ago

Any decisions on this and #1293? It's starting to affect my teams in prod.

pchlupacek commented 6 years ago

@SystemFw I have drafted a solution with `Resource[F, F[Unit]]`; I just haven't had time to put up a PR yet. I could manage it by Sunday. Is that OK?
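The drafted `Resource[F, F[Unit]]` design isn't shown in this thread, so its exact shape is unknown here. The general property a resource-based subscription buys is that registration happens on acquire and removal happens on release, and release runs however the consumer ends: normal completion, an error, or interruption. Below is a toy, synchronous model of that guarantee using `try`/`finally` in place of cats-effect's `Resource`; `SubscriberRegistry` and `withSubscription` are hypothetical names, and a thrown exception stands in for interruption.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical registry standing in for a topic's subscriber bookkeeping.
final class SubscriberRegistry {
  private val count = new AtomicInteger(0)
  def current: Int = count.get

  // Bracket-style subscription: register, run the consumer, and always unregister,
  // whether `use` returns normally or throws (our stand-in for interruption).
  def withSubscription[B](use: Int => B): B = {
    val subscribersNow = count.incrementAndGet()
    try use(subscribersNow)
    finally { count.decrementAndGet(); () }
  }
}
```

Because removal lives in the release step rather than at the end of the happy path, the count returns to zero even when the consumer is cut short, which is the behaviour the `interruptWhen` reports above were missing.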

SystemFw commented 6 years ago

Sure :) I'm assuming we would want a release soon after, for this and the other small bugs (like splitAt)

pchlupacek commented 6 years ago

@SystemFw Yeah, I would like to have this done earlier so we can have a release over the weekend.

mpilquist commented 6 years ago

@sebastianvoss This appears fixed in 1.0.0-SNAPSHOT thanks to #1308.

scala> Sample.main(Array.empty)
0 client(s) connected
2 client(s) connected
subscriber1: Some()
subscriber2: Some()
subscriber2: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber2: Some(message)
subscriber1: None
subscriber2: None
1 client(s) connected
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
subscriber1: Some(message)
mpilquist commented 6 years ago

@SystemFw Could you try your use case too?

SystemFw commented 6 years ago

Yeah, the one above is the one I have directly; the other one is from another team. I'll ask them to give it a shot.

mpilquist commented 6 years ago

@SystemFw Here's what I get using your samples, which I think is correct but please spot check the output:

scala> prog1
0
1

message
message
message
0

scala> prog1
0

1
message
message
message
0

scala> prog1
0

1
message
message
message
0

scala> prog2
0
1
0

scala> prog2
0
1
0

scala> prog2
0
1
0

scala> prog2
0
1
0

scala> prog2
0
1
0

scala> prog3
0
1

message
message
message
0
SystemFw commented 6 years ago

@mpilquist The problem we were having in prod was fixed by upgrading to the SNAPSHOT

sebastianvoss commented 6 years ago

Great stuff! I checked and cannot reproduce the issue anymore. Will run some more tests tomorrow and get back in case of any issues.

jano7 commented 6 years ago

Hi, could you try running this? It hangs after run 3 in version 1.0.0 (I guess because maxQueued = 2).

import cats.effect.{ContextShift, IO}
import fs2.Stream
import fs2.concurrent.{Queue, Topic}

object Test extends App {

  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(scala.concurrent.ExecutionContext.Implicits.global)

  val publisher = Queue.unbounded[IO, Double].unsafeRunSync()

  val topic = Topic[IO, Double](.0).unsafeRunSync()

  val listener = publisher.dequeue.map(_ * 100).map(_.floor) to topic.publish

  listener.compile.drain.unsafeRunAsync {
    case Right(_) => println("listener stopped gracefully")
    case Left(e) => e.printStackTrace()
  }

  val maxQueued = 2

  val rpc = for {
    b <- Stream.eval(Queue.unbounded[IO, Double])
    receivedStream = for {
      _ <- Stream.eval(b.dequeue1)
      _ <- Stream.emit(math.random()) to publisher.enqueue
      r <- b.dequeue
    } yield r
    received <- receivedStream.concurrently(topic.subscribe(maxQueued) to b.enqueue)
  } yield received

  1 to 10 foreach { i =>
    val result = rpc.take(1).compile.toList.unsafeRunSync()
    println(s"run $i: $result")
  }
}
SystemFw commented 6 years ago

I'd say this can be closed now