Closed: davidfowl closed this issue 5 years ago
Formatting and conneg could also come into play here (but I'm not sure how useful it is). A client could request that they wanted JSON streaming or XML streaming (does anyone do that?).
Conneg isn't often used in new apps to choose between XML and JSON; it's used more frequently nowadays to associate a data type with a media type. Yeah, that's not really content negotiation... But I digress; in general, media types matter to developers and clients more than they did in the bad old days of XML or JSON 😆
IObservable doesn't do backpressure, so at some point threads would get blocked. Do we try to push for IAsyncEnumerable as the model? What about Channel (which supports backpressure)? (corefxlab)
The backpressure is the only hard part of this. IObservable<> isn't a big problem because it's possible to build backpressure with a custom IObserver<> implementation. I reckon that's where we'd start, and we can build the two others on top of that plumbing.
IMO IEnumerable<> isn't really solvable without blocking a thread. I think IObservable<> is a much better fit for the programming style you want anyway.
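For illustration, here is one way the "backpressure with a custom IObserver<>" idea could be sketched (all names here are hypothetical, not MVC APIs): the observer funnels items into a bounded System.Threading.Channels channel, so a slow consumer eventually makes OnNext wait.

```csharp
using System;
using System.Threading.Channels;

// Hypothetical sketch: an IObserver<T> that applies backpressure by writing
// into a bounded channel. When the consumer falls behind, OnNext waits for
// the consumer to catch up before accepting the next item.
class BackpressureObserver<T> : IObserver<T>
{
    private readonly Channel<T> _channel;

    public BackpressureObserver(int capacity)
    {
        _channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait // producer waits when full
        });
    }

    // The consuming side (e.g. the response writer) drains this reader.
    public ChannelReader<T> Reader => _channel.Reader;

    public void OnNext(T value)
    {
        // IObserver.OnNext is synchronous, so "backpressure" here means the
        // producing thread waits until the bounded channel has room.
        _channel.Writer.WriteAsync(value).AsTask().GetAwaiter().GetResult();
    }

    public void OnError(Exception error) => _channel.Writer.Complete(error);
    public void OnCompleted() => _channel.Writer.Complete();
}
```

The blocking in OnNext is exactly the downside discussed in this thread; it's the price of bridging a synchronous push interface to a bounded buffer.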
> The backpressure is the only hard part of this. IObservable<> isn't a big problem because it's possible to build backpressure with a custom IObserver<> implementation. I reckon that's where we'd start and we can build the two others on top of that plumbing.
As long as we don't have to build that, I'm with you. Rx comes with throttling and all kinds of crazy observable combinators out of the box. We should play with them to see what users would need to do if they wanted this feature. I don't want us to build a scheduler implementation into MVC that handles back pressure when all we really need is a Task returned from OnNext 😄 (but I digress).
> IMO IEnumerable<> isn't really solvable without blocking a thread. I think IObservable<> is a much better fit for the programming style you want anyway.
IEnumerable is an interesting one, and it's very similar to what IAsyncEnumerable might become, so I wouldn't dismiss it at all really. You write an action method that just yields results as they come back from a database call, and that should just work (if you marked the action as Streaming, of course):
public class MyStreamingApi : Controller
{
    [HttpGet("/tweets")]
    [Streaming]
    public async IAsyncEnumerable<Tweet> Get()
    {
        // Loop until GetOneTweet() returns null (hypothetical helper).
        while (await GetOneTweet() is Tweet tweet)
        {
            yield return tweet;
        }
    }
}
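For context, the framework-side counterpart of such an action could be little more than an await foreach loop; a minimal sketch (DrainAsync and writeItemAsync are invented names, not MVC APIs):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class StreamingDrain
{
    // Drain an IAsyncEnumerable, writing each item as it arrives. Awaiting
    // the write before requesting the next item gives natural backpressure:
    // the producer is never asked for more than the response can absorb.
    public static async Task DrainAsync<T>(IAsyncEnumerable<T> source, Func<T, Task> writeItemAsync)
    {
        await foreach (var item in source)
        {
            await writeItemAsync(item);
        }
    }
}
```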
It seems like you're talking about what developers would want to write, versus I'm talking about what we would have to do to support it.
IObservable<> just requires a custom IAwaitable hooked up to OnNext.
IEnumerable<> is really bad for us because MoveNext() is a blocking call.
> IEnumerable<> is really bad for us because MoveNext() is a blocking call.
Yeah, that's what I found with the SignalR exploration. We'd have to spin up a thread just to iterate the enumerable. IAsyncEnumerable<> is really the desired API.
@rynowak is this a duplicate of issue #6199?
Yep. This one has more detail though.
Of course, so we need to close the other one.
So the intention of this is to allow something such as an SSE Event Source to be implemented as a short-lived action that returns an object, which is then registered in a background service somewhere and has updates pushed to it?
I currently have SSE implemented by writing to HttpContext.Response.Body, and blocking on a SemaphoreSlim.WaitAsync() + delivery queue, which loops until the connection closes. IAsyncEnumerable would be ideal for my use case; IObservable would be workable but would require jumping through some hoops. For example, I can't imagine an efficient implementation where a background thread isn't constantly awaiting data on some queue.
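As a concrete sketch of that pattern (all names assumed; this is not an MVC API), System.Threading.Channels can replace the SemaphoreSlim + delivery queue: the loop awaits new events without dedicating a thread, and exits when the writer completes (i.e. the connection should close).

```csharp
using System.IO;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

static class SseWriter
{
    // Loop until the channel is completed, awaiting new events instead of
    // blocking on a semaphore; `body` stands in for HttpContext.Response.Body.
    public static async Task WriteSseAsync(ChannelReader<string> events, Stream body, CancellationToken ct = default)
    {
        var writer = new StreamWriter(body);
        while (await events.WaitToReadAsync(ct))
        {
            while (events.TryRead(out var evt))
            {
                await writer.WriteAsync($"data: {evt}\n\n"); // SSE wire format
            }
            await writer.FlushAsync(); // flush once per drained batch
        }
    }
}
```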
> So the intention of this is to allow something such as an SSE Event Source to be implemented as a short-lived action that returns an object, which is then registered in a background service somewhere and has updates pushed to it?
Yes
> IObservable would be workable but would require jumping through some hoops; for example, I can't imagine an efficient implementation where a background thread isn't constantly awaiting data on some queue.
I think the heavy lifting would be on our side to build this. As long as we have something to await (which we would build), it should be no worse than IAsyncEnumerable<> from a perf point of view. The other reason I'm keen on IObservable<> is that it already exists.
Also, this isn't either/or; we'd do both (assuming IAsyncEnumerable<> makes it into the framework) or make it trivial to build the IAsyncEnumerable<> version outside of the core if it doesn't.
I'm new to ASP.NET Core, so excuse me if I'm stating something obvious...
Have you considered exception handling for action methods implemented as iterators? The "prologue" (the code path from the beginning of the method to the first yield return) of an iterator method does not execute until IEnumerator.MoveNext() is called for the first time.
For example:
public class MyController : Controller {
    public IEnumerable<string> GetValues() {
        // Imagine this is a database connection/query, or some kind of
        // service call that might occasionally fail.
        throw new InvalidOperationException("Test exception.");
        yield return "Fred";
        yield return "Tom";
    }
}
You'd probably want this to fail immediately, not when the HTTP response has already started streaming to the Web browser. If ASP.NET Core starts iterating after it has already sent headers to the Web browser, there may not be a sensible way to handle any "prologue" exceptions that late.
To remedy that, you could eagerly perform the iteration over the first element...
public static class EnumerableExt {
    public static IEnumerable<T> EagerFirst<T>(this IEnumerable<T> enumerable) {
        var enumerator = enumerable.GetEnumerator();
        if (enumerator.MoveNext())
            return EagerFirstGeneric(enumerator.Current, enumerator);
        enumerator.Dispose();
        return Enumerable.Empty<T>();
    }

    static IEnumerable<T> EagerFirstGeneric<T>(T first, IEnumerator<T> enumerator) {
        // The iterator takes ownership of the enumerator and disposes it when
        // iteration completes. (A "using" around MoveNext in the caller would
        // dispose it before this deferred part ever ran.)
        using (enumerator) {
            yield return first;
            while (enumerator.MoveNext())
                yield return enumerator.Current;
        }
    }

    public static IEnumerable EagerFirst(this IEnumerable enumerable) {
        var enumerator = enumerable.GetEnumerator();
        if (enumerator.MoveNext())
            return EagerFirstNonGeneric(enumerator.Current, enumerator);
        // It's strange that IEnumerator doesn't inherit from IDisposable (the
        // way IEnumerator<T> does), but it is almost always disposable at run time.
        (enumerator as IDisposable)?.Dispose();
        return Enumerable.Empty<object>();
    }

    static IEnumerable EagerFirstNonGeneric(object first, IEnumerator enumerator) {
        try {
            yield return first;
            while (enumerator.MoveNext())
                yield return enumerator.Current;
        }
        finally {
            (enumerator as IDisposable)?.Dispose();
        }
    }
}
public class EagerFirstFilter : IActionFilter {
    public void OnActionExecuting(ActionExecutingContext context) {
    }

    public void OnActionExecuted(ActionExecutedContext context) {
        // Exclude string: it implements IEnumerable, but we don't want to
        // enumerate it character by character.
        if (context.Result is ObjectResult object_result && !(object_result.Value is string)) {
            if (object_result.Value is IEnumerable<object> enumerable_t)
                object_result.Value = enumerable_t.EagerFirst();
            else if (object_result.Value is IEnumerable enumerable)
                object_result.Value = enumerable.EagerFirst();
        }
    }
}
public class Startup {
public void ConfigureServices(IServiceCollection services) {
services.AddMvc(options => options.Filters.Add(new EagerFirstFilter()));
}
}
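To make the deferred-prologue behavior concrete, here is a small standalone demo (independent of the filter above) showing that calling an iterator method runs none of its body; the exception only surfaces on the first MoveNext():

```csharp
using System;
using System.Collections.Generic;

static class IteratorLaziness
{
    // Stand-in for an action whose "prologue" (e.g. opening a DB connection) fails.
    static IEnumerable<string> Faulty()
    {
        throw new InvalidOperationException("Test exception.");
#pragma warning disable CS0162 // unreachable; kept so the method compiles as an iterator
        yield return "never";
#pragma warning restore CS0162
    }

    // Calling the iterator method does NOT run the prologue...
    public static bool CallIsLazy()
    {
        try { var _ = Faulty(); return true; }
        catch (InvalidOperationException) { return false; }
    }

    // ...the prologue only runs (and throws) on the first MoveNext().
    public static bool FirstMoveNextThrows()
    {
        var e = Faulty().GetEnumerator();
        try { e.MoveNext(); return false; }
        catch (InvalidOperationException) { return true; }
    }
}
```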
We currently use the above code under ASP.NET Core 1.1, to make handling database exceptions in our ADO.NET-based action methods more robust. Essentially, if the connection and query execution have passed without exception (this is our "prologue"), it is extremely unlikely we'll get any exception while fetching the rows.
I'm wondering if something like the above should be built into ASP.NET Core, to avoid "gotchas" when users implement their action methods using yield return?
@branko-d Can you file another issue for this? It's not exactly related to streaming, and there's already a standard way to eagerly evaluate an IEnumerable<T> in .NET (use ToList() or ToArray()).
@davidfowl This is about eager evaluation of the first iteration, not the entire sequence. ToList or ToArray will materialize the entire resultset in memory, which is the opposite of "streaming". The idea here is to open a database connection, execute a query, and stream the rows to the client, while still having proper exception handling (by performing the first iteration eagerly).
I don't know if you are aware of it, but databases routinely "stream" query results, i.e. they don't materialize the entire resultset in memory, but instead walk the indexes as the client fetches the rows. This is not always possible, obviously, but the DBMS will automatically do it when it is. Some DBMSes will even allow you to tell them whether the query planner should optimize for that case (e.g. Oracle's FIRST_ROWS vs. ALL_ROWS query hints).
Being able to implement an MVC action method via yield return (as opposed to ToList or ToArray) would mean that we can have true streaming of database rows through all layers.
That being said, I'll happily create a new issue if you think it's better that way. :)
@branko-d how well does that work without asynchrony? What does your controller actually look like? What you mention is very loosely related to streaming results to the client. Error handling is a good topic to elaborate on here, but using IEnumerable<T> to stream is a really bad idea (which we talk about in this issue).
@davidfowl We are not sure at this point. Our (admittedly very simplistic) initial tests suggest that asynchronous ADO.NET (ExecuteReaderAsync, ReadAsync) is only 10-20% more CPU-intensive than synchronous, but much more memory-intensive (3-4 times more allocations), which will pressure the GC at some point.
While I can understand why synchronous operation would be problematic on a typical Web site, we have a different load: relatively few, relatively "fat" clients (our system will work with Mechanical CAD data, with up to a few hundred engineers accessing it). I would expect the database to become the bottleneck long before middle-tier thread exhaustion does, but we don't know for sure at this point.
To answer your question, our action methods (will) look essentially like this (pseudocode):
public IEnumerable<SomeObject> GetSomeObjects(string some_param) {
    using (var conn = new SqlConnection(SomeConnectionString)) {
        conn.Open();
        using (var tran = conn.BeginTransaction()) {
            using (var cmd = conn.CreateCommand()) {
                cmd.CommandText = "SOME_STORED_PROCEDURE";
                cmd.CommandType = CommandType.StoredProcedure;
                cmd.Transaction = tran; // SqlCommand throws if the connection's open transaction isn't assigned here
                cmd.Parameters.AddWithValue(nameof(some_param), some_param);
                using (var reader = cmd.ExecuteReader())
                    while (reader.Read())
                        yield return new SomeObject(reader);
            }
            tran.Commit();
        }
    }
}
What would be an asynchronous alternative to this, without having to materialize the entire resultset in memory?
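One possible answer, as a pseudocode-level sketch (assuming C# 8 async streams; SomeObject, SOME_STORED_PROCEDURE, and SomeConnectionString are the placeholder names from the snippet above, and the transaction is omitted for brevity): an async iterator over ReadAsync stays row-at-a-time, so nothing is materialized.

```csharp
// Sketch only -- requires a reachable database and an ADO.NET provider.
public async IAsyncEnumerable<SomeObject> GetSomeObjectsAsync(string someParam)
{
    using var conn = new SqlConnection(SomeConnectionString);
    await conn.OpenAsync();
    using var cmd = conn.CreateCommand();
    cmd.CommandText = "SOME_STORED_PROCEDURE";
    cmd.CommandType = CommandType.StoredProcedure;
    cmd.Parameters.AddWithValue(nameof(someParam), someParam);
    using var reader = await cmd.ExecuteReaderAsync();
    while (await reader.ReadAsync())
        yield return new SomeObject(reader); // one row at a time, never buffered
}
```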
Hijack? The reversed distributed Rx-over-web scenario of a client pushing a (non-file-data) multipart stream to the WebApi (as an observer) would also be of significant benefit, since this currently requires web-sockets or message-bus infrastructures. I'm talking about simple data streams from a device, client initiated, being pushed to the server over a short time period, similar to a multi-part file upload. For example, the server-side API corresponding to https://github.com/paulcbetts/refit multi-part uploads, or perhaps a simple Rx overloading of https://msdn.microsoft.com/en-us/library/system.net.http.multipartstreamprovider(v=vs.118).aspx, which we are looking into. Or perhaps something exists already that we have missed?
@PtwHub looking for something like this https://github.com/aspnet/SignalR/issues/515?
Indeed. This looks like a good fit. Thanks. Looking forward to it.
Is there anything in ASP.NET Core 2.0 that would allow replicating PushStreamContent's functionality?
> Is there anything in ASP.NET Core 2.0 that would allow replicating PushStreamContent's functionality?
This is trivial to do with MVC today, you can just write to the response body directly from the controller.
> Is there anything in ASP.NET Core 2.0 that would allow replicating PushStreamContent's functionality?
> This is trivial to do with MVC today, you can just write to the response body directly from the controller.
@davidfowl will I be able to modify headers after I start writing to response body?
> @davidfowl will I be able to modify headers after I start writing to response body?
No
@rynowak, is this something we will do in 2.2?
Move to 3.0
@davidfowl really doesn't want me to win the argument over IObservable vs foreach async. I'd be happy to win that argument in 3.0.
@rynowak if it's any consolation, I really want this feature, and I plan to convert it to an IObservable when using it if you don't do it here.
@cdebergh - if you're interested in trying something like this out, you would probably want to start by implementing a method that returns either IEnumerable<Task<T>> or IObservable<T>. If you want to go the task route, you'll need to make each Task<T> wait until the next chunk is available.
Then implement an output formatter that iterates and processes each chunk as it's available - but the key is that WriteAsync here needs to return a Task that won't complete until you've written all of the chunks.
IObservable is for multicast and doesn't fit well here. We'll support IAsyncEnumerable, and there'll be a method to go from one to the other that will describe the policy to apply when backpressure needs to be applied.
I’ll spar with @rynowak later 😆
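A sketch of what such a conversion method might look like (the names here are invented, not the actual API): bridge the observable through a bounded channel, where BoundedChannelFullMode is exactly the "policy to apply when backpressure needs to be applied" (wait, drop oldest, drop newest, ...).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

static class ObservableBridge
{
    // Hypothetical adapter: items flow observable -> bounded channel ->
    // IAsyncEnumerable. The channel's FullMode decides what happens when the
    // async consumer can't keep up with the push-based producer.
    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(IObservable<T> source, int capacity,
        BoundedChannelFullMode policy = BoundedChannelFullMode.DropOldest)
    {
        var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(capacity) { FullMode = policy });
        source.Subscribe(new ChannelObserver<T>(channel.Writer));
        return channel.Reader.ReadAllAsync();
    }

    private class ChannelObserver<T> : IObserver<T>
    {
        private readonly ChannelWriter<T> _writer;
        public ChannelObserver(ChannelWriter<T> writer) => _writer = writer;
        public void OnNext(T value) => _writer.TryWrite(value); // may drop, per policy
        public void OnError(Exception error) => _writer.Complete(error);
        public void OnCompleted() => _writer.Complete();
    }
}
```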
Cool, I'm looking forward to having a way to convert IAsyncEnumerable to IObservable for this feature.
That’s the wrong way 😁
Looking forward to the right way!
> Is there anything in ASP.NET Core 2.0 that would allow replicating PushStreamContent's functionality?
> This is trivial to do with MVC today, you can just write to the response body directly from the controller.
@davidfowl
That's true if you write to the response. But what if I want to push data to the request, sending a stream of data to the endpoint I'm calling without knowing how much data it will be?
The request will not return until it has completed, which is where the callback of PushStreamContent comes in.
I haven't found another way other than re-implementing that content with the callback myself. It seems to work fine, though. That being said, is there a better way of doing that?
To give some more information about my use case: I have 3 services. A coordinator, initializing the data processing, sits in the middle. It reads a stream of data from service A, modifies/validates the data, and then sends it to service B. I'm using Threading.Channels on all 3 services to buffer and process the data and actually scale the processing, and then simply read/write bytes to the response or request stream. It is batch processing; no NRT needed for this use case. It will process GBs of data and runs for minutes to hours.
> But what if I want to push data to the request, sending a stream of data to the endpoint I'm calling without knowing how much data it will be?
Are you talking about the client side? If yes, then you need PushStreamContent. This bug isn't about the client; it's about the server.
PushStreamContent is included in ASP.NET Core applications by default.
@davidfowl
> Are you talking about the client side?
Technically it is server to server (3 Kestrel (micro)services). I basically have to stream data in both directions: A <= B => C. B reads from A, then writes to C.
> PushStreamContent is included in ASP.NET Core applications by default.
Is it? Didn't find it on netcoreapp2.0 using AspNetCore.All
HttpClient is what I mean when I say the client. This is the MVC repository; features for HttpClient would be on corefx.
> Is it? Didn't find it on netcoreapp2.0 using AspNetCore.All
It's in 2.1. You can manually add https://www.nuget.org/packages/Microsoft.AspNet.WebApi.Client/.
@davidfowl ah gotcha, yeah HttpClient. Got confused for a sec. Too many byte arrays in my head.. Thanks!
Anyways, I responded here because I'm really looking forward to seeing what design you guys come up with regarding streaming data in MVC. That's basically the main thing my team and I are currently struggling with the most: transferring a lot of data from service to service. We've been using ASP.NET Core MVC all the way since the beginning ;)
My 2c on your bullet points in the initial post:
- Formatting => JSON/XML. De/serialization is the most limiting factor for performance. If the out-of-the-box design leaves that part up to the user, that would be fine. I'd prefer MessagePack, for example. Otherwise, consider de/serialization of small batches so that compression actually has an effect. Usually, serializing a small list of 1000 items at a time is much faster than serializing 1 at a time.
- Do we want to only allow streaming results or do we want to handle streaming input as well (chunked transfer encoding both ways)? => please both directions ;)
- Backpressure is a must for sure. Channel
- For real-world apps, you need good logging/monitoring of what's going on. E.g. I'm sending metrics of each part to AI.
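The batching idea in the first bullet can be sketched as a small operator over IAsyncEnumerable<T> (names here are illustrative): group the stream into fixed-size lists before each serializer call, so serialization overhead is amortized and compression sees a meaningful chunk of bytes.

```csharp
using System.Collections.Generic;

static class StreamBatching
{
    // Group an async stream into lists of up to `size` items, so each
    // serialize/compress call sees a batch instead of a single item.
    public static async IAsyncEnumerable<List<T>> Batch<T>(this IAsyncEnumerable<T> source, int size)
    {
        var batch = new List<T>(size);
        await foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size); // fresh list; the consumer may keep the old one
            }
        }
        if (batch.Count > 0)
            yield return batch; // flush the final partial batch
    }
}
```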
For 3.0, we're doing a fairly scoped change, namely adding support for IAsyncEnumerable<T> in MVC. Using https://github.com/aspnet/AspNetCore/issues/11558 to track further work based on the initial issue and the ideas suggested in this issue.
Edit: @rynowak hijacking top post for great justice
This would include adding support for returning IAsyncEnumerable<> from an action method and letting MVC do the buffering for you before giving the data to the formatter. This would require integrating with JsonResult and ObjectResult.