Closed RuggeroDAlo closed 2 years ago
/cc @FroMage, @cescoffier, @loicmathieu, @ozangunalp
I can also reproduce the issue with 2.7.2
With 2.7.3 panache is still failing, but pure hibernate is also failing now on the first message with
2022-03-04 09:49:42,507 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-1) SRMSG18228: A failure has been reported for Kafka topics '[wordsin-hibernate]': io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] java.lang.IllegalStateException: The current operation requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such. You can still use Hibernate Reactive, you just need to avoid using the methods which implicitly require accessing the stateful context, such as MutinySessionFactory#withTransaction and #withSession.
[Exception 1] java.lang.IllegalStateException: The current operation requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such. You can still use Hibernate Reactive, you just need to avoid using the methods which implicitly require accessing the stateful context, such as MutinySessionFactory#withTransaction and #withSession.
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureConsume$UniOnItemOrFailureConsumeProcessor.invokeCallback(UniOnItemOrFailureConsume.java:59)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureConsume$UniOnItemOrFailureConsumeProcessor.onFailure(UniOnItemOrFailureConsume.java:45)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:54)
at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:82)
at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.onItem(UniOnItemConsume.java:42)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.access$100(UniCreateFromKnownItem.java:26)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemConsume.subscribe(UniOnItemConsume.java:30)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureConsume.subscribe(UniOnItemOrFailureConsume.java:24)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
at io.smallrye.mutiny.operators.uni.UniOnItemTransform$UniOnItemTransformProcessor.onItem(UniOnItemTransform.java:43)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:60)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:46)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap.subscribe(UniOnFailureFlatMap.java:31)
at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:46)
at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage$CompletionStageUniSubscription.forwardResult(UniCreateFromCompletionStage.java:63)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073)
at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
at io.vertx.core.impl.EventLoopContext.execute(EventLoopContext.java:81)
at io.vertx.core.impl.DuplicatedContext.execute(DuplicatedContext.java:173)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:51)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.handle(SqlConnectionPool.java:205)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.handle(SqlConnectionPool.java:195)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.handle(SqlConnectionPool.java:175)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
at io.vertx.core.net.impl.pool.SimpleConnectionPool$LeaseImpl.emit(SimpleConnectionPool.java:702)
at io.vertx.core.net.impl.pool.SimpleConnectionPool$ConnectSuccess$2.run(SimpleConnectionPool.java:336)
at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:50)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:245)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.lambda$connect$2(SimpleConnectionPool.java:257)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$2.lambda$connect$0(SqlConnectionPool.java:117)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.Mapping.onSuccess(Mapping.java:40)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.Promise.complete(Promise.java:66)
at io.vertx.sqlclient.impl.ConnectionFactoryBase.lambda$doConnectWithRetry$2(ConnectionFactoryBase.java:111)
at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.Composition$1.onSuccess(Composition.java:62)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.FixedMapping.onSuccess(FixedMapping.java:31)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onSuccess(PromiseImpl.java:49)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:41)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
at io.vertx.sqlclient.impl.command.CommandResponse.fire(CommandResponse.java:46)
at io.vertx.sqlclient.impl.SocketConnectionBase.handleMessage(SocketConnectionBase.java:287)
at io.vertx.pgclient.impl.PgSocketConnection.handleMessage(PgSocketConnection.java:96)
at io.vertx.sqlclient.impl.SocketConnectionBase.lambda$init$0(SocketConnectionBase.java:99)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
at io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:274)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:22)
at io.vertx.core.net.impl.NetSocketImpl.handleMessage(NetSocketImpl.java:394)
at io.vertx.core.net.impl.ConnectionBase.read(ConnectionBase.java:156)
at io.vertx.core.net.impl.VertxHandler.channelRead(VertxHandler.java:153)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.vertx.pgclient.impl.codec.PgEncoder.lambda$write$0(PgEncoder.java:87)
at io.vertx.pgclient.impl.codec.InitCommandCodec.handleReadyForQuery(InitCommandCodec.java:120)
at io.vertx.pgclient.impl.codec.PgDecoder.decodeReadyForQuery(PgDecoder.java:237)
at io.vertx.pgclient.impl.codec.PgDecoder.channelRead(PgDecoder.java:96)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:829)
Suppressed: java.lang.IllegalStateException: The current operation requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such. You can still use Hibernate Reactive, you just need to avoid using the methods which implicitly require accessing the stateful context, such as MutinySessionFactory#withTransaction and #withSession.
at io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.checkIsSafe(VertxContextSafetyToggle.java:80)
at io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.validateContextIfExists(VertxContextSafetyToggle.java:63)
at io.quarkus.hibernate.reactive.runtime.customized.CheckingVertxContext.remove(CheckingVertxContext.java:52)
at org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl.lambda$withSession$19(MutinySessionFactoryImpl.java:253)
at io.smallrye.mutiny.Uni.lambda$eventually$1(Uni.java:573)
at io.smallrye.context.impl.wrappers.SlowContextualBiConsumer.accept(SlowContextualBiConsumer.java:21)
at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureConsume$UniOnItemOrFailureConsumeProcessor.invokeCallback(UniOnItemOrFailureConsume.java:55)
... 124 more
Caused by: java.lang.IllegalStateException: The current operation requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such. You can still use Hibernate Reactive, you just need to avoid using the methods which implicitly require accessing the stateful context, such as MutinySessionFactory#withTransaction and #withSession.
at io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.checkIsSafe(VertxContextSafetyToggle.java:80)
at io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle.validateContextIfExists(VertxContextSafetyToggle.java:63)
at io.quarkus.hibernate.reactive.runtime.customized.CheckingVertxContext.put(CheckingVertxContext.java:40)
at org.hibernate.reactive.mutiny.impl.MutinySessionFactoryImpl.lambda$withSession$17(MutinySessionFactoryImpl.java:251)
at io.smallrye.mutiny.groups.UniOnItem.lambda$invoke$0(UniOnItem.java:59)
at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
at io.smallrye.mutiny.operators.uni.UniOnItemConsume$UniOnItemComsumeProcessor.invokeEventHandler(UniOnItemConsume.java:77)
... 121 more
This is expected, kafka does not use safe duplicated context yet. We are working on that.
It works fine in 2.6.2; 2.7.x doesn't work.
It was "working" only in a loose sense. It was actually not working correctly, as the session was stored and shared between unrelated message processing.
We have the same HR000061 issue with Panache + smallrye-amqp + postgresql, but only after the second message is consumed.
@cescoffier Hey, we are facing the same issue when we upgraded from 2.6.x to 2.7.x. Any workaround until the kafka safe duplicated context is implemented? Thanks
@eldimious As far as I can tell 2.8.0.CR1 / 2.8.0.Final should implement the duplicated context handling for kafka + pure hibernate reactive (no panache).
Panache is still failing with the original error though.
@RuggeroDAlo thanks for the info. Hm weird, as we have tried 2.8.0.CR1 + io.quarkus:quarkus-hibernate-reactive and the problem remains: The current operation requires a safe (isolated) Vert.x sub-context, but the current context hasn't been flagged as such. You can still use Hibernate Reactive, you just need to avoid using the methods which implicitly require accessing the stateful context, such as MutinySessionFactory#withTransaction and #withSession.
Perhaps the fix will be included in 2.8.0.Final.
@eldimious Strange, I tried with the following code on 2.8.0.CR1 and it seems to be working:
https://github.com/RuggeroDAlo/quarkus-23736/blob/main/src/main/java/org/acme/MyReactiveMessagingApplication.java#L37
Anyway better wait for confirmation from @cescoffier , I'm simply guessing :smile:
Yes, this approach should work. Note that we have fixed the upstream issues (so Kafka records are now processed in their own duplicated context)
Hey @RuggeroDAlo and @cescoffier, thanks for the info. https://github.com/arconsis/Eshop-EDA/blob/main/v2/services/PaymentService/src/main/kotlin/com/arconsis/data/payments/PaymentsRepository.kt#L26 Our implementation is a bit different: first we "simulate" an external REST call using paymentsRemoteStore.createPayment, which under the hood creates a Uni with some delay: Uni.createFrom().item(payment).onItem().delayIt().by(Duration.ofMillis(3000)), and then we flatMap this Uni to Hibernate's sessionFactory.withTransaction. At this point, when we try to execute the flatMap, we get the error above.
Btw, if I remove the paymentsRemoteStore.createPayment call, the Hibernate transaction executes fine.
That is expected. Your delay switches to a worker thread and so you are not on the duplicated context anymore.
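For readers following along, the effect described above can be illustrated with plain JDK classes, without Quarkus, Vert.x or Mutiny. This is only an analogy under stated assumptions: a single-threaded executor named event-loop stands in for the duplicated context, and CompletableFuture.delayedExecutor stands in for the delay. The names ContextHopSketch, newEventLoop, threadAfterDelay and threadAfterHopBack are hypothetical and do not come from any of the libraries discussed here.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ContextHopSketch {

    // Stand-in for the event loop / duplicated context (an assumption for illustration).
    static ExecutorService newEventLoop() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "event-loop");
            t.setDaemon(true);
            return t;
        });
    }

    // Name of the thread that runs the step placed right after a delay.
    static String threadAfterDelay(ExecutorService eventLoop) {
        Executor delayed = CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS);
        return CompletableFuture
                .supplyAsync(() -> "payment", eventLoop)   // starts on the event loop
                .thenApplyAsync(p -> Thread.currentThread().getName(), delayed) // runs elsewhere
                .join();
    }

    // Same pipeline, but the step after the delay is explicitly sent back to the event loop.
    static String threadAfterHopBack(ExecutorService eventLoop) {
        Executor delayed = CompletableFuture.delayedExecutor(50, TimeUnit.MILLISECONDS);
        return CompletableFuture
                .supplyAsync(() -> "payment", eventLoop)
                .thenApplyAsync(p -> p, delayed)           // the delay leaves the event loop
                .thenApplyAsync(p -> Thread.currentThread().getName(), eventLoop) // hop back
                .join();
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = newEventLoop();
        System.out.println("after delay:    " + threadAfterDelay(eventLoop));
        System.out.println("after hop back: " + threadAfterHopBack(eventLoop));
        eventLoop.shutdown();
    }
}
```

In a real Mutiny pipeline the hop back would have to target the captured Vert.x context rather than a plain executor; how best to do that depends on the Quarkus version in use, so treat this only as a picture of why the step after the delay no longer sees the original context.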
@cescoffier is this fixed?
Yes!
@gsmet
Hi~ I've just tested 2.9.0.Final and found that it also fails, with Caused by: io.vertx.core.impl.NoStackTraceThrowable: Timeout
Should we confirm which commit or PR has solved this problem?
I also tested with 999-SNAPSHOT and I also don't think this is fixed. There was a similar issue with graphql that was fixed https://github.com/quarkusio/quarkus/issues/21111, but panache still seems broken.
@RuggeroDAlo I am also following this and other related issues. If there is any news, please mention me. Thanks~ By the way, should the issue be reopened if it is not resolved?
@cdmikechen Yes I think this should still be open, but I can't reopen it, @cescoffier should I open another issue?
Yes please, with a reproducer as this one works for me.
Strange @cescoffier , I just reproduced with the repo linked in the issue
2022-05-12 09:01:28,479 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18256: Initialize record store for topic-partition 'wordsin-panache-0' at position -1.
2022-05-12 09:01:28,514 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) Processing message - panache - 1
2022-05-12 09:01:28,710 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) query done - panache
2022-05-12 09:01:28,714 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) Processing message - panache - 2
2022-05-12 09:01:28,732 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) query done - panache
2022-05-12 09:01:28,734 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) Processing message - panache - 3
2022-05-12 09:01:28,755 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) query done - panache
2022-05-12 09:01:28,756 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) Processing message - panache - 4
2022-05-12 09:01:28,762 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) query done - panache
2022-05-12 09:01:31,204 INFO [org.acm.MyReactiveMessagingApplication] (vert.x-eventloop-thread-5) Processing message - panache - 55
2022-05-12 09:02:01,211 ERROR [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18203: A message sent to channel `wordsin-panache` has been nacked, fail-stop
2022-05-12 09:02:01,211 WARN [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-5) SRMSG18228: A failure has been reported for Kafka topics '[wordsin-panache]': io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-05-12 09:02:01,216 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-5) SRMSG00201: Error caught while processing a message: io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-05-12 09:02:01,218 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-5) SRMSG00201: Error caught while processing a message: io.vertx.core.impl.NoStackTraceThrowable: Timeout
Maybe you sent messages to the hibernate reactive queue (wordsin-hibernate) and not the panache queue (wordsin-panache)?
hum, weird, I will have another look. It seems to be a connection shortage issue (connection not released). But, I tried with 50 messages, without issues. Let me re-open and recheck.
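To make the suspected connection shortage concrete, here is a minimal sketch using only the JDK. It is not the Vert.x SQL client pool: a Semaphore stands in for the pool, and the pool size of 4 is an assumption chosen to mirror the four messages that succeed in the log above. LeakyPoolSketch and processWithoutRelease are hypothetical names.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class LeakyPoolSketch {

    // Handles `messages` one after another against a pool of `poolSize` connections.
    // Every message acquires a connection and never gives it back (the suspected bug).
    // Returns how many messages were handled before an acquire timed out.
    static int processWithoutRelease(int poolSize, int messages, long timeoutMillis) {
        Semaphore pool = new Semaphore(poolSize);
        int handled = 0;
        for (int i = 0; i < messages; i++) {
            try {
                if (!pool.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    break; // plays the role of the "Timeout" failure in the logs
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            handled++; // "query done", but pool.release() is never called
        }
        return handled;
    }

    public static void main(String[] args) {
        System.out.println("handled before timeout: " + processWithoutRelease(4, 10, 100));
    }
}
```

If the connection were released after each message, all ten messages would go through; with the leak, processing stops as soon as the pool is empty, which matches the pattern of a few successful messages followed by a timeout.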
@RuggeroDAlo How do you deal with this problem in the current Quarkus version?
Because the interface in my project is reusable, I added a local RestClient so that I would not have to rewrite code (the original code has a REST API). Every time I consume Kafka messages (the messages in Kafka are CDC events transmitted through Debezium, and the table's update frequency is within a manageable range), I call my own exposed REST API in the same service.
This may not be the best solution, but I think it is an acceptable compromise when the concurrency is not particularly high.
In my case, I have a service that handles many hibernate-reactive transactions using reactive panache (tagged with @ReactiveTransactional). The consumed messages are always blocked at the first hibernate query.
@cdmikechen luckily for me my use of panache was very limited, I just stopped using it and use hibernate reactive.
Is there any progress on this issue? I have a similar setup running with panache reactive along with postgres and kafka. I consume messages from kafka and persist them into the database. After a couple of messages, the application hangs. I can't even trigger my rest resource that should return the list of all my entities.
@Incoming("words-in")
@ReactiveTransactional
public Uni<Void> consume(Message<String> message) {
    MyEntity myEntity = new MyEntity();
    myEntity.myProp = message.getPayload();
    return MyEntity.persist(myEntity).replaceWithVoid();
}
After a while I'm getting a timeout:
2022-06-24 15:59:56,056 INFO [io.sma.rea.mes.kafka] (vert.x-eventloop-thread-4) SRMSG18256: Initialize record store for topic-partition 'words-0' at position -1.
2022-06-24 16:00:28,332 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-4) SRMSG00201: Error caught while processing a message: io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-06-24 16:00:28,336 ERROR [io.sma.rea.mes.provider] (vert.x-eventloop-thread-4) SRMSG00201: Error caught while processing a message: io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-06-24 16:00:37,516 ERROR [org.hib.rea.errors] (vert.x-eventloop-thread-2) HR000057: Failed to execute statement [select myentity0_.id as id1_0_, myentity0_.myProp as myprop2_0_ from MyEntity myentity0_]: could not execute query: java.util.concurrent.CompletionException: io.vertx.core.impl.NoStackTraceThrowable: Timeout
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:636)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:362)
at io.vertx.core.impl.future.FutureImpl$3.onFailure(FutureImpl.java:153)
at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
at io.vertx.core.impl.future.Mapping.onFailure(Mapping.java:45)
at io.vertx.core.impl.future.FutureBase.emitFailure(FutureBase.java:75)
at io.vertx.core.impl.future.FutureImpl.tryFail(FutureImpl.java:230)
at io.vertx.core.impl.future.PromiseImpl.tryFail(PromiseImpl.java:23)
at io.vertx.core.impl.future.PromiseImpl.onFailure(PromiseImpl.java:54)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:43)
at io.vertx.core.impl.future.PromiseImpl.handle(PromiseImpl.java:23)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$null$1(SqlConnectionPool.java:218)
at io.vertx.core.net.impl.pool.SimpleConnectionPool$Cancel.run(SimpleConnectionPool.java:674)
at io.vertx.core.net.impl.pool.CombinerExecutor.submit(CombinerExecutor.java:50)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.execute(SimpleConnectionPool.java:245)
at io.vertx.core.net.impl.pool.SimpleConnectionPool.cancel(SimpleConnectionPool.java:636)
at io.vertx.sqlclient.impl.pool.SqlConnectionPool$1PoolRequest.lambda$onEnqueue$2(SqlConnectionPool.java:215)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:893)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:860)
at io.vertx.core.impl.EventLoopContext.emit(EventLoopContext.java:50)
at io.vertx.core.impl.DuplicatedContext.emit(DuplicatedContext.java:168)
at io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:53)
at io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:883)
at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-06-24 16:00:37,567 ERROR [io.qua.ver.htt.run.QuarkusErrorHandler] (vert.x-eventloop-thread-2) HTTP Request to /hello failed, error id: d0ad0d12-7783-4f83-9509-55dcbc6ff586-1: io.vertx.core.impl.NoStackTraceThrowable: Timeout
2022-06-24 16:00:37,570 ERROR [org.jbo.res.rea.com.cor.AbstractResteasyReactiveContext] (vert.x-eventloop-thread-2) Request failed: io.vertx.core.impl.NoStackTraceThrowable: Timeout
I do not believe @ReactiveTransactional works with reactive messaging. It's specific to RESTEasy Reactive. You will need to handle the transaction yourself for now.
I tried it this way without success:
@Incoming("words-in")
public Uni<Void> consume(Message<String> message) {
    MyEntity myEntity = new MyEntity();
    myEntity.myProp = message.getPayload();
    return Panache.withTransaction(myEntity::persist).replaceWithVoid();
}
I can consume 3 messages. The fourth message fails with the exception mentioned above.
@tobi-quarkus-coder the issue is that the incoming message is not acked; you need to call message.ack somewhere (ideally within the Mutiny operation flow).
@cescoffier we definitely need to document somewhere how to use reactive messaging with transactions, as it's a very common use case, for both the blocking and the reactive variants.
@loicmathieu I tried the following without any effect.
@Incoming("words-in")
public Uni<Void> consume(Message<String> message) {
    MyEntity myEntity = new MyEntity();
    myEntity.myProp = message.getPayload();
    return Panache.withTransaction(myEntity::persist)
            .onItem().invoke(() -> message.ack())
            .replaceWithVoid();
}
@tobi-quarkus-coder I had the same problem and used the following workaround.
@Incoming("words-in")
public Uni<Void> consume(Message<String> message) {
    MyEntity myEntity = new MyEntity();
    myEntity.myProp = message.getPayload();
    return sf.withTransaction((s, t) -> s.persist(myEntity)).replaceWithVoid();
}
@Inject
Mutiny.SessionFactory sf;
@cbianco Thx for the workaround. Yeah this works but bypasses Panache unfortunately. I'd really like to use the full stack including Panache. In my real world scenarios I'll have to perform more complex queries and use the simplicity of Panache for it.
The problem comes from Panache.withTransaction, as it cannot know if the session must be closed or not.
You can use the rest of Panache and just use the session to start a transaction and close it when you are done.
@cescoffier Thanks for this! I adapted the code a little and made it a bit more "Panache-style":
@Incoming("words-in")
public Uni<Void> consume(String message) {
    MyEntity entity = new MyEntity();
    entity.myProp = message;
    return Panache.getSession().chain(session ->
            session.withTransaction(transaction -> entity.persistAndFlush())
                    .replaceWithVoid().onTermination().call(() -> session.close()));
}
With this, I don't have to inject the Mutiny Session myself; it'll be handled under the hood.
I think the way we need to suggest in the doc is to use Mutiny.SessionFactory.withTransaction, which closes the session properly. I'll follow up on #26379
@ozangunalp I tried it and got the same timeout exception. I cannot confirm that using the session factory with withTransaction works.
This is what I tried:
@Inject
Mutiny.SessionFactory sessionFactory;
@Incoming("words-in")
public Uni<Void> consume(String message) {
    MyEntity entity = new MyEntity();
    entity.myProp = message;
    return sessionFactory.withTransaction(session ->
            entity.persistAndFlush()).replaceWithVoid();
}
Hum, doing the persist on the session works. If I use Panache it doesn't.
Works:
return sessionFactory.withTransaction((session, transaction) -> session.persist(entity));
Doesn't work:
return sessionFactory.withTransaction((session, transaction) -> entity.persist()).replaceWithVoid();
Try to add .onTermination().call(() -> session.close()) after replaceWithVoid(). This problem happens when the session isn't correctly closed.
Yes, this works:
@Inject
Mutiny.Session session;
@Incoming("words-in")
public Uni<Void> consume(String message) {
    MyEntity entity = new MyEntity();
    entity.myProp = message;
    return session.withTransaction(session ->
            entity.persistAndFlush())
            .replaceWithVoid()
            .onTermination().call(() -> session.close());
}
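The idea behind the working snippet above, releasing the session when the pipeline terminates whether it succeeded or failed, can be sketched with plain JDK classes. This is an illustration under stated assumptions and not Hibernate Reactive or Panache code: a Semaphore-backed Pool stands in for the connection pool, CompletableFuture.whenComplete stands in for onTermination().call(...), and every third simulated persist fails on purpose to exercise the failure path. All names (ReleaseOnTerminationSketch, Pool, withLease, processAll) are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class ReleaseOnTerminationSketch {

    // Minimal stand-in for a connection pool (an assumption for illustration).
    static final class Pool {
        private final Semaphore permits;
        Pool(int size) { permits = new Semaphore(size); }
        boolean tryLease() { return permits.tryAcquire(); }
        void release() { permits.release(); }
        int available() { return permits.availablePermits(); }
    }

    // Runs `work` with a leased connection and gives the lease back when the
    // pipeline terminates, whether it completed normally or with a failure.
    static <T> CompletableFuture<T> withLease(Pool pool, Supplier<CompletableFuture<T>> work) {
        if (!pool.tryLease()) {
            return CompletableFuture.failedFuture(new IllegalStateException("Timeout"));
        }
        return work.get().whenComplete((item, failure) -> pool.release());
    }

    // Processes `messages` sequentially and returns how many obtained a connection.
    static int processAll(Pool pool, int messages) {
        AtomicInteger leased = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            boolean fail = (i % 3 == 0); // simulated persist failure
            ReleaseOnTerminationSketch.<Void>withLease(pool, () -> {
                leased.incrementAndGet();
                return fail
                        ? CompletableFuture.<Void>failedFuture(new RuntimeException("persist failed"))
                        : CompletableFuture.<Void>completedFuture(null);
            });
        }
        return leased.get();
    }

    public static void main(String[] args) {
        Pool pool = new Pool(4);
        System.out.println("messages that got a connection: " + processAll(pool, 10));
        System.out.println("connections available afterwards: " + pool.available());
    }
}
```

Because the release runs on termination rather than only on success, the failing messages do not drain the pool, so later messages can still get a connection.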
I was wondering about this because @ozangunalp suggested using sessionFactory, which handles closing the session on its own. I couldn't get that working.
@tobi-quarkus-coder Thanks for the feedback! That's what I suspected.
Is there any plan to fix the original issue so that Panache can be used easily? Or is it basically impossible?
Since Quarkus 2.13.0, using session.withTransaction and then closing the session, as per the documentation, no longer works. It throws multiple exceptions:
java.util.concurrent.CompletionException: java.lang.IllegalStateException: HR000061: Session is currently connecting to database
...
Caused by: java.lang.IllegalStateException: Session/EntityManager is closed
The migration guide indicates:
Prior to Quarkus 2.13, the Reactive Messaging consuming methods were called with an active CDI request context, inadvertently propagated, and were never terminated. Quarkus corrects this behaviour and makes sure the request context is not activated unnecessarily on message consuming methods. Code relying on the presence of the RequestScoped beans might need to start a request scope explicitly; for example, using @ActivateRequestContext annotation on the message consuming method.
Adding @ActivateRequestContext does not seem to fix the issue.
Please open a new issue with a reproducer.
Hi @cescoffier, yes created it https://github.com/quarkusio/quarkus/issues/29019
Any update on this?
I've stumbled upon this thread and found it very helpful, and I got it working with reactive hibernate, just not with Panache.
I tried to call session.close, but when I do so while using Panache, I get the error Connection released twice.
Tried with:
return sessionFactory
        .withSession(session -> entity
                .persist()
                .replaceWithVoid()
                .onTermination()
                .call(session::close))
        .onFailure().invoke(throwable -> log.error("FAIL", throwable));
I think part of the confusion might be whether panache or reactive-panache is being used. I think the latter is the problem here. Any tips?
Describe the bug
After 4 calls to panache in a reactive kafka channel method the following error is raised:
This doesn't work:
This does:
I've created a reproducer, this only happens when using panache with kafka on 2.7.1, pure hibernate reactive with kafka on 2.7.1 works, panache with a reactive endpoint on 2.7.1 works, 2.7.0 works for everything.
I think these may be related: https://github.com/quarkusio/quarkus/issues/23736 , https://github.com/quarkusio/quarkus/issues/23278
Expected behavior
A kafka consumer should be able to make more than 4 calls to panache
Actual behavior
After the 4th call to panache in a kafka consumer all queries fail.
How to Reproduce?
quarkus:dev
kafka-console-producer.sh --topic wordsin-panache --bootstrap-server localhost:PORT
Output of uname -a or ver
5.13.0-28-generic #31-Ubuntu SMP Thu Jan 13 17:41:06 UTC 2022 x86_64 x86_64 x86_64 GNU/Linux
Output of java -version
Java version: 11.0.13, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64
GraalVM version (if different from Java)
NA
Quarkus version or git rev
2.7.1
Build tool (ie. output of mvnw --version or gradlew --version)
Apache Maven 3.8.4 (9b656c72d54e5bacbed989b64718c159fe39b537)
Additional information
No response