dart-lang / core

This repository is home to core Dart packages.
https://pub.dev/publishers/dart.dev
BSD 3-Clause "New" or "Revised" License
10 stars 2 forks source link

Feature request: Flattening StreamQueue #326

Open lexaknyazev opened 4 years ago

lexaknyazev commented 4 years ago

When dealing with unaligned or highly-compressed binary inputs, it's usually convenient to request data byte-by-byte. Of course, doing this directly on IO level is quite limiting, so an intermediate caching layer is desired.

Ad-hoc combination of StreamQueue with a simple wrapper helps with this like:

Stream<List<int>> chunkStream; // input stream from file or network IO

Stream<int> _byteStream(Stream<List<int>> chunkStream) async* {
  await for (final chunk in chunkStream) {
    for (final byte in chunk) {
      yield byte;
    }
  }
}

final byteQueue = StreamQueue<int>(_byteStream(chunkStream));

It would be nice to have this functionality properly implemented internally without extra wrapping overhead:

factory StreamQueue.flatten(Stream<Iterable<T>> source); 

Similar feature is already implemented in package:collection for regular iterables, see CombinedIterableView.

Similar feature may also be implemented as a wrapper for StreamIterator from the core SDK for those who do not need queues.

natebosch commented 4 years ago
final byteQueue = StreamQueue(chunkStream.expand((l) =>l));

Similarly CombinedIterableView can be replaced with iterable.expand((e) => e). I'm not sure how much value it buys to name these concepts...

lexaknyazev commented 4 years ago

CombinedIterableView is slightly more performant (~6% using JIT) than expand((e) => e).

Also, stream.expand has a slightly different behavior than the snippet above: it adds all elements of the inner iterable to the event sink without waiting: https://github.com/dart-lang/sdk/blob/44316eaadef1150b2e9d5375f30fba8004a468aa/sdk/lib/async/stream_pipe.dart#L248-L252

natebosch commented 4 years ago

Makes sense. I think we could accept a PR for this.

lrhn commented 4 years ago

Having a method which flattens a Stream<List<T>> into a Stream<T> would be fine, but embedding it into StreamQueue would probably not be worth it. The use case is too specialized for the functionality to carry its own weight.

natebosch commented 4 years ago

We added Iterable<T> get flattened for iterables. An extension in this package for Stream<T> get flattened would be a good addition.

lrhn commented 4 years ago

Interestingly, we can actually have both of:

extension X1<T> on Stream<Stream<T>> {
  Stream<T> flatten() async* {
    await for (var s in this) yield* s;
  }
}
extension X2<T> on Stream<Iterable<T>> {
  Stream<T> flatten() async* {
    await for (var it in this) for (var e in it) yield e;
  }
}

Can't do that with instance methods!