launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0

Support for INSERTing multiple records? #294

Open greglearns opened 4 years ago

greglearns commented 4 years ago

How do you do the equivalent of

INSERT INTO my_table (my_num, my_str) VALUES
 ($1,$2)
,($3, $4)
,($5,$6)
... etc. depending on how many records you pass in

By the way, this project looks great!

connec commented 4 years ago

This seems to be a popular concern right now! I asked this question on Discord, and the answer for now is that this is not possible unless you generate the query yourself (and lose a lot of the compile-time-safety features as a result).

There seems to be some activity in the area though:

greglearns commented 4 years ago

@connec I missed those related topics! Thanks for pointing them out.

Yes, for me, this is the biggest need for interacting with SQL -- I can write SQL myself and make sure it is safe, and binding a few arguments isn't hard, but it's a pain to bind an arbitrary number of ($n, $n+1), ($n+2, $n+3), etc.!

jsdw commented 4 years ago

I ran into a similar issue. I think I have a solution (for now), but it is somewhat hideous:

sqlx::query!(
    "WITH
        a AS (SELECT row_number() over(), * FROM UNNEST( $1::UUID[] ) as group_id),
        b AS (SELECT row_number() over(), * FROM UNNEST( $2::UUID[] ) as variable_id),
        c AS (SELECT row_number() over(), * FROM UNNEST( $3::INTEGER[] ) as ordering)
    INSERT INTO groups
    SELECT a.group_id, b.variable_id, c.ordering FROM a
        JOIN b ON a.row_number = b.row_number
        JOIN c ON a.row_number = c.row_number",
    &group_id_arr,
    &self.variable_ids,
    &order_arr
).execute(&mut *conn).await?;

The "trick" is that you can pass in a Vector of items to be a Postgres ARRAY type, and UNNEST that array to turn it into a set of rows. So, above, I basically split the thing I want to insert into a vec for each column of values (fortunately only 3 for me), which each get turned into a temporary table with row numbers. Once I have these, I can join them all on the row numbers to produce a final table of values, which I then pass to the INSERT.

It's pretty hideous, and I'd love to find a more concise or better approach, but it's the best I have so far!

I need to actually test it properly next, to make sure it does what I am hoping it does..

jsdw commented 4 years ago

As an aside, I also tried doing a simpler approach of passing a single array of all of the values I wanted to update at once, but I hit a wall where SQLx did not understand the type of the array I was trying to provide, so that's what led me to splitting the array up and recombining it in SQL instead.

jsdw commented 4 years ago

Just a quick FYI that, at least from a postgres POV, this works as I'd like. Here's a brief example:

db=> create table foo (id integer, foo text, bar boolean);
CREATE TABLE
db=> WITH a AS (select row_number() over(), * from unnest(ARRAY[1,2,3]) as id), b AS (select row_number() over(), * from unnest(ARRAY['a','b','c']) as foo), c AS (select row_number() over(), * from unnest(ARRAY[true,false,true]) as bar) INSERT INTO foo SELECT a.id, b.foo, c.bar FROM a JOIN b ON a.row_number = b.row_number JOIN c ON a.row_number = c.row_number;
INSERT 0 3
db=> select * from foo;
 id | foo | bar 
----+-----+-----
  1 | a   | t
  2 | b   | f
  3 | c   | t
shiftrtech commented 3 years ago

Hi, is there a way to do this? I need to make 25 inserts at once.

greglearns commented 3 years ago

For Postgres, the below works, and is easy. Maybe this is good enough and nothing else is needed to support INSERTing multiple records, at least for Postgres.

let lala = vec![("abc", true), ("xyz", false)];
let mut v1: Vec<String> = Vec::new();
let mut v2: Vec<bool> = Vec::new();
lala.into_iter().for_each(|todo| {
    v1.push(todo.0.into());
    v2.push(todo.1);
});
dbg!((&v1, &v2));
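// assumes `use sqlx::{postgres::PgRow, Row};` and a `Todo` struct are in scope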
let todo = sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT description, done
    FROM UNNEST($1, $2) as a(description, done)
    RETURNING id, description, done"#,
)
.bind(&v1)
.bind(&v2)
.map(|row: PgRow| Todo {
    id: row.get(0),
    description: row.get(1),
    done: row.get(2),
})
.fetch_one(&mut tx)
.await.map_err(|e| dbg!(e) )?;
shiftrtech commented 3 years ago

@greglearns can i use this with #[derive(sqlx::FromRow)]?

greglearns commented 3 years ago

@shiftrtech I haven't tried it, so I don't know. Let us know what you find out!

greglearns commented 3 years ago

Slightly easier (IMHO cleaner) SQL:

r#"INSERT INTO foo (description, done)
SELECT * FROM UNNEST($1, $2)
RETURNING id, description, done"#,
itsfarseen commented 3 years ago

Does handling this in the application have any disadvantages? We could do this by doing multiple single inserts using a for loop, right?
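
For reference, the per-row loop under discussion would look roughly like this (a sketch reusing the foo table from the earlier examples and a hypothetical todos vector):

// Per-row inserts: each iteration is a separate round trip to the database.
for todo in &todos {
    sqlx::query("INSERT INTO foo (description, done) VALUES ($1, $2)")
        .bind(&todo.description)
        .bind(todo.done)
        .execute(&mut *tx)
        .await?;
}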

jplatte commented 3 years ago

@itsfarseen Yes, doing it in a loop means a lot more back-and-forth between application and DB.

greglearns commented 3 years ago

Until this is fully supported, you can send an entire JSON list of records in one INSERT command and then, in the query, break the JSON into rows (in Postgres, with something like jsonb_to_recordset). This is less efficient than being able to do it directly (using binary communication instead of JSON), but much, much better than doing single inserts.
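
For illustration, a minimal sketch of that JSON approach (assumes sqlx is built with its json feature; the foo table and columns are reused from the earlier examples):

// The whole batch is bound as a single JSONB parameter and expanded into rows server-side.
let batch = serde_json::json!([
    { "description": "abc", "done": true },
    { "description": "xyz", "done": false },
]);

sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT description, done
    FROM jsonb_to_recordset($1::jsonb) AS t(description TEXT, done BOOLEAN)"#,
)
.bind(batch)
.execute(&mut *tx)
.await?;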


itsfarseen commented 3 years ago

Is doing multiple single inserts really that inefficient? For example, consider a web app route POST /todos that lets you insert 10 rows. I understand that the request will take longer to execute because the handler now has to .await 10 times instead of once, and other I/O operations can come in between those .awaits. But isn't the data the request handler sends to Postgres almost the same as above?
"Almost" because there will now be 10 headers, one for each row, instead of 1 header for all 10 rows. Compared with the overhead of the JSON conversion, wouldn't this be more efficient?

jplatte commented 3 years ago

If they're on different servers that are "far apart" (in different datacenters), then the JSON solution is most likely faster, maybe substantially faster. If they're on the same machine, then you're probably right and the JSON solution is slower, but not slow enough that you would notice outside of microbenchmarks. If it really matters, benchmark it for your specific situation.

abonander commented 3 years ago

I plan on improving the UNNEST($1, $2, $3) approach by adding an adapter that lets you bind iterators directly so you don't need extra allocations to do the Array-of-Structs to Struct-of-Arrays conversion:

let lala = vec![("abc", true), ("xyz", false)];

let todo = sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
.bind(&PgArray(lala.iter().map(|it| it.0)))
.bind(&PgArray(lala.iter().map(|it| it.1)))

And yes, @shiftrtech you can rewrite this with query_as to use FromRow:

#[derive(sqlx::FromRow)]
struct Todo {
    id: i32,
    description: String,
    done: bool
}

let lala = vec![("abc", true), ("xyz", false)];
let mut v1: Vec<String> = Vec::new();
let mut v2: Vec<bool> = Vec::new();
lala.into_iter().for_each(|todo| {
    v1.push(todo.0.into());
    v2.push(todo.1);
});

let todo: Todo = sqlx::query_as(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
// .bind(&PgArray(lala.iter().map(|it| it.0)))
// .bind(&PgArray(lala.iter().map(|it| it.1)))
.bind(&v1)
.bind(&v2)
.fetch_one(&mut tx)
.await.map_err(|e| dbg!(e) )?;

For convenience, we might just add .bind_array() as well to the Query and QueryAs types so it might look like this:

let todo: Todo = sqlx::query_as(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
.bind_array(lala.iter().map(|it| it.0))
.bind_array(lala.iter().map(|it| it.1))
.fetch_one(&mut tx)
.await
.map_err(|e| dbg!(e) )?;
abonander commented 3 years ago

Just realized, though, that PgArray would need &mut access to the iterator during encoding, but Encode::encode() takes &self. Does anyone think RefCell is a bad idea here? Or maybe it could just require the iterator to be Clone?

markose commented 2 years ago

I also need to write multiple records to the database. If I understand correctly, there is no support for multi-row inserts in sqlx yet. I have tried the following and the performance is very poor: the execution takes over 2 s for 100 records. Is a prepared statement created here, and will it be reused the next time the function is called? Is the statement parsed by sqlx, or is it sent directly to the database? There is an index over all 7 columns in exactly this order; could this cause performance problems? Does anyone have experience with massive inserts into a Postgres database?

pub async fn insert_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: &Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("insert_index_rows: {}", rows.len());
    let mut v1: Vec<String> = Vec::new();
    let mut v2: Vec<String> = Vec::new();
    let mut v3: Vec<String> = Vec::new();
    let mut v4: Vec<Vec<u8>> = Vec::new();
    let mut v5: Vec<String> = Vec::new();
    let mut v6: Vec<String> = Vec::new();
    let mut v7: Vec<String> = Vec::new();
    rows.into_iter().for_each(|row| {
        v1.push(row.path.clone());
        v2.push(row.viewname.clone());
        v3.push(row.fieldname.clone());
        v4.push(row.fieldvalue.clone());
        v5.push(row.name.clone());
        v6.push(row.viewentry.read().unwrap().id.clone());
        v7.push(row.get_hash().clone());
    });
    sqlx::query("INSERT INTO resources_index (path, viewname, fieldname, fieldvalue, name, viewentry_id, id) SELECT * FROM UNNEST ($1,$2,$3,$4,$5,$6,$7)")
    .bind(v1)
    .bind(v2)
    .bind(v3)
    .bind(v4)
    .bind(v5)
    .bind(v6)
    .bind(v7)
    .execute(&mut *tx)
    .await?;
    Ok(())
}
itsfarseen commented 2 years ago

@markose this could be because of creating 7 different vecs and lots of cloning. It might get optimized away in release builds, but I'm not sure. Could you check in a release build?

markose commented 2 years ago

@itsfarseen Thanks for the advice. The "problem" occurred in the release build. It is also not the Rust logic but the statement itself that takes so much time; I know because sqlx issues a [WARN] message. I have now rebuilt it so that the rows are inserted as a multi-row insert (INSERT INTO ... VALUES ...). This has improved things a bit. My main question is whether this statement, as it stands, is cached in the database as a prepared statement or re-parsed on each call.

markose commented 2 years ago

I'm talking about this warning, which appears very often.

[2021-07-24T16:00:05Z WARN  sqlx::query] INSERT INTO resources_index (path, …; rows: 0, elapsed: 1.516s

    INSERT INTO
      resources_index (
        path,
        viewname,
        fieldname,
        fieldvalue,
        name,
        viewentry_id,
        hash
      )
    VALUES
      ($1, $2, $3, $4, $5, $6, $7),
    ($8, $9, $10, $11, $12, $13, $14),
    ...
    ($687, $688, $689, $690, $691, $692, $693),
    ($694, $695, $696, $697, $698, $699, $700)

It inserts 100 rows with small values (~200 bytes per row overall). There are two indices, one over all columns and one over the hash column, and no primary keys.

Disk iops: 9 requests completed in 9.63 ms, 72 KiB written, 934 iops, 7.30 MiB/s; generated 10 requests in 9.00 s, 80 KiB, 1 iops, 8.89 KiB/s; min/avg/max/mdev = 925.4 us / 1.07 ms / 1.28 ms / 100.3 us

Test is run on Google GCE (16 Cores / 64 GB RAM) and local scratch disk.

abonander commented 2 years ago

@markose yes the query is prepared and cached if you use the sqlx::query() interface (both the functions and the macros), but keep in mind that you're going to be generating what looks like a unique query for every different length of the vector with your rewritten approach. Postgres might recognize that the query plans are similar but I doubt that it's that smart. Also keep in mind that the protocol has an inherent limit of 65535 bind parameters per query so you need to break up your inserts into batches no larger than floor(65535 / number_of_fields_per_row).
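
A minimal sketch of that batching, where insert_batch is a hypothetical helper that performs one UNNEST-based (or multi-row VALUES) insert per chunk:

// Stay under the 65535-bind-parameter limit by chunking the input.
const BIND_LIMIT: usize = 65535;
const FIELDS_PER_ROW: usize = 7;

for chunk in rows.chunks(BIND_LIMIT / FIELDS_PER_ROW) {
    insert_batch(&mut *tx, chunk).await?;
}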

I have some suggestions regarding your UNNEST() version though. You're basically doing a deep clone of the entire input vector, and you're also not amortizing the allocations of the sub-vectors even though you know the exact length. How does this version perform?

pub async fn insert_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: &Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("insert_index_rows: {}", rows.len());
    let mut v1: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v2: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v3: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v4: Vec<&[u8]> = Vec::with_capacity(rows.len());
    let mut v5: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v6: Vec<String> = Vec::with_capacity(rows.len());
    let mut v7: Vec<&str> = Vec::with_capacity(rows.len());
    rows.into_iter().for_each(|row| {
        v1.push(&row.path);
        v2.push(&row.viewname);
        v3.push(&row.fieldname);
        v4.push(&row.fieldvalue);
        v5.push(&row.name);
        v6.push(row.viewentry.read().unwrap().id.clone());
        v7.push(row.get_hash());
    });
    sqlx::query("INSERT INTO resources_index (path, viewname, fieldname, fieldvalue, name, viewentry_id, id) SELECT * FROM UNNEST ($1,$2,$3,$4,$5,$6,$7)")
    .bind(v1)
    .bind(v2)
    .bind(v3)
    .bind(v4)
    .bind(v5)
    .bind(v6)
    .bind(v7)
    .execute(&mut *tx)
    .await?;
    Ok(())
}

For v6 it looks like another struct behind a std::sync::RwLock. There's unfortunately no way to convert that to a reference to the id field while still holding the read lock (I suppose you could collect a vector of RwLockReadGuards and then collect another vector referencing those but... gross), so you still end up needing to .clone() that. It should still be a significant performance improvement though.

In an async application I would recommend instead using tokio::sync::RwLock so that you're not blocking core threads if you have a task concurrently trying to acquire a write lock. Tokio's RwLock allows you to .map() the read guard which would help here except .bind() unfortunately won't accept a vector of those anyway since they don't implement sqlx::Type or sqlx::Encode, although I suppose we would accept a PR to fix that.

You might also consider modeling your data differently, maybe just storing that id field as viewentry_id: String directly in IndexRecord, in which case it can be bound like the other fields.

markose commented 2 years ago

@abonander Thanks a lot for your help! I will test this. I will also change the implementation to tokio::sync::RwLock; maybe I can remove this Arc completely.

markose commented 2 years ago

I've removed the hash (an MD5 over all fields), which was used as the row ID, in order to speed up inserts. Unfortunately, I now have a problem with DELETE: I can no longer use WHERE hash IN (...). Therefore I wanted to execute the DELETE statements in parallel.

pub async fn delete_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("delete_index_rows: {}", rows.len());
    let mut futures = Vec::with_capacity(rows.len());
    for row in rows {
        let q = sqlx::query("DELETE FROM resources_index WHERE path = $1 AND viewname = $2 AND fieldname = $3 AND fieldvalue = $4 AND name = $5 AND viewentry_id = $6")
        .bind(&row.path)
        .bind(&row.viewname)
        .bind(&row.fieldname)
        .bind(&row.fieldvalue)
        .bind(&row.name)
        .bind(row.viewentry.read().unwrap().id.clone())
        .execute(&mut *tx);
        futures.push(q);
    }
    futures::future::join_all(futures);
    Ok(())
}

Unfortunately, this does not seem to be possible in the context of a transaction.

    |
293 |         .execute(&mut *tx);
    |                  ^^^^^^^^ `*tx` was mutably borrowed here in the previous iteration of the loop

Do you have an idea for this?

pauldorehill commented 2 years ago

@markose you are breaking one of the fundamental rules of Rust: only one mutable reference within a scope. To get around this (though I'm not sure you'd really want to, or what the implications are in terms of a transaction), you would need a shared interior-mutability pattern such as Rc<Mutex<T>>. This will require blocking a shared thread of the async executor, so it may not give you any benefit. Something like the below will compile:

async fn run_query(tx: Rc<Mutex<Transaction<'_, Postgres>>>) {
    let mut tx = tx.lock().unwrap();
    query("SELECT * FROM my_table").execute(&mut *tx).await.unwrap();
}

async fn looper(tx: &Rc<Mutex<Transaction<'_, Postgres>>>) {
    let mut futures = vec![];
    for _ in 0..100 {
        futures.push(run_query(Rc::clone(tx)))
    }
    futures::future::join_all(futures).await;
}
markose commented 2 years ago

@pauldorehill I have also thought about an exclusive lock. Thank you. I will try this.

abonander commented 2 years ago

You don't need a Mutex; you can do a bulk delete using the same UNNEST trick, since DELETE supports it with USING ..., but it's overkill to match on all the columns of a table in a DELETE statement.

Normally, if your table has a PRIMARY KEY or UNIQUE column (it really should anyway) you can just delete based on that with = ANY($1):

sqlx::query("DELETE FROM resources_index WHERE path = ANY($1)")
    .bind(&paths_to_delete)
    .execute(&mut *tx)
    .await?;

Or even if none of the columns are technically unique, one of them usually has a common value when you're deleting a chunk of rows.
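
For completeness, a sketch of the UNNEST-based bulk DELETE mentioned above, matching on two columns (paths and viewnames are assumed Vec<String> inputs; adjust the columns to your schema):

sqlx::query(
    r#"DELETE FROM resources_index AS r
    USING UNNEST($1::TEXT[], $2::TEXT[]) AS del(path, viewname)
    WHERE r.path = del.path AND r.viewname = del.viewname"#,
)
.bind(&paths)
.bind(&viewnames)
.execute(&mut *tx)
.await?;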

jprochazk commented 2 years ago

I plan on improving the UNNEST($1, $2, $3) approach by adding an adapter that lets you bind iterators directly so you don't need extra allocations to do the Array-of-Structs to Struct-of-Arrays conversion: ...

I keep running into this, could I take a stab at implementing it? I think the RefCell approach would work well. If I understand correctly, this would require exposing a type PgArray which holds the iterator, and then implementing Encode<Postgres> for that type (taking into account that it is an iterator, and not an actual array).

abonander commented 2 years ago

My ideal would be a type that implements Encode when T: IntoIterator and Decode when T: FromIterator so you can use it in both directions.

The current Encode/Decode trait story is kind of a mess, though. I've been meaning to simplify it, but it partially exists due to some really unhelpful compiler errors when you structure the traits the logical way. @mehcode has more context there since he authored these traits.

jprochazk commented 2 years ago

I think streaming rows already covers any use cases I can think of for Decode; can you elaborate a bit more?

kevincox commented 2 years ago

I had trouble inserting array fields here because UNNEST on multi-dimensional arrays fully flattens the values (at least on postgres). I ended up switching from using UNNEST to doing a huge cross join and using generate_subscripts to pick out the right entries.

Here is the slightly simplified example.

INSERT INTO entries(
    feed,
    source_url,
    categories,
    source_body_html)
SELECT
    ($1::INT8[])[i],
    NULLIF(($2::TEXT[])[i], ''),
    COALESCE(
        (SELECT ARRAY_AGG(e) FROM jsonb_array_elements_text(($6::JSONB[])[i]) as e),
        ARRAY[]::TEXT[]),
    ($8::TEXT[])[i]
FROM generate_subscripts($1::INT8[], 1) AS i
RETURNING entries.id, entries.source_url

I've only really tested with <50 rows but there is no obvious slowdown occurring here. It seems to be fairly fast.

That being said, this is an incredibly bad API:

I would love to see a good interface here. Ideally I could just insert an IntoIterator<Item: impl ToRow> or similar, and batching as needed could be handled automatically. It would be nice if this used the bulk-loading features of each DBMS, but that may not be feasible because, at least for Postgres, COPY doesn't allow returning anything.

Iron-E commented 2 years ago

Just noting that you can now use QueryBuilder::push_values for this purpose.

jplatte commented 1 year ago

@arturs706 please ask on StackOverflow, Discord (see README) or https://reddit.com/r/learnrust instead of hijacking random issues to ask unrelated questions.

pureliani commented 11 months ago

@Iron-E's answer helped, and I made it work with a query builder:

use sqlx::{Pool, Postgres, QueryBuilder};

struct Category(String, Option<i32>);

async fn insert_categories(categories: Vec<Category>, pool: &Pool<Postgres>) {
    let mut query_builder = QueryBuilder::new("INSERT INTO product_categories (name, parent_id) ");

    query_builder.push_values(categories, |mut b, new_category| {
        b.push_bind(new_category.0).push_bind(new_category.1);
    });

    let query = query_builder.build();

    query.execute(pool).await.unwrap(); // must be awaited, or the query never runs
}
tgross35 commented 5 months ago

In case anyone finds it useful, QueryBuilder can also be used with ON DUPLICATE KEY UPDATE and RETURNING clauses (this only works with MariaDB, not MySQL). EXPECTED_RECORDS is an array of structs, and the column key is a unique key. I'm using this query to map a Rust enum to record IDs and ensure I only have one copy in the database (it's an init step, so there is no error handling).

    let mut qb: QueryBuilder<MySql> = QueryBuilder::new(
        "INSERT INTO event
        (key, success, description) ",
    );
    qb.push_values(EXPECTED_RECORDS.iter(), |mut b, rec| {
        b.push_bind(rec.key)
            .push_bind(rec.success)
            .push_bind(rec.description);
    });
    qb.push(
        "ON DUPLICATE KEY UPDATE
        success=VALUES(success),
        description=VALUES(description)
        RETURNING id",
    );

    let mut rows = qb.build().fetch(pool);
    let mut ret = BTreeMap::new();
    let mut count: usize = 0;

    while let Some(row) = rows.try_next().await.unwrap() {
        // FIXME: it would be best to use `try_get("id")`, but this is currently broken
        // <https://github.com/launchbadge/sqlx/issues/1530>
        let id: u32 = row.try_get(0).unwrap();
        ret.insert(EXPECTED_RECORDS[count].ty, id);
        count += 1;
    }

    // sanity check that all rows were covered
    assert_eq!(count, EXPECTED_RECORDS.len());

I think this is pretty clean, but maybe the checked macros could get some syntax to handle this too. Perhaps:

let rows = sqlx::query!(
    "INSERT INTO event
        (key, success, description)
    VALUES
        -- This gets expanded to `(?, ?, ?)` for each item in iter
        repeat! { (key, success, description) }
    ON DUPLICATE KEY UPDATE
        success=VALUES(success),
        description=VALUES(description)
    RETURNING id",
    EXPECTED_RECORDS.iter()
).fetch_all(pool).unwrap();

for row in rows {
    println!("id: {id}");
}
joshtriplett commented 1 month ago

For folks looking for a solution for SQLite (which doesn't have UNNEST), I compared the timing of three different approaches to a mass insert of 100k BLOB fields of 32 bytes each:

Here's the result:

Time for mass insert using JSON (batch size 100000): 204.063168ms
Time for mass insert using JSON (batch size 10000): 318.28208ms
Time for mass insert using JSON (batch size 1000): 785.099067ms
Time for mass insert using JSON (batch size 100): 3.351116507s

Time for mass insert using QueryBuilder (batch size 10000): 205.967325ms
Time for mass insert using QueryBuilder (batch size 1000): 734.117792ms
Time for mass insert using QueryBuilder (batch size 100): 3.353956729s

Time for mass insert one by one: 67.237277417s

QueryBuilder performs better, if you're within the limits of QueryBuilder. JSON isn't far behind, though, and doesn't have the same limits. (You're still limited by available memory, though.)
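
For reference, a minimal sketch of the JSON variant on SQLite (not the exact benchmarked code; it assumes TEXT values and an illustrative items table, since BLOBs would additionally need hex- or base64-encoding inside the JSON):

// Send the whole batch as one JSON string and expand it with SQLite's json_each().
let values: Vec<String> = (0..100).map(|i| format!("value-{i}")).collect();
let json = serde_json::to_string(&values).unwrap();

sqlx::query("INSERT INTO items (data) SELECT value FROM json_each(?1)")
    .bind(json)
    .execute(&pool)
    .await?;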