eyalfa opened 1 month ago
CC: @kyri-petrou
...I also have a thought about eliminating the fibers altogether, but this requires some more work at this point.
Yeah it's kind of a known issue: https://github.com/zio/zio/issues/7628
ZIO 1 handled race as a special case, but that was lost in ZIO 2.
@ghostdogpr would you consider accepting this as a PR? (I'll fill in details on the referenced issue later on.)
Wouldn't it be better to make race faster rather than just timeout?
first of all, yes, let's make it faster! However, the timeoutXXX operators are special in that they don't really need the second fiber (I suspect I can get rid of the first one as well 😎), so I think they deserve their own optimization.
Sure, why not 😄
btw, I started investigating this from the streams angle. `stream.timeoutXXX` is implemented in terms of `pull.timeoutXXX`, which basically forks per pull, effectively destroying the stream's performance.
notice Akka's impl doesn't suffer from this (it suffers from other things, as it's not batching...), since adding a timeout there just means the underlying actor has to react to another message.
btw, I suspect moving `race` into `FiberRuntime` won't solve the issue here, since the dominant cost is forking the two fibers, not reacting to their completion.
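The scheduler-callback idea being discussed can be sketched outside ZIO with plain JVM primitives (a minimal sketch; `SchedulerTimeoutSketch` and `timeoutTo` are hypothetical names, and this is not the ZIO API): instead of forking a second fiber that sleeps, a single timer callback races the main computation to complete a promise.

```scala
import java.util.concurrent.{CompletableFuture, Executors, TimeUnit}

// Hypothetical plain-JVM sketch of the idea (not the ZIO API): instead of
// forking a second "sleep" fiber, register one timer callback with a
// scheduler; whoever completes the promise first wins.
object SchedulerTimeoutSketch {
  private val scheduler = Executors.newSingleThreadScheduledExecutor { r =>
    val t = new Thread(r); t.setDaemon(true); t
  }

  def timeoutTo[A](compute: () => A, fallback: A, millis: Long): A = {
    val promise = new CompletableFuture[A]()
    // the "timer fiber" is just a scheduled callback: no fork, no join
    val timer = scheduler.schedule(
      new Runnable { def run(): Unit = { promise.complete(fallback); () } },
      millis,
      TimeUnit.MILLISECONDS
    )
    val result = compute() // run the main computation on the calling thread
    if (promise.complete(result)) {
      // we won the race: cancelling the timer is a cheap synchronous call
      timer.cancel(false)
    }
    promise.get()
  }
}
```

The sketch deliberately runs the main computation on the calling thread; the point is only that the timeout side needs no fork, no join, and no fiber-ref inheritance.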
I also suspect there are other scenarios where at least one of the fibers can be eliminated. Consider a scenario where one fiber is already running and we now start an operation that's supposed to race with it; this happens in `ZChannel.mergeWith`, where ZIO actually forks and races two `Fiber.join` effects (I did partly optimize this one with a poll). I believe there's no real need for the two 'joiners' in this case.
I think the approach has some merit, but I think it can be optimized a bit further and cleaned up to make it easier to follow / debug. How does this do on the benchmark you wrote, @eyalfa?
```scala
final class TimeoutTo[-R, +E, +A, +B](self: ZIO[R, E, A], b: () => B) {
  def apply[B1 >: B](f: A => B1)(duration: => Duration)(implicit
    trace: Trace
  ): ZIO[R, E, B1] =
    ZIO.uninterruptibleMask { restore =>
      Promise
        .make[E, B1]
        .flatMap { promise =>
          ZIO
            .withFiberRuntime[R, Nothing, Unit] { (state, _) =>
              val clock = state.getFiberRef(DefaultServices.currentServices).get[Clock]
              clock.scheduler.flatMap { scheduler =>
                val cancelTimeout = scheduler.schedule(
                  { () =>
                    val timedOut = promise.unsafe.completeWith(Exit.succeed(b()))(Unsafe)
                    if (timedOut) state.tellInterrupt(Cause.interrupt(state.id))
                  },
                  duration
                )(Unsafe)
                restore(self).exitWith { exit =>
                  cancelTimeout()
                  promise.unsafe.done(exit.mapExit(f))(Unsafe)
                  Exit.unit
                }
              }
            }
            .fork
            .flatMap { fib =>
              restore(promise.await).exitWith(exit => fib.await *> fib.inheritAll *> exit)
            }
        }
    }
}
```
@kyri-petrou I'll check, but I must say I don't see the benefit of using a promise instead of directly interacting with the fiber. Furthermore, the fiber offers the poll functionality, which enables short-circuiting for extremely fast effects.
another point: my (very) sketchy impl triggered repetitive error logging from FiberRuntime. I've currently commented it out, but I guess this isn't really acceptable.
@kyri-petrou, can you please explain why you expect this to perform better? On one hand you add the promise (which should be equivalent to the async's callback), but on the other hand you dropped the short-circuiting around ever starting the fiber, and the bypassing of the async (when the fiber completes 'quickly').
btw, I can see now that my impl fails to `inheritAll` when the fiber wins 'quickly'.
@kyri-petrou, benchmark results:

```
Benchmark                   (n)    Mode  Cnt         Score        Error  Units
AwaitBenchmark.zioBaseline  10000  thrpt  15  15534344.606 ± 398913.505  ops/s
AwaitBenchmark.zioTimeout   10000  thrpt  15     87567.913 ±   1883.883  ops/s
AwaitBenchmark.zioTimeout1  10000  thrpt  15   1195589.479 ±  60252.973  ops/s
AwaitBenchmark.zioTimeout2  10000  thrpt  15     95143.597 ±   1243.427  ops/s
```
@kyri-petrou, @ghostdogpr, another thought: this approach gets over a 10X improvement. Where is it coming from?

- forking one less fiber
- a different interrupt mechanism for the loser (always the scheduler in this benchmark), which seems way more efficient
- direct schedule instead of sleep
- no join on the loser, since it's not a fiber
- and as part of no join, no need to inherit fiber refs

Also, roughly the same async callback (I might have to introduce an atomic boolean instead of suppressing the error log), and the same unsafe operations for creating, starting and monitoring the child fiber.
Where do you think we gain all of this time?
@eyalfa what's the benchmark code like? My guess would be that it's because the effect you're using `.timeout` on is evaluated "too quickly", and because of that the evaluation continues synchronously. However, if that's the case, it isn't really realistic, because I would expect users to use timeout on effects that perform some sort of IO.
I tend to agree, we definitely need more benchmarks.
I found a few streaming-related use cases that end up applying timeouts and races per pull, i.e. a stream's completion timeout or a stream inactivity timeout (implemented by `ZStream.timeoutXXX`).
btw, introducing the short-circuiting contributed only 20% of the gain for this benchmark, so I tend to think most of the gain comes from saving a fork, interrupt and await. Actually, unlike fiber interruption, cancelling the scheduler's cancellable returns immediately and certainly doesn't require setting an async callback...
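The point about cancellation cost can be illustrated at the JVM level (a hedged sketch with a hypothetical `CancelTimerSketch` object, not ZIO code): cancelling a scheduled task is synchronous bookkeeping, whereas interrupting a fiber means delivering a signal and waiting for the fiber to observe it.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical illustration (not ZIO code) of why cancelling a scheduled
// timeout is cheap: ScheduledFuture.cancel is a synchronous bookkeeping call
// that returns immediately, with no async callback and nothing to await.
object CancelTimerSketch {
  def scheduleThenCancel(): (Boolean, Long) = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = scheduler.schedule(
      new Runnable { def run(): Unit = () }, // would fire in an hour
      1,
      TimeUnit.HOURS
    )
    val start     = System.nanoTime()
    val cancelled = task.cancel(false) // returns at once; the task never runs
    val elapsedMs = (System.nanoTime() - start) / 1000000L
    scheduler.shutdown()
    (cancelled, elapsedMs)
  }
}
```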
@kyri-petrou, double checked: disabling the short circuiting on the 'self' fiber doesn't affect the benchmark results:

```
Benchmark                   (n)    Mode  Cnt        Score      Error  Units
AwaitBenchmark.zioTimeout1  10000  thrpt  15  1135251.684 ± 95081.498  ops/s

Total time: 357 s (05:57), completed 23 Sept 2024, 14:23:51
```
@eyalfa how did you disable it? The `FiberRuntime` loop will go into synchronous resumption if the callback is called fast enough.
I've disabled the short circuiting in the `TimeoutTo.apply` method, hence always returning `Left(canceller)`.
it's true the fiber may 'catch' this message in its inbox before yielding, but this is also true for the original implementation, which is 10X slower than this one.
I agree that this benchmark is a bit extreme but I believe it displays the overhead of the original and suggested implementations.
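The short-circuit being disabled above has a simple shape, which can be sketched in plain Scala (a hypothetical analog, not the actual `ZIO.asyncInterrupt` signature): if the result is already available at registration time, return it synchronously and skip the async machinery; otherwise hand back a canceller and complete the suspension later via the callback.

```scala
// Plain-Scala analog (hypothetical, not the actual ZIO signature) of the
// asyncInterrupt short-circuit discussed here: poll first, and only fall
// back to the async path when the racing computation is still running.
object AsyncInterruptSketch {
  def register[A](poll: () => Option[A], canceller: () => Unit): Either[() => Unit, A] =
    poll() match {
      case Some(a) => Right(a)        // fast path: result already available
      case None    => Left(canceller) // slow path: suspend; caller may cancel
    }
}
```

Always returning `Left(canceller)`, as described above, corresponds to skipping the `Some` branch unconditionally.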
@kyri-petrou , I've added some 'meat' to the effect itself:
```scala
ZIO.foldLeft(0 until 1000)(0) {
  case (prev, x) => ZIO.succeed(prev + x)
}
```
and the results are a bit less dramatic, but still show a 2X gain for this approach:
```
Benchmark                   (n)    Mode  Cnt      Score     Error  Units
AwaitBenchmark.zioBaseline  10000  thrpt  15  84051.666 ±  889.037  ops/s
AwaitBenchmark.zioTimeout   10000  thrpt  15  37983.031 ± 1144.914  ops/s
AwaitBenchmark.zioTimeout1  10000  thrpt  15  65933.580 ±  758.363  ops/s
AwaitBenchmark.zioTimeout2  10000  thrpt  15  38736.974 ±  984.768  ops/s

[success] Total time: 1270 s (21:10), completed 23 Sept 2024, 22:44:27
```
@kyri-petrou , @ghostdogpr , finally had the time to finish this, I'd appreciate your reviews on #9261 Thanks!
/attempt #9211 with your implementation plan
/claim #9211 in the PR body to claim the bounty

Thank you for contributing to zio/zio!

/bounty $150
I've added a couple of benchmarks comparing simple effect evaluation, the same effect with a large timeout (hence not failing on TO), and a (surprising 😎) alternative timeout implementation.
benchmark results:
The original implementation is based on `raceFibersWith`, which basically forks the `self` effect and a `sleep` effect. The idea behind the alternative implementation is to eliminate the second fiber, replace it with `clock.scheduler.schedule`, and control the entire thing using `ZIO.asyncInterrupt`. As seen in the benchmark results, I already have a branch with a crude implementation; I'd be happy to hear some feedback on the idea, and (even better) receive some additional benchmarking suggestions.