launchbadge / sqlx

🧰 The Rust SQL Toolkit. An async, pure Rust SQL crate featuring compile-time checked queries without a DSL. Supports PostgreSQL, MySQL, and SQLite.
Apache License 2.0
13.04k stars 1.24k forks source link

PostgreSQL copy_in_raw did not work when upgrade v0.6.2 to v0.7.0 #2614

Closed mingjunyang closed 1 year ago

mingjunyang commented 1 year ago

Bug Description

When I use the v0.6.2,it is work ok. When I upgrade v0.6.2 to v0.7.0 , did not work.

2023-07-13 19:52:23.055 CST [350370] ERROR:  unexpected message type 0x31 during COPY from stdin
2023-07-13 19:52:23.055 CST [350370] CONTEXT:  COPY temp_test_copy, line 1
2023-07-13 19:52:23.055 CST [350370] STATEMENT:  copy temp_test_copy(f1,f2) FROM STDIN
2023-07-13 19:52:23.055 CST [350370] FATAL:  terminating connection because protocol synchronization was lost

Minimal Reproduction

use common_lib::get_env_or_default;
use log::{error, info};
use sqlx::postgres::PgPool;
use std::time::SystemTime;
use tokio::io::BufReader;

pub async fn copy_cert_data_to_table(pool: &PgPool, datas: &Vec<i64>) -> bool {
    match pool.begin().await {
        Ok(mut transaction) => {
            let now = SystemTime::now();
            sqlx::query!(
                r#"CREATE TEMP TABLE temp_test_copy (f1 bigint,f2 bigint) ON COMMIT DROP"#
            )
            .execute(&mut *transaction)
            .await
            .unwrap();
            //, NULL AS 'null'
            let mut copy_in = transaction
                .copy_in_raw("copy temp_test_copy(f1,f2) FROM STDIN")
                .await
                .unwrap();

            let temp_datas = datas
                .iter()
                .map(|x| format!("{}\t{}", x, x))
                .collect::<Vec<String>>()
                .join("\n");
            let x = String::from("") + &temp_datas;
            println!("{}", x);
            let buf = BufReader::new(x.as_bytes());

            info!("{:02X?}", buf);

            match copy_in.read_from(buf).await {
                Ok(_) => {}
                Err(e) => error!("{}", e),
            }
            match copy_in.finish().await {
                Ok(_) => {}
                Err(e) => error!("{}", e),
            }
            match transaction.commit().await {
                Ok(_) => info!(
                    "transaction ok:\t{}s\t{}",
                    now.elapsed().unwrap().as_secs_f64(),
                    datas.len()
                ),
                Err(e) => {
                    error!("sql err:{:?}", e)
                }
            }
        }
        Err(e) => error!("sql err cert:{:?}", e),
    }

    false
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    env_logger::init();
    let pg_addr = get_env_or_default!("DATABASE_URL_ENV_ARG", "DATABASE_URL");
    let pool = PgPool::connect(&pg_addr).await.unwrap();
    let ids = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0];
    let f = copy_cert_data_to_table(&pool, &ids).await;
    info!("result:{}", f);
    Ok(())
}
[package]
name = "copy_test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = { version = "0.4", features = ["serde"] }
env_logger = "0.10"
log ="0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json="1.0"
sqlx = { version = "0.7", default-features = false, features = ["macros", "bigdecimal", "postgres", "chrono", "runtime-tokio-rustls"] }
tokio = { version = "*", features = ["full"] }

Info

mingjunyang commented 1 year ago

I capture the tcp stream, found this.

The data different here. v0.6.2 add 5 byte data before my data, and v0.7.0 change my data from 2 to 5 bytes.

v0.6.2 000002EB 64 00 00 00 2b 31 09 31 0a 32 09 32 0a 33 09 33 d...+1.1 .2.2.3.3 v0.7.0 000002B7 31 00 00 00 2b 09 32 0a 33 09 33 0a 34 09 34 0a 1...+.2. 3.3.4.4.

v0.6.2

0000021C  51 00 00 00 0a 42 45 47  49 4e 00                  Q....BEG IN.
    000002A6  43 00 00 00 0a 42 45 47  49 4e 00 5a 00 00 00 05   C....BEG IN.Z....
    000002B6  54                                                 T
