Dasync / AsyncEnumerable

Defines IAsyncEnumerable, IAsyncEnumerator, ForEachAsync(), ParallelForEachAsync(), and other useful stuff to use with async-await
MIT License

Creating an IAsyncEnumerable from a Task (feature request) #25

Closed julian-goldsmith closed 6 years ago

julian-goldsmith commented 6 years ago

It might be useful to add an IAsyncEnumerable constructor that takes a Task<IEnumerable<T>>, and possibly Task<IAsyncEnumerable<T>>. That would make it so that you don't have to mix async and IAsyncEnumerable if you have async code that gets your IEnumerable.

A (currently-untested) example of what I mean would be something like:

private IAsyncEnumerable<T> CreateAsyncEnumerableFromTask<T>(Func<Task<IEnumerable<T>>> enumerableFunction)
{
    return new AsyncEnumerable<T>(async yield =>
    {
        IEnumerable<T> items = await enumerableFunction();

        foreach (T item in items)
        {
            await yield.ReturnAsync(item);
        }
    });
}

That could probably be implemented much better if it was included in the library itself, maybe as an AsyncEnumerable<T> constructor, or an extension method on Task<IEnumerable<T>>.

An example of how it could be used (really, a distilled version of how I'm going to use the above method) would be:

return CreateAsyncEnumerableFromTask(async delegate
{
    string response = await RunRequest(requestParameters);
    return Deserialize(response);
});

kind-serge commented 6 years ago

I'll have to decline this request, because synchronous enumeration is the opposite of what this library does, so it should not be a built-in feature of async enumerators. Instead, I'd propose rethinking the problem. AsyncEnumerable is designed for streaming data, whereas in this scenario you have an in-memory result that is fetched asynchronously and then enumerated synchronously. So I would recommend one of two options:

  1. Don't use AsyncEnumerable, because there is no obvious benefit in the given scenario: no data is streamed.
  2. Rewrite the code so that RunRequest and Deserialize are parts of the same method, which streams the data so that nothing gets loaded entirely into memory. That's where you should definitely use IAsyncEnumerable. I've personally done that before by processing an HTTP response stream and parsing it with the built-in XML parser. It is more complex, but that's how you get the benefits.
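
A minimal sketch of what the second option could look like, under some loud assumptions: it uses C# 8 async iterators (`yield return` / `await foreach`) in place of this library's `AsyncEnumerable<T>` constructor, the built-in `XmlReader` in async mode as the parser, and a `MemoryStream` as a stand-in for an HTTP response stream. The names `ReadItemsAsync` and `StreamingXmlSketch`, and the `item` element, are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using System.Xml;

public static class StreamingXmlSketch
{
    // Hypothetical helper: yields the text of each <elementName> element as
    // the reader reaches it, without loading the whole document into memory.
    public static async IAsyncEnumerable<string> ReadItemsAsync(Stream stream, string elementName)
    {
        var settings = new XmlReaderSettings { Async = true };
        using (XmlReader reader = XmlReader.Create(stream, settings))
        {
            while (!reader.EOF)
            {
                if (reader.NodeType == XmlNodeType.Element && reader.Name == elementName)
                {
                    // Reads the element's text and moves past its end tag,
                    // so no extra ReadAsync() call is needed on this branch.
                    yield return await reader.ReadElementContentAsStringAsync();
                }
                else
                {
                    await reader.ReadAsync();
                }
            }
        }
    }

    public static async Task Main()
    {
        // Stand-in for an HTTP response stream (assumption for illustration).
        var xml = "<items><item>a</item><item>b</item></items>";
        using var stream = new MemoryStream(Encoding.UTF8.GetBytes(xml));

        await foreach (string item in ReadItemsAsync(stream, "item"))
        {
            Console.WriteLine(item);
        }
    }
}
```

The consumer sees each item as soon as the parser reaches it, so memory use depends on the size of one element rather than on the size of the whole response.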

julian-goldsmith commented 6 years ago

Fair enough, I can see where you're coming from. The second option is unfortunately not available to me at the moment. The first would put me right back where I started, for reasons I'll explain below.

This is a rough approximation of what I'm trying to do:

class Request
{
    // Public so the batching code below can read it.
    public string[] Items;
}

public IAsyncEnumerable<Item> ExtractAsync(object[] parameters)
{
    return
        GetBatchedRequests(parameters).ToAsyncEnumerable().
        SelectMany(request => RunBatchedRequest(request).ToAsyncEnumerable());
}

private async Task<IEnumerable<Request>> GetBatchedRequests(object[] parameters)
{
    Request request = await GetRequest(parameters);         // HTTP call
    return FilterAndSplitRequestIntoBatches(request);
}

private IEnumerable<Request> FilterAndSplitRequestIntoBatches(Request request)
{
    return request.Items.
        Where(FilterFunction).
        Chunks(50).
        Select(chunk => new Request { Items = chunk.ToArray() });
}

private async Task<IEnumerable<Item>> RunBatchedRequest(Request request)
{
    VendorResponse response = await ActuallyRunRequest(request);    // HTTP call

    // Deserialize using System.Xml.Serialization.  Item is automatically generated, 
    //   and hand-coding a parser isn't feasible.
    // response.ActualResponse is XML embedded in a SOAP response.
    return DeserializeRequest(response.ActualResponse);
}

public static class AsyncEnumerableUtils
{
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(this Task<IEnumerable<T>> task)
    {
        return new AsyncEnumerable<T>(async yield =>
        {
            IEnumerable<T> items = await task;

            foreach (T item in items)
            {
                await yield.ReturnAsync(item);
            }
        });
    }
}

The code I'm working with is an ETL process that accesses a vendor's system. Due to requirements on their side, I can only get batches of 50 records at a time. I'm moving to AsyncEnumerable due to an issue where running a large request caused an out-of-memory error. (I had been deserializing all the responses into an array at once.) My main purpose for using it is to retrieve batches asynchronously; that is, I want to use an IEnumerable-alike that will asynchronously pull batches as needed. I'm using SelectMany to flatten the batch responses to be handled by the transform and load steps. GetBatchedRequests is turned into an IAsyncEnumerable because that way of doing it makes more sense to me than returning a Task<IAsyncEnumerable<T>>.

Item in this case is one of several classes generated by xsd.exe, based on a schema from outside my company. All of the possible classes are very large, so hand-coding a parser for any of them isn't possible at the moment. To make things worse, Item comes from XML embedded in a SOAP response (which I access through a service reference), so streaming that would require me to manually write a SOAP client as well. On the plus side, the 50-record limit means that deserializing a single response is pretty trivial.

I'd appreciate any ideas you might have on how I can improve this. For now, I'll most likely just use that extension method to deal with my IEnumerables.

kind-serge commented 6 years ago

I think you can simplify the problem so that you don't actually need that feature, and you don't need SelectMany at all. I would just hide the batching like this:

public IAsyncEnumerable<Item> ExtractAsync(object[] parameters) =>
    new AsyncEnumerable<Item>(async yield =>
    {
        // Produce request batches (GetBatchedRequests returns a Task, so await it).
        foreach (var batchRequest in await GetBatchedRequests(parameters))
        {
            // Fetch the items in one batch (pre-load a single batch into memory).
            var items = await RunBatchedRequest(batchRequest);
            // Yield the pre-fetched items one by one.
            foreach (var item in items)
                await yield.ReturnAsync(item);
        }
    });

P.S. This is the right streaming approach, and it's what we do with Azure Table Storage at the company where I currently work: get a small batch (an async operation) and yield the pre-fetched items.
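
To illustrate why fetching one batch at a time keeps memory bounded, here is a small self-contained sketch. It assumes C# 8 async iterators rather than this library's `AsyncEnumerable<T>` constructor, and it replaces the HTTP call with a hypothetical `FetchBatchAsync` that just generates integers; `BatchStreamingSketch` and `BatchesFetched` are likewise names made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class BatchStreamingSketch
{
    // Counts simulated round trips so laziness can be observed.
    public static int BatchesFetched;

    // Hypothetical stand-in for one HTTP call that returns one small batch.
    static async Task<IReadOnlyList<int>> FetchBatchAsync(int batchIndex, int batchSize)
    {
        await Task.Delay(1); // simulate I/O latency
        BatchesFetched++;
        return Enumerable.Range(batchIndex * batchSize, batchSize).ToList();
    }

    // Fetches a batch only when the consumer has used up the previous one,
    // so at most one batch is held in memory at a time.
    public static async IAsyncEnumerable<int> ExtractAsync(int batchCount, int batchSize)
    {
        for (int i = 0; i < batchCount; i++)
        {
            IReadOnlyList<int> items = await FetchBatchAsync(i, batchSize);
            foreach (int item in items)
                yield return item;
        }
    }

    public static async Task Main()
    {
        int consumed = 0;
        await foreach (int item in ExtractAsync(batchCount: 10, batchSize: 50))
        {
            // Stop early: the remaining batches are never requested.
            if (++consumed == 60) break;
        }
        Console.WriteLine($"consumed={consumed}, batches fetched={BatchesFetched}");
    }
}
```

Stopping after 60 items means only the first two batches of 50 are ever requested, which is the property that avoids loading every response at once.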

julian-goldsmith commented 6 years ago

Thanks, @tyrotoxin. I had been trying to use ToAsyncEnumerable and SelectMany to try to simplify my code, but it seems like it had the opposite effect. I like your approach better.