ClickHouse / clickhouse-rs

Official pure Rust typed client for ClickHouse DB
https://clickhouse.com
Apache License 2.0

Panic on write #138

Closed: vikulikov closed this 1 month ago

vikulikov commented 2 months ago

I'm getting this panic:

panicked at /usr/local/cargo/registry/src/index.crates.io-6f17d22bba15001f/clickhouse-0.12.2/src/insert.rs:231:18:
write() after error

with this implementation:

    async fn insert_serializable_rows<T: Serialize + clickhouse::Row>(
        &self,
        table_name: &str,
        mut rows: impl Iterator<Item = T>,
    ) -> clickhouse::error::Result<()> {
        let mut res = Ok(());

        'attempt: for _ in 0..MAX_REQUESTS_RETRY_NUM {
            res = Ok(());
            let Some(first) = rows.next() else {
                return Ok(());
            };
            let mut insert = match self.client.insert(table_name) {
                Ok(insert) => insert,
                Err(e) => {
                    res = Err(e);
                    continue 'attempt;
                }
            };

            let insert_res = insert.write(&first).await;
            if res.is_err() {
                res = insert_res;
                continue 'attempt;
            }

            while let Some(row) = rows.next() {
                let insert_res = insert.write(&row).await;
                if res.is_err() {
                    res = insert_res;
                    continue 'attempt;
                }
            }

            insert.end().await?;

            break;
        }

        res
    }

So it should create a new insert on the first error. self.client is a clickhouse::Client. Am I supposed to recreate the whole client after an error, or is there a bug in the library?
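Below is a minimal synchronous model of the failure. Judging from the panic message, it assumes that the library refuses any write() on an insert whose previous write() has already failed. MockInsert, buggy_write_all and fixed_write_all are hypothetical names, not clickhouse-rs API. The buggy version reproduces the check in the implementation above. It tests res, which was just reset to Ok(()), instead of insert_res. A failed write therefore goes unnoticed, and the loop calls write() on the same insert again, which is where the panic fires.

```rust
/// Hypothetical stand-in for clickhouse's Insert (synchronous, NOT the real type).
/// The write with 0-based index `fail_at` returns an error; any write() after
/// that panics, mimicking the "write() after error" panic from the report.
struct MockInsert {
    written: Vec<u64>,
    fail_at: Option<usize>,
    calls: usize,
    errored: bool,
}

impl MockInsert {
    fn new(fail_at: Option<usize>) -> Self {
        MockInsert { written: Vec::new(), fail_at, calls: 0, errored: false }
    }

    fn write(&mut self, row: u64) -> Result<(), String> {
        // Once a write has failed, this insert must not be written to again.
        assert!(!self.errored, "write() after error");
        let index = self.calls;
        self.calls += 1;
        if self.fail_at == Some(index) {
            self.errored = true;
            return Err("Socket is not connected".to_string());
        }
        self.written.push(row);
        Ok(())
    }
}

/// The pattern from the report: the condition tests `res` (always Ok here)
/// instead of `insert_res`, so a failed write is ignored and the loop keeps
/// writing to the broken insert.
fn buggy_write_all(insert: &mut MockInsert, rows: &[u64]) -> Result<(), String> {
    let res: Result<(), String> = Ok(());
    for &row in rows {
        let insert_res = insert.write(row);
        if res.is_err() {
            return insert_res;
        }
    }
    res
}

/// The fix: act on the result of the write itself. `?` returns early on Err,
/// just like `if let Err(e) = ... { return Err(e); }`.
fn fixed_write_all(insert: &mut MockInsert, rows: &[u64]) -> Result<(), String> {
    for &row in rows {
        insert.write(row)?;
    }
    Ok(())
}
```

When the second write fails, buggy_write_all panics on the third row. fixed_write_all instead returns the error after one successful write, and the caller can then start a new insert.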

slvrtrn commented 2 months ago

In case of an insert error, it should not be necessary to recreate the entire client instance; it should be enough to just use a new insert.

Can you please provide a runnable minimal repro so that it is easier to debug?

vikulikov commented 1 month ago

This is the error I got on the clickhouse-server side at the time of the panic:

2024.09.07 05:48:11.830692 [ 282847 ] {} <Error> ServerErrorHandler: Poco::Exception. Code: 1000, e.code() = 107, Net Exception: Socket is not connected, Stack trace (when copying this message, always include the lines below):

0. Poco::Net::SocketImpl::error(int, String const&) @ 0x0000000012d15528
1. Poco::Net::SocketImpl::peerAddress() @ 0x0000000012d180f4
2. DB::HTTPServerRequest::HTTPServerRequest(std::shared_ptr<DB::IHTTPContext>, DB::HTTPServerResponse&, Poco::Net::HTTPServerSession&, StrongTypedef<unsigned long, ProfileEvents::EventTag> const&) @ 0x00000000109fc5ac
3. DB::HTTPServerConnection::run() @ 0x00000000109fb088
4. Poco::Net::TCPServerConnection::start() @ 0x0000000012d1ac38
5. Poco::Net::TCPServerDispatcher::run() @ 0x0000000012d1b114
6. Poco::PooledThread::run() @ 0x0000000012e12e98
7. Poco::ThreadImpl::runnableEntry(void*) @ 0x0000000012e11290
8. ? @ 0x000000000007d5c8
9. ? @ 0x00000000000e5edc
 (version 24.5.3.5 (official build))

vikulikov commented 1 month ago

Cargo.toml

[package]
name = "clickhouse-minimal"
version = "0.1.0"
edition = "2021"

[dependencies]
serde = "1"
serde_derive = "1"
clickhouse = "0.12"
hyper-util = { version = "0.1.7", features = ["client"] }

tokio = { version = "1.21.0", features = [
    "rt",
    "sync",
    "macros",
    "time",
    "rt-multi-thread",
] }

main.rs

use serde::Serialize;
use serde_derive::Serialize;

#[tokio::main]
async fn main() {
    let client = ClientWrapper::new();

    let sample_data = std::array::from_fn::<_, 100, _>(|num| ExampleData {
        timestamp: (num * 1000) as u64,
    });

    client
        .insert_serializable_rows("table", sample_data.into_iter())
        .await
        .unwrap();
}

const MAX_REQUESTS_RETRY_NUM: usize = 3;

struct ClientWrapper {
    client: clickhouse::Client,
}

impl ClientWrapper {
    fn new() -> Self {
        let mut connector = hyper_util::client::legacy::connect::HttpConnector::new();
        connector.set_nodelay(true);
        let hyper_client =
            hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::new())
                .build(connector);

        let client = clickhouse::Client::with_http_client(hyper_client)
            .with_url("")
            .with_user("")
            .with_password("")
            .with_database("")
            .with_option(
                "format_binary_max_string_size",
                640897190600_u64.to_string(),
            )
            .with_compression(clickhouse::Compression::Lz4);

        Self { client }
    }

    async fn insert_serializable_rows<T: Serialize + clickhouse::Row>(
        &self,
        table_name: &str,
        mut rows: impl Iterator<Item = T>,
    ) -> clickhouse::error::Result<()> {
        let mut res = Ok(());

        'attempt: for _ in 0..MAX_REQUESTS_RETRY_NUM {
            res = Ok(());
            let Some(first) = rows.next() else {
                return Ok(());
            };
            let mut insert = match self.client.insert(table_name) {
                Ok(insert) => insert,
                Err(e) => {
                    res = Err(e);
                    continue 'attempt;
                }
            };

            let insert_res = insert.write(&first).await;
            if res.is_err() {
                res = insert_res;
                continue 'attempt;
            }

            while let Some(row) = rows.next() {
                let insert_res = insert.write(&row).await;
                if res.is_err() {
                    res = insert_res;
                    continue 'attempt;
                }
            }

            insert.end().await?;

            break;
        }

        res
    }
}

#[derive(Default, Serialize, clickhouse::Row)]
struct ExampleData {
    timestamp: u64,
}

Unfortunately, I cannot disclose the table structure.

slvrtrn commented 1 month ago

@loyd @serprex, could you have a look as well?

serprex commented 1 month ago

@vikulikov, in your code you have:

            let insert_res = insert.write(&first).await;
            if res.is_err() {
                res = insert_res;
                continue 'attempt;
            }

You should be checking insert_res, not res. In general, avoid is_err() checks in favor of pattern matching, which binds the error of the call you just made:

            if let Err(insert_res) = insert.write(&first).await {
                res = Err(insert_res);
                continue 'attempt;
            }
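Here is a sketch of the fix inside a full retry loop. It replaces clickhouse-rs's async Insert with a synchronous one-shot sink. RowSink, insert_with_retry and FlakySink are hypothetical names, not library API. Each attempt gets a fresh sink from the factory closure. This follows the earlier advice that a new insert is enough and the client can be reused.

The sketch makes two further choices that differ from the original loop:

- Rows are collected into a Vec first. The original pulls first from the shared iterator on every attempt, so rows consumed by a failed attempt are never resent.
- A failed end() is retried instead of being returned immediately with ?.

It also assumes that an attempt which never reaches end() commits nothing.

```rust
use std::cell::RefCell;

/// Hypothetical one-shot sink modelling an INSERT (synchronous, NOT clickhouse-rs API).
trait RowSink<T> {
    type Error;
    fn write(&mut self, row: &T) -> Result<(), Self::Error>;
    fn end(self) -> Result<(), Self::Error>;
}

/// Retries the whole batch up to `max_attempts` times (must be at least 1).
/// Every attempt uses a fresh sink, checks each write's own result and
/// never touches a sink again after one of its writes has failed.
fn insert_with_retry<T, S, F>(
    rows: impl IntoIterator<Item = T>,
    max_attempts: usize,
    mut new_sink: F,
) -> Result<(), S::Error>
where
    S: RowSink<T>,
    F: FnMut() -> Result<S, S::Error>,
{
    assert!(max_attempts > 0, "max_attempts must be at least 1");
    // Buffer the rows so a retry can resend rows a failed attempt already consumed.
    let rows: Vec<T> = rows.into_iter().collect();
    if rows.is_empty() {
        return Ok(());
    }
    let mut last_err = None;
    'attempt: for _ in 0..max_attempts {
        let mut sink = match new_sink() {
            Ok(sink) => sink,
            Err(e) => {
                last_err = Some(e);
                continue 'attempt;
            }
        };
        for row in &rows {
            if let Err(e) = sink.write(row) {
                last_err = Some(e);
                continue 'attempt; // the failed sink goes out of scope here
            }
        }
        match sink.end() {
            Ok(()) => return Ok(()),
            Err(e) => last_err = Some(e), // retry a failed end() as well
        }
    }
    // Every attempt that did not return Ok recorded its error.
    Err(last_err.expect("at least one attempt was made"))
}

/// Hypothetical test sink: fails when asked to write the row at index `fail_on`,
/// and appends its buffered rows to `committed` on end().
struct FlakySink<'a> {
    fail_on: Option<usize>,
    buffer: Vec<u64>,
    committed: &'a RefCell<Vec<u64>>,
}

impl<'a> RowSink<u64> for FlakySink<'a> {
    type Error = String;

    fn write(&mut self, row: &u64) -> Result<(), String> {
        if self.fail_on == Some(self.buffer.len()) {
            return Err("Socket is not connected".to_string());
        }
        self.buffer.push(*row);
        Ok(())
    }

    fn end(self) -> Result<(), String> {
        self.committed.borrow_mut().extend(self.buffer);
        Ok(())
    }
}
```

Suppose the factory's first sink fails on its second row. The helper drops that sink, asks for a new one and sends every row again, so the second attempt commits the complete batch. The price of this design is holding the whole batch in memory. For very large streams, you might instead retry per chunk.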

vikulikov commented 1 month ago

Oh, my bad. Thank you very much!