StephenCleary / AsyncEx

A helper library for async/await.
MIT License

AsyncLazy blocks thread until concurrent factory returns asynchronously #243

Open ugumba opened 2 years ago

ugumba commented 2 years ago

I've experienced thread starvation in these cases (using AsyncLazy to populate entries in MemoryCache).

It's only an issue with factory methods that have a long CPU-bound lead-in before their first "await" and are not explicitly started on the thread pool. (In my use case, I can't predict which factories behave this way, and I prefer to avoid the thread overhead.)
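
To make the scenario concrete, here is a minimal sketch of the blocking behaviour using a plain `Lazy<Task<int>>` (not AsyncEx itself). The class name `LazyBlockingDemo`, the local `Factory` method and the 200 ms probe window are hypothetical; the "CPU-bound lead-in" is simulated by waiting on an event so the demo is deterministic.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyBlockingDemo
{
    // Returns true if a second caller was still stuck inside Lazy<T>.Value
    // while the first caller's factory was in its synchronous lead-in.
    public static bool SecondCallerBlocks()
    {
        using var leadInStarted = new ManualResetEventSlim();
        using var finishLeadIn = new ManualResetEventSlim();
        using var secondGotTask = new ManualResetEventSlim();

        // Hypothetical factory: synchronous work first, then the first await.
        async Task<int> Factory()
        {
            leadInStarted.Set();
            finishLeadIn.Wait();                        // stands in for CPU-bound work
            await Task.Delay(1).ConfigureAwait(false);  // first real await
            return 42;
        }

        // Default mode is ExecutionAndPublication: one thread runs the factory,
        // every other thread reading Value waits on a lock until it returns.
        var lazy = new Lazy<Task<int>>(() => Factory());

        var first = new Thread(() => { _ = lazy.Value; });
        first.Start();
        leadInStarted.Wait();                           // factory is now running on `first`

        var second = new Thread(() => { _ = lazy.Value; secondGotTask.Set(); });
        second.Start();

        // The second thread only wants the Task object, yet it cannot get one
        // until the first thread's synchronous lead-in has finished.
        bool blocked = !secondGotTask.Wait(TimeSpan.FromMilliseconds(200));

        finishLeadIn.Set();                             // let the lead-in finish
        first.Join();
        second.Join();
        return blocked;
    }
}
```

With many concurrent consumers, each one occupies a thread in the same way for the duration of the lead-in, which is how this can turn into thread pool starvation.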

To resolve this, I wrote my own AsyncLazy that uses SemaphoreSlim instead of lock. I'd much prefer to use a reputable NuGet package, though 😊.
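
The difference between the two primitives can be sketched in a few lines: a caller that finds a `SemaphoreSlim` taken gets back an incomplete `Task` from `WaitAsync()` and keeps its thread, whereas `lock` would park the thread. `AsyncGateDemo` is a hypothetical name used only for this illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncGateDemo
{
    public static (bool pendingWhileHeld, bool completedAfterRelease) Run()
    {
        using var gate = new SemaphoreSlim(1);

        gate.Wait();                        // the "first caller" holds the gate
        Task waiter = gate.WaitAsync();     // a second caller queues up without blocking
        bool pending = !waiter.IsCompleted; // the calling thread is still free to do work

        gate.Release();                     // first caller is done; the waiter gets the gate
        waiter.Wait(TimeSpan.FromSeconds(5));
        bool completed = waiter.Status == TaskStatus.RanToCompletion;

        gate.Release();                     // release on behalf of the second caller
        return (pending, completed);
    }
}
```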

StephenCleary commented 2 years ago

Are you using ExecuteOnCallingThread? Because by default AsyncLazy will execute your factory delegate on a thread pool thread.
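
For readers unfamiliar with the two modes, the following sketch shows the difference with a plain `Lazy<Task<int>>`. It is an illustration of the idea, not AsyncEx's actual code; `FactoryPlacementDemo`, `Probe` and the `executeOnCallingThread` parameter are hypothetical names. It blocks on the result only to keep the demo short and assumes no synchronization context (for example a console app).

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class FactoryPlacementDemo
{
    // Reports where the synchronous part of the factory ran.
    public static (bool sameThreadAsCaller, bool onPoolThread) Probe(bool executeOnCallingThread)
    {
        int factoryThreadId = -1;
        bool factoryOnPool = false;

        async Task<int> Factory()
        {
            // Synchronous part of the factory: record the thread it runs on.
            factoryThreadId = Environment.CurrentManagedThreadId;
            factoryOnPool = Thread.CurrentThread.IsThreadPoolThread;
            await Task.Delay(1).ConfigureAwait(false);
            return 1;
        }

        Lazy<Task<int>> lazy = executeOnCallingThread
            ? new Lazy<Task<int>>(() => Factory())                // runs inside Value, on the caller
            : new Lazy<Task<int>>(() => Task.Run(() => Factory())); // Value returns quickly, work is queued

        int callerThreadId = Environment.CurrentManagedThreadId;
        lazy.Value.GetAwaiter().GetResult();
        return (factoryThreadId == callerThreadId, factoryOnPool);
    }
}
```

In the thread pool variant the lock inside `Lazy<T>` is held only for as long as it takes to queue the work; in the calling-thread variant it is held for the whole synchronous lead-in of the factory.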

ugumba commented 2 years ago

Sorry, it's been a while, but yes - I tried both. I don't control the factories - they might be synchronously heavy or not. In the quickly-async case I don't want the overhead of Task.Run(). In the heavy-sync case, I don't want the sync lock potentially blocking one thread per consumer.

Lazy<T> just seems like the wrong building block.

ugumba commented 2 years ago

This is my take on it - none of your options are implemented (as I haven't needed them).

    using System;
    using System.Threading;
    using System.Threading.Tasks;

    /// <summary>
    /// This implementation differs from Nito.AsyncEx's AsyncLazy in one important aspect:
    /// Lazy<T>.Value blocks any subsequent accessing threads while the first accessing thread is initializing Value.
    /// GetValue() below is fully async.
    /// Any exception will be cached by _task.
    /// No retries.
    /// </summary>
    public class AsyncLazy<T>
    {
        private Func<CancellationToken, Task<T?>>? _taskFactory;
        private SemaphoreSlim? _semaphore = new(1);
        private Task<T?>? _task;

        public AsyncLazy(Func<CancellationToken, ValueTask<T?>> factory)
        {
            _taskFactory = async ct => await factory(ct).ConfigureAwait(false);
        }

        public AsyncLazy(Func<CancellationToken, Task<T?>> factory)
        {
            _taskFactory = factory;
        }

        // True once the factory has been invoked; the resulting task may still be running or faulted.
        public bool IsValueCreated => _semaphore == null;

        public ValueTask<T?> GetValue(CancellationToken ct = default)
        {
            var semaphore = _semaphore;
            if (semaphore == null)
                // Fast track
                return new(_task!);
            else
                return EnsureHasValueTask(semaphore);

            async ValueTask<T?> EnsureHasValueTask(SemaphoreSlim semaphore)
            {
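                // LockAsync is not a BCL method; it is assumed to be the SemaphoreSlim
                // extension from Nito.AsyncEx, which releases the semaphore on Dispose.
                using (await semaphore.LockAsync(ct).ConfigureAwait(false))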
                {
                    if (_semaphore != null)
                    {
                        try
                        {
                            _task = _taskFactory!(ct);
                            // _task is now set, but may not yet be complete
                            _taskFactory = null; // Release delegate, captured context etc to GC
                        }
                        catch (OperationCanceledException ex) when (ex.CancellationToken == ct)
                        {
                            // Cache synchronous cancellation
                            _task = Task.FromCanceled<T?>(ct);
                            throw;
                        }
                        catch (Exception ex)
                        {
                            // Cache synchronous exception
                            _task = Task.FromException<T?>(ex);
                            throw;
                        }
                        finally
                        {
                            _semaphore = null; // Release to GC, enable fast track above
                        }
                    }
                }

                // Asynchronous exception will be cached here
                return await _task!.ConfigureAwait(false);
            }
        }
    }
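
One note for anyone trying the class above: `semaphore.LockAsync(ct)` is not part of the BCL and looks like the `SemaphoreSlim` extension that ships with Nito.AsyncEx. If you would rather not take that dependency, a minimal stand-in could look like the sketch below. `SemaphoreSlimLockExtensions` and `Releaser` are hypothetical names, and this is an assumption about the intended behaviour, not the library's implementation.

```csharp
#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;

public static class SemaphoreSlimLockExtensions
{
    // Hypothetical stand-in: waits asynchronously, then hands back a token
    // whose Dispose() releases the semaphore.
    public static async Task<IDisposable> LockAsync(
        this SemaphoreSlim semaphore, CancellationToken ct = default)
    {
        // If ct is cancelled while waiting, WaitAsync throws and nothing is acquired.
        await semaphore.WaitAsync(ct).ConfigureAwait(false);
        return new Releaser(semaphore);
    }

    private sealed class Releaser : IDisposable
    {
        private SemaphoreSlim? _semaphore;

        public Releaser(SemaphoreSlim semaphore) => _semaphore = semaphore;

        public void Dispose()
        {
            // Exchange guards against a double Dispose releasing twice.
            Interlocked.Exchange(ref _semaphore, null)?.Release();
        }
    }
}
```

Do not import this together with the AsyncEx extension of the same name, or the call becomes ambiguous.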