opensource4you / astraea

Unleash the infinite potential of Kafka
Apache License 2.0

[FS] LocalFileSystem.listFolders does not release opened directories #1801

Closed chaohengstudent closed 1 year ago

chaohengstudent commented 1 year ago

When the Importer uses the local file system, Files.list() causes a Too many open files error:

[2023-06-01 08:29:06,148] ERROR [test-importer|task-0] WorkerSourceTask{id=test-importer-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:210)
2023-06-01T08:29:06.149570857Z java.io.UncheckedIOException: java.nio.file.FileSystemException: /tmp/Exporter/TestTopic/0: Too many open files
2023-06-01T08:29:06.149574109Z  at org.astraea.common.Utils.packException(Utils.java:127)
2023-06-01T08:29:06.149576425Z  at org.astraea.fs.local.LocalFileSystem.listFolders(LocalFileSystem.java:76)
2023-06-01T08:29:06.149578450Z  at org.astraea.fs.local.LocalFileSystem.listFiles(LocalFileSystem.java:65)
2023-06-01T08:29:06.149590722Z  at org.astraea.connector.backup.Importer$Task.getFileSet(Importer.java:215)
2023-06-01T08:29:06.149592902Z  at org.astraea.connector.backup.Importer$Task.take(Importer.java:160)
2023-06-01T08:29:06.149594776Z  at org.astraea.connector.SourceTask.poll(SourceTask.java:61)
2023-06-01T08:29:06.149596670Z  at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:457)
2023-06-01T08:29:06.149598555Z  at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:351)
2023-06-01T08:29:06.149600388Z  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
2023-06-01T08:29:06.149602132Z  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
2023-06-01T08:29:06.149603906Z  at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
2023-06-01T08:29:06.149605849Z  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
2023-06-01T08:29:06.149607702Z  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
2023-06-01T08:29:06.149609468Z  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
2023-06-01T08:29:06.149611271Z  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
2023-06-01T08:29:06.149613049Z  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
2023-06-01T08:29:06.149615159Z  at java.base/java.lang.Thread.run(Thread.java:833)
2023-06-01T08:29:06.149616923Z Caused by: java.nio.file.FileSystemException: /tmp/Exporter/TestTopic/0: Too many open files
2023-06-01T08:29:06.149618709Z  at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:100)
2023-06-01T08:29:06.149620502Z  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:106)
2023-06-01T08:29:06.149622987Z  at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
2023-06-01T08:29:06.149624926Z  at java.base/sun.nio.fs.UnixFileSystemProvider.newDirectoryStream(UnixFileSystemProvider.java:440)
2023-06-01T08:29:06.149626749Z  at java.base/java.nio.file.Files.newDirectoryStream(Files.java:482)
2023-06-01T08:29:06.149628527Z  at java.base/java.nio.file.Files.list(Files.java:3792)
2023-06-01T08:29:06.149630265Z  at org.astraea.fs.local.LocalFileSystem.lambda$listFolders$4(LocalFileSystem.java:76)
2023-06-01T08:29:06.149632033Z  at org.astraea.common.Utils.packException(Utils.java:125)
2023-06-01T08:29:06.149633792Z  ... 16 more

According to the doc:

API Note: This method must be used within a try-with-resources statement or similar control structure to ensure that the stream's open directory is closed promptly after the stream's operations have completed

So my idea is to wrap this code in a try-with-resources block. I'd like to ask whether that is reasonable: https://github.com/skiptests/astraea/blob/69c29a37021bd27ddb1fdcaf1d5c1a113d2aacea/fs/src/main/java/org/astraea/fs/local/LocalFileSystem.java#L76-L86
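
For context, a minimal standalone sketch of the two shapes (hypothetical class and method names, not the code behind the link above): the first one leaks the directory handle on every call, the second follows the javadoc quoted above.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FilesListSketch {

  // Leaky form: Files.list opens a DirectoryStream under the hood, and it is
  // only released when the returned Stream is closed, which never happens here.
  static List<String> leaky(Path dir) throws IOException {
    return Files.list(dir).map(p -> p.getFileName().toString()).collect(Collectors.toList());
  }

  // Javadoc-recommended form: try-with-resources closes the Stream, and with it
  // the underlying directory handle, as soon as the result is materialized.
  static List<String> closedPromptly(Path dir) throws IOException {
    try (Stream<Path> paths = Files.list(dir)) {
      return paths.map(p -> p.getFileName().toString()).collect(Collectors.toList());
    }
  }
}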

chia7712 commented 1 year ago

Is this a problem only encountered with the local fs?

chaohengstudent commented 1 year ago

Is this a problem only encountered with the local fs?

Yes. For other file systems the number of connections only grows with maxTasks, and those connections are closed when the connector shuts down. Only the local file system ends up exhausting all available handles, even with a single task.

chia7712 commented 1 year ago

Perhaps we should consider changing the list-related operations to return a Stream, so we avoid actually opening a large number of files/folders at once.

What do you think?
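
A rough sketch of what such a Stream-returning variant could look like (a hypothetical signature, not an existing Astraea API): listing stays lazy, and the directory handle lives only as long as the returned Stream, so nothing is opened eagerly in bulk.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

class LazyListingSketch {

  // Hypothetical Stream-returning variant: the directory is opened here but
  // stays open until the caller closes the returned Stream, so the caller
  // controls the resource's lifetime instead of the file system implementation.
  static Stream<String> listFolders(Path folder) throws IOException {
    return Files.list(folder)
        .filter(Files::isDirectory)
        .map(p -> p.toAbsolutePath().toString());
  }
}

The tradeoff is that every call site then owns the close.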

chaohengstudent commented 1 year ago

Makes sense. I'll open a PR to adjust it.

chaohengstudent commented 1 year ago

avoid actually opening a large number of files/folders at once

After making some changes, I think the error is not caused by opening a large number of files at once. Each call to listFiles()/listFolders() only opens the directory it is listing, and that directory should be closed right after the List of subdirectory strings is returned.

The real issue is that with Files.list() the "close right after returning the subdirectory strings" part never happens, so over many polls the task accumulates a large number of opened directories that are never closed.

My idea is to use try-with-resources so that the directory is closed when the stream finishes, after the List of strings has been returned.

private synchronized List<String> listFolders(String path, boolean requireFile) {
    var folder = resolvePath(path);
    if (!Files.isDirectory(folder)) throw new IllegalArgumentException(path + " is not a folder");
    // try-with-resources ensures the directory opened by Files.list is closed
    // once the result has been collected
    try (var paths = Utils.packException(() -> Files.list(folder))) {
      return paths
          .filter(f -> requireFile ? Files.isRegularFile(f) : Files.isDirectory(f))
          .map(Path::toAbsolutePath)
          .map(
              p ->
                  Path.of("/")
                      .resolve(
                          root.map(r -> p.subpath(Path.of(r).getNameCount(), p.getNameCount()))
                              .orElse(p))
                      .toString())
          .collect(Collectors.toList());
    }
  }

If we returned a Stream directly, each of these resources would have to be closed separately by every caller, so I kept returning a List. I'm not sure whether there is another direction worth changing toward.
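
To make that tradeoff concrete, this is roughly what every call site would need if listFolders returned a Stream<String> (hypothetical interface and names, not the Importer's actual code):

import java.util.stream.Stream;

// A hypothetical file-system interface whose listFolders returns a lazy Stream.
interface LazyFileSystem {
  Stream<String> listFolders(String path);
}

class CallSiteSketch {
  // Every caller must now adopt try-with-resources itself, otherwise the
  // directory handle opened by listFolders leaks exactly as in the original bug.
  static void printFolders(LazyFileSystem fs, String path) {
    try (Stream<String> folders = fs.listFolders(path)) {
      folders.forEach(System.out::println);
    } // the directory is released here, but only if the caller remembers this block
  }
}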

chia7712 commented 1 year ago

My idea is to use try-with-resources so that the directory is closed when the stream finishes, after the List of strings has been returned.

You can give this approach a try.