00000227  50 00 00 00 55 73 71 6c  78 5f 73 5f 31 00 43 52   P...Usql x_s_1.CR
00000237  45 41 54 45 20 54 45 4d  50 20 54 41 42 4c 45 20   EATE TEM P TABLE 
00000247  74 65 6d 70 5f 74 65 73  74 5f 63 6f 70 79 20 28   temp_tes t_copy (
00000257  66 31 20 62 69 67 69 6e  74 2c 66 32 20 62 69 67   f1 bigin t,f2 big
00000267  69 6e 74 29 20 4f 4e 20  43 4f 4d 4d 49 54 20 44   int) ON  COMMIT D
00000277  52 4f 50 00 00 00 44 00  00 00 0e 53 73 71 6c 78   ROP...D. ...Ssqlx
00000287  5f 73 5f 31 00 53 00 00  00 04                     _s_1.S.. ..
    000002B7  31 00 00 00 04 74 00 00  00 06 00 00 6e 00 00 00   1....t.. ....n...
    000002C7  04 5a 00 00 00 05 54                               .Z....T
00000291  42 00 00 00 18 00 73 71  6c 78 5f 73 5f 31 00 00   B.....sq lx_s_1..
000002A1  01 00 01 00 00 00 01 00  01 45 00 00 00 09 00 00   ........ .E......
000002B1  00 00 00 43 00 00 00 06  50 00 53 00 00 00 04      ...C.... P.S....
    000002CE  32 00 00 00 04 43 00 00  00 11 43 52 45 41 54 45   2....C.. ..CREATE
    000002DE  20 54 41 42 4c 45 00 33  00 00 00 04 5a 00 00 00    TABLE.3 ....Z...
    000002EE  05 54                                              .T
000002C0  51 00 00 00 2a 63 6f 70  79 20 74 65 6d 70 5f 74   Q...*cop y temp_t
000002D0  65 73 74 5f 63 6f 70 79  28 66 31 2c 66 32 29 20   est_copy (f1,f2) 
000002E0  46 52 4f 4d 20 53 54 44  49 4e 00                  FROM STD IN.
    000002F0  47 00 00 00 0b 00 00 02  00 00 00 00               G....... ....
000002EB  64 00 00 00 2b 31 09 31  0a 32 09 32 0a 33 09 33   d...+1.1 .2.2.3.3
000002FB  0a 34 09 34 0a 35 09 35  0a 36 09 36 0a 37 09 37   .4.4.5.5 .6.6.7.7
0000030B  0a 38 09 38 0a 39 09 39  0a 30 09 30               .8.8.9.9 .0.0
00000317  63 00 00 00 04                                     c....
    000002FC  43 00 00 00 0c 43 4f 50  59 20 31 30 00 5a 00 00   C....COP Y 10.Z..
    0000030C  00 05 54                                           ..T
0000031C  51 00 00 00 0b 43 4f 4d  4d 49 54 00               Q....COM MIT.
    0000030F  43 00 00 00 0b 43 4f 4d  4d 49 54 00 5a 00 00 00   C....COM MIT.Z...
    0000031F  05 49                                              .I
00000328  53 00 00 00 04                                     S....
    00000321  5a 00 00 00 05 49                                  Z....I

v0.7.0

000001E8  51 00 00 00 0a 42 45 47  49 4e 00                  Q....BEG IN.
    0000028C  43 00 00 00 0a 42 45 47  49 4e 00 5a 00 00 00 05   C....BEG IN.Z....
    0000029C  54                                                 T
000001F3  50 00 00 00 55 73 71 6c  78 5f 73 5f 31 00 43 52   P...Usql x_s_1.CR
00000203  45 41 54 45 20 54 45 4d  50 20 54 41 42 4c 45 20   EATE TEM P TABLE 
00000213  74 65 6d 70 5f 74 65 73  74 5f 63 6f 70 79 20 28   temp_tes t_copy (
00000223  66 31 20 62 69 67 69 6e  74 2c 66 32 20 62 69 67   f1 bigin t,f2 big
00000233  69 6e 74 29 20 4f 4e 20  43 4f 4d 4d 49 54 20 44   int) ON  COMMIT D
00000243  52 4f 50 00 00 00 44 00  00 00 0e 53 73 71 6c 78   ROP...D. ...Ssqlx
00000253  5f 73 5f 31 00 53 00 00  00 04                     _s_1.S.. ..
    0000029D  31 00 00 00 04 74 00 00  00 06 00 00 6e 00 00 00   1....t.. ....n...
    000002AD  04 5a 00 00 00 05 54                               .Z....T
0000025D  42 00 00 00 18 00 73 71  6c 78 5f 73 5f 31 00 00   B.....sq lx_s_1..
0000026D  01 00 01 00 00 00 01 00  01 45 00 00 00 09 00 00   ........ .E......
0000027D  00 00 00 43 00 00 00 06  50 00 53 00 00 00 04      ...C.... P.S....
    000002B4  32 00 00 00 04 43 00 00  00 11 43 52 45 41 54 45   2....C.. ..CREATE
    000002C4  20 54 41 42 4c 45 00 33  00 00 00 04 5a 00 00 00    TABLE.3 ....Z...
    000002D4  05 54                                              .T
0000028C  51 00 00 00 2a 63 6f 70  79 20 74 65 6d 70 5f 74   Q...*cop y temp_t
0000029C  65 73 74 5f 63 6f 70 79  28 66 31 2c 66 32 29 20   est_copy (f1,f2) 
000002AC  46 52 4f 4d 20 53 54 44  49 4e 00                  FROM STD IN.
    000002D6  47 00 00 00 0b 00 00 02  00 00 00 00               G....... ....
000002B7  31 00 00 00 2b 09 32 0a  33 09 33 0a 34 09 34 0a   1...+.2. 3.3.4.4.
000002C7  35 09 35 0a 36 09 36 0a  37 09 37 0a 38 09 38 0a   5.5.6.6. 7.7.8.8.
000002D7  39 09 39 0a 30 09 30                               9.9.0.0
    000002E2  45 00 00 00 8f 53 45 52  52 4f 52 00 56 45 52 52   E....SER ROR.VERR
    000002F2  4f 52 00 43 30 38 50 30  31 00 4d 75 6e 65 78 70   OR.C08P0 1.Munexp
    00000302  65 63 74 65 64 20 6d 65  73 73 61 67 65 20 74 79   ected me ssage ty
    00000312  70 65 20 30 78 33 31 20  64 75 72 69 6e 67 20 43   pe 0x31  during C
    00000322  4f 50 59 20 66 72 6f 6d  20 73 74 64 69 6e 00 57   OPY from  stdin.W
    00000332  43 4f 50 59 20 74 65 6d  70 5f 74 65 73 74 5f 63   COPY tem p_test_c
    00000342  6f 70 79 2c 20 6c 69 6e  65 20 31 00 46 63 6f 70   opy, lin e 1.Fcop
    00000352  79 66 72 6f 6d 70 61 72  73 65 2e 63 00 4c 32 39   yfrompar se.c.L29
    00000362  32 00 52 43 6f 70 79 47  65 74 44 61 74 61 00 00   2.RCopyG etData..
    00000372  45 00 00 00 7c 53 46 41  54 41 4c 00 56 46 41 54   E...|SFA TAL.VFAT
    00000382  41 4c 00 43 30 38 50 30  31 00 4d 74 65 72 6d 69   AL.C08P0 1.Mtermi
    00000392  6e 61 74 69 6e 67 20 63  6f 6e 6e 65 63 74 69 6f   nating c onnectio
    000003A2  6e 20 62 65 63 61 75 73  65 20 70 72 6f 74 6f 63   n becaus e protoc
    000003B2  6f 6c 20 73 79 6e 63 68  72 6f 6e 69 7a 61 74 69   ol synch ronizati
    000003C2  6f 6e 20 77 61 73 20 6c  6f 73 74 00 46 70 6f 73   on was l ost.Fpos
    000003D2  74 67 72 65 73 2e 63 00  4c 34 33 36 39 00 52 50   tgres.c. L4369.RP
    000003E2  6f 73 74 67 72 65 73 4d  61 69 6e 00 00            ostgresM ain..
000002DE  63 00 00 00 04                                     c....