dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

Leaking `_Closure`s et al when using overlapping `Directory.watch` streams #52703

Closed jensjoha closed 1 year ago

jensjoha commented 1 year ago

TL;DR: I can leak as much RAM as I want. And the analyzer does.

Consider this example:

import 'dart:async';
import 'dart:developer';
import 'dart:io';

final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions = [];

void main() async {
  print('pause...');
  debugger();
  for (int i = 0; i < 10; i++) {
    await create(Uri.base);
    print('$i');

    print('pause...');
    debugger();
  }
  print('Now created 10 ones...');
}

Future<void> create(Uri uri) async {
  // Create temporary one.
  List<StreamSubscription<FileSystemEvent>> tmpSubscriptions =
      <StreamSubscription<FileSystemEvent>>[];
  Stream<FileSystemEvent> stream = Directory.fromUri(uri).watch();
  tmpSubscriptions.add(stream.listen((event) {}));

  // Cancel previous one(s) if any.
  for (StreamSubscription<FileSystemEvent> prev in watcherSubscriptions) {
    await prev.cancel();
  }
  watcherSubscriptions.clear();

  // Create new one.
  stream = Directory.fromUri(uri).watch();
  watcherSubscriptions.add(stream.listen((event) {}));

  // Cancel temporary one.
  for (StreamSubscription<FileSystemEvent> prev in tmpSubscriptions) {
    await prev.cancel();
  }
}

which can be run, for instance (with the file saved as leaking_closures.dart), like this:

out/ReleaseX64/dart --enable-vm-service --serve-observatory leaking_closures.dart

This will start the observatory.

Opening it up, I can go to the allocation profile, hit the GC button at the top, search for _Closure and see that there are 25 of them.

I can click it, scroll down and click "all direct instances" to get a list of those 25.

Clicking that list I will execute this expression:

print(map((dynamic e) => identityHashCode(e)).join(","))

and copy the data from the terminal. In my case it's

1957801911,2799861565,295749294,3607033467,2001722021,1819156124,880687672,2476592613,3065591878,1366402745,3020542853,331678246,4083017078,1514761460,1124323878,764042262,4207241379,3838757159,4215517583,219788470,556663193,3410728899,4031482779,2704432541,2323354312

I'll then go to the debugger and continue, then back to the allocation profile, hitting GC again. I now have 70 _Closure objects. Again I'll click it, click all direct instances, go to the list and this time execute

() {
var set = this.map((dynamic e) => identityHashCode(e)).toSet().difference({1957801911,2799861565,295749294,3607033467,2001722021,1819156124,880687672,2476592613,3065591878,1366402745,3020542853,331678246,4083017078,1514761460,1124323878,764042262,4207241379,3838757159,4215517583,219788470,556663193,3410728899,4031482779,2704432541,2323354312});

return this.where((e) => set.contains(identityHashCode(e))).toList();
}()

(replace the data in the hardcoded set with what you got before). I get a list of 45 new ones. I'll pick one called Closure (_cancel), click it and execute

identityHashCode(this)

In my case it gives 1365245867.

I'll also click Retaining path and I get

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

I go to the debugger and continue, then go back to the allocation profile and hit GC again. I now have 79 _Closure objects. Again I'll click it, click all direct instances, go to the list and execute

where((e) => identityHashCode(e) == 1365245867).toList()

(substitute with the identity hash code from above).

I get a list with one entry, I click that entry.

The retaining path is

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

I do the same thing again; there are now 91, and the retaining path for the chosen one is

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

and again: 103 and

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

and for good measure once more: 115 and

Closure (_cancel) {  ⊞  }
retained by onCancel of _AsyncBroadcastStreamController {  ⊞  }
retained by 0 of Context (1) {  ⊞  }
retained by _context@0150898 of Closure (_close) {  ⊞  }
retained by _onDone@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by _previous@4048458 of _BroadcastSubscription {  ⊞  }
retained by addSubscription of _AddStreamState {  ⊞  }
retained by _addStreamState@4048458 of _AsyncBroadcastStreamController {  ⊞  }
retained by _controller@4048458 of _BroadcastSubscription {  ⊞  }
retained by 0 of _GrowableList (1) {  ⊞  }
retained by offset -1 of top-level final List<StreamSubscription<FileSystemEvent>> watcherSubscriptions
retained by a GC root (user global)

So we're leaking 12 _Closure objects per iteration and they're seemingly leaked via the _previous pointer on _BroadcastSubscription.

This might not sound that bad, but the analyzer does this, through package:watcher, for the opened directories. On Linux (seemingly only on Linux), package:watcher then adds watchers for all subdirectories (as I understand it, it has to because inotify doesn't work recursively, whereas the Windows and Mac equivalents do). When a pubspec.yaml file is edited, the analyzer basically does what the above script does and leaks similarly, but with many more directories watched it leaks a lot more.

When I open pkg, edit a pubspec.yaml file inside, wait a bit, edit the yaml file again, wait a bit (and so on and so forth), then after doing it 25 times I have 937,835 _Closures (directly using 57.2MB of RAM), 937,057 Contexts (28.7MB), 157,623 _BroadcastSubscriptions (16.8MB), etc.

At somewhere between 70 and 80 iterations it's at: _Closure: 2,998,725 (183.0MB), Context: 2,997,881 (91.5MB), _BroadcastSubscription: 501,262 (53.5MB)

/cc @lrhn

jensjoha commented 1 year ago

The many iterations aren't that far-fetched, by the way: in https://github.com/dart-lang/sdk/issues/52447 it has likely occurred several hundred times (though in that instance it's a Mac, which doesn't suffer from this as significantly).

jensjoha commented 1 year ago

And for completeness' sake: after 250 such iterations we're at

_Closure: 8,868,011 (541.3MB)
Context: 8,867,233 (270.7MB)
_BroadcastSubscription: 1,479,319 (158.0MB)
_AsyncBroadcastStreamController: 1,477,520 (112.7MB)
_FutureListener: 1,475,922 (90.1MB)
_InotifyFileSystemWatcher: 1,475,918 (67.6MB)

(and the heap is now 2GB+)

lrhn commented 1 year ago

The _BroadcastSubscription._previous should only hold on to other subscriptions that are currently active on the same controller. If they are actually active, it's fair that they are not GC'ed.

So, do we have a bug in the cancellation, or is the analyzer keeping subscriptions alive longer than necessary?

jensjoha commented 1 year ago

If we keep the analyzer out of it for the moment and just look at my example code --- you tell me. I'm not great with streams et al, but I'd say the old subscriptions have been canceled.

mkustermann commented 1 year ago

If we simplify the example a bit and dump a heapsnapshot:

import 'dart:async';
import 'dart:developer';
import 'dart:io';

void main() async {
  final dir = Directory.fromUri(Uri.parse('pkg/'));

  StreamSubscription? current;
  for (int i = 0; i < 15; i++) {
    final temp = dir.watch().listen((event) {});
    await current?.cancel();
    current = dir.watch().listen((event) {});
    await temp.cancel();
  }
  NativeRuntime.writeHeapSnapshotToFile('watch.heapsnapshot');

  current?.cancel();
}

then examine the watch.heapsnapshot using <sdk>/runtime/tools/heapsnapshot

(hsa) load ../../../watch.heapsnapshot
Loaded heapsnapshot from "../../../watch.heapsnapshot".

(hsa) all = closure roots
 all {#63864}

(hsa) retainers -n1 -d100 sample (filter all _Closure)
There are 1 retaining paths of
_Closure (dart:core)
⮑ ・_AsyncBroadcastStreamController.onCancel (dart:async)
    ⮑ ﹢Context ()
        ⮑ ・_Closure.context_ (dart:core)
            ⮑ ・_BroadcastSubscription._onData (dart:async)
                ⮑ ﹢_BroadcastSubscription._next (dart:async)
                    ⮑ ﹢_BroadcastSubscription._next (dart:async)
                        ⮑ ﹢_BroadcastSubscription._next (dart:async)
                            ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                    ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                        ⮑ ﹢_BroadcastSubscription._next (dart:async)
                                            ⮑ ﹢_AsyncBroadcastStreamController._firstSubscription (dart:async)
                                                ⮑ ﹢_List (dart:core)
                                                    ⮑ ・_Map.data_ (dart:collection)
                                                        ⮑ ・Isolate._idMap ()
                                                            ⮑ ・Root ()

That indicates that a static _idMap holds on to all of them. Indeed, we have:

class _InotifyFileSystemWatcher {
  static final Map<int, StreamController> _idMap = {};
}

Whenever we start watching a path for the first time, we put an entry in there; whenever the last watch of a path gets cancelled, we clear it out (this lets multiple watches of the same path share one entry).
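
The sharing scheme described above can be sketched as a reference-counted map. This is only a minimal illustration, not the SDK's code: SharedEntry, SharedRegistry, acquire and release are hypothetical names, and the real entry type and bookkeeping in file_patch.dart may differ.

```dart
import 'dart:async';

/// Hypothetical stand-in for the shared per-path entry.
class SharedEntry {
  final StreamController<String> controller =
      StreamController<String>.broadcast();
  int count = 0;
}

/// Hypothetical registry: one entry per path, dropped when unused.
class SharedRegistry {
  final Map<String, SharedEntry> _entries = {};

  /// Returns the entry for [path], creating it on first use.
  SharedEntry acquire(String path) {
    final entry = _entries.putIfAbsent(path, () => SharedEntry());
    entry.count++;
    return entry;
  }

  /// Drops one reference; removes the entry once nobody uses it.
  void release(String path) {
    final entry = _entries[path];
    if (entry == null) return;
    if (--entry.count == 0) {
      _entries.remove(path);
      unawaited(entry.controller.close());
    }
  }

  int get length => _entries.length;
}

void main() {
  final registry = SharedRegistry();
  final a = registry.acquire('pkg/');
  final b = registry.acquire('pkg/');
  print(identical(a, b)); // Both watches share one entry.
  registry.release('pkg/');
  print(registry.length); // Entry stays while one watch is open.
  registry.release('pkg/');
  print(registry.length); // Entry is removed after the last release.
}
```

In the repro, a new watch is always opened before the previous one is cancelled, so in a scheme like this the count never reaches zero and the shared entry is never cleared.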

Looking closer at how the individual fse.watch() streams get their elements, we find

   _pathWatched().pipe(_broadcastController);

This will unconditionally pipe all elements from the shared watch stream into the per-fse.watch() controller. Since we always keep one fse.watch() open on the path, the _pathWatched() stream will be an infinite stream.

Internally, this _pathWatched().pipe(_broadcastController) will of course call .listen() on the source and add it to the destination, but more importantly, it will never cancel that subscription.

When the fse.watch() subscription gets cancelled, we only decrement some counters, but we'll actually not stop this pipe() operation.
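
The difference between pipe() and an explicitly held subscription can be sketched with plain dart:async controllers. This is a minimal illustration under assumptions, not the watcher code: the source controllers stand in for the shared per-path stream, and the function names are hypothetical.

```dart
import 'dart:async';

/// pipe(): the subscription on the source is internal to the
/// destination controller and cannot be cancelled from outside.
Future<bool> sourceListenedAfterPipe() async {
  final source = StreamController<int>.broadcast(); // never closed
  final destination = StreamController<int>.broadcast();
  unawaited(source.stream.pipe(destination));

  final userSub = destination.stream.listen((_) {});
  await userSub.cancel();

  // The source is still being listened to by the pipe.
  return source.hasListener;
}

/// listen(): we keep the subscription and can cancel it ourselves.
Future<bool> sourceListenedAfterForward() async {
  final source = StreamController<int>.broadcast(); // never closed
  final destination = StreamController<int>.broadcast();
  final sourceSub = source.stream.listen(
    destination.add,
    onError: destination.addError,
    onDone: destination.close,
  );

  final userSub = destination.stream.listen((_) {});
  await userSub.cancel();
  await sourceSub.cancel(); // Detach from the shared source.

  return source.hasListener;
}

Future<void> main() async {
  print('pipe: ${await sourceListenedAfterPipe()}');
  print('listen + cancel: ${await sourceListenedAfterForward()}');
}
```

With pipe(), the source should keep its listener, and with it everything that listener references, for as long as the source stays open. With the held subscription, cancelling it detaches the destination from the source.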

A change like this seems to fix it:

diff --git a/sdk/lib/_internal/vm/bin/file_patch.dart b/sdk/lib/_internal/vm/bin/file_patch.dart
index d60b8f95da1..353e30430b1 100644
--- a/sdk/lib/_internal/vm/bin/file_patch.dart
+++ b/sdk/lib/_internal/vm/bin/file_patch.dart
@@ -136,6 +136,7 @@ abstract class _FileSystemWatcher {

   final StreamController<FileSystemEvent> _broadcastController =
       new StreamController<FileSystemEvent>.broadcast();
+  late StreamSubscription _sourceSubscription;

   @patch
   static Stream<FileSystemEvent> _watch(
@@ -193,7 +194,13 @@ abstract class _FileSystemWatcher {
     }
     _watcherPath = _idMap[pathId];
     _watcherPath!.count++;
-    _pathWatched().pipe(_broadcastController);
+    _sourceSubscription = _pathWatched().listen((event) {
+      _broadcastController.add(event);
+    }, onError: (error, stack) {
+      _broadcastController.addError(error, stack);
+    }, onDone: () {
+      _broadcastController.close();
+    });
   }

   void _cancel() {
@@ -222,6 +229,7 @@ abstract class _FileSystemWatcher {
       _doneWatcher();
       _id = null;
     }
+    _sourceSubscription.cancel();
   }

   // Called when (and after) a new watcher instance is created and available.

jensjoha commented 1 year ago

Martin's proposed fix looks like it works for me!