dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

API Proposal: System.IO.Pipelines generic APIs #28556

Open fredrikhr opened 5 years ago

fredrikhr commented 5 years ago

Background

Currently, the System.IO.Pipelines package only allows for pipelines that operate on raw bytes. With some trickery and the use of MemoryMarshal et al., the use of Pipelines can be extended to blittable types.

The package description on NuGet states:

Single producer single consumer byte buffer management.

However, single-producer single-consumer scenarios are not necessarily restricted to byte operations, and the techniques described in the blog post can apply in other scenarios as well.

Since MemoryPool<T>, Memory<T>, and ReadOnlySequence<T> are generic, Pipe does not necessarily need to be restricted to bytes only.

This issue proposes to add generic variants of the existing System.IO.Pipelines APIs and keep Pipe as the default implementation for byte-backed operations.

Even though System.IO.TextReader.ReadLine does allocate strings on the heap, a Pipe<string> could still be useful for implementing a line-by-line parser (e.g. a CSV parser).

Having a generic Pipe could also allow for efficient chaining of Pipes whenever the raw data needs to go through several stages of transformation before it reaches its final higher-level data structure.

Proposed API

In order to support a generic Pipe<T> class, several of the related types in System.IO.Pipelines need generic variants. Wherever possible, the existing non-generic types should be expressed in terms of the generic APIs, e.g. by deriving from them.

All proposed changes purposefully avoid breaking changes; with this proposal, existing code should continue to work unmodified.

PipeOptions

Move the existing API from PipeOptions to a new PipeOptions<T> and make PipeOptions a derivative of that.

namespace System.IO.Pipelines
{
    public partial class PipeOptions<T>
    {
        public PipeOptions(System.Buffers.MemoryPool<T> pool = null, System.IO.Pipelines.PipeScheduler readerScheduler = null, System.IO.Pipelines.PipeScheduler writerScheduler = null, long? pauseWriterThreshold = null, long? resumeWriterThreshold = null, int? minimumSegmentSize = null, bool useSynchronizationContext = true) { }
        public static System.IO.Pipelines.PipeOptions<T> Default { get { throw null; } }
        public int MinimumSegmentSize { get { throw null; } }
        public long PauseWriterThreshold { get { throw null; } }
        public System.Buffers.MemoryPool<T> Pool { get { throw null; } }
        public System.IO.Pipelines.PipeScheduler ReaderScheduler { get { throw null; } }
        public long ResumeWriterThreshold { get { throw null; } }
        public bool UseSynchronizationContext { get { throw null; } }
        public System.IO.Pipelines.PipeScheduler WriterScheduler { get { throw null; } }
    }
    public partial class PipeOptions : System.IO.Pipelines.PipeOptions<byte>
    {
        public PipeOptions(System.Buffers.MemoryPool<byte> pool = null, System.IO.Pipelines.PipeScheduler readerScheduler = null, System.IO.Pipelines.PipeScheduler writerScheduler = null, long pauseWriterThreshold = (long)32768, long resumeWriterThreshold = (long)16384, int minimumSegmentSize = 2048, bool useSynchronizationContext = true) { }
        public new static System.IO.Pipelines.PipeOptions Default { get { throw null; } }
    }
}

Note: The default values for the thresholds and the minimum segment size will need to be calculated depending on T, e.g. by using the size of T. Therefore, the constructor of the generic PipeOptions<T> takes nullable values instead of using constant defaults. The constructor of the non-generic PipeOptions type remains unchanged.
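
For illustration, a sketch of how those defaults could be derived. The scaling policy (dividing the byte-based defaults by the element size) and the member names are assumptions, not part of the proposal; Unsafe.SizeOf<T>() is used because the C# sizeof operator requires an unmanaged constraint.

using System;
using System.Runtime.CompilerServices;

public partial class PipeOptions<T>
{
    // Byte-based defaults of the non-generic PipeOptions.
    private const long PauseWriterThresholdBytes = 32768;
    private const long ResumeWriterThresholdBytes = 16384;
    private const int MinimumSegmentSizeBytes = 2048;

    // Hypothetical policy: keep roughly the same memory footprint by
    // scaling the byte-based defaults down by the element size.
    private static readonly int ElementSize = Unsafe.SizeOf<T>();

    private static long DefaultPauseWriterThreshold =>
        Math.Max(1, PauseWriterThresholdBytes / ElementSize);
    private static long DefaultResumeWriterThreshold =>
        Math.Max(1, ResumeWriterThresholdBytes / ElementSize);
    private static int DefaultMinimumSegmentSize =>
        Math.Max(1, MinimumSegmentSizeBytes / ElementSize);
}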

ReadResult

In order to make the ReadResult struct generic, a new ReadResult<T> needs to be defined that contains the same members as the non-generic type, but uses a generic ReadOnlySequence<T>. For simplicity, an implicit conversion from ReadResult<byte> to ReadResult is added.

namespace System.IO.Pipelines
{
    public readonly partial struct ReadResult
    {
        public static implicit operator System.IO.Pipelines.ReadResult(System.IO.Pipelines.ReadResult<byte> byteResult) { throw null; }
    }
    public readonly partial struct ReadResult<T>
    {
        private readonly object _dummy;
        public ReadResult(System.Buffers.ReadOnlySequence<T> buffer, bool isCanceled, bool isCompleted) { throw null; }
        public System.Buffers.ReadOnlySequence<T> Buffer { get { throw null; } }
        public bool IsCanceled { get { throw null; } }
        public bool IsCompleted { get { throw null; } }
    }
}

PipeWriter

Similar to PipeOptions, the APIs of the original non-generic PipeWriter can be moved to a new PipeWriter<T> type, and then PipeWriter can derive from PipeWriter<byte>.

namespace System.IO.Pipelines
{
    public abstract partial class PipeWriter<T> : System.Buffers.IBufferWriter<T>
    {
        protected PipeWriter() { }
        public abstract void Advance(int count);
        public abstract void CancelPendingFlush();
        public abstract void Complete(System.Exception exception = null);
        public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> FlushAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
        public abstract System.Memory<T> GetMemory(int sizeHint = 0);
        public abstract System.Span<T> GetSpan(int sizeHint = 0);
        public abstract void OnReaderCompleted(System.Action<System.Exception, object> callback, object state);
        public virtual System.Threading.Tasks.ValueTask<System.IO.Pipelines.FlushResult> WriteAsync(System.ReadOnlyMemory<T> source, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
    }
    public abstract partial class PipeWriter : System.IO.Pipelines.PipeWriter<byte> { }
}

PipeReader

Because of the use of ReadResult in PipeReader, the APIs of this type need to be duplicated into a generic variant PipeReader<T> which uses ReadResult<T>. Therefore, the original non-generic PipeReader remains unchanged, and the following new APIs are added.

namespace System.IO.Pipelines
{
    public abstract partial class PipeReader<T>
    {
        protected PipeReader() { }
        public abstract void AdvanceTo(System.SequencePosition consumed);
        public abstract void AdvanceTo(System.SequencePosition consumed, System.SequencePosition examined);
        public abstract void CancelPendingRead();
        public abstract void Complete(System.Exception exception = null);
        public abstract void OnWriterCompleted(System.Action<System.Exception, object> callback, object state);
        public abstract System.Threading.Tasks.ValueTask<System.IO.Pipelines.ReadResult<T>> ReadAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken));
        public abstract bool TryRead(out System.IO.Pipelines.ReadResult<T> result);
    }
}

Pipe

The original non-generic Pipe type is a sealed class, so the generic Pipe<T> should also be sealed. Sealing makes deriving Pipe from Pipe<byte> impossible, however, so the APIs of Pipe are instead duplicated and made generic in Pipe<T>.

namespace System.IO.Pipelines
{
    public sealed partial class Pipe<T>
    {
        public Pipe() { }
        public Pipe(System.IO.Pipelines.PipeOptions<T> options) { }
        public System.IO.Pipelines.PipeReader<T> Reader { get { throw null; } }
        public System.IO.Pipelines.PipeWriter<T> Writer { get { throw null; } }
        public void Reset() { }
    }
}
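
For illustration, a hypothetical usage sketch of the proposed types. Pipe<T>, PipeWriter<T>, PipeReader<T>, and ReadResult<T> do not exist today, and sampleChunk stands in for caller-provided data.

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Threading.Tasks;

// Producer side: copy a chunk of samples into writer-owned memory.
static void WriteSamples(PipeWriter<float> writer, ReadOnlySpan<float> samples)
{
    Span<float> span = writer.GetSpan(samples.Length);
    samples.CopyTo(span);
    writer.Advance(samples.Length);
}

// Consumer side: read whatever has been flushed so far.
static async Task ConsumeAsync(PipeReader<float> reader)
{
    ReadResult<float> result = await reader.ReadAsync();
    ReadOnlySequence<float> samples = result.Buffer;
    // ... process the samples ...
    reader.AdvanceTo(samples.End);
}

var pipe = new Pipe<float>();
WriteSamples(pipe.Writer, sampleChunk); // sampleChunk: caller-provided data
await pipe.Writer.FlushAsync();
await ConsumeAsync(pipe.Reader);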

IDuplexPipe

Introduce IDuplexPipe<T>, a generic counterpart to the existing IDuplexPipe interface.

namespace System.IO.Pipelines
{
    public partial interface IDuplexPipe<T>
    {
        System.IO.Pipelines.PipeReader<T> Input { get; }
        System.IO.Pipelines.PipeWriter<T> Output { get; }
    }
}

Pull Request

Implementation of the proposed API changes in Pull Request dotnet/corefx#34928

davidfowl commented 5 years ago

I’m having a hard time seeing anything outside of char. Do you have any concrete examples that can help show why this is useful beyond that?

benaadams commented 5 years ago

The pattern I envisioned is using Pipelines to drive to and from the byte stream (as now), then using System.Threading.Channels to drive the object stream: https://ndportmann.com/system-threading-channels/

Though AsyncEnumerables could work well here also.

Pipelines is batch-oriented (N bytes at once), and you push back to the Pipe if the batch isn't large enough (either in size, or not yet complete or delimited, etc.).

Not sure Pipe<string> would work in this regard ("need N strings to process the batch") versus "here's a well-known quantity of strings" (in a well-formed object; that's either Channel<T> or an async enumerator of T).

However, there is a potential gap (as in the CSV example given) when parsing encoded data: if you were looking for a comma, carriage return, or emoji in encoded data (UTF-16, UTF-8), the parser would need to know how to interpret the byte data in that encoding to do the search, prior to converting to string or Utf8String once found.

So something to help with that earlier step could be useful. Not sure going generic would help in this regard, though?

e.g. if you had a stream of floats, you'd read from the bytes with the constraint that you need 4 bytes (sizeof(float)), or a complete set (16 bytes for a quaternion = 4 floats); then either release the complete set of data via a channel, or via a side-car on another longer-lived data type which just indicates data is ready via a non-generic Task (as it's just complete or not complete, with the data held elsewhere).
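
A minimal sketch of that batching pattern against the existing byte-based PipeReader, assuming 16-byte quaternions; pushing back via AdvanceTo keeps incomplete batches in the pipe.

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Runtime.InteropServices;
using System.Threading.Tasks;

static async Task ReadQuaternionsAsync(PipeReader reader)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;
        SequencePosition consumed = ParseQuaternions(buffer);

        // Consume the complete quaternions; mark the rest as examined so
        // the next ReadAsync waits for more data instead of spinning.
        reader.AdvanceTo(consumed, buffer.End);
        if (result.IsCompleted) break;
    }
    reader.Complete();
}

static SequencePosition ParseQuaternions(in ReadOnlySequence<byte> buffer)
{
    const int QuaternionSize = 4 * sizeof(float); // 16 bytes
    Span<byte> bytes = stackalloc byte[QuaternionSize];
    ReadOnlySequence<byte> remaining = buffer;
    while (remaining.Length >= QuaternionSize)
    {
        remaining.Slice(0, QuaternionSize).CopyTo(bytes); // handles split segments
        ReadOnlySpan<float> q = MemoryMarshal.Cast<byte, float>(bytes);
        // ... hand the four floats off, e.g. to a Channel<T> ...
        remaining = remaining.Slice(QuaternionSize);
    }
    return remaining.Start;
}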

fredrikhr commented 5 years ago

@benaadams, thanks for the link to Channels! I was not aware of these APIs, and at first glance I agree that my CSV example and other line-by-line parsing scenarios would probably be better served by a Channel instead of a pipeline...

fredrikhr commented 5 years ago

However, wouldn't it still be easier (or even more performant) to use a Pipe<char> instead of paying the MemoryMarshal cost of casting the buffer to chars on each read/write? Or, as @benaadams points out, a Pipe<float>?

benaadams commented 5 years ago

would be better suited for use by a Channel instead of a pipeline...

Have the Pipeline feed the Channel

bytes => Pipe => look for new line => string => Add to Channel
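
A minimal sketch of that shape using only existing APIs; the EncodingExtensions.GetString overload over a ReadOnlySequence<byte> requires .NET 5 or later.

using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

static async Task FeedLinesAsync(PipeReader reader, ChannelWriter<string> lines)
{
    while (true)
    {
        ReadResult result = await reader.ReadAsync();
        ReadOnlySequence<byte> buffer = result.Buffer;

        // Emit one string per complete line found in the buffered bytes.
        while (buffer.PositionOf((byte)'\n') is SequencePosition newline)
        {
            ReadOnlySequence<byte> line = buffer.Slice(0, newline);
            await lines.WriteAsync(Encoding.UTF8.GetString(line));
            buffer = buffer.Slice(buffer.GetPosition(1, newline)); // skip '\n'
        }

        // Push the incomplete tail back into the Pipe.
        reader.AdvanceTo(buffer.Start, buffer.End);
        if (result.IsCompleted) break;
    }
    reader.Complete();
    lines.Complete();
}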

Or if you are working with push

bytes => Pipe => look for new line => sync parse

@davidfowl did write an example line parser in the second style in response to a blog post

https://github.com/davidfowl/StringsAreEvil/blob/davidfowl/spanification/StringsAreEvil/Program.cs#L118-L226

paying the MemoryMarshal cost for casting the buffer to chars on each read/write?

It's an in-place cast; but generally the Pipe is a mechanism for extracting data from/to a source that deals in bytes (e.g. network, file, etc.), so that conversion would be needed anyway?
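
For reference, the in-place cast looks like this; result is assumed to come from a prior PipeReader.ReadAsync, and the cast simply reinterprets the same memory.

using System;
using System.Runtime.InteropServices;

ReadOnlySpan<byte> bytes = result.Buffer.First.Span; // first segment only
ReadOnlySpan<char> chars = MemoryMarshal.Cast<byte, char>(bytes); // no copy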

fredrikhr commented 5 years ago

For an AST parser, a Pipe<AstToken> might be useful. Take parsing a C-style if, for example: after consuming all tokens for the condition and the if-block, you now want to consume one more token to check whether the next token is an else. Channels (as far as I can see) cannot do that kind of peeking without actually consuming the token.

And in the AST-parser example it might very well be that AstToken is a non-blittable struct or even a class. An AST parser could probably get away with just looping over ReadLineAsync, since in most cases it only needs to track the line (and the character position within that line) to be useful for compilers, IDEs, etc. And allocating a string to store the text representation does not really matter in that example, since you typically end up having the entire AST (and thus the entire code file) in memory anyway.

However, an AST parser mostly does not need to have the entire code in contiguous memory, like you get from File.ReadAllLines (or similar).

But a Pipe would give an AST parser the means to avoid traversing the file multiple times: every phase of translation from the raw text to a high-level AST representation could consume the contents the moment the earlier phase has finished processing an item.

C-parser example:

A C parser covering the seven phases of translation could be implemented this way:

raw file
-> TextReader using ReadLineAsync
-> Pipe<string> converts into spliced lines
-> Pipe<SplicedLine> is read into a tokenizer
-> Pipe<PreProcessingToken> is put through the preprocessor
-> Pipe<PostProcessingToken> is run through the un-escaper
-> Pipe<UnescapedToken> is run through the string concatenator
-> Pipe<CLangToken> is interpreted by syntactic and semantic analysis

This is obviously not an optimized scenario, but the Pipe does help compose each translation phase into its own unit, operating independently of the others. To avoid double-allocating strings, you'd obviously use ReadOnlyMemory<char> slices pointing into sections of the string produced by the text reader.

The point of having such a composed pipeline is that none of the phases (but the last) actually need to see the entire code unit. However, multiple phases do need the ability to look ahead, and after peeking they must decide whether the next item should be consumed or not. As I said, this does not really work with Channels, since reading an item consumes it. In essence, that is the same problem you have with streams, and the reason the Pipeline was created in the first place.
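
A sketch of that peek-then-decide step against the proposed PipeReader<T>; AstToken and AstTokenKind are hypothetical parser types.

using System.Buffers;
using System.IO.Pipelines;

static bool TryConsumeElse(PipeReader<AstToken> reader)
{
    if (!reader.TryRead(out ReadResult<AstToken> result))
        return false;

    ReadOnlySequence<AstToken> tokens = result.Buffer;
    if (!tokens.IsEmpty && tokens.First.Span[0].Kind == AstTokenKind.Else)
    {
        reader.AdvanceTo(tokens.GetPosition(1)); // consume the 'else' token
        return true;
    }

    // Peek only: consume nothing, but mark everything examined so the
    // next read waits for new tokens instead of returning the same ones.
    reader.AdvanceTo(tokens.Start, tokens.End);
    return false;
}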

fredrikhr commented 5 years ago

In the C-parser example, the preprocessor would be an example of a stage that consumes one token from the input pipe but may write zero or more items to the output pipe, making the number of output items unpredictable (similar to a NetworkStream).

Code-Grump commented 5 years ago

This would be supremely useful for text-parsing. A Pipe<char>, fed by a Pipe<byte> lets me work over a set of text without having to deal with extensive buffer management.

Perhaps restricting it to unmanaged types makes the most sense, encouraging a switch to Channels for higher-level objects while still letting us perform reasonable optimisations?
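
A sketch of the byte-to-char stage such a pairing would need; it assumes UTF-8, the span-based Decoder.Convert overload (.NET Core 3.0+), and that the output is the proposed PipeWriter<char> (or any IBufferWriter<char>). The stateful Decoder carries partial multi-byte sequences across calls.

using System;
using System.Buffers;
using System.Text;

static void DecodeChunk(Decoder decoder, ReadOnlySpan<byte> bytes, IBufferWriter<char> output)
{
    // Requesting the max char count guarantees Convert consumes all input;
    // the decoder buffers any trailing partial multi-byte sequence itself.
    Span<char> chars = output.GetSpan(Encoding.UTF8.GetMaxCharCount(bytes.Length));
    decoder.Convert(bytes, chars, flush: false, out _, out int charsUsed, out _);
    output.Advance(charsUsed);
}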

raffaeler commented 4 years ago

In late 2018 I demoed at #dotnext in Moscow the use of Pipelines for processing audio files. The only weirdness was the need for MemoryMarshal to convert blobs into audio chunks, as Pipelines cannot manage anything other than bytes.

The good part of the sample is the ability to compose the blocks by chaining multiple Pipes.

I see a lot of cases where you may want to manage chains of blocks based on Pipelines. So I totally subscribe to this proposal :)

RamType0 commented 4 years ago

I think this could be implemented quite easily by just replacing byte with T. What has kept this issue neglected for so long? The constraint on T?

AlgorithmsAreCool commented 4 years ago

I have just stumbled on a personal need for a generic version of pipelines.

I have an interpreter that operates on a sequence/stream of command objects and emits low-level instructions.

Right now it immediately translates each command into lower-level instructions one by one, with no buffering. But I want to add an optimization stage to the interpreter so that redundant or complementary sub-sequences of commands can be considered together as a unit. The flushing semantics of pipelines align with this goal.

AlgorithmsAreCool commented 4 years ago

Also, a weaker anecdote regarding the comparison to Channel:

Channels are very easy to use and work extremely well for a broad range of scenarios.

But Channel is designed for sending objects one at a time; Pipelines is clearly oriented toward bulk/batched data.

So I found myself re-implementing Pipelines on top of Channel to trade latency for raw throughput. Specifically, I wrapped the channel in an object with an ArrayPool and sent buffers over the channel instead of individual objects.
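
A minimal sketch of that workaround; the BatchingChannel name and API are made up for illustration.

using System;
using System.Buffers;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class BatchingChannel<T>
{
    private readonly Channel<(T[] Buffer, int Count)> _channel =
        Channel.CreateUnbounded<(T[], int)>();

    public async ValueTask SendBatchAsync(ReadOnlyMemory<T> items)
    {
        // Rent a pooled array, copy the batch in, and send it as one message.
        T[] buffer = ArrayPool<T>.Shared.Rent(items.Length);
        items.CopyTo(buffer);
        await _channel.Writer.WriteAsync((buffer, items.Length));
    }

    public async ValueTask<int> ReceiveBatchAsync(Memory<T> destination)
    {
        // Assumes destination is at least as large as the incoming batch.
        (T[] buffer, int count) = await _channel.Reader.ReadAsync();
        buffer.AsMemory(0, count).CopyTo(destination);
        ArrayPool<T>.Shared.Return(buffer);
        return count;
    }
}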

Since channels are already extremely fast, this is a more niche scenario, but it is one that pipelines already considered because of its performance requirements.

FiniteReality commented 4 years ago

While using a Pipe<float> or similar for audio processing seems like an interesting concept, I personally find that treating audio as opaque bytes simply works a lot better, particularly when dealing with codecs where the audio primitive is not an individual sample (e.g. Opus, MP3, and Vorbis). Passing around pipes containing "opaque" binary data makes composing a complete pipeline much easier, as the interface is much easier to manage.

raffaeler commented 4 years ago

@FiniteReality That's what I did, and everything got more complicated, with a lot of additional work and memory allocations, although I used spans to minimize them. When you transform time-based data to frequency-based data using FFT (and the reverse), having windows/arrays of floats is hugely better and avoids a lot of casting/endianness errors in all the plugins working in the frequency domain.

FiniteReality commented 4 years ago

@raffaeler For frequency-domain operations, wouldn't a Channel<FFTSample> or some such make more sense? You're performing the transform on a small section of audio, and since it's audio you're probably only really interested in 20 Hz to 20 kHz; that seems reasonable enough to fit into a structure, and is probably a lot easier to pass around than a ReadOnlySequence<float> or dealing with sliced floats.

raffaeler commented 4 years ago

@FiniteReality It doesn't really matter whether you prefer a bare float or a structure like the FFTSample. The point is that I would like to push something other than bytes out of the Pipe. Probably a window of samples would be even better, as that is the minimum each plugin can process. Also, audio is just an example. I have another app where I would like to push H264 frames, and having a Pipe of H264Frame would be a neater solution than shooting byte arrays around.

Let's stay focused on examples that are useful to the team in evaluating the opportunity of making the Pipe generic.

FiniteReality commented 4 years ago

I am; but the examples being given are (in my honest opinion) better suited to other, existing systems than to the proposed generic API.

raffaeler commented 4 years ago

Stream-like processing is applicable to a wide range of use cases, and every time you need to transform the data you will end up with a problem similar to the example I gave. If you agree with the proposal but don't like the audio example, just propose other examples.

MineCake147E commented 1 year ago

Is there any update on this? I ran into the same issue.

A ReadOnlySequence<byte> may be split in the middle of a value (e.g. a Rune, Guid, or Vector512<double>), once or multiple times in the worst case, making Pipe impractical to use for anything other than byte.

Additionally, Channel<T> and ConcurrentQueue<T> don't provide a way to efficiently insert/extract multiple items at once.
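
For example, extracting a 16-byte Guid that straddles segment boundaries today requires a manual copy; a minimal sketch of the workaround a generic pipe would make unnecessary:

using System;
using System.Buffers;

static bool TryReadGuid(ref ReadOnlySequence<byte> buffer, out Guid value)
{
    if (buffer.Length < 16)
    {
        value = default;
        return false; // push back and wait for more data
    }

    Span<byte> bytes = stackalloc byte[16];
    buffer.Slice(0, 16).CopyTo(bytes); // copies across segment boundaries if needed
    value = new Guid(bytes);
    buffer = buffer.Slice(16);
    return true;
}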