prisma / tiberius

TDS 7.2+ (Microsoft SQL Server) driver for Rust
Apache License 2.0
321 stars 118 forks source link

Broken connection when finalizing bulk insert #265

Closed leons727 closed 1 year ago

leons727 commented 1 year ago

PR: https://github.com/prisma/tiberius/pull/266

A deliberately setup bulk insert results into error during finalize() call:

Error: Io { kind: BrokenPipe, message: "An error occurred during the attempt of performing I/O: An existing connection was forcibly closed by the remote host. (os error 10054)" }

SQL Server logs show:

Error: 4014, Severity: 20, State: 17.

This is fully reproducible and appears to be related to packet sizing: increasing/decreasing number of rows by 1 from 1481 results in a successful insert, as changing first column name to C1....

Tracing shows that indeed last packet size is over 4096 bytes:

...
2022-12-25T22:02:35.688309Z TRACE tiberius::tds::codec::bulk_load: Bulk insert packet (4096 bytes)
2022-12-25T22:02:35.690583Z TRACE tiberius::tds::codec::bulk_load: Bulk insert packet (4096 bytes)
2022-12-25T22:02:35.696377Z TRACE tiberius::tds::codec::bulk_load: Finalizing a bulk insert (4107 bytes)

Repro code with tiberius = { version = "0.11.3", features = ["chrono", "time", "tds73"] }:

use std::error::Error;
use async_std::net::TcpStream;
use tiberius::{Client, Config, TokenRow, ToSql};

#[async_std::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let connect_string = "server=tcp:localhost,1433;user=SA;password=<YourStrong@Passw0rd>;TrustServerCertificate=true;";

    let config = Config::from_ado_string(connect_string)?;
    let tcp = TcpStream::connect(&config.get_addr()).await?;
    tcp.set_nodelay(true)?;
    let mut client = Client::connect(config, tcp).await?;

    let create_table = r#"
        CREATE TABLE ##TestBulkInsert
        (
            MetricResultID INT NULL,
            ExperimentID INT NOT NULL,
            MetricID int NOT NULL,
            VariantNumber TINYINT NOT NULL,
            ResultDate DATETIME2(3) NOT NULL,
            ResultValue FLOAT NULL,
            ResultLowerBound FLOAT NULL,
            ResultUpperBound FLOAT NULL,
            ResultConfidence FLOAT NULL,
            ResultIsSignificant BIT NULL,
            MetricAnalysisLogID INT NULL,
            InsertDate DATETIME2(3) NULL,
            ControlDenominator FLOAT NULL,
            ControlNumerator FLOAT NULL,
            VariantDenominator FLOAT NULL,
            VariantNumerator FLOAT NULL,
        )
    "#;

    client.simple_query(create_table).await?;

    let mut insert = client.bulk_insert("##TestBulkInsert").await?;

    let dt = chrono::Utc::now().naive_utc();

    for _ in 0..1481 {
        let mut row = TokenRow::with_capacity(16);
        row.push(0i32.to_sql());
        row.push(0i32.to_sql());
        row.push(0i32.to_sql());
        row.push(0u8.to_sql());
        row.push(dt.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        row.push(false.to_sql());
        row.push(0i32.to_sql());
        row.push(dt.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        row.push(0f64.to_sql());
        insert.send(row).await?;
    }

    insert.finalize().await?;

    Ok(())
}