dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License
10.08k stars 1.56k forks source link

VM seemingly leaking memory #52715

Closed jensjoha closed 1 year ago

jensjoha commented 1 year ago

With https://dart-review.googlesource.com/c/sdk/+/309722 applied, run the following script on Windows (on Linux we'll run into https://github.com/dart-lang/sdk/issues/52703); it is located at .\pkg\analysis_server\tool\lspPkg.dart:

// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

/// Port handed to the spawned analysis server through the
/// `--enable-vm-service` flag in [main].
const int vmServicePort = 8181;

/// Launches the analysis server in LSP mode and keeps poking it.
///
/// After verifying the environment, the server is started as a child process
/// with the VM service enabled on [vmServicePort]. Once [initialize] returns,
/// a pubspec file is rewritten with its own contents every five seconds, 250
/// times in total. The watchdog timer and the child process are never stopped
/// by this function.
Future<void> main(List<String> args) async {
  checkCorrectDart();
  final arguments = Arguments.defaultArgs();

  final serverEntryPoint =
      File.fromUri(Platform.script.resolve("../bin/server.dart"));
  if (!serverEntryPoint.existsSync()) {
    throw "Couldn't find 'analysis_server/bin/server.dart' "
        "expected it at $serverEntryPoint";
  }

  final pubspec =
      File.fromUri(Platform.script.resolve("../../meta/pubspec.yaml"));
  if (!pubspec.existsSync()) {
    throw "Couldn't find yaml file --- expected it at $pubspec";
  }

  final serverArguments = <String>[
    "--enable-vm-service=$vmServicePort",
    "--serve-observatory",
    "--disable-service-auth-codes",
    serverEntryPoint.path,
    "--lsp",
    "--port=9101"
  ];
  final server =
      await Process.start(Platform.resolvedExecutable, serverArguments);

  // Parse the server's stdout as LSP traffic; pass its stderr straight on.
  server.stdout.listen(listenToStdout);
  server.stderr.listen(stderr.add);
  Timer.periodic(const Duration(seconds: 1), checkLongRunningRequests);

  await initialize(server, arguments);
  print("Should now be initialized.");

  var touches = 0;
  Timer.periodic(const Duration(seconds: 5), (timer) async {
    // "Touch" the yaml file by writing back exactly what was read.
    someWriteBack(pubspec);
    touches += 1;
    print("Touched the yaml file $touches times.");

    if (touches < 250) return;
    timer.cancel();
  });
}

/// Rewrites [file] with its current bytes and flushes the write.
void someWriteBack(File file) {
  final bytes = file.readAsBytesSync();
  file.writeAsBytesSync(bytes, flush: true);
}

/// Completed by [listenToStdout] with the `isAnalyzing` value of each
/// `$/analyzerStatus` notification, and then immediately replaced by a fresh
/// completer. [initialize] awaits it to find out when analysis has finished.
Completer<bool> analyzingCompleter = Completer();

/// Bytes received on the server's stdout that have not yet formed a complete
/// header section or message body. Cleared by [listenToStdout] each time one
/// of those is decoded.
final buffer = <int>[];

/// The `Content-Length` parsed from the most recent header section, or null
/// while [listenToStdout] is still collecting header bytes.
int? headerContentLength;

/// There's something weird about getting (several) id 3's that wasn't
/// requested...
///
/// Raised by [send] and [listenToStdout] whenever a larger integer id passes
/// through; nothing else in this script reads it.
int largestIdSeen = 3;

/// Splits on `\n` with an optional preceding `\r`.
RegExp newLineRegExp = RegExp("\r?\n");

/// Requests sent with an integer id that have not been answered yet, keyed by
/// that id. Entries are added by [send] and removed by [listenToStdout].
Map<int, OutstandingRequest> outstandingRequestsWithId = {};

/// Whether the "The Dart VM service ..." line has already been highlighted on
/// the console by [listenToStdout].
bool printedVmServiceStuff = false;

/// Controls how much diagnostic output is printed; the code checks for
/// levels above 0, 1, 2 and 3.
int verbosity = 0;

/// Verifies that the running Dart executable belongs to an SDK layout that
/// contains `lib/_internal/sdk_library_metadata/lib/libraries.dart`.
///
/// The file is looked up relative to the resolved executable. Throws a
/// [String] describing the expected layout when it is missing.
void checkCorrectDart() {
  final executable =
      Uri.base.resolveUri(Uri.file(Platform.resolvedExecutable));
  final librariesFile = File.fromUri(executable
      .resolve("../lib/_internal/sdk_library_metadata/lib/libraries.dart"));
  if (librariesFile.existsSync()) return;

  throw "Execute with a dart that has "
      "'../lib/_internal/sdk_library_metadata/lib/libraries.dart' "
      "available (e.g. out/ReleaseX64/dart-sdk/bin/dart)";
}

/// Reports every outstanding request that has been pending for more than one
/// second.
///
/// Intended as a periodic timer callback; [timer] is not used. The report is
/// framed by `----` lines, and nothing at all is printed when no request
/// exceeds the threshold.
void checkLongRunningRequests(timer) {
  var wroteOpeningRule = false;
  for (final entry in outstandingRequestsWithId.entries) {
    final pending = entry.value;
    if (pending.stopwatch.elapsed <= const Duration(seconds: 1)) continue;

    // Emit the opening rule lazily so quiet ticks stay silent.
    if (!wroteOpeningRule) {
      print("----");
      wroteOpeningRule = true;
    }
    print("==> Has been waiting for ${entry.key} for "
        "${pending.stopwatch.elapsed}");
  }
  if (wroteOpeningRule) print("----");
}

/// Performs the LSP handshake with the server process [p] and waits for the
/// initial analysis to finish.
///
/// Sends the `initialize` request and waits for its response, then sends the
/// `initialized` notification and the follow-up message carrying the SDK
/// path. Afterwards it keeps awaiting analyzer status updates until one
/// reports that analysis is no longer running, and prints how long that took
/// measured from the first status update.
Future<void> initialize(Process p, Arguments arguments) async {
  final initRequest =
      await send(p, Messages.initMessage(pid, arguments.rootUri));
  await initRequest?.completer.future;
  await send(p, Messages.initNotification);
  await send(p, Messages.initMore(arguments.sdkUri));

  // Wait until it's done analyzing. Every wait reads the current value of the
  // top-level completer, which listenToStdout swaps out after completing it.
  final firstStatus = await analyzingCompleter.future;
  final sinceFirstStatus = Stopwatch()..start();
  if (firstStatus) {
    while (await analyzingCompleter.future) {
      // Still analyzing; wait for the next status update.
    }
  }
  print("isAnalyzing is now done after ${sinceFirstStatus.elapsed}");
}

/// Consumes one chunk of the server's stdout and handles every complete LSP
/// message found in it.
///
/// Bytes from [event] are appended to [buffer] one at a time. The function
/// alternates between two states, tracked by [headerContentLength]:
///
/// * While it is null the buffer holds header text. Once the buffer ends with
///   `\r\n\r\n` it is decoded and split into lines. Lines are echoed with a
///   `stdout> ` prefix until a `Content-Length:` line is found, whose value is
///   stored in [headerContentLength]; later lines are ignored.
/// * While it is non-null the buffer holds a message body. Once exactly that
///   many bytes have arrived the body is decoded as a JSON object.
///
/// For a decoded message, a `$/analyzerStatus` notification completes
/// [analyzingCompleter], and a message with an integer `id` completes the
/// matching entry of [outstandingRequestsWithId], if there is one.
///
/// Throws if a complete header or body is not valid UTF-8, or if a body is
/// not a JSON object.
void listenToStdout(List<int> event) {
  // General idea taken from
  // pkg/analysis_server/lib/src/lsp/lsp_packet_transformer.dart
  for (int element in event) {
    buffer.add(element);
    if (verbosity > 3 && buffer.length >= 1000 && buffer.length % 1000 == 0) {
      // The buffer is incomplete at this point and may end in the middle of a
      // multi-byte UTF-8 sequence. Decode leniently so that this purely
      // diagnostic print cannot throw a FormatException.
      print("DEBUG MESSAGE: Stdout buffer with length ${buffer.length} so far: "
          "${utf8.decode(buffer, allowMalformed: true)}");
    }
    if (headerContentLength == null && _endsWithCrLfCrLf()) {
      String headerRaw = utf8.decode(buffer);
      buffer.clear();
      // Use a regex that makes the '\r' optional to handle "The Dart VM service
      // is listening on [..." message - at least on linux - being \n terminated
      // which would otherwise mean that we'd be stuck because no message would
      // start with 'Content-Length:'.
      List<String> headers = headerRaw.split(newLineRegExp);
      for (String header in headers) {
        if (!printedVmServiceStuff &&
            header.startsWith("The Dart VM service")) {
          // Highlight the VM service line once; it is also echoed below.
          print("\n\n$header\n\n");
          printedVmServiceStuff = true;
        }
        if (header.startsWith("Content-Length:")) {
          String contentLength =
              header.substring("Content-Length:".length).trim();
          headerContentLength = int.parse(contentLength);
          // Switch to body mode; remaining header lines are not inspected.
          break;
        }
        print("stdout> $header");
      }
    } else if (headerContentLength != null &&
        buffer.length == headerContentLength!) {
      String messageString = utf8.decode(buffer);
      buffer.clear();
      // Back to header mode for the next message.
      headerContentLength = null;
      Map<String, dynamic> message =
          json.decode(messageString) as Map<String, dynamic>;

      // {"jsonrpc":"2.0","method":"$/analyzerStatus","params":{"isAnalyzing":false}}
      dynamic method = message["method"];
      if (method == r"$/analyzerStatus") {
        dynamic params = message["params"];
        if (params is Map) {
          dynamic isAnalyzing = params["isAnalyzing"];
          if (isAnalyzing is bool) {
            // Publish this status, then install a fresh completer for the
            // next one because a completer can only be completed once.
            analyzingCompleter.complete(isAnalyzing);
            analyzingCompleter = Completer<bool>();
            if (verbosity > 0) {
              print("Got analyzerStatus isAnalyzing = $isAnalyzing");
            }
          }
        }
      }
      dynamic possibleId = message["id"];
      if (possibleId is int) {
        if (possibleId > largestIdSeen) {
          largestIdSeen = possibleId;
        }

        if (verbosity > 0) {
          if (messageString.length > 100) {
            print("Got message ${messageString.substring(0, 100)}...");
          } else {
            print("Got message $messageString");
          }
        }

        // Ids that match no outstanding request are ignored.
        OutstandingRequest? outstandingRequest =
            outstandingRequestsWithId.remove(possibleId);
        if (outstandingRequest != null) {
          outstandingRequest.stopwatch.stop();
          outstandingRequest.completer.complete(message);
          if (verbosity > 2) {
            print(" => Got response for $possibleId in "
                "${outstandingRequest.stopwatch.elapsed}");
          }
        }
      } else if (verbosity > 1) {
        if (messageString.length > 100) {
          print("Got message ${messageString.substring(0, 100)}...");
        } else {
          print("Got message $messageString");
        }
      }
    }
  }
}

/// Writes [json] to the stdin of [p] as a single LSP message.
///
/// The message is framed with a `Content-Length` / `Content-Type` header
/// followed by the UTF-8 encoded JSON body, and stdin is flushed before
/// returning. When [json] carries an integer `id`, an [OutstandingRequest] is
/// registered under that id before anything is written, and is returned so
/// the caller can await the response. Otherwise the result is null.
Future<OutstandingRequest?> send(Process p, Map<String, dynamic> json) async {
  // Mostly copied from
  // pkg/analysis_server/lib/src/lsp/channel/lsp_byte_stream_channel.dart
  final bodyBytes = utf8.encode(jsonEncode(json));
  final headerBytes = ascii.encode(
      'Content-Length: ${bodyBytes.length}\r\n'
      'Content-Type: application/vscode-jsonrpc; charset=utf-8\r\n\r\n');

  OutstandingRequest? pending;
  final dynamic id = json["id"];
  if (id is int) {
    if (id > largestIdSeen) largestIdSeen = id;
    pending = OutstandingRequest();
    outstandingRequestsWithId[id] = pending;
    if (verbosity > 2) print("Sending message with id $id");
  }

  // Header is always ascii, body is always utf8!
  p.stdin
    ..add(headerBytes)
    ..add(bodyBytes);
  await p.stdin.flush();
  if (verbosity > 2) print("\n\nMessage sent...\n\n");
  return pending;
}

/// Whether [buffer] currently ends with the byte sequence `\r\n\r\n`.
///
/// A buffer of four bytes or fewer never matches, so the terminator must be
/// preceded by at least one other byte.
///
/// Copied from pkg/analysis_server/lib/src/lsp/lsp_packet_transformer.dart.
bool _endsWithCrLfCrLf() {
  const cr = 13;
  const lf = 10;
  final length = buffer.length;
  if (length <= 4) return false;
  // Compare from the last byte backwards, bailing out on the first mismatch.
  if (buffer[length - 1] != lf) return false;
  if (buffer[length - 2] != cr) return false;
  if (buffer[length - 3] != lf) return false;
  return buffer[length - 4] == cr;
}

/// The directories this script works with.
class Arguments {
  /// Directory sent to the server as the workspace root.
  Uri rootUri;

  /// Directory sent to the server as the SDK path.
  Uri sdkUri;

  Arguments(this.rootUri, this.sdkUri);

  /// Derives both directories from the running script and executable.
  ///
  /// The root is `../..` relative to the script, and the SDK directory is
  /// `..` relative to the resolved executable. Throws a [String] if either
  /// directory does not exist.
  factory Arguments.defaultArgs() {
    final workspaceRoot = Platform.script.resolve("../..");
    checkDir(workspaceRoot);

    final executable =
        Uri.base.resolveUri(Uri.file(Platform.resolvedExecutable));
    final sdkRoot = executable.resolve("..");
    checkDir(sdkRoot);

    return Arguments(workspaceRoot, sdkRoot);
  }

  /// Throws a [String] unless [uri] names an existing directory.
  ///
  /// When [option] is given, the message additionally names it as the way to
  /// specify an existing directory.
  static void checkDir(Uri uri, [String? option]) {
    if (Directory.fromUri(uri).existsSync()) return;
    if (option == null) {
      throw "Directory $uri doesn't exist.";
    }
    throw "Directory $uri doesn't exist. "
        "Specify existing directory with $option";
  }
}

// Messages taken from what VSCode sent.
/// Builders for the JSON-RPC payloads this script sends to the server.
class Messages {
  /// The `initialized` notification, with empty params.
  static Map<String, dynamic> initNotification = {
    "jsonrpc": "2.0",
    "method": "initialized",
    "params": {}
  };

  /// Builds the `initialize` request (id 0) for the workspace at [rootUri].
  ///
  /// The single workspace folder is named after the last path segment of
  /// [rootUri], or after the second-to-last one when the last is empty.
  static Map<String, dynamic> initMessage(int processId, Uri rootUri) {
    final rootPath = rootUri.toFilePath();

    final segments = rootUri.pathSegments;
    var folderName = segments.last;
    if (folderName.isEmpty) {
      // The last segment is empty (e.g. the path ends in '/'); fall back to
      // the segment before it.
      folderName = segments[segments.length - 2];
    }

    final workspaceFolder = {"uri": "$rootUri", "name": folderName};
    final params = {
      "processId": processId,
      "clientInfo": {"name": "lspPkgTestScript", "version": "0.0.1"},
      "locale": "en",
      "rootPath": rootPath,
      "rootUri": "$rootUri",
      "capabilities": {},
      "initializationOptions": {},
      "workspaceFolders": [workspaceFolder]
    };
    return {
      "id": 0,
      "jsonrpc": "2.0",
      "method": "initialize",
      "params": params
    };
  }

  /// Builds the follow-up message whose `result` carries the SDK path.
  ///
  /// The message has no `id` key; it is commented out below.
  static Map<String, dynamic> initMore(Uri sdkUri) {
    final settings = {
      "useLsp": true,
      "sdkPath": sdkUri.toFilePath(),
      "allowAnalytics": false,
    };
    return {
      // "id": 1,
      "jsonrpc": "2.0",
      "result": [settings]
    };
  }
}

/// Bookkeeping for a request that has been sent but not yet answered.
class OutstandingRequest {
  /// Measures how long the request has been pending; running from creation.
  final Stopwatch stopwatch = Stopwatch()..start();

  /// Completed with the decoded response message.
  final Completer<Map<String, dynamic>> completer = Completer();
}

It opens up an analyzer session with the pkg package, then edits a pubspec.yaml file 250 times (it takes maybe 20 minutes to get there, but the leak can be seen before that), triggering the analyzer to do some work.

Opening up the observatory for the analyzer process via http://127.0.0.1:8181/ we can observe how the process memory increases. For me I get these numbers:

Initially (i.e. after the script eventually says isAnalyzing is now done after 0:02:58.668936 and Should now be initialized.):

Process
[...]
current memory  1.38GB
peak memory 1.39GB
[..]
VM
[...]
current memory  1.27GB

So initially ~0.11GB is unaccounted for.

After 25 rounds I get:

Process
[...]
current memory  1.46GB
peak memory 1.53GB
[..]
VM
[...]
current memory  1.19GB

So now ~0.27GB is unaccounted for.

After 50 rounds I get:

Process
[...]
current memory  1.62GB
peak memory 1.62GB
[..]
VM
[...]
current memory  1.18GB

So now ~0.44GB is unaccounted for.

After 100 rounds I get:

Process
[...]
current memory  1.94GB
peak memory 1.95GB
[..]
VM
[...]
current memory  1.18GB

So now ~0.76GB is unaccounted for.

After 250 rounds I get:

Process
[...]
current memory  2.90GB
peak memory 2.91GB
[..]
VM
[...]
current memory  1.18GB

So now ~1.72GB is unaccounted for.

Once the analyzer is done rebuilding it lands at

Process
[...]
current memory  3.03GB
peak memory 3.05GB
[..]
VM
[...]
current memory  1.30GB

and thus ~1.73GB unaccounted for.

Forcing GCs via Observatory doesn't change anything.

/cc @mkustermann @mraleph

mraleph commented 1 year ago

I have taken a full-heap dump of the process and looked at it in WinDBG. The difference between RSS and the Dart heap is all in the native heap.

0:000> !heap -s

************************************************************************************************************************
                                              NT HEAP STATS BELOW
************************************************************************************************************************
LFH Key                   : 0xf51b606bcc994274
Termination on corruption : ENABLED
          Heap     Flags   Reserv  Commit  Virt   Free  List   UCR  Virt  Lock  Fast 
                            (k)     (k)    (k)     (k) length      blocks cont. heap 
-------------------------------------------------------------------------------------
000001b5d2060000 00000002  470020 462504 469628  16943   939    87   44    2e0   LFH
000001b5d1ef0000 00008000      64      4     64      2     1     1    0      0      
000001b5d21a0000 00001002    1472     92   1080     16     8     2    0      0   LFH
000001b5d3a80000 00000002      60      8     60      3     1     1    0      0      
-------------------------------------------------------------------------------------

I asked WinDBG to produce some statistics (using !heap -h) and there is a large number of 64k chunks of memory allocated:

0:000> !heap -s -h 000001b5d2060000
Walking the heap 000001b5d2060000 .................................
 0: Heap 000001b5d2060000
   Flags          00000002 - HEAP_GROWABLE 
   Reserved memory in segments              469628 (k)
   Commited memory in segments              462412 (k)
   Virtual bytes (correction for large UCR) 465208 (k)
   Free space                               16943 (k) (939 blocks)
   External fragmentation          3% (939 free blocks)
   Virtual address fragmentation   0% (87 uncommited ranges)
   Virtual blocks  44 - total 118820 KBytes
   Lock contention 736
   Segments        1

...

                    Default heap   Front heap       Unused bytes
   Range (bytes)     Busy  Free    Busy   Free     Total  Average 
------------------------------------------------------------------ 
...     
 65536 -  66560     6062      1      0      0      97056     16
...
------------------------------------------------------------------ 
  Total             7578    939  62309  25302    1063842     15

This points me towards OverlappedBuffer's used by dart:io (because they are 64k sized). I can see at least one possible leak there just by looking at the code - but I don't yet know what exactly is happening.

mraleph commented 1 year ago

We are leaking data_ready_ buffers when destroying DirectoryWatchHandle.

jensjoha commented 1 year ago

Hah, after finding it didn't seem to apply to Linux I started (and am currently) running an instance where I've disabled creating new watches in the analyzer (so it just reuses the old ones) --- and yeah, it seems to stop the leak (although it's only on iteration 70 at this point).

jensjoha commented 1 year ago

With that hack on the analyzer my Windows run after 250 iterations ends up at

Process
[...]
current memory  1.45GB
peak memory 1.54GB
[..]
VM
[...]
current memory  1.34GB

i.e. ~0.11GB unaccounted for (which was also the starting point above) --- so it all seems to be related to the .watch thing.

jensjoha commented 1 year ago

Any updates on this one?

mraleph commented 1 year ago

I have a fix - though I got stuck in some refactoring. Might consider landing without the refactoring if I don't get unstuck.