Closed: kevin70 closed this issue 5 years ago
Try using `concatMap` as a quick fix for your problem, like the following:
```kotlin
Flux.range(0, c).map {
    User(email = "integration-test$it@mail.com", password = "[PASSWORD]")
}.concatMap(userRepository::insert)
```
Or try using multi-bindings in `UserRepository.insert`.
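For reference, a minimal sketch of what multi-bindings could look like at the R2DBC SPI level (the `Statement.bind(...)`/`add()` calls are the SPI batching API; the table, columns, and helper name simply mirror the tests later in this thread and are otherwise assumptions):

```kotlin
import io.r2dbc.spi.Connection
import reactor.core.publisher.Flux

// Sketch: batch all rows into a single Statement via multi-bindings instead of
// issuing one INSERT per element. Placeholder syntax and index binding depend on the driver.
fun insertUsers(connection: Connection, count: Int): Flux<Int> {
    val statement = connection.createStatement(
        "INSERT INTO DUIC_USER(email,password,created_at,updated_at) VALUES(?,?,NOW(),NOW())"
    )
    (0 until count).forEach { i ->
        if (i > 0) statement.add()                 // add() saves the previous binding set and starts a new one
        statement.bind(0, "integration-test$i@mail.com")
            .bind(1, "[PASSWORD]")
    }
    // execute() emits one Result per binding set; rowsUpdated yields the update counts
    return Flux.from(statement.execute()).flatMap { it.rowsUpdated }
}
```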
I'm not sure its behavior is in line with expectations. Because the SQL engine does not support transactions over multiplexed connections, once we begin a transaction we should execute statements one by one on the connection that began the transaction.
If we begin a transaction, the transaction management framework may bind this connection in the subscriber context. All queries will then use the same connection, and we will end up issuing multiple commands on a single connection.
In R2DBC, a single connection can be used only by a single conversation. We cannot (should not) expect to issue multiple commands on a single connection because that's not the way relational databases work.
But this is not clear to me, so I need to build more test cases/environments to verify your problem. If this is blocking your development, please try the quick fix mentioned above first.
@mirromutth
I tried r2dbc-client and hit the same problem.
```kotlin
@Test
fun `test insert`() {
    val sql = "INSERT INTO DUIC_USER(email,password,created_at,updated_at) VALUES(?,?,NOW(),NOW())"
    val p = r2dbc.inTransaction { h ->
        Flux.range(0, 30).flatMap {
            h.execute(sql, "integration-test$it@mail.com", "[PASSWORD]")
        }
    }
    val n = p.sum().block()
    println(n)
}
```
```
io.r2dbc.spi.R2dbcNonTransientResourceException: Unknown message header 0x3 and readable bytes is 23 on command phase
```
But the following code works.
```kotlin
@Test
fun `test insert2`() {
    val p = r2dbc.inTransaction { h ->
        Flux.range(0, 30).flatMap {
            h.execute("INSERT INTO DUIC_USER(email,password,created_at,updated_at) VALUES('integration-test$it@mail.com','[PASSWORD]',NOW(),NOW())")
        }
    }
    val n = p.sum().block()
    println(n)
}
```
Looks like this is undefined behavior. The `Handle` in r2dbc-client maps one-to-one to a `Connection`, so all queries will use the same connection in this case.
`flatMap` does not care about the order of the items, which means `flatMap` sends each query as soon as it is created rather than after the previous result has completed.
Using `concatMap` is the right way here because it always sends the next query only after the previous result has completed.
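Concretely, the first failing test above should work once its `flatMap` is swapped for `concatMap`, so each parameterized INSERT runs one by one on the transaction's connection (a sketch of that change, not verified here):

```kotlin
@Test
fun `test insert with concatMap`() {
    val sql = "INSERT INTO DUIC_USER(email,password,created_at,updated_at) VALUES(?,?,NOW(),NOW())"
    val p = r2dbc.inTransaction { h ->
        Flux.range(0, 30).concatMap {
            // concatMap subscribes to the next execute() only after the previous one completes,
            // so the stateful prepared-statement exchange never interleaves on the connection
            h.execute(sql, "integration-test$it@mail.com", "[PASSWORD]")
        }
    }
    val n = p.sum().block()
    println(n)
}
```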
Just like @mp911de said, we cannot (should not) expect to issue multiple commands on a single connection because that's not the way relational databases work.
Why does the simple query work while the prepared query does not? Running a parameterized statement requires multiple round trips, and the client has to know the current state of that parameterized statement. In other words, a prepared query is stateful while a simple query is stateless. That is why it works for simple queries but fails for prepared queries.
@mirromutth I don't understand. Transactions should be done on a single connection.
When I use r2dbc-postgresql, I get the desired results.
> we cannot (should not) expect to issue multiple commands on a single connection because that's not the way relational databases work.
The statement about multiple commands relates to something like this:
```java
Connection c = …;

c.createStatement("SELECT id FROM person")
    .execute()
    .flatMap(result -> result.map((row, metadata) -> row.get(0, Integer.class)))
    .flatMap(id -> c.createStatement("INSERT INTO my_table VALUES(" + id + ")").execute());
```
That is, executing commands while another result stream is active. The reason we should not expect this to work is that the command response (the response stream of rows) isn't fully consumed, and issuing further commands can lock up the connection because of response buffers and backpressure demand.
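One way to avoid that pattern, sketched in Kotlin against the plain R2DBC SPI (the function name and placeholder syntax are illustrative, not an existing API): drain the SELECT's result stream completely before issuing the INSERTs, and run those one at a time.

```kotlin
import io.r2dbc.spi.Connection
import reactor.core.publisher.Flux

// Sketch: fully consume the SELECT result, then issue the INSERTs sequentially,
// so only one command is ever active on the connection.
fun copyIds(c: Connection): Flux<Int> =
    Flux.from(c.createStatement("SELECT id FROM person").execute())
        .flatMap { result -> result.map { row, _ -> row.get(0, Integer::class.java) } }
        .collectList()                                   // response stream fully consumed here
        .flatMapMany { ids ->
            Flux.fromIterable(ids).concatMap { id ->     // one command at a time, in order
                Flux.from(
                    c.createStatement("INSERT INTO my_table VALUES(?)")
                        .bind(0, id)
                        .execute()
                ).flatMap { it.rowsUpdated }
            }
        }
```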
Another issue is concurrent command queueing (concurrent subscriptions). That is what we typically call thread-safety in imperative programming. If a command on a connection is still active (e.g. running an insert or changing transaction state via BEGIN/COMMIT/ROLLBACK), it would be good if connections could queue commands so that a queued command is only executed once the previous one has completed (assuming that pipelining is not supported).
That is somewhat tricky to achieve because it requires a recursive queue and potentially deferred command creation if commands require a connection-state-specific identifier. I did this for R2DBC SQL Server. R2DBC Postgres has a similar implementation.
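Just to illustrate the queueing idea (a hedged sketch only, using Reactor's `Sinks`; this is not the actual SQL Server or Postgres driver code):

```kotlin
import reactor.core.publisher.Mono
import reactor.core.publisher.Sinks

// Commands are enqueued as suppliers and created lazily when dequeued (deferred creation);
// concatMap guarantees the next command starts only after the previous one has completed.
class SerializingCommandQueue {

    private val commands = Sinks.many().unicast().onBackpressureBuffer<() -> Mono<Void>>()

    // Subscribe to this once per connection; it drives the queued commands strictly one after another.
    fun dispatcher(): Mono<Void> =
        commands.asFlux()
            .concatMap { createCommand -> Mono.defer { createCommand() } }
            .then()

    fun enqueue(createCommand: () -> Mono<Void>) {
        commands.tryEmitNext(createCommand).orThrow()
    }
}
```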
The background of concurrent subscriptions is that a transaction manager can issue a commit (transaction cleanup) either on `onComplete` or on `onCancel`. While `onComplete` allows for synchronization (typically via `Mono.usingWhen` or `Flux.usingWhen`), `onCancel()` has no synchronization.
A case for such behavior is `Mono<Object> singleFromFlux = transactionalOperator.transactional(data).next();`.
The `.next()` or `.take()` operators cancel their upstream subscription to send a completion signal downstream. With `usingWhen(…)`, cancellation typically leads to a commit because the transaction was successful: `usingWhen` receives the cancellation and issues a commit. If a subsequent invocation wants to operate on the connection, the connection may still be busy with transaction cleanup.
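As an illustration of that cancel path (a sketch with Reactor's `usingWhen` and the plain R2DBC SPI; the query and function name are placeholders, not the transaction manager's actual code):

```kotlin
import io.r2dbc.spi.Connection
import io.r2dbc.spi.ConnectionFactory
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// `.next()` cancels the upstream after the first row, so cleanup runs via the cancel handler,
// and the commit may still be in flight when the caller tries to reuse the connection.
fun firstId(connectionFactory: ConnectionFactory): Mono<Int> =
    Flux.usingWhen(
        Mono.from(connectionFactory.create()),
        { c: Connection ->
            Mono.from(c.beginTransaction())
                .thenMany(c.createStatement("SELECT id FROM person").execute())
                .flatMap { result -> result.map { row, _ -> row.get(0, Integer::class.java) } }
        },
        { c -> Mono.from(c.commitTransaction()).then(Mono.from(c.close())) },      // onComplete
        { c, _ -> Mono.from(c.rollbackTransaction()).then(Mono.from(c.close())) }, // onError
        { c -> Mono.from(c.commitTransaction()).then(Mono.from(c.close())) }       // onCancel: not synchronized with downstream
    ).next()
```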
This is solved by PR #52; the target version is 0.3.0.M3.
The code failed to run under a Spring reactive transaction; it operates normally without a transaction.