blackbeam / rust-mysql-simple

Mysql client library implemented in rust.
Apache License 2.0
665 stars 145 forks source link

Bulk inserts are tricky to do efficiently #59

Open Diggsey opened 8 years ago

Diggsey commented 8 years ago

I had to implement some kind of batching myself to improve performance. This involved building up a query string over time, something like INSERT INTO table (fields...) VALUES (?, ?, ?), (?, ?, ?), (?, ?, ?), ...

The query builder is not particularly well suited to this, and building up the query via string concatenation seems relatively inefficient. Also I could only support positional parameters, since the name parsing code is not exposed (which means I couldn't use Params::make_positional).

I don't know if there's a more efficient way to do this at the protocol level?

At the very least I think one or more of the following would be useful:

blackbeam commented 8 years ago
  1. Most efficient way, i think, is using LOAD DATA INFILE if applicable.
  2. The Second option is a large handcrafted statement or query:

    • For handcrafted query (:warning: will strongly not recommend) you should be aware of the max_allowed_packet value.
    • For a handcrafted statement, you also should be aware of maximum allowed parameters. It may look like this (in pseudo-rust and without checks)
     fn bulk_insert<F, P>(
        pool: &crate::Pool,
        table: String,
        cols: Vec<String>,
        objects: Vec<Obj>,
        fun: F,
    ) -> crate::Result<()>
    where
        F: Fn(&Obj) -> P,
        P: Into<Params>,
    {
        let mut stmt = format!("INSERT INTO {} ({}) VALUES ", table, cols.join(","));
        let row = format!(
            "({}),",
            cols.iter()
                .map(|_| "?".to_string())
                .collect::<Vec<_>>()
                .join(",")
        );
        stmt.reserve(objects.len() * (cols.len() * 2 + 2));
        for _ in 0..objects.len() {
            stmt.push_str(&row);
        }
    
        // remove the trailing comma
        stmt.pop();
    
        let mut params = Vec::new();
        for o in objects.iter() {
            let named_params: crate::Params = fun(o).into();
            let positional_params = named_params.into_positional(&cols)?;
            if let crate::Params::Positional(new_params) = positional_params {
                for param in new_params {
                    params.push(param);
                }
            }
        }
    
        let mut conn = pool.get_conn()?;
        conn.exec_drop(stmt, params)
    }
    // ...
    bulk_insert(pool, "table", vec!["value1", "value2"], objects, |object| {
        params! {
            "value1" => object.value1,
            "value2" => object.value2,
        }
    });
  3. Less efficient but most convenient option is to execute one statement per row. It looks like you can speed up this process by setting autocommit to 0 (:warning: run SET autocommit=0 and all of INSERT queries on specific connection by taking it from the pool), but do not forget to set autocommit back to 1. It may look like this (in pseudorust):

    fn bulk_insert<F, P>(pool: my::Pool, stmt: &str, objs: Vec<Object>, fun: F) -> my::Result<()>
    where F: Fn(&Object) -> P,
         P: Into<my::Params>,
    {
       let mut conn = try!(pool.get_conn());
       try!(conn.query("SET autocommit=0"));
       {
           let mut stmt = try!(pool.prepare(stmt));
           for obj in objs.iter() {
               try!(stmt.execute(fun(obj)));
           }
       }
       try!(conn.query("COMMIT"));
       try!(conn.query("SET autocommit=1"));
    }
    // ...
    bulk_insert(pool, "INSERT INTO table (value1, value2) VALUES (:value1, :value2)", objs, |obj| {
       params! {
           "value1" => obj.value1,
           "value2" => obj.value2,
       }
    });
Diggsey commented 8 years ago

@blackbeam Thanks for the detailed reply: 1) is impossible since I'm using google's Cloud SQL database, however it did lead me to find LOAD DATA LOCAL INFILE which does work.

It looks like at the protocol level, the MySQL server receives the query with the filename, then sends back a request to the client asking for the contents of that file, which the client then sends to the server. Based on this it seems the client could actually send data from memory rather than a file, which opens up a lot of possibilities.

2) this is essentially what I've resorted to doing at the moment, but as you say I'm at the mercy of the max_allowed_packet and other arbitrary limits.

3) I'm already in a transaction so auto-commit is off anyway. The problem is the network latency to the server, because even a few ms RTT results in long delays for 10,000 rows.

Diggsey commented 8 years ago

For reference, I'm inserting ~2 million rows into one table with fairly large rows, and ~10 million rows into another table which is much more lightweight.

