Open bsutton opened 2 years ago
It's true that with asynchronous delivery, the consumer may not be able to stop production in a timely manner.
If you eagerly add data to the stream, and only stop on a pause
, then you can have a lot of data events already scheduled and being added while waiting for the first of those events to be delivered.
It does surprise me slightly that pause+resume isn't enough, because the implementation of (non-broadcast) stream subscriptions are such that resuming while there are buffered events delays the actual resume until the buffered events are gone. So, if you stuff thousands of files into the stream, then the stream's await for
loop pauses on the first event, and then it resumes again, the producer of that stream shouldn't get a resume event until the thousands of buffered files are all consumed.
I'd check again that something isn't ignoring the pause.
The problem is that there is no way for either the consumer nor the producer to know when all the events have been consumed.
Having access to a count would solve the problem.
I've been looking at the quiver StreamBuffer (haven't actually ran it up as yet) as it may go some way alleviate the problem but still doesn't solve the underlying issue. The stream buffer pauses the producer when its buffer hits a defined limit.
The producer could still get away from it but after a while the stream should be pulled back to the StreamBuffer's imposed limit. This would certainly be better than my current implementation which completely runs away.
This does assume that the async event queue does a reasonable job of scheduling task. I'm guessing that this might be a poor assumption. If the consumer is heavily cpu bound and only does sync operations I'm guessing that the event loop will never run (I assume it only gets schedule when an async operation occurs given we are using a non-preemptive threading model).
Of course if the event loop can't run then I assume that the produce won't run either.
My understanding is that file operations actually run in a different thread. Does this mean that Directory.list will keep buffering even when the stream is paused?
This still feels like I'm trying to work between the cracks rather than having a coherent and reliable solution.
Given that Stream is a fundamental part of the Dart library this seems like an problematic state of affairs.
At the very least this problem should be acknowledged in the documentation. My take away from the existing doco is that if you use pause/resume correctly then the system will behave itself, this is clearly not correct.
So I've hacked around and did come up with a workable but somewhat ugly solution.
The solution breaks the stream contract as you can't use controller.sink and you must use controller.asyncAdd. But it does solve the out of memory issue. My original app was consuming 1.6GB of ram to scan my disk whereas this version maxes out at under 200MB. Still more than I think it should need but the memory does seem to be released at the end.
So I guess there are two issues: 1) the doco is currently misleading. My takeaway from the doco was that if you use pause/resume you won't have memory issues. This is clearly not the case. 2) There doesn't seem (at least within my abilities) to be a way to resolve this problem and maintain the stream contract. It would be nice to see an off the shelf solution. As we seem more server side apps emerge this will become more of a problem.
I believe that 1) should be fixed. 2) would be a nice to have.
import 'dart:async';
import 'async_circular_buffer.dart';
class LimitedStreamController<T> implements StreamController<T> {
/// Creates a new [LimitedStreamController] that limits the no.
/// of elements that can be in the queue.
LimitedStreamController(
{void Function()? onListen, void Function()? onCancel, bool sync = false})
: _streamController = StreamController<T>(
onListen: onListen, onCancel: onCancel, sync: sync);
// _buffer = AsyncCircularBuffer(limit);
// final AsyncCircularBuffer<T> _buffer;
final StreamController<T> _streamController;
/// Returns the no. of elements waiting in the stream.
int get length => _count; // _buffer.length;
@override
bool get isClosed => _streamController.isClosed;
@override
bool get hasListener => _streamController.hasListener;
@override
bool get isPaused => _streamController.isPaused;
@Deprecated('Use asyncAdd')
@override
void add(T event) {
throw UnsupportedError('Use asyncAdd');
}
/// Tracks the no. of elements in the stream.
var _count = 0;
/// Used to indicate when the stream is full
var _full = Completer<bool>();
/// Add an event to the stream. If the
/// stream is full then this method will
/// wait until there is room.
Future<void> asyncAdd(T event) async {
// await _buffer.add(event);
if (_count > 0) {
await _full.future;
}
_full = Completer<bool>();
_count++;
_streamController.add(event);
}
@override
// ignore: prefer_expression_function_bodies
Stream<T> get stream async* {
/// return _buffer.stream();
await for (final element in _streamController.stream) {
_count--;
_full.complete(true);
yield element;
}
}
@override
void addError(Object error, [StackTrace? stackTrace]) {
_streamController.addError(error, stackTrace);
}
@override
Future addStream(Stream<T> source, {bool? cancelOnError = true}) {
throw UnsupportedError('Use asyncAdd');
// return _streamController.addStream(source, cancelOnError: cancelOnError);
}
@override
Future<dynamic> close() => _streamController.close();
@override
Future get done => _streamController.done;
// @override
// StreamSink<T> get sink => _streamController.sink;
@override
StreamSink<T> get sink => throw UnsupportedError('Use asyncAdd');
@override
set onListen(void Function()? onListenHandler) {
_streamController.onListen = onListenHandler;
}
@override
ControllerCallback? get onListen => _streamController.onListen;
@override
set onPause(void Function()? onPauseHandler) {
_streamController.onPause = onPauseHandler;
}
@override
ControllerCallback? get onPause => _streamController.onPause;
@override
set onResume(void Function()? onResumeHandler) {
_streamController.onResume = onResumeHandler;
}
@override
ControllerCallback? get onResume => _streamController.onResume;
@override
set onCancel(void Function()? onCancelHandler) {
_streamController.onCancel = onCancelHandler;
}
@override
ControllerCancelCallback? get onCancel => _streamController.onCancel;
}
I'm trying to write a function that creates a baseline of a disk for pci compliance.
The job of the function is to find every file on disk and create a checksum of each file.
I'm using streams to accomplish this.
The producer scans the Disk using the Directory.list method adding each file to a stream.
The producer implements the onListen, onPause, onResume and onCancel methods. When the consumer is paused it stops adding files to the stream.
The consumer listens to the stream takes a file and calculates the checksum. The consumer pauses each time an event arrives and resumes as it completes each event.
The problem is that the app is using large quantities of memory (> 500MB).
Performing a memory analysis shows that the items placed in the stream are consuming the memory.
I don't believe this is a memory leak but rather a gap in the design of the stream pause/resume logic.
The issue is that the producer is fast, able to put several thousand files into the stream every second (pumping 2M files through the stream and then just discarding them takes about 20 seconds).
On the other hand the consumer is slow taking several minutes for multi-GB files.
Every time the consumer resumes the stream the producer is scheduled and it is able to place several thousand files into the stream.
The result is that we end with the stream containing over 1M files despite using the appropriate pause/resume logic.
This feels like a fundamental design flaw in streams as there appears to be no way to reliably stop a system running out of memory if there is a significant disparity between the producer and consumer and a large enough data set is being processed.
In the ideal situation I should be able to control the rate that I fill the stream to ensure that I don't run out of memory.
P.S. I'm the developer of DCli and the above code uses dcli's find function which generates the above stream. The app is as much a test case for dcli's find function working correctly as anything. The point being that a non-stream solution isn't what I'm looking for.