ClickHouse / clickhouse-rs

Official pure Rust typed client for ClickHouse DB
https://clickhouse.com
Apache License 2.0

Add support for insert of Apache Arrow tables (polars DataFrame) #63

Open rbeeli opened 1 year ago

rbeeli commented 1 year ago

Hi,

The ClickHouse Python client (clickhouse-connect) can insert a raw pyarrow.Table via its insert_arrow method, which sends the Arrow-encoded data unchanged to ClickHouse using ClickHouse's ArrowStream input format. This is incredibly efficient.

The code is quite short; see https://github.com/ClickHouse/clickhouse-connect/blob/fa20547d7f7e2fd3a2cf4cd711c3262c5a79be7a/clickhouse_connect/driver/client.py#L576
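For context, what such an Arrow insert boils down to on the wire can be sketched with the Rust standard library alone. This is a minimal sketch under stated assumptions, not clickhouse-connect's or this library's code: it assumes ClickHouse's HTTP interface on the default port 8123, with the query passed in the query URL parameter and the Arrow IPC stream bytes sent unchanged as the POST body. The helpers build_arrow_insert_request and percent_encode are hypothetical, and authentication and compression are omitted.

```rust
// Hypothetical helper: percent-encodes a string for a URL query parameter,
// passing RFC 3986 unreserved characters through unchanged.
fn percent_encode(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for b in s.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'_' | b'.' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{:02X}", b)),
        }
    }
    out
}

// Hypothetical helper: builds a raw HTTP/1.1 POST request that inserts
// `body` (assumed to be Arrow IPC stream bytes) into `table` through
// ClickHouse's HTTP interface. No auth, no compression.
fn build_arrow_insert_request(host: &str, table: &str, body: &[u8]) -> Vec<u8> {
    let query = format!("INSERT INTO {} FORMAT ArrowStream", table);
    let head = format!(
        "POST /?query={} HTTP/1.1\r\nHost: {}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n",
        percent_encode(&query),
        host,
        body.len()
    );
    let mut request = head.into_bytes();
    request.extend_from_slice(body); // the Arrow payload is appended as-is
    request
}

fn main() {
    // Placeholder body; a real caller would pass serialized Arrow IPC stream bytes.
    let body: &[u8] = b"placeholder";
    let request = build_arrow_insert_request("localhost:8123", "my_table", body);
    // Writing `request` to a std::net::TcpStream connected to the server
    // would perform the insert; here it is only printed.
    println!("{}", String::from_utf8_lossy(&request));
}
```

The point of the sketch is that the client does no per-row work at all: the Arrow buffer is forwarded byte for byte, and ClickHouse does the decoding.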

Surprisingly, the INSERTs using Arrow in Python are even faster than this ClickHouse Rust client using RowBinary format, though I have not investigated where this client loses time.

Has anyone looked into Apache Arrow support and benchmarked it? Rust's polars uses Apache Arrow as its backend, so using Arrow as the insert format seems like the logical choice, providing an easy way to insert a polars DataFrame directly into ClickHouse. Supporting Arrow could improve performance and would let us query/insert a whole polars DataFrame directly.

These are all Arrow-based standards supported by both ClickHouse and polars, so the extension might be straightforward.

rbeeli commented 1 year ago

Looks like creating the Arrow native payload really is straightforward:

use std::fs::File;
use arrow2::io::ipc::write;
use polars::prelude::*;

fn write_batches(path: &str, df: DataFrame) -> PolarsResult<()> {
    let file = File::create(path)?;

    // get Arrow schema from Polars' DataFrame
    let schema = df.schema().to_arrow();

    // write out in Arrow RecordBatches
    let options = write::WriteOptions { compression: None };
    let mut writer = write::FileWriter::new(file, schema, None, options);

    writer.start()?;

    // each chunk is an arrow2 Chunk, i.e. one Arrow record batch
    for chunk in df.iter_chunks() {
        writer.write(&chunk, None)?;
    }

    writer.finish()?;

    Ok(())
}

The resulting binary file can then be sent to ClickHouse, e.g.:

cat df.arrow | clickhouse-client --ask-password --query="INSERT INTO schema.table FORMAT Arrow"
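Note that write::FileWriter produces the Arrow IPC file format, which is what ClickHouse's Arrow input format expects, whereas the Python client's insert_arrow uses the streaming variant (ArrowStream). As a rough, std-only sketch, the two can be told apart by their leading bytes: files start and end with the magic string ARROW1, while streams written by Arrow 0.15 or later begin each message with the 0xFFFFFFFF continuation marker. The helper clickhouse_arrow_format is hypothetical, and the check is a heuristic, not a full IPC validator.

```rust
// Magic string at the start and end of an Arrow IPC *file*.
const ARROW_MAGIC: &[u8] = b"ARROW1";
// Continuation marker that prefixes each message of an Arrow IPC *stream*
// (Arrow >= 0.15; older streams omit it and are not detected here).
const CONTINUATION: [u8; 4] = [0xFF; 4];

// Hypothetical helper: picks the ClickHouse input format for a payload,
// "Arrow" for file mode, "ArrowStream" for stream mode, None otherwise.
fn clickhouse_arrow_format(bytes: &[u8]) -> Option<&'static str> {
    if bytes.len() >= 2 * ARROW_MAGIC.len()
        && bytes.starts_with(ARROW_MAGIC)
        && bytes.ends_with(ARROW_MAGIC)
    {
        Some("Arrow")
    } else if bytes.starts_with(&CONTINUATION) {
        Some("ArrowStream")
    } else {
        None
    }
}

fn main() {
    let path = std::env::args().nth(1).unwrap_or_else(|| "df.arrow".to_string());
    match std::fs::read(&path) {
        Ok(bytes) => match clickhouse_arrow_format(&bytes) {
            Some(fmt) => println!("use: INSERT INTO schema.table FORMAT {}", fmt),
            None => println!("{} does not look like an Arrow IPC payload", path),
        },
        Err(e) => eprintln!("cannot read {}: {}", path, e),
    }
}
```

Sending a file-mode payload with FORMAT ArrowStream (or the reverse) is a likely source of parse errors, so it is worth keeping writer and format name in sync.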

loyd commented 1 year ago

Hello,

> Surprisingly, the INSERTs using Arrow in Python are even faster than this ClickHouse Rust client using RowBinary format, though I have not investigated where this client loses time.

Can you share your benchmark code?

> This is incredibly efficient.

I expect it to be less efficient than TCP+Native, which will be the next target of this library. Native is also a column-based format.

A Client::insert_arrow method could be supported, but it runs counter to the library's design, which is built around translating a stream of events (plain Rust structs) into ClickHouse rows.

However, providing a separate API for Arrow is a good idea in principle; I need to think about it.
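To illustrate the design mismatch: the library streams one struct at a time, while a column-based format (Native or Arrow) needs one contiguous vector per column, so rows have to be transposed before a columnar payload can be written. A minimal std-only sketch, with a hypothetical Bar row type and BarColumns buffer (neither is part of clickhouse-rs):

```rust
// Hypothetical row type standing in for a user's #[derive(Row)] struct.
#[derive(Debug, Clone, PartialEq)]
struct Bar {
    symbol: String,
    close: f32,
    volume: f32,
}

// Hypothetical column-oriented buffer: one vector per field, the layout
// used by column-based formats such as ClickHouse Native or Arrow.
#[derive(Debug, Default, PartialEq)]
struct BarColumns {
    symbol: Vec<String>,
    close: Vec<f32>,
    volume: Vec<f32>,
}

impl BarColumns {
    // Transposes one row by pushing each field onto its own column.
    fn push(&mut self, row: Bar) {
        self.symbol.push(row.symbol);
        self.close.push(row.close);
        self.volume.push(row.volume);
    }

    fn len(&self) -> usize {
        self.symbol.len()
    }
}

fn main() {
    let rows = vec![
        Bar { symbol: "AAA".into(), close: 1.5, volume: 100.0 },
        Bar { symbol: "BBB".into(), close: 2.5, volume: 200.0 },
    ];
    // Row-by-row streaming, as the library does today, must be
    // regrouped column by column before a columnar format can be emitted.
    let mut cols = BarColumns::default();
    for row in rows {
        cols.push(row);
    }
    println!(
        "{} rows; symbol = {:?}; close = {:?}; volume = {:?}",
        cols.len(),
        cols.symbol,
        cols.close,
        cols.volume
    );
}
```

An insert_arrow-style API would skip this step entirely, since a polars DataFrame already holds its data column by column.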

rbeeli commented 1 year ago

Hi,

The following Python clickhouse-connect insert call for a pandas DataFrame via Arrow takes 0.22 s for my test dataset with 7 columns and 296,136 rows:

con.insert_arrow('my_table', pa.Table.from_pandas(df, preserve_index=False))

Note that this includes the conversion from the pandas NumPy backend to a pyarrow Table!

The equivalent Rust logic takes 0.24 s in release mode (if there is a faster way, please let me know):

let mut insert = con.insert("my_table")?;
for row in rows {
    insert.write(&row).await?;
}
insert.end().await?;

rows is a vector of Row structs with the following fields:

use time::OffsetDateTime;

#[derive(clickhouse::Row, serde::Serialize)]
pub struct Row {
    pub symbol: String,
    #[serde(with = "clickhouse::serde::time::datetime64::millis")]
    pub dt_close: OffsetDateTime,
    pub open: f32,
    pub high: f32,
    pub low: f32,
    pub close: f32,
    pub volume: f32,
}

This comes very close to the pandas/Arrow version, but it is surprisingly slower, even though no additional conversion to Row structs is needed.
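For comparison, the per-row work on the RowBinary side can be sketched by hand. This std-only illustration is not the library's serializer; it assumes the table columns are String, DateTime64(3) and five Float32 columns (the table definition is not shown in the thread) and follows ClickHouse's RowBinary encoding: an unsigned LEB128 length prefix for strings and little-endian fixed-width values otherwise. encode_row and write_varint are hypothetical helpers.

```rust
// Hypothetical helper: appends `n` as an unsigned LEB128 varint,
// the length prefix RowBinary uses for String values.
fn write_varint(buf: &mut Vec<u8>, mut n: u64) {
    loop {
        let byte = (n & 0x7F) as u8;
        n >>= 7;
        if n == 0 {
            buf.push(byte);
            break;
        }
        buf.push(byte | 0x80); // high bit set: more bytes follow
    }
}

// Hypothetical helper: RowBinary encoding of one row, assuming the columns
// symbol String, dt_close DateTime64(3), open/high/low/close/volume Float32.
fn encode_row(buf: &mut Vec<u8>, symbol: &str, dt_close_millis: i64, prices: [f32; 5]) {
    write_varint(buf, symbol.len() as u64);
    buf.extend_from_slice(symbol.as_bytes());
    // DateTime64(3) is an Int64 count of milliseconds since the Unix epoch.
    buf.extend_from_slice(&dt_close_millis.to_le_bytes());
    // open, high, low, close, volume as little-endian Float32.
    for p in prices {
        buf.extend_from_slice(&p.to_le_bytes());
    }
}

fn main() {
    let mut buf = Vec::new();
    encode_row(&mut buf, "AAPL", 1_700_000_000_000, [1.0, 2.0, 0.5, 1.5, 1000.0]);
    println!("{} bytes for one row", buf.len());
}
```

A row with a four-character symbol encodes to 33 bytes (1 length byte + 4 + 8 + 5 × 4), and this loop runs once per row, whereas an Arrow insert forwards whole column buffers without touching individual values.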

Apart from the potential performance gains, Arrow support would simplify the integration of polars and certain data pipelines: no more manual serde mapping/parsing would be necessary, since one could rely on polars instead and use the Arrow insert functionality to insert a polars DataFrame directly.

loyd commented 11 months ago

@rbeeli, have you disabled compression in this library? I mean, client.with_compression(Compression::None)? It's a common mistake in such experiments. Also, clickhouse-connect uses TCP instead of HTTP, which generally is faster.

I like the arrow format, but I'm unsure if I should move this library to TCP+Arrow instead of TCP+Native.

ThomAub commented 10 months ago

Would really love to see some TCP+Arrow capabilities 👍 I would also be happy to help!