typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

`Dequeue#size` drops below 0 on concurrent `takeFront`, `offerBack`, `size` #4170

Open kamilkloch opened 5 hours ago

kamilkloch commented 5 hours ago
```scala
import cats.effect.std.Dequeue
import cats.effect.{IO, IOApp}

import scala.concurrent.duration.DurationInt

object DequeueTests extends IOApp.Simple {

  def run: IO[Unit] = {
    Dequeue.bounded[IO, Int](1024).flatMap { q =>
      // Offer one element after 200 ms, so the consumer is most likely already waiting.
      val supplier = q.offerBack(1).debug("offerBack").delayBy(200.millis)
      val consumer = q.takeFront.debug("takeFront")
      // Fail if the reported size is negative.
      val checkSize = q.size.debug("size").flatMap { n =>
        if (n < 0) IO.raiseError(new Exception(s"q.size = $n")) else IO.unit
      }

      // Run the consumer in the background while the supplier and the size check run.
      consumer.background.surround(supplier >> checkSize)
    }
  }
}
```

Output:

```
offerBack: Succeeded: ()
takeFront: Succeeded: 1
size: Succeeded: -1
java.lang.Exception: q.size = -1
```

(Run on CE 3.6-ecf93db)

reardonj commented 2 hours ago

I am very suspicious of this line: https://github.com/typelevel/cats-effect/blob/2d89990e15ac0ee72bed2900fab09fcbd28ed3bf/std/shared/src/main/scala/cats/effect/std/Dequeue.scala#L161

If I understand correctly, when an offer arrives while a taker is pending, the state does not increment the size. However, when `_take` is eventually called, it does decrement the size. So an offer made while a taker is pending leaves the size decremented by 1 once both effects have finished, instead of incremented and then decremented (net zero).
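To make the suspected accounting concrete, here is a minimal, purely illustrative model of the bookkeeping described above. It is not the cats-effect implementation: `State`, `SizeModel`, `registerTaker`, `offerSuspected`, `offerBalanced`, and `take` are hypothetical names, and the model assumes the taker always decrements `size` when it completes.

```scala
// Hypothetical, simplified model of the size bookkeeping discussed above.
// NOT the cats-effect implementation; all names are illustrative only.
final case class State(size: Int, pendingTakers: Int)

object SizeModel {
  // A taker arrives on an empty queue and registers itself as pending.
  def registerTaker(s: State): State =
    s.copy(pendingTakers = s.pendingTakers + 1)

  // Suspected behaviour: an offer that finds a pending taker hands off
  // to it but leaves `size` unchanged.
  def offerSuspected(s: State): State =
    if (s.pendingTakers > 0) s.copy(pendingTakers = s.pendingTakers - 1)
    else s.copy(size = s.size + 1)

  // Balanced alternative (an assumption, not a proposed patch): the offer
  // always counts the element, whether or not a taker is pending.
  def offerBalanced(s: State): State =
    if (s.pendingTakers > 0)
      s.copy(size = s.size + 1, pendingTakers = s.pendingTakers - 1)
    else s.copy(size = s.size + 1)

  // The woken taker completes and decrements `size` unconditionally.
  def take(s: State): State = s.copy(size = s.size - 1)

  def main(args: Array[String]): Unit = {
    val start = State(size = 0, pendingTakers = 0)
    val suspected = take(offerSuspected(registerTaker(start)))
    val balanced = take(offerBalanced(registerTaker(start)))
    println(s"suspected: size = ${suspected.size}") // suspected: size = -1
    println(s"balanced: size = ${balanced.size}")   // balanced: size = 0
  }
}
```

Under these assumptions, the taker-first interleaving from the report ends at `size = -1` with the suspected accounting and at `size = 0` when the offer and the take are counted symmetrically.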