hibernate / hibernate-reactive

A reactive API for Hibernate ORM, supporting non-blocking database drivers and a reactive style of interaction with the database.
https://hibernate.org/reactive
Apache License 2.0

Improve stack trace for Session used across threads detection error HR000069 when using Mutiny #1376

Open noobgramming opened 2 years ago

noobgramming commented 2 years ago

Hello again! We've been using Hibernate Reactive in prod for a while, and there's a common mistake I would like to prevent when using reactive DAOs.

We're writing DAOs much like you would in blocking code. Here is an example DAO method:

  /**
   * List all entities. Force providing max results to avoid perf problems
   *
   * @return list of entities
   */
  @CheckReturnValue
  public Uni<List<E>> list(final int maxResults) {
    Preconditions.checkArgument(
        maxResults > 0, "maxResults must be more than zero. Value was %s", maxResults);
    return sessionFactory.withSession(
        session -> {
          final var query = sessionFactory.getCriteriaBuilder().createQuery(cls);
          // need root for query to be valid
          query.from(this.cls);
          return session.createQuery(query).setMaxResults(maxResults).getResultList();
        });
  }

A very common problem with this pattern is accidentally creating another Session because you have multiple reactive streams. A related problem is accidentally creating a second reactive Session on a different thread because the context wasn't propagated when using adapters between Mutiny/CompletableFuture/RxJava. Hibernate Reactive already detects and logs this, but only at debug level, and it is not exposed in the user API.

// EXISTING CODE  MutinySessionFactoryImpl.withSession()
    @Override
    public <T> Uni<T> withSession(Function<Mutiny.Session, Uni<T>> work) {
        Objects.requireNonNull( work, "parameter 'work' is required" );
        Mutiny.Session current = context.get( contextKeyForSession );
        if ( current != null && current.isOpen() ) {
            LOG.debug( "Reusing existing open Mutiny.Session which was found in the current Vert.x context" );
            return work.apply( current );
        }
        else {
            LOG.debug( "No existing open Mutiny.Session was found in the current Vert.x context: opening a new instance" );
            return withSession( openSession(), work, contextKeyForSession );
        }
    }

Can we expose this logic so that I can log a warning (or similar) when a DAO method is called without a session in progress? For example:

    Optional<Mutiny.Session> getCurrentSession() {
        Mutiny.Session current = context.get( contextKeyForSession );
        if ( current != null && current.isOpen() ) {
            return Optional.of( current );
        }
        else {
            return Optional.empty();
        }
    }

Another option is exposing a set of withSession()/withTransaction() variants on SessionFactory that blow up if a session isn't in progress, maybe named withExistingSession(). This is my plan for our codebase, since we already wrap SessionFactory to hide the Open*** variants because the lack of automatic flush has bitten us so many times.

My goal is preventing implicit Session creation while transparently using an existing one in our DAOs, because IMO it's too easy to accidentally drop your Session out of context by using streams or threading improperly, and end up with an utterly intractable and random IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread.
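To make the withExistingSession() idea concrete, here is a minimal sketch that deliberately avoids Hibernate Reactive types so it can run on its own. Everything in it is hypothetical: ExistingSessionGuard is not an existing API, the type parameter S stands in for Mutiny.Session, CompletableFuture stands in for Uni, and the Supplier stands in for a context lookup such as the getCurrentSession() proposed above.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical fail-fast guard; S stands in for Mutiny.Session. */
final class ExistingSessionGuard<S> {
    // Assumed lookup of the session bound to the current context, if any.
    private final Supplier<Optional<S>> currentSession;

    ExistingSessionGuard(Supplier<Optional<S>> currentSession) {
        this.currentSession = currentSession;
    }

    /** Runs work against the session already in progress; never opens a new one. */
    <T> CompletableFuture<T> withExistingSession(Function<S, CompletableFuture<T>> work) {
        Optional<S> session = currentSession.get();
        if (session.isEmpty()) {
            // Created eagerly, so the exception's stack trace includes the caller's frames.
            return CompletableFuture.failedFuture(
                    new IllegalStateException("No session in progress; refusing to open one implicitly"));
        }
        return work.apply(session.get());
    }
}
```

A DAO built on such a guard would fail at the call that lost the context, instead of silently opening a second session.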

DavideD commented 2 years ago

I need to think about it, but you can get the context via the Implementor interface:

Implementor implementor = (Implementor)sessionFactory;
Context context = implementor.getContext();
Context.Key<Mutiny.Session> sessionKey = new BaseKey<>( Mutiny.Session.class, implementor.getUuid() );
Mutiny.Session current = context.get( sessionKey );

noobgramming commented 2 years ago

Hello David, I've been fiddling with this on my end. The ability to get current session remains useful, but I'm starting to doubt my other suggestions.

I found a more effective way to fix IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread, although I'm unsure whether it can be done in a general way without an unacceptable performance impact (I put our implementation behind a run-time modifiable config option).

I realized the main problem with Hibernate's existing error message is that it doesn't preserve the original stack trace. When you get HR000069, it can be coming from anywhere in your application as the provided trace doesn't contain any of the user's stack frames.

So for our SessionFactory wrapper, I'm now saving off a new RuntimeException() along with current threadId and name as the value in a concurrent map with weak keys of Mutiny.Session whenever the Session is used. Like this:

  /** Thread recorded for a Session, plus an exception that holds the stack trace at that point. */
  private record ThreadInfo(
      long threadId, @NonNull String name, @NonNull RuntimeException traceHolder) {}

  /** This cache has weak keys to prevent a memory leak from holding onto Session objects forever. */
  private static final Cache<Mutiny.Session, ThreadInfo> sessionToThreadMap =
      CacheBuilder.newBuilder().weakKeys().build();

This allows me to look up which thread the Session was created on each time it's used, and more importantly, the stored RuntimeException allows me to rehydrate the original stack trace. Finally, the weak keys ensure I don't create a memory leak by holding onto Sessions that are no longer in use. Usage looks something like this:

final var threadInfo =
    Optional.ofNullable(sessionToThreadMap.getIfPresent(session));
threadInfo.ifPresent(
    originalThreadInfo -> {
      // Compare the thread recorded for this Session with the thread using it now
      if (originalThreadInfo.threadId() != Thread.currentThread().getId()) {
        log.error(
            "Hibernate Session used in a different thread " + Thread.currentThread().getName()
                + " than where it was created (" + originalThreadInfo.name() + ")! See trace",
            new RuntimeException());
        log.error("Session originated here", originalThreadInfo.traceHolder());
      }
    });

The main problem with my implementation is that creating a new RuntimeException() and stuffing it into the map every time a Session is used is very inefficient. I'm not clever or familiar enough with Hibernate to find a solution to this.

Maybe there is a similar but more performant way to improve the original HR000069 error messages that led me down this rabbit hole?
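One way to reduce the cost described above is to capture the trace only once per session (on first use) and to skip all work while a run-time switch is off. The sketch below uses only the JDK; SessionThreadTracker and Origin are hypothetical names, the session is typed as Object so the example runs without Hibernate Reactive, and the WeakHashMap relies on the assumption that session objects do not override equals/hashCode.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical tracker: records the first thread seen per session, only while enabled. */
final class SessionThreadTracker {
    record Origin(long threadId, String threadName, Throwable trace) {}

    // Run-time switch, so the capture cost is paid only while diagnosing.
    private final AtomicBoolean enabled = new AtomicBoolean(false);
    // Weak keys let unused sessions be garbage collected.
    private final Map<Object, Origin> origins = Collections.synchronizedMap(new WeakHashMap<>());

    void setEnabled(boolean on) {
        enabled.set(on);
    }

    /** Returns the recorded origin if the session is now used on a different thread. */
    Optional<Origin> checkUse(Object session) {
        if (!enabled.get()) {
            return Optional.empty(); // fast path: no allocation, no map access
        }
        Thread current = Thread.currentThread();
        // The Throwable is created once per session, on first use, not on every use.
        Origin origin = origins.computeIfAbsent(session,
                s -> new Origin(current.getId(), current.getName(), new Throwable("Session first used here")));
        return origin.threadId() == current.getId() ? Optional.empty() : Optional.of(origin);
    }
}
```

When checkUse returns a value, its trace() can be logged next to a fresh exception, as in the log.error calls above. The trade-off is that only the first use is remembered, not the most recent one.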

DavideD commented 2 years ago

Ah, we didn't notice that. Good point!

When using the Mutiny API, we create a Uni that will eventually be executed at subscription and so, in case of error, it doesn't contain the caller of the session method causing the error.

For reference, this is the stack trace when running InternalStateAssertionsTest#testFindWithStage:

java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [19]: 'vert.x-eventloop-thread-0' current Thread [26]: 'org.hibernate.reactive.InternalStateAssertionsTest-thread'
    at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:158)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1580)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveFind(ReactiveSessionImpl.java:1027)
    at org.hibernate.reactive.stage.impl.StageSessionImpl.find(StageSessionImpl.java:101)

        // Line in the test that causes the exception
        at org.hibernate.reactive.InternalStateAssertionsTest.lambda$testFindWithStage$1(InternalStateAssertionsTest.java:80)

    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072)
    at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)
    at java.base/java.lang.Thread.run(Thread.java:829)

And this is the stack trace when running InternalStateAssertionsTest#testFindWithMutiny:

java.lang.IllegalStateException: HR000069: Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [19]: 'vert.x-eventloop-thread-0' current Thread [27]: 'vert.x-eventloop-thread-1'
    at org.hibernate.reactive.common.InternalStateAssertions.assertCurrentThreadMatches(InternalStateAssertions.java:46)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.threadCheck(ReactiveSessionImpl.java:158)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.checkOpen(ReactiveSessionImpl.java:1580)
    at org.hibernate.reactive.session.impl.ReactiveSessionImpl.reactiveFind(ReactiveSessionImpl.java:1027)
    at org.hibernate.reactive.mutiny.impl.MutinySessionImpl.lambda$find$3(MutinySessionImpl.java:109)
    at io.smallrye.mutiny.operators.uni.builders.UniCreateFromCompletionStage.subscribe(UniCreateFromCompletionStage.java:24)
    at io.smallrye.mutiny.operators.AbstractUni.subscribe(AbstractUni.java:36)
    at io.smallrye.mutiny.operators.uni.UniRunSubscribeOn.lambda$subscribe$0(UniRunSubscribeOn.java:27)
    at org.hibernate.reactive.context.impl.VertxContext.lambda$execute$0(VertxContext.java:86)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.impl.EventLoopContext.lambda$runOnContext$0(EventLoopContext.java:43)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:829)

And it doesn't contain any reference to InternalStateAssertionsTest.
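The effect can be reproduced with plain JDK types. In this sketch (all names are hypothetical stand-ins), find() plays the role of a session method that returns lazy work, userCode() only assembles that work, and the failure is raised later on another thread. By then userCode() has already returned, so it cannot appear in the stack trace.

```java
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class LazyTraceDemo {
    /** Stand-in for a session method: returns lazy work instead of running it. */
    static Supplier<String> find() {
        return () -> { throw new IllegalStateException("wrong thread"); };
    }

    /** Stand-in for user code: only assembles the pipeline, then returns. */
    static Supplier<String> userCode() {
        return find();
    }

    /** Runs the lazy work on another thread and returns the failure it raised. */
    static Throwable runElsewhere(Supplier<String> work) {
        try {
            CompletableFuture.supplyAsync(work).join();
            return null; // not reached in this sketch: the work always throws
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    /** True if any frame of the failure belongs to a method with the given name. */
    static boolean mentions(Throwable failure, String method) {
        return Arrays.stream(failure.getStackTrace())
                .anyMatch(frame -> frame.getMethodName().equals(method));
    }

    public static void main(String[] args) {
        Throwable failure = runElsewhere(userCode());
        System.out.println("userCode in trace: " + mentions(failure, "userCode")); // prints "userCode in trace: false"
    }
}
```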

DavideD commented 2 years ago

I don't think I would keep a map around to fix this issue. You can create a class that wraps the session and, every time you call one of its methods, keeps track of possible errors.

You can see an example - it might become a future fix - here: https://github.com/DavideD/hibernate-reactive/blob/stacktrace/hibernate-reactive-core/src/main/java/org/hibernate/reactive/mutiny/impl/MutinySessionImpl.java#L57
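One possible reading of the wrapper idea, sketched with JDK types only (this is not the code in the linked branch, and CallSiteCapture and traced are hypothetical names): record a Throwable when the wrapped method is called, while the caller is still on the stack, and attach it to any failure raised when the lazy work finally runs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

final class CallSiteCapture {
    /** Wraps lazy work so that a failure also carries the stack of whoever assembled it. */
    static <T> Supplier<T> traced(Supplier<T> work) {
        // Captured eagerly, on the caller's thread, while the caller's frames are on the stack.
        Throwable callSite = new Throwable("Lazy work was assembled here");
        return () -> {
            try {
                return work.get();
            } catch (RuntimeException failure) {
                failure.addSuppressed(callSite); // printed under "Suppressed:" in the trace
                throw failure;
            }
        };
    }

    /** Stand-in for user code calling a wrapped session method. */
    static Supplier<String> userCode() {
        return traced(() -> { throw new IllegalStateException("wrong thread"); });
    }

    /** Runs the lazy work on another thread and returns the failure it raised. */
    static <T> Throwable failureOf(Supplier<T> work) {
        try {
            CompletableFuture.supplyAsync(work).join();
            return null; // not reached in this sketch: the work always throws
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        // The printed trace ends with a "Suppressed:" section that includes userCode.
        failureOf(userCode()).printStackTrace();
    }
}
```

This still allocates one Throwable per call, so the performance concern raised earlier applies here too; the difference is only that no map is needed.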

gavinking commented 2 years ago

A very common problem with this pattern is accidentally creating another Session because you have multiple reactive streams.

@noobgramming not sure what you mean by this; different streams can't share Sessions, that's against the rules.

noobgramming commented 2 years ago

@gavinking I know, but it hasn't stopped people from doing it :( . At least in our projects

paomian commented 2 years ago

I don't want to share a Session. In my situation, I send an email to an address that I query from the db. Unfortunately, the event loop that runs the query is different from the one that sends the email.

Consider this code:

  @GET
  @Path("/gaga")
  @ReactiveTransactional
  public Uni<Boolean> gagaga() {
    return Uni.createFrom()
        .item("xpaomian@gmail.com")
        .flatMap(this::sendInviteEmail);
  }

@ReactiveTransactional checks the event loop used by the method gagaga, but sendInviteEmail wraps the AWS SES service, which uses its own event loop and returns a CompletableFuture. I get the same error when I create the Uni with Uni.createFrom().completionStage(future); I guess the Uni continues on the executor of the previous step. For now I can block on the CompletableFuture with get() and create a Uni with item(result), but that blocks the worker thread. So I tried to use emitOn to change the event loop after sendInviteEmail, but I don't know which executor to choose. Can anyone help me?

DavideD commented 2 years ago

I'm not sure I understand what's going on, but can't you delay the creation of the session? It doesn't seem like you are using it in gagaga, and I don't know what's happening in sendInviteEmail.

@ReactiveTransactional just wraps the method call using Panache.withTransaction. So you could try removing the annotation and use Panache.withTransaction when you actually need it.

It would help if you could create a project or test so that we can see the error.

Also, this issue is about improving the stack trace, I think you are asking for something different.

paomian commented 2 years ago

Thanks for the reply. I saw this issue: https://github.com/hibernate/hibernate-reactive/issues/1194. It is the same issue.

  @GET
  @Path("/gaga")
  @ReactiveTransactional
  public Uni<Boolean> gagaga() {
    return Uni.createFrom()
        .item("xpaomian@gmail.com")
        .flatMap(this::sendInviteEmail)
        // If I remove the line below, I get:
        // Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [120]: 'vert.x-eventloop-thread-5' current Thread [188]: 'sdk-async-response-0-0'
        .emitOn(MutinyHelper.executor(Vertx.currentContext()));
  }

It works, but is it incorrect?

This is the code for sendMail:

public class AWSEmailService implements EmailService {
  @Inject SesAsyncClient sesAsyncClient;

  @Override
  public Uni<Boolean> sendMail(String email, String subject, String content) {
    var body = Templates.welcome(content).render();
    var x =
        sesAsyncClient.sendEmail(
            req ->
                req.source("x@y")
                    .destination(d -> d.toAddresses(email))
                    .message(
                        msg ->
                            msg.subject(subj -> subj.data(subject))
                                .body(b -> b.html(html -> html.data(body)))));

    return Uni.createFrom()
        .completionStage(x)
        .onItem()
        .transform(sendEmailResponse -> sendEmailResponse.sdkHttpResponse().isSuccessful());
  }
}

DavideD commented 2 years ago

        // Detected use of the reactive Session from a different Thread than the one which was used to open the reactive Session - this suggests an invalid integration; original thread [120]: 'vert.x-eventloop-thread-5' current Thread [188]: 'sdk-async-response-0-0'
        .emitOn(MutinyHelper.executor(Vertx.currentContext()));

Yes, I think it's the correct solution (I've zero experience with the SES API, though).

You can see from the error message that the session has been used outside of the Vert.x event loop thread, and this shouldn't happen. It happens because the SesAsyncClient uses a default thread executor instead of the Vert.x one.

But the code that you've shown us doesn't seem to use the session anywhere, so make sure that you actually need @ReactiveTransactional.
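The thread hop that emitOn performs can be illustrated with plain CompletableFuture, which is the type the SDK client returns. This is only an analogy under stated assumptions: the single-thread executor named "event-loop" stands in for the Vert.x event loop that owns the session, the thread named "sdk-async-response" stands in for the SDK's response thread, and thenApplyAsync with an explicit executor plays the role of emitOn.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class HopBackDemo {
    /** Returns the thread names of a plain continuation and of one dispatched to the event loop. */
    static String[] run() {
        // Single thread standing in for the event loop that owns the session.
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        try {
            CompletableFuture<String> sdkFuture = new CompletableFuture<>();
            // Registered before completion, so it runs on the thread that completes sdkFuture.
            CompletableFuture<String> plain =
                    sdkFuture.thenApply(v -> Thread.currentThread().getName());
            // Same continuation, but explicitly dispatched back to the event loop.
            CompletableFuture<String> hopped =
                    sdkFuture.thenApplyAsync(v -> Thread.currentThread().getName(), eventLoop);
            // Foreign thread standing in for the SDK's response thread.
            new Thread(() -> sdkFuture.complete("sent"), "sdk-async-response").start();
            return new String[] { plain.join(), hopped.join() };
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println(threads[0] + " -> " + threads[1]); // prints "sdk-async-response -> event-loop"
    }
}
```

Without the explicit executor, everything downstream of the SDK call keeps running on the SDK's thread, which is the situation the HR000069 check reports once the session is touched again.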

paomian commented 2 years ago

thanks. 👍

But the code that you've showed us, doesn't seem to use the session anywhere, make sure that you actually need to use @ReactiveTransactional.

It's just a minimal reproducer. I query the email address before sending the email and then save a log entry to the db, so I need to use @ReactiveTransactional.

DavideD commented 2 years ago

No problem. You should actually thank @tiagomistral :)