ClickHouse / clickhouse-rs

Official pure Rust typed client for ClickHouse DB
https://clickhouse.com
Apache License 2.0
315 stars 87 forks source link
clickhouse http rust streaming tokio

clickhouse-rs

Official pure Rust typed client for ClickHouse DB.

Crates.io Documentation License Build Status

Note: ch2rs is useful to generate a row type from ClickHouse.

Usage

To use the crate, add this to your Cargo.toml:

[dependencies]
clickhouse = "0.13.1"

[dev-dependencies]
clickhouse = { version = "0.13.1", features = ["test-util"] }
### Note about ClickHouse prior to v22.6 CH server older than v22.6 (2022-06-16) handles `RowBinary` [incorrectly](https://github.com/ClickHouse/ClickHouse/issues/37420) in some rare cases. Use 0.11 and enable `wa-37420` feature to solve this problem. Don't use it for newer versions.
### Create a client ```rust,ignore use clickhouse::Client; let client = Client::default() .with_url("http://localhost:8123") .with_user("name") .with_password("123") .with_database("test"); ``` * Reuse created clients or clone them in order to reuse a connection pool.
### Select rows ```rust,ignore use serde::Deserialize; use clickhouse::Row; #[derive(Row, Deserialize)] struct MyRow<'a> { no: u32, name: &'a str, } let mut cursor = client .query("SELECT ?fields FROM some WHERE no BETWEEN ? AND ?") .bind(500) .bind(504) .fetch::>()?; while let Some(row) = cursor.next().await? { .. } ``` * Placeholder `?fields` is replaced with `no, name` (fields of `Row`). * Placeholder `?` is replaced with values in following `bind()` calls. * Convenient `fetch_one::()` and `fetch_all::()` can be used to get a first row or all rows correspondingly. * `sql::Identifier` can be used to bind table names. Note that cursors can return an error even after producing some rows. To avoid this, use `client.with_option("wait_end_of_query", "1")` in order to enable buffering on the server-side. [More details](https://clickhouse.com/docs/en/interfaces/http/#response-buffering). The `buffer_size` option can be useful too.
### Insert a batch ```rust,ignore use serde::Serialize; use clickhouse::Row; #[derive(Row, Serialize)] struct MyRow { no: u32, name: String, } let mut insert = client.insert("some")?; insert.write(&MyRow { no: 0, name: "foo".into() }).await?; insert.write(&MyRow { no: 1, name: "bar".into() }).await?; insert.end().await?; ``` * If `end()` isn't called, the `INSERT` is aborted. * Rows are being sent progressively to spread network load. * ClickHouse inserts batches atomically only if all rows fit in the same partition and their number is less [`max_insert_block_size`](https://clickhouse.tech/docs/en/operations/settings/settings/#settings-max_insert_block_size).
### Infinite inserting Requires the `inserter` feature. ```rust,ignore let mut inserter = client.inserter("some")? .with_timeouts(Some(Duration::from_secs(5)), Some(Duration::from_secs(20))) .with_max_bytes(50_000_000) .with_max_rows(750_000) .with_period(Some(Duration::from_secs(15))); inserter.write(&MyRow { no: 0, name: "foo".into() })?; inserter.write(&MyRow { no: 1, name: "bar".into() })?; let stats = inserter.commit().await?; if stats.rows > 0 { println!( "{} bytes, {} rows, {} transactions have been inserted", stats.bytes, stats.rows, stats.transactions, ); } ``` * `Inserter` ends an active insert in `commit()` if thresholds (`max_bytes`, `max_rows`, `period`) are reached. * The interval between ending active `INSERT`s can be biased by using `with_period_bias` to avoid load spikes by parallel inserters. * `Inserter::time_left()` can be used to detect when the current period ends. Call `Inserter::commit()` again to check limits if your stream emits items rarely. * Time thresholds implemented by using [quanta](https://docs.rs/quanta) crate to speed the inserter up. Not used if `test-util` is enabled (thus, time can be managed by `tokio::time::advance()` in custom tests). * All rows between `commit()` calls are inserted in the same `INSERT` statement. * Do not forget to flush if you want to terminate inserting: ```rust,ignore inserter.end().await?; ```
### Perform DDL ```rust,ignore client.query("DROP TABLE IF EXISTS some").execute().await?; ```
### Live views Requires the `watch` feature. ```rust,ignore let mut cursor = client .watch("SELECT max(no), argMax(name, no) FROM some") .fetch::>()?; let (version, row) = cursor.next().await?.unwrap(); println!("live view updated: version={}, row={:?}", version, row); // Use `only_events()` to iterate over versions only. let mut cursor = client.watch("some_live_view").limit(20).only_events().fetch()?; println!("live view updated: version={:?}", cursor.next().await?); ``` * Use [carefully](https://github.com/ClickHouse/ClickHouse/issues/28309#issuecomment-908666042). * This code uses or creates if not exists a temporary live view named `lv_{sha1(query)}` to reuse the same live view by parallel watchers. * You can specify a name instead of a query. * This API uses `JSONEachRowWithProgress` under the hood because of [the issue](https://github.com/ClickHouse/ClickHouse/issues/22996). * Only struct rows can be used. Avoid `fetch::()` and other without specified names.

See examples.

Feature Flags

TLS

By default, TLS is disabled and one or more following features must be enabled to use HTTPS urls:

If multiple features are enabled, the following priority is applied:

How to choose between all these features? Here are some considerations:

Data Types

See also the additional examples:

Mocking

The crate provides utils for mocking CH server and testing DDL, SELECT, INSERT and WATCH queries.

The functionality can be enabled with the test-util feature. Use it only in dev-dependencies.

See the example.