quarkusio / quarkus

Quarkus: Supersonic Subatomic Java.
https://quarkus.io
Apache License 2.0

Hibernate Reactive does not combine with blocking calls in Quarkus 3 #32665

Closed wjglerum closed 7 months ago

wjglerum commented 1 year ago

Describe the bug

In Quarkus 2 it's possible to combine blocking calls with writes to the database using Hibernate Reactive with Panache. It looks like this is no longer possible in Quarkus 3.

Take the following use case:

@Entity
public class Fruit extends PanacheEntity {

    @Column
    public String name;

    public static Fruit of(String name) {
        var fruit = new Fruit();
        fruit.name = name;
        return fruit;
    }
}

@ApplicationScoped
public class FruitRepository implements PanacheRepository<Fruit> {
}

@ApplicationScoped
public class FruitService {

    public Uni<Fruit> random() {
        return Uni.createFrom().item(Unchecked.supplier(() -> {
            Log.info("Fetching fruits!");
            Thread.sleep(3000);
            // Not so random ;)
            return Fruit.of("Banana");
        })).runSubscriptionOn(Infrastructure.getDefaultWorkerPool());
    }
}

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public Uni<Void> store() {
        return fruitService.random().flatMap(fruitRepository::persist).replaceWithVoid();
    }
}

This all worked fine on Quarkus 2 (the latest I tried was 2.16.6.Final), but it doesn't work on Quarkus 3 (3.0.0.CR2).

Expected behavior

I would expect that we can do some blocking work during a transaction when using Hibernate Reactive, especially when we delegate that blocking work to a worker thread and switch back to an event loop thread when we persist the entity with Hibernate Reactive.

Actual behavior

The scheduled method fails with the following exception:

2023-04-15 13:21:04,408 ERROR [io.qua.sch.com.run.StatusEmitterInvoker] (executor-thread-1) Error occurred while executing task for trigger IntervalTrigger [id=1_nl.wjglerum.FruitResource#store, interval=60000]: java.util.concurrent.CompletionException: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:874)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at io.smallrye.context.impl.wrappers.SlowContextualConsumer.accept(SlowContextualConsumer.java:21)
    at io.smallrye.mutiny.helpers.UniCallbackSubscriber.onFailure(UniCallbackSubscriber.java:62)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:67)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:67)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure$KnownFailureSubscription.forward(UniCreateFromKnownFailure.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownFailure.subscribe(UniCreateFromKnownFailure.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.helpers.EmptyUniSubscription.propagateFailureEvent(EmptyUniSubscription.java:40)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:26)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at org.hibernate.reactive.context.impl.VertxContext.execute(VertxContext.java:90)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.subscribe(UniRunSubscribeOn.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnTermination.subscribe(UniOnTermination.java:21)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap.subscribe(UniOnItemOrFailureFlatMap.java:27)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:99)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.onFailure(UniOnItemOrFailureFlatMap.java:65)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOnTermination$UniOnTerminationProcessor.onFailure(UniOnTermination.java:52)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOnCancellationCall$UniOnCancellationCallProcessor.onFailure(UniOnCancellationCall.java:59)
    at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:94)
    at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.dispatch(UniOnFailureFlatMap.java:83)
    at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.onFailure(UniOnFailureFlatMap.java:60)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onFailure(UniOperatorProcessor.java:55)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:73)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem$KnownItemSubscription.forward(UniCreateFromKnownItem.java:38)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromKnownItem.subscribe(UniCreateFromKnownItem.java:23)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni.subscribe(UniOnItemTransformToUni.java:25)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransform.subscribe(UniOnItemTransform.java:22)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:81)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.onItem(UniOnItemTransformToUni.java:57)
    at io.smallrye.mutiny.operators.uni.UniOperatorProcessor.onItem(UniOperatorProcessor.java:47)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:29)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at io.quarkus.vertx.core.runtime.VertxCoreRecorder$14.runWith(VertxCoreRecorder.java:576)
    at org.jboss.threads.EnhancedQueueExecutor$Task.run(EnhancedQueueExecutor.java:2513)
    at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1538)
    at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
    at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    at io.smallrye.mutiny.groups.UniOnItemOrFailure.lambda$call$1(UniOnItemOrFailure.java:75)
    at io.smallrye.context.impl.wrappers.SlowContextualBiFunction.apply(SlowContextualBiFunction.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemOrFailureFlatMap$UniOnItemOrFailureFlatMapProcessor.performInnerSubscription(UniOnItemOrFailureFlatMap.java:86)
    ... 52 more
    Suppressed: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
        at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
        at org.hibernate.internal.AbstractSharedSessionContract.checkOpenOrWaitingForAutoClose(AbstractSharedSessionContract.java:447)
        at org.hibernate.internal.SessionImpl.checkOpenOrWaitingForAutoClose(SessionImpl.java:616)
        at org.hibernate.internal.SessionImpl.closeWithoutOpenChecks(SessionImpl.java:410)
        at org.hibernate.internal.SessionImpl.close(SessionImpl.java:397)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveClose(ReactiveSessionImpl.java:1738)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
        ... 47 more
Caused by: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
    [Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    [Exception 1] java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
    ... 30 more
    Suppressed: java.lang.IllegalStateException: HR000068: This method should exclusively be invoked from a Vert.x EventLoop thread; currently running on thread 'executor-thread-1'
        at org.hibernate.reactive.common.InternalStateAssertions.assertUseOnEventLoop(InternalStateAssertions.java:40)
        at org.hibernate.reactive.session.impl.ReactiveSessionImpl.getReactiveConnection(ReactiveSessionImpl.java:1727)
        at org.hibernate.reactive.mutiny.impl.MutinySessionImpl$Transaction.rollback(MutinySessionImpl.java:477)
        at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
        at io.smallrye.mutiny.groups.UniOnFailure.lambda$call$5(UniOnFailure.java:133)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.groups.UniOnFailure.lambda$call$4(UniOnFailure.java:102)
        at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
        at io.smallrye.mutiny.operators.uni.UniOnFailureFlatMap$UniOnFailureFlatMapProcessor.performInnerSubscription(UniOnFailureFlatMap.java:92)
        ... 29 more
Caused by: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'
    at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:190)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1786)
    at org.hibernate.internal.SessionImpl.contains(SessionImpl.java:1544)
    at org.hibernate.reactive.mutiny.impl.MutinySessionImpl.contains(MutinySessionImpl.java:109)
    at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.lambda$persist$0(AbstractJpaOperations.java:41)
    at io.smallrye.context.impl.wrappers.SlowContextualFunction.apply(SlowContextualFunction.java:21)
    at io.smallrye.mutiny.operators.uni.UniOnItemTransformToUni$UniOnItemTransformToUniProcessor.performInnerSubscription(UniOnItemTransformToUni.java:68)
    ... 21 more

How to Reproduce?

Attached reactive.zip is a simple project that reproduces the error.

  1. ./mvnw quarkus:dev
  2. And see the exceptions
  3. Turn off the Hibernate checks
  4. ./mvnw quarkus:dev -Dorg.hibernate.reactive.common.InternalStateAssertions.ENFORCE=false
  5. No more errors occur

Output of uname -a or ver

Darwin Willem's-Tiny-MacBook-Pro 22.4.0 Darwin Kernel Version 22.4.0: Mon Mar  6 21:00:41 PST 2023; root:xnu-8796.101.5~3/RELEASE_ARM64_T8103 arm64

Output of java -version

openjdk 17.0.6 2023-01-17 OpenJDK Runtime Environment Temurin-17.0.6+10 (build 17.0.6+10) OpenJDK 64-Bit Server VM Temurin-17.0.6+10 (build 17.0.6+10, mixed mode)

GraalVM version (if different from Java)

No response

Quarkus version or git rev

3.0.0.CR2

Build tool (ie. output of mvnw --version or gradlew --version)

Apache Maven 3.8.8 (4c87b05d9aedce574290d1acc98575ed5eb6cd39) Maven home: /Users/wjglerum/.m2/wrapper/dists/apache-maven-3.8.8-bin/67c30f74/apache-maven-3.8.8 Java version: 17.0.6, vendor: Eclipse Adoptium, runtime: /Users/wjglerum/.sdkman/candidates/java/17.0.6-tem Default locale: en_NL, platform encoding: UTF-8 OS name: "mac os x", version: "13.3", arch: "aarch64", family: "mac"

Additional information

Looks related to https://github.com/quarkusio/quarkus/issues/32533

quarkus-bot[bot] commented 1 year ago

/cc @DavideD (hibernate-reactive), @Sanne (hibernate-reactive), @gavinking (hibernate-reactive)

wjglerum commented 1 year ago

Hmm, I added some more logs to see what's happening; it looks like the persisting is indeed done on a worker thread (both on Quarkus 2 and 3):

2023-04-15 14:01:52,243 INFO  [nl.wjg.FruitScheduler] (vert.x-eventloop-thread-1) Scheduling fetching of fruit
2023-04-15 14:01:52,245 INFO  [nl.wjg.FruitService] (executor-thread-0) Fetching fruit!
2023-04-15 14:01:55,251 INFO  [nl.wjg.FruitRepository] (executor-thread-0) Saving fruit Banana
wjglerum commented 1 year ago

Could we (force) switch back to the event loop thread?

mkouba commented 1 year ago

Have you tried something like fruitService.random().chain(fruitRepository::persist).runSubscriptionOn(Infrastructure.getDefaultExecutor()).replaceWithVoid()?

CC @jponge

jponge commented 1 year ago

I don't think this is a Mutiny issue per se.

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).
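That capture-and-run pattern can be wrapped into a small reusable helper. A minimal sketch follows; the `ContextHop`/`runOn` names are made up for illustration and are not an existing Quarkus or Mutiny API:

```java
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

import java.util.function.Supplier;

public final class ContextHop {

    // Hypothetical helper: re-subscribe to the supplied Uni on the captured
    // Vert.x context, so downstream work runs on the original event loop again.
    public static <T> Uni<T> runOn(Context context, Supplier<Uni<T>> supplier) {
        return Uni.createFrom().emitter(emitter ->
                context.runOnContext(v -> supplier.get()
                        .subscribe().with(emitter::complete, emitter::fail)));
    }

    // Standalone demo; inside Quarkus you would capture Vertx.currentContext()
    // instead of creating a Vertx instance here.
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        Context context = vertx.getOrCreateContext();

        String thread = runOn(context, () ->
                Uni.createFrom().item(() -> Thread.currentThread().getName()))
                .await().indefinitely();

        System.out.println(thread); // a vert.x-eventloop-thread-* name
        vertx.close();
    }
}
```

Note that in Quarkus, being back on an event loop thread is not by itself enough for Hibernate Reactive with Panache: the session also has to be present on that context (see the "No current Mutiny.Session found" error later in this thread).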

mkouba commented 1 year ago

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

Hm, what's the difference between Infrastructure.getDefaultWorkerPool() and Infrastructure.getDefaultExecutor()? It might make sense to add some javadoc to the io.smallrye.mutiny.infrastructure.Infrastructure.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).

That's not very convenient. Maybe Uni#runSubscriptionOn() is not the best approach for similar use cases?

jponge commented 1 year ago

Hm, what's the difference between Infrastructure.getDefaultWorkerPool() and Infrastructure.getDefaultExecutor()? It might make sense to add some javadoc to the io.smallrye.mutiny.infrastructure.Infrastructure.

They're the same under the hood. For a long time the Quarkus executor was not a scheduled thread pool, so we had to provide a wrapper on top of a non-scheduled executor, but that's not the case anymore.

That's not very convenient. Maybe Uni#runSubscriptionOn() is not the best approach for similar use cases?

Well, when you subscribe, it happens from the caller thread. If it's a Vert.x event loop then the subscription starts in such a context. runSubscriptionOn is used to offload the subscription from the caller thread to another thread (see https://smallrye.io/smallrye-mutiny/2.1.0/guides/emit-on-vs-run-subscription-on/).
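The difference is easy to demonstrate with plain Mutiny outside Quarkus; the single-thread executor and its "my-worker" thread name below are just for the demo:

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EmitOnVsRunSubscriptionOn {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "my-worker"));

        // runSubscriptionOn shifts the subscription itself, so the upstream
        // item supplier runs on "my-worker" instead of the caller thread.
        String upstream = Uni.createFrom()
                .item(() -> Thread.currentThread().getName())
                .runSubscriptionOn(pool)
                .await().indefinitely();
        System.out.println("upstream ran on: " + upstream); // my-worker

        // emitOn leaves the upstream on the caller thread and only shifts
        // where downstream callbacks (onItem, etc.) are invoked.
        String[] seen = new String[2];
        Uni.createFrom()
                .item(() -> { seen[0] = Thread.currentThread().getName(); return "x"; })
                .emitOn(pool)
                .onItem().invoke(x -> seen[1] = Thread.currentThread().getName())
                .await().indefinitely();
        // seen[0] is the caller thread, seen[1] is "my-worker"
        System.out.println("supplier: " + seen[0] + ", downstream: " + seen[1]);

        pool.shutdown();
    }
}
```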

mkouba commented 1 year ago

@wjglerum Since FruitService#random() is a blocking operation, you can also try to leverage the VertxContextSupport.subscribeAndAwait() utility method instead, i.e. change the code like this:

// scheduled method is executed on a worker thread
@Scheduled(every = "1m")
void store() throws Throwable {
    // assumes FruitService#random() is changed to return Fruit directly (blocking)
    Fruit fruit = fruitService.random();
    VertxContextSupport.subscribeAndAwait(() ->
            Panache.withTransaction(() -> fruitRepository.persist(fruit)));
}
jponge commented 1 year ago

@wjglerum Since the FruitService#random() is a blocking operation you can also try to leverage the VertxContextSupport.subscribeAndAwait() util method instead, i.e. change the code like:

👍

wjglerum commented 1 year ago

@wjglerum Since the FruitService#random() is a blocking operation you can also try to leverage the VertxContextSupport.subscribeAndAwait() util method instead, i.e. change the code like:

// scheduled method is executed on a worker thread
@Scheduled(every = "1m")
void store() throws Throwable {
    // assumes FruitService#random() is changed to return Fruit directly (blocking)
    Fruit fruit = fruitService.random();
    VertxContextSupport.subscribeAndAwait(() ->
            Panache.withTransaction(() -> fruitRepository.persist(fruit)));
}

That indeed works, thanks!

This wouldn't really work for more complex examples where we have more calls with Unis and Multis in the reactive world.

Next to .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) I think it would be useful to have something similar to switch back to an event loop.

I don't think this is a Mutiny issue per se.

Infrastructure.getDefaultExecutor() indeed shifts to a worker thread.

If you want to bring back some execution to a Vert.x event-loop then you might capture the Vert.x context (see Vertx.currentContext()) then eventually run something like context.runOnContext(v -> {...}).

I tried doing something like that, but I can't really figure out what the code should look like, as runOnContext() returns void and not Uni<Void>.

I came up with this:

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public void store() {
        Log.info("Scheduling fruit!");
        Context context = Vertx.currentContext();
        Fruit fruit = fruitService.random();
        context.runOnContext(v -> fruitRepository.save(fruit));
    }
}

However, that doesn't really work, as we now don't have a session on the context:

2023-04-19 10:21:24,855 INFO  [io.qua.dep.dev.RuntimeUpdatesProcessor] (Aesh InputStream Reader) Live reload total time: 1.175s 
2023-04-19 10:21:25,004 INFO  [nl.wjg.FruitScheduler] (vert.x-worker-thread-1) Scheduling fruit!
2023-04-19 10:21:25,005 INFO  [nl.wjg.FruitService] (vert.x-worker-thread-1) Fetching fruits!
2023-04-19 10:21:28,008 INFO  [nl.wjg.FruitRepository] (vert.x-eventloop-thread-13) Persisting fruit!
2023-04-19 10:21:28,009 ERROR [io.qua.ver.cor.run.VertxCoreRecorder] (vert.x-eventloop-thread-13) Uncaught exception received by Vert.x: java.lang.IllegalStateException: No current Mutiny.Session found
    - no reactive session was found in the context and the context was not marked to open a new session lazily
    - you might need to annotate the business method with @WithSession
    at io.quarkus.hibernate.reactive.panache.common.runtime.SessionOperations.getSession(SessionOperations.java:155)
    at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.getSession(AbstractJpaOperations.java:351)
    at io.quarkus.hibernate.reactive.panache.common.runtime.AbstractJpaOperations.persist(AbstractJpaOperations.java:36)
    at io.quarkus.hibernate.reactive.panache.PanacheRepositoryBase.persist(PanacheRepositoryBase.java:54)
    at nl.wjglerum.FruitRepository.save(FruitRepository.java:13)
    at nl.wjglerum.FruitRepository_ClientProxy.save(Unknown Source)
    at nl.wjglerum.FruitScheduler.lambda$store$0(FruitScheduler.java:27)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)

We can fix that by wrapping this in a Panache.withTransaction(), but that's also not great when we combine multiple calls.

wjglerum commented 1 year ago

For example with something like this:

@Singleton
@WithTransaction
public class FruitScheduler {

    @Inject
    FruitService fruitService;

    @Inject
    FruitRepository fruitRepository;

    @Scheduled(every = "1m")
    public Uni<Void> store() {
        Log.info("Scheduling fruit!");
        return fruitRepository.listAll()
                .chain(fruits -> fruitService.random().chain(fruit -> fruitRepository.save(fruit)))
                .replaceWithVoid();
    }
}
jponge commented 1 year ago

Next to .runSubscriptionOn(Infrastructure. getDefaultWorkerPool()) I think it would be useful to have something similar to switch to an event loop.

Noted 👍

geoand commented 1 year ago

What's the status of this? I was not able to get a clear picture from the existing comments.

astappiev commented 11 months ago

Hi, I would like to ask about a similar matter.

Let's imagine we have a Hibernate request, then a blocking action, then another Hibernate request. What is the proper way to implement that? (I know that getting rid of the blocking part would be best, however.)

For now I ended up with this; how optimal is it?

    @POST
    @Authenticated
    @Path("/completions")
    public CompletionResults completions(@Valid CompletionQuery query) throws Throwable {
        Chat chat = VertxContextSupport.subscribeAndAwait(() -> getOrCreateChat(query));
        CompletionResults results = chatService.completions(query, securityIdentity.getPrincipal());
        results.setChatId(chat.id);
        VertxContextSupport.subscribeAndAwait(() -> persistMessages(chat, query.getMessages(), results.getChoices()));
        return results;
    }

Ideally, I want to run a Hibernate request on the event loop, spawn a new thread for the blocking method, and then return to the event loop. My first attempt was something like .chain(call).runSubscriptionOn(Infrastructure.getDefaultWorkerPool()); however, I haven't figured out how to return to the event loop after the worker pool.
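One way to sketch that round trip (event loop, then worker, then back) is to use emitOn twice: once with a worker pool, and once with an executor that re-dispatches onto a captured Vert.x context. This is a standalone sketch under assumptions, not an official API; in Quarkus you would capture Vertx.currentContext() instead of creating a Vertx instance, and the items here are stand-ins for the real Hibernate calls:

```java
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.vertx.core.Context;
import io.vertx.core.Vertx;

public class WorkerRoundTrip {
    public static void main(String[] args) {
        Vertx vertx = Vertx.vertx();
        Context context = vertx.getOrCreateContext(); // in Quarkus: Vertx.currentContext()

        String[] threads = new String[2];
        Uni.createFrom().item("db-result")                        // stand-in for the first DB call
                .emitOn(Infrastructure.getDefaultWorkerPool())    // hop to a worker for the blocking step
                .onItem().invoke(x -> threads[0] = Thread.currentThread().getName())
                .emitOn(r -> context.runOnContext(v -> r.run()))  // hop back to the captured event loop
                .onItem().invoke(x -> threads[1] = Thread.currentThread().getName())
                .await().indefinitely();

        System.out.println("blocking step on: " + threads[0]);    // a worker pool thread
        System.out.println("back on: " + threads[1]);             // a vert.x-eventloop thread
        vertx.close();
    }
}
```

Caveat: being back on an event loop thread is not by itself enough for Hibernate Reactive with Panache, since the session is tied to the (duplicated) Vert.x context, as the "No current Mutiny.Session found" error earlier in this thread shows.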

gsmet commented 11 months ago

Frankly, if you're in a blocking world, just use Hibernate ORM: it will be easier for you.

geoand commented 11 months ago

If one is going to use Hibernate ORM as opposed to Hibernate Reactive, there are no efficiency gains to be had by offloading to another thread; if anything, it would be worse. Moreover, you might run into subtle issues with request scope propagation (which in turn affects things like tracing).

So the gist is: if you absolutely want maximum efficiency, use Hibernate Reactive; otherwise, stick with blocking operations using Hibernate ORM.

Syl2010 commented 11 months ago

The problem is that if your operation takes too long on an IO thread, Quarkus will stop the request and ask you to use @Blocking (a worker thread). But if you do that, you can't use Hibernate Reactive anymore, because it wants an IO (non-blocking) thread. So, short of having a way to offload Hibernate operations onto the IO thread from a blocking request, it's a bit of a hard spot.

geoand commented 11 months ago

I am not sure how that relates as you are never supposed to block any event loop, no matter what operations you perform.

Syl2010 commented 11 months ago

True, but Quarkus still cancels the request if the operation takes too long, even if it's actually not blocking.

geoand commented 11 months ago

That is configurable

Froidoh commented 11 months ago

Frankly, if you're in a blocking world, just use Hibernate ORM: it will be easier for you.

If we were in a world where we could combine Hibernate ORM and Hibernate Reactive in one and the same project and decide per API which one to use (or have long-running background cron jobs use Hibernate ORM while the API for querying some status or such uses Hibernate Reactive), this would be acceptable.

As we cannot freely combine the two, this is, frankly, very disappointing.

I'm used to working in many languages (Scala, Kotlin, Rust, Node.js (bah), to name a few modern ones I use(d) professionally), and now with Quarkus it's quite often necessary to re-learn concepts and find workarounds for tasks that are easy with other solutions.

This is not meant as a diss, I am super grateful you guys are doing amazing work to bring Java forward, but at the same time I must say: the tradeoffs hurt.

We are currently evaluating migrating off of Quarkus at a client of mine because of stuff like this. This is not a threat or anything, you don't have to care; it's not about emotions or hurting the project, it's a matter of fact I wanted to share with you.

We have had a service running for many months now. Since yesterday the load increased and we suddenly get (without a new deployment!):

[Exception 0] java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [136]: 'vert.x-eventloop-thread-4' current Thread [122]: 'executor-thread-1'

This is for a non-blocking call.

geoand commented 11 months ago

Negative feedback is often more valuable than positive feedback, so thanks for sharing!

The mixing of Hibernate ORM and Hibernate Reactive is something that is on the radar, but hasn't been done yet because other persistence related things get higher priority.

As for the exception you are seeing, that is definitely a bug and we absolutely need to fix it. Can you open a new issue and attach a sample application that exhibits this problematic behavior?

Froidoh commented 11 months ago

@geoand First off: Thanks!

Currently I simply don't have the time to create a minimal reproducible example, as this only started happening yesterday under higher load and I cannot reproduce it locally so far. Of course I cannot share the whole application for various reasons (setting it up would also add quite some burden for you).

I am an open source maintainer myself (under another nick ;)) and would like to give back a little, but right now I can't.

geoand commented 11 months ago

Of course I cannot add the whole application for various reasons

Yeah, that's completely understood.

I am an open source maintainer myself (under another nick ;)) and would like to give back a little, but right now: I can't.

No problem. If and when you can create a sample we can use to debug the problem, please let us know - cc @DavideD

maxandersen commented 11 months ago

@Froidoh thanks for your candid comments.

The limitation of not being able to mix classic Hibernate ORM and Hibernate Reactive is one of the main reasons why Hibernate Reactive continues to be marked with "preview" status - we know it is annoying, but we also didn't want to hold back those going fully reactive from accessing it. The next update of the docs will make the limitations more explicitly documented.

That does not solve your problem, but I'm mentioning it here for others to be aware.

On your specific issue I'm curious to know a few things to clarify what kind of bug we are dealing with here.

1. Which version of Quarkus are you using?
2. Does any of your code manually try to run things concurrently that share Hibernate-managed entities, i.e. spawning your own threads, using a managed executor, or doing Mutiny operations which can result in concurrent execution?

If you have cases of #2 then we should investigate that, because that kind of behavior is inherently unsafe - we try to detect such actions to prevent data loss/invalid edits, and that could show up like this. This is not an "ORM not mixable with Reactive" issue but something that can happen in any stack if you're not careful; though with reactive code many seem to fall into it more easily without noticing.

If you do NOT have #2 then I'm more concerned and it would be great to investigate more... the version of Quarkus might give us a hint.
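The HR000069 guard from the exception at the top of this thread is essentially this kind of detection: the session records the thread that opened it and rejects use from any other. A minimal plain-Java sketch of the idea (the class and messages here are hypothetical, not Hibernate's actual implementation):

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class Main {
    // Hypothetical thread-affinity guard, in the spirit of the HR000069 check:
    // remember the opening thread and re-validate it on every operation.
    static class ThreadBoundSession {
        private final Thread owner = Thread.currentThread();

        void persist(Object entity) {
            Thread current = Thread.currentThread();
            if (current != owner) {
                throw new IllegalStateException(
                        "session opened on '" + owner.getName()
                        + "' but used on '" + current.getName() + "'");
            }
            // real session work would go here
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadBoundSession session = new ThreadBoundSession();
        session.persist("fruit");            // same thread: fine
        System.out.println("same-thread persist: ok");

        AtomicBoolean rejected = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            try {
                session.persist("fruit");    // different thread: rejected
            } catch (IllegalStateException e) {
                rejected.set(true);
            }
        }, "executor-thread-1");
        worker.start();
        worker.join();
        System.out.println("cross-thread persist rejected: " + rejected.get());
    }
}
```

Running the second persist on a worker thread trips the check, which is exactly the shape of the "original thread ... current Thread ..." failure reported above.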

Thanks again.

Froidoh commented 11 months ago

@maxandersen

We use `3.1.0.Final` as `quarkus.platform.version` - the API in question is defined as:

@POST
@Path("{bucket}")
@Produces(MediaType.APPLICATION_JSON)
public Uni<FileRow> uploadFile(
        @HeaderParam("X-File-Metadata") String metaData,
        @HeaderParam("Content-Type") String contentType,
        @PathParam("bucket") String bucketName,
        InputStream file) {
    // ...
}

It should do a streaming file upload. At the time this API was created there was no way to do streaming multipart file upload, in case you're wondering.

We then proceed to fetch a bucket config from the database and, if it exists, we try to persist the file in a non-blocking fashion via fs operations.

In there we use an AsyncInputStream and let Vert.x handle a lot of the work (reading the stream buffer by buffer so we don't choke on huge files and don't need to allocate too much memory at once). We also use a vertx.getDelegate().executeBlocking(p -> {}) for some strictly blocking Files.setAttribute calls.

Once the file is uploaded we insert some data to a few db tables and return a result.

Maybe there is a footgun in there indeed.

geoand commented 11 months ago
3.1.0.Final

The first thing to do would be to try 3.2.5.Final and / or 3.3.1

the API in question is defined as

Can you please fill in some pseudo-code showing what the impl does (and most importantly where blocking and non-blocking calls are used)?

Froidoh commented 11 months ago

@geoand trying with 3.3.1 we get:

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Oh, how I missed the runtime errors of Java when working in Rust *g*

We do not use BigDecimal in any of our code btw. There is one occurrence of BigInteger though

geoand commented 11 months ago

@DavideD ^

Froidoh commented 11 months ago

Same with 3.2.5.Final

Not a problem with 3.1.0.Final

Didn't do a git bisect but it starts with 3.2.0.Final (didn't try any release candidates)

DavideD commented 11 months ago

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Can we have the stack trace? What operation is causing this error? The only time I think I've seen a similar error was with id generation or aggregate operations. Is this error thrown after running a query?

Froidoh commented 11 months ago

the API in question is defined as

Can you please fill in some pseudo-code showing what the impl does (and most importantly where blocking and non-blocking calls are used)?

@geoand

I am sorry, I missed this... you know what, it won't help but here it is:

@NonBlocking
public Uni<FileInformation> upload(String contentType, InputStream file, String originalFileName, String bucketName, MetaDataDto metadata) {
    String normalizedFilename = originalFileName;
    try {
        byte[] decode = base64decoder.decode(originalFileName);
        normalizedFilename = new String(decode, StandardCharsets.UTF_8);
    } catch (Exception e) {
        // filename wasn't base64-encoded; keep the original name
    }
    String filename = normalizedFilename;
    return
            bucketsService.getConfigIfAllowsNewFiles(bucketName)
                    .onItem()
                    .ifNull()
                    .failWith(BucketNotFoundException.noConfig(bucketName))
                    .flatMap(bucketConfig -> {
                                return persistFile(file, contentType, bucketConfig)
                                        .flatMap(writtenFile -> repo.persistAndFlush(new FileInformation(
                                                bucketConfig,
                                                writtenFile,
                                                contentType,
                                                filename
                                        )));
                            }
                    )
                    .flatMap(f -> f.updateMetaDataAndPersistChanges(metadata))
            ;
}

  private Uni<AsyncFileWriteDto> persistFile(InputStream in, String contentType, BucketConfig bucketConfig) {
    String fileName = UUID.randomUUID() + MimeTypes.getExtensionForMimeType(contentType);
    String pathToDir = bucketConfig.pathToDir();
    String filePath = pathToDir.concat("/").concat(fileName);

    FileSystem nfs = vertx.fileSystem();
    AsyncInputStream ais = new AsyncInputStream(vertx.getDelegate(), vertx.getDelegate().getOrCreateContext(), in, d -> updateHash(sha256Digest, d.getBytes()));

    try {
        return nfs
                .mkdirs(pathToDir)
                .flatMap(v -> nfs.createFile(filePath))
                .flatMap(v -> nfs.open(filePath, new OpenOptions().setWrite(true)))
                .flatMap(asyncFile ->
                        UniHelper.toUni(ais
                                        .handler(data -> updateHash(sha256Digest, data.getBytes()))
                                        .pipeTo(asyncFile.getDelegate()))
                                .flatMap(v -> {
                                    BigInteger fileSize = BigInteger.valueOf(ais.getFileSize());
                                    if (BigInteger.ZERO.equals(fileSize)) {
                                        vertx.getDelegate().executeBlocking(p -> {
                                            log.info(String.format("Deleting empty file at %s!", filePath));
                                            nfs.deleteBlocking(filePath);
                                            p.complete(); // complete the promise so the blocking task finishes
                                        });
                                        return Uni.createFrom().failure(new EmptyFileUploadException());
                                    }
                                    byte[] digest = sha256Digest.digest();
                                    BigInteger bigInteger = new BigInteger(1, digest);
                                    String fileHash = bigInteger.toString(16);
                                    // Not all file systems support creation time, so don't even bother, let's define our own, it will be good enough
                                    LocalDateTime creationTime = LocalDateTime.now();
                                    LocalDateTime retentionTime = bucketConfig.calculateRetentionTimeFrom(creationTime);
                                    // If everything was okay up until this point, we can make the file "unwritable"
                                    ZonedDateTime zdt = ZonedDateTime.of(retentionTime, ZoneId.systemDefault());
                                    vertx.getDelegate().executeBlocking(p -> {
                                        // Yes, this is blocking as I found no way to do this asynchronously in Java
                                        File f = new File(filePath);
                                        try {
                                            Files.setAttribute(Paths.get(filePath), "lastAccessTime", FileTime.from(zdt.toInstant()));
                                            if (f.setLastModified(zdt.toInstant().toEpochMilli())) {
                                                log.debug(String.format("set last modified of %s to %s", filePath, retentionTime));
                                            } else {
                                                log.error(String.format("Failed to set last modified of %s to %s", filePath, retentionTime));
                                            }
                                            if (!f.setReadOnly()) {
                                                log.error(String.format("Failed to set %s to READ_ONLY", filePath));
                                            }
                                            p.complete();
                                        } catch (IOException e) {
                                            p.fail(e);
                                        }
                                    }, res -> {
                                        if (res.succeeded()) {
                                            log.debug(String.format("Successfully set lastAccessTime of %s to %s", filePath, retentionTime));
                                        } else {
                                            log.error(String.format("Failed to set lastAccessTime of %s to %s", filePath, retentionTime));
                                        }
                                    });
                                    return Uni.createFrom().item(new AsyncFileWriteDto(filePath, creationTime, fileSize, fileHash, retentionTime));
                                }));

    } catch (Exception e) {
        log.error(String.format("Failed to persist file: %s", e));
        return Uni.createFrom().failure(new FileUploadFailedException());
    }
}

/**
 * @author stw, antimist
 * Taken from GitHub
 */
public class AsyncInputStream implements ReadStream<Buffer> {

    public static final int DEFAULT_READ_BUFFER_SIZE = 8192;
    private static final Logger log = Logger.getLogger(AsyncInputStream.class);

    // Based on the inputStream with the real data
    private final ReadableByteChannel ch;
    private final Vertx vertx;
    private final Context context;

    private boolean closed;
    private boolean readInProgress;

    private Handler<Buffer> dataHandler;
    private final Handler<Buffer> hashCalculator;
    private Handler<Void> endHandler;
    private Handler<Throwable> exceptionHandler;
    private final InboundBuffer<Buffer> queue;

    private final int readBufferSize = DEFAULT_READ_BUFFER_SIZE;
    private long readPos;
    private long fileSize;

    /**
     * Create a new Async InputStream that can be used with a Pump
     *
     * @param in
     *        The input stream you want to write somewhere
     */
    public AsyncInputStream(Vertx vertx, Context context, InputStream in, Handler<Buffer> hashCalculator) {
        this.vertx = vertx;
        this.context = context;
        this.ch = Channels.newChannel(in);
        this.queue = new InboundBuffer<>(context, 0);
        this.hashCalculator = hashCalculator;
        queue.handler(buff -> {
            if (buff.length() > 0) {
                handleData(buff);
            } else {
                handleEnd();
            }
        });
        queue.drainHandler(v -> doRead());
    }

    public void close() {
        closeInternal(null);
    }

    public void close(Handler<AsyncResult<Void>> handler) {
        closeInternal(handler);
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#endHandler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream endHandler(Handler<Void> endHandler) {
        check();
        this.endHandler = endHandler;
        return this;
    }

    /*
     * (non-Javadoc)
     * @see
     * io.vertx.core.streams.ReadStream#exceptionHandler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream exceptionHandler(Handler<Throwable> exceptionHandler) {
        check();
        this.exceptionHandler = exceptionHandler;
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#handler(io.vertx.core.Handler)
     */
    @Override
    public synchronized AsyncInputStream handler(Handler<Buffer> handler) {
        check();
        this.dataHandler = handler;
        if (this.dataHandler != null && !this.closed) {
            this.doRead();
        } else {
            queue.clear();
        }
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#pause()
     */
    @Override
    public synchronized AsyncInputStream pause() {
        check();
        queue.pause();
        return this;
    }

    /*
     * (non-Javadoc)
     * @see io.vertx.core.streams.ReadStream#resume()
     */
    @Override
    public synchronized AsyncInputStream resume() {
        check();
        if (!closed) {
            queue.resume();
        }
        return this;
    }

    @Override
    public ReadStream<Buffer> fetch(long amount) {
        queue.fetch(amount);
        return this;
    }

    private void check() {
        if (this.closed) {
            throw new IllegalStateException("Inputstream is closed");
        }
    }

    private void checkContext() {
        if (!vertx.getOrCreateContext().equals(context)) {
            throw new IllegalStateException("AsyncInputStream must only be used in the context that created it, expected: " + this.context
                    + " actual " + vertx.getOrCreateContext());
        }
    }

    private synchronized void closeInternal(Handler<AsyncResult<Void>> handler) {
        check();
        closed = true;
        doClose(handler);
    }

    private void doClose(Handler<AsyncResult<Void>> handler) {

        try {
            ch.close();
            if (handler != null) {
                this.vertx.runOnContext(v -> handler.handle(Future.succeededFuture()));
            }
        } catch (IOException e) {
            if (handler != null) {
                this.vertx.runOnContext(v -> handler.handle(Future.failedFuture(e)));
            }
        }
    }

    public synchronized AsyncInputStream read(Buffer buffer, int offset, long position, int length,
                                              Handler<AsyncResult<Buffer>> handler) {
        Objects.requireNonNull(buffer, "buffer");
        Objects.requireNonNull(handler, "handler");
        Arguments.require(offset >= 0, "offset must be >= 0");
        Arguments.require(position >= 0, "position must be >= 0");
        Arguments.require(length >= 0, "length must be >= 0");
        check();
        ByteBuffer bb = ByteBuffer.allocate(length);
        doRead(buffer, offset, bb, position, handler);
        return this;
    }

    private void doRead() {
        check();
        doRead(ByteBuffer.allocate(readBufferSize));
    }

    private synchronized void doRead(ByteBuffer bb) {
        if (!readInProgress) {
            readInProgress = true;
            Buffer buff = Buffer.buffer(readBufferSize);
            doRead(buff, 0, bb, readPos, ar -> {
                if (ar.succeeded()) {
                    readInProgress = false;
                    Buffer buffer = ar.result();
                    readPos += buffer.length();
                    fileSize = readPos;
                    // Empty buffer represents end of file
                    if (queue.write(buffer) && buffer.length() > 0) {
                        doRead(bb);
                    }
                } else {
                    handleException(ar.cause());
                }
            });
        }
    }

    private void doRead(Buffer writeBuff, int offset, ByteBuffer buff, long position, Handler<AsyncResult<Buffer>> handler) {

        // ReadableByteChannel doesn't have a completion handler, so we wrap it into
        // an executeBlocking and use the future there
        vertx.executeBlocking(future -> {
            try {
                Integer bytesRead = ch.read(buff);
                future.complete(bytesRead);
            } catch (Exception e) {
                log.error(e);
                future.fail(e);
            }
        }, res -> {
            if (res.failed()) {
                context.runOnContext((v) -> handler.handle(Future.failedFuture(res.cause())));
            } else {
                // Do the completed check
                Integer bytesRead = (Integer) res.result();
                if (bytesRead == -1) {
                    //End of file
                    context.runOnContext((v) -> {
                        buff.flip();
                        writeBuff.setBytes(offset, buff);
                        buff.compact();
                        handler.handle(Future.succeededFuture(writeBuff));
                    });
                } else if (buff.hasRemaining()) {
                    long pos = position;
                    pos += bytesRead;
                    // resubmit
                    doRead(writeBuff, offset, buff, pos, handler);
                } else {
                    // It's been fully written

                    context.runOnContext((v) -> {
                        buff.flip();
                        writeBuff.setBytes(offset, buff);
                        buff.compact();
                        handler.handle(Future.succeededFuture(writeBuff));
                    });
                }
            }
        });
    }

    private void handleData(Buffer buff) {
        Handler<Buffer> handler;
        synchronized (this) {
            handler = this.dataHandler;
        }
        if (handler != null) {
            checkContext();
            hashCalculator.handle(buff);
            handler.handle(buff);
        }
    }

    private void handleEnd() {
        Handler<Void> endHandler;
        synchronized (this) {
            dataHandler = null;
            endHandler = this.endHandler;
        }
        if (endHandler != null) {
            checkContext();
            endHandler.handle(null);
        }
    }

    private void handleException(Throwable t) {
        if (exceptionHandler != null && t instanceof Exception) {
            exceptionHandler.handle(t);
        } else {
            log.error("Unhandled exception", t);
        }
    }

    public long getFileSize() {
        return fileSize;
    }
}

Maybe you see something that is absolutely wrong
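One part that is easy to verify in isolation is the incremental hashing that updateHash performs per buffer; a stdlib-only sketch (the chunk boundaries are arbitrary and the helper name is made up):

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashDemo {
    // Feed each chunk to the digest as it streams past, like the
    // hashCalculator handler does for every Buffer - no full-file buffering.
    static String sha256Hex(byte[][] chunks) throws NoSuchAlgorithmException {
        MessageDigest md = MessageDigest.getInstance("SHA-256");
        for (byte[] chunk : chunks) {
            md.update(chunk); // incremental update per chunk
        }
        byte[] digest = md.digest();
        // Same hex conversion as in persistFile: new BigInteger(1, digest)
        return new BigInteger(1, digest).toString(16);
    }

    public static void main(String[] args) throws Exception {
        byte[][] chunks = {
                "ab".getBytes(StandardCharsets.UTF_8),
                "c".getBytes(StandardCharsets.UTF_8),
        };
        // "abc" is a well-known SHA-256 test vector
        System.out.println(sha256Hex(chunks));
    }
}
```

One caveat with the `new BigInteger(1, digest).toString(16)` conversion used in persistFile: it drops leading zero nibbles, so a digest starting with a 0x00 byte yields fewer than 64 hex characters; `String.format("%064x", new BigInteger(1, digest))` avoids that.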

Froidoh commented 11 months ago

org.hibernate.HibernateException: java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.lang.Integer (java.math.BigDecimal and java.lang.Integer are in module java.base of loader 'bootstrap')"

Can we have the stack trace? What operation is causing this error? The only time I think I've seen a similar error was with id generation or aggregate operations. Is this error thrown after running a query?

@DavideD

Yes, the fix is going from:

public class IdGenerator extends MutinyGenerator {
    @Override
    public Uni<Object> generate(Mutiny.Session session, Object owner, Object currentValue, EventType eventType) {
        return session
                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual")
                .getSingleResult()
                .map(x -> BigInteger.valueOf((Integer) x))
                ;
    }
...

to

public class IdGenerator extends MutinyGenerator {
    @Override
    public Uni<Object> generate(Mutiny.Session session, Object owner, Object currentValue, EventType eventType) {
        return session
                .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual")
                .getSingleResult()
                .map(x -> ((BigDecimal) x).toBigInteger())
                ;
    }
...

I remember we had to change this a couple of times already in the past year(s) going through the versions.

At some point it was a generic function and the result of the native query was typed, so this error would've been caught at compile time. But at some point this changed.
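The cast failure itself is easy to reproduce without Hibernate: once the driver hands back a BigDecimal where the generator expects an Integer, the cast blows up at runtime, while an explicit conversion succeeds. A minimal sketch:

```java
import java.math.BigDecimal;
import java.math.BigInteger;

public class CastDemo {
    public static void main(String[] args) {
        Object fromDriver = new BigDecimal("42"); // what the driver now returns

        try {
            Integer bad = (Integer) fromDriver;   // the old generator's cast
            System.out.println("cast succeeded: " + bad);
        } catch (ClassCastException e) {
            System.out.println("cast failed: " + e.getMessage());
        }

        // Converting instead of casting works regardless of the wrapper type:
        BigInteger good = ((BigDecimal) fromDriver).toBigInteger();
        System.out.println("converted: " + good);
    }
}
```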

DavideD commented 11 months ago

So, the issue here is that you are running a native query without specifying what value you expect to receive. Hibernate has no idea what type you expect and it returns the object that the driver is returning.

It's possible that with different versions of the driver, or Hibernate, or database, you get something different in return. I haven't tested it, but I would expect something like this to work:

               .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigInteger.class)
                .getSingleResult()

or, at the very least:

               .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigDecimal.class)
                .getSingleResult()
                .map( BigDecimal::toBigInteger)

Froidoh commented 11 months ago

So, the issue here is that you are running a native query without specifying what value you expect to receive. So hibernate just returns the object as is from the driver.

It's possible that with different versions of the driver, or Hibernate, or database, you get something different in return. I haven't tested it, but I would expect something like this to work:

               .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigInteger.class)
                .getSingleResult()

or, at the very least:

               .createNativeQuery("select OUR_SEQUENCE.nextval FROM dual", BigDecimal.class)
                .getSingleResult()
                .map( BigDecimal::toBigInteger)

That's cool, thanks!

But one would still need to cast this to an Object (or at least add a .map(Function.identity())) because of MutinyGenerator not taking a generic (anymore?)

DavideD commented 11 months ago

Yes, I think you are right. I don't know if there is any particular reason for not using the generic any more.

I think this method is inspired by ORM, where generate returns an Object; we did a natural conversion by returning a Uni<Object>.

I will create an issue to make it generic again.

DavideD commented 11 months ago

Actually, just returning a Uni<?> would solve the issue, I think.
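Mutiny isn't needed to see why a wildcard helps here: Java generics are invariant, so a Uni<BigInteger> is not a Uni<Object>, but it is a Uni<?>. A sketch of the same point using CompletableFuture as a stand-in for Uni (the method names are made up):

```java
import java.math.BigInteger;
import java.util.concurrent.CompletableFuture;

public class VarianceDemo {
    // Stand-in for a generator signature returning Uni<Object>:
    // CompletableFuture<BigInteger> is NOT a subtype of CompletableFuture<Object>
    // (generics are invariant), so the caller must widen explicitly.
    static CompletableFuture<Object> asObject(CompletableFuture<BigInteger> u) {
        return u.thenApply(x -> (Object) x); // the ".map to Object" dance
    }

    // Stand-in for a signature returning Uni<?>: any CompletableFuture<T>
    // is already a CompletableFuture<?>, no cast or map needed.
    static CompletableFuture<?> asWildcard(CompletableFuture<BigInteger> u) {
        return u;
    }

    public static void main(String[] args) {
        CompletableFuture<BigInteger> id =
                CompletableFuture.completedFuture(BigInteger.valueOf(7));
        System.out.println(asObject(id).join());
        System.out.println(asWildcard(id).join());
    }
}
```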

geoand commented 11 months ago

@Froidoh so if I understand correctly, what you are trying to do is something like the following:

Is that right?

Froidoh commented 11 months ago

@geoand

We also get the metadata from the call; I omitted that initially, but it's just an HTTP header. Sorry, it was an attempt to make the code a bit leaner.

So it is:

geoand commented 11 months ago

@Froidoh thanks

Before I move on to suggestions (and @FroMage probably has some as well) I have a question

For the HTTP input, are you using multipart data or just plain bytes in the HTTP body?

Froidoh commented 11 months ago

@Froidoh thanks

Before I move on to suggestions (and @FroMage probably has some as well) I have a question

For the HTTP input, are you using multipart data or just plain bytes in the HTTP body?

Currently plain bytes as back when we started this multipart data could not be streamed at all in quarkus, so we would've needed to allocate all the files on the server, which was an absolute nogo.

A curl call would look like this:

curl --location 'localhost:8080/v1/buckets/test_bucket/files' \
  --header 'X-File-Metadata: { "systemInfo": { "systemName": "Odin" }}' \
  --header 'Transfer-Encoding: chunked' \
  --header 'Content-Type: text/plain' \
  --data '@/home/froidoh/triumph.txt'

geoand commented 11 months ago

Quarkus should not be buffering the multipart contents and if it is, that's a bug.

Now, even if you use raw HTTP body, you can just use the File or Path types and Quarkus will save the contents to disk from where you can then use them in your Resource method.

Furthermore, you can also use vertx.fileSystem() to perform async operations on the file (like moving it to a location of your choice). That should vastly simplify things and eliminate any well-hidden bugs that might be lurking in the code you mentioned above.
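To illustrate the File/Path approach: once the framework has already written the upload to a temporary file, the endpoint only needs to move it into place. A stdlib-only sketch (the paths and method name are made up; in a real resource method the Path would come from the request):

```java
import java.io.IOException;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MoveDemo {
    // Move an already-written temp upload into its final bucket directory.
    static Path storeUpload(Path tempUpload, Path bucketDir, String fileName) throws IOException {
        Files.createDirectories(bucketDir);
        Path target = bucketDir.resolve(fileName);
        try {
            // Atomic only when source and target share a file system
            return Files.move(tempUpload, target, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Across file systems (local temp dir vs. mounted remote
            // storage) fall back to a plain move, i.e. copy + delete
            return Files.move(tempUpload, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }

    public static void main(String[] args) throws IOException {
        Path temp = Files.createTempFile("upload-", ".bin");
        Files.writeString(temp, "payload");
        Path stored = storeUpload(temp, Files.createTempDirectory("bucket"), "triumph.txt");
        System.out.println(Files.readString(stored));
        System.out.println(Files.exists(temp)); // source is gone after the move
    }
}
```

Note that an atomic move only works within one file system; moving from a local temp dir to a remote mount degrades to a copy, which is why being able to configure where the temp files land matters for this use case.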

Froidoh commented 11 months ago

@geoand is there any example of how to do this in a non-blocking manner while not allocating any additional memory on the server AND not relying on any temp folder?

Because the server this runs on doesn't have a lot of HDD, the file-system we push to is a special remote file-system mounted on the server.

If a lot of files get uploaded and all of them would need to temporarily reside on the server, we would have a problem. Maybe the "temp folder" is configurable and we could "pipe it through" to the remote file-system...

geoand commented 11 months ago

I have found what looks like a bug in our File handling which I am looking into. I'll post an update when I have figured it out.

geoand commented 11 months ago

After https://github.com/quarkusio/quarkus/pull/35659 is in, you will be able to use quarkus.http.body.uploads-directory to specify where the temp files are stored. Furthermore, those files will be removed by default when the request is completed (same is already the case for multipart handling).
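Under the assumption that the remote mount is writable by the app, the configuration would then look something like this (the path is an example):

```properties
# application.properties: write request-body uploads straight to the
# mounted remote storage instead of the container's small local disk
quarkus.http.body.uploads-directory=/mnt/remote-storage/uploads
```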

FroMage commented 11 months ago

If you get a lot of concurrent requests uploading files, I suspect you will still have more room in your tmp folder on your HDD than if you streamed all of those into memory and later onto your larger storage. Now, perhaps you're streaming them directly from the network to your larger storage, in which case, yeah, just let RESTEasy Reactive do it for you like Georgios said: it will be done prior to invoking your endpoint and give you a File or Path, which is easy to work with.

Note that this should work both for Multipart and a single file.

Froidoh commented 11 months ago

If you get a lot of concurrent requests uploading files, I suspect you will still have more room in your tmp folder on your HDD than if you streamed all those in memory and then later on your larger storage. Now, perhaps you're streaming them directly from the network to your larger storage, in which case, yeah, just let RESTEasy Reactive do it for you like Georgios said, and it will be done prior to invoking your endpoint, and give you a File or Path which is easy to work with.

Note that this should work both for Multipart and a single file.

Sadly no, as the server we are talking about is a container with like 200mb of hdd space available :/

FroMage commented 11 months ago

So, did you try quarkus.http.body.uploads-directory?

Froidoh commented 11 months ago

So, did you try quarkus.http.body.uploads-directory?

I did not; I am currently on vacation, and I must say that for the time being 2 of our 3 services will migrate from Quarkus to Spring.

This will probably require more hardware resources, but as I am the bottleneck of all development and involved in other projects as well, the decision was made, and I think it's the right one.

We'll see how everything works out with the advent of Java 21 and "green threads".

At least now I am not the only person capable of writing somewhat acceptable code, because Quarkus/Mutiny really feels like another programming language if you only know "classic imperative Java".

I want to say thanks for all your support, and I wish you all the best. We are staying with Quarkus for one service that works reliably and makes users happy, but has no need to combine blocking calls with non-blocking ones :)

geoand commented 11 months ago

So, did you try quarkus.http.body.uploads-directory?

@FroMage we need my PR for that to work