typelevel / cats-effect

The pure asynchronous runtime for Scala
https://typelevel.org/cats-effect/
Apache License 2.0

IO.race doesn't propagate winner's IOLocal context #3100

Open vladislavsheludchenkov opened 1 year ago

vladislavsheludchenkov commented 1 year ago

We're using IOLocal for trace ID propagation. It works exactly as expected when we deal only with IO values. However, we currently can't use IOLocal with fs2 streams, because they use IO.race to handle interruption.

Here's a cats-effect example:

import cats.effect.IO
import cats.effect.IOLocal
import cats.effect.unsafe.implicits.global

val effectWithoutRace = for {
  local <- IOLocal[Option[String]](None)
  _ <- local.set(Some("uhh... hi!"))
  _ <- local.get.flatMap(IO.println)
} yield ()

effectWithoutRace.unsafeRunSync() // prints Some(uhh... hi!)

// Runs `effect` on a new fiber, racing it against an effect that never completes
def withRace[A](effect: IO[A]): IO[A] = effect.race(IO.never).map(_.merge)

val effectWithRace = for {
  local <- withRace(IOLocal[Option[String]](None))
  _ <- withRace(local.set(Some("uhh... hi!")))
  _ <- withRace(local.get.flatMap(IO.println))
} yield ()

effectWithRace.unsafeRunSync() // prints None

Scastie for this code snippet: https://scastie.scala-lang.org/jshavzY0STWiAESgLgTqHg

Wrapping the IOLocal.set call in IO.race means the new value is written to the racing fiber's copy of the context and then discarded as soon as the race completes.
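The behavior follows from how IOLocal interacts with fibers: IO.race runs its sides on new fibers, and a child fiber starts with a copy of its parent's locals, so writes made by the child never flow back to the parent. The minimal sketch below (the value names are illustrative) reproduces the same effect with an explicit start/join instead of race:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

val forkJoin: IO[(Int, Int)] = for {
  local <- IOLocal(0)
  _ <- local.set(1)
  // The child fiber starts with a copy of the parent's value (1), then overwrites only its own copy
  fiber <- (local.get <* local.set(2)).start
  seenByChild <- fiber.joinWithNever
  // The parent's copy is untouched by the child's write
  seenByParent <- local.get
} yield (seenByChild, seenByParent)

val (seenByChild, seenByParent) = forkJoin.unsafeRunSync()
println(s"child saw $seenByChild, parent sees $seenByParent") // child saw 1, parent sees 1
```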

And here's an example with fs2 that we've actually encountered in the wild:

import cats.effect.IO
import cats.effect.IOLocal
import cats.effect.unsafe.implicits.global

val stream = for {
  local <- fs2.Stream.eval(IOLocal[Option[String]](None))
  _ <- fs2.Stream.eval(local.set(Some("uhh... hi!")))
  _ <- fs2.Stream.eval(local.get.flatMap(IO.println))
} yield ()

stream.compile.drain.unsafeRunSync() // prints Some(uhh... hi!)

stream.interruptWhen(IO.never[Either[Throwable, Unit]]).compile.drain.unsafeRunSync() // prints None

Scastie for this code snippet: https://scastie.scala-lang.org/LghgY5G4RMWx0f4CRmUIqw

In other words, calling interruptWhen on an fs2.Stream wraps every eval in F.race so that it can actually be interrupted.

One possible way to make IOLocal usable with fs2 would be to preserve IOLocal changes made inside a race when they come from the effect that won it. While it makes sense not to merge contexts after joining back from IO.both, IO.race should be less problematic: no merging is needed, only keeping the winner's context.
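The "keep the winner's context" idea can be approximated in user code for a single, known IOLocal. The sketch below uses a hypothetical helper, `raceKeepingLocal`, which is not part of cats-effect: each side reports the local value it finished with, and the caller copies the winner's value back into its own fiber. The proposal above would do this inside the runtime for every local, not for one local passed in explicitly.

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// Hypothetical helper: race two effects and copy the winner's value of ONE known IOLocal
// back into the calling fiber.
def raceKeepingLocal[L, A, B](local: IOLocal[L])(ioa: IO[A], iob: IO[B]): IO[Either[A, B]] = {
  // Each side returns its result together with the local value it ended up with
  val left = ioa.flatMap(a => local.get.map(l => (a, l)))
  val right = iob.flatMap(b => local.get.map(l => (b, l)))
  IO.race(left, right).flatMap {
    case Left((a, l))  => local.set(l).as(Left[A, B](a))
    case Right((b, l)) => local.set(l).as(Right[A, B](b))
  }
}

val program: IO[Option[String]] = for {
  local <- IOLocal[Option[String]](None)
  _ <- raceKeepingLocal(local)(local.set(Some("uhh... hi!")), IO.never[Unit])
  value <- local.get
} yield value

val result = program.unsafeRunSync()
println(result) // Some(uhh... hi!)
```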

Shoutout to @danielleontiev for debugging this issue with me

armanbilge commented 1 year ago

Thanks for reporting. Currently this is by design.

We also encountered this when investigating tracing for Streams. See

See also some discussion on more sophisticated semantics for IOLocals.

vladislavsheludchenkov commented 1 year ago

So it's a "can't fix" from the fs2 side and a "won't fix" from the cats-effect side? That's sad; IOLocals are amazing when they work properly.

But I didn't see any discussion of race semantics in the initial merge request, and race is not exactly the same as fork/join. Since racePair is already handled within IOFiber#runLoop, it looks like localState could be handled there as well. It would introduce some complexity, but not too much IMO. What do you think?

vladislavsheludchenkov commented 1 year ago

I've taken a closer look at racePair, and it looks like it's also used in situations where there is no clear winner (e.g. both), so always propagating localState there is not an option: it would make those situations a lot more confusing.

So it's a bit more complex than I initially expected, but we could still reimplement race and raceOutcome so that they preserve localState somehow, because they have a clear winner and we don't have to debate which context to keep and which to discard.
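Why racePair alone has no obvious winner can be seen by building a `both`-like combinator on top of it. The sketch below (`bothViaRacePair` is a hypothetical name, and it deliberately skips the error and cancellation cleanup a real implementation needs) always waits for the second fiber as well, so both sides end up with their own, possibly different, local values and neither is "the" winner:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// Simplified sketch: no cleanup of the other fiber on error or cancellation.
def bothViaRacePair[A, B](ioa: IO[A], iob: IO[B]): IO[(A, B)] =
  IO.racePair(ioa, iob).flatMap {
    // Whichever side finishes first, we still have to wait for the other one
    case Left((outcomeA, fiberB)) =>
      outcomeA.embedNever.flatMap(a => fiberB.joinWithNever.map(b => (a, b)))
    case Right((fiberA, outcomeB)) =>
      outcomeB.embedNever.flatMap(b => fiberA.joinWithNever.map(a => (a, b)))
  }

val program: IO[(Int, Int, Int)] = for {
  local <- IOLocal(0)
  // Each side writes a different value to its own copy of the local
  pair <- bothViaRacePair(local.set(1) *> local.get, local.set(2) *> local.get)
  inParent <- local.get
} yield (pair._1, pair._2, inParent)

val (fromA, fromB, inParent) = program.unsafeRunSync()
println((fromA, fromB, inParent)) // (1,2,0)
```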

djspiewak commented 1 year ago

So the general solution that was envisioned for this when we decided on the local semantics is actually IOLocalRef, which ironically wasn't implemented at the time because no one could come up with a practical scenario where it was needed. Fast forward a bit, and this is actually the first time I've heard of this interaction with Fs2! Whoops.

The general problem with IOLocal itself having special interactions with fiber joining is that most interesting concurrent code uses Deferred or Queue rather than join, and obviously you can't enrich those with special magic. IOLocalRef solves this, though, by putting a Ref in the local and mutating that, so your context propagates across fibers.
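Since IOLocalRef was never implemented, the sketch below only approximates the idea with existing pieces: a plain IOLocal holding a Ref. The child fibers created by race still receive a copy of the IOLocal, but that copy points at the same Ref, so a write made inside the race stays visible afterwards:

```scala
import cats.effect.{IO, IOLocal}
import cats.effect.unsafe.implicits.global

// Approximation of the IOLocalRef idea: a Ref stored inside an ordinary IOLocal.
val program: IO[Option[String]] = for {
  ref <- IO.ref(Option.empty[String])
  local <- IOLocal(ref)
  // The racing fiber gets a copy of the IOLocal, but the copy refers to the same Ref
  _ <- local.get.flatMap(_.set(Some("uhh... hi!"))).race(IO.never)
  value <- local.get.flatMap(_.get)
} yield value

val result = program.unsafeRunSync()
println(result) // Some(uhh... hi!)
```

The flip side of this sharing is that every fiber inheriting the local also shares the Ref, so concurrent siblings see each other's writes.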

armanbilge commented 1 year ago

This is the first I've heard about IOLocalRef :) What exactly are its semantics? It sounds a lot like putting a Ref inside an IOLocal, which I'm not sure would work for tracing.

djspiewak commented 1 year ago

@armanbilge That's exactly what it was proposed to be. Like I said, we never actually implemented it. :-) But basically, it was intended to be Local's API, but with a Ref inside of it and an explicit fork (or something) operator. This would give you the ability to share state arbitrarily around fibers.
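The "explicit fork" operator was never specified, so the following is only one guess at what it could look like. `forked` is a hypothetical helper: it runs an effect against a fresh Ref seeded with the current value, then restores the shared Ref, so writes made inside the forked scope stay private to it:

```scala
import cats.effect.{IO, IOLocal, Ref}
import cats.effect.unsafe.implicits.global

// Hypothetical "fork" operator for a Ref-in-a-Local design (not a cats-effect API).
def forked[S, A](local: IOLocal[Ref[IO, S]])(fa: IO[A]): IO[A] =
  for {
    current <- local.get.flatMap(_.get)
    fresh <- IO.ref(current)
    // Swap in the private Ref for the duration of `fa`, then put the shared one back
    result <- local.getAndSet(fresh).bracket(_ => fa)(previous => local.set(previous))
  } yield result

val program: IO[(Option[String], Option[String])] = for {
  local <- IO.ref(Option.empty[String]).flatMap(IOLocal(_))
  _ <- local.get.flatMap(_.set(Some("shared")))
  inner <- forked(local)(local.get.flatMap(r => r.set(Some("private")) *> r.get))
  outer <- local.get.flatMap(_.get)
} yield (inner, outer)

val (inner, outer) = program.unsafeRunSync()
println((inner, outer)) // (Some(private),Some(shared))
```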

armanbilge commented 1 year ago

Just to throw it out there, here's another possible way we could try to support these use cases without implementing IOLocal "merging". It would require us to compromise on fiber identity instead, but then we could do something like this:

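def timeout[A](ioa: IO[A], duration: Duration): IO[A] = 
  // IO.fiber is the hypothetical API proposed here; it hands us the current fiber
  IO.fiber { fiber =>
    // In the background: sleep, then cancel the current fiber
    (IO.sleep(duration) *> fiber.cancel).background.surround {
      ioa // runs directly on the current fiber, so IOLocal changes stay visible
    }
  }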

In this case, you never leave the fiber that started the timeout, so locals propagate completely naturally. (You also save the "overhead" of creating and scheduling an extra fiber to run ioa.)

This would have limited applicability, however. For example, you could not implement timeoutTo like this, since timeouts require canceling a fiber, and if that's the fiber you are on, you cannot recover from that :)
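The "cannot recover" part can be checked directly: once a fiber cancels itself, error handlers such as handleErrorWith do not run, because cancellation is not an error. Only finalizers (here, onCancel) get to run, so there is no point at which a timeoutTo-style fallback could take over. A minimal sketch:

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global

val selfCanceling: IO[Int] =
  (IO.canceled *> IO.pure(1))
    .handleErrorWith(_ => IO.pure(0)) // never triggered: cancellation is not an error
    .onCancel(IO.println("finalizer ran")) // finalizers do run

// Run it on its own fiber and inspect how that fiber ended
val outcome = selfCanceling.start.flatMap(_.join).unsafeRunSync()
println(outcome) // Canceled()
```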

djspiewak commented 1 year ago

> This would have limited applicability however e.g. you could not implement timeoutTo like this, since timeouts require canceling a fiber, and if that's the fiber you are on, you cannot recover from that :)

I think this would also change the definition of timeout, since an expired timeout would result in the fiber being canceled, rather than the current semantics, which result in raiseError(new TimeoutException).
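For comparison, the existing semantics can be observed with the real timeout and timeoutTo methods: an expired timeout surfaces as a TimeoutException that the caller can handle, and timeoutTo switches to a fallback effect. A self-canceling design could offer neither:

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._
import cats.effect.IO
import cats.effect.unsafe.implicits.global

// An expired timeout is an ordinary, recoverable error...
val timedOut: Either[Throwable, Unit] =
  IO.never[Unit].timeout(50.millis).attempt.unsafeRunSync()

// ...and timeoutTo can continue with a fallback instead
val fallback: String =
  IO.never[String].timeoutTo(50.millis, IO.pure("fallback")).unsafeRunSync()

println(timedOut.isLeft) // true
println(fallback) // fallback
```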

armanbilge commented 1 year ago

Oh, you're right! I forgot that's how it currently works; for some reason I thought it just canceled. So, yeah, tricky :)