Currently it takes around ~2 hours to insert all the data, even using a batch size of 512. (yep, discovered that utf-8 bug ~30 mins into the import :unamused: ) and despite the large quantity of data I don't think it should take that long.

0xpr03 commented 6 years ago

@Diggsey have you tried things like LOCK TABLES `mytable` WRITE; and then done insert into.. and afterwards UNLOCK TABLES; (using the same connection). This could also be a bottleneck and is the way mysql dump works.

Diggsey commented 6 years ago

@0xpr03 I solved this using LOAD DATA FROM LOCAL INFILE and then installing a custom handler which streamed the data directly from memory.

hpca01 commented 4 years ago

@0xpr03 I solved this using LOAD DATA FROM LOCAL INFILE and then installing a custom handler which streamed the data directly from memory.

Sorry, do you mind showing the example of your LocalInfileHandler that you implemented? I am lost as to what traits the handler should be implementing.

Diggsey commented 4 years ago

@hpca01 there's an example in the documentation: https://docs.rs/mysql/18.0.0/mysql/struct.LocalInfileHandler.html

hpca01 commented 4 years ago

@Diggsey I apologize as I am still new to the language, I got the whole thing to work by doing the following:

val.conn
    .set_local_infile_handler(Some(LocalInfileHandler::new(move |_, stream| {
        for a_val in input.as_ref() {
            write!(stream, "{}", a_val).unwrap();
        }
        Ok(())
    })));
match val
    .conn
    .query("LOAD DATA LOCAL INFILE 'file' INTO TABLE schema.price_type_pricelineitem (price_type, price_amount, per_unit, linked_ndc_id, price_internal_id)")
{
    Ok(_) => {}
    Err(err) => println!("Error {}", err),
}

I wanted to re-write it to exclude the closure and have a concrete struct that implements whatever trait is required to make it easier for the next person.

Diggsey commented 4 years ago

The trait that's required is FnMut. This is implemented automatically by closures, but implementing FnMut for your own types is still unstable.

I'm not really sure what you hope to make easier by moving it to a struct, but LocalInfineHandler is just a struct, so if you want to encapsulate creating that struct you can just make a function that returns it and hides the fact that you're supplying a closure...

hpca01 commented 4 years ago

@Diggsey thanks so much for the clear explanation 😄

I will just leave it the way it is, and write some detailed comments instead. Thanks again for your patience, and your work on the crate.

orangesoup commented 4 years ago

Hey @blackbeam! I have such use case where I need to insert (or update) thousands of rows per second, so my only real option (as far as I know) is your 2nd one, handcrafting the query with ON DUPLICATE KEY UPDATE ....

I've tried to use your pseudorust code with the latest 20.0.1 version, however, I get an error:

conn.exec_drop(stmt, params)
     ^^^^^^^^^ the trait `std::convert::From<mysql::Params>` is not implemented for `mysql::Value`

For objects I've used VecDeque<SomeStruct>, otherwise haven't really changed anything, besides using exec_drop instead of prep_exec.

Is there a way to fix my issue? Will there be a "native" option in the library to do these kind of operations easily later?

blackbeam commented 4 years ago

@orangesoup, hi. params should be a Vec of mysql::Value. I've updated the code snipped in https://github.com/blackbeam/rust-mysql-simple/issues/59#issuecomment-245918807

orangesoup commented 4 years ago

Thank, will check!

midnightcodr commented 3 years ago

Thanks @blackbeam for sharing the code snippet (the second option). I found a tiny issue with the code, stmt would have a dangling comma at the end and trigger a mysql error when exec_drop() is run. To fix that, simply do a

stmt.pop();

right before line

let mut params = Vec::new();

Other than that the bulk insert code works quite nicely.

blackbeam commented 3 years ago

@midnightcodr, I've updated the snippet. Thanks.

midnightcodr commented 2 years ago

@blackbeam 2nd option's bulk_insert function signature can be improved even further by making the Obj type generic, that is

pub fn bulk_insert<F, P, T>(
    pool: &crate::Pool,
    table: String,
    cols: Vec<String>,
    objects: Vec<T>,
    fun: F,
) -> crate::Result<()>
where
    F: Fn(&T) -> P,
    P: Into<Params>,
...

By making such update, bulk_insert is now much more powerful so that I don't have to write a new bulk_insert function (under a new name of course) if I need to bulk insert for a type that's not Obj, for example. I can confirm through testing that making Object generic is working quite well.