eclipse-vertx / vert.x

Vert.x is a tool-kit for building reactive applications on the JVM
http://vertx.io

FileSystem#readDir is not cancelled when circuit breaker timeout kicks in and blocks further readDir calls #5250

Open Mulgish opened 4 months ago

Mulgish commented 4 months ago

Version

Vert.x 4.5.7

Context

The Vert.x circuit breaker does not cancel FileSystem#readDir when the call exceeds the breaker's timeout. Furthermore, the timed-out FileSystem#readDir keeps running in the background and blocks any further FileSystem#readDir operations until it completes:

2024-07-10 11:35:37 WARN traceId=, parentId=, spanId=, sampled= [io.ve.co.im.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-internal-blocking-1,5,main] has been blocked for 127516 ms, time limit is 60000 ms: io.vertx.core.VertxException: Thread blocked
at java.base/java.io.UnixFileSystem.canonicalize0(Native Method)
at java.base/java.io.UnixFileSystem.canonicalize(UnixFileSystem.java:166)
at java.base/java.io.File.getCanonicalPath(File.java:626)
at io.vertx.core.file.impl.FileSystemImpl$15.perform(FileSystemImpl.java:1050)
at io.vertx.core.file.impl.FileSystemImpl$15.perform(FileSystemImpl.java:1022)
at io.vertx.core.file.impl.FileSystemImpl$BlockingAction.handle(FileSystemImpl.java:1174)
at io.vertx.core.file.impl.FileSystemImpl$BlockingAction.handle(FileSystemImpl.java:1156)
at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$1(ContextImpl.java:191)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
at io.vertx.core.impl.ContextImpl.lambda$internalExecuteBlocking$2(ContextImpl.java:210)
at io.vertx.core.impl.TaskQueue.run(TaskQueue.java:76)
at org.jboss.threads.ContextHandler$1.runWith(ContextHandler.java:18)
at org.jboss.threads.EnhancedQueueExecutor$Task.doRunWith(EnhancedQueueExecutor.java:2516)
at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2495)
at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1521)
at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:11)
at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:11)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:1583)
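
The behaviour described above can be illustrated with plain JDK classes. The following is a minimal sketch, not Vert.x's implementation: it assumes that file system operations are queued one after another on an ordered worker, as the TaskQueue frame in the stack trace suggests. The class and method names (OrderedQueueTimeoutDemo, fastTaskLatencyMillis) are hypothetical. It shows that giving up on a result after a timeout does not stop the task itself, so the next task on the same queue has to wait for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class OrderedQueueTimeoutDemo {

    // Submits a slow task and then a fast task to a single ordered worker.
    // Returns how long (in ms) the fast task took to complete after the
    // caller had already timed out waiting for the slow one.
    static long fastTaskLatencyMillis(long slowTaskMillis, long timeoutMillis) throws Exception {
        // Assumption: one thread stands in for an ordered blocking task queue.
        ExecutorService ordered = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture<String> slow = CompletableFuture.supplyAsync(() -> {
                sleep(slowTaskMillis);
                return "slow done";
            }, ordered);

            try {
                slow.get(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException expected) {
                // The caller stops waiting, but the worker thread is still busy.
            }

            long start = System.nanoTime();
            CompletableFuture.supplyAsync(() -> "fast done", ordered).get();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            ordered.shutdown();
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws Exception {
        long waited = fastTaskLatencyMillis(1000, 100);
        System.out.println("Fast task completed after " + waited + " ms");
    }
}
```

In this sketch the fast task is trivial, yet its latency is dominated by the remaining run time of the slow task, which mirrors the second readDir call being blocked in the reproducer.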

I am also not sure if there is a more efficient way to list files in a directory. I did not find any usage of DirectoryStream in the Vert.x API, so perhaps this should be a separate feature request?
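
For reference, this is roughly what a DirectoryStream-based listing looks like with the JDK alone. It is only a sketch of the idea, not an existing Vert.x API: the class and method names (DirectoryStreamListing, listAtMost) and the entry limit are hypothetical. Entries are read lazily, so iteration can stop early, and no per-entry canonical path is resolved. The calls are still blocking, so inside Vert.x they would have to run off the event loop.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

class DirectoryStreamListing {

    // Returns the names of at most maxEntries entries of dir.
    // Iteration stops as soon as the limit is reached.
    static List<String> listAtMost(Path dir, int maxEntries) throws IOException {
        List<String> names = new ArrayList<>();
        if (maxEntries <= 0) {
            return names;
        }
        // try-with-resources closes the underlying directory handle.
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path entry : stream) {
                names.add(entry.getFileName().toString());
                if (names.size() >= maxEntries) {
                    break;
                }
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical demo directory with a handful of files.
        Path dir = Files.createTempDirectory("dir-stream-demo");
        for (int i = 0; i < 20; i++) {
            Files.createFile(dir.resolve("file-" + i));
        }
        System.out.println("Listed " + listAtMost(dir, 5).size() + " entries");
    }
}
```

A limit like this would also give callers a way to bound the work per call, which is the kind of filtering the reproducer's fallback message hints at.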

Do you have a reproducer?

The reproducer below runs in a standard Quarkus archetype with the Vert.x bindings for Mutiny added:

        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-mutiny-vertx-core</artifactId>
        </dependency>
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-mutiny-vertx-circuit-breaker</artifactId>
        </dependency>

package com.bla;

import io.quarkus.test.junit.QuarkusTest;
import io.smallrye.mutiny.Uni;
import io.vertx.circuitbreaker.CircuitBreakerOptions;
import io.vertx.circuitbreaker.TimeoutException;
import io.vertx.mutiny.circuitbreaker.CircuitBreaker;
import io.vertx.mutiny.core.Vertx;
import io.vertx.mutiny.core.file.FileSystem;
import jakarta.inject.Inject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;

import java.util.UUID;
import java.util.stream.Stream;

@QuarkusTest
public class FsTestIT {

    //Pointing below dir to an NFS share or other slow disk should help reproduce the issue
    private static final String TMP_VERTX_TEST = "/tmp/vertx-test/";
    private static final String TMP_VERTX_TEST_VERY_LARGE_DIR = "/tmp/vertx-test/veryLargeDir/";
    private static final String TMP_VERTX_TEST_SMALL_DIR = "/tmp/vertx-test/smallDir/";

    @Inject
    Vertx vertx;

    @Test
    @DisplayName("Reproducer for directory listing issue")
    public void fsReadDirTest() {
        var circuitBreaker = CircuitBreaker.create("fs-operation-breaker", vertx,
            new CircuitBreakerOptions()
                //Adjust timeouts as needed by your machine
                .setTimeout(200)
                .setFailuresRollingWindow(5000));

        FileSystem fs = vertx.fileSystem();

        System.out.println("Creating directories...");
        fs.deleteRecursive(TMP_VERTX_TEST, true)
            .onFailure().recoverWithNull()
            .await().indefinitely();
        fs.mkdirAndAwait(TMP_VERTX_TEST);
        fs.mkdirAndAwait(TMP_VERTX_TEST_VERY_LARGE_DIR);
        fs.mkdirAndAwait(TMP_VERTX_TEST_SMALL_DIR);

        System.out.println("Creating test files...");
        //Create 100k files
        var fileCreationUnis = Stream.generate(() -> UUID.randomUUID().toString())
            //Adjust file count as needed to hit timeout issue
            .limit(100000)
            .map(fileName -> fs.createFile(TMP_VERTX_TEST_VERY_LARGE_DIR + fileName))
            .toList();

        Uni.combine().all().unis(fileCreationUnis)
            .usingConcurrencyOf(10)
            .discardItems().await().indefinitely();

        System.out.println("Starting test...");

        var responseUni = circuitBreaker.execute(
                //Try to read a large dir with a circuit breaker
                fs.readDir(TMP_VERTX_TEST_VERY_LARGE_DIR)
                    .onItem().transform(it -> "Read " + it.size() + " files")
            )
            .onFailure(TimeoutException.class).recoverWithItem("Directory is too big. Please try adding some file filtering.");

        //Circuit breaker should time out and return a message, which is true,
        //but the fs operation still continues on vert.x-internal-blocking thread
        var response1 = responseUni.await().indefinitely();
        Assertions.assertTrue(response1.contains("Directory is too big."));

        var responseUni2 = circuitBreaker.execute(
                fs.readDir(TMP_VERTX_TEST_SMALL_DIR)
                    .onItem().transform(it -> "Read " + it.size() + " files")
            )
            .onFailure(TimeoutException.class).recoverWithItem("Directory is too big. Please try adding some filtering.");

        //Circuit breaker should not be triggered and the operation should complete successfully,
        //but the previous fs operation still continues on vert.x-internal-blocking thread and blocks this call
        var response2 = responseUni2.await().indefinitely();

        //!!!! Below assertion will fail because the previous fs operation is still running !!!!
        Assertions.assertTrue(response2.contains("Read"));
    }
}

Steps to reproduce

  1. Run FileSystem.readDir against a large directory on a slow filesystem
  2. Apply an aggressive timeout policy using the Vert.x circuit breaker
  3. Run FileSystem.readDir on a smaller directory and observe the call being blocked
  4. Observe that the original FileSystem.readDir is still running in the background

Extra

Please note that the reproducer runs on Quarkus and uses the Mutiny bindings for Vert.x operations.

openjdk version "21.0.3" 2024-04-16
OpenJDK Runtime Environment (Red_Hat-21.0.3.0.9-1) (build 21.0.3+9)
OpenJDK 64-Bit Server VM (Red_Hat-21.0.3.0.9-1) (build 21.0.3+9, mixed mode, sharing)

Linux 5.15.146.1-microsoft-standard-WSL2 #1 SMP Thu Jan 11 04:09:03 UTC 2024 x86_64 GNU/Linux

tsegismont commented 4 months ago

Thanks for your report.