Azure / azure-amqp

AMQP C# library

DrainAsync does not wait for messages to be released #252

Open JoshLove-msft opened 9 months ago

JoshLove-msft commented 9 months ago

The DrainAsync method on ReceivingAmqpLink does not wait for received messages that will end up being released to actually be released. Messages are released here if there is no active receive call. When draining, it would be better if this cleanup step could be awaited, so that draining provides deterministic behavior for consumers. In the Service Bus library, we rely on DrainAsync to ensure FIFO ordering within sessions.

xinchen10 commented 8 months ago

That's beyond the purpose of the DrainAsync API, which is to use up the credits and stop the message flow afterwards. It does not change the behavior of messages that are in flight before the drain completes. To satisfy your requirement, we would need to introduce a new API like the following: Task<IList> DrainMessagesAsync(...); which would do both the drain and the receive in one call. In theory, you could implement it by calling ReceiveAsync in a loop while DrainAsync is in progress and returning the messages accumulated during that time.

JoshLove-msft commented 8 months ago

The desired API wouldn't have to return the messages. It could just release the received messages. Something like DrainAndReleaseAsync.

But we could also probably implement the suggested workaround in the Service Bus library.
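A minimal sketch of what a DrainAndReleaseAsync could look like. It is written against a hypothetical IDrainableLink interface (DrainAsync, TryTakeBuffered and ReleaseAsync here are assumptions for illustration, not azure-amqp members) plus an in-memory FakeLink. It assumes the peer follows AMQP 1.0 drain semantics: the sender spends its remaining credit and then reports zero credit, so no further transfers arrive until new credit is issued, and every message that needs releasing is already local once the drain completes.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Usage: three messages are in flight when the drain starts.
var link = new FakeLink(new[] { "m1", "m2", "m3" });
int released = await link.DrainAndReleaseAsync(CancellationToken.None);
// Every release has completed by the time the call returns.
Console.WriteLine($"released={released} acknowledged={link.Released.Count}"); // prints "released=3 acknowledged=3"

// Hypothetical link abstraction; these members are assumptions for this sketch,
// not part of the azure-amqp API.
public interface IDrainableLink
{
    Task DrainAsync(CancellationToken cancellationToken);
    bool TryTakeBuffered(out string messageId);
    Task ReleaseAsync(string messageId, CancellationToken cancellationToken);
}

public static class DrainExtensions
{
    // Hypothetical DrainAndReleaseAsync: finish the drain first, then release every
    // message that arrived before it completed, and await all of the releases so the
    // caller only resumes once the link has no unsettled leftovers.
    public static async Task<int> DrainAndReleaseAsync(
        this IDrainableLink link, CancellationToken cancellationToken)
    {
        await link.DrainAsync(cancellationToken).ConfigureAwait(false);

        var releases = new List<Task>();
        while (link.TryTakeBuffered(out string messageId))
        {
            releases.Add(link.ReleaseAsync(messageId, cancellationToken));
        }

        await Task.WhenAll(releases).ConfigureAwait(false);
        return releases.Count;
    }
}

// In-memory stand-in used only to exercise the sketch.
public sealed class FakeLink : IDrainableLink
{
    private readonly string[] _inFlight;
    private readonly ConcurrentQueue<string> _buffer = new();

    public FakeLink(IEnumerable<string> inFlight) => _inFlight = inFlight.ToArray();

    public ConcurrentQueue<string> Released { get; } = new();

    public async Task DrainAsync(CancellationToken cancellationToken)
    {
        // Simulate the peer spending its remaining credit on the in-flight
        // messages before it answers the drain.
        await Task.Delay(10, cancellationToken).ConfigureAwait(false);
        foreach (string id in _inFlight)
        {
            _buffer.Enqueue(id);
        }
    }

    public bool TryTakeBuffered(out string messageId) => _buffer.TryDequeue(out messageId);

    public async Task ReleaseAsync(string messageId, CancellationToken cancellationToken)
    {
        // Simulate waiting for the disposition round trip.
        await Task.Delay(5, cancellationToken).ConfigureAwait(false);
        Released.Enqueue(messageId);
    }
}
```

Awaiting Task.WhenAll is what makes the cleanup deterministic in this sketch: control returns to the caller only after every release has completed, which is the guarantee the session FIFO case needs.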

JoshLove-msft commented 8 months ago

Related PR - https://github.com/Azure/azure-sdk-for-net/pull/40457

m-redding commented 7 months ago

@xinchen10 I tried replacing our call to DrainAsync with the following, attempting to implement the suggested workaround (I also tried a couple of other approaches, such as ContinueWith and checking drainTask.IsCompleted):

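```csharp
using var backgroundCts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
Task drainTask = link.DrainAsync(cancellationToken);
List<AmqpMessage> additionalMessages = new();
// Receive in the background while the drain is in progress.
Task receiveLoop = Task.Run(async () =>
{
    while (!backgroundCts.Token.IsCancellationRequested)
    {
        AmqpMessage additionalMessage = await link.ReceiveMessageAsync(maxWaitTime ?? timeout, backgroundCts.Token).ConfigureAwait(false);
        if (additionalMessage != null) // skip receives that returned no message
        {
            additionalMessages.Add(additionalMessage);
        }
    }
}, backgroundCts.Token);
try
{
    await drainTask.ConfigureAwait(false);
}
finally
{
    // Stop the background loop and wait for it to exit before reading the list.
    backgroundCts.Cancel();
    try { await receiveLoop.ConfigureAwait(false); }
    catch (OperationCanceledException) { /* expected when the pending receive is cancelled */ }
}
// add additional messages to messages to return
```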

However, I haven't found a way to do this that guarantees a receive call won't run past the drain and simply re-issue the credits that were just drained. The FIFO ordering issue in sessions is worse with this approach, and I often see Service Bus session lock lost exceptions. Is this the approach you had in mind? I wonder whether the drain-and-release API needs some visibility into lower-level link state, either by implementing it on the AMQP side or by using information that is already exposed to better decide whether another receive call should be made.

JoshLove-msft commented 7 months ago

@xinchen10 mentioned that DrainAsync could be updated to reset the total link credit to 0 so that it works correctly when prefetch is enabled.

Currently, we have to do the following to resume prefetching after draining:

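```csharp
// Clear the total link credit left over from before the drain...
link.Settings.TotalLinkCredit = 0;
// ...then re-issue the prefetch credit so messages start flowing again.
link.SetTotalLinkCredit((uint)_prefetchCount, true, true);
```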