dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

Introduce abstractions for buffer managed reading and writing #28884

Closed davidfowl closed 2 years ago

davidfowl commented 5 years ago

The goal here is to introduce a minimal abstraction into the stack that supports asynchronous buffered reading and asynchronous buffered writing. System.IO.Pipelines would implement these interfaces, and various places in the stack that want to support buffered reading and writing would take these as arguments. Exposing them as interfaces also means that we can potentially implement these on specific Streams that implement buffering behavior.

namespace System.IO
{
    public interface IAsyncBufferedReader<T>
    {
        ValueTask<ReadResult<T>> ReadAsync(CancellationToken cancellationToken = default);
        void AdvanceTo(SequencePosition consumed, SequencePosition examined);
    }

    public interface IAsyncBufferedWriter<T> : IBufferWriter<T>
    {
        ValueTask WriteAsync(ReadOnlyMemory<T> buffer, CancellationToken cancellationToken = default);
        ValueTask FlushAsync(CancellationToken cancellationToken = default);
    }

    public readonly struct ReadResult<T>
    {
        public ReadResult(in ReadOnlySequence<T> buffer, bool isCompleted)
        {
            Buffer = buffer;
            IsCompleted = isCompleted;
        }

        public ReadOnlySequence<T> Buffer { get; }
        public bool IsCompleted { get; }
    }
}

Here's an example of what the JsonSerializer.ReadAsync<T> would look like with this overload:

private static async ValueTask<TValue> ReadAsync<TValue>(IAsyncBufferedReader<byte> utf8Json, Type returnType, JsonSerializerOptions options = null, CancellationToken cancellationToken = default)
{
    if (options == null)
        options = s_defaultSettings;

    ReadObjectState current = default;
    JsonClassInfo classInfo = options.GetOrAddClass(returnType);
    current.ClassInfo = classInfo;
    if (classInfo.ClassType != ClassType.Object)
    {
        current.PropertyInfo = classInfo.GetPolicyProperty();
    }

    var readerState = new JsonReaderState(options: options.ReaderOptions);
    List<ReadObjectState> previous = null;
    int arrayIndex = 0;

    ReadResult<byte> result;
    ReadOnlySequence<byte> buffer;
    do
    {
        result = await utf8Json.ReadAsync(cancellationToken).ConfigureAwait(false);
        buffer = result.Buffer;

        ReadCore(
            ref readerState,
            returnType,
            result.IsCompleted,
            buffer,
            options,
            ref current,
            ref previous,
            ref arrayIndex);

        utf8Json.AdvanceTo(readerState.Position, buffer.End);

    } while (!result.IsCompleted);

    return (TValue)current.ReturnValue;
}

Notes:

cc @stephentoub @terrajobst @ahsonkhan @jkotas @pakrym @jkotalik @KrzysztofCwalina

benaadams commented 5 years ago

They'd become the base classes to PipeWriter and PipeReader?

pakrym commented 5 years ago

The semantics of ReadAsync match those of Stream: an empty ReadOnlySequence means end of data, and cancellation throws an OperationCanceledException.

What if the reader doesn't consume the incomplete data? It would be impossible for it to know that no more data would ever arrive.

We can throw if advance == End and no more data would be available.

jkotas commented 5 years ago

these could be interfaces

Yes, I think it would be better for these to be interfaces. We have started on this path with IBufferWriter already.

we have default interface methods now

I am not sure whether we are ready to start using default interfaces in the BCL itself in 3.0. We had several discussions about that and it looked like a minefield.

examined, determines how much of the buffer was observed by the caller

What is the examined used for?

Overall, this direction looks reasonable to me. Thanks for writing it down!

pakrym commented 5 years ago

What is the examined used for?

It's used by the reader to request more data. If ReadOnlySequence was examined to the end (even if it wasn't consumed) ReadAsync has to wait for more data to arrive. It's also used for backpressure - we limit the maximum amount of unobserved data that's stored in the Pipe to avoid using too much memory.

davidfowl commented 5 years ago

consumed - controls how much of the returned buffer you're done with (don't show me that data when I call ReadAsync again)
examined - controls how much of the buffer was observed by the caller.

Examined is commonly either the same as consumed or the end of the buffer. See the issue text for an example of what the JSON serializer would look like.
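To make the consumed/examined distinction concrete, here is a minimal sketch of a parse loop against the proposed IAsyncBufferedReader<byte>; `TryParseMessage`, `Message`, and `Process` are hypothetical helpers, not part of the proposal:

```csharp
while (true)
{
    ReadResult<byte> result = await reader.ReadAsync();
    ReadOnlySequence<byte> buffer = result.Buffer;

    if (TryParseMessage(ref buffer, out Message message))
    {
        // A full message was parsed; TryParseMessage sliced 'buffer' past it,
        // so everything up to buffer.Start is both consumed and examined.
        reader.AdvanceTo(buffer.Start, buffer.Start);
        Process(message);
    }
    else
    {
        // Not enough data for a full message: consume nothing, but mark the
        // whole buffer as examined so the next ReadAsync waits for more data
        // instead of returning the same bytes immediately.
        reader.AdvanceTo(buffer.Start, buffer.End);
    }

    if (result.IsCompleted)
        break;
}
```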

stephentoub commented 5 years ago

We should think it through more, but at first glance these interfaces seem reasonable to me.

we could potentially even implement them on Streams

For this to happen, I personally see that as a requirement rather than potential :wink: For the last almost two decades, Stream has been the exchange type for processing streams of data in .NET, shows up in tons of APIs as the type for doing so, and I want to avoid us causing a lot of friction by bifurcating APIs, forcing developers to pick what overloads they need to expose, etc. I'd rather be in a position where existing APIs continue to just work but could "light up" if appropriate, detecting these interfaces on a stream and if they exist and if that style of data production/consumption maps better to their model, using it as an alternate path. I'd prefer that for new APIs going forward as well: you're writing a component that deals with streams of data, and you just take a Stream... in addition to tweaking your algorithm based on CanSeek and so on, it can also be tweaked based on whether the stream is IAsyncBufferedReader/Writer, with the algorithm potentially then using the interface implementations instead of or in addition to the public virtuals on the stream.

It does raise a few questions:

As part of this proposal, could you enumerate the public streams in coreclr/corefx that would implement these, and share how you think that would best happen? I also assume these implementations would be explicit such that the only new "surface area" would be that the type implements the interface. Any types beyond streams and PipeReader/PipeWriter that would implement these, and if so, could you enumerate those as well?

It's also used for backpressure - we limit the maximum amount of unobserved data that's stored in the Pipe to avoid using too much memory.

I don't understand this part. IBufferWriter's design effectively requires you not to have back pressure, unless that back pressure is applied synchronously. For it to be applied asynchronously, GetMemory and/or Advance would also need to be asynchronous. Otherwise, I can just keep calling GetMemory+Advance without calling FlushAsync on the IAsyncBufferedWriter, and I can build up a lot of unobserved data (not even published to a consumer). You could of course throttle the producer when they then call FlushAsync, but by then it's too late. That makes me question whether that's really the right interface to be exposed for writing?

IAsyncBufferedWriter : IBufferWriter

Is it on purpose that this is "Buffered" but the base is "Buffer"? I can see the logic for how that makes sense, it's just a bit odd at first.

public readonly struct ReadResult

Do we need this? Just from inspection it looks like it'd weigh in at around 40 bytes on 64-bit, which is a fairly meaty struct to be copying around. Is it worth it? Though, the bulk of that is ReadOnlySequence<T> and it's not clear how you'd avoid handing that back. My main complaint here is that this will end up non-trivially increasing the size of any state machine where one of these is awaited.

stephentoub commented 5 years ago

cc: @geoffkizer

jkotas commented 5 years ago

Would the base Stream implement these?

For this option, I think we would want to introduce bool CanProvideBuffers { get; } property that returns false and the default implementations of the other methods on base Stream would throw InvalidOperationException. Same model as other CanXXX capabilities of Streams.

stephentoub commented 5 years ago

For this option, I think we would want to introduce bool CanProvideBuffers { get; } property that returns false and the default implementations of the other methods on base Stream would throw InvalidOperationException. Same model as other CanXXX capabilities of Streams.

Sounds reasonable as well. Then presumably these interface implementations would be virtuals on the base.

davidfowl commented 5 years ago

For this to happen, I personally see that as a requirement rather than potential 😉 For the last almost two decades, Stream has been the exchange type for processing streams of data in .NET, shows up in tons of APIs as the type for doing so, and I want to avoid us causing a lot of friction by bifurcating APIs, forcing developers to pick what overloads they need to expose, etc. I'd rather be in a position where existing APIs continue to just work but could "light up" if appropriate, detecting these interfaces on a stream and if they exist and if that style of data production/consumption maps better to their model, using it as an alternate path.

I've been thinking about this for a while and I don't think it's as bad as we think. I recall there being 2 concerns in the initial pipelines discussion:

  1. We need to add PipeReader/PipeWriter versions of the Streams
  2. We need to expose PipeReader/PipeWriter where ever we expose Streams
    • We need to add overloads to methods or constructors that take Stream
    • We need to expose additional properties on objects that expose Stream

With this proposal being interfaces, the first concern goes away, an existing Stream can choose to implement these interfaces and it'll be the same backing data source/algorithm exposed in 2 different ways.

For the second concern, exposing new properties is more problematic to me than adding new overloads of existing methods. Adding overloads to things that take Streams doesn't seem like that big a deal, mainly because a developer who has a Stream won't be confused about what to pass to a new overload (they won't even know it exists). We also don't have that many things in the BCL that take Stream as input. It's mainly the various serializers and deserializers and crypto APIs.

The third thing I don't love about the Stream downcast is that even a new IAsyncBufferedReader-aware API will always need this fallback logic. Ideally, I could take an IAsyncBufferedReader and turn a Stream into that by calling Stream.AsAsyncBufferedReader() or something to that effect. This sort of translation is much cleaner than the cast, since the implementation can worry about a single interface and the caller can always get a working implementation of an IAsyncBufferedReader out of the Stream.

I'd prefer that for new APIs going forward as well: you're writing a component that deals with streams of data, and you just take a Stream... in addition to tweaking your algorithm based on CanSeek and so on, it can also be tweaked based on whether the stream is IAsyncBufferedReader/Writer, with the algorithm potentially then using the interface implementations instead of or in addition to the public virtuals on the stream.

I'm not actually sure about that. We could write adapters that expose an IAsyncBufferedReader/Writer over the Stream easily, and it would be a pretty efficient conversion that wasn't lossy. At that point it's a helper that does buffering on your behalf and cleans your code up 😄 .

Would the base Stream implement these or just particular concrete stream types? I think the answer would probably be the latter, for a few reasons. As interfaces, you want to be able to use these as markers, so that if you have an API that takes a Stream, you can query it for the interface and potentially take a different path if it exists. Also, as interfaces, it becomes more challenging to (partially) override the behavior a base class provides.

The more I thought about this the more I dislike it. I'd prefer if we added something like AsBufferedStreamReader/AsBufferedStreamWriter virtuals to Stream and the base implementation would return a new type that wrapped the Stream. This way code that uses the IAsyncBufferReader/IAsyncBufferWriter never tries to interleave calls to both the Stream and the IAsyncBufferReader/IAsyncBufferWriter. Stream implementations that are enlightened can simply return this so we get the best of both worlds.
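A rough sketch of what such a wrapping adapter might look like under the proposed interfaces. `StreamBufferedReader` is a hypothetical name, and a real implementation would use pooled, linked buffers and honor the examined position rather than a single resizable array:

```csharp
internal sealed class StreamBufferedReader : IAsyncBufferedReader<byte>
{
    private readonly Stream _stream;
    private byte[] _buffer = new byte[4096];
    private int _buffered;

    public StreamBufferedReader(Stream stream) => _stream = stream;

    public async ValueTask<ReadResult<byte>> ReadAsync(CancellationToken cancellationToken = default)
    {
        // Grow the buffer when it's full (a real implementation would rent
        // pooled segments and link them instead of resizing).
        if (_buffered == _buffer.Length)
            Array.Resize(ref _buffer, _buffer.Length * 2);

        int read = await _stream.ReadAsync(
            _buffer.AsMemory(_buffered), cancellationToken).ConfigureAwait(false);
        _buffered += read;

        // A zero-byte read from the underlying Stream signals end of data.
        return new ReadResult<byte>(
            new ReadOnlySequence<byte>(_buffer, 0, _buffered), isCompleted: read == 0);
    }

    public void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        // Shift unconsumed bytes to the front; a real implementation would
        // track positions (and the examined mark) instead of copying.
        int offset = consumed.GetInteger();
        Buffer.BlockCopy(_buffer, offset, _buffer, 0, _buffered - offset);
        _buffered -= offset;
    }
}
```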

How would a Stream behave if its new interface implementation's read methods were intermixed with its existing ones? This also ties back in to the above question and further pushes for it to not be on the base. If it were on the base, the base would need a default implementation, which wouldn't mix particularly well with the existing methods if you used these methods to do a partial read, leaving buffered data known only to the base, and then used the existing methods to read. By having these as interfaces implemented only on the concrete types, those types would be responsible for providing the implementations and could then keep the implementations in sync with the existing ones.

See the above. It feels like a much cleaner model.

Which streams in the framework would we implement these on? For IAsyncBufferedWriter, MemoryStream seems like a decent match... beyond that, it's not clear to me. You could imagine BufferedStream, FileStream, and other streams that have internal buffers implementing the interface as well, but other than MemoryStream, most other such streams today use a fixed buffer size and flush automatically as part of WriteAsync when the buffer gets filled. This interface design instead calls for supporting unbounded buffering, where the buffers just keep growing and growing and growing (either via replacement or linking), so either we'd need to not support it on such types or enable such types with that capability... the latter is more flexible but potentially also complicated in some situations. For IAsyncBufferedReader, it would also require some thought. Obviously the streams returned from PipeReader/PipeWriter.AsStream() would implement these 😉

To start, I would implement IAsyncBufferedWriter on MemoryStream, but I wouldn't do the other Streams pre-emptively. Yes, the IBufferWriter interface basically gives you an ever-growing set of memory that cannot be flushed to the underlying system. At least with the IAsyncBufferedWriter, flushes can happen in between at arbitrary points, but that's actually a design point. The interface inverts who is responsible for the flushing: it cannot be done by the implementation of the IAsyncBufferedWriter; instead, it's the owner's job.

As part of this proposal, could you enumerate the public streams in coreclr/corefx that would implement these, and share how you think that would best happen? I also assume these implementations would be explicit such that the only new "surface area" would be that the type implements the interface. Any types beyond streams and PipeReader/PipeWriter that would implement these, and if so, could you enumerate those as well?

I'll spend some time looking, but since these interfaces are generic, we could potentially do something similar with TextReader and TextWriter (IAsyncBufferedReader<char> and IAsyncBufferedWriter<char>).

I don't understand this part. IBufferWriter's design effectively requires you not to have back pressure, unless that back pressure is applied synchronously. For it to be applied asynchronously, GetMemory and/or Advance would also need to be asynchronous. Otherwise, I can just keep calling GetMemory+Advance without calling FlushAsync on the IAsyncBufferedWriter, and I can build up a lot of unobserved data (not even published to a consumer). You could of course throttle the producer when they then call FlushAsync, but by then it's too late. That makes me question whether that's really the right interface to be exposed for writing?

This sounds like a misunderstanding. @pakrym was explaining how the default pipe implementation uses examined (and PipeWriter does have a FlushAsync). The current design lets you allocate until you flush, and yes you may have written too much by then but it would be extremely inefficient to make GetMemory or Advance async (it's like saying new should be async 😄). One of the design philosophies of these new types were that parsing and writing should be synchronous and flushing should be async for performance reasons. If you've written too much then you won't be able to yield from flush until the underlying "device" has seen enough of that buffer. It's really no different to calling WriteAsync with a giant buffer after having allocated from the buffer pool yourself.

Is it on purpose that this is "Buffered" but the base is "Buffer"? I can see the logic for how that makes sense, it's just a bit odd at first.

Yes, we can change it, but I was being pedantic. We're not reading from a buffer, we're reading buffered data.

Do we need this? Just from inspection it looks like it'd weigh in at around 40 bytes on 64-bit, which is a fairly meaty struct to be copying around. Is it worth it? Though, the bulk of that is ReadOnlySequence and it's not clear how you'd avoid handing that back. My main complaint here is that this will end up non-trivially increasing the size of any state machine where one of these is awaited.

It makes the API cleaner. There's a case where the buffer isn't drained but there's no more data left to be read. To avoid an infinite loop, you need to know that ReadAsync is "done" but there's unconsumed data. We'd have to throw an exception somewhere, and I like giving that control to the reader. I understand it increases the state machine, but does it matter if you're reading big chunks of data 😄 ?

davidfowl commented 5 years ago

For this option, I think we would want to introduce bool CanProvideBuffers { get; } property that returns false and the default implementations of the other methods on base Stream would throw InvalidOperationException. Same model as other CanXXX capabilities of Streams.

I really dislike it, but if it's something we add to Stream as well I'm fine. I really want these interfaces to exist, it gives the maximum flexibility with what we can do later on in terms of what types implement them.

davidfowl commented 5 years ago

I added a WriteAsync overload to the IAsyncBufferedWriter

simonferquel commented 5 years ago

One thing I dislike here is that Async Interfaces inherit from Sync interfaces. IMO doing Sync over Async or Async over Sync should always be an explicit thing. We can provide factories of IBufferedWriter from IAsyncBufferedWriter and vice-versa, but using them should be explicit.

stephentoub commented 5 years ago

I added a WriteAsync overload to the IAsyncBufferedWriter

How does that interact with GetMemory + Advance?

We also don't have that many things in the BCL that take Stream as input. It's mainly the various Serializers and Deserializers

There are ~300 public APIs across corefx that take a Stream as input. It's also not just about coreclr/corefx, but the ecosystem at large. There are two key aspects to exchange types. One is that they're in a place where everyone can use them. The other is that they're agreed upon as the thing that should be used. And that works best when there's one type for one concept. That's the world we've been in with Stream, and if we started promoting this interface in this fashion, we lose that, which I believe is a huge loss. I don't have a problem with the interfaces existing, but we should continue to provide the guideline that Stream is the exchange type. I don't want to see feedback to a component like, for example, Apache Arrow's C# implementation that it should start accepting these interfaces in addition to stream.

A component should be able to take a Stream and then decide how it wants to consume it. And I think it's great that this discussion could result in allowing a consumer of Stream to have additional buffer management provided to them, optimized further by a given concrete implementation. For PipeReader/Writer, its AsStream implementation can just forward to the interfaces on the reader/writer for basically no loss of efficiency if that's the desired consumption model, and can use Read/WriteAsync if that's the desired consumption model.

I'm also a little skeptical we're going to see a non-trivial number of implementations of these interfaces. Stream, PipeReader/Writer, … what else? That also suggests they shouldn't be pushed as such an exchange type.

Specifically for the JsonSerializer.Deserialize that caused this discussion, it can just accept Stream, and if it makes sense for it to go through the buffer-management layer, great, and if it doesn't, great, but that becomes an implementation detail that doesn't affect the public surface area, and that can be changed and tweaked over time. Meanwhile developers passing streams around as their exchange type get to continue doing so. And there's no confusion when you go to look at the overloads list and see Stream but also these other strange interfaces.

We could write adapters that expose an IAsyncBufferedReader/Writer over the Stream easily and it would be a pretty efficient conversion that wasn't lossy

I'm not sure what this means. I like the idea of being able to use these APIs to help with buffer management as an opt-in thing, and whether as a base implementation or a wrapper we'd want to have a solid implementation that helped the developer with these concerns, but it's not always going to be the right thing to use.

I'd prefer if we added something like AsBufferedStreamReader/AsBufferedStreamWriter virtuals to Stream and the base implementation would return a new type that wrapped the Stream.

That sounds reasonable, as well; I think I'd be fine with that (though it'd be great to see it more fully fleshed out). That, however, also adds additional complication when you try to go the other direction: you've got one of these interfaces and you want to call something that takes a stream... what do you do? Wrap a stream around the reader/writer that wraps the stream? Presumably we'd provide helpers for that, too? Are they able to take advantage of unwrapping, e.g. I've got one of these readers and it's still got some data buffered, I get a stream for it, I finish consuming previously buffered data, now am I able to go directly against the underlying stream rather than through the intermediary? What would need to be exposed on the interfaces to make that possible?

it's like saying new should be async

If you're talking about back pressure, yeah, that's a thing, e.g. if you're working with a pool of buffers and you want to throttle the number of buffers that can be outstanding, you make the rental async.

I understand it increases the state machine but does it matter if you're reading big chunks of data

You're often not reading big chunks of data. :-) But, regardless, the difference between the 32 bytes for ReadOnlySequence<T> and the 40 for ReadResult is fine; it's really more about ReadOnlySequence<T> being so big, but it is what it is.

benaadams commented 5 years ago

it's really more about ReadOnlySequence<T> being so big, but it is what it is.

It can always be shrunk by 8 bytes to 24 bytes by deconstructing the two SequencePositions into parts in the ReadOnlySequence<T> as 8 bytes of it is struct padding.

davidfowl commented 5 years ago

How does that interact with GetMemory + Advance?

I'd make it throw if the caller was in between a GetMemory/GetSpan and Advance call. The extension method version of WriteAsync would call GetMemory and Advance in a loop to write the data followed by FlushAsync (See https://github.com/dotnet/corefx/blob/c710854045bf7ffdffff3c2127b8fddf533ed46f/src/System.Memory/src/System/Buffers/BuffersExtensions.cs#L114).
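That extension-method version might look roughly like the following sketch, modeled on the linked BuffersExtensions.Write pattern (copy via GetMemory/Advance in a loop, then flush); names and the flush-at-the-end policy are illustrative:

```csharp
public static async ValueTask WriteBufferedAsync<T>(
    this IAsyncBufferedWriter<T> writer,
    ReadOnlyMemory<T> source,
    CancellationToken cancellationToken = default)
{
    while (source.Length > 0)
    {
        // Ask the writer for a destination buffer and copy as much as fits.
        Memory<T> destination = writer.GetMemory();
        int copied = Math.Min(destination.Length, source.Length);
        source.Slice(0, copied).CopyTo(destination);
        writer.Advance(copied);
        source = source.Slice(copied);
    }

    // Flushing is the caller's responsibility in this design; do it once at
    // the end of the logical write.
    await writer.FlushAsync(cancellationToken).ConfigureAwait(false);
}
```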

There are ~300 public APIs across corefx that take a Stream as input. It's also not just about coreclr/corefx, but the ecosystem at large. There are two key aspects to exchange types. One is that they're in a place where everyone can use them. The other is that they're agreed upon as the thing that should be used. And that works best when there's one type for one concept. That's the world we've been in with Stream, and if we started promoting this interface in this fashion, we lose that, which I believe is a huge loss. I don't have a problem with the interfaces existing, but we should continue to provide the guideline that Stream is the exchange type. I don't want to see feedback to a component like, for example, Apache Arrow's C# implementation that it should start accepting these interfaces in addition to stream.

300? Can you show me that? I had a hard time looking that up. BTW, I think we're already headed down that path, and I think it's a fine thing for a library to offer more efficient overloads to do different things.

T[] and ArraySegment<T> have existed for years and now we have Span<T>, ReadOnlySpan<T>, Memory<T>, ReadOnlyMemory<T>, ReadOnlySequence<T>, IMemoryOwner<T>. That's not to say we should make more types for fun, but more as an example that they exist for a reason, and now APIs in the ecosystem at large have to adopt overloads for these types. More types mean customers have to make a choice, but we usually build bridges to make types interop (see AsMemory and AsSpan on T[], or TryGetMemory for Memory<T> to get the ArraySegment<T> back).

I don't know why Stream is sacred in this sense. Sometimes things evolve and they require new types. Adding overloads really isn't the worst thing in the world. Also, since it's easy to build adapters in either direction, the interop story would be clean.

A component should be able to take a Stream and then decide how it wants to consume it. And I think it's great that this discussion could result in allowing a consumer of Stream to have additional buffer management provided to them, optimized further by a given concrete implementation. For PipeReader/Writer, its AsStream implementation can just forward to the interfaces on the reader/writer for basically no loss of efficiency if that's the desired consumption model, and can use Read/WriteAsync if that's the desired consumption model.

I don't disagree here that's why I'm pushing for the interfaces to exist.

I'm also a little skeptical we're going to see a non-trivial number of implementations of these interfaces. Stream, PipeReader/Writer, … what else? That also suggests they shouldn't be pushed as such an exchange type.

Specifically for the JsonSerializer.Deserialize that caused this discussion, it can just accept Stream, and if it makes sense for it to go through the buffer-management layer, great, and if it doesn't, great, but that becomes an implementation detail that doesn't affect the public surface area, and that can be changed and tweaked over time. Meanwhile developers passing streams around as their exchange type get to continue doing so. And there's no confusion when you go to look at the overloads list and see Stream but also these other strange interfaces.

It's not an implementation detail though. It's about using the type system in a more honest way. I don't see this as some behind-the-scenes optimization, and the reason I'd like the interface to exist is so that libraries have the option of a buffer-managed primitive to read from and write to. Here's an alternative that you'll dislike 😄:

Today the JSON serializer and other things that do IO each have their own buffering behavior and semantics when it comes to parsing: what pool to allocate buffers from, what buffer size to use, etc. This new interface basically exposes a consumption model that frees the consumer from these decisions, so we can have a common component (or various components) that handles this policy and the JSON serializer (and others) just have to worry about parsing. What that would look like is the following:

public static class JsonSerializer
{
    public static ValueTask<TValue> ReadAsync<TValue>(IAsyncBufferedReader<byte> utf8Json, CancellationToken cancellationToken = default);
}

Example of using different buffering policies decoupled from the JsonSerializer. What's great about this is that we don't have to bake buffering policies into every consumer. Those can be provided from the outside, so callers of the API get to choose. That's one major advantage of exposing this in the public API.

app.Use(async (context, next) =>
{
    // Normal reader, default buffering policy
    IAsyncBufferedReader<byte> reader = context.Request.Body.AsBufferedReader();
    Person p1 = await JsonSerializer.ReadAsync<Person>(reader);

    // Alternative buffer policy using an arena-based MemoryPool<byte>
    IAsyncBufferedReader<byte> pooledReader = context.Request.Body.AsBufferedReader(FancyArenaBaseMemoryPool.Instance);
    Person p2 = await JsonSerializer.ReadAsync<Person>(pooledReader);

    // Alternative buffering policy using an 80K buffer
    IAsyncBufferedReader<byte> sizedReader = context.Request.Body.AsBufferedReader(bufferSize: 81920);
    Person p3 = await JsonSerializer.ReadAsync<Person>(sizedReader);
});

I'm not sure what this means. I like the idea of being able to use these APIs to help with buffer management as an opt-in thing, and whether as a base implementation or a wrapper we'd want to have a solid implementation that helped the developer with these concerns, but it's not always going to be the right thing to use.

When you say it's not always the right thing to use, what do you mean exactly? That a developer may want to write the code themselves? Then sure I think those options all need to be there.

That sounds reasonable, as well; I think I'd be fine with that (though it'd be great to see it more fully fleshed out). That, however, also adds additional complication when you try to go the other direction: you've got one of these interfaces and you want to call something that takes a stream... what do you do? Wrap a stream around the reader/writer that wraps the stream? Presumably we'd provide helpers for that, too? Are they able to take advantage of unwrapping, e.g. I've got one of these readers and it's still got some data buffered, I get a stream for it, I finish consuming previously buffered data, now am I able to go directly against the underlying stream rather than through the intermediary? What would need to be exposed on the interfaces to make that possible?

There are 2 options:

  1. You would need to add AsStream on both interfaces so that implementations that wrap a stream could just return the underlying Stream (similar to how the pipelines integration works).
  2. It could be an extension method that lived in the same assembly as the adapter with internal knowledge and the ability to unwrap it.
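Option 2 might look roughly like this sketch, where `StreamBufferedReader`, its `UnderlyingStream` property, and `BufferedReaderStream` are all hypothetical adapter types living in the same assembly as the extension method:

```csharp
public static Stream AsStream(this IAsyncBufferedReader<byte> reader)
{
    // If the reader is our own Stream-wrapping adapter, hand back the
    // original Stream instead of layering another wrapper on top.
    if (reader is StreamBufferedReader wrapper)
        return wrapper.UnderlyingStream;

    // Otherwise fall back to a Stream facade over the reader.
    return new BufferedReaderStream(reader);
}
```

The unwrapping path is what avoids the "stream over reader over stream" layering stephentoub raised; any remaining buffered data would still need to be drained (or surfaced) before going directly against the underlying Stream.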

If you're talking about back pressure, yeah, that's a thing, e.g. if you're working with a pool of buffers and you want to throttle the number of buffers that can be outstanding, you make the rental async.

Yeah, I don't think we should do that. I think FlushAsync is sufficient, and maybe exposing something like a Length on the writer interface could be used by the caller to determine when to FlushAsync. We don't need async versions of GetMemory or Advance.
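As a sketch of that idea, assuming a hypothetical Length property (unflushed bytes) on the writer and an illustrative `TryProduceChunk` source, the caller applies backpressure at flush points rather than in GetMemory/Advance:

```csharp
const int FlushThreshold = 64 * 1024; // illustrative threshold

while (TryProduceChunk(out ReadOnlyMemory<byte> chunk))
{
    // Synchronous copy into the writer's buffer, as in the design philosophy
    // above: writing is sync, flushing is async.
    chunk.Span.CopyTo(writer.GetSpan(chunk.Length));
    writer.Advance(chunk.Length);

    // Backpressure is applied here: FlushAsync won't complete until the
    // underlying "device" has absorbed enough of the buffered data.
    if (writer.Length >= FlushThreshold)
        await writer.FlushAsync();
}

await writer.FlushAsync();
```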

benaadams commented 5 years ago

it's really more about ReadOnlySequence<T> being so big, but it is what it is.

It can always be shrunk by 8 bytes to 24 bytes

PR for the shrink https://github.com/dotnet/corefx/pull/35860

stephentoub commented 5 years ago

I'd make it throw if the caller was in between a GetMemory/GetSpan and Advance call. The extension method version of WriteAsync would call GetMemory and Advance in a loop to write the data followed by FlushAsync

Why add the WriteAsync though? At that point, why isn't this just Stream with this additional support added?

300? Can you show me that?

Search all .cs files in corefx with \ref\ in the path, looking for "(System.IO.Stream" or ", System.IO.Stream".

T[] and ArraySegment<T> have existed for years, and now we have Span<T>, ReadOnlySpan<T>, Memory<T>, ReadOnlyMemory<T>, ReadOnlySequence<T>, and IMemoryOwner<T>.

APIs shouldn't be defined to take IMemoryOwner<T>. ArraySegment<T>, Span<T>, ReadOnlySpan<T>, Memory<T>, and ReadOnlyMemory<T> aren't about alternate functionality and aren't fundamentally changing the model in which a consumer writes code; they're really just about passing around the data along with an offset and length. It's also effectively free to create one from an array, and except for the limitations inherent to span, you get full fidelity round tripping between them and arrays, in that you can just ask for the original array back. Further, having all of them does actually introduce problems and friction; we've eased that with the introduction of more implicit casts, but the fact that we can introduce implicit casts further highlights that they're really all just the same thing.

I don't know why Stream is sacred in this sense

All core exchange types are sacred. We think long and hard about introducing new ones.

It's not an implementation detail though.

Sure it is. A method is given a Stream. It's an implementation detail of that method whether it calls ReadByte vs Read vs ReadAsync(byte[], ...) vs ReadAsync(Memory, ...). It's an implementation detail whether it chooses to wrap that stream in a StreamReader and read that way. It's an implementation detail whether it chooses to call GetMemory + Advance + FlushAsync. The caller doesn't know how the method is implemented and doesn't need to know; it trusts that the implementation will do the right thing functionally and ideally will be as efficient as possible. That's kind of the definition of an implementation detail :smile:

What's great about this is that we don't have to bake buffering policies into every consumer.

It's just an abstraction, as is Stream. Everything you described can be done with just Stream as well.

Yeah, I don't think we should do that. I think FlushAsync is sufficient, and maybe exposing something like a Length on the writer interface could be used by the caller to determine when to FlushAsync. We don't need async versions of GetMemory or Advance.

I'm simply saying that when talking about back pressure, you're talking about throttling producers' access to resources in various ways, one of which is access to buffers. And currently these APIs do not provide back pressure around buffers. I'm not actually suggesting GetMemory needs to be async, and you can of course get access to buffers in other ways, but these APIs are explicitly about buffer management, so it's weird to me to talk about back pressure in this context. It's possible, as you say, that I just took the previous comments out of context.

davidfowl commented 5 years ago

Why add the WriteAsync though? At that point, why isn't this just Stream with this additional support added?

It allows the implementer to optimize WriteAsync calls (avoiding locks and managing state across GetMemory()/Advance()). It's come in pretty handy in pipelines and allows passing through directly to an underlying stream.

Search all .cs files in corefx with \ref\ in the path, looking for "(System.IO.Stream" or ", System.IO.Stream".

Sure it is. A method is given a Stream. It's an implementation detail of that method whether it calls ReadByte vs Read vs ReadAsync(byte[], ...) vs ReadAsync(Memory, ...). It's an implementation detail whether it chooses to wrap that stream in a StreamReader and read that way. It's an implementation detail whether it chooses to call GetMemory + Advance + FlushAsync. The caller doesn't know how the method is implemented and doesn't need to know; it trusts that the implementation will do the right thing functionally and ideally will be as efficient as possible. That's kind of the definition of an implementation detail 😄

I think decoupling the buffering policy from the underlying implementation (JsonSerializer, StreamReader, etc.) is a good thing, and exposing this type in arguments and constructors allows for that. That said, I won't die on this hill if we have the interface and a way for a Stream to efficiently expose the underlying IAsyncBufferedReader<byte> and IAsyncBufferedWriter<byte>. I still think we should take advantage of the type system and actually support taking it natively without forcing the caller to wrap it in a stream.

I'm simply saying that when talking about back pressure, you're talking about throttling producers' access to resources in various ways, one of which is access to buffers. And currently these APIs do not provide back pressure around buffers. I'm not actually suggesting GetMemory needs to be async, and you can of course get access to buffers in other ways, but these APIs are explicitly about buffer management, so it's weird to me to talk about back pressure in this context. It's possible, as you say, that I just took the previous comments out of context.

I understand.

davidfowl commented 5 years ago

To add to what I said, there are 3 actors here:

Today the state of the art is that the "Serializer" owns the buffering policy (which pool, what buffer size, etc.), and this simplifies that and removes that responsibility from it. Not only does it decouple the buffering policy, it gives the caller a cross-cutting way to provide the policy (the IAsyncBufferedReader/IAsyncBufferedWriter). That's something I think is worth considering, as it both simplifies the "Serializer" and allows customers like me to specify a policy of my choice when the default isn't good enough for my workload.

This might actually be a clean way to avoid having to push a specific buffer pool everywhere into the BCL as it provides an abstraction to do reads and writes that encapsulate a buffering policy.

davidfowl commented 5 years ago

APIs shouldn't be defined to take IMemoryOwner. ArraySegment, Span, ReadOnlySpan, Memory, and ReadOnlyMemory aren't about alternate functionality and aren't fundamentally changing the model in which a consumer writes code; they're really just about passing around the data along with an offset and length. It's also effectively free to create one from an array, and except for the limitations inherent to span, you get full fidelity round tripping between them and arrays, in that you can just ask for the original array back. Further, having all of them does actually introduce problems and friction; we've eased that with the introduction of more implicit casts, but the fact that we can introduce implicit casts further highlights that they're really all just the same thing.

I think it's a very similar situation to what we're describing here; yes, it can be much less impactful to add support for Span<T> and Memory<T>, but it's just another example of adding overloads that the existing ecosystem will adopt over time. I also don't believe we need to add 300 overloads all at once, nor implement these interfaces in all the places at once either. Doing the work to figure out what we could do is valuable though; I'll look into that. I think having the adapters will go a long way as well.

geoffkizer commented 5 years ago

I'd like to clarify the goal here. Is it:

1. Simplify Stream consumption
2. Improve perf by avoiding buffer copies for Streams that already do buffering (e.g. SslStream)
3. Both
4. Something else?

KrzysztofCwalina commented 5 years ago

One thing I wanted to throw in: I made IBufferWriter<T> an interface because it was implemented by some structs. I don't see a reason for the async variants to be implemented by structs, so I am not sure they should be interfaces.

KrzysztofCwalina commented 5 years ago

Also, I think it would be better to move the existing abstractions (PipeReader/Writer) into the core (if we can) than to introduce new abstractions. The fewer types, especially in a hierarchy, the better.

pakrym commented 5 years ago

Also, I think it would be better to move the existing abstractions (PipeReader/Writer) into the core (if we can) than to introduce new abstractions. The fewer types, especially in a hierarchy, the better.

PipeReader is much harder for someone to implement than IAsyncBufferedReader and is overkill as an abstraction for parsers.

ahsonkhan commented 5 years ago

Making them interfaces also has the benefit that Stream can implement both IAsyncBufferedReader/IAsyncBufferedWriter (if we were to decide to do so).

davidfowl commented 5 years ago

@ahsonkhan hit the nail on the head; think of it as introducing capabilities. PipeReader and PipeWriter will implement both. MemoryStream will implement IAsyncBufferedWriter, for example.

@geoffkizer The goal is to introduce interfaces that explicitly expose a contract for buffered reading and writing. That is, if some Stream or any other sink has buffers, they can be exposed to the caller in a consistent manner for reading and writing instead of forcing a copy in all cases. An added benefit is that it can make it easier to consume Streams, especially when trying to use buffer pooling or any other related buffer policy (see the above discussion for more on that https://github.com/dotnet/corefx/issues/35808#issuecomment-470676401).

geoffkizer commented 5 years ago

@davidfowl Ok, per your "instead of forcing a copy" comment, it seems like the main goal here is to avoid buffer copies. Please correct me if I'm wrong.

I am sympathetic to the goal of avoiding copies. That said, in all the perf analysis that I've seen of various scenarios, I have yet to see buffer copies show up in any meaningful way. Before we go defining new abstractions to solve this problem, I'd like to understand whether it is a real problem or not. Do you have data that demonstrates that it is?

davidfowl commented 5 years ago

@geoffkizer I don't want to argue that buffer copies are the slowest thing in the world, but they can definitely affect server applications at scale. That isn't the point of this proposal though; it's just one of the net positives when buffers are exposed by the underlying system. Why copy and manage buffers if you don't have to? The major win is that the buffering policy is decoupled from the consumer of the sink and the calling code is dramatically simplified:

I might be biased but this sort of loop is beautiful https://github.com/dotnet/corefxlab/blob/e075d78df60452b68d212e3333fd3f37cd28d4f0/src/System.Text.JsonLab.Serialization/System/Text/Json/Serialization/JsonSerializer.Read.Pipe.cs#L53-L72 compared to this one https://github.com/dotnet/corefxlab/blob/e075d78df60452b68d212e3333fd3f37cd28d4f0/src/System.Text.JsonLab.Serialization/System/Text/Json/Serialization/JsonSerializer.Read.Stream.cs#L58-L116.

So let's focus on that being the main benefit and treat fewer copies as an added bonus.
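For reference, the reader-based loop is roughly of this shape (a sketch against the proposed interface, with a hypothetical TryParse helper standing in for the real deserialization logic, not the actual corefxlab code):

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class BufferedDeserializer
{
    public static async ValueTask<TValue> ReadAsync<TValue>(
        IAsyncBufferedReader<byte> reader,
        CancellationToken cancellationToken = default)
    {
        while (true)
        {
            ReadResult<byte> result = await reader.ReadAsync(cancellationToken);
            ReadOnlySequence<byte> buffer = result.Buffer;

            if (TryParse(buffer, out TValue value, out SequencePosition consumed))
            {
                // Tell the reader which bytes it can recycle.
                reader.AdvanceTo(consumed, consumed);
                return value;
            }

            // Not enough data yet: consume nothing, mark everything examined
            // so the next ReadAsync waits for more data instead of spinning.
            reader.AdvanceTo(buffer.Start, buffer.End);

            if (result.IsCompleted)
                throw new InvalidDataException("Incomplete payload.");
        }
    }

    // Stand-in for real deserialization; returns true once a complete
    // value has been parsed out of the sequence.
    private static bool TryParse<TValue>(
        in ReadOnlySequence<byte> buffer,
        out TValue value,
        out SequencePosition consumed)
    {
        value = default!;
        consumed = buffer.Start;
        return false;
    }
}
```

Note there is no buffer allocation, resizing, or compaction in the consumer; all of that lives behind the reader.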

geoffkizer commented 5 years ago

@davidfowl Ok, so if I'm understanding you correctly, the main goal here is to simplify Stream consumption. Please correct me if I'm wrong.

I'm very sympathetic to that goal. The most straightforward way to do that, it seems to me, is to add some utility classes that provide simple buffer management on top of a Stream. This is fairly straightforward to do, and doesn't require any new abstractions.

davidfowl commented 5 years ago

@geoffkizer Sure, but then let's also consider what it would look like if you could just implement them anywhere, like on Streams themselves: you don't pay the cost of the helper wrapper object if the underlying system is enlightened. This abstraction is also trying to be extremely minimal so that it's easier to implement (well, you have to implement buffering, but not a bunch of other virtuals 😄). So yes, the methods I mentioned above, AsBufferedReader and AsBufferedWriter, cover your suggestion.
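Such an AsBufferedReader adapter might be sketched as follows; this is a minimal illustration against the proposed interface only (a single growable array rather than the pooled, multi-segment buffers a real implementation would use):

```csharp
using System;
using System.Buffers;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

// Sketch: buffer management layered over any Stream.
public sealed class StreamBufferedReader : IAsyncBufferedReader<byte>
{
    private readonly Stream _stream;
    private byte[] _buffer = new byte[4096];
    private int _start;     // first unconsumed byte
    private int _end;       // one past the last buffered byte
    private int _examined;  // one past the last examined byte
    private bool _completed;

    public StreamBufferedReader(Stream stream) => _stream = stream;

    public async ValueTask<ReadResult<byte>> ReadAsync(
        CancellationToken cancellationToken = default)
    {
        // Only hit the Stream when the caller has examined everything we
        // already hold; otherwise hand back the buffered data immediately.
        if (!_completed && _examined == _end)
        {
            if (_end == _buffer.Length)
                GrowOrCompact();

            int read = await _stream.ReadAsync(
                _buffer.AsMemory(_end), cancellationToken);
            if (read == 0)
                _completed = true;
            _end += read;
        }

        var buffer = new ReadOnlySequence<byte>(_buffer, _start, _end - _start);
        return new ReadResult<byte>(buffer, _completed);
    }

    public void AdvanceTo(SequencePosition consumed, SequencePosition examined)
    {
        // For a single-segment sequence, the position's integer offset is
        // an index into the backing array.
        _start = consumed.GetInteger();
        _examined = examined.GetInteger();
    }

    private void GrowOrCompact()
    {
        if (_start > 0)
        {
            // Shift the unconsumed window to the front of the buffer.
            Array.Copy(_buffer, _start, _buffer, 0, _end - _start);
            _end -= _start;
            _examined -= _start;
            _start = 0;
        }
        else
        {
            Array.Resize(ref _buffer, _buffer.Length * 2);
        }
    }
}
```

The consumed/examined bookkeeping is exactly the state that every open-coded Stream consumer maintains by hand today; the adapter just centralizes it.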

geoffkizer commented 5 years ago

@davidfowl

Sure, but then let's also consider what it would look like if you could just implement them anywhere, like on Streams themselves

The problem is, this isn't a great fit for implementing on Stream.

I think we get confused here because some Streams, like SslStream, already do their own buffering. So we naturally want to define a way to have direct access to the SslStream buffer, so we can avoid copies.

The problem is that the buffering requirements of arbitrary users -- whether JsonSerializer or whatever -- are different from the buffering requirements of SslStream itself. SslStream has very specific buffering needs; it needs to fill its buffer until it has a full SSL frame, then it needs to decrypt it in place and deliver it to the user. This is very different from the IAsyncBufferedReader defined above.

davidfowl commented 5 years ago

I don't see why that couldn't be implemented, but maybe I'm missing something obvious. Do you have an example of why the above wouldn't be a good fit, and a proposal for what a new API that did want to expose the buffers would look like?

I think buffering shouldn’t be a black box which it is today and I’m not sure I’ve seen examples as to why it needs to be hard and unique. Actually a ton of other systems have similar patterns in other ecosystems and it seems to work well.

But I’m open for suggestions to tweak the above design.

geoffkizer commented 5 years ago

Actually a ton of other systems have similar patterns in other ecosystems and it seems to work well.

Can you point me to examples?

jkotas commented 5 years ago

a ton of other systems have similar patterns in other ecosystems and it seems to work well.

Could you please share examples? Are these patterns platform-wide exchange types or are they domain-specific?

I agree that there are a number of domain-specific ones. For example, I worked on a cooperative buffer management scheme optimized for server-side processing of images some years ago. But I have not seen platform-wide exchange types like these.

davidfowl commented 5 years ago

Netty's buffers are pushed to consumers and the entire system is built on top of that (it's sort of cooperative memory management). Node streams throw buffers at you (https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93) but don't let you put them back.

I can’t read rust well but I believe tokio has similar semantics.

I don't see how SslStream is special here though; it would seem, intuitively, that if you can implement it when the buffer is passed in, you should be able to implement it without that, no?

davidfowl commented 5 years ago

I agree that there are number of domain-specific ones. For example, I have worked on a cooperative buffer management scheme optimized for server-side processing of images some years ago. But I have not seen a platform-wide exchange types like these.

We should be ambitious. I think the above abstraction is a really good start for encapsulating the buffering logic for a stream of incoming data and also outgoing data.

It isn't tied to any specific pool implementation and can be implemented on types that buffer themselves (barring complications like Streams having a bunch of other virtuals that need to interact with this properly).

geoffkizer commented 5 years ago

I don't see how SslStream is special here though; it would seem, intuitively, that if you can implement it when the buffer is passed in, you should be able to implement it without that, no?

It depends on what you mean.

If the goal is to just expose the buffer rather than copying it out to another buffer, that probably isn't too hard to do. I haven't thought through it.

But IAsyncBufferedReader requires much more than that; it requires the Stream to buffer an arbitrary amount of incoming data, and keep track of how much of this data has been "consumed" and "examined".

geoffkizer commented 5 years ago

Netty's buffers are pushed to consumers and the entire system is built on top of that (it's sort of cooperative memory management).

Can you provide a link to a specific example?

Node streams throw buffers at you (https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93) but don't let you put them back.

This seems like a poor example, because it doesn't allow you to "put them back," which seems essential in the IAsyncBufferedReader model.

davidfowl commented 5 years ago

Can you provide a link to a specific example?

Netty uses ByteBuf (https://netty.io/4.0/api/io/netty/buffer/ByteBuf.html). The transport layer rents a buffer from the pool and that flows to a ChannelHandler (usually a ByteToMessageDecoder, https://netty.io/4.0/api/io/netty/handler/codec/ByteToMessageDecoder.html, or similar). The ByteBuf keeps track of consumed data (via indexes) so that the next call to the handler will have the right set of data.

But IAsyncBufferedReader requires much more than that; it requires the Stream to buffer an arbitrary amount of incoming data, and keep track of how much of this data has been "consumed" and "examined".

That's a fundamental part of the buffer management though, right? When you've parsed some data and it isn't enough, you need to store some of it to combine with what you get next. Isn't that naturally what you open-code today each time you use a Stream? It also means you have a place to return the consumed data if it came from a memory pool. That same rent/return pattern is encapsulated nicely here.

geoffkizer commented 5 years ago

Netty uses ByteBuf

I think what would be really helpful here is to understand how Netty surfaces data from buffered streams (e.g. SSL streams, or compressed streams, or whatever) to consumers, and how they are expected to manage the returned data.

davidfowl commented 5 years ago

Have at it 😄 https://github.com/netty/netty/blob/4.1/handler/src/main/java/io/netty/handler/ssl/SslHandler.java. Generally, Netty throws around buffers. At a super high level, handlers get called back when a buffer is ready; the consumer of the buffer can process it, mark some data as consumed so that it can be returned to the underlying pool (see https://github.com/netty/netty/blob/0811409ca3f4a8df94c9d2e921c47c7487e90793/codec/src/main/java/io/netty/handler/codec/ByteToMessageDecoder.java#L269), and the remaining data can be copied or merged via a list (there are different strategies).

Anyway, we're way off topic here. I don't see any fundamental blockers to making SslStream work, or SSL in general, even with consumed and examined. I think the above interfaces nicely encapsulate some of the basic behavior required to manage memory with a minimal set of APIs. I'm happy to discuss more examples though; the more concrete the better. Maybe we can even write some pseudo code to help flesh out any concerns.

oliverjanik commented 4 years ago

I believe .NET needs more lightweight Stream alternatives akin to Golang's Reader/Writer. The proposed interfaces look like they fit the purpose nicely 👍

Without these, all the work that has been done on Spans, Memory, and others feels a bit incomplete. I find myself reverting to byte[] or Streams in a lot of situations.

jkotas commented 2 years ago

Similar to https://github.com/dotnet/runtime/issues/66611