sfackler / rust-postgres

Native PostgreSQL driver for the Rust programming language
Apache License 2.0
3.44k stars 436 forks source link

pipelining should document (and maybe the client should provide?) ordering guarantees #930

Open maddyblue opened 2 years ago

maddyblue commented 2 years ago

The pipeline documentation should perhaps describe that there are no ordering guarantees when pipelining queries. For example:

let f1 = client.query("BEGIN");
let f2 = client.query("SOMETHING IN THE TXN");
try_join(f1, f2).await?;

Will not always execute f2 in the transaction started by f1 because query is an async fn and so will not execute anything until poll'd the first time, which isn't guaranteed by try_join. This is a subtle and easy error to make (I made it).

One thing that'd be real great is if the various query functions were sync and returned a impl Future such that they could guarantee query execution order in the order they were called. It seems like that could be achieved if hand waving the order of messages in the recv side of the InnerClient's sender channel is the same as the send order somehow.

sfackler commented 2 years ago

The work for a query will start on the first call to poll, but using query with a string rather than a Statement will involve at least two separate rounds of communication with the server (and possibly more if the statement involves any non-built-in types).

I would additionally not recommend trying to pipeline requests with the creation of a transaction - if the BEGIN call fails, you will execute the next query outside of a transaction which is probably not good.

If you are sure you need to pipeline these queries for performance reasons, I would at least recommend pre-prepared statements to avoid multiple round trips. At that point, you should be guaranteed that the queries will be sent in order of the first poll of their future.

maddyblue commented 2 years ago

The BEGIN example was contrived. My actual problem was I was already in a transaction and needed to execute some queries in order and wanted to pipeline them. All my statements are already prepared.

I wanted to use pipelining to stop waiting for network roundtrips for each Bind/Describe/Execute batch, but I don't think there's a way to do that correctly.

sfackler commented 2 years ago

If the statements are already prepared, then the queries should be guaranteed to execute in the order they were first polled, and it looks like the futures crate's try_join does poll in-order. If that's not the case, then it'd be great if you could put together a small self contained example of that and I can look into it.

maddyblue commented 2 years ago

https://github.com/sfackler/rust-postgres/blob/a624282bed658a53e7e09d194972a025c82dc3e0/tokio-postgres/src/client.rs#L370 is an await point in query_raw before the .send, which I think means even if try_join was polling its arguments in order, the actual query execution could still be in another order.

sfackler commented 2 years ago

into_statement returns a future that immediately resolves when working with a Statement: https://github.com/sfackler/rust-postgres/blob/8bb5712406c7c3c9763daa553de525cad55785d4/tokio-postgres/src/to_statement.rs#L17

maddyblue commented 2 years ago

My argument is that there should be documentation that either guarantees or does not guarantee statement execution order during pipelining because relying on a bunch of implementation details that don't seem to be documented guarantees is not helpful to users trying to write safe, fast code.

Knowing exactly how async, await, try_for, and tokio_postgres internals work is a significant bar, and they can also change at any time. try_join for example happens to execute in a certain way, but does not guarantee it. tokio_postgres happens to have at least one internal await point that immediately resolves. Are there other await points before the send? Even if there weren't, does the send ordering have any guarantees on the receive side? I couldn't find those in the tokio docs. Knowing exactly what happens during immediately resolving await points also feels like some rust obscura that we all probably get subtly wrong sometimes. If we want pipelining to make things go fast, then it should document whether or not it comes with ordering guarantees so users can use it safely.

sfackler commented 2 years ago

Even if there weren't, does the send ordering have any guarantees on the receive side?

Queries are executed in order, as the docs note: https://docs.rs/tokio-postgres/latest/tokio_postgres/#pipelining.

maddyblue commented 2 years ago

Ah, the section above it about behavior does say this

Requests are executed in the order that they are first polled, not in the order that their futures are created.

This is about half of what I wanted, but I was looking for it in the wrong place. I do think the above sentence could be improved with an API change that guarantees the order at time of calling, not polling.

maddyblue commented 2 years ago

Attached is a program that doesn't execute statements in order of first poll. It achieves this by first using an unprepared statement which fires off a prepare request. The above comment about immediately resolving the to_statement future is not true for unprepared statements. In general it seems the documented claim only holds if there are no string statements preceding prepared statements in a join batch.

This program fails with a primary key error due to the insert happening with an existing key, because it is executed before the delete.

use futures::future;
use tokio_postgres::{Error, NoTls};

#[tokio::main]
async fn main() -> Result<(), Error> {
    let (mut client, connection) =
        tokio_postgres::connect("host=localhost user=postgres port=5432", NoTls).await?;
    tokio::spawn(async move {
        connection.await.unwrap();
    });

    client.batch_execute("DROP TABLE IF EXISTS t").await?;
    client
        .batch_execute("CREATE TABLE t (a INT PRIMARY KEY)")
        .await?;

    let insert = client.prepare("INSERT INTO t VALUES ($1)").await.unwrap();
    let _delete = client.prepare("DELETE FROM t WHERE a = $1").await.unwrap();
    let select = client
        .prepare("SELECT count(*) FROM t WHERE a = $1")
        .await
        .unwrap();

    let i = 1;
    client.execute(&insert, &[&i]).await.unwrap();

    // In a loop, execute a transaction that deletes the row from the table then
    // inserts it. After commit, there should always be 1 row in the table if the
    // statements are executed in the order specified in try_join and "Requests
    // are executed in the order that they are first polled, not in the order that
    // their futures are created."
    loop {
        let tx = client.transaction().await.unwrap();
        future::try_join(
            tx.execute("DELETE FROM t WHERE a = $1", &[&i]),
            // Switching the delete to be a prepared statement works.
            // tx.execute(&_delete, &[&i]),
            // This will fail if there was already a row due to the primary key constraint.
            tx.execute(&insert, &[&i]),
        )
        .await
        .unwrap();
        tx.commit().await.unwrap();
        let val: i64 = client.query_one(&select, &[&i]).await.unwrap().get(0);
        assert_eq!(val, 1);
    }
}
sfackler commented 2 years ago

Yeah, that was the first thing I mentioned in my first post above. If the statement is not already prepared, executing it requires two query/response pairs and only the first will happen before later queries you're pipelining.

david-monroe commented 1 year ago

Currently it does not look like pipelining of dependent queries can really be used. None of the usual futures utilities like try_join and FuturesOrdered guarantee the order in which the futures are polled.

This could be made much simpler by having execute etc. be non-async functions that reserve a place in the outgoing queue ahead of time and only then return a future.

david-monroe commented 1 year ago

After looking at the source code I think we could have a variant of execute that only works with prepared statements and that is synchronous until https://github.com/sfackler/rust-postgres/blob/master/tokio-postgres/src/query.rs#L126

david-monroe commented 1 year ago

I'v also opened https://github.com/rust-lang/futures-rs/issues/2674 to ask for guarantees from FuturesUnordered.

xoac commented 1 year ago

I have very similar problem with pipelining. I have stream of async call to DB that I want to speed up. pipelining seems like big win for this. But it's hard for me to decide if the code below is sound. Unfortunately order of execute has matter. If I understand correctly - currently the code does what I want, but this depends on buffer_unordered or buffered implementation. Maybe it would be possible to add some wrapper to tokio-postgres that would guarantee that stream is executed in order? this could be similar with try_join!(). As for now it could be just alias - but in the future it could switch to different implementation if try_join or buffered change implementation.

// `iter` - contains items to upsert to postgres
tokio_stream::iter(
    iter.remainder()
        .iter()
        .map(|one| async { self.store_one(client, one).await }),
)
.buffer_unordered(MAX_CONCURRENT)
.try_collect::<Vec<_>>()
.await
.map(drop)
MarioIshac commented 6 months ago

I'm having the same problem. Ideally, I should be able to pipeline queries where the latter is dependent on the former, since ultimately they will execute sequentially. Having the order of future execution not be in order kills that capability.

sfackler commented 6 months ago

Prepare the statements before executing them and they will run in order.