Open jansvajcr opened 4 years ago
It is currently not possible, since the TransactionalTestExecutionListener in the Spring TestContext Framework only supports Spring's PlatformTransactionManager with transaction state bound to a ThreadLocal.
However, I have reworded the title of this issue and will leave it open for further discussion and brainstorming.
@mp911de, feel free to share any ideas you have regarding this topic.
The only supported approach is moving the Publisher into a @Transactional service method.
We're lacking a propagation model that would allow for reactive @Transactional test methods. The reason lies in how reactive vs. imperative code is executed and tested.
Imperative interaction with data and the actual assertions are enclosed within a method. A @Transactional test method starts a transaction and the actual code runs within that scope. Synchronous transaction managers use ThreadLocal to propagate the transaction.
Reactive transactions require a Context to propagate the transactional state. While a reactive transaction could be started with a @Transactional test method, there's no way to propagate the transactional state into the Publisher, because a test method returns void.
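To illustrate why ThreadLocal-bound state does not follow asynchronously executed work, here is a minimal sketch that uses only the JDK. It does not touch Spring or Reactor; the CURRENT_TX variable and the "tx-1" value are stand-ins invented for this example, not the real transaction resources.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadLocalPropagationDemo {

    // Stand-in (assumption) for the resource a synchronous transaction manager binds to the thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns what the calling thread sees, followed by what a worker thread sees. */
    static String[] observe() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        CURRENT_TX.set("tx-1"); // "begin" a transaction on the test thread
        try {
            String onCaller = CURRENT_TX.get();
            // Work that runs on another thread, as an asynchronous pipeline might.
            String onWorker = CompletableFuture
                    .supplyAsync(() -> String.valueOf(CURRENT_TX.get()), worker)
                    .join();
            return new String[] {onCaller, onWorker};
        } finally {
            CURRENT_TX.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] seen = observe();
        System.out.println("caller thread sees: " + seen[0]); // prints "caller thread sees: tx-1"
        System.out.println("worker thread sees: " + seen[1]); // prints "worker thread sees: null"
    }
}
```

The worker thread never sees the value, which is the gap that a subscriber-carried Context is meant to close for reactive flows.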
Another issue is assertions. Assertions are typically modeled using StepVerifier, which is ultimately blocking. Therefore, assertions happen outside of the flow, by subscribing to the Publisher, as we cannot introduce blocking code into a reactive flow.
We need to solve both issues to make reactive @Transactional test methods work.
Basically, from a usage perspective we have two options:
In the first variant, code could look like:
@Test @Transactional
void doTest(StepVerifier<Foo> verifier) {
    flux.subscribeWith(verifier).expectNext(…).verify();
}
or
@Test @Transactional
void doTest(MyService myService) {
    myService.doWork(…).as(StepVerifier::create).expectNext(…).verify();
}

class MyService {
    @Transactional
    Flux<…> doWork() {…}
}
Injecting parameters is intrusive and defeats simplicity. We probably don't want that kind of experience. It would also be too easy to forget the injection, and tests would fail (or pass) because of a different arrangement.
The next example leaves the programming model as-is, which looks much easier to use:
@Test @Transactional
void doTest() {
    flux.as(StepVerifier::create).expectNext(…).verify();
}
Now, how can we associate the transaction with our flux? We could use a TestExecutionListener to use Reactor's onAssembly(…) hooks to make sure, when using Project Reactor, that operators become transaction-aware. It's a bit like Reactor's virtual time that allows the injection of behavioral aspects into a reactive flow.
@Test @Transactional
void doTest() {
    flux.as(StepVerifier::create).expectNext(…).verify();
    anotherFlux.as(StepVerifier::create).expectNext(…).verify();
}
The same utility will work if we test more than one component, as all reactive flows participate in the transaction. We have only one constraint, which is parallel test execution. Reactor's hooks are JVM-wide (scoped to the ClassLoader). If we ran multiple tests in parallel, we would no longer be able to distinguish between transactions. Using hooks for a transactional purpose is comparable to using a mutable global variable.
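The "mutable global variable" concern can be sketched without Reactor. The following JDK-only toy replays one possible interleaving of two parallel tests that share a single static hook. The installHook and subscribe methods are hypothetical names made up for this sketch; they are not Reactor's Hooks API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class GlobalHookDemo {

    // Hypothetical JVM-wide hook: a single static slot, like any mutable global variable.
    private static volatile Supplier<String> txHook = () -> "none";

    static void installHook(String txName) {
        txHook = () -> txName;
    }

    // A "flow" asks the hook which transaction to join at subscription time.
    static String subscribe(String flowName) {
        return flowName + " joined " + txHook.get();
    }

    /** Replays, sequentially, one interleaving that parallel tests could produce. */
    static List<String> interleave() {
        List<String> log = new ArrayList<>();
        installHook("tx-A");             // test A starts its transaction
        installHook("tx-B");             // test B starts before A has subscribed
        log.add(subscribe("flux-of-A")); // A's flow now picks up B's transaction
        log.add(subscribe("flux-of-B"));
        return log;
    }

    public static void main(String[] args) {
        interleave().forEach(System.out::println);
        // prints "flux-of-A joined tx-B"
        // prints "flux-of-B joined tx-B"
    }
}
```

With a single shared slot, the last writer wins, so test A's flow ends up in test B's transaction.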
Another aspect that plays into the model is that we typically expect propagation using Reactor's Context, which is carried within a Subscriber. We don't return anything from our method, and that is a bit weird. The behind-the-scenes propagation feels a bit like magic.
/cc @smaldini @bsideup
Any updates on this issue? I see Spring Boot 2.3.0+ includes some updates to R2DBC support, but I am seeing that this issue still exists.
Hey @mp911de! I had some trouble with this lately, and since the @Transactional annotation is not supported yet, I came up with a small helper that transforms any publisher into a rollback operation:
@Component
public class Transaction {

    private static TransactionalOperator rxtx;

    @Autowired
    public Transaction(final TransactionalOperator rxtx) {
        Transaction.rxtx = rxtx;
    }

    public static <T> Mono<T> withRollback(final Mono<T> publisher) {
        return rxtx.execute(tx -> {
            tx.setRollbackOnly();
            return publisher;
        })
        .next();
    }

    public static <T> Flux<T> withRollback(final Flux<T> publisher) {
        return rxtx.execute(tx -> {
            tx.setRollbackOnly();
            return publisher;
        });
    }
}
Then I can use it on tests like this:
@Test
void finds_the_account_and_return_it_as_user_details() {
    accountRepo.save(newAccount)
        .map(Account::getUsername)
        .flatMap(userDetailsService::findByUsername)
        .as(Transaction::withRollback) // <-- This makes the test rollback after the transaction
        .as(StepVerifier::create)
        .assertNext(user -> {
            assertThat(user.getUsername()).isEqualTo("mock@test.com");
        })
        .verifyComplete();
}
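As a rough, framework-free model of what the helper relies on, the sketch below shows the rollback-only idea in isolation: work runs inside a transactional scope, and the writes are discarded when the scope ends if the status was marked rollback-only. Store, TxStatus and run are hypothetical names for this toy; it is not how Spring's TransactionalOperator is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

final class RollbackOnlyDemo {

    /** Hypothetical status handle, loosely modeled on the callback argument used by the helper. */
    static final class TxStatus {
        private boolean rollbackOnly;

        void setRollbackOnly() {
            rollbackOnly = true;
        }
    }

    /** Toy in-memory store (assumption): pending writes only become visible on commit. */
    static final class Store {
        private final List<String> committed = new ArrayList<>();

        void execute(BiConsumer<TxStatus, List<String>> work) {
            TxStatus status = new TxStatus();
            List<String> pending = new ArrayList<>();
            work.accept(status, pending);
            if (!status.rollbackOnly) {
                committed.addAll(pending); // commit
            }
            // otherwise the pending writes are simply dropped (rollback)
        }

        List<String> committed() {
            return new ArrayList<>(committed);
        }
    }

    /** Saves one username inside a transaction and returns what is visible afterwards. */
    static List<String> run(boolean markRollbackOnly) {
        Store store = new Store();
        store.execute((status, pending) -> {
            if (markRollbackOnly) {
                status.setRollbackOnly();
            }
            pending.add("mock@test.com");
        });
        return store.committed();
    }

    public static void main(String[] args) {
        System.out.println("rollback-only: " + run(true));  // prints "rollback-only: []"
        System.out.println("normal commit: " + run(false)); // prints "normal commit: [mock@test.com]"
    }
}
```

The work inside the scope still sees its own writes, which is why assertions on the emitted values can pass even though nothing is persisted after the test.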
I thought this helper could be used by the @Transactional annotation in some way. Maybe the annotation can find all publishers within the @Test and add the transformer right before the StepVerifier (if present?). I'm not sure how possible/easy that might be though 😅.
Another approach I was thinking of is to add this right into the StepVerifier (again, not sure if possible since it requires Spring to work) or to another implementation of the StepVerifier (something like TxStepVerifier perhaps?). Of course, these approaches will not use the @Transactional annotation, but they will add more control over which publisher should roll back and which should not 🙂
Hope this helps, at least as a brainstorm on some ways to solve the issue.
Cheers!!
For the record, this prevents Spring Boot from supporting any slice test that is transactional. @DataNeo4jTest, for instance, can't work with reactive repositories as we're facing the same problem.
Thanks @JoseLion! This worked perfectly.
I made some updates for a Kotlin version:
import org.springframework.stereotype.Component
import org.springframework.transaction.reactive.TransactionalOperator
import reactor.core.CorePublisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

@Component
class Transaction(private val transactionalOperator: TransactionalOperator) {

    fun <T> withRollback(mono: Mono<T>) = setupRollback(mono).next()

    fun <T> withRollback(flux: Flux<T>) = setupRollback(flux)

    private fun <T> setupRollback(publisher: CorePublisher<T>) =
        transactionalOperator.execute {
            it.setRollbackOnly()
            publisher
        }
}
and example test:
internal class RepositoryTest(
    @Autowired private val repository: SomeRepository,
    @Autowired private val transaction: Transaction
) {

    @Test
    internal fun `round trip data with transaction`() {
        val fakeWorkbook = "This is a fake workbook".toByteArray()
        repository.save(fakeWorkbook)
            .flatMap { repository.read() }
            .`as` { transaction.withRollback(it) }
            .`as` { StepVerifier.create(it) }
            .expectNextMatches { it.contentEquals(fakeWorkbook) }
            .verifyComplete()
    }
}
I'm glad this helped @nhajratw 🙂
In case anyone is interested, I went one step further and created a transactional step verifier:
public interface TxStepVerifier extends StepVerifier {

    // Delegates to the Transaction helper shown earlier in this thread.
    static <T> FirstStep<T> withRollback(final Mono<T> publisher) {
        return StepVerifier.create(publisher.as(Transaction::withRollback));
    }

    static <T> FirstStep<T> withRollback(final Flux<T> publisher) {
        return StepVerifier.create(publisher.as(Transaction::withRollback));
    }
}
This makes things a little bit easier to use 😁 so based on the same example above, the usage would be:
@Test
void finds_the_account_and_return_it_as_user_details() {
    accountRepo.save(newAccount)
        .map(Account::getUsername)
        .flatMap(userDetailsService::findByUsername)
        .as(TxStepVerifier::withRollback) // <-- This makes the test rollback after the transaction
        .assertNext(user -> {
            assertThat(user.getUsername()).isEqualTo("mock@test.com");
        })
        .verifyComplete();
}
Cheers!
@JoseLion using your code as inspiration, I created https://github.com/ChikliC/spring-rxtx-test-support
@mp911de maybe TxStepVerifier can be part of spring-data-r2dbc, not as a replacement of @Transactional, but as an alternative for testing transactional Publishers which need to be rolled back automatically.
I found this helpful when you want to have more control over what is transactional and what's not when working on reactive tests. Also, it will provide a good option until @Transactional is somehow supported 🙂
I'll be happy to help with a PR to spring-data-r2dbc if you think this could be a good idea 😁
Paging @simonbasle. I like the idea of a TransactionalStepVerifier. We need to explore how to design the API for a smooth experience. TransactionalStepVerifier::withRollback or as(TransactionalStepVerifier::create).thenRollback() are already two variants that come to my mind.
This approach works, mainly because it doesn't really extend StepVerifier but provides new factory methods that decorate the publisher under test.
Despite being an interface, StepVerifier isn't really practical to extend. That is due to its composition of FirstStep, Step and LastStep, which are not pluggable.
Only methods from LastStep actually produce a StepVerifier. The factories don't actually provide a StepVerifier but a FirstStep :(
Maybe we can add more composition features to these step interfaces, but I'm afraid it'll never be as smoothly integrated as something like .thenRollback(). Plus it would have intrinsic limitations (you likely wouldn't be able to do anything that you cannot already do with a combination of existing step/StepVerifier methods).
If you think that despite these constraints it could still work for the transaction case, then yeah we need a more precise API design and iterate from there.
Because of this issue, r2dbc-pool cannot be properly used during tests. It keeps accumulating connections and never releases them, eventually bringing the local DB container down.
Hi, what if we want to test the result of different queries within the same transaction?
We would need to verify intermediate steps, but I don't see how.
Why would StepVerifier need to be inherently blocking?
OK, I think I can do that using Flux.cache() to have the intermediate results stored.
Unit tests are inherently blocking, and so a blocking API approach was most useful.
For more advanced cases where you want to control the timing of your checks and assertions, as well as assert intermediate state, reactor-test recently gained a TestSubscriber API (https://github.com/reactor/reactor-core/pull/2708).
Thanks, what great timing xD. I will try this. I was on v3.4.12 through Spring Boot 2.6.1, and it's an easy upgrade with 2.6.2.
I am not sure we mean the same thing by intermediate results.
I have multiple R2DBC queries as Fluxes that I want to participate in the same transaction.
query1.thenMany(query2)
    .as(transactionalOperator::transactional)
    .thenEmpty(Mono.empty())
    .as(StepVerifier::create)
    .verifyComplete();
If I call query1.subscribe(testSubscriber) before that, it will just run query1 alone.
The test subscriber would have to be in between, but I don't see how to do that, or if it's even meant to do that.
Just wanted to add some information as it might be helpful for my understanding as well as for others. First of all, thanks a lot @JoseLion and @nhajratw for sharing such amazing tricks and information.
However, I tried this with Spring's R2dbcRepository, and for some reason it did not work with the customization I have in my code. I have routing involved: reads and writes go through different connection factories and hence different reactive transaction managers. I don't have transactional operators defined in my Java configuration. I just have connection factories and transaction managers defined, and I declare the package name which contains all the repository interfaces. When I tried to execute an integration test for a repository using the Spring testing framework (some would ask why I am testing repositories, but I will reserve that conversation for another day), neither .as(Transaction::withRollback) nor .as(TxStepVerifier::withRollback) worked for me.
However, I figured out that a couple of things were not configured correctly in my Java configs for both connection factories. I also ended up creating a proper test config class which uses the TransactionalOperator as the documentation describes, and I made sure that @EnableTransactionManagement was turned off for the test config. Now I am able to test the repository as expected with automatic rollback after the tests (and yes, the data is cleaned up automatically after the tests).
I will post the code here as it might help a few.
My config class:
@Configuration
@EnableR2dbcRepositories(basePackages = {"com.examples.raccess.r2jdbc.repo"})
public class TestMariaDBRoutingReadWriteConfig extends AbstractR2dbcConfiguration {

    // Custom classes encapsulating host, port etc.
    @Autowired
    private MariaDBRWriteConfigProperties rdbRWriteConfigProperties;

    @Autowired
    private MariaDBReadConfigProperties rdbReadConfigProperties;

    @Bean(name = "rwriteConnectionFactory")
    public ConnectionFactory rwritePooledConnectionFactory() {
        var connectionFactory = RDBBaseConfigHelper.createConnectionFactory(rdbRWriteConfigProperties);
        var connectionPoolConfig = RDBBaseConfigHelper.connectionPoolConfiguration(rdbRWriteConfigProperties, connectionFactory);
        return new ConnectionPool(connectionPoolConfig);
    }

    @Bean(name = "rwriteTransactionManager")
    public ReactiveTransactionManager writeOnlyTransactionManager(
            @Qualifier("rwriteConnectionFactory") ConnectionFactory rwriteConnectionFactory) {
        return new R2dbcTransactionManager(rwriteConnectionFactory);
    }

    @Bean(name = "readOnlyConnectionFactory")
    public ConnectionFactory readOnlyPooledConnectionFactory() {
        var connectionFactory = RDBBaseConfigHelper.createConnectionFactory(rdbReadConfigProperties);
        var connectionPoolConfig = RDBBaseConfigHelper.connectionPoolConfiguration(rdbReadConfigProperties, connectionFactory);
        return new ConnectionPool(connectionPoolConfig);
    }

    @Bean(name = "readOnlyTransactionManager")
    public ReactiveTransactionManager readOnlyTransactionManager(
            @Qualifier("readOnlyConnectionFactory") ConnectionFactory readOnlyConnectionFactory) {
        return new R2dbcTransactionManager(readOnlyConnectionFactory);
    }

    /**
     * This bean is used for unit & integration testing ONLY.
     *
     * @param rwriteTransactionManager the transaction manager of the read/write connection factory
     * @return a TransactionalOperator bound to that transaction manager
     */
    @Bean
    TransactionalOperator transactionalOperator(
            @Qualifier("rwriteTransactionManager") ReactiveTransactionManager rwriteTransactionManager) {
        return TransactionalOperator.create(rwriteTransactionManager);
    }

    @Override
    public ConnectionFactory connectionFactory() {
        return rwritePooledConnectionFactory();
    }
}
This is the integration test -
@ExtendWith(SpringExtension.class)
@ContextConfiguration(initializers = ConfigDataApplicationContextInitializer.class,
        classes = {TestMariaDBRoutingReadWriteConfig.class})
@EnableConfigurationProperties(value = {MariaDBRWriteConfigProperties.class, MariaDBReadConfigProperties.class})
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
public class EmployeeRepositoryTest {

    @Autowired
    private EmployeeRepository employeeRepository;

    @Autowired
    private TransactionalOperator operator;

    @Autowired
    private ReactiveTransactionManager rwriteTransactionManager;

    @Test
    public void testSaveEmployee() {
        Employee employee = new Employee();
        employee.setEmployeeName("John Doe");
        employee.setEmployeeRole("Officer");
        employee.setInsertDate(LocalDateTime.now());
        employee.setUpdateDate(LocalDateTime.now());
        employee.setInsertedBy("IntegTest12");
        employee.setUpdatedBy("IntegTest12");

        StepVerifier.create(TransactionalOperator.create(rwriteTransactionManager)
                .execute(status -> {
                    status.setRollbackOnly();
                    return employeeRepository.save(employee);
                }))
                .expectNextMatches(employee1 -> employee1.getEmployeeName().equals("John Doe"))
                .verifyComplete();
    }
}
I hope this helps. This is how I achieved automatic transaction rollback in my tests.
@JoseLion, I was finally able to make the code and configuration work as per your earlier suggestion as well, which is much cleaner. Thank you.
First of all: thank you all for this great thread. I stumbled over the same issues yesterday and was able to solve them quickly because of this thread.
My solution was to create a TrxStepVerifier component which can be injected into integration tests. It basically just wraps the StepVerifier. @sgrmgj This is essentially your idea, thank you!
public class TrxStepVerifier {

    private final ReactiveTransactionManager reactiveTransactionManager;

    public TrxStepVerifier(ReactiveTransactionManager transactionManager) {
        this.reactiveTransactionManager = transactionManager;
    }

    public <T> StepVerifier.FirstStep<T> create(Publisher<? extends T> publisher) {
        return StepVerifier.create(
            TransactionalOperator.create(reactiveTransactionManager)
                .execute(trx -> {
                    trx.setRollbackOnly();
                    return publisher;
                })
        );
    }
}
At the moment this is created as a bean inside a configuration class, so I can load it easily inside a @DataR2dbcTest together with some other configuration I need.
That said, I would be very happy to see some hints in the docs on how to implement a @DataR2dbcTest. I'm happy to provide some ideas. Maybe one of you can point me to the right repo/thread to get this started?
Since we have @Transactional working with R2DBC repositories in 1.0 M2 (as said here), I would like to ask if there is a way to make @Transactional work with JUnit (integration) tests, the same way we are able to do when using JDBC repositories. Is this currently possible? Will this even be possible? What is the right approach to achieve transactional tests?
Currently, running a @Transactional @SpringBootTest gives me java.lang.IllegalStateException: Failed to retrieve PlatformTransactionManager for @Transactional test (the same problem as this guy has: http://disq.us/p/2425ot1).