reactiveui / refit

The automatic type-safe REST library for .NET Core, Xamarin and .NET. Heavily inspired by Square's Retrofit library, Refit turns your REST API into a live interface.
https://reactiveui.github.io/refit/
MIT License

feature: Support IAsyncEnumerable as a return type #1299

Open sveinungf opened 2 years ago

sveinungf commented 2 years ago

Is your feature request related to a problem? Please describe. It would be nice if Refit supported streaming deserialization of an API response. This could be useful for endpoints that return a large amount of data, since it would avoid buffering the whole API response in memory. It would also allow the consumer to start enumerating items before the whole API response has been received.

Describe the solution you'd like Refit is now using System.Text.Json, and with .NET 6, System.Text.Json got support for streaming deserialization. Using it could be as easy as having an IAsyncEnumerable as the return type in an interface:

public interface IApi
{
    [Get("/items")]
    IAsyncEnumerable<Item> GetItems();
}

Describe alternatives you've considered Using Task<IEnumerable<Item>> as the return type, but this will buffer the whole API response in memory.
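The .NET 6 building block referred to above is `JsonSerializer.DeserializeAsyncEnumerable`, which yields the elements of a root-level JSON array as they are parsed. A minimal self-contained sketch (the `Item` record and inline JSON are illustrative, not part of Refit):

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;

public record Item(int Id);

public static class StreamingDemo
{
    // Yields each element of a root-level JSON array as soon as it is
    // parsed, without materializing the whole collection first.
    public static async IAsyncEnumerable<Item> ReadItemsAsync(Stream json)
    {
        await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<Item>(json))
        {
            if (item is not null)
                yield return item;
        }
    }

    public static async Task Main()
    {
        var bytes = Encoding.UTF8.GetBytes("[{\"Id\":1},{\"Id\":2},{\"Id\":3}]");
        using var stream = new MemoryStream(bytes);
        await foreach (var item in ReadItemsAsync(stream))
            Console.WriteLine(item.Id);
    }
}
```

In the HTTP case the `MemoryStream` would be replaced by the response body stream, which is where the memory savings come from.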

liven-maksim-infotecs commented 1 year ago

Hello! What's the status on this? Any progress?

brumlemann commented 1 year ago

I just got to a point where I needed this too. Say you have a big collection of items returned from the API call. In many cases there's no need to materialize the whole thing in memory. You just want to spin through the collection and do something for each item, asynchronously.

I think this could be a huge performance benefit.

Thinking a bit more about it, wouldn't you rather have an

IApiResponse<IAsyncEnumerable<T>>

signature (to inspect the response for faulty situations before trying to consume the stream)? Would that be a problem?

anaisbetts commented 1 year ago

since it could avoid buffering the whole API response in memory

I'm not sure I understand what this would actually represent internally - what would back the IAsyncEnumerable<T>? The result itself is not being streamed, and there is no standard way to incrementally parse JSON (without constraining it somehow, like JSON Lines or similar), so this would effectively be a Task<Lazy<List<T>>> - a saved byte buffer that, on request, gets parsed.

Ghost4Man commented 1 year ago

Streaming deserialization of root-level JSON arrays is already supported by System.Text.Json (the announcement is linked above in the OP) when using JsonSerializer.DeserializeAsyncEnumerable. But IIRC, when I last tried this in Blazor WASM, I had to enable response streaming for each request (using request.SetBrowserResponseStreamingEnabled(true) and HttpCompletionOption.ResponseHeadersRead) with a custom DelegatingHandler.
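The handler part of that workaround is only a few lines. A sketch assuming a Blazor WebAssembly project (the handler name is illustrative; `SetBrowserResponseStreamingEnabled` is the extension method from `Microsoft.AspNetCore.Components.WebAssembly.Http`):

```csharp
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Components.WebAssembly.Http;

// Opts every outgoing request into browser response streaming so the body
// can be read incrementally instead of being buffered by the browser.
public class StreamingHandler : DelegatingHandler
{
    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request, CancellationToken cancellationToken)
    {
        request.SetBrowserResponseStreamingEnabled(true);
        return base.SendAsync(request, cancellationToken);
    }
}
```

`HttpCompletionOption.ResponseHeadersRead` can't be set from a handler, though; it has to be passed at the call site (IIRC the streaming `System.Net.Http.Json` extensions do that internally).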

brumlemann commented 1 year ago

since it could avoid buffering the whole API response in memory

I'm not sure I understand what this would actually represent internally - what would back the IAsyncEnumerable<T>? The result itself is not being streamed, and there is no standard way to incrementally parse JSON (without constraining it somehow, like JSON Lines or similar), so this would effectively be a Task<Lazy<List<T>>> - a saved byte buffer that, on request, gets parsed.

Quick chat with GPT gave the following response. I have NOT checked it for hallucinations ;-)

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text.Json;
using System.Threading.Tasks;

public static async IAsyncEnumerable<T> GetItemsAsync<T>(string apiUrl)
{
    using var httpClient = new HttpClient();
    // ResponseHeadersRead lets us start reading before the whole body arrives.
    using var response = await httpClient.GetAsync(apiUrl, HttpCompletionOption.ResponseHeadersRead);

    if (!response.IsSuccessStatusCode)
    {
        throw new HttpRequestException($"Failed to fetch data. Status code: {response.StatusCode}");
    }

    using var stream = await response.Content.ReadAsStreamAsync();

    // Stream-deserialize the root-level JSON array, yielding each item as it is parsed.
    await foreach (var item in JsonSerializer.DeserializeAsyncEnumerable<T>(stream))
    {
        if (item is not null)
            yield return item;
    }
}
```

Just for ideas ;-)

anaisbetts commented 1 year ago

@Ghost4Man Oops, I missed that - that is interesting, though I can imagine that a Correct implementation might be Weird; it seems like a better fit for IObservable to me (since you can't really control the upstream stream of data by not requesting items, it's Showing Up whether you like it or not - not requesting items would simply result in buffering), though I know that IAsyncEnumerable is the New Hotness that people are excited about.

IObservable<T> might also be tricky from a type system perspective, because the type you're deserializing is fundamentally List<T>, but the type you'd return would not be IObservable<List<T>>, it'd be IObservable<T>.

SWarnberg commented 10 months ago

Not sure I understand above comments. The following code with native HttpClient works well to receive streamed content without buffering. Should be easy to implement in Refit too.

```csharp
using var client = new HttpClient();
IAsyncEnumerable<Item?> result = client.GetFromJsonAsAsyncEnumerable<Item>(uri);
await foreach (var itm in result)
{
    if (itm != null)
        Console.WriteLine($"Returned item {itm.Id}.");
}
```
anaisbetts commented 10 months ago

I don't really see the value of being able to asynchronously parse JSON items; it really seems like a hyper-small optimization. If someone has a practical example where their App is Extremely Difficult To Write without this feature I'd reconsider it, but otherwise I think this is WONTFIX.

voroninp commented 10 months ago

If my application transforms response items into objects of a different type and the output is relatively large, it's better to convert each item separately than to load the whole payload and only then apply the transformation.

SWarnberg commented 10 months ago

This is a very strange comment, and a quick dismissal of something that is extremely useful in some use cases. We're talking about client-side support for streaming APIs. We're not talking about asynchronously loading JSON into a collection and storing it in memory. Maybe you should update yourself on streaming APIs and IAsyncEnumerable @anaisbetts?

Of course we can cope without this support in Refit, since there is good support in HttpClient; it's just unfortunate to mix different client frameworks.

anaisbetts commented 10 months ago

Maybe you should update yourself on streaming APIs and IAsyncEnumerable @anaisbetts?

Do you....think that I don't know what these things are? 🙃

Looking at the implementation of GetFromJsonAsAsyncEnumerable, there are two possible benefits here:

  1. You could save a small amount of memory - i.e. instead of using [Size of Response as Bytes] + [Size of JSON as C# objects], you could, in an extremely optimal scenario, allocate [Size of a single Buffer frame] + [Size of JSON as C# objects]. This would require you to be in absolute perfect lockstep (with absolutely no way of ensuring it is the case), since if you fell behind calling MoveNext on the IAsyncEnumerator even a little bit, you will start to Buffer the response. This is largely Not Worth It in my opinion. If your scenario is that performance-sensitive that response buffer sizes are a concern, you would do better hand-writing the method anyways.

  2. This could add conditional cancellation to the Task-based API - i.e. you partially deserialize the result, decide "Eh, don't care", then you kill the IAsyncEnumerator by DisposeAsync'ing it, and the underlying network request is cancelled. This is what GetFromJsonAsAsyncEnumerable does. This is at least slightly more compelling, though the IObservable API already supports cancellation via unsubscribing before the request completes (though not conditionally; you have to blindly cancel early). To be honest, while this is likely a Real benefit for someone, I am extremely skeptical that anyone will actually take advantage of it in practice, because you would have to Know About It, Need this Feature, and then be smart enough to Dispose the enumerator early.
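The early-disposal mechanics in point 2 can be demonstrated without any networking. In this self-contained sketch, the `finally` block stands in for cancelling the underlying HTTP request:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EarlyStopDemo
{
    public static bool Cancelled;

    // Stand-in for a streamed HTTP response. The finally block plays the
    // role of cancelling the underlying request when the consumer stops early.
    public static async IAsyncEnumerable<int> GetItemsAsync()
    {
        try
        {
            for (var i = 0; ; i++)
            {
                await Task.Yield();
                yield return i;
            }
        }
        finally
        {
            Cancelled = true; // runs when the enumerator is disposed early
        }
    }

    public static async Task<int> TakeUntil(int limit)
    {
        var count = 0;
        await foreach (var item in GetItemsAsync())
        {
            if (++count == limit)
                break; // leaving the loop calls DisposeAsync on the enumerator
        }
        return count;
    }

    public static async Task Main()
    {
        var taken = await TakeUntil(3);
        Console.WriteLine($"Took {taken} items; upstream stopped: {Cancelled}");
    }
}
```

Breaking out of the `await foreach` disposes the enumerator, which is exactly the hook GetFromJsonAsAsyncEnumerable uses to cancel the in-flight request.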

voroninp commented 10 months ago

Not Worth It in my opinion.

Why do you think it's such a rare thing? It looks like a perfect case for an ETL processing pipeline built from a chain of microservices.

anaisbetts commented 10 months ago

@voroninp I don't think that this scenario is rare, but I do think that it's rare that you are so short on memory that you cannot afford to buffer the response and parse it all at once. I think this optimization might be Intellectually Satisfying but ultimately not really meaningful in real-world apps.

voroninp commented 10 months ago

I do think that it's rare that you are so short on memory that you cannot afford to buffer the response and parse it all at once.

In theory an enumerable can be infinite. I mean, if we're talking about streaming, the stream can potentially contain a gazillion items with no chance of fitting them all into memory.

Also, I see Refit as a nice library to be consumed from Blazor. And saving on memory allocations there is very important for performance.

Ghost4Man commented 10 months ago

I would expect the common use-case to be to use this as a simple way to display/process individual items in real-time (as soon as they arrive), not waiting for the whole response which might take very long to finish (results of a complicated search/query, results from a test runner, etc.).

Line- commented 10 months ago

My use-case is I might be receiving very large amounts of data, and it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

anaisbetts commented 10 months ago

it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

Now that's interesting, can you show an example of how that works in code with HttpClient?

Line- commented 10 months ago

it's very useful to be able to have subsequent requests pick up from where we leave off when the initial request fails due to poor internet.

Now that's interesting, can you show an example of how that works in code with HttpClient?

There's nothing special to it; the example @SWarnberg wrote above covers it. Being able to iterate on results as we receive them means that if I'm retrieving 5 million objects and get disconnected after item 1,500,000, then when I regain connectivity I can make a new API request that asks only for the remaining items, so every request makes some progress.
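That resume loop can be sketched with the fetch step abstracted into a delegate. In real use, `fetch(skip)` would wrap something like `client.GetFromJsonAsAsyncEnumerable<Item>($"{uri}?skip={skip}")`, where the `skip` parameter is a hypothetical capability the server must support:

```csharp
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;

public static class ResumableStream
{
    // Re-issues the stream from where the previous attempt left off.
    // fetch(skip) must restart the server-side stream at offset `skip`
    // (a hypothetical capability - the server has to support it).
    public static async IAsyncEnumerable<T> ResumeOnFailure<T>(
        Func<long, IAsyncEnumerable<T>> fetch, int maxRetries = 3)
    {
        long received = 0;
        for (var attempt = 0; attempt <= maxRetries; attempt++)
        {
            await using var e = fetch(received).GetAsyncEnumerator();
            while (true)
            {
                bool moved;
                try { moved = await e.MoveNextAsync(); }
                catch (HttpRequestException) { break; } // dropped: retry from `received`
                if (!moved) yield break;               // stream completed normally
                received++;
                yield return e.Current;
            }
        }
        throw new HttpRequestException("Gave up after repeated disconnects.");
    }
}
```

A flaky connection then only costs re-requesting the tail, not the whole stream.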

anaisbetts commented 10 months ago

I'll reopen this since people have come up with some compelling use-cases

JohnGalt1717 commented 9 months ago

The obvious case for this is OpenAI/Azure OpenAI. The MIME type will be application/x-ndjson.

When the MIME type is x-ndjson, it should make the call and stream the result, parsing each line as NDJSON.

There's already an extension on the HttpClient for a get that does this. But here's the equivalent with a post:

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

public static class HttpClientNdjsonExtensions
{
    public static async IAsyncEnumerable<TResponse> PostFromJsonAsAsyncEnumerable<TRequest, TResponse>(
        this HttpClient client, string requestUri, TRequest request,
        [EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        var requestMessage = new HttpRequestMessage(HttpMethod.Post, requestUri);

        using var ms = new MemoryStream();
        // JsonSerializerSettings.Options is the caller's own static JsonSerializerOptions.
        await JsonSerializer.SerializeAsync(ms, request, JsonSerializerSettings.Options, cancellationToken: cancellationToken);
        ms.Position = 0;

        using var requestContent = new StreamContent(ms);
        requestMessage.Content = requestContent;

        requestContent.Headers.ContentType = new System.Net.Http.Headers.MediaTypeHeaderValue("application/json");
        requestMessage.Headers.Add("Accept", "application/json");

        // ResponseHeadersRead lets us consume the stream before the body completes.
        using var response = await client.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

        response.EnsureSuccessStatusCode();

        var items = response.Content.ReadFromJsonAsAsyncEnumerable<TResponse>(cancellationToken);

        await foreach (var item in items)
        {
            if (cancellationToken.IsCancellationRequested)
                throw new TaskCanceledException();
            if (item is null)
                continue;

            yield return item;
        }
    }
}
```

I would expect that this specific use case is going to come up a lot as people start using AI and want to stream the response back.

Here's the Refitter equivalent request for automatic generation:

Refitter Issue

anaisbetts commented 9 months ago

@JohnGalt1717 I would argue that the OpenAI streaming case would actually be better served by IObservable, and detecting application/x-ndjson and being able to stream results would definitely be a PR I'd be interested in

JohnGalt1717 commented 9 months ago

Well, it really isn't an observable. It's an IAsyncEnumerable that runs until completion. It's identical to the case where you return IAsyncEnumerable from Minimal APIs right now. They just don't set the content type correctly to application/x-ndjson as they should, despite that being exactly what happens and exactly what the extension method on HttpClient assumes it's getting.

It isn't as if, much further down the road, OpenAI will pump something else into the IAsyncEnumerable like you could with, say, gRPC server streams. It runs until done and then is closed. IObservable really is only for the latter case, like gRPC, not for a stream of messages that you know ends.

So I'd suggest that this just handles the IAsyncEnumerable case properly, doesn't really care about the content type, and assumes it's NDJSON even when the content type isn't set. It should just map to the default Get extension method, and should implement the suggested Post version that I gave for POST/PUT and call it a day, because that's literally what you'd expect a client to do if you were to write it by hand.

Line- commented 4 months ago

Is this still planned to be added/supported in the future?