micronaut-projects / micronaut-data

Ahead of Time Data Repositories
Apache License 2.0
466 stars 197 forks source link

Can't inject a Repository bean extending RxJavaCrudRepository #3018

Open aliabdikk opened 4 months ago

aliabdikk commented 4 months ago

I have a Micronaut project, and in a part of it I'm using reactive programming to perform a IO-heavy job that fetches a lot of data from database, of course from different threads. I was using the JpaRepository like normal DB connections, and it was working OK.

After migrating to Micronaut 4 (and consequently Hibernate 6), my CLI job started to fail with java.lang.IllegalStateException: Illegal pop() with non-matching JdbcValuesSourceProcessingState. After digging a bit, I fount out the culprit could be that the Hibernate transaction is being used in a multi-threaded context in parallel, which should not be.

One option is trying to acquire a new transaction on each operation through TransactionDefinition.Propagation.REQUIRES_NEW, and that worked well.

But to me the cleanest way is to actually use the reactive repository implementation provided by Micronaut data. So I switched to use RxJavaCrudRepository, but the moment I change my repository interface from extending JpaRepository to RxJavaCrudRepository, it fails to start with the following message:

No bean of type [com.my.project.data.MyReactiveRepository] exists.
Make sure the bean is not disabled by bean requirements (enable trace logging for
'io.micronaut.context.condition' to check) and if the bean is enabled then ensure the 
class is declared a bean and annotation processing is enabled (for Java and Kotlin 
the 'micronaut-inject-java' dependency should be configured as an annotation processor).

So apparently Micronaut is not instantiating a bean when the interface is extending RxJavaCrudRepository.

I did the following steps to replace my normal repository with a reactive one:

  1. Added io.micronaut.data:micronaut-data-hibernate-reactive to the dependancies.
  2. Configured a reactive JPA datasource, like this (I already had a read-only and read-write datasource)
    datasources:
    default: # Read-Only datasource
    url: ${JDBC_URL_RO}
    username: ${JDBC_USER}
    password: ${JDBC_PASSWORD}
    dialect: POSTGRES
    read-only: true
    rx: # Reactive datasource (Read-Only)
    url: ${JDBC_URL_RO}
    username: ${JDBC_USER}
    password: ${JDBC_PASSWORD}
    dialect: POSTGRES
    read-only: true
    reactive: true
    rw: # Read-Write datasource
    url: ${JDBC_URL}
    username: ${JDBC_USER}
    password: ${JDBC_PASSWORD}
    dialect: POSTGRES
  3. Define my repository like this

    import io.micronaut.data.repository.reactive.RxJavaCrudRepository;
    ...
    @Repository("rx")
    interface MyReactiveRepository
    extends RxJavaCrudRepository<Entity, EntityId> {
    
    @Query("SELECT e FROM Entity e WHERE e.field IN :field")
    Single<List<Entity>> getAllByCodes(List<String> field);
    }
  4. Inject the repository in my job and use it:

    public class MyJob {
    private final MyReactiveRepository repository;
    
    public MyJob(MyReactiveRepository repository) {
        this.repository = repository;
    }
    
    // Just a minimised version to make the point, not the actual implementation
    public void run() {
        Flowable.fromIterable(batchOfData)
            .buffer(bufferSize)
            .flatMap(batch -> repository.getAllByCodes(batch).toFlowable())
            .parallel(entryGenerationParallelism)
            .runOn(io)
            .blockingGet();
    }
    }

The above implementation starts successfully when using JpaRepository, so I assume I'm missing some configuration to make Micronaut instantiate the bean, but after many investigations, I couldn't find anything.

Any help will be appreciated :)

dstepanov commented 2 months ago

Please create a sample app. Make sure you have RxJava on the classpath. It's recommended to use Reactor sure it will work with RxJava