timcassell / ProtoPromise

Robust and efficient library for management of asynchronous operations in C#/.Net.
MIT License

await Promise<T>.Deferred unexpectedly releasing #458

Open drew-512 opened 3 days ago

drew-512 commented 3 days ago

Hi Tim,

I'm getting a consistent issue that I can't explain -- I'm waiting on a deferred promise and the second time I do so, it immediately returns.

The client code below logs into a server via TCP. The reply message eventually calls SetStatus(), which resolves the deferred promise and releases the await you see in DoLogin().

This issue was present when I was using 2.9 (or so); I then upgraded to 3.1.0 and the issue was unchanged.


        public async Promise<Err>           DoLogin() {
            ...
            var err = await Host.AwaitMetaOp(...); // first call -- blocks as expected
            if (err != null) 
                return err;
            ...
            err = await Host.AwaitMetaOp(...);  // ISSUE -- immediately releases!
            if (err != null) 
                return err;
            ...
        }

        public enum OpStatus {
              NotStarted = 0,
              Syncing,
              Busy,
              Synced,
              Closed,
        }

        public Promise<Err>                 AwaitMetaOp(TagSpec attrSpec, IMessage elemVal, ClientOp op) { 
            ...
            op.SetStatus(OpStatus.Syncing);
            var promise = op.Await(OpStatus.Synced);
            _outbox.Enqueue(tx);
            _outboxCount.Release();
            return promise;
        }

        ConcurrentQueue<TxMsg>              _outbox      = new ConcurrentQueue<TxMsg>();
        SemaphoreSlim                       _outboxCount = new SemaphoreSlim(0);

        public Promise<Err>                 Await(OpStatus status) {
            var waiter = new StatusAwaiter(status);
            _waiters ??= new List<StatusAwaiter>(1);
            _waiters.Add(waiter);
            return waiter.Deferred.Promise;
        }
        List<StatusAwaiter> _waiters;

        public bool                         SetStatus(OpStatus newStatus) {
            for (int i = 0; i < _waiters.Count; i++) {
                var wi = _waiters[i];
                if (wi.WaitingFor == newStatus) {
                    _waiters.RemoveAt(i);
                    i--;
                    wi.Deferred.Resolve(_lastErr);
                }
            }
            return true;
        } 

        readonly struct StatusAwaiter {
            public readonly OpStatus              WaitingFor;
            public readonly Promise<Err>.Deferred Deferred;

            public StatusAwaiter(OpStatus status) {
                this.WaitingFor = status;
                this.Deferred   = Promise.NewDeferred<Err>(); 
            }
        }
drew-512 commented 3 days ago

If I break when issuing the second deferred promise, I see _deferredId and _promiseId are 1 -- which I would expect to be 2?

I tried stepping in, but it was hard to know what the correct behavior is, since the first deferred promise has already been resolved by this point.

[Screenshot 2024-07-01 at 6:44:28 PM: debugger view of the second deferred promise]

drew-512 commented 3 days ago

Open to feedback on code as well if you see a nicer pattern.

timcassell commented 3 days ago

How do you know it continued immediately? Your debug info says the promise is pending.

Is SetStatus being called on another thread?

timcassell commented 2 days ago

I think I understand what's happening. I'll post a detailed explanation and suggested fix soon.

timcassell commented 2 days ago

The simple explanation is that promise continuations are executed synchronously by default. When you call wi.Deferred.Resolve(_lastErr); inside the SetStatus method, it continues the DoLogin async method, which calls back into AwaitMetaOp, which calls Await, which calls _waiters.Add(waiter);. When the stack unwinds back to SetStatus, the next iteration of the loop finds the new item in the _waiters list and processes it as well. If you check the stack trace when you hit that breakpoint, you will see it.
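To see the reentrancy in isolation, here is a minimal sketch that uses plain `Action` callbacks as a stand-in for promise continuations. Everything in it (`ReentrancyDemo`, its `Await`, `SetStatus`, and `Run`) is hypothetical and written only for illustration; it is not ProtoPromise's API. The only assumption it models is that a continuation runs inline while the waiter loop is still iterating.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in: callbacks play the role of promise continuations
// that run synchronously inside Resolve().
public static class ReentrancyDemo
{
    private static readonly List<Action> _waiters = new List<Action>();

    public static int Completed;   // how many waiters have been released

    public static void Await(Action continuation) => _waiters.Add(continuation);

    // Same shape as the SetStatus loop in the question.
    public static void SetStatus()
    {
        for (int i = 0; i < _waiters.Count; i++)
        {
            var waiter = _waiters[i];
            _waiters.RemoveAt(i);
            i--;
            waiter();   // runs the continuation inline, which may call Await again
        }
    }

    // Simulates a single server reply arriving while the first await is pending.
    public static int Run()
    {
        Completed = 0;
        _waiters.Clear();
        Await(() =>
        {
            Completed++;                 // first await released, as intended
            Await(() => Completed++);    // the "second AwaitMetaOp" registers a new waiter
        });
        SetStatus();                     // one reply only
        return Completed;
    }
}
```

`ReentrancyDemo.Run()` returns 2 even though `SetStatus` was called once: the waiter added by the first continuation is picked up by the same loop.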

You only want each server response to continue a single promise. The simplest fix is to break out of the loop as soon as the first eligible waiter is found.

        public bool                         SetStatus(OpStatus newStatus) {
            for (int i = 0; i < _waiters.Count; i++) {
                var wi = _waiters[i];
                if (wi.WaitingFor == newStatus) {
                    _waiters.RemoveAt(i);
-                   i--;
                    wi.Deferred.Resolve(_lastErr);
+                   break;
                }
            }
            return true;
        } 

But this is just a band-aid. Your architecture can only safely handle a single request at a time; two or more concurrent requests introduce all sorts of nasty race conditions, because scanning the _waiters list and matching on the enum does not guarantee that a response is mapped to the correct request. That is only safe if your lower-level socket supports just one request at a time (you didn't post that code, but based on your use of the semaphore I assume it runs on a separate thread and handles one request at a time). Also, your _waiters list is not synchronized.

If you want to be able to support multiple concurrent requests, you should consider re-architecting your network infrastructure to include a request id in the payload. The response payload should include the same id so you can map it to the correct promise. Something like this:

public class Network
{
    private const int MaxConcurrentRequests = 4;

    private int _id;
    private readonly ConcurrentDictionary<int, Promise<Response>.Deferred> _deferreds = new ConcurrentDictionary<int, Promise<Response>.Deferred>();
    private readonly AsyncSemaphore _sem = new AsyncSemaphore(MaxConcurrentRequests);

    public async Promise<Response> SendRequest(Request request)
    {
        using (await _sem.EnterScopeAsync())
        {
            int requestId = Interlocked.Increment(ref _id);
            var deferred = Promise.NewDeferred<Response>();
            if (!_deferreds.TryAdd(requestId, deferred))
            {
                _deferreds.TryRemove(requestId, out var oldDeferred);
                oldDeferred.Reject("Response never received.");
                _deferreds.TryAdd(requestId, deferred);
            }

            // Serialize request and requestId and send it to your socket here.

            return await deferred.Promise;
        }
    }

    // Called from your socket after deserializing the requestId and response.
    private void Respond(int requestId, Response response)
    {
        if (!_deferreds.TryRemove(requestId, out var deferred))
        {
            throw new ArgumentException($"No request is waiting for requestId {requestId}.");
        }
        deferred.Resolve(response);
    }
}
drew-512 commented 2 days ago

Appreciate the analysis.

Yeah, I thought .Deferred.Resolve() releases on context relinquish, not synchronously yikes. Too much time in Go lately.

Basically, on the client side, the idea is to dispatch any number of client requests ("ops"), but as an op's status is altered from a reply via SetStatus(), those waiting on the op are released. This way I can block whatever easily until the op is at the desired state. However, .Deferred.Resolve() having sub-dispatch (vs queued-dispatch) behavior makes that totally wrong as you covered.

Your mock code is what I had initially, but then other calls need to block until the request comes back -- and those other calls can materialize after the request is sent -- hence the introduction of Await(OpStatus status)

The behavior I'm looking for is something that resolves a deferred promise from "outside" the main run loop. That seems available by moving matching waiters into a closure that is executed later.

Trying to think of an efficient pattern for that.
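One possible shape for that "collect now, resolve later" idea is sketched below. It is only a sketch under assumptions: `StatusWaiters<TStatus, TResult>` is a hypothetical helper, and the `Action<TResult>` completion callback stands in for something like `wi.Deferred.Resolve` from the code above. Matching waiters are moved into a local batch first, and only then completed, so a waiter registered by a continuation stays queued until the next status change. The lock is an extra assumption to cover access from more than one thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: TStatus is the state being waited for, and each waiter
// carries a completion callback (in the thread's code: wi.Deferred.Resolve).
public sealed class StatusWaiters<TStatus, TResult>
{
    private readonly object _gate = new object();
    private readonly List<(TStatus WaitingFor, Action<TResult> Complete)> _waiters
        = new List<(TStatus, Action<TResult>)>();

    public void Add(TStatus waitingFor, Action<TResult> complete)
    {
        lock (_gate) { _waiters.Add((waitingFor, complete)); }
    }

    // Phase 1: under the lock, move matching waiters into a local batch.
    // Phase 2: outside the lock, complete the batch. Waiters registered by a
    // continuation during phase 2 stay in the list until the next call.
    // Returns the number of waiters completed by this call.
    public int SetStatus(TStatus newStatus, TResult result)
    {
        List<Action<TResult>> batch = null;
        lock (_gate)
        {
            for (int i = _waiters.Count - 1; i >= 0; i--)
            {
                if (EqualityComparer<TStatus>.Default.Equals(_waiters[i].WaitingFor, newStatus))
                {
                    (batch ??= new List<Action<TResult>>()).Add(_waiters[i].Complete);
                    _waiters.RemoveAt(i);
                }
            }
        }
        if (batch == null) return 0;

        // The backwards scan reversed the batch; restore registration order.
        batch.Reverse();
        foreach (var complete in batch) complete(result);
        return batch.Count;
    }
}
```

With this shape, a continuation that registers a new waiter for the same status during `SetStatus` is not completed by that call; it waits for the next one. The cost is one temporary list per status change that has at least one match.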

timcassell commented 2 days ago

> Yeah, I thought .Deferred.Resolve() releases on context relinquish, not synchronously yikes. Too much time in Go lately. ... The behavior I'm looking for is something that resolves a deferred promise from "outside" the main run loop. That seems available by moving matching waiters into a closure that is executed later.
>
> Trying to think of an efficient pattern for that.

You can use promise.WaitAsync(SynchronizationContext.Current, forceAsync: true) to make the continuation execute asynchronously on the current context.

> Basically, on the client side, the idea is to dispatch any number of client requests ("ops"), but as an op's status is altered from a reply via SetStatus(), those waiting on the op are released. This way I can block whatever easily until the op is at the desired state. However, .Deferred.Resolve() having sub-dispatch (vs queued-dispatch) behavior makes that totally wrong as you covered.
>
> Your mock code is what I had initially, but then other calls need to block until the request comes back -- and those other calls can materialize after the request is sent -- hence the introduction of Await(OpStatus status)

I assume the "op" is the ClientOp. You could use an AsyncLazy<T> inside the ClientOp which can be awaited multiple times. Or you could try using Promise<T>.Retainer if you want to avoid the extra allocation. The mapped responses can (and should imo) be used even with your re-usable "ops".

drew-512 commented 2 days ago

> I assume the "op" is the ClientOp. You could use an AsyncLazy<T> inside the ClientOp which can be awaited multiple times. Or you could try using Promise<T>.Retainer if you want to avoid the extra allocation. The mapped responses can (and should imo) be used even with your re-usable "ops".

Yeah, I already do the mapped responses so this is about deferring others waiting on a particular response state (not apparent since I tried to edit down the code for troubleshooting).

I'm trying to understand what the AsyncLazy<T> and Promise<T>.Retainer patterns look like since promise.WaitAsync(SynchronizationContext.Current, forceAsync: true) seems to address the behavior where one wants deferred invocation rather than immediate invocation.

timcassell commented 2 days ago

> this is about deferring others waiting on a particular response state

I see. I thought you were just trying to wait until completion. If simply deferring the continuation solves your issue, then feel free to ignore the rest of my comments, as I do not have the full context of your architecture.

> I'm trying to understand what the AsyncLazy<T> and Promise<T>.Retainer patterns look like

For this case you would need an AsyncLazy for each state. But that would probably complicate your code rather than simplify it, so your list scanning approach is probably good enough (as long as you are only operating on that unsynchronized list on 1 thread).