sfackler / rust-postgres

Native PostgreSQL driver for the Rust programming language
Apache License 2.0
3.42k stars 436 forks source link

Recommended way to send multiple `execute` statement to insert items in a Vec #1085

Closed skittishdev closed 6 months ago

skittishdev commented 8 months ago

Hello,

First off, thank you for the awesome crate!

I have a Vec<_> of items that I'd like to insert. The length of the Vec<_> changes a little each time based on some user input. I'd like to use prepared statements to achieve this. However, I'm running into lifetime issues, and I'm not sure if I'm missing some feature in this crate that will help me pipeline my requests.

This is what I have right now

let stmt = client.prepare("INSERT INTO table (cola, colb) values ($1, $2);").await?
for item in items {
    client.execute(&stmt, &[&item.cola, &item.colb]).await?;
}

What I already tried

This obviously throws an error because item references don't live long enough

let stmt = client.prepare("INSERT INTO table (cola, colb) values ($1, $2);").await?
let mut futs = Vec::new();
for item in items {
    futs.push(client.execute(&stmt, &[&item.cola, &item.colb]));
}
futures::future::join_all(futs).await;

I also tried preparing as many statements as I have items in the Vec<_>, but I'm still having issues

let stmts: Vec<_> = (0..items.len()).map(|_idx| {
    client.prepare("INSERT INTO table (cola, colb) values ($1, $2);")
}).collect();
let stmts = join_all(stmts).await.into_iter().map(|res| res.unwrap()).collect(); // I know the unwrap() is not ideal here, but let's ignore this for now
let futs: Vec<_> = zip(items, stmts).map(|(item, stmt)| {
    client.execute(
        &stmt,
        &[&item.cola, &item.colb],
    )
}).collect();
join_all(futs).await;
sfackler commented 8 months ago

I can't be sure without seeing the rest of the code, but I think option 2 would probably work if you iterated over items by reference instead of by value.

Depending on the size of items, you may want to use a COPY query instead to insert in bulk rather than separately.

skittishdev commented 8 months ago

I can't be sure without seeing the rest of the code, but I think option 2 would probably work if you iterated over items by reference instead of by value.

Like this?

let stmt = client.prepare("INSERT INTO table (cola, colb) values ($1, $2);").await?
let mut futs = Vec::new();
for item in items.iter() {
    futs.push(client.execute(&stmt, &[&item.cola, &item.colb]));
}
futures::future::join_all(futs).await;

Still gives me a temporary value dropped while borrowed error

Depending on the size of items, you may want to use a COPY query instead to insert in bulk rather than separately.

I don't think COPY is appropriate here, since I Vec::len() ranges from 1 to lower multiples of 10 at best.

skittishdev commented 8 months ago

If you meant the other approach with zip, I am now seeing this error:

error[E0308]: mismatched types
   --> src/main.rs:37:13
    |
35  |         client.execute(
    |                ------- arguments to this method are incorrect
36  |             stmt,
37  |             &param[..],
    |             ^^^^^^^^^^ expected `&[&dyn ToSql + Sync]`, found `&[&String]`
    |
    = note: expected reference `&[&dyn ToSql + Sync]`
               found reference `&[&String]`

Here is a minimal reproduction.

use std::iter::zip;
use tokio_postgres::{Client, NoTls, Error};
use futures::future::join_all;

#[derive(Debug)]
struct Item {
    cola: String,
    colb: String,
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // Create items
    let items = vec![
        Item { cola: "foo".to_owned(), colb: "bar".to_owned() },
        Item { cola: "baz".to_owned(), colb: "qux".to_owned() },
    ];

    // Create client
    let client = client().await?;

    // Prepare statement
    let stmts: Vec<_> = (0..items.len()).map(|_idx| {
        client.prepare("INSERT INTO table (cola, colb) values ($1, $2);")
    }).collect();
    let stmts: Vec<_> = join_all(stmts).await.into_iter().map(|res| res.unwrap()).collect(); // I know the unwrap() is not ideal here, but let's ignore this for now

    // Create params vector
    let params: Vec<_> = items.into_iter().map(|item| {
        vec![item.cola.as_str(), item.colb.as_str()]
    }).collect();

    // Vec to track futures
    let futs: Vec<_> = zip(&params, &stmts).map(|(param, stmt)| {
        client.execute(
            stmt,
            &param[..],
        )
    }).collect();
    let res = join_all(futs).await;
    println!("{res:#?}");
    Ok(())
}

async fn client() -> Result<Client, Error> {
    let host = "localhost";
    let user = "postgres";
    let password = "postgres";
    let app = "postgrestest";
    let db = "postgres";
    let (client, conn) = tokio_postgres::connect(&format!("host={host} user={user} password={password} application_name={app} dbname={db}"), NoTls).await?;

    tokio::spawn(async move {
        if let Err(e) = conn.await {
            panic!("postgres connection error: {:#?}", e);
        }
    });

    Ok(client)
}

I also tried annotating the type of params like so:

let params: Vec<Vec<&(dyn ToSql + Sync)>> = items.into_iter().map(|item| {
        vec![item.cola.as_str(), item.colb.as_str()]
    }).collect();

But that results in a different error:

error[E0277]: a value of type `Vec<Vec<&dyn ToSql + Sync>>` cannot be built from an iterator over elements of type `Vec<String>`
    --> src/main.rs:32:8
     |
32   |     }).collect();
     |        ^^^^^^^ value of type `Vec<Vec<&dyn ToSql + Sync>>` cannot be built from `std::iter::Iterator<Item=Vec<String>>`
     |
     = help: the trait `FromIterator<Vec<String>>` is not implemented for `Vec<Vec<&dyn ToSql + Sync>>`
     = help: the trait `FromIterator<T>` is implemented for `Vec<T>`
note: the method call chain might not have had the expected associated types
    --> src/main.rs:30:67
     |
15   |       let items = vec![
     |  _________________-
16   | |         Item { cola: "foo".to_owned(), colb: "bar".to_owned() },
17   | |         Item { cola: "baz".to_owned(), colb: "qux".to_owned() },
18   | |     ];
     | |_____- this expression has type `Vec<Item>`
...
30   |       let params: Vec<Vec<&(dyn ToSql + Sync)>> = items.into_iter().map(|item| {
     |  _______________________________________________________-----------_^
     | |                                                       |
     | |                                                       `Iterator::Item` is `Item` here
31   | |         vec![item.cola, item.colb]
32   | |     }).collect();
     | |______^ `Iterator::Item` changed to `Vec<String>` here
note: required by a bound in `std::iter::Iterator::collect`
    --> /Users/rchilaka/.rustup/toolchains/stable-x86_64-apple-darwin/lib/rustlib/src/rust/library/core/src/iter/traits/iterator.rs:2049:19
     |
2049 |     fn collect<B: FromIterator<Self::Item>>(self) -> B
     |                   ^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `Iterator::collect`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `tztest` (bin "tztest") due to previous error
sfackler commented 8 months ago

You may need to type-hint the inner vec.