spring-cloud / spring-cloud-stream

Framework for building Event-Driven Microservices
http://cloud.spring.io/spring-cloud-stream
Apache License 2.0
1.01k stars 614 forks source link

Align sending kafka events from virtual threads #2923

Closed wyhasany closed 8 months ago

wyhasany commented 8 months ago

Sending kafka events from virtual thread causes pinning:

Thread[#83,ForkJoinPool-1-worker-6,5,CarrierThreads]
    java.base/java.lang.VirtualThread$VThreadContinuation.onPinned(VirtualThread.java:183)
    java.base/jdk.internal.vm.Continuation.onPinned0(Continuation.java:393)
    java.base/java.lang.VirtualThread.parkNanos(VirtualThread.java:621)
    java.base/java.lang.System$2.parkVirtualThread(System.java:2652)
    java.base/jdk.internal.misc.VirtualThreads.park(VirtualThreads.java:67)
    java.base/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:267)
    java.base/java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1866)
    java.base/java.util.concurrent.ForkJoinPool.unmanagedBlock(ForkJoinPool.java:3780)
    java.base/java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3725)
    java.base/java.util.concurrent.CompletableFuture.timedGet(CompletableFuture.java:1939)
    java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2095)
    org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:180)
    org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:413)
    org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:387)
    org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:364)
    org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:197)
    org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:96)
    org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:297)
    org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:102)
    org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:153)
    org.springframework.cloud.stream.binding.BindingService.doBindProducer(BindingService.java:353)
    org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:294)
    org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:272) <== monitors:1
    org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:168)
    org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:147)
    org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:142)

I used ReentrantLock to avoid pinning in this situation.

sobychacko commented 8 months ago

@wyhasany Thanks for the PR. There is a checkstyle error in the PR build. See details here. Could you address that?

wyhasany commented 8 months ago

@sobychacko it should be fixed now

sobychacko commented 8 months ago

One other thing I missed before: could you add your name as an author to the class?

wyhasany commented 8 months ago

Author added