Azure / azure-cosmos-dotnet-v3

.NET SDK for Azure Cosmos DB for the core SQL API
MIT License
741 stars, 494 forks

ReadManyItemsAsync -> ReadManyTaskHelperAsync wraps forked IO task into Task.Run #4527

Open yar-shukan opened 5 months ago

yar-shukan commented 5 months ago

Describe the bug ReadManyTaskHelperAsync contains a suspicious Task.Run call that looks like a potential issue: Task.Run schedules the work on a thread pool thread and can potentially exhaust the thread pool.

This Task.Run call looks unnecessary. Instead of wrapping async code in Task.Run (which can put more pressure on the thread pool), as in ReadManyTaskHelperAsync -> for loop -> tasks.Add(Task.Run(async () => ...)), why doesn't the code do this instead: ReadManyTaskHelperAsync -> for loop -> tasks.Add(DoStuffAsync(params, semaphore))

private async Task<List<ResponseMessage>> DoStuffAsync(.. otherParams, SemaphoreSlim semaphore)
{
    try
    {
        QueryDefinition queryDefinition = ((this.partitionKeySelectors.Count == 1) && (this.partitionKeySelectors[0] == "[\"id\"]"))
            ? this.CreateReadManyQueryDefinitionForId(entry.Value, indexCopy)
            : this.CreateReadManyQueryDefinitionForOther(entry.Value, indexCopy);

        return await this.GenerateStreamResponsesForPartitionAsync(queryDefinition,
            entry.Key,
            readManyRequestOptions,
            childTrace,
            cancellationToken);
    }
    finally
    {
        semaphore.Release();
        childTrace.Dispose();
    }
}

To Reproduce Run ReadManyTaskHelperAsync

Expected behavior Task.Run is not called in ReadManyItemsAsync execution. Thread pool is not used.

Actual behavior Task.Run is called in ReadManyItemsAsync execution. Thread pool is used.

Environment summary SDK Version: 3.39.1 OS Version: Windows

Additional context A potential issue was reported here, but it was never proven that it came from this method call, and the reporter eventually mitigated it with a different solution.

cc @ealsur

yar-shukan commented 4 months ago

@ealsur, @Pilchie, a friendly reminder about the PR. Do you have any concerns about it, or can we move forward with it? Thanks!

yar-shukan commented 1 month ago

After running a benchmark (see code below) with bombardier (example run: .\bombardier-windows-amd64.exe https://localhost:5001/api/diagscenario/withtaskrun/500 -d 90s -c 500), I came to the conclusion that this change isn't worth it: I can't see any big difference or improvement in throughput/latency, and I can't see any difference in the dotnet counters for ThreadPool Thread Count or ThreadPool Queue Length.

[HttpGet]
[Route("withtaskrun/{taskCount}")]
public async Task<ActionResult<int>> WithTaskRun(int taskCount)
{
    _logger.LogInformation($"Started with: #{Interlocked.Increment(ref _counterStarted)}");

    var semaphore = new SemaphoreSlim(parallelization, parallelization);

    var tasks = new List<Task<Customer>>();

    for (int i = 0; i < taskCount; i++)
    {
        tasks.Add(Task.Run(async () =>
        {
            await semaphore.WaitAsync().ConfigureAwait(false);

            try
            {
                return await PretendQueryCustomerFromDbAsync(Guid.NewGuid().ToString()).ConfigureAwait(false);
            }
            finally
            {
                semaphore.Release();
            }
        }));
    }

    await Task.WhenAll(tasks).ConfigureAwait(false);

    var completed = Interlocked.Increment(ref _counterCompleted);

    _logger.LogInformation($"Completed with: #{completed}");

    return completed;
}

[HttpGet]
[Route("withouttaskrun/{taskCount}")]
public async Task<ActionResult<int>> WithoutTaskRun(int taskCount)
{
    _logger.LogInformation($"Started without: #{Interlocked.Increment(ref _counterStarted)}");

    var semaphore = new SemaphoreSlim(parallelization, parallelization);

    var tasks = new List<Task<Customer>>();

    for (int i = 0; i < taskCount; i++)
    {
        tasks.Add(PretendQueryCustomerFromDbAsync(semaphore));
    }

    await Task.WhenAll(tasks).ConfigureAwait(false);

    var completed = Interlocked.Increment(ref _counterCompleted);

    _logger.LogInformation($"Completed without: #{completed}");

    return completed;
}

async Task<Customer> PretendQueryCustomerFromDbAsync(SemaphoreSlim semaphore)
{
    await semaphore.WaitAsync().ConfigureAwait(false);

    try
    {
        return await PretendQueryCustomerFromDbAsync(Guid.NewGuid().ToString()).ConfigureAwait(false);
    }
    finally
    {
        semaphore.Release();
    }
}

async Task<Customer> PretendQueryCustomerFromDbAsync(string customerId)
{
    int c = 0;
    var r = new Random();

    for(int i = 0; i < 100; i++)
    {
        c += r.Next(0, 100);
    }

    await Task.Delay(100).ConfigureAwait(false);

    for (int i = 0; i < 100; i++)
    {
        c += r.Next(0, 100);
    }

    return new Customer(customerId + c.ToString());
}

| Semaphore | Connections | ThreadCount | WithTaskRun: Reqs/sec | WithoutTaskRun: Reqs/sec | WithTaskRun: Latency (ms) | WithoutTaskRun: Latency (ms) |
|---|---|---|---|---|---|---|
| 40 | 125 | 1 | 1228 | 1207 | 108 | 107.94 |
| 40 | 125 | 3 | 1189.41 | 1192.93 | 108.17 | 108.15 |
| 40 | 125 | 10 | 1196.73 | 1217.05 | 108.15 | 108.07 |
| 40 | 125 | 50 | 607.09 | 587.95 | 215.92 | 216.36 |
| 40 | 125 | 100 | 415.85 | 396.91 | 323.66 | 323.26 |
| 40 | 125 | 500 | 88.94 | 89.05 | 1420 | 1400 |
| 40 | 500 | 1 | 2244.22 | 2126.54 | 228.45 | 253.18 |
| 40 | 500 | 3 | 2336.36 | 2416.46 | 222.33 | 215.54 |
| 40 | 500 | 10 | 2221.78 | 2366.41 | 232.82 | 219.07 |
| 40 | 500 | 50 | 2381.99 | 2326.66 | 216.49 | 219.06 |
| 40 | 500 | 100 | 1576.7 | 1592.65 | 324.74 | 324.52 |
| 40 | 500 | 500 | 382.75 | 367.4 | 1410 | 1410 |

kirankumarkolli commented 1 month ago

Thank you @yar-shukan for doing a thorough analysis.

Task.Run has been suspected a few times already, and it certainly does put a spiky load on the thread pool, so the real value is in controlling burstability. Assuming there is no impact perf-wise, I am inclined towards removing Task.Run (i.e. committing this PR), as there are no known side effects. Thoughts?

yar-shukan commented 1 month ago

@kirankumarkolli I couldn't really see any difference between the two approaches: both ThreadPool Thread Count and ThreadPool Queue Length behave the same, and in my test ThreadPool Thread Count never went above the SemaphoreSlim initialCount parameter (I used 40 in my tests, simulating the Environment.ProcessorCount=4 case in the codebase). I don't fully understand why, though.

In theory the version without Task.Run should be more lightweight, but I could not prove it by running the test above.

@stephentoub, your review and comments on this case would be very helpful. Thanks!

stephentoub commented 1 month ago

why code doesn't do this instead

If I'm understanding the question right, one reason such a Task.Run might be used is if the body of the work does a non-trivial amount of synchronous work before getting to anything that might await. Imagine it was this:

List<Task> tasks = new();
for (int i = 0; i < 100; i++)
{
    tasks.Add(ProcessAsync(i));
}

static async Task ProcessAsync(int i)
{
    Thread.Sleep(1_000); // representing some amount of work
    await SomethingElse();
}

With this scheme, that Thread.Sleep in ProcessAsync is part of the synchronous call and thus is part of the loop, e.g. the last iteration of the loop (i = 99) won't even call ProcessAsync for over a minute, because the 99 iterations that came before it all did at least a second's worth of work. If instead it's:

List<Task> tasks = new();
for (int i = 0; i < 100; i++)
{
    int iter = i;
    tasks.Add(Task.Run(() => ProcessAsync(iter)));
}

static async Task ProcessAsync(int i)
{
    Thread.Sleep(1_000); // representing some amount of work
    await SomethingElse();
}

now the only work the loop itself is doing is queueing 100 work items, such that none of those work items is at all dependent on any other iteration's up-front work completing before it's invoked.

I don't know if that's the case here, but that's the primary reason someone would opt to use a Task.Run in a case like this.
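The difference between the two loops above can be measured directly. The following is a minimal sketch, not code from the SDK: SyncPrefixDemo, TimeLoop and the shortened sleep and delay durations are hypothetical names and values chosen for illustration. It times only the loop that creates the tasks, which is the part that changes when Task.Run is added.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class SyncPrefixDemo
{
    // Hypothetical stand-in for ProcessAsync: synchronous work first, then an await.
    private static async Task ProcessAsync(int syncMs)
    {
        Thread.Sleep(syncMs);   // synchronous work before the first await
        await Task.Delay(10);   // first point where the method actually yields
    }

    // Returns how long the task-creating loop took, excluding the wait for completion.
    public static TimeSpan TimeLoop(int count, int syncMs, bool useTaskRun)
    {
        var tasks = new List<Task>(count);
        var stopwatch = Stopwatch.StartNew();
        for (int i = 0; i < count; i++)
        {
            // Direct call: the caller runs the Thread.Sleep itself on every iteration.
            // Task.Run: the caller only queues a work item and moves on.
            tasks.Add(useTaskRun ? Task.Run(() => ProcessAsync(syncMs)) : ProcessAsync(syncMs));
        }
        TimeSpan loopTime = stopwatch.Elapsed;

        // Blocking here is only acceptable because this is a console demo.
        Task.WaitAll(tasks.ToArray());
        return loopTime;
    }

    public static void Main()
    {
        Console.WriteLine($"direct call loop: {TimeLoop(10, 50, useTaskRun: false).TotalMilliseconds:F0} ms");
        Console.WriteLine($"Task.Run loop:    {TimeLoop(10, 50, useTaskRun: true).TotalMilliseconds:F0} ms");
    }
}
```

With the direct call, the loop time grows with count * syncMs, because each iteration runs the synchronous prefix before returning a task. With Task.Run, the loop only pays for queueing. If the synchronous prefix is small, as it appears to be in ReadManyTaskHelperAsync, the two loop times should be close.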

yar-shukan commented 1 month ago

@stephentoub, in this particular code it doesn't seem to do heavy CPU work (see details here): it does some StringBuilder work, but that doesn't look like a heavy operation.

Considering that this code might be executed 100 times in a loop (e.g. when the Cosmos DB collection has 100 partitions), my concern was that it would need thread pool threads instead of running on the calling thread, and that this could cause thread pool starvation.

But running a simulated benchmark (I used a for loop with 100 items, see the example above), I couldn't really spot a difference in thread pool usage between the two versions of the code, the async method and Task.Run. That's the main part I am trying to understand: why?

Given that, I am not sure we really need to do anything about this Task.Run or replace it with plain async methods.

stephentoub commented 1 month ago

and it can cause thread pool thread starvation.

Why would it cause starvation? Is this thread synchronously blocking on something after kicking off those work items?

yar-shukan commented 1 month ago

Starvation is probably not the right word here. The assumption was that the Task.Run version would use the thread pool more heavily than code that doesn't use Task.Run and instead uses the Tasks returned by async methods, which execute on the calling thread until the first await that actually yields: an I/O operation further down the call stack, or an await on SemaphoreSlim.WaitAsync().
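The "executes on the calling thread until the first await that yields" behaviour can be checked with a small experiment. This is a sketch under assumptions: CallingThreadDemo, FirstSegmentThreadAsync and CompareAsync are hypothetical names, and Task.Delay stands in for the real I/O call.

```csharp
using System;
using System.Threading.Tasks;

public static class CallingThreadDemo
{
    // Returns the ID of the thread that ran the code before the first await.
    private static async Task<int> FirstSegmentThreadAsync()
    {
        int id = Environment.CurrentManagedThreadId; // still on whichever thread invoked us
        await Task.Delay(50).ConfigureAwait(false);  // first await that actually yields
        return id;
    }

    public static async Task<(int caller, int direct, int viaTaskRun)> CompareAsync()
    {
        int caller = Environment.CurrentManagedThreadId;

        // Direct call: the first segment runs synchronously on the caller's thread.
        Task<int> direct = FirstSegmentThreadAsync();

        // Task.Run: the first segment runs as a queued thread pool work item.
        Task<int> viaTaskRun = Task.Run(() => FirstSegmentThreadAsync());

        return (caller, await direct.ConfigureAwait(false), await viaTaskRun.ConfigureAwait(false));
    }

    public static async Task Main()
    {
        var (caller, direct, viaTaskRun) = await CompareAsync();
        Console.WriteLine($"caller thread:               {caller}");
        Console.WriteLine($"first segment, direct call:  {direct}");
        Console.WriteLine($"first segment, via Task.Run: {viaTaskRun}");
    }
}
```

The direct call always reports the caller's thread ID for its first segment. The Task.Run variant typically reports a different ID when the caller is not a pool thread, as in a console Main. When the caller is already a pool thread, the queued item may end up being processed by that same thread later.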

stephentoub commented 1 month ago

It will queue a work item. That doesn't necessarily translate into heavier use of the pool, though. For example, there's a good chance this code is already running on the thread pool, in which case that queued task will end up going into the thread's local queue, and if there's no other thread available to pick up that work, this thread will just process it the next time it yields back to the thread pool's dispatch loop.
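As an alternative to watching dotnet-counters from outside the process, the same comparison can be sampled in-process. The sketch below assumes .NET Core 3.0 or later, where ThreadPool.ThreadCount and ThreadPool.CompletedWorkItemCount are available. PoolCountersDemo, QueryAsync and RunAsync are hypothetical names, and Task.Delay stands in for the partition query. Both counters are process-wide, so the numbers include unrelated work and will vary from run to run.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PoolCountersDemo
{
    // Hypothetical stand-in for one partition query, throttled by the semaphore.
    private static async Task<int> QueryAsync(SemaphoreSlim semaphore, int i)
    {
        await semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            await Task.Delay(20).ConfigureAwait(false); // simulated I/O
            return i;
        }
        finally
        {
            semaphore.Release();
        }
    }

    // Runs taskCount queries and reports how many finished, how many work items
    // the pool completed in the meantime, and the pool's thread count at the end.
    public static async Task<(int completed, long workItems, int threads)> RunAsync(
        int taskCount, int parallelism, bool useTaskRun)
    {
        using var semaphore = new SemaphoreSlim(parallelism, parallelism);
        long workItemsBefore = ThreadPool.CompletedWorkItemCount;

        var tasks = new List<Task<int>>(taskCount);
        for (int i = 0; i < taskCount; i++)
        {
            int iter = i; // copy the loop variable before capturing it in the lambda
            tasks.Add(useTaskRun
                ? Task.Run(() => QueryAsync(semaphore, iter))
                : QueryAsync(semaphore, iter));
        }

        int[] results = await Task.WhenAll(tasks).ConfigureAwait(false);
        return (results.Length, ThreadPool.CompletedWorkItemCount - workItemsBefore, ThreadPool.ThreadCount);
    }

    public static async Task Main()
    {
        foreach (bool useTaskRun in new[] { false, true })
        {
            var (completed, workItems, threads) = await RunAsync(100, 40, useTaskRun);
            Console.WriteLine($"useTaskRun={useTaskRun}: completed={completed}, " +
                              $"pool work items={workItems}, pool threads={threads}");
        }
    }
}
```

If the thread counts come out similar for both variants, that would be consistent with the benchmark results reported earlier in this thread: queueing a work item does not by itself force the pool to add threads.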