awspring / spring-cloud-aws

The New Home for Spring Cloud AWS
http://awspring.io
Apache License 2.0
834 stars 286 forks source link

Add possibility to use Virtual Threads in MessageListenerContainerFactory #924

Open lazystone opened 9 months ago

lazystone commented 9 months ago

I tried to configure SqsMessageListenerContainerFactory to use VirtualThreadTaskExecutor:

    @Bean
    SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFactory(SqsAsyncClient sqsAsyncClient) {
        return SqsMessageListenerContainerFactory.builder()
                .configure(options -> options.componentsTaskExecutor(
                                new VirtualThreadTaskExecutor("sqs-message-listener-container-components-"))
                        .acknowledgementResultTaskExecutor(new VirtualThreadTaskExecutor(
                                "sqs-message-listener-container-acknowledgement-result-")))
                .sqsAsyncClient(sqsAsyncClient)
                .build();
    }

But this does not work:

Caused by: java.util.concurrent.CompletionException: io.awspring.cloud.sqs.UnsupportedThreadFactoryException: Custom TaskExecutors must use a MessageExecutionThreadFactory.
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
    at java.base/java.lang.VirtualThread.run(VirtualThread.java:311) ~[?:?]
Caused by: io.awspring.cloud.sqs.UnsupportedThreadFactoryException: Custom TaskExecutors must use a MessageExecutionThreadFactory.
    at io.awspring.cloud.sqs.listener.AbstractPipelineMessageListenerContainer.verifyThreadType(AbstractPipelineMessageListenerContainer.java:146) ~[spring-cloud-aws-sqs-3.0.2.jar:3.0.2]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1804) ~[?:?]

Would it be possible to relax this somehow, that VirtualThreadTaskExecutor can be used?

maciejwalkowiak commented 9 months ago

Sounds like something we should do.

tomazfernandes commented 9 months ago

Well, this restriction is there for a reason :)

I'll need to give some thought on how to handle this properly.

tomazfernandes commented 5 months ago

Hi everyone. Just to share some of the issues here.

The Thread Hopping Phenomenon

The threading model this solution uses is - when we receive a batch of messages, we'll send them through the MessageSink component and use the provided or default TaskExecutor to create threads that will process the messages.

The problem is - if at any point in message execution such as interceptor, the message listener itself, or error handling returns a CompletableFuture and the thread has been changed, we'll be using the returned thread instead of the original one.

To give a simple example - one might have non-blocking Interceptor that will use an async client from AWS SDK to e.g. fetch something from S3 and add to the message, and returns a completable future from the interceptor.

The execution will continue on the AsyncS3Client thread, and if the listener is a blocking listener, it will quickly starve the S3Client's threads leading to severely decreased throughput and irresponsive client.

So the way I got around this was creating our own ThreadFactory - if the component execution returns on a different thread, we hop back into one of ours. This guarantees the user won't run into this problem which would be likely really difficult for the user to trace back.

Enter Virtual Threads

I'm confident we can do something similar with virtual threads - we should be able to simply check when we return that we are in a Virtual Thread, and if we're not, we can hop back into one.

I haven't had a chance to look into how other Spring projects managed to include Virtual Thread support without changing the Java baseline from Java 17, so we can simply check the thread type. We could of course do a workaround checking the class name or something similar, but hopefully they have something better than this.

Conclusion

So there you have it - if anyone knows how other Spring projects support works for Virtual Threads, I'd be happy to collaborate to add this to the project.

Thanks.

jhinch-at-atlassian-com commented 1 month ago

spring-framework 6.1+ handles this by using a multirelease JAR

tomazfernandes commented 1 month ago

Thanks for the info @jhinch-at-atlassian-com!

Seems like we'd need this multi-target approach, I wonder if this is possible with Maven as well?

If we can use a similar approach, we might be able to lift the MessageExecutionThreadFactory restriction and check if the thread is Virtual instead, to hop back on it in case the thread returned from SqsAsyncClient is not virtual.