typelevel / fs2

Compositional, streaming I/O library for Scala
https://fs2.io

groupWithin causes double execution #1389

Open neko-kai opened 5 years ago

neko-kai commented 5 years ago

See the example below:

package example

import java.util.concurrent.atomic.AtomicLong

import cats.effect._
import cats.syntax.flatMap._
import cats.syntax.functor._
import fs2._

import scala.compat.Platform.ConcurrentModificationException
import scala.concurrent.duration._
import scala.concurrent.blocking

// Mimics the Kafka consumer client's check that it is only ever accessed from one thread at a time.
class MiniKafka {
  final val NO_CURRENT_THREAD = -1L
  val currentThread = new AtomicLong(NO_CURRENT_THREAD)

  def acquire() = {
    val threadId = Thread.currentThread().getId
    if (threadId != currentThread.get && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
      throw new ConcurrentModificationException
  }

  def release() = {
    currentThread.set(NO_CURRENT_THREAD)
  }

  def subscribe(): this.type = {
    acquire()
    try this finally release()
  }

  def commit() = {
    acquire()

    try {
    } finally release()
  }

  def close() = {
    acquire()
    try Thread.`yield`()
    finally release()
  }

  def poll(): String = {
    acquire()
    try {
      "abc"
    } finally {
      release()
    }
  }
}

object App extends App {

  def repro[F[_]: ContextShift: Timer](implicit F: Concurrent[F]) = {
    Stream.resource {
      Resource.make(F.delay(new MiniKafka().subscribe()))(c => F.delay(c.close()))
    }.flatMap {
      kafka =>
        Stream.repeatEval {
            F.delay(blocking(kafka.poll())).map(kafka -> _)
        }
    }
      .groupWithin(1000, 1.millis).map(_.toVector)
      .evalMap { x =>
        val kafka = x.unzip._1.head

        F.delay(blocking(kafka.commit())) >>
          F.delay(println(x.unzip._2.toString())).as(kafka)
      }
      .compile.drain
  }

  new IOApp {
    override def run(args: List[String]): IO[ExitCode] =
      repro[cats.effect.IO].as(ExitCode.Success)
  }.main(Array())
}

Run it and leave it running for a while, up to ~10 minutes. With current fs2 1.0.2 on JDK 11, after some time I get:

Vector(abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc, abc)
java.util.ConcurrentModificationException
    at example.MiniKafka.acquire(Repro.scala:22)
    at example.MiniKafka.poll(Repro.scala:49)
    at example.App$.$anonfun$repro$6(Repro.scala:69)
    at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$1$$anon$2.block(ExecutionContextImpl.scala:75)
    at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3118)
    at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$1.blockOn(ExecutionContextImpl.scala:87)
    at scala.concurrent.package$.blocking(package.scala:146)
    at example.App$.$anonfun$repro$5(Repro.scala:69)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:351)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:372)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:312)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

Which means that poll() was executed twice, in different threads (a new thread from blocking). Removing groupWithin, or replacing it with chunkN or map(Vector(_)), makes this go away.
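
For comparison, a sketch of the same pipeline with chunkN in place of groupWithin (mirroring repro above; the name reproChunkN is just for illustration), which doesn't exhibit the double execution:

  def reproChunkN[F[_]: ContextShift: Timer](implicit F: Concurrent[F]) =
    Stream.resource {
      Resource.make(F.delay(new MiniKafka().subscribe()))(c => F.delay(c.close()))
    }.flatMap {
      kafka =>
        Stream.repeatEval {
            F.delay(blocking(kafka.poll())).map(kafka -> _)
        }
    }
      .chunkN(1000).map(_.toVector)   // purely size-based grouping, no time-based cutoff
      .evalMap { x =>
        val kafka = x.unzip._1.head

        F.delay(blocking(kafka.commit())) >>
          F.delay(println(x.unzip._2.toString())).as(kafka)
      }
      .compile.drain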

The same thing happens when running fs2 under scalaz ZIO, so this shouldn't be cats-effect specific.

Looking at the groupWithin source I have no idea how this can happen. Thoughts?

neko-kai commented 5 years ago

Smaller example:

package example

import java.util.concurrent.atomic.AtomicLong

import cats.effect._
import cats.syntax.functor._
import fs2._

import scala.compat.Platform.ConcurrentModificationException
import scala.concurrent.blocking
import scala.concurrent.duration._

// Simplified guard: any two overlapping calls to action() throw.
class MiniKafka {
  val simultaneous = new AtomicLong(0)

  def acquire() = {
    if (!simultaneous.compareAndSet(0, 1))
      throw new ConcurrentModificationException("Simultaneous action!")
  }

  def release() = {
    simultaneous.set(0)
  }

  def action() = {
    acquire()
    try "abc" finally release()
  }
}

object App extends IOApp {

  val kafka = new MiniKafka()

  def repro[F[_]: ContextShift: Timer](implicit F: Concurrent[F]) =
    Stream.repeatEval(F.delay(blocking(kafka.action())))
      .groupWithin(1000, 1.millis)
      .evalMap(_ => F.delay(blocking(kafka.action())))
      .compile.drain

  override def run(args: List[String]): IO[ExitCode] =
    repro.as(ExitCode.Success)
}

java.util.ConcurrentModificationException: Simultaneous action!
    at example.MiniKafka.acquire(Repro.scala:18)
    at example.MiniKafka.action(Repro.scala:26)
    at example.App$.$anonfun$repro$5(Repro.scala:38)
    at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$1$$anon$2.block(ExecutionContextImpl.scala:75)
    at java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3118)
    at scala.concurrent.impl.ExecutionContextImpl$DefaultThreadFactory$$anon$1.blockOn(ExecutionContextImpl.scala:87)
    at scala.concurrent.package$.blocking(package.scala:146)
    at example.App$.$anonfun$repro$4(Repro.scala:38)
    at cats.effect.internals.IORunLoop$.cats$effect$internals$IORunLoop$$loop(IORunLoop.scala:87)
    at cats.effect.internals.IORunLoop$RestartCallback.signal(IORunLoop.scala:351)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:372)
    at cats.effect.internals.IORunLoop$RestartCallback.apply(IORunLoop.scala:312)
    at cats.effect.internals.IOShift$Tick.run(IOShift.scala:36)
    at java.base/java.util.concurrent.ForkJoinTask$RunnableExecuteAction.exec(ForkJoinTask.java:1426)
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:177)

SystemFw commented 5 years ago

Which means that poll() was executed twice

Is that unavoidably the case, or could it be that poll and commit are executing concurrently?

neko-kai commented 5 years ago

@SystemFw That may be, since it doesn't reproduce without the commit part. Either way, that means the control flow has broken.

SystemFw commented 5 years ago

Either way, that means the control flow has broken.

I'm not sure you can draw that conclusion. groupWithin is inherently a concurrent operation. Note that in the kafka library at my workplace (ovotech/fs2-kafka) we explicitly guard kafka with an abstraction that ensures things are accessed serially, because eventually it breaks if you don't.
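
(For illustration only, not the actual fs2-kafka code: a minimal sketch of such a guard, funnelling every call through a single-permit Semaphore so that poll and commit can never overlap.)

import cats.effect.{Concurrent, Sync}
import cats.effect.concurrent.Semaphore
import cats.implicits._

// Sketch of a serial-access guard around the MiniKafka from the first repro
// (names are made up): every call takes the single permit, so no two client
// calls can ever run at the same time.
final class SerialKafka[F[_]: Sync] private (kafka: MiniKafka, lock: Semaphore[F]) {
  private def serial[A](fa: F[A]): F[A] = lock.withPermit(fa)

  def poll: F[String] = serial(Sync[F].delay(kafka.poll()))
  def commit: F[Unit] = serial(Sync[F].delay(kafka.commit()))
}

object SerialKafka {
  def create[F[_]: Concurrent](kafka: MiniKafka): F[SerialKafka[F]] =
    Semaphore[F](1).map(new SerialKafka(kafka, _))
}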

So at the moment I'm not sure if this is a bug or just a normal concurrent interleaving in groupWithin (I'm leaning towards the latter but haven't run a full analysis yet, so I may be wrong).

neko-kai commented 5 years ago

@SystemFw Makes sense. Though if by "groupWithin is inherently a concurrent operation" you mean that groupWithin will keep buffering while the downstream is processing, that would also mean that groupWithin does not backpressure, unlike chunkN. In that case this (and the control flow interleaving) should probably be mentioned in the documentation.

Intuitively it seems to me that a chunkN+Timer with backpressure is an operation that can be implemented serially, but I may be missing something.

SystemFw commented 5 years ago

you mean that groupWithin will keep buffering while the downstream is processing, that would also mean that groupWithin does not backpressure,

No, it's not quite like that. What I mean is that there is a queue in between, and some concurrency. Now, if the queue were completely unbounded, then indeed there would be no backpressure at all, and you would see that behaviour pretty much immediately. However, the queue used is a synchronous queue, which doesn't allow publishing unless there is a subscriber ready, and only allows one publish at a time, which should guarantee backpressure.
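
(A small self-contained illustration of that handoff: each enqueue only completes once a dequeue is ready to take the element, so at most one element is ever in flight.)

import cats.effect.Concurrent
import cats.implicits._
import fs2.Stream
import fs2.concurrent.Queue

// Toy example (hypothetical, just to show the handoff): each enqueue completes
// only when a dequeue is waiting for the element, so publishes are serialised.
def handoff[F[_]: Concurrent]: F[Vector[Int]] =
  Queue.synchronous[F, Int].flatMap { q =>
    val producer = Stream.range(0, 3).covary[F].through(q.enqueue)
    q.dequeue.take(3).concurrently(producer).compile.toVector
  }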

So one hypothesis is that you're seeing a race in that queue (in which case it would be a bug), but I won't be able to say precisely until I have more time to dig into it.

Intuitively it seems to me that a chunkN+Timer with backpressure is an operation that can be implemented serially, but I may be missing something.

Well, just be aware that groupWithin has proven very tricky to implement correctly, but obviously I'm open to improvements :)

I don't see how you can implement a timeout without any concurrency though. You can try an implementation that doesn't rely on queues, which was indeed tried, but the current one worked better.

SystemFw commented 5 years ago

Btw, I guess I should also say that I agree that we want this combinator to not cause interleaving. What I'm talking about wrt "bug" vs "normal interleaving" is just "thing that should work in the current code but doesn't" vs "thing that wasn't thought of in the current code, and we need to try and add".

I do recommend guarding kafka through other means though, regardless of this issue.

neko-kai commented 5 years ago

Yes, you can't implement a timeout strictly without concurrency, but a naive implementation that just checks the clock after each chunk to decide the cutoff would be sufficient for my use case and would be guaranteed to behave predictably wrt control flow. I may open a PR to add chunkNTimed, if such a function is within the scope of fs2 (and hasn't been tried and discarded before).
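
Roughly what I have in mind (an untested sketch; the name chunkNTimed and its exact signature are just a proposal): pull chunks as usual and only consult the clock between pulls, so the timeout can never race with the downstream. The cost is that the timeout only fires once the next upstream chunk arrives, and a group can exceed n when an upstream chunk is large.

import java.util.concurrent.TimeUnit

import cats.effect.Timer
import fs2.{Chunk, Pipe, Pull, Stream}

import scala.concurrent.duration.FiniteDuration

def chunkNTimed[F[_]: Timer, O](n: Int, d: FiniteDuration): Pipe[F, O, Chunk[O]] = { in =>
  def now: F[Long] = Timer[F].clock.monotonic(TimeUnit.MILLISECONDS)

  def go(buf: Vector[O], deadline: Long, s: Stream[F, O]): Pull[F, Chunk[O], Unit] =
    s.pull.uncons.flatMap {
      case None =>
        if (buf.isEmpty) Pull.done else Pull.output1(Chunk.vector(buf))
      case Some((hd, tl)) =>
        Pull.eval(now).flatMap { t =>
          val acc = buf ++ hd.toVector
          if (acc.size >= n || t >= deadline)
            // cut a group and start a fresh deadline; the clock is only ever
            // checked here, between pulls, never concurrently with downstream
            Pull.output1(Chunk.vector(acc)) >>
              Pull.eval(now).flatMap(t2 => go(Vector.empty, t2 + d.toMillis, tl))
          else go(acc, deadline, tl)
        }
    }

  Stream.eval(now).flatMap(t => go(Vector.empty, t + d.toMillis, in).stream)
}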

Btw, I guess I should also say that I agree that we want this combinator to not cause interleaving. What I'm talking about wrt "bug" vs "normal interleaving" is just "thing that should work in the current code but doesn't" vs "thing that wasn't thought of in the current code, and we need to try and add".

I guess we should decide whether it's worth it to dig deeper and try and remove all "normal interleaving" from groupWithin or document it and move on.

SystemFw commented 5 years ago

I think I'd like to understand what's going on before making that decision. I'm travelling these days but hopefully I'll have some time next week

SystemFw commented 5 years ago

This reproduces the behaviour very quickly, without groupWithin:


import fs2._
import fs2.concurrent.Queue
import cats.effect._
import cats.effect.concurrent.Ref
import cats.implicits._

import scala.concurrent.duration._

trait MiniKafka[F[_]] {
  def poll: F[Unit]
  def commit: F[Unit]
}
object MiniKafka {
  def create[F[_]: Sync: Timer] =
    Ref[F].of(false).map { inUse =>
      new MiniKafka[F] {
        def error(s: String) = new Exception(s"Simultaneous action! $s")

        def acquire(s: String) =
          inUse.modify {
            case true => true -> Sync[F].raiseError[Unit](error(s))
            case false => true -> ().pure[F]
          }.flatten

        def release = inUse.set(false)

        def poll = acquire("poll") >> release

        def commit = acquire("commit") >> release
      }
    }
}

// poll feeds a synchronous queue, commit runs downstream of it: the next poll can
// start as soon as the previous element is handed over, i.e. while commit is still running.
case class Repro[F[_]: Concurrent: Timer]() {
  def prog = MiniKafka.create[F].flatMap { kafka =>
    Queue.synchronous[F, Unit].flatMap { q =>
      q.dequeue
        .evalMap(_ => kafka.commit)
        .concurrently(Stream.repeatEval(kafka.poll).through(q.enqueue))
        .compile
        .drain
    }
  }
}

object App extends IOApp {
  def run(args: List[String]) =
    Repro[IO].prog.as(ExitCode.Success)

  def repl = Repro[IO].prog.unsafeRunSync
}

However this behaviour seems reasonable: the queue guarantees that the element can't be published unless there is a subscriber, not that the action producing it won't even start; one would need additional synchronisation for that.

I think this falls into the "things I simply hadn't thought about" category. I assumed a buffer size of 1 was not a problem, so this is pull-based with an offset of 1, whereas with kafka you want an offset of 0.

I'll have to think about how (and if) this can be supported.

SystemFw commented 5 years ago

I think this is as good as it gets with the current implementation; however, this inspired me to play around with a different one. We'll see if I manage to make it work.

SystemFw commented 5 years ago

So actually, rewriting that repro like this seems to work (ran for half an hour):

  def prog =
    (MiniKafka.create[F], Queue.synchronous[F, Unit], SignallingRef[F, Unit](())).mapN {
      (kafka, q, sig) =>
        def consumer = Stream.repeatEval(sig.set(()) >> q.dequeue1).evalMap(_ => kafka.commit)
        def producer = sig.discrete.zipRight(Stream.repeatEval(kafka.poll)).through(q.enqueue)

        consumer
          .concurrently(producer)
          .compile
          .drain
    }.flatten

The relevant part is the use of sig.discrete to force a pull-then-emit relationship, as opposed to the emit-block-pull you get from the synchronous queue alone.

This can probably be adapted and added to the current groupWithin, but I'm not sure this additional piece of synchronisation is worth it, given that this issue is a bit of a corner case (kafka java client behaviour, which is explicitly guarded against in the fs2-kafka libs).

@mpilquist @pchlupacek thoughts?


On another note, I've been playing around with another mechanism of resettable timeouts to replace the one currently used by groupWithin (still untested, and it doesn't have to be encapsulated in a datatype ofc):

trait Alarm[F[_]] {
  def reset(d: FiniteDuration, timeoutId: Token): F[Unit]
  def timeouts: Stream[F, Token]
}
object Alarm {
  def create[F[_]: Concurrent: Timer]: F[Alarm[F]] = {

    def now = Timer[F].clock.monotonic(NANOSECONDS).map(_.nanos)

    class Timeout(val id: Token, issuedAt: FiniteDuration, d: FiniteDuration) {
      def asOfNow:  F[FiniteDuration] = now.map(now => d - (now - issuedAt))
    }
    object Timeout {
      def issueNow(id: Token, d: FiniteDuration): F[Timeout] = now.map(new Timeout(id, _, d))
    }

    SignallingRef[F, Option[Timeout]](None).map { time =>
      def nextAfter(t: Timeout): Stream[F, Timeout] =
        time.discrete.unNone.dropWhile(_.id == t.id).head

      new Alarm[F] {
        def timeouts: Stream[F, Token] =
          Stream.eval(time.get).unNone.flatMap { timeout =>
            Stream.eval(timeout.asOfNow).flatMap { t =>
              if (t <= 0.nanos) Stream.emit(timeout.id) ++ nextAfter(timeout).drain
              else Stream.sleep_[F](t)
            }
          } ++ timeouts

        def reset(d: FiniteDuration, id: Token) =
          Timeout.issueNow(id, d).flatMap(t => time.set(t.some))
      }
    }
  }
}

In groupWithin, this could be merged into the stream of values, with the accumulation then done in a Pull; or we could change timeouts to enqueue the Tokens in the synchronous queue instead of emitting them, and leave the accumulation function as is.
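
For concreteness, a very rough, untested sketch of the first option (assuming the same scope and imports as the Alarm sketch above, so Token and the cats syntax are available; all names are hypothetical):

def groupWithinViaAlarm[F[_]: Concurrent: Timer, O](n: Int, d: FiniteDuration)(
    in: Stream[F, O]): Stream[F, Chunk[O]] =
  Stream.eval(Alarm.create[F]).flatMap { alarm =>
    // open a new timeout window and remember its id
    def startTimeout: F[Token] = {
      val id = new Token
      alarm.reset(d, id).as(id)
    }

    def go(buf: Vector[O], current: Token,
           s: Stream[F, Either[Token, O]]): Pull[F, Chunk[O], Unit] =
      s.pull.uncons1.flatMap {
        case None =>
          if (buf.isEmpty) Pull.done else Pull.output1(Chunk.vector(buf))
        case Some((Right(o), tl)) =>
          val acc = buf :+ o
          if (acc.size >= n)
            Pull.output1(Chunk.vector(acc)) >>
              Pull.eval(startTimeout).flatMap(go(Vector.empty, _, tl))
          else go(acc, current, tl)
        case Some((Left(id), tl)) =>
          if (id != current) go(buf, current, tl) // stale timeout from an earlier group
          else {
            val flush =
              if (buf.nonEmpty) Pull.output1(Chunk.vector(buf))
              else Pull.done // nothing buffered, just open a new window
            flush >> Pull.eval(startTimeout).flatMap(go(Vector.empty, _, tl))
          }
      }

    val tagged: Stream[F, Either[Token, O]] =
      in.map(o => (Right(o): Either[Token, O]))
        .merge(alarm.timeouts.map(id => (Left(id): Either[Token, O])))

    Stream.eval(startTimeout).flatMap(id => go(Vector.empty, id, tagged).stream)
  }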

The advantage of this approach is that it avoids spawning and canceling fibers all over, and uses a single long running stream instead. It also doesn't need the custom resource tracking scheme of the current version, as the timeouts stream could be handled by a simple concurrently or merge, which takes care of proper finalisation.

The disadvantage is that right now Stream.sleep (Stream.eval(Timer.sleep)) is leaky when interrupted, because even though Scope does track it correctly, it marks the computation as uninterruptible for Eval nodes, as we know. We need to fix this ofc.

Thoughts?

ChristopherDavenport commented 5 years ago

@SystemFw I created Agitation as a concept for resettable timeouts, so please feel free to start things there if you would like to get that out. I was working with Fiber and Ref initially, but if additional tools are needed I'd like to get them incorporated, as I expect this is a more generic pattern that will be needed in many applications.

https://github.com/ChristopherDavenport/agitation

SystemFw commented 5 years ago

@mpilquist @pchlupacek Any thoughts on the additional synchronisation in the first part of this comment? https://github.com/functional-streams-for-scala/fs2/issues/1389#issuecomment-452546727

Is it worth adding that? Or is it too much given that this is a corner case, and therefore this is a won't fix? I'd like to move forward one way or another, but I'm undecided which way.

pchlupacek commented 5 years ago

@SystemFw My first instinct would be not to change anything if this is just a special case for a particular product (kafka). However, if the change will make groupWithin more predictable w/o significantly impacting performance and without behaviour changes, then I am for it.

I am not sure, however, if this has to be solved in groupWithin itself. I think we are sort of tackling a transaction concept here, which perhaps we have to see if it can be solved somehow in a generic way.

SystemFw commented 5 years ago

My first instinct would be not to change anything if this is just a special case for a particular product (kafka).

Agree

However, if the change will make groupWithin more predictable

It probably will

w/o significantly impacting performance

I'm not sure about that, that's the issue

I am not sure, however, if this has to be solved in groupWithin itself. I think we are sort of tackling a transaction concept here

We are not. The transaction issue is specific to Kafka. The problem here is merely concurrency: groupWithin is push-based with a backpressure of 1, so very close to being purely pull-based, but not quite. I think many concurrent operations have to be implemented this way, so I don't really see it as a general problem (hence the indecision). It's just that groupWithin really sounds like it should be non-concurrent from the outside (which it can't be once you delve into it; it needs concurrency).