fsprojects / FSharp.Control.AsyncSeq

Asynchronous sequences for F#
https://fsprojects.github.io/FSharp.Control.AsyncSeq/

Replace use of Task.WaitAny for Task.WhenAny #73

Closed: johlrich closed this 6 years ago

johlrich commented 6 years ago

Added tests for bufferByTime/bufferByCountAndTime. They are timing-based, but I think there is little risk of false positives from them. They could probably be simpler, but this approach worked with the reported repro.

I focused the tests on those two since that's what the original issue reported (and bufferByCountAndTime was my use case too), but I noticed chooseTasks/2 are used in quite a few functions, so I'm not sure whether more tests should be added.

Should fix #65
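Timing-based tests are easiest to keep reliable when a hang shows up as a failed assertion instead of a stuck test run. A minimal sketch of such a guard, assuming only FSharp.Core and the .NET task API; the helper name `completesWithin` is hypothetical and not taken from the PR's tests:

```fsharp
open System.Threading.Tasks

/// Hypothetical test helper: start `comp` as a Task and report whether it
/// finished within `timeoutMs` milliseconds (true) or is still running (false).
/// If `comp` fails, Task.Wait rethrows its exception wrapped in an AggregateException.
let completesWithin (timeoutMs: int) (comp: Async<'T>) : bool =
    let running : Task<'T> = Async.StartAsTask(comp)
    running.Wait(timeoutMs)

// A computation that returns promptly passes the guard...
let fast = completesWithin 1000 (async { return 42 })
// ...while one that sleeps past the deadline is reported as not finished.
let stuck = completesWithin 50 (async { do! Async.Sleep 5000 })

printfn "fast = %b, stuck = %b" fast stuck
```

A test for a buffering operator would wrap the consuming computation in a guard like this and assert on the result, so a regression that blocks forever fails after the timeout.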

johlrich commented 6 years ago

Will push the other commit once CI shows red for the failing test.

johlrich commented 6 years ago

Travis is red, but it looks like some changes to it are pending in another PR. AppVeyor, though, shows red with the failing test and green after the proposed fix.

eulerfx commented 6 years ago

Hey, thx for the PR.

Regarding WaitAny vs WhenAny: I recall that some benchmarks show the blocking call is faster. I think it depends on the scenario, and it's something I've struggled to find a great solution for. On the one hand, it can be more efficient to have a few threads blocked than to incur the cost of the Async machinery. On the other hand, if there are lots of computations like this, the non-blocking route is more efficient, because each blocked computation ties up a thread.
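The difference between the two calls can be sketched with a pair of hypothetical helpers (neither is the library's code) that both return the index of whichever task finishes first. `Task.WaitAny` parks the calling thread until a task completes; `Task.WhenAny` returns a task that completes with the winner, and awaiting it through `Async.AwaitTask` releases the thread in the meantime:

```fsharp
open System.Threading.Tasks

/// Blocking: the thread running this async sits inside Task.WaitAny
/// until one of the tasks finishes, so it cannot run other work meanwhile.
let firstIndexBlocking (a: Task<int>) (b: Task<int>) : Async<int> =
    async { return Task.WaitAny([| a :> Task; b :> Task |]) }

/// Non-blocking: Task.WhenAny yields a Task<Task<int>> that completes with the
/// winning task; awaiting it frees the thread until that happens.
let firstIndexNonBlocking (a: Task<int>) (b: Task<int>) : Async<int> =
    async {
        let whenAny = Task.WhenAny([| a; b |])
        let! winner = Async.AwaitTask whenAny
        return if obj.ReferenceEquals(winner, a) then 0 else 1
    }

// `pending` never completes; `ready` already holds a result.
let pending = TaskCompletionSource<int>().Task
let ready = Task.FromResult 42
let viaWaitAny = firstIndexBlocking pending ready |> Async.RunSynchronously
let viaWhenAny = firstIndexNonBlocking pending ready |> Async.RunSynchronously
printfn "WaitAny -> %d, WhenAny -> %d" viaWaitAny viaWhenAny
```

Both return the same answer; the trade-off is only in what the waiting thread does. With a handful of concurrent waits, parking a few threads is cheap, but with many of them the blocked threads compete for the thread pool.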

johlrich commented 6 years ago

Any insights into which scenarios favor one approach over the other for performance? I can take a deeper look and do some benchmarking/profiling to see if we can settle on a single approach.

eulerfx commented 6 years ago

Hey, just following up on this. I tested with the following workload:

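// F# Interactive: #time prints the Real/CPU/GC line quoted in the results below.
#time "on"
open FSharp.Control

let N = 100L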
let bufferSize = 100
let bufferTime = 1000
let P = 1000

let go n = async {
  return!
    AsyncSeq.init n id
    |> AsyncSeq.mapAsync (fun i -> async {
      do! Async.Sleep 0
      return i })
    |> AsyncSeq.bufferByCountAndTime2 bufferSize bufferTime
    |> AsyncSeq.iter ignore
}

Seq.init P id
|> Seq.map (fun _ -> go N)
|> Async.Parallel
|> Async.RunSynchronously

Your solution does quite a bit better, especially with more parallelism. This is expected, since the current solution blocks, which increases contention.

In the results below, the first timing in each pair is the existing solution and the second is your non-blocking solution:

let N = 100000L
let bufferSize = 100
let bufferTime = 1000
let P = 10

// existing: Real: 00:02:03.090, CPU: 00:04:31.515, GC gen0: 917, gen1: 864, gen2: 53
// this PR:  Real: 00:01:27.512, CPU: 00:03:20.750, GC gen0: 986, gen1: 914, gen2: 59

let N = 10000L
let bufferSize = 100
let bufferTime = 1000
let P = 100

// existing: Real: 00:02:46.513, CPU: 00:03:01.734, GC gen0: 993, gen1: 927, gen2: 66
// this PR:  Real: 00:00:52.050, CPU: 00:02:03.796, GC gen0: 996, gen1: 917, gen2: 69

let N = 100L
let bufferSize = 100
let bufferTime = 1000
let P = 1000

// existing: ???
// this PR:  Real: 00:00:04.838, CPU: 00:00:09.859, GC gen0: 103, gen1: 97, gen2: 6
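The claim that the gain grows with parallelism can be checked by computing the wall-clock speedup (existing "Real" time divided by the new "Real" time) for the two configurations that report both timings. The P = 1000 run is left out because it has no timing for the existing solution. The times below are copied from the results above and converted to seconds:

```fsharp
/// One benchmark configuration with its wall-clock ("Real") times in seconds.
type Run = { P: int; N: int64; ExistingSec: float; PrSec: float }

let runs =
    [ { P = 10;  N = 100000L; ExistingSec = 123.090; PrSec = 87.512 }   // 00:02:03.090 vs 00:01:27.512
      { P = 100; N = 10000L;  ExistingSec = 166.513; PrSec = 52.050 } ] // 00:02:46.513 vs 00:00:52.050

/// Speedup = existing wall-clock time / new wall-clock time.
let speedup (r: Run) = r.ExistingSec / r.PrSec

for r in runs do
    printfn "P = %d, N = %d: %.2fx" r.P r.N (speedup r)
```

This works out to roughly 1.41x at P = 10 and 3.20x at P = 100, with the same total number of elements (N × P = 1,000,000) in both configurations.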

I also tried an alternate implementation:

    static member chooseChoice (a:Async<'a>) (b:Async<'b>) : Async<Choice<'a * Async<'b>, 'b * Async<'a>>> = async {
      let! ct = Async.CancellationToken
      return!
        Async.FromContinuations <| fun (ok,err,cnc) ->
          // 0 until the first computation reports back; CompareExchange lets exactly one caller win.
          let state = ref 0
          // The losing computation's result is stored here so the caller can still await it.
          let resA = TaskCompletionSource<_>()
          let resB = TaskCompletionSource<_>()
          let inline oka a =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              ok (Choice1Of2 (a, resB.Task |> Async.AwaitTask))
            else
              resA.SetResult a
          let inline okb b =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              ok (Choice2Of2 (b, resA.Task |> Async.AwaitTask))
            else
              resB.SetResult b
          // An error or cancellation from the losing side is dropped here, so the
          // loser's TaskCompletionSource never completes in that case.
          let inline err (ex:exn) =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then 
              err ex
          let inline cnc ex =
            if (Interlocked.CompareExchange(state, 1, 0) = 0) then
              cnc ex
          // startThreadPoolWithContinuations is not a public FSharp.Core member (a helper extension).
          Async.startThreadPoolWithContinuations (a, oka, err, cnc, ct)
          Async.startThreadPoolWithContinuations (b, okb, err, cnc, ct) }

That actually does slightly better, but needs more testing with respect to how it handles cancellations.

So, I'm tempted to merge this PR as is.
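A self-contained sketch of the same `chooseChoice` race, assuming only FSharp.Core. `startOnThreadPool` is a hypothetical stand-in for `Async.startThreadPoolWithContinuations`, built on `Async.StartWithContinuations`. Unlike the version in the comment, this sketch also forwards the losing computation's error or cancellation into its TaskCompletionSource, so awaiting the loser raises instead of waiting forever. It has not been benchmarked against either approach:

```fsharp
open System
open System.Threading
open System.Threading.Tasks

/// Hypothetical helper: hop to the thread pool, run `comp`,
/// and route its outcome to one of three continuations.
let startOnThreadPool (comp: Async<'T>) (ok: 'T -> unit) (err: exn -> unit)
                      (cnc: OperationCanceledException -> unit) (ct: CancellationToken) =
    let work = async {
        do! Async.SwitchToThreadPool()
        return! comp }
    Async.StartWithContinuations(work, ok, err, cnc, ct)

/// Race `a` against `b`. The first outcome (result, error, or cancellation) wins;
/// every later outcome is stored in the loser's TaskCompletionSource.
let chooseChoice (a: Async<'a>) (b: Async<'b>) : Async<Choice<'a * Async<'b>, 'b * Async<'a>>> =
    async {
        let! ct = Async.CancellationToken
        return!
            Async.FromContinuations(fun (ok, err, cnc) ->
                let state = ref 0
                let resA = TaskCompletionSource<'a>()
                let resB = TaskCompletionSource<'b>()
                // True for exactly one caller: whichever outcome arrives first.
                let first () = Interlocked.CompareExchange(&state.contents, 1, 0) = 0
                let okA x = if first () then ok (Choice1Of2 (x, Async.AwaitTask resB.Task)) else resA.SetResult x
                let okB x = if first () then ok (Choice2Of2 (x, Async.AwaitTask resA.Task)) else resB.SetResult x
                let errA (e: exn) = if first () then err e else resA.SetException e
                let errB (e: exn) = if first () then err e else resB.SetException e
                let cncA (e: OperationCanceledException) = if first () then cnc e else resA.SetCanceled()
                let cncB (e: OperationCanceledException) = if first () then cnc e else resB.SetCanceled()
                startOnThreadPool a okA errA cncA ct
                startOnThreadPool b okB errB cncB ct)
    }
```

The cancellation path follows the same rule: when the token is cancelled, the first cancellation continuation to run cancels the combined computation, and the other one marks the loser's task as cancelled.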

eulerfx commented 6 years ago

Released: https://www.nuget.org/packages/FSharp.Control.AsyncSeq/2.0.17

johlrich commented 6 years ago

Sweet, thanks for following up!