jOOQ / jOOQ

jOOQ is the best way to write SQL in Java
https://www.jooq.org

Allow to extract R2DBC connection for a subscriber #13704

Closed dstepanov closed 2 years ago

dstepanov commented 2 years ago

I'm implementing the Micronaut R2DBC jOOQ integration, and I have a feature request about how it would be nice to integrate transaction propagation with jOOQ. We use Reactor and propagate the state (transactions, sessions, etc.) inside Reactor's context.

Right now, user code has to include something like this:

protected <T> Mono<T> withDSLContextMono(Function<DSLContext, Publisher<T>> fn) {
    return Mono.deferContextual(contextView -> {
        ReactiveTransactionStatus<Connection> status = getTransactionStatus(contextView);
        if (status == null) {
            return Mono.from(fn.apply(ctx));
        }
        return Mono.from(fn.apply(DSL.using(status.getConnection())));
    });
}

It would be nice if there was a way to provide a custom R2DBC connection extractor for a Subscriber. Reactor's subscriber is CoreSubscriber, which carries the state (https://github.com/reactor/reactor-core/blob/c8a6bb8c8fd4651e7f90d80b82480bc901f194a0/reactor-core/src/main/java/reactor/core/CoreSubscriber.java), so there is a way to extract a possible connection.

This code can be modified to check for the possible connection being inside the subscriber:

default void subscribe(Subscriber<? super R> subscriber) {
    Connection r2dbcConn = configuration().r2dbcConnectionProvider().provide(subscriber);
    if (r2dbcConn != null) {
        subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, ResultSubscriber::new, r2dbcConn));
        return;
    }

    ConnectionFactory cf = configuration().connectionFactory();

    if (!(cf instanceof NoConnectionFactory))
        subscriber.onSubscribe(new QuerySubscription<>(this, subscriber, ResultSubscriber::new));
    else
        subscriber.onSubscribe(new BlockingRecordSubscription<>(this, subscriber));
}

lukaseder commented 2 years ago

Thanks for your feature request.

I'm afraid, I'm not quite following. You've described a potential solution, but I don't really understand the problem this is solving yet.

I'm guessing that you want a reactive version of DSLContext.connectionResult(ConnectionCallable), is that it? E.g.

public interface DSLContext {
    <T> Publisher<T> connectionPublisher(ConnectionPublishable<T> p);
}

With

@FunctionalInterface
public interface ConnectionPublishable<T> {
    Publisher<T> run(io.r2dbc.spi.Connection connection);
}

dstepanov commented 2 years ago

I want to propagate the connection opened by the transaction manager directly to jOOQ.

To support this scenario:

    @Transactional
    protected Mono<IPet> myTxMethod() {
        return Mono.from(ctx.select(PET_ID, PET_NAME, PET_TYPE, PET_OWNER).from(PET_TABLE)).flatMap(this::map);
    }

The transaction manager opens a transaction using a method interceptor and propagates it in the Reactor context. Then I need some provider that can do the connection extraction from the subscriber:

public interface ConnectionFromSubscriberProvider<T> {
    @Nullable
    io.r2dbc.spi.Connection provide(org.reactivestreams.Subscriber<?> subscriber);
}

In the implementation, I can cast the subscriber to reactor.core.CoreSubscriber and extract the connection.
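The idea can be sketched without any jOOQ, Reactor, or R2DBC dependency. The following is only an illustration under stated assumptions: `java.util.concurrent.Flow.Subscriber` stands in for `org.reactivestreams.Subscriber`, `ContextCarrier` stands in for Reactor's `CoreSubscriber`, `Conn` stands in for `io.r2dbc.spi.Connection`, and the context key `"tx.connection"` is made up. None of these names exist in jOOQ.

```java
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Flow;

final class SubscriberConnectionSketch {

    // Hypothetical stand-in for io.r2dbc.spi.Connection.
    static final class Conn {
        final String id;
        Conn(String id) { this.id = id; }
    }

    // Hypothetical stand-in for reactor.core.CoreSubscriber: a subscriber that exposes a context.
    interface ContextCarrier<T> extends Flow.Subscriber<T> {
        Map<String, Object> context();
    }

    // The provider shape requested in this thread; not an existing jOOQ type.
    interface ConnectionFromSubscriberProvider {
        Conn provide(Flow.Subscriber<?> subscriber); // null if nothing is bound
    }

    // What an integration might register: look up a connection under an assumed context key.
    static final ConnectionFromSubscriberProvider FROM_CONTEXT = subscriber -> {
        if (subscriber instanceof ContextCarrier) {
            Object bound = ((ContextCarrier<?>) subscriber).context().get("tx.connection");
            if (bound instanceof Conn)
                return (Conn) bound;
        }
        return null;
    };

    // A subscriber without any context, which is all the plain SPI guarantees.
    static class PlainSubscriber<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) {}
        @Override public void onError(Throwable error) {}
        @Override public void onComplete() {}
    }

    // A subscriber that carries a context, as Reactor's subscribers do.
    static final class ContextualSubscriber<T> extends PlainSubscriber<T> implements ContextCarrier<T> {
        private final Map<String, Object> context;
        ContextualSubscriber(Map<String, Object> context) { this.context = context; }
        @Override public Map<String, Object> context() { return context; }
    }

    public static void main(String[] args) {
        Conn txConn = new Conn("tx-1");
        Map<String, Object> context = Collections.<String, Object>singletonMap("tx.connection", txConn);
        // The contextual subscriber yields the bound connection, the plain one yields nothing.
        System.out.println(FROM_CONTEXT.provide(new ContextualSubscriber<String>(context)) == txConn);
        System.out.println(FROM_CONTEXT.provide(new PlainSubscriber<String>()) == null);
    }
}
```

The sketch only works when the subscriber handed to the provider is itself the context-carrying one; a wrapper in between would make the lookup return null.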

lukaseder commented 2 years ago

I'm sorry, I don't get it. To me, that doesn't sound like something that belongs into jOOQ.

dstepanov commented 2 years ago

Why not? Simply speaking, reactive subscribers can be connection-aware and provide the R2DBC connection to jOOQ.

With JDBC, we can inject a transactional data source and do some thread-local delegation, but with Reactive it's impossible.

To me, it looks like a nice solution for https://github.com/jOOQ/jOOQ/issues/12218

lukaseder commented 2 years ago

but with Reactive it's impossible.

Why is it not possible? If it isn't, then the @Transactional annotation seems to be broken? To my understanding, once you enter a @Transactional scope, all that scope's injected properties, including the ConnectionFactory, should receive scope context automatically.

If that doesn't work, then it seems to be broken, no?

lukaseder commented 2 years ago

To be clear, jOOQ doesn't know anything about reactor, nor should it. jOOQ is reactive-streams implementation agnostic. It was very important to me when R2DBC support was added that the only new dependency is the SPI.

Spring Boot, on the other hand, has starters for jOOQ and provides users with pre-configured jOOQ Configuration or DSLContext objects. So, the typical use-case here should be that things work out of the box with Spring transactions, both with JDBC and with R2DBC.

I don't see why this should work with JDBC/jOOQ but not with R2DBC/jOOQ. If there's a flaw, this doesn't seem to be jOOQ's flaw, but Spring's flaw.

What am I missing?

dstepanov commented 2 years ago

By the scope context, do you mean thread-local context? With the reactive code, it's very hard to propagate a thread-local state across all the threads that can be involved. That's why Reactor introduced a context.
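A small JDK-only illustration of the thread-local problem described here (class and field names are hypothetical): a value bound on the calling thread is not visible once the work hops to another thread, which reactive pipelines do routinely.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalHopDemo {

    // Hypothetical stand-in for a thread-bound "current transaction" holder.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    // Binds a value on the calling thread, then reads the holder from a worker thread.
    static String readFromOtherThread(String txName) {
        CURRENT_TX.set(txName);
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> read = CURRENT_TX::get;
            return worker.submit(read).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CURRENT_TX.remove();
        }
    }

    public static void main(String[] args) {
        // The worker thread never had the value bound, so it sees null.
        System.out.println("worker thread sees: " + readFromOtherThread("tx-1"));
    }
}
```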

In Micronaut, we also preconfigure DSLContext and set up a transaction-aware JDBC DataSource. I'm trying to do the same for R2DBC, but using a thread-local is not going to work in all cases, and I think there is a better solution.

I don't want jOOQ to know anything about Reactor. The only thing needed is a possible provider interface that can extract an R2DBC connection from the Reactive-Streams subscriber (no Reactor dependency needed). Micronaut will preset the provider with the logic related to extracting the connection from the Reactor context.

lukaseder commented 2 years ago

the only thing needed is a possible provider interface that can extract an R2DBC connection from the Reactive-Streams subscriber (no Reactor dependency needed).

Why can't that interface be ConnectionFactory?

dstepanov commented 2 years ago

I did some debugging, and theoretically it might work:

configuration.setConnectionFactory(new ConnectionFactory() {
    @Override
    public Publisher<? extends Connection> create() {
        return Mono.deferContextual(contextView -> {
            // Check the context and return the connection if present
            return Mono.from(connectionFactory.create());
        });
    }

    @Override
    public ConnectionFactoryMetadata getMetadata() {
        return connectionFactory.getMetadata();
    }
});

The problem right now is that when you create a custom subscriber it makes the reactor lose the context:

AbstractSubscription(Subscriber<? super T> subscriber) { // Here the context is still present
    this.completed = new AtomicBoolean();
    this.requested = new AtomicLong();
    this.guard = new Guard();
    // because the subscriber is not `CoreSubscriber` the context is gone
    this.subscriber = subscriber(
        subscriber::onSubscribe,
        subscriber::onNext,
        subscriber::onError,
        () -> {
            completed.set(true);
            subscriber.onComplete();
        }
    );
}

I see that in most use-cases one subscriber is created by wrapping another, so I can imagine that adding some helper util method (Subscriber original, Subscriber new) -> Subscriber might help to convert them to contextual ones.

lukaseder commented 2 years ago

Why does everything reactive always have to be such a pain? 😅

// because the subscriber is not `CoreSubscriber` the context is gone

This assumption that a subscriber is of a certain type or knows a certain context hasn't been designed by the reactive streams SPI. I mean, it could be requested here: https://github.com/reactive-streams/reactive-streams-jvm, but I'm not sure it will be adopted very soon, even if it was accepted.

Surely, jOOQ won't know about the reactor specific type, but we might be able to expose our own type, as you suggested below:

I see that in most use-cases one subscriber is created by wrapping another, so I can imagine that adding some helper util method (Subscriber original, Subscriber new) -> Subscriber might help to convert them to contextual ones.

Well, technically, the new subscriber does have a reference to the old one via those method references. It just doesn't expose it formally. Assuming we did expose this formally (unlikely via public API). How would this help you?

dstepanov commented 2 years ago

Yeah, unfortunately it's painful 😅 Actually yes, introducing an interface DelegatingSubscriber that returns the original subscriber would be a solution. Maybe I would be able to extract the context; it depends on how many other publisher-subscriber wraps there are 😭

lukaseder commented 2 years ago

Maybe I would be able to extract the context; it depends on how many other publisher-subscriber wraps there are 😭

The problem isn't the depth of nesting (the API could even offer convenience, similar to JDBC's Wrapper.unwrap(Class<T>)). The problem is to formally guarantee that some original subscriber is always reachable.
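A rough sketch of that distinction, using only the JDK. Everything here is hypothetical: `java.util.concurrent.Flow.Subscriber` stands in for `org.reactivestreams.Subscriber`, and `DelegatingSubscriber`, `unwrap`, `Original`, `Opaque`, and `Transparent` are made-up names. It shows that unwrapping handles any nesting depth, but a single wrapper that hides its delegate makes the original subscriber unreachable.

```java
import java.util.Optional;
import java.util.concurrent.Flow;

final class SubscriberUnwrapSketch {

    // Hypothetical interface: a wrapper that formally exposes the subscriber it wraps.
    interface DelegatingSubscriber<T> extends Flow.Subscriber<T> {
        Flow.Subscriber<? super T> delegate();
    }

    // Walks the delegate chain until a subscriber of the requested type is found,
    // loosely modelled on java.sql.Wrapper.unwrap(Class<T>).
    static <S> Optional<S> unwrap(Flow.Subscriber<?> subscriber, Class<S> type) {
        Flow.Subscriber<?> current = subscriber;
        while (current != null) {
            if (type.isInstance(current))
                return Optional.of(type.cast(current));
            current = current instanceof DelegatingSubscriber
                ? ((DelegatingSubscriber<?>) current).delegate()
                : null;
        }
        return Optional.empty();
    }

    // The "original" subscriber; stands in for a context-carrying type such as CoreSubscriber.
    static final class Original<T> implements Flow.Subscriber<T> {
        @Override public void onSubscribe(Flow.Subscription subscription) { subscription.request(Long.MAX_VALUE); }
        @Override public void onNext(T item) {}
        @Override public void onError(Throwable error) {}
        @Override public void onComplete() {}
    }

    // An opaque wrapper: forwards all signals but does not expose its target.
    static class Opaque<T> implements Flow.Subscriber<T> {
        final Flow.Subscriber<? super T> target;
        Opaque(Flow.Subscriber<? super T> target) { this.target = target; }
        @Override public void onSubscribe(Flow.Subscription subscription) { target.onSubscribe(subscription); }
        @Override public void onNext(T item) { target.onNext(item); }
        @Override public void onError(Throwable error) { target.onError(error); }
        @Override public void onComplete() { target.onComplete(); }
    }

    // A transparent wrapper: same forwarding, but the target stays reachable.
    static final class Transparent<T> extends Opaque<T> implements DelegatingSubscriber<T> {
        Transparent(Flow.Subscriber<? super T> target) { super(target); }
        @Override public Flow.Subscriber<? super T> delegate() { return target; }
    }

    public static void main(String[] args) {
        Original<String> original = new Original<String>();
        Flow.Subscriber<String> reachable = new Transparent<String>(new Transparent<String>(original));
        Flow.Subscriber<String> broken = new Transparent<String>(new Opaque<String>(original));
        // Two transparent layers: the original is still found.
        System.out.println(unwrap(reachable, Original.class).isPresent());
        // One opaque layer in the chain: the original is lost.
        System.out.println(unwrap(broken, Original.class).isPresent());
    }
}
```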

There's no TCK for that. Can you show a few tests that would validate correctness on your end? Ideally something like:

// ... ctx setup
var f = Flux.from(ctx.selectOne());
// ... your context setup
assert(f, whatever)

dstepanov commented 2 years ago

Here is the PR where the test project is using the internal TX manager -> jOOQ approach https://github.com/micronaut-projects/micronaut-sql/pull/674/files#diff-38f5b9308317b7b216744b2234e82ebdc707e6d9813b403c54f41468f1b559a6

dstepanov commented 2 years ago

A simple test would be something like this:

Flux.defer(() -> {
    return Flux.from(ctx.selectOne());
}).contextWrite(context -> context.put("key", "value"));

(Not sure if it will work without the defer)

and

configuration.setConnectionFactory(new ConnectionFactory() {
    @Override
    public Publisher<? extends Connection> create() {
        return Mono.deferContextual(contextView -> {
            // Check the context is present
            return Mono.from(connectionFactory.create());
        });
    }

    @Override
    public ConnectionFactoryMetadata getMetadata() {
        return connectionFactory.getMetadata();
    }
});

lukaseder commented 2 years ago

OK, I've tried to bend my mind around this issue here for a while now, and I still fail to see how this "context" thing is jOOQ's problem.

  1. If the reactive streams SPI doesn't suffice, then the SPI should be extended, not the implementations (e.g. reactor), let alone the third parties.
  2. If an implementation is based on an assumption that an SPI extension (e.g. CoreSubscriber) is present, then the implementation assumes the SPI doesn't suffice, which leads again to 1.
  3. An implementation agnostic API like jOOQ should be able to work with the SPI alone. Dependency hell is a real thing, and I'm very glad that jOOQ doesn't have a dependency on any implementation like reactor. It complicated the development of jOOQ's R2DBC support, and there are still bugs, but the benefit is significant. Besides, users may prefer a different implementation, even if Spring popularised reactor.
  4. Other implementations don't agree with reactor's approach, see e.g. https://github.com/ReactiveX/RxJava/issues/6484, so reactor's context is by no means a proven approach to passing around such contexts.
  5. jOOQ's own approach at offering a programmatic transaction API for both JDBC and R2DBC has been so incredibly simple compared to what is being requested here, see: https://blog.jooq.org/nested-transactions-in-jooq. In that post, I also criticise the declarative, aspect oriented approach. Aspects seem convenient, but outside of transactional logic, which was popularised by Java EE and Spring, I haven't seen them applied as a mainstream concept. I personally think they're very flawed. It's almost impossible for an aspect to capture all possible interactions with random user code, so perhaps, the ThreadLocal global variable hack is the only reasonable implementation for such an aspect, and that by itself seems quite telling.
  6. I still think it should be possible for Micronaut to tamper with the ConnectionFactory alone (perhaps via a dedicated connection pool?) and inject the tampered ConnectionFactory into jOOQ's Configuration without jOOQ (or user code) noticing. The test case you've shown here: https://github.com/jOOQ/jOOQ/issues/13704#issuecomment-1165304944 seems to indicate that the ConnectionFactory side might be relatively simple for you to implement, so the remaining issue is how to let @Transactional communicate with this magical contextView global variable. If you can do that, then jOOQ will not even notice you've started a transaction, just like jOOQ doesn't notice Spring has started one in the blocking JDBC world.

If that doesn't suffice, the solution from my perspective is to lobby with the reactive streams SPI folks to make the SPI more aspect friendly. Once an SPI upgrade is available, APIs like jOOQ can use that and make the context awareness available to clients. In either case, jOOQ shouldn't hack around in this area. Frameworks like Spring and Micronaut should have very clean SPIs that jOOQ and the likes can implement (without a ton of dependency requirements). Or, alternatively, it's time to say goodbye to aspects and find something better.

I'm rejecting this issue here as the requirements towards jOOQ are very fuzzy and I'm not even convinced jOOQ can solve any perceived issues properly. Declarative transactions have always been transparent to jOOQ in the JDBC world, so ideally, they should be transparent to jOOQ in the R2DBC world, as well. If they aren't / can't be, then there is a significant design problem in either the reactive streams SPI or in the aspect approach.

dstepanov commented 2 years ago

I have never said that it's a jOOQ problem, and I didn't want to implement context propagation in reactive streams. My idea was to abstract you from it by requesting a simple interface that can extract the connection from the interaction point (the subscriber), plus the ability to set the implementation in the Configuration. I don't see how that is such a big problem. It would improve the developer's experience with the jOOQ integration.

Implementing ConnectionFactory with some ThreadLocal IMHO is a step backward and might lead to all kinds of bugs when the thread is changed.

I will keep the current solution, which is extracting the connection from the Reactor context and passing it to jOOQ:

    protected <T> Mono<T> withDSLContextMono(Function<DSLContext, Publisher<T>> fn) {
        return Mono.deferContextual(contextView -> {
            ReactiveTransactionStatus<Connection> status = getTransactionStatus(contextView);
            if (status == null) {
                return Mono.from(fn.apply(ctx));
            }
            return Mono.from(fn.apply(DSL.using(status.getConnection())));
        });
    }

https://github.com/micronaut-projects/micronaut-sql/blob/d1fc4c6abc01609f0607e301b43ee49064c0f083/tests/jooq-tests/jooq-r2dbc-postgres/src/main/java/example/jooq/reactive/AbstractRepository.java#L32-L40

Thanks for your time!

lukaseder commented 2 years ago

I have never said that it's a jOOQ problem, and I didn't want to implement context propagation in reactive streams. My idea was to abstract you from it by requesting a simple interface that can extract the connection from the interaction point

You didn't say the words "jOOQ problem", but you requested a "jOOQ solution", at least in parts.

A "simple" interface (it never really is simple, it needs documentation, maintenance, forwards compatibility, regression testing, definition of "interaction point", maintenance of this point, extensions or adaptations in the future, in case we overlooked something, etc. etc.) pushes the problem into jOOQ by virtue of publishing what are currently undocumented jOOQ internals into an SPI that needs to be maintained forever. Your specific suggestion for this SPI was based on actual internals, which may or may not be well designed. Maybe, in half a year, it won't suffice and we'll have to re-iterate this discussion. It really can't possibly be this simple! It never is.

All the while I'm still not convinced that all possible solutions entirely outside of jOOQ have been explored, which is why I had all these reservations. Why is jOOQ a special case for you, but not R2DBC? If client code didn't want to inject a DSLContext but a ConnectionFactory from R2DBC, how would that interact with @Transactional? I'm naively assuming it would have to work the exact same way. jOOQ just wraps that ConnectionFactory without any additional assumptions, so it may even be a use-case for users to mix jOOQ usage with direct R2DBC usage, both participating in the same transaction. Here's a hypothetical Spring component:

@Component
public class Whatever {
    @Autowired DSLContext ctx;
    @Autowired ConnectionFactory cf;

    @Transactional
    public Flux<?> mixed() {

        // First do some jOOQ stuff
        return Flux.from(ctx.insert(...))

        // Then do some native R2DBC stuff, in the same transaction
                   .then(cf.create()...)
    }
}

How would that work in your case? A jOOQ-only approach wouldn't have an answer to this use-case either. Yet, jOOQ just naively wraps a ConnectionFactory, so users can reasonably expect this to "just work". After all, this is why they're using a DI container.

This is why I'm saying that any auto-configuring container (Spring, Micronaut, etc.) should be managing transactions entirely inside of the ConnectionFactory SPI, and provide that to jOOQ without jOOQ noticing. All jOOQ does is call ConnectionFactory::create and Connection::close.
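One way to picture such a container-managed factory is a decorator that hands out the transaction's connection and ignores close() on it. The sketch below is an assumption-laden illustration, not how Spring or Micronaut implement it: `Conn` and `ConnFactory` are simplified, synchronous stand-ins for R2DBC's Publisher-based `Connection` and `ConnectionFactory`, and the `currentTransaction` supplier is a placeholder for whatever lookup the container uses, which is exactly the part this thread leaves open.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class TransactionAwareFactorySketch {

    // Simplified, synchronous stand-ins for the R2DBC SPI types (hypothetical).
    interface Conn {
        void close();
        boolean isClosed();
    }

    interface ConnFactory {
        Conn create();
    }

    // A plain connection that really closes.
    static final class PhysicalConn implements Conn {
        private boolean closed;
        @Override public void close() { closed = true; }
        @Override public boolean isClosed() { return closed; }
    }

    // A view of the transaction's connection: close() is ignored, the transaction owner closes it.
    static final class BorrowedConn implements Conn {
        private final Conn target;
        BorrowedConn(Conn target) { this.target = target; }
        @Override public void close() { /* intentionally a no-op */ }
        @Override public boolean isClosed() { return target.isClosed(); }
    }

    // The decorator a container could inject instead of the raw factory.
    static final class TransactionAwareFactory implements ConnFactory {
        private final ConnFactory delegate;
        private final Supplier<Optional<Conn>> currentTransaction; // placeholder lookup

        TransactionAwareFactory(ConnFactory delegate, Supplier<Optional<Conn>> currentTransaction) {
            this.delegate = delegate;
            this.currentTransaction = currentTransaction;
        }

        @Override public Conn create() {
            Optional<Conn> bound = currentTransaction.get();
            if (bound.isPresent())
                return new BorrowedConn(bound.get());
            return delegate.create();
        }
    }

    public static void main(String[] args) {
        AtomicReference<Conn> current = new AtomicReference<>();
        ConnFactory factory = new TransactionAwareFactory(PhysicalConn::new, () -> Optional.ofNullable(current.get()));

        // A client that only ever calls create() and close(), as described above.
        Conn tx = new PhysicalConn();
        current.set(tx);
        factory.create().close();
        System.out.println("transaction connection closed by client: " + tx.isClosed());
    }
}
```

The client code never learns whether it received a fresh connection or the transaction's one, which is the transparency argued for here. How `currentTransaction` finds the right transaction without a thread-bound holder remains the unresolved question.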

That would improve the developer's experience with jOOQ integration.

You're saying that based on the fact that the developer currently has to know what you are doing behind the scenes. Understandably, you want to remove that friction. But I'm saying you're trying to remove it by tightly coupling Micronaut internals to jOOQ internals, all the while we have a wonderful SPI to interact transparently, which is R2DBC.

Micronaut auto-configures jOOQ, so Micronaut should auto-configure jOOQ with an already transactional ConnectionFactory.

Implementing ConnectionFactory with some ThreadLocal IMHO is a step backward and might lead to all kinds of bugs when the thread is changed.

I didn't say that ThreadLocal is the solution for you. It's a hack that works reasonably well in the blocking world. I'm positive that in the reactive world, a similar hack is needed for arbitrary aspect support. Except that the reactive version of the hack can't be thread-bound, so something else is needed.

dstepanov commented 2 years ago

Why is jOOQ a special case for you, but not R2DBC? How would that work in your case? A jOOQ-only approach wouldn't have an answer to this use-case either. Yet, jOOQ just naively wraps a ConnectionFactory, so users can reasonably expect this to "just work". After all, this is why they're using a DI container.

It all comes down to how the context is propagated. In that example, I can implement ConnectionFactory and have the context propagated inside of Reactor, but only because I would have a Reactor-to-Reactor interaction.

In Micronaut we use Reactor and its context to propagate whatever is needed. It's much better than trying to implement some kind of thread-local propagation, but it doesn't work everywhere.

lukaseder commented 2 years ago

But if a user uses RxJava, then the two things won't share transactions?

dstepanov commented 2 years ago

Yes, it's not supported. I think only Micronaut and Spring support transactional aspect propagation when using reactive, and that's because both use Reactor.

lukaseder commented 2 years ago

I think only Micronaut and Spring support transactional aspect propagation when using reactive, and that's because both use Reactor.

This sounds like the real problem in all of this, here. The idea of RS is to be a standard way for reactive streams to interoperate. jOOQ not using reactor internally shouldn't be a show stopper here. Of course, your current workaround helps users combine Micronaut transactions with jOOQ, and that might be sufficient for now, but I think frameworks like Micronaut (and Spring) should strive to get their aspect models working with any RS implementation, including third parties like jOOQ. (I'm not sure what other third parties exist here. I feel like this isn't too much of a mainstream topic in Java)

I've started a discussion on twitter: https://twitter.com/lukaseder/status/1541407763720556550

There are a few interesting insights:

These things confirm my gut feeling that in the long run, something much more sound than the reactor implementation dependency will be needed, although obviously, short term solutions can work too, to get stuff working.

I mainly closed this issue, because I didn't understand what exactly you were asking jOOQ to do, other than just a "simple" way to access internals (which I didn't find simple, nor maintainable). That wasn't going to be the solution. There is still a possibility to design something more sound, but I still don't fully understand your current requirements, and I prefer discussing those rather than possible implementations first (see https://xyproblem.info). It might well be that there isn't a reasonable way to work around your current issues inside of jOOQ, too...