hibernate / hibernate-reactive

A reactive API for Hibernate ORM, supporting non-blocking database drivers and a reactive style of interaction with the database.
https://hibernate.org/reactive
Apache License 2.0
441 stars 92 forks source link

IllegalStateException: HR000069 with RxJava and multiple instances of Vert.x #1133

Closed DavideD closed 2 years ago

DavideD commented 2 years ago

This follows #1074.

Here the use case:

  1. The Hibernate Reactive session factory is created outside of a Vert.x context, so a new Vert.x instance is created
  2. The Session Factory is passed to a Verticle and deployed using a different Vert.x instance
  3. The Vertice will use the session factory to run some CRUD operations

Here's an example of the method in the Verticle that answer to a http request :

    private Maybe<List<Product>> listProducts(RoutingContext ctx) {
        CompletionStage<List<Product>> stage = emf.withSession( session -> session
                        .createQuery( "from Product", Product.class )
                        .getResultList() );
        return Maybe.fromCompletionStage( stage );
    }

Under stress, this causes the session to be created in one thread and then used in a different one when running the query. Because the threads are different, it throws the exception.

This has been tested using RxJava and the Stage api, I'm not sure if it's limited to this scenario (I haven't checked yet).

DavideD commented 2 years ago

It's a WIP but this branch on my repository contains an integration test about this.

andrew-dzak commented 2 years ago

@DavideD is there a work around for this for Stage api apart from setting this org.hibernate.reactive.common.InternalStateAssertions.ENFORCE to false

DavideD commented 2 years ago

@andrew-dzak What's your use case? Do you need to have the SessionFactory in a different Vert.x instance than the one used by the Verticles?

Setting org.hibernate.reactive.common.InternalStateAssertions.ENFORCE to false is usually not a good idea unless you are confident that everything works correctly anyway.

DavideD commented 2 years ago

Or do you want to create the factory in advance and then pass it to the Verticles?

andrew-dzak commented 2 years ago

@DavideD Need to have the one used to open the reactive session. Open to work around.

my code

public static <T> T findSingleByQuery(Class<T> clazz, String hql, Map<String,Object> params){
        T entity = DatabaseConfiguration.getSessionFactory().withSession(session -> {
            Stage.Query<T>  query = session.createQuery(hql,clazz);

            if(params != null && !params.isEmpty()){
                params.keySet().stream().forEach(key ->
                        query.setParameter(key,params.get(key)));
            }

           return query.getSingleResultOrNull();
        }).toCompletableFuture().join();

        return entity;
    }
java.util.concurrent.CompletionException: java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [39]: 'vert.x-eventloop-thread-3' current Thread [77]: 'vert.x-eventloop-thread-1'
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1155)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2137)
    at io.vertx.core.Future.lambda$toCompletionStage$2(Future.java:360)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.emitSuccess(FutureBase.java:60)
    at io.vertx.core.impl.future.FutureImpl.tryComplete(FutureImpl.java:211)
    at io.vertx.core.impl.future.PromiseImpl.tryComplete(PromiseImpl.java:23)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:102)
    at io.vertx.sqlclient.impl.QueryResultBuilder.tryComplete(QueryResultBuilder.java:35)
    at io.vertx.core.Promise.complete(Promise.java:66)
    at io.vertx.core.Promise.handle(Promise.java:51)
    at io.vertx.core.Promise.handle(Promise.java:29)
    at io.vertx.core.impl.future.FutureImpl$3.onSuccess(FutureImpl.java:141)
    at io.vertx.core.impl.future.FutureBase.lambda$emitSuccess$0(FutureBase.java:54)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:832)
DavideD commented 2 years ago

It seems that's running on the wrong event-loop. How do you create the session factory? How do you call findSingleByQuery? Shoudln't it be:

public static <T> CompletionStage<T> findSingleByQuery(Class<T> clazz, String hql, Map<String,Object> params){
        return DatabaseConfiguration.getSessionFactory().withSession(session -> {
            Stage.Query<T>  query = session.createQuery(hql,clazz);

            if(params != null && !params.isEmpty()){
                params.keySet().stream().forEach(key ->
                        query.setParameter(key,params.get(key)));
            }

           return query.getSingleResultOrNull();
        })
    }

?

Is this a verticle? How do you deploy it? If you can provide a test case, that would be great.

DavideD commented 2 years ago

Is Hibernate Reactive creating a new Vert.x instance? You should see a message like this at start up if it's happening:

Vert.x not detected, creating a new instance
andrew-dzak commented 2 years ago

Is Hibernate Reactive creating a new Vert.x instance? You should see a message like this at start up if it's happening:

Vert.x not detected, creating a new instance

yes it does

 2022-02-18 15:26:43.841 INFO HR000002: Vert.x not detected, creating a new instance [org.hibernate.reactive.vertx.impl.DefaultVertxInstance start org.hibernate.reactive.vertx.impl.DefaultVertxInstance ]
DavideD commented 2 years ago

Ok, I'm assuming that you don't need a new Vert.x instance and it's a side effect of creating the factory outside of a Vert.x context.

In this example I show how one can create the factory before deploying the Verticles: https://github.com/hibernate/hibernate-reactive/issues/1074#issuecomment-1014770377

Or you can register the Vert.x instance you already have when you create the factory. We describe this in the documentation:

final Vertx vertx = ...

Configuration configuration = new Configuration();
StandardServiceRegistryBuilder builder = new ReactiveServiceRegistryBuilder()
        .addService( VertxInstance.class, (VertxInstance) () -> vertx )
        .applySettings( configuration.getProperties() );
StandardServiceRegistry registry = builder.build();
SessionFactory sessionFactory = configuration.buildSessionFactory( registry );
andrew-dzak commented 2 years ago

@DavideD thanks the session factory is created outside the Vert.x context. I will give this a try

DavideD commented 2 years ago

No problem. Let us know how it turns out.

By the way, you shouldn't have to use blocking calls like .toCompletableFuture().join(). This can cause some errors if the query takes too long and it's not the right approach if you are running things in an event loop.

I have a basic example of a verticle that uses CompletionStage and Future here: https://github.com/DavideD/jbang-rep/blob/d58be92c49d6fbca431b172ba1bc210c78f307bb/StageVerticle.java#L144

andrew-dzak commented 2 years ago

@DavideD Sorry for the late reply but it works perfectly after refactoring my code. Stress test with apache jmeter and had no errors

DavideD commented 2 years ago

Awesome!