zio / zio

ZIO — A type-safe, composable library for async and concurrent programming in Scala
https://zio.dev
Apache License 2.0

Fiber observers may not be called #9040

Closed: steinybot closed this 3 months ago

steinybot commented 3 months ago

We were fairly reliably experiencing deadlocks in our tests. It took a while to reproduce locally but I eventually tracked it down to toEffect from interop-cats (https://github.com/zio/interop-cats/issues/694).

Digging a bit deeper, the actual issue seems to be that the callback passed to addObserver might not be called. The same problem affects zio.Runtime.default.run and likely anything else that uses addObserver.

Here is a minimal reproduction:

      test("observers must be called") {
        ZIO.loopDiscard(1)(_ <= 1_000_000, _ + 1) { n =>
          Runtime.default.run(ZIO.succeed(n))
        }.map(_ => assertTrue(true))
      }

On my local environment that will hang by about the 150,000th iteration although it is highly dependent on timing.

There are a few different problems. The modifications to observers are non-atomic and can interleave. Also there is no mutex around the _exitValue check and the modification to observers.
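To make the suspected race concrete, here is a toy model of the check-then-act problem. It is not ZIO's FiberRuntime: `ToyFiber`, `addObserverRacy`, `addObserverLocked` and `complete` are hypothetical names used only for illustration, and the lock is just one way to close the window.

```scala
// Toy model only: NOT ZIO's FiberRuntime. All names here are hypothetical.
final class ToyFiber[A] {
  private var exitValue: Option[A]       = None
  private var observers: List[A => Unit] = Nil

  // Racy: between the exitValue check and the observers update, another
  // thread can run complete(), so the new observer is never notified.
  def addObserverRacy(f: A => Unit): Unit =
    exitValue match {
      case Some(a) => f(a)
      case None    => observers = f :: observers
    }

  // The check and the update happen under one lock, so they cannot interleave
  // with complete(). The callback itself runs outside the lock.
  def addObserverLocked(f: A => Unit): Unit = {
    val done: Option[A] = this.synchronized {
      exitValue match {
        case None =>
          observers = f :: observers
          None
        case finished => finished
      }
    }
    done.foreach(f)
  }

  // Publishes the exit value and takes the registered observers atomically.
  def complete(a: A): Unit = {
    val toNotify = this.synchronized {
      exitValue = Some(a)
      val registered = observers
      observers = Nil
      registered
    }
    toNotify.foreach(_(a))
  }
}
```

With the locked variant, an observer is called whether it is registered before or after `complete`; the racy variant can lose it when the two calls overlap on different threads.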

kyri-petrou commented 3 months ago

Hey @steinybot, thanks for your contribution. However, I think the fix is much simpler. The issue isn't that the updates are non-atomic (there can only be 1 thread modifying the state of that var), but rather that _observers is not declared @volatile. When a fiber enters async resumption and a different thread "takes control" of it, it's possible that the state isn't immediately visible to that thread.

Can you add the @volatile annotation to _observers and see if that resolves the issue you're experiencing?
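For reference, a minimal sketch of what that annotation looks like on a field. `ObserverState`, `add` and `fire` are hypothetical stand-ins, not the actual fiber internals.

```scala
// Hypothetical stand-in for the fiber state discussed above; not ZIO's code.
final class ObserverState[A] {
  // @volatile makes a write by one thread visible to later reads by other
  // threads. It does NOT make the read-modify-write in add() atomic.
  @volatile private var _observers: List[A => Unit] = Nil

  def add(f: A => Unit): Unit = {
    _observers = f :: _observers
  }

  // Calls every registered observer, most recently added first.
  def fire(a: A): Unit =
    _observers.foreach(_(a))
}
```

This only addresses visibility. If two threads can call `add` at the same time, or `add` can overlap with `fire`, the annotation alone is not enough.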

steinybot commented 3 months ago

I initially thought the same thing but I don't think it is that simple. The thread calling addObserver is almost certainly not the thread that calls setExitValue. I can double check.

kyri-petrou commented 3 months ago

It doesn't need to be the same one, as long as the fiber is not running yet. I had a quick look at the occurrences of addObserver and they all seem to me to be happening before the fiber is started (note that startSuspended doesn't actually start it until the apply method is called on the returned function)

steinybot commented 3 months ago

Right, gotcha.

It looks like it still locks up even with _observers marked @volatile. I'll see if I can find anything definitive.

steinybot commented 3 months ago

fork calls fiber.startConcurrently(zio), which may start the fiber, right?

kyri-petrou commented 3 months ago

Yes, but the observers are added before fiber.startConcurrently AFAICT. Also, in your reproduction there is no forking happening, which is the really bizarre part.

kyri-petrou commented 3 months ago

Ahh, Runtime.default.run uses async, but it also uses addObserver (which is wrong). Let me take a look into it.

steinybot commented 3 months ago

The 2 places I have looked at are https://github.com/zio/zio/blob/8168cddf5026b9fe507f745c3d11d2f82836aeb6/core/shared/src/main/scala/zio/Runtime.scala#L52-L54

and: https://github.com/zio/interop-cats/blob/879070e7f9d2310f21734f81b4d42d35030acc05/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala#L39-L45

In both, the observers are being added to the fiber returned from fork.

runToFuture calls makeFiber before addObserver so I think that would behave correctly: https://github.com/zio/zio/blob/8168cddf5026b9fe507f745c3d11d2f82836aeb6/core/shared/src/main/scala/zio/Runtime.scala#L176-L180

steinybot commented 3 months ago

Using onExit on the ZIO rather than addObserver on the fiber should also fix it I think.
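A minimal sketch of that idea, assuming ZIO 2.x on the classpath. `observed` and `OnExitSketch` are hypothetical names, and this is not necessarily how the eventual fix is written. The callback is attached to the effect itself, so it runs as part of the fiber's own execution rather than being registered from another thread after fork.

```scala
import zio._

object OnExitSketch extends ZIOAppDefault {

  // Hypothetical helper: attaches the callback to the effect via onExit
  // instead of calling addObserver on the forked fiber.
  def observed[E, A](zio: IO[E, A])(callback: Exit[E, A] => Unit): IO[E, A] =
    zio.onExit(exit => ZIO.succeed(callback(exit)))

  def run =
    observed(ZIO.succeed(42))(exit => println(s"observed: $exit"))
}
```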

kyri-petrou commented 3 months ago

> Using onExit on the ZIO rather than addObserver on the fiber should also fix it I think.

Yeah let's do this. By the way is Runtime.default.run being used in the interop-cats package? I don't think it's really meant to be used outside of testing (as mentioned in the scaladoc). Can you point me to where it's being used?

steinybot commented 3 months ago

No, it's not using Runtime.default.run but it is using fork and addObserver in the same way. https://github.com/zio/interop-cats/blob/879070e7f9d2310f21734f81b4d42d35030acc05/zio-interop-cats/shared/src/main/scala/zio/interop/package.scala#L39-L45

steinybot commented 3 months ago

Looks like this also works:

          fiber.tell(
            FiberMessage.Stateful(fiber => fiber.addObserver(observer))
          )

steinybot commented 3 months ago

Maybe the UnsafeAPI implementation ought to be doing this?
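A toy sketch of why sending the change as a message should help: the caller only enqueues a request, and the fiber's own loop is the only thing that touches the state, so the exit check and the observer update can never interleave. This is not ZIO's FiberRuntime or FiberMessage; `MailboxFiber`, `tell`, `drain` and the other names are hypothetical.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Toy model only: NOT ZIO's FiberRuntime or FiberMessage. Names are hypothetical.
final class MailboxFiber[A] {
  private val inbox = new ConcurrentLinkedQueue[MailboxFiber[A] => Unit]()

  // Only touched from drain(), which models the single thread owning the fiber.
  private var exitValue: Option[A]       = None
  private var observers: List[A => Unit] = Nil

  // Any thread may call this; it never touches the fiber state directly.
  def tell(message: MailboxFiber[A] => Unit): Unit = {
    inbox.add(message)
    ()
  }

  def addObserver(f: A => Unit): Unit = tell(_.unsafeAddObserver(f))
  def complete(a: A): Unit            = tell(_.unsafeComplete(a))

  private def unsafeAddObserver(f: A => Unit): Unit =
    exitValue match {
      case Some(a) => f(a) // already done: notify immediately
      case None    => observers = f :: observers
    }

  private def unsafeComplete(a: A): Unit = {
    exitValue = Some(a)
    val registered = observers
    observers = Nil
    registered.foreach(_(a))
  }

  // Models the fiber's run loop processing queued messages one at a time.
  def drain(): Unit = {
    var next = inbox.poll()
    while (next ne null) {
      next(this)
      next = inbox.poll()
    }
  }
}
```

In this model the observer is notified regardless of whether the `addObserver` message is queued before or after the `complete` message.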

steinybot commented 3 months ago

All three test runners have the same problem: https://github.com/zio/zio/blob/8168cddf5026b9fe507f745c3d11d2f82836aeb6/test/shared/src/main/scala/zio/test/TestRunner.scala#L67-L72

kyri-petrou commented 3 months ago

Damn, I just realised the unsafe API is public on Fiber.Runtime. Then yes, its implementation should definitely be changed to this:

fiber.tell(
  FiberMessage.Stateful(fiber => fiber.addObserver(observer))
)

kyri-petrou commented 3 months ago

More precisely, it should be changed to something similar to this: https://github.com/zio/zio/blob/8168cddf5026b9fe507f745c3d11d2f82836aeb6/core/shared/src/main/scala/zio/internal/FiberRuntime.scala#L77-L80

steinybot commented 3 months ago

Is that necessary? fiber.addObserver will do the same thing right?

kyri-petrou commented 3 months ago

Let's move the discussion to the PR