smallrye / smallrye-mutiny

An Intuitive Event-Driven Reactive Programming Library for Java
https://smallrye.io/smallrye-mutiny
Apache License 2.0
807 stars 124 forks source link

MultiGroupBy Failure: Thread Blocked #1637

Closed sahak1an closed 5 hours ago

sahak1an commented 3 months ago
...
.group().by(key)
flatMap(group -> switch (group.key()) {
...
})

failed with io.vertx.core.VertxException: Thread blocked

Stack tracke
    
 WARN  [io.ver.cor.imp.BlockedThreadChecker] (vertx-blocked-thread-checker) Thread Thread[vert.x-eventloop-thread-0,5,main] has been blocked for 2954 ms, time limit is 2000 ms: io.vertx.core.VertxException: Thread blocked
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.freezeAtSafepoint(Safepoint.java:299)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck0(Safepoint.java:225)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck(Safepoint.java:187)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.slowPathSafepointCheck(Safepoint.java:425)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.Safepoint.enterSlowPathSafepointCheck(Safepoint.java:412)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.genscavenge.ThreadLocalAllocation.slowPathNewInstanceWithoutAllocating(ThreadLocalAllocation.java:236)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.genscavenge.ThreadLocalAllocation.slowPathNewInstance(ThreadLocalAllocation.java:221)
        at java.base@22.0.1/java.util.concurrent.CopyOnWriteArrayList.getArray(CopyOnWriteArrayList.java:117)
        at java.base@22.0.1/java.util.concurrent.CopyOnWriteArrayList.add(CopyOnWriteArrayList.java:463)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp.onNext(MultiCacheOp.java:92)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onItem(MultiSubscriberAdapter.java:27)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$State.drain(MultiGroupByOp.java:400)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$State.onItem(MultiGroupByOp.java:354)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$GroupedUnicast.onItem(MultiGroupByOp.java:285)
        at io.smallrye.mutiny.operators.multi.MultiGroupByOp$MultiGroupByProcessor.onItem(MultiGroupByOp.java:127)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp$CacheSubscription.replay(MultiCacheOp.java:183)
        at io.smallrye.mutiny.operators.multi.MultiCacheOp.onNext(MultiCacheOp.java:95)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onItem(MultiSubscriberAdapter.java:27)
        at io.smallrye.mutiny.operators.multi.MultiSelectWhereOp$MultiSelectWhereProcessor.onItem(MultiSelectWhereOp.java:57)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.tryEmit(MultiFlatMapOp.java:219)
        at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapInner.onItem(MultiFlatMapOp.java:554)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.converters.uni.UniToMultiPublisher$UniToMultiSubscription.onItem(UniToMultiPublisher.java:94)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
        at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
        at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromPublisher$PublisherSubscriber.onComplete(UniCreateFromPublisher.java:86)
        at io.smallrye.mutiny.subscription.MultiSubscriberAdapter.onCompletion(MultiSubscriberAdapter.java:37)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.isDoneOrCancelled(MultiEmitOnOp.java:248)
        at io.smallrye.mutiny.operators.multi.MultiEmitOnOp$MultiEmitOnProcessor.run(MultiEmitOnOp.java:188)
        at io.quarkus.mongodb.impl.Wrappers.lambda$toMulti$2(Wrappers.java:32)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:279)
        at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:261)
        at io.vertx.core.impl.ContextInternal.lambda$runOnContext$0(ContextInternal.java:59)
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base@22.0.1/java.lang.Thread.runWith(Thread.java:1583)
        at java.base@22.0.1/java.lang.Thread.run(Thread.java:1570)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:853)
        at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:829)
   
 

Env

OS: Linux Fedora 40 Java 21 Graal reactive mutiny 2.6.0

cescoffier commented 3 months ago

You will have to give us more details. It seems that you are blocking in the "switch" part.

cescoffier commented 3 months ago

(BTW, the best is to provide a minimal reproducer; without that, it's hard to say - as the faulty code is most probably on your side)

sahak1an commented 3 months ago

 return  ...
            .group().by(Metadata::getOperationType) //enum value
            .flatMap(group -> switch (group.key()) {
                case MODIFIED -> {
                // logging, simple operations   
                }
                case NEW -> persist(group);
            })
    }

    private static Multi<?> persist(GroupedMulti<OperationType, Metadata> metadataGroup) {
        return metadataGroup
            .group().intoLists().of(4096)
            .onItem().transformToUniAndMerge(batchMetadata -> {

                // reactive mongo
                return Metadata.persist(batchMetadata);
            });
    }
jponge commented 3 months ago

There are occurrences of MultiCacheOp in the stacktrace, this seems suspicious to me.

jponge commented 3 months ago

@sahak1an are you using a cache operator somewhere?