dart-lang / sdk

The Dart SDK, including the VM, JS and Wasm compilers, analysis, core libraries, and more.
https://dart.dev
BSD 3-Clause "New" or "Revised" License

File operations hang forever after opening unrelated named pipe #55844

Open cbenhagen opened 5 months ago

cbenhagen commented 5 months ago

After opening a named pipe in an unrelated part of the code, async file operations like .delete() or .length() hang forever. Interestingly, this only starts to happen with 46 or more temporary files. In the code below, I added a timeout to make it more visible.

import 'dart:async';
import 'dart:io';

void main() async {
  final subscription = await readFromNamedPipe();

  // Changing this to 45 and lower does not cause the timeout
  final dir = await createTmpFiles(46);
  try {
    await deleteFiles(dir);
    print('Done deleting files');
  } on TimeoutException {
    print('Timeout while deleting files');
  } finally {
    await dir.delete(recursive: true);
  }

  await subscription.cancel();
}

Future<StreamSubscription> readFromNamedPipe() async {
  final dir = await Directory.systemTemp.createTemp();
  var fifo = File('${dir.path}/fifo');
  await Process.run('mkfifo', [fifo.path]);

  return fifo.openRead().listen((_) {});
}

Future<Directory> createTmpFiles(int count) async {
  final dir = await Directory.systemTemp.createTemp();

  for (var i = 0; i < count; i++) {
    await File('${dir.path}/file$i').create();
  }
  return dir;
}

Future<void> deleteFiles(Directory dir) async {
  List<Future<void>> futures = [];

  await for (var entity in dir.list()) {
    if (entity is File) {
      futures.add(
        entity.delete().timeout(const Duration(milliseconds: 200)),
      );
    }
  }

  await Future.wait(futures);
}

Tested on macOS 14.5 and Fedora 40.
Dart SDK version: 3.4.0 (stable) (Mon May 6 07:59:58 2024 -0700) on "macos_arm64"
Dart SDK version: 3.4.0 (stable) (Mon May 6 07:59:58 2024 -0700) on "linux_x64"

lrhn commented 5 months ago

Also reproduces on Linux for me. Since it exists on more than one OS, it's probably a Dart issue, not an OS issue. It seems to depend on the size of the file entries in the directory: I changed the filename format to file#### with leading zeros, so the number of files doesn't change the file-name length. I also open the FIFO for writing ahead of time, just to be prepared to close it.

With that, the problem starts at 45 files, with one file not being deleted. (It seems that it was opening the FIFO that changed the number from 46 to 45, so maybe it's related to open file descriptors.)

My test program below has the following behavior for different temporary file counts (the same data is recorded in the comment at the top of the program):

  * 45+: one undeleted file
  * 78+: two undeleted files
  * 109+: three undeleted files
  * 140+: four undeleted files
  * 172+: five undeleted files
  * 204+: six undeleted files

That's a consistent 31/32 distance between changes, which suggests that it's related to some kind of block size (or chunks of open file descriptors). That's corroborated by the undeleted files consistently being at the same position in the iteration for any given file count, while the position differs between file counts.

My best guess is that the async list and multiple async deletes in the same directory somehow introduce a race condition that makes something go wrong.

Maybe one delete tries to remove a file at the same time as the previous delete causes the directory structure to be updated, and the second delete then fails to update the directory structure after deleting the file. Or the directory structure is updated and compacted just as the iteration is about to move to a new sector, which is then no longer part of the directory. That all depends on the individual operations not being atomic, and on iteration not being stable against deletes (it probably is against atomic deletes, but if the delete isn't atomic, who knows what happens).

Or maybe it's Dart reusing file descriptors. I know macOS is known for having a low default limit on open file descriptors, so if Dart tries to reuse one, and does so too early, that might have an effect. (But that's guessing blindly, based only on the fact that doing fifo.openWrite() makes the problems start at 45 instead of 46.)

Why that depends on having an open FIFO is where it gets weird. Or rather, on having an open FIFO read that is blocked on missing input. If I close the FIFO by doing fifo.openWrite().close(); before the deletion step, the issue goes away. (Doing the fifo-write-close after deleting prevents the subscription.cancel() from hanging, but doesn't avoid the failing deletions.)

So something goes wrong with deleting, and sometimes with iterating.

:warning: Notice :warning:: The pattern used here, of asynchronously iterating the directory, deleting files asynchronously while iterating, and just storing the futures until after the loop, is known to be unsafe. Not because of this weird behavior, which might still be a bug, but because it stores a future with no error handler across an asynchronous gap. If that gap takes more time than expected and a future completes with an error, that error becomes unhandled, because the handler isn't added until after the loop completes. (Which is why the error becomes uncaught in the 109-file case, where the loop takes too long to complete.) The code should at least call .ignore() on each future before storing it.
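To illustrate that notice, here is a minimal self-contained sketch (the failing `work` function is hypothetical, not from the repro above): calling `.ignore()` on a stored future keeps an early error from being reported as unhandled during the asynchronous gap, while the error remains observable to `Future.wait` later.

```dart
import 'dart:async';

// Hypothetical operation: the first one fails immediately.
Future<void> work(int i) async {
  if (i == 0) throw StateError('early failure');
}

Future<String> runAll() async {
  final futures = <Future<void>>[];
  for (var i = 0; i < 3; i++) {
    // Without ignore(), work(0)'s error would be reported to the zone
    // as unhandled while we are still looping, before wait attaches
    // its handler.
    final f = work(i)..ignore();
    futures.add(f);
    await Future<void>.delayed(const Duration(milliseconds: 5));
  }
  try {
    await Future.wait(futures);
    return 'all completed';
  } on StateError catch (e) {
    // ignore() does not consume the error; later listeners still see it.
    return 'caught: $e';
  }
}

Future<void> main() async {
  print(await runAll());
}
```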

The heavily instrumented test code I used:

import 'dart:async';
import 'dart:io';

final Stopwatch sw = Stopwatch();
void log(String text) {
  var e = sw.elapsedMilliseconds;
  print("(${"$e".padLeft(3)}): $text");
}

// Args[0] is number of temporary files.
// 45+: One undeleted file
//   - 60: No undeleted files
// 78+: Two undeleted files
//   - 89, 108: Stalls in `list`
// 109+: Three undeleted files
//   - 121, 124: Stalls in `list`
// 140+: Four undeleted files
//   - 152, 153: Stalls in `list`
// 172+: Five undeleted files
//   - 184, 185: Stalls in `list`
// 204+: Six undeleted files
void main(List<String> args) async {
  // Safety wrapper around real main, to catch uncaught errors
  // and exit if stalled.
  var exitCode = await runZonedGuarded(() => _main(args), (e, s) {
    log("Uncaught error: $e\n$s");
  }, zoneValues: {#_ez: "Own error zone"})!
      .timeout(Duration(milliseconds: 500), onTimeout: () {
    log("Stalled, exiting");
    throw exit(0);
  });
  exit(exitCode);
}

Future<int> _main(List<String> args) async {
  var exitCode = 0;
  var fileCount = 45;
  if (args.isNotEmpty) fileCount = int.tryParse(args.first) ?? fileCount;

  sw.start();

  log("Creating named pipe or file");
  final (pipeDir, fifo) = await createNamedPipe();
  log("Listen to named pipe or file");

  final openRead = fifo.openRead().listen((v) {
    log("FIFO EVENT: $v");
  }, onError: (e, s) {
    log("FIFO ERROR: $e");
  }, onDone: () {
    log("FIFO DONE");
  });
  final openWrite = await fifo.openWrite(mode: FileMode.write);

  final dir = await createTmpFiles(fileCount);

  try {
    await deleteFiles(dir, fileCount);
  } on ParallelWaitError<List<void>, List<Object?>> catch (e) {
    exitCode = 1;
    log('Timeout while deleting files: ${e.errors.nonNulls.length}');
    var errors = e.errors;
    for (var i = 0; i < errors.length; i++) {
      var error = errors[i];
      if (error != null) print("     : Caught error #$i: ${error}");
    }
    for (var entity in dir.listSync()) {
      log("Surviving file: ${entity.path}");
    }
  } catch (e) {
    exitCode = 1;
    log("Unexpected error: $e");
  } finally {
    log("Deleting temporary file directory");
    try {
      await dir.delete(recursive: true);
      log("Deleted directory");
      if (dir.existsSync()) print("   Not successfully?");
    } catch (e) {
      log("Unexpected error: $e");
    }
  }
  log("Cancelling subscription");

  openWrite.close();
  await (openRead.cancel());
  log("Subscription cancelled");
  await pipeDir.delete(recursive: true);
  log("Pipe directory deleted");
  return exitCode;
}

Future<(Directory, File)> createNamedPipe() async {
  final dir = await Directory.systemTemp.createTemp();
  var fifo = File('${dir.path}/fifo');
  await Process.run('mkfifo', [fifo.path]);
  print("CREATED FIFO: ${fifo.path}");
  return (dir, fifo);
}

Future<Directory> createTmpFiles(int count) async {
  log("Creating temporary files");
  final dir = await Directory.systemTemp.createTemp();
  log("Created temporary file directory: ${dir.path}");
  for (var i = 0; i < count; i++) {
    var file = File('${dir.path}/file${"$i".padLeft(4, "0")}');
    await file.create();
    log("Created file: ${file.path}");
  }
  log("Created temporary files");
  return dir;
}

Future<void> deleteFiles(Directory dir, int fileCount,
    [int timeout = 200]) async {
  var t0 = sw.elapsedMilliseconds;
  log("Deleting temporary files");
  List<Future<void>> futures = [];
  var dur = Duration(milliseconds: timeout);
  log("Listing temporary files");
  int i = 0;
  int completed = 0;
  await for (var entity in dir.list()) {
    if (entity is File) {
      var index = i++;
      log("Deleting #$index: ${entity.path}");
      var e0 = sw.elapsedMilliseconds - t0;
      if (e0 > timeout) log("Still listing files after $e0 ms");
      futures.add(
        entity.delete().then((v) {
          completed++;
          var e1 = sw.elapsedMilliseconds - t0;
          log("Deleted #$index: ${entity.path}${e1 > timeout ? " too late($e1)" : ""}");
          return v;
        }).timeout(dur, onTimeout: () {
          var e2 = sw.elapsedMilliseconds - t0;
          log("Timeout deleting file #$index of $fileCount: ${entity.path} after $e2 ms");
          var zone = Zone.current[#_ez] ??
              (identical(Zone.current, Zone.root) ? "Root" : "Unknown");
          throw TimeoutException(
              "in $zone: Deleting file #$index of $fileCount: ${entity.path}");
        })
          ..ignore(),
      );
    }
  }
  log("All deletes initiated, $completed of $i already complete.");
  await futures.wait;
  log("Done deleting temporary files");
}
cbenhagen commented 5 months ago

@lrhn thanks for your analysis! Just wanted to note that replacing the .delete() with .length() shows the same symptoms.

Quoting @mraleph from Discord:

I think dart:io does wrong thing here - it most likely just tries to read this pipe using a blocking IO and exhausts the thread pool that is used for this. Instead it should check if the file you are opening is a pipe and do async IO instead

lrhn commented 5 months ago

If length has the same problem, then it's not due to concurrent updates of the directory structure. Curiouser and curiouser!

cbenhagen commented 3 months ago

@brianquinlan is there anything I can do to help fix this?

cbenhagen commented 2 months ago

@lrhn any ideas what I could do to help?

lrhn commented 2 months ago

No clue, sorry. This is in the file system code somewhere, but that's not my area of expertise. Throttle your code to not have many simultaneous operations, perhaps.

cbenhagen commented 2 months ago

@lrhn ok thanks. Who would know that code?

brianquinlan commented 2 months ago

I'm responsible for that code and this is on my plate but I won't get to it for a while.

Would it be possible to reduce that number of parallel I/O operations that you are doing as a work-around?

cbenhagen commented 2 months ago

@brianquinlan I need a really fast way to recursively get file names and sizes of a huge amount of files. I am not sure how to achieve that effectively without parallelizing I/O. The odd thing here is that everything works fine until I open a named pipe in an unrelated part of the code. I am happy for every idea for a workaround as well as pointers on how I could help you pin down the actual cause of this odd behaviour.
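One way to sketch the suggested workaround of limiting parallel I/O (this `Semaphore` class is illustrative, not a dart:io API): cap the number of in-flight file operations well below the pool size. Note this only reduces exposure; it cannot guarantee that a request avoids the port stuck on the pipe read.

```dart
import 'dart:async';
import 'dart:collection';

/// Illustrative counting semaphore: limits how many async file
/// operations are in flight at once.
class Semaphore {
  Semaphore(this._permits);
  int _permits;
  final Queue<Completer<void>> _waiters = Queue();

  Future<void> acquire() {
    if (_permits > 0) {
      _permits--;
      return Future.value();
    }
    final waiter = Completer<void>();
    _waiters.add(waiter);
    return waiter.future;
  }

  void release() {
    if (_waiters.isNotEmpty) {
      _waiters.removeFirst().complete();
    } else {
      _permits++;
    }
  }

  /// Runs [op] once a permit is available, releasing it afterwards.
  Future<T> withPermit<T>(Future<T> Function() op) async {
    await acquire();
    try {
      return await op();
    } finally {
      release();
    }
  }
}
```

Then something like `final io = Semaphore(8); final length = await io.withPermit(() => file.length());` keeps at most 8 file operations outstanding while still parallelizing the scan.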

mraleph commented 2 months ago

Let me explain what is happening here: async IO operations which do not fit into the normal epoll/kqueue model (which handles working with sockets) are performed on a thread pool. For example, when you try to read from a file, a read task is put into the thread pool, and when it completes a message with the data is passed back to Dart. This pool is organized through native ports created and managed by IOService. These ports are created dynamically as needed, but we cap their number at 32 to avoid spawning too many auxiliary threads. If we reach the limit and all ports are busy, we pick an existing port using the formula requestId % maxPorts. This causes the following problem: if the port we picked is busy with an operation which never completes, all subsequent requests on the same port get stuck. In the code above, the operation which never completes is the one trying to read from the pipe.
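That requestId % maxPorts assignment can be modeled with a toy calculation (the numbers here are illustrative, not from the SDK): if one slot is occupied by the never-completing pipe read, every 32nd request lands behind it, which lines up with the ~32-file spacing in lrhn's measurements above.

```dart
void main() {
  const maxPorts = 32; // cap on IOService ports, per the explanation above
  const blockedSlot = 7; // hypothetical slot running the stuck pipe read
  // Every request whose id maps to the blocked slot queues behind it,
  // even if other ports are idle.
  final stalled = <int>[
    for (var requestId = 0; requestId < 100; requestId++)
      if (requestId % maxPorts == blockedSlot) requestId
  ];
  print(stalled); // [7, 39, 71]
}
```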

So there are two issues to fix here:

  1. The way _IOService.dispatch / _IOServicePorts.getPort is written means that a long IO operation can delay (possibly indefinitely) other IO operations which just happened to be allocated to the same port. This can happen even if there are other ports ready to serve the request. There are two ways to fix it: quick'n'dirty and proper. Quick'n'dirty is to change getPort to wait for some port to be free instead of picking an existing busy port. This will introduce higher latency in the handling of IO operations, though (because it requires the response to arrive before a port can start processing the next operation). A much better but more involved fix is to stop doing this dance on the Dart side and instead fix Dart_NewNativePort: it should do the actual load balancing and thread-pool management inside, instead of requiring the sender to do that.
  2. When you do openRead on a File you get an instance of _FileStream. This should be special-cased for pipes to create a _Socket._readPipe instead (compare to _getStdioInputStream). I also noticed that the Pipe class, which was introduced to represent anonymous pipes, has a similar problem: it uses _FileStream instead of the more appropriate Socket constructors.

To fix this concrete manifestation of the problem, it is enough to address the second issue; the fix should be rather straightforward (probably just copy what _getStdioInputStream does). But ultimately we should look at fixing (1) as well: the current design is suboptimal.
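Until such a fix lands, a caller-side guard can at least fail fast instead of hanging. A sketch, assuming `FileSystemEntityType.pipe` is reported for FIFOs (a stat-based type check does not open the file, so it does not block):

```dart
import 'dart:io';

/// Illustrative guard: refuse File.openRead() on a FIFO, since the
/// blocking read would occupy an IO helper thread indefinitely.
Future<Stream<List<int>>> openReadNonPipe(String path) async {
  final type = await FileSystemEntity.type(path);
  if (type == FileSystemEntityType.pipe) {
    throw UnsupportedError(
        '$path is a named pipe; File.openRead would block a helper thread');
  }
  return File(path).openRead();
}
```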

brianquinlan commented 2 months ago

@mraleph - you obsoleted my writeup. Just to be explicit: POSIX open() on a FIFO blocks until the other end is opened.

I was planning on doing the (1) quick'n'dirty approach (though, in my mind, I wasn't labelling it that). I think this will only increase latency if all 32 threads are being used, which I'd expect to be unusual. Could you explain a bit more what your non-hacky approach would look like?

brianquinlan commented 2 months ago

My naive implementation: https://dart-review.googlesource.com/c/sdk/+/381806?tab=checks

causes standalone/io/many_file_operations_test to go from 0.268s to 1m6.496s. The test is:

import "dart:io";

main() {
  for (var i = 0; i < 10000; i++) {
    File f = new File("xxx");
    f.exists().then(print);
  }
}
brianquinlan commented 2 months ago

I modified my approach to reduce latency. Time is down to 1.547s for many_file_operations_test.

Unfortunately, many tests don't pass (yet).

mraleph commented 2 months ago

@brianquinlan I had restless fingers, so I spent some time investigating how my other proposal (i.e. fixing the Dart_NewNativePort implementation) could look. Here is the CL. Overall, it does not look too bad. There is a bunch of refactorings and/or small tweaks mixed into the CL which can be split out, so the overall CL is much larger than it needs to be. The only real wart I discovered is the semantics of the Dart_CloseNativePort API and the associated MessageHandler::RequestDeletion method. It makes MessageHandler deletion asynchronous, and translating that into asynchronous shutdown/deletion of a thread pool required some tweaks.

brianquinlan commented 2 months ago

Cool!

Building Dart in debug mode:

$ # HEAD
$ time xcodebuild/DebugARM64/dart tests/standalone/io/many_file_operations_test.dart >/dev/null
real    0m0.806s
...
$ # mraleph patch
$ time xcodebuild/DebugARM64/dart tests/standalone/io/many_file_operations_test.dart >/dev/null
real 0m1.318s
...
$ # bquinlan patch
$ time xcodebuild/DebugARM64/dart tests/standalone/io/many_file_operations_test.dart >/dev/null
real 0m1.547s
...

@mraleph Do you understand why your patch would be slower than the baseline?

mraleph commented 2 months ago

@brianquinlan What I see is that the baseline seems to have regressed at some point. If I measure using 810301045d700c9f40406c2c5a46d82ac487ffb9 as the baseline, I see ~1.5s both with and without my patch. Furthermore, it seems most of the time is spent somewhere else, not in the IO. Consider a slightly adjusted test:

import 'dart:io';

main() async {
  final sw = Stopwatch()..start();
  await Future.wait([
    for (var i = 0; i < 10000; i++) File("xxx").exists().then((result) => null)
  ]);
  print(sw.elapsedMilliseconds);
}

This prints:

$ xcodebuild/DebugARM64/dart --trace-shutdown tests/standalone/io/many_file_operations_test.dart
[+62ms] SHUTDOWN: Shutdown starting for group kernel-service
[+62ms] SHUTDOWN: Notifying isolate group shutdown (kernel-service)
[+62ms] SHUTDOWN: Done Notifying isolate group shutdown (kernel-service)
[+62ms] SHUTDOWN: Done shutdown for group kernel-service
197
[+1518ms] SHUTDOWN: Shutdown starting for group main
...

The actual IO completes in 197 ms and then crickets.

Looking at it now.

mraleph commented 2 months ago

It seems that the actual time is spent prior to main, in compilation. When I benchmarked, I simply built the dart target, which does not rebuild kernel-service.dart.snapshot. If dart fails to load kernel-service from the snapshot, it simply runs it from the embedded Kernel blob, which leads to the 1 second discrepancy. Redoing the benchmarks with properly rebuilt artifacts:

$ rm xcodebuild/DebugARM64/{dart,kernel-service.dart.snapshot} && tools/build.py -nvh -a arm64 -m debug dart kernel-service.dart.snapshot

Does not reveal any large regression from my patch.

brianquinlan commented 2 months ago

Awesome! Let me know if I can do anything to help you land this.

mraleph commented 2 months ago

Awesome! Let me know if I can do anything to help you land this.

I am a bit busy this week (and the start of the next week), but I will split it up and upload for review next time I have a chance.

mraleph commented 2 months ago

The first half of the problem is now fixed by 5a32d8b. Opening a pipe like that no longer causes unrelated operations to hang.

The program overall still hangs however - because there is a pending IO operation which never completes (there is no builtin way to cancel such operations).

We need to fix our IO implementation to actually use async IO for pipes, so I will keep this issue open.

mraleph commented 2 months ago

(Also, if you open enough pipes you will still saturate the available thread pool for IO tasks, so the problem is only kinda fixed.)

brianquinlan commented 1 month ago

Can we call this a P3 since this issue is mostly mitigated?

mraleph commented 1 month ago

Well, it is somewhat mitigated... If the code opens >=32 pipes for reading, things will get stuck again. So it would be nice to make a patch to fix the pipe implementation, which should be fairly trivial (famous last words).

a-siva commented 1 week ago

Downgrading to P3 based on comment above.