dotnet / runtime

[API Proposal]: Aggregate streams and treat them as one #83416

Open jozkee opened 1 year ago

jozkee commented 1 year ago

Background and motivation

Another follow-up to our stream backlog (https://github.com/dotnet/runtime/issues/58216): a frequently requested operation is the ability to aggregate multiple streams and treat them as a single stream.

While I haven't found any application of aggregated streams in our libraries, there are plenty of posts requesting it:

https://stackoverflow.com/q/3879152/4231460
https://www.c-sharpcorner.com/article/combine-multiple-streams-in-a-single-net-framework-stream-o/

API Proposal

```cs
namespace System.IO
{
    public class AggregatedStream : Stream
    {
        public Stream? Current { get; }
        public AggregatedStream(Stream first, Stream second, bool leaveOpen = false) { }
        public AggregatedStream(Stream[] streams, bool leaveOpen = false) { }

        // all the necessary overrides...
    }
}
```

API Usage

```cs
Stream[] streams = { stream1, stream2 };
using AggregatedStream aggregatedStream = new(streams);

int bytesRead;
byte[] buffer = new byte[1024];

while ((bytesRead = aggregatedStream.Read(buffer)) > 0)
{
    // Do something...
}
```

Notes

Writability:

We likely should not allow writing. For example, when you create a new file (with File.Create, which returns a FileStream), it always starts with length zero. Since that is the common case for most writable streams, aggregating multiple streams for writing seems unintuitive, unless you are using fixed-length streams as a buffer/cache.

Seekability:

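There are three ways to support seeking, IMO:

* Expose the combined capabilities of all the inner streams: seeking works only if every inner stream supports it, which may be restrictive for the streams that do.
* Expose the capabilities of the current inner stream: this may be inconsistent once reading reaches an inner stream that does not support seeking.
* Never support seeking.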
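
To make the "combined capabilities" option more concrete, here is a minimal sketch of the bookkeeping it would need. `CombinedSeek`, `CanSeekAll`, `TotalLength`, and `Locate` are hypothetical names invented for illustration; nothing here is part of the proposal. It assumes every inner stream reports a stable `Length`.

```cs
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper (not part of the proposal): bookkeeping for the
// "combined capabilities" seeking option.
public static class CombinedSeek
{
    // The aggregate can seek only if every inner stream can.
    public static bool CanSeekAll(IReadOnlyList<Stream> streams) =>
        streams.All(s => s.CanSeek);

    // The aggregate length is the sum of the inner lengths.
    public static long TotalLength(IReadOnlyList<Stream> streams) =>
        streams.Sum(s => s.Length);

    // Maps an absolute position in the aggregate to the index of the inner
    // stream that contains it and the offset within that stream.
    // If the position is at or past the end, Index equals streams.Count and
    // Offset is the distance past the end.
    public static (int Index, long Offset) Locate(IReadOnlyList<Stream> streams, long position)
    {
        if (position < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(position));
        }

        for (int i = 0; i < streams.Count; i++)
        {
            long length = streams[i].Length;
            if (position < length)
            {
                return (i, position);
            }
            position -= length;
        }

        return (streams.Count, position);
    }
}
```

A `Seek` override built on this would call `Locate`, set `Position` on the selected inner stream, and remember the index as the new current stream. That only works while the earlier inner streams are still open, which is why disposal policy and seeking are linked.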

Disposing:

If leaveOpen is false, we can dispose all streams once we finish reading them all. Deferring disposal until then also enables seeking across multiple inner streams when supported.

Alternatively, early disposing could still be achieved by using the Current property:

```cs
while ((bytesRead = aggregatedStream.Read(buffer)) > 0)
{
    // Do something...
    // ...
    // dispose all streams that we no longer need.
    int idx = Array.IndexOf(streams, aggregatedStream.Current);
    for (int i = 0; i < idx; i++)
    {
        streams[i].Dispose();
    }
}
```

Alternative Designs

Alt. design 1: If we are not confident about needing a public Stream-derived type, we could just add a factory:

```cs
namespace System.IO
{
    public static class StreamFactory
    {
        public static Stream Aggregate(params Stream[] streams) { }
        public static Stream Aggregate(Stream[] streams, bool leaveOpen = false) { }
    }
}
```

Alt. design 2: Use a different name. As prior art we have string.Concat; we could follow that pattern and call them ConcatStream and StreamFactory.Concat respectively.

Other words I saw in the wild used interchangeably: concatenate, combine, chain, merge.

Risks

No response

ghost commented 1 year ago

Tagging subscribers to this area: @dotnet/area-system-io
See info in area-owners.md if you want to be subscribed.

Issue Details
### Background and motivation

Another follow-up to [our stream backlog](https://github.com/dotnet/runtime/issues/58216), yet another popular operation requested for streams is the ability to aggregate multiple streams and treat them as one super stream.

While I haven't found any application of aggregated streams in our libraries, there are plenty of posts requesting it:

https://stackoverflow.com/q/3879152/4231460
https://www.c-sharpcorner.com/article/combine-multiple-streams-in-a-single-net-framework-stream-o/

### API Proposal

```cs
namespace System.IO
{
    public class AggregatedStream : Stream
    {
        public Stream? Current { get; }
        public AggregatedStream(Stream first, Stream second, bool leaveOpen = false) { }
        public AggregatedStream(Stream[] streams, bool leaveOpen = false) { }

        // all the necessary overrides...
    }

    public static class StreamFactory
    {
        // convenience method, passes leaveOpen = false to the ctor.
        public static AggregatedStream Aggregate(params Stream[] streams) { }
    }
}
```

### API Usage

```cs
Stream[] streams = { stream1, stream2 };
using AggregatedStream aggregatedStream = new(streams);

int bytesRead;
byte[] buffer = new byte[1024];

while ((bytesRead = aggregatedStream.Read(buffer)) > 0)
{
    // Do something...
}
```

### Notes

Writability:

It is likely that we should not allow writing, e.g. when you create a new file (with `File.Create` and get the `FileStream`) it always starts with length zero. It seems unintuitive that you would want to aggregate multiple streams for writing if this is the common case for most writable streams, unless you are using Streams with fixed length as a buffer/cache.

Seekability:

There are three ways to support seeking IMO.

* The combined capabilities of all the inner streams: it may be restrictive over streams that support seeking.
* The capabilities of the current inner stream: it may be inconsistent with the ones that **do not support** it.
* Do not support seeking ever.

Disposing:

If `leaveOpen` is false, we can dispose all Streams once we finish reading them all. Doing this will also enable seeking throughout multiple inner streams when supported.

Alternatively, early disposing could still be achieved by using the `Current` property:

```cs
while ((bytesRead = aggregatedStream.Read(buffer)) > 0)
{
    // Do something...
    // ...
    // dispose all streams that we no longer need.
    int idx = Array.IndexOf(streams, aggregatedStream.Current);
    for (int i = 0; i < idx; i++)
    {
        streams[i].Dispose();
    }
}
```

### Alternative Designs

Alt. design 1: We could leave AggregatedStream out of the public API and just keep the factory:

```cs
namespace System.IO
{
    public static class StreamFactory
    {
        public static Stream Aggregate(params Stream[] streams) { }
    }
}
```

Alt. design 2: Use a different name. As prior art we have `string.Concat`; we could follow the pattern and call them `ConcatStream` and `StreamFactory.Concat` respectively.

Other words I saw in the wild used to describe the same concept: **concatenate, combine, chain, merge**.

### Risks

_No response_

Author: Jozkee
Assignees: -
Labels: `api-suggestion`, `area-System.IO`, `untriaged`
Milestone: -

jeffhandley commented 1 year ago

One note on the naming. "Aggregate" does not communicate the way in which the streams are aggregated. A word like "Concatenate" can communicate more precisely what form of aggregation will be used.

Aggregate:

taking all units as a whole

Concatenate:

to link together in a series or chain

vukovinski commented 1 year ago

If writing to multiple streams is not the primary use case (and I don't understand why it wouldn't be), what would be the semantics of the obverse case, i.e. reading from multiple streams?

Could this be implemented as delegate based with asynchronous operations provided by the abstraction itself?

```cs
public class AggregatedReadStreamBuilder
{
    public AggregatedReadStreamBuilder(Action<Span<byte>,Span<byte>> onBothRead) { ... }
    public AggregatedReadStreamBuilder(Action<Span<byte>> onFirstRead, Action<Span<byte>> onSecondRead) { ... }
}
```

Just brainstorming. But I don't see the immediate value of an abstraction such as AggregatedStream. I do think some more specialized versions with clearer use cases would be a better way forward, such as a read-only two-stream combinator that tries reading from the left stream and, if that fails, tries reading from the right stream. Or a three-stream variant of the same. For writing, however, those combinators don't make much sense because you will likely want all writes to succeed. 🫤

jozkee commented 1 year ago

> what would be the semantics of the obverse case, i.e. reading from multiple streams?

For both read and write, this type is a concatenation of the specified first and second Streams. As @jeffhandley pointed out, we need a name that better alludes to a concatenation operation.

> Could this be implemented as delegate based with asynchronous operations provided by the abstraction itself?

For this type, I don't think that's desired if we want to keep it simple. Having a type that reads/writes from multiple backing streams at once asynchronously and reports the results via a delegate is specific enough to have its own API proposal IMO.

> But I don't see the immediate value of an abstraction such as AggregatedStream.

Can you please elaborate? Judging by the Stack Overflow post I linked, stream concatenation seems to be something .NET developers need quite often. If this API has the potential to be problematic (a pit of failure), I can buy that, but I need you (or anyone) to state the reasons.

vukovinski commented 1 year ago

Looking at the article, yes, the below makes much more sense.

https://stackoverflow.com/q/3879152/4231460

It takes an IEnumerable<Stream> in the constructor, which is much more flexible than the original proposal, and it has simple semantics: as both of you mentioned, concatenation, i.e. one stream after the other.

The source of my confusion might be that backlog on streams, which implies some bigger design work.

```cs
class ConcatenatedStream : Stream
{
    Queue<Stream> streams;

    public ConcatenatedStream(IEnumerable<Stream> streams)
    {
        this.streams = new Queue<Stream>(streams);
    }
}
```
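
As a rough illustration of the queue-based idea above, the snippet can be fleshed out into a minimal read-only, forward-only stream. This is a sketch under assumptions, not the proposed API: the `leaveOpen` parameter is borrowed from the original proposal, seeking and writing are simply unsupported, and each inner stream is disposed as soon as it is exhausted (one of several possible disposal policies).

```cs
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical sketch: reads the inner streams one after the other.
public sealed class ConcatenatedStream : Stream
{
    private readonly Queue<Stream> _streams;
    private readonly bool _leaveOpen;

    public ConcatenatedStream(IEnumerable<Stream> streams, bool leaveOpen = false)
    {
        if (streams is null) throw new ArgumentNullException(nameof(streams));
        _streams = new Queue<Stream>(streams);
        _leaveOpen = leaveOpen;
    }

    public override bool CanRead => true;
    public override bool CanSeek => false;   // assumption: never seekable
    public override bool CanWrite => false;  // assumption: read-only
    public override long Length => throw new NotSupportedException();

    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        // A zero-byte read must not be mistaken for end-of-stream.
        if (count == 0) return 0;

        while (_streams.Count > 0)
        {
            int read = _streams.Peek().Read(buffer, offset, count);
            if (read > 0) return read;

            // The current inner stream is exhausted: move on to the next one.
            Stream finished = _streams.Dequeue();
            if (!_leaveOpen) finished.Dispose();
        }

        return 0; // every inner stream has been consumed
    }

    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            // Release whatever has not been consumed yet.
            while (_streams.Count > 0)
            {
                Stream remaining = _streams.Dequeue();
                if (!_leaveOpen) remaining.Dispose();
            }
        }
        base.Dispose(disposing);
    }
}
```

With this sketch, calling `CopyTo` on a `ConcatenatedStream` built from two `MemoryStream` instances writes the bytes of the first followed by the bytes of the second. Disposing inner streams eagerly rules out seeking backwards, so a seekable variant would have to keep them open until the end.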

jeffhandley commented 1 year ago

We're moving this out to Future and it will be considered again for .NET 9.

WeihanLi commented 6 months ago

I think this could be related to TextWriter.CreateBroadcasting: https://github.com/dotnet/runtime/issues/93623

Maybe we could use the name CreateBroadcasting to align with the new TextWriter.CreateBroadcasting API:

```cs
namespace System.IO;

public abstract class Stream
{
    public static Stream CreateBroadcasting(params Stream[] streams);
    public static Stream CreateBroadcasting(Stream[] streams, bool leaveOpen = false);
}
```

jozkee commented 5 months ago

@WeihanLi no, TextWriter.CreateBroadcasting writes the same input to all writers. In this case, you want to read or write sequentially to the inner streams and only jump to the next one when the current inner stream position reaches the end.

WeihanLi commented 5 months ago

@Jozkee many thanks for the explanation