StephenCleary / AsyncEx

A helper library for async/await.
MIT License

AsyncCollection IsEmpty #264

Open romerod opened 2 years ago

romerod commented 2 years ago

We are using the AsyncCollection to queue messages that have to be sent to a server. The send operation usually takes some time, and sending multiple messages at once is faster than sending each message separately.

Currently we have something like this:

var cancelledToken = new CancellationToken(true);

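while (true)
{
    // Fixed-size batch buffer; slot 0 holds the first item, which is awaited without cancellation.
    var buffer = new Message[10000];
    buffer[0] = await asyncQueue.TakeAsync();
    var count = 1;
    while (count < buffer.Length)
    {
        try
        {
            // Take currently available items; the already-cancelled token
            // makes TakeAsync throw instead of waiting when the queue is empty.
            buffer[count] = await asyncQueue.TakeAsync(cancelledToken);
            count++;
        }
        catch (TaskCanceledException)
        {
            break;
        }
    }

    await SendMessagesAsync(buffer, count);
}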

This works great except that the debug output is flooded with TaskCanceledExceptions, especially when the functionality is used in a request/response scenario when messages do not pile up.

Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?

romerod commented 1 year ago

Any suggestions on this?

CZEMacLeod commented 1 year ago

@romerod @StephenCleary It seems to me that the inner wait loop in DoTakeAsync should probably be guarded on the cancellation token also: https://github.com/StephenCleary/AsyncEx/blob/0361015459938f2eb8f3c1ad1021d19ee01c93a4/src/Nito.AsyncEx.Coordination/AsyncCollection.cs#L255

while (Empty && !_completed && !cancellationToken.IsCancellationRequested)
{
  if (sync)
  {
      _completedOrNotEmpty.Wait(cancellationToken);
  }
  else
  {
      await _completedOrNotEmpty.WaitAsync(cancellationToken).ConfigureAwait(continueOnCapturedContext: false);
  }
}

I appreciate that as this is async, it might still have the occasional TaskCanceledException if the token was cancelled between the test and the wait, but it would eliminate the problem in this scenario where the token is always cancelled.

I think DoOutputAvailableAsync also needs the change, and then the main loop could be modified to be

var cancelledToken = new CancellationToken(true);

const int maxBatchSize = 10000;

while (true)
{
  // The constructor argument is only a capacity hint; Count starts at 0,
  // so the loop is bounded by maxBatchSize rather than by buffer.Count.
  IList<Message> buffer = new List<Message>(maxBatchSize);

  buffer.Add(await asyncQueue.TakeAsync());
  while (buffer.Count < maxBatchSize && await asyncQueue.OutputAvailableAsync(cancelledToken))
  {
    try
    {
      // take currently available items
      var message = await asyncQueue.TakeAsync(cancelledToken);
      if (message == null) break;
      buffer.Add(message);
    }
    catch (InvalidOperationException)   // Thrown if the producer completes
    {
      break;
    }
  }

  await SendMessagesAsync(buffer, buffer.Count);
}

I think that perhaps the AsyncCollection should have a TryTakeAsync(CancellationToken cancellationToken) method which would be almost the same as the TakeAsync implementation, except not throwing the InvalidOperationException if the underlying take fails.

@romerod As this is open source, you could just include a private copy of the AsyncCollection class, modified for your needs if necessary...

StephenCleary commented 1 year ago

> Would it be possible to expose the Empty and/or Count property or have a method which gets a specified number of items which are currently available?

No; there's too much potential for misuse; with concurrent/asynchronous collections, any Empty or Count property can be outdated by the time your code even gets the value.
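
To illustrate the staleness concern, here is a minimal sketch that uses the BCL's ConcurrentQueue<T> as a stand-in, since AsyncCollection does not expose Empty or Count. The class and method names (StaleCheckSketch, CheckThenTake, TakeAtomically) are hypothetical and exist only for this example.

```csharp
using System.Collections.Concurrent;

public static class StaleCheckSketch
{
    // Check-then-act: the IsEmpty result can be stale by the time TryDequeue runs,
    // because another consumer may take the last item in between. The check
    // therefore adds nothing; only the TryDequeue result can be trusted.
    public static bool CheckThenTake(ConcurrentQueue<int> queue, out int item)
    {
        item = default;
        if (queue.IsEmpty)
        {
            return false;
        }
        return queue.TryDequeue(out item); // can still return false
    }

    // Single atomic operation: testing and taking happen in one call,
    // so there is no window in which the answer can go out of date.
    public static bool TakeAtomically(ConcurrentQueue<int> queue, out int item)
    {
        return queue.TryDequeue(out item);
    }
}
```

Both methods return correct results here only because the final answer always comes from TryDequeue; code that acts on IsEmpty or Count alone has no such protection.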

However, there is a better way to meet your need: a Try* method. In the past, I've avoided Try methods because the meaning of "try" is ambiguous (the most common meaning is "don't throw", but it can also mean "don't behave asynchronously", which would be the meaning in this case). Since System.Threading.Channels has adopted the Try approach with their high-performance asynchronous collections, I'm comfortable adding Try* methods to mine as well.
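
For comparison, this is roughly how the batching loop from the original question looks with the Try approach in System.Threading.Channels. It is a sketch, not AsyncEx code: it assumes the queue is a Channel<T>, and BatchingSketch, ReadBatchAsync and maxBatchSize are hypothetical names introduced for this example.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Waits asynchronously until at least one item is available, then drains
    // whatever is already buffered, up to maxBatchSize items.
    // Returns an empty list only when the channel is completed and drained.
    public static async Task<List<T>> ReadBatchAsync<T>(ChannelReader<T> reader, int maxBatchSize)
    {
        var batch = new List<T>();

        // WaitToReadAsync returns false once the channel is completed and empty.
        while (await reader.WaitToReadAsync())
        {
            // TryRead never waits and does not throw when the channel is empty;
            // it simply returns false, so no exception is raised per batch.
            while (batch.Count < maxBatchSize && reader.TryRead(out var item))
            {
                batch.Add(item);
            }

            // Another consumer may have taken the items first; wait again in that case.
            if (batch.Count > 0)
            {
                return batch;
            }
        }

        return batch;
    }
}
```

A send loop would call ReadBatchAsync repeatedly and stop once it returns an empty list. Because TryRead reports "nothing available" through its return value, no cancellation exceptions reach the debug output.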