aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2

`S3TransferManager` / `S3AsyncClient` does not seem to use `SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR`'s `Executor`. #5341

Closed neverendingqs closed 1 day ago

neverendingqs commented 3 months ago

Describe the bug

I think CompletableFutures always run on, or revert to, the default shared ForkJoinPool when making S3TransferManager calls, even if an Executor is provided via FUTURE_COMPLETION_EXECUTOR.

Expected Behavior

When an Executor is provided via FUTURE_COMPLETION_EXECUTOR, CompletableFutures from S3TransferManager calls run on that Executor.

Current Behavior

CompletableFutures use, or revert to, the default shared ForkJoinPool when making S3TransferManager calls, even if an Executor is provided via .futureCompletionExecutor().

Reproduction Steps

Disclaimer: this is the first time I've worked with Executors and CompletableFutures.

On SDK 2.20.160 (that version because the code currently runs on EMR 6.15.0), when attempting something like

// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;

ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveTime,
    keepAliveTimeUnit,
    new LinkedBlockingQueue<>(1_000),
    new ThreadFactoryBuilder()
        .threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()   // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

S3AsyncClient s3AsyncClient = S3AsyncClient.builder()
    .asyncConfiguration(
        ClientAsyncConfiguration.builder()
            .advancedOption(
                SdkAdvancedAsyncClientOption.FUTURE_COMPLETION_EXECUTOR,
                futureCompletionExecutor
            )
            .build()
    )
    // "UnsupportedOperationException: Multipart download is not yet supported. Instead use the CRT based S3 client for multipart download."
    // https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java#L112-L118
    // https://github.com/aws/aws-sdk-java-v2/blob/7fd8ea197ede83a663d6cd045ec83bd6047c8633/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/TransferManagerFactory.java#L76
    // .multipartEnabled(true)
    .build();

S3TransferManager s3TransferManager = S3TransferManager.builder()
    .s3Client(s3AsyncClient)
    .build();

// ...

DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
    .getObjectRequest(req -> req.bucket(bucket).key(key))
    .destination(filePath.toFile())
    .build();

FileDownload downloadStatus = s3TransferManager.downloadFile(downloadFileRequest);
downloadStatus.completionFuture().join();   // throws here

in production code that makes many S3TransferManager calls, I get

java.util.concurrent.RejectedExecutionException: Thread limit exceeded replacing blocked worker
    at java.util.concurrent.ForkJoinPool.tryCompensate(ForkJoinPool.java:2011)
    at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3310)
    at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)
    at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)
    ...

even though the executor is a ThreadPoolExecutor and RejectedExecutionHandler is set to ThreadPoolExecutor.CallerRunsPolicy.
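One reading of the trace above: `ForkJoinPool.managedBlock` appears under `CompletableFuture.join`, and to my understanding of the JDK the compensation path (`tryCompensate`) is only taken when the thread calling `join()` is itself a ForkJoinPool worker. That would concern the thread doing the waiting, which is independent of the executor configured on the client. The sketch below uses only JDK classes to check whether code is running on a ForkJoinPool worker; the class and method names are made up for illustration and nothing here calls the SDK.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper, not part of the AWS SDK.
class ForkJoinWorkerCheck {

    // Runs the check as a task inside a dedicated ForkJoinPool.
    static boolean checkInsideForkJoinPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            // Typed as Callable to avoid ambiguity between the submit(...) overloads.
            Callable<Boolean> check = ForkJoinTask::inForkJoinPool;
            return pool.submit(check).join();
        } finally {
            pool.shutdown();
        }
    }

    // Runs the same check on an ordinary thread that belongs to no ForkJoinPool.
    static boolean checkOnPlainThread() {
        AtomicBoolean result = new AtomicBoolean();
        Thread plain = new Thread(
            () -> result.set(ForkJoinTask.inForkJoinPool()), "plain-thread");
        plain.start();
        try {
            plain.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return result.get();
    }

    public static void main(String[] args) {
        System.out.println("inside pool:  " + checkInsideForkJoinPool());
        System.out.println("plain thread: " + checkOnPlainThread());
    }
}
```

Logging `ForkJoinTask.inForkJoinPool()` right before the blocking `join()` in production code would show whether the caller is a pool worker.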

Local testing with some scratch code shows that the CompletableFuture<CompletedFileDownload> object has stack.executor set to null, and that calling CompletableFutureUtils.forwardResultTo(sdkFuture, customFuture, futureCompletionExecutor); changes stack.executor to the Executor I provided. This seems to suggest the SDK was using the default ForkJoinPool Executor until I changed it via CompletableFutureUtils.forwardResultTo().
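A caveat on interpreting that observation: `stack.executor` is a JDK-internal field, and as far as I can tell a `null` there means the dependent was registered with a non-async method such as `thenApply` or `whenComplete`. Such callbacks run on whichever thread completes the future, which is not necessarily the common ForkJoinPool. The JDK-only sketch below (hypothetical class and thread names, no SDK calls) contrasts a callback registered without an executor with one registered with an explicit executor.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the AWS SDK.
class CallbackThreadDemo {

    // Non-async dependent: registered before completion, so it runs on
    // the thread that later calls complete().
    static String threadOfPlainCallback() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> dependent =
            source.thenApply(ignored -> Thread.currentThread().getName());
        Thread completer = new Thread(() -> source.complete("done"), "completing-thread");
        completer.start();
        try {
            completer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return dependent.join();
    }

    // Async dependent with an explicit executor: the callback is handed
    // to that executor regardless of which thread completes the source.
    static String threadOfExecutorCallback() {
        ExecutorService executor = Executors.newSingleThreadExecutor(
            runnable -> new Thread(runnable, "custom-callback-thread"));
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> dependent =
                source.thenApplyAsync(ignored -> Thread.currentThread().getName(), executor);
            source.complete("done");
            return dependent.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadOfPlainCallback());    // completing-thread
        System.out.println(threadOfExecutorCallback()); // custom-callback-thread
    }
}
```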

Scratch code (SDK: 2.26.9, software.amazon.awssdk.crt.aws-crt: 0.29.20):

// https://github.com/aws/aws-sdk-java-v2/blob/501e37cea262a4c1e676f0fcceb1ff09e9a07282/core/sdk-core/src/main/java/software/amazon/awssdk/core/client/builder/SdkDefaultClientBuilder.java#L487-L499
int processors = Runtime.getRuntime().availableProcessors();
int corePoolSize = Math.max(8, processors);
int maxPoolSize = Math.max(64, processors * 2);
int keepAliveTime = 10;
TimeUnit keepAliveTimeUnit = TimeUnit.SECONDS;

ThreadPoolExecutor futureCompletionExecutor = new ThreadPoolExecutor(
    corePoolSize,
    maxPoolSize,
    keepAliveTime,
    keepAliveTimeUnit,
    new LinkedBlockingQueue<>(1_000),
    new ThreadFactoryBuilder()
        .threadNamePrefix("s3-transfer-manager-s3-async-client").build(),
    new ThreadPoolExecutor.CallerRunsPolicy()   // vs default of `ThreadPoolExecutor.AbortPolicy`
);
futureCompletionExecutor.allowCoreThreadTimeOut(true);

try (
    S3AsyncClient s3AsyncClient = S3AsyncClient.crtBuilder()
        .credentialsProvider(
            ProfileCredentialsProvider.builder()
                .profileName("<my-profile>")
                .build())
        .futureCompletionExecutor(futureCompletionExecutor)
        .region(Region.US_EAST_1)
        .build();

    S3TransferManager tm = S3TransferManager.builder()
        .s3Client(s3AsyncClient)
        .build();
) {

    DownloadFileRequest downloadFileRequest = DownloadFileRequest.builder()
        .getObjectRequest(b -> b.bucket("<my-bucket>").key("<my-key>"))
        .destination(Paths.get("C:\\Users\\<user>\\Downloads\\<filename>"))
        .build();

    FileDownload downloadFile = tm.downloadFile(downloadFileRequest);

    CompletableFuture<CompletedFileDownload> completedFileDownloadCompletableFuture = downloadFile.completionFuture();

    // Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `null`
    CompletableFuture<CompletedFileDownload> forwardWithExecutorFuture = new CompletableFuture<>();
    CompletableFutureUtils.forwardResultTo(completedFileDownloadCompletableFuture, forwardWithExecutorFuture, futureCompletionExecutor);

    // Breakpoint: `completedFileDownloadCompletableFuture`'s `stack.executor` is `ThreadPoolExecutor`
    CompletableFuture<CompletedFileDownload> forwardWithoutExecutorFuture = new CompletableFuture<>();
    CompletableFutureUtils.forwardResultTo(forwardWithExecutorFuture, forwardWithoutExecutorFuture);

    // Breakpoint: `forwardWithExecutorFuture`'s `stack.executor` is `null`
    CompletedFileDownload downloadResult1 = completedFileDownloadCompletableFuture.join();
}

Possible Solution

No response

Additional Information/Context

JDK and SDK versions are provided in-line under Reproduction Steps.

Versions for the fields below will be based on production, which is using EMR 6.15.0.

AWS Java SDK version used

2.20.160

JDK version used

Amazon Corretto 8

Operating System and version

Amazon Linux 2.0.20240610.1

Moloch-az commented 3 months ago

I've seen this behaviour too. If you have many parallel transfers initiated (more than your CPU count) and need concurrency, you may need to raise java.util.concurrent.ForkJoinPool.common.parallelism.
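For reference, that system property is read when the common pool is initialized, so it is normally passed at JVM startup, for example `-Djava.util.concurrent.ForkJoinPool.common.parallelism=64` (the value 64 is only an illustration). A small sketch for checking what a running JVM actually ended up with, using a made-up class name:

```java
import java.util.concurrent.ForkJoinPool;

// Hypothetical diagnostic helper, not part of the AWS SDK.
class CommonPoolInfo {

    static final String PARALLELISM_PROPERTY =
        "java.util.concurrent.ForkJoinPool.common.parallelism";

    // Summarizes the CPU count, the effective common-pool parallelism,
    // and the raw property value (if one was supplied).
    static String describe() {
        String configured = System.getProperty(PARALLELISM_PROPERTY);
        return "availableProcessors=" + Runtime.getRuntime().availableProcessors()
            + ", commonPoolParallelism=" + ForkJoinPool.getCommonPoolParallelism()
            + ", " + PARALLELISM_PROPERTY + "="
            + (configured == null ? "<unset>" : configured);
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```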

zoewangg commented 3 weeks ago

Hi @neverendingqs, could you provide the following information:

github-actions[bot] commented 2 weeks ago

It looks like this issue has not been active for more than five days. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please add a comment to prevent automatic closure, or if the issue is already closed please feel free to reopen it.

zoewangg commented 2 weeks ago

Hi @neverendingqs, I've reviewed the code. It seems you are using parallel streams to invoke concurrent requests. By default, parallel streams use the common ForkJoinPool. If that behavior is not desired, I'd suggest using a custom executor to submit those requests instead.

StreamSupport.stream(var.spliterator(), true)
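A minimal sketch of the suggestion above, assuming the per-item work can be expressed as a function: each item is submitted to a dedicated fixed-size pool with `CompletableFuture.supplyAsync(task, executor)` instead of a parallel stream, and the results are joined from the calling thread. The class name, the `transfer-submitter-` thread prefix, and the `runAll` helper are hypothetical; in real code the task would wrap the blocking transfer call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Hypothetical helper, not part of the AWS SDK.
class CustomExecutorFanOut {

    // Applies `task` to every input on a dedicated pool and returns the
    // results in input order. No ForkJoinPool thread runs the tasks.
    static <T, R> List<R> runAll(List<T> inputs, Function<T, R> task, int threads) {
        AtomicInteger counter = new AtomicInteger();
        ExecutorService executor = Executors.newFixedThreadPool(threads, runnable -> {
            Thread t = new Thread(runnable, "transfer-submitter-" + counter.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        try {
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (T input : inputs) {
                // In real code, task.apply(input) might start one transfer
                // and wait for its completion future.
                futures.add(CompletableFuture.supplyAsync(() -> task.apply(input), executor));
            }
            List<R> results = new ArrayList<>();
            for (CompletableFuture<R> future : futures) {
                results.add(future.join());
            }
            return results;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        keys.add("a");
        keys.add("b");
        keys.add("c");
        // Each result records which thread handled the key.
        System.out.println(runAll(keys, k -> k + "@" + Thread.currentThread().getName(), 2));
    }
}
```

The pool size then bounds the number of concurrent blocking waits directly, rather than depending on the common pool's parallelism.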

github-actions[bot] commented 5 days ago

It looks like this issue has not been active for more than five days. In the absence of more information, we will be closing this issue soon. If you find that this is still a problem, please add a comment to prevent automatic closure, or if the issue is already closed please feel free to reopen it.