Narigo closed this issue 4 years ago
Actually, the current API can be used the same way as your second example. In the doc example I did not nest the query callbacks, to keep it simple, but with nesting it would be written:
// Begin the transaction
PgTransaction tx = conn.begin();
tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Julien','Viet')", ar1 -> {
  tx.query("INSERT INTO Users (first_name,last_name) VALUES ('Emad','Alblueshi')", ar2 -> {
    tx.commit(ar -> {
      if (ar.succeeded()) {
        System.out.println("Transaction succeeded");
      } else {
        System.out.println("Transaction failed " + ar.cause().getMessage());
      }
    });
  });
});
So that leaves the question of whether the transaction begin should be asynchronous. It turns out an asynchronous transaction start is not needed, which saves one asynchronous action.
Perhaps we can update the example in the doc to show both?
If I do a conn.begin(), will it send a "begin" with the next query sent on that connection or otherwise change internal state in "conn"? Is it a special case for Postgres that it does not need to be async, and might it have to be async for similar APIs?
Consider this case:
useConnectionForSomething(conn, ignore1 -> {
  PgTransaction tx = conn.begin(); // <- does this line change the behaviour of "conn" in the next call?
  useConnectionForSomething(conn, ignore2 -> {
    // probably commit / rollback / whatever necessary
  });
});
Is some internal state changed in conn so that the useConnectionForSomething() call changes behaviour (or may not work because nothing was committed)?
Good point, I'll keep this open and check for MySQL.
So what happens internally is that when you call begin, it sends "BEGIN", but you don't need to wait for it. If "BEGIN" fails, the transaction is marked as completed and every command executed on it will fail.
In short, creating a transaction means you are going to execute something, and if "BEGIN" fails, that command fails, which saves one asynchronous call.
How does that sound to you?
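To make the mechanism described above concrete, here is a minimal, JDK-only sketch. It is only a model: the `Tx` and `Result` classes are hypothetical stand-ins (not the client's `PgTransaction`), the "server" is a list of strings, and handlers are invoked synchronously for brevity. It illustrates the one point made above: BEGIN is sent without a callback, and a failed BEGIN is reported through the first command that follows it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, JDK-only model of a pipelined transaction; not the client's real classes.
final class PipelinedTxSketch {

  /** Minimal result type: either succeeded, or failed with a cause. */
  static final class Result {
    final boolean ok;
    final String cause;

    Result(boolean ok, String cause) {
      this.ok = ok;
      this.cause = cause;
    }
  }

  /** Models a transaction whose BEGIN is sent right away, without a callback. */
  static final class Tx {
    private final List<String> sent = new ArrayList<>(); // commands that reached the "server"
    private final boolean beginOk;                       // assumption: outcome of BEGIN is injected

    Tx(boolean beginOk) {
      this.beginOk = beginOk;
      sent.add("BEGIN"); // written immediately; the caller never waits for its outcome
    }

    void query(String sql, Consumer<Result> handler) {
      run(sql, handler);
    }

    void commit(Consumer<Result> handler) {
      run("COMMIT", handler);
    }

    private void run(String command, Consumer<Result> handler) {
      if (!beginOk) {
        // A failed BEGIN surfaces here, on the command that depends on it.
        handler.accept(new Result(false, "BEGIN failed"));
        return;
      }
      sent.add(command);
      handler.accept(new Result(true, null));
    }

    List<String> sent() {
      return sent;
    }
  }

  public static void main(String[] args) {
    Tx good = new Tx(true);
    good.query("INSERT 1", r1 -> good.commit(r2 -> System.out.println("committed: " + r2.ok)));
    System.out.println(good.sent());

    Tx broken = new Tx(false);
    broken.query("INSERT 1", r -> System.out.println("query failed: " + r.cause));
  }
}
```

In this model the caller writes one callback fewer than with an asynchronous begin, and still learns about a BEGIN failure, just one step later.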
I don't know if this is a separate issue, but I really want to write a transactional query using RxJava2. I can't find any examples in the Java docs and looking at other examples it does not seem like you can start a transaction and get an observable. Is that related to transactions not being async?
I feel like transactional queries would benefit most from an observable chain since you often need the results of the previous statement to execute the next statement in a transaction. Writing each subsequent statement in the async handler of the previous statement lends itself to callback hell.
Thanks!
@davidtrobinson can you give a pseudocode example?
I was looking at this example from the vert.x JDBCClient: Transaction.java
I can't see how to do the same thing with the reactive-pg-client. I have written some code like this using the non-rx example from the Java docs:
private void reservationInsertHandler(Message<JsonObject> message){
  ReservationData reservationData = new ReservationData(message.body());
  pool.getConnection(ar -> {
    if(ar.succeeded()){
      PgConnection conn = ar.result();
      PgTransaction transaction = conn.begin();
      if(reservationData.containsNewParticpants()){
        Tuple args = Tuple.tuple();
        String sql = getParticipantQuery(reservationData, args);
        conn.preparedQuery(sql,args, result -> {
          if(result.succeeded()) {
            processSavedParticipants(result, reservationData);
            if(reservationData.containsNewParticpants()) {
              var insertArgs = Tuple.tuple();
              var insertSql = getInsertParticipantSql(reservationData,insertArgs);
              conn.preparedQuery(insertSql, insertArgs, ir -> {
                if(ir.succeeded()){
                  processInsertedParticipants(ir.result(),reservationData);
                  //TODO more queries that will be nested inside result handler
                }else{
                  log.error(ir.cause());
                }
              });
            }
          }else{
            log.error(result.cause());
          }
        });
      }
      transaction.commit(result ->{
        if(result.succeeded()){
          message.reply("Reservation Created");
        }else{
          message.fail(500, result.cause().getMessage());
        }
      });
    }
  });
}
I think this illustrates the nesting problem you can run into, along with all the error handling. I would like the API to flow like this:
conn
  .begin()
  .flatMap(ar -> conn.query(...))
  .flatMap(resultsFromQuery -> conn.query(...))
  .flatMap(results -> /* do another query */)
  .commit()
  .subscribe(ar -> { /* final result */ }, errorhandler)
Similar to your example, here is what I get:
pool
  .rxGetConnection()
  .flatMapCompletable(conn -> {
    PgTransaction tx = conn.begin();
    return conn
      .rxPreparedQuery("some-sql", Tuple.tuple())
      .flatMap(result -> conn.rxPreparedQuery("another-sql", Tuple.tuple()))
      .flatMapCompletable(result -> tx.rxCommit());
  });
Let me know how that sounds to you.
That looks good, I will try to write it that way. I wasn't sure how to compose it, thank you for the example. It might be good to throw some more "rx" examples like this into the documentation.
Yes, I'm keeping this example in my repo and will update the doc once you confirm it works for you.
I just wrote the code following your example @vietj and it worked! Thanks for helping with this. I did notice the rx APIs for Tuple and PgRowset were a little different: PgRowset didn't allow me to use a for-each loop, and Tuple and Row are both missing methods to support UUID or LocalDate. Is that something that is still being worked on? Besides those small issues, I really like the code using RxJava.
Here is what my code ended up looking like, if you are curious:
private void reservationInsertHandler(Message message) {
  ReservationData reservationData = new ReservationData((JsonObject) message.body());
  pool.rxGetConnection()
    .flatMapCompletable(connection -> {
      PgTransaction tx = connection.begin();
      Tuple insertArgs = Tuple.tuple();
      String insertParticipants = getInsertParticipantSql(reservationData, insertArgs);
      return connection
        .rxPreparedQuery(insertParticipants, insertArgs)
        .flatMap(result -> {
          Tuple selectArgs = Tuple.tuple();
          String selectParticipants = getParticipantQuery(reservationData, selectArgs);
          return connection.rxPreparedQuery(selectParticipants, selectArgs);
        })
        .flatMap(result -> {
          processSavedParticipants(result, reservationData);
          Tuple resArgs = Tuple.tuple();
          String insertResSql = getInsertReservationSql(reservationData, resArgs);
          return connection.rxPreparedQuery(insertResSql, resArgs);
        })
        .flatMap(result -> {
          UUID reservation_id = (UUID) result.iterator().next().getValue("id");
          reservationData.setReservationId(reservation_id);
          Tuple resPartArgs = Tuple.tuple();
          String resPartSql = getReservationParticipantsSql(reservationData, resPartArgs);
          return connection.rxPreparedQuery(resPartSql, resPartArgs);
        })
        .flatMapCompletable(result -> tx.rxCommit());
    })
    .doOnError(e -> {
      message.fail(500, e.getMessage());
    })
    .subscribe(() ->
      message.reply(new JsonObject().put("success", true)
        .put("id", reservationData.getReservationId().toString()).toString())
    );
}
Thanks for posting your code.
Concerning the Tuple API, I think this should be fixed with the Vert.x 3.6.0 code generator.
@davidtrobinson we have created this issue for a new, simplified transaction API: https://github.com/reactiverse/reactive-pg-client/issues/130
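For readers who want to try the composition pattern from the Rx chains in this thread without a database or the client library, the sketch below reproduces its shape with `java.util.concurrent.CompletableFuture`. Everything in it is an assumption made for illustration: `FakeConn`, its `query` and `commit` methods, and the SQL strings are hypothetical, and `thenCompose` stands in for `flatMap`. It shows two things: each step can use the previous step's result, and a failed step skips the later stages, including the commit. It does not roll back on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// JDK-only analogy for the Rx chain; FakeConn is hypothetical and not part of any client.
final class ChainedTxSketch {

  static final class FakeConn {
    final List<String> log = new ArrayList<>(); // statements in the order they were issued
    private int nextId = 1;

    /** Records the statement and completes with a fake generated id; fails for "bad-sql". */
    CompletableFuture<Integer> query(String sql) {
      log.add(sql);
      if (sql.startsWith("bad-sql")) {
        return CompletableFuture.failedFuture(new IllegalStateException("query failed: " + sql));
      }
      return CompletableFuture.completedFuture(nextId++);
    }

    CompletableFuture<Void> commit() {
      log.add("COMMIT");
      return CompletableFuture.completedFuture(null);
    }
  }

  /** Runs two dependent statements, then commits; a failure skips the later stages. */
  static CompletableFuture<Void> insertBoth(FakeConn conn, String firstSql) {
    conn.log.add("BEGIN"); // begin is not awaited, mirroring conn.begin() in the thread
    return conn.query(firstSql)
        .thenCompose(id -> conn.query("insert-child parent=" + id)) // uses the previous result
        .thenCompose(ignored -> conn.commit());
  }

  public static void main(String[] args) {
    FakeConn conn = new FakeConn();
    insertBoth(conn, "insert-parent")
        .whenComplete((v, err) ->
            System.out.println(err == null ? "ok " + conn.log : "failed " + err));
  }
}
```

The flat chain keeps one error path for the whole sequence, which is the main readability gain over nested handlers.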
The current handling of transactions looks like this:
In the example, I am unsure when the queries will take place and in what order they will really come into play. Also, what happens if I do anything in the ar -> {} handlers? If I call a helper function that uses the current connection, does it know (does it need to know?) that it is currently in transaction mode?
I would vote for another API, to make it more explicit and consistent with other, most probably asynchronous actions:
Maybe we can get some inspiration from SQLite and the (stopped) Web Database Specification. I have used Cordova's JavaScript database (I think it was SQLite) in earlier versions of Cordova, and it was really easy to grasp and use due to its consistency regarding "always asynchronous actions".
If we plan to return Futures instead of using callbacks, we can also await them quite nicely in languages that support that feature... :)
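As one possible reading of the "always asynchronous" idea above, here is a small JDK-only sketch in which `begin` itself returns a future. It is not the API proposed in this issue or offered by the client: `Tx`, `begin`, and `inTransaction` are hypothetical names, the log list replaces a real connection, and `join()` is used as a blocking stand-in for `await`. The helper commits when the work succeeds and rolls back when it fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical "always asynchronous" transaction API, modelled with JDK futures only.
final class AsyncBeginSketch {

  static final class Tx {
    final List<String> log;

    Tx(List<String> log) {
      this.log = log;
    }

    /** Fails for an empty statement; otherwise records it and returns a fake result. */
    CompletableFuture<String> query(String sql) {
      if (sql.isEmpty()) {
        return CompletableFuture.failedFuture(new IllegalArgumentException("empty statement"));
      }
      log.add(sql);
      return CompletableFuture.completedFuture("result of " + sql);
    }

    CompletableFuture<Void> commit() {
      log.add("COMMIT");
      return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> rollback() {
      log.add("ROLLBACK");
      return CompletableFuture.completedFuture(null);
    }
  }

  /** Explicitly asynchronous begin: the caller receives the Tx once BEGIN has completed. */
  static CompletableFuture<Tx> begin(List<String> log) {
    log.add("BEGIN");
    return CompletableFuture.completedFuture(new Tx(log));
  }

  /** Runs the work inside a transaction: commit on success, roll back on failure. */
  static <T> CompletableFuture<T> inTransaction(
      List<String> log, Function<Tx, CompletableFuture<T>> work) {
    return begin(log).thenCompose(tx -> {
      CompletableFuture<T> outcome = new CompletableFuture<>();
      work.apply(tx).whenComplete((value, err) -> {
        if (err == null) {
          tx.commit().whenComplete((done, commitErr) -> {
            if (commitErr == null) {
              outcome.complete(value);
            } else {
              outcome.completeExceptionally(commitErr);
            }
          });
        } else {
          // Report the original failure once the rollback has been issued.
          tx.rollback().whenComplete((done, rollbackErr) -> outcome.completeExceptionally(err));
        }
      });
      return outcome;
    });
  }

  public static void main(String[] args) {
    List<String> log = new ArrayList<>();
    // join() blocks until completion; it stands in for `await` in languages that have it.
    String result = inTransaction(log, tx ->
        tx.query("insert-user").thenCompose(r -> tx.query("insert-profile"))).join();
    System.out.println(result + " " + log);
  }
}
```

The trade-off against the pipelined begin discussed earlier in the thread is one extra asynchronous step in exchange for a uniform, explicit API.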