Clickhouse Scala Client that uses Pekko HTTP to provide a Reactive Streams implementation for accessing the ClickHouse database.
Features:
We do not guarantee API backwards compatibility, although the API has been very stable over the last years.
Scala version:
Artifacts: https://mvnrepository.com/artifact/com.crobox.clickhouse/client_2.12 https://oss.sonatype.org/content/repositories/snapshots/com/crobox/clickhouse
For sbt you can use:
// https://mvnrepository.com/artifact/com.crobox/clickhouse-scala-client_2.12
libraryDependencies += "com.crobox.clickhouse" %% "client" % "0.9.0"
When in doubt about the documentation please read the tests to find the truth.
val config: Config
val queryDatabase: String = "default"
implicit val system: ActorSystem
import system.dispatcher // ExecutionContext for mapping over the returned Future

val client = new ClickhouseClient(config, queryDatabase)
client.query("SELECT 1 + 1").map { result =>
  println(s"Got query result $result")
}
val config: Config
val client: ClickhouseClient
val sink = ClickhouseSink.insertSink(config, client)
Source.single(Insert("clicks", """{"some_column": 3}""")).runWith(sink)
The client is configured under the prefix crobox.clickhouse.client and the indexer under crobox.clickhouse.indexer. You can also provide overrides for a specific indexer by using the same configs under the prefix crobox.clickhouse.indexer.{indexer-name}
You can find all the configuration options in the reference file, with explanatory comments about their usage.
Three different connection modes are supported. The balancing-hosts
and cluster-aware
connections set up health checks for each host by running a simple HTTP request against the ClickHouse host, as specified in the ClickHouse docs. For the health checks we use separate Cached Host Connection Pools
with a maximum of one connection, to ensure we never run more than one health check at a time for the same host. When a host fails its health checks we no longer use it to run queries. If all health checks are failing, queries will fail fast.
crobox.clickhouse.client.connection {
  health-check {
    interval = 5 seconds # minimum interval between two consecutive health checks
    timeout = 1 second # the health check fails if it exceeds this timeout
  }
}
crobox.clickhouse.client {
  connection: {
    type = "single-host",
    host = "localhost",
    port = 8123
  }
}
This mode does not set up a health check and dispatches all queries to the configured host.
Round robin on the configured hosts.
crobox.clickhouse.client {
  connection: {
    type = "balancing-hosts"
    hosts: [
      {
        host = "localhost",
        port = 7415
      }
    ]
  }
}
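Combined with the health checks above, the host selection can be sketched in plain Scala (a simplified model for illustration; the names and structure are ours, not the library's actual implementation):

```scala
// Simplified model: round robin over hosts passing their health checks,
// failing fast when none remain.
final case class ClickhouseHost(host: String, port: Int, healthy: Boolean)

def selectHost(hosts: Vector[ClickhouseHost], requestNumber: Int): Either[String, ClickhouseHost] = {
  val healthy = hosts.filter(_.healthy) // hosts currently passing health checks
  if (healthy.isEmpty) Left("all hosts failed their health checks; failing fast")
  else Right(healthy(requestNumber % healthy.size)) // round robin over the healthy subset
}

val hosts = Vector(
  ClickhouseHost("host-a", 7415, healthy = true),
  ClickhouseHost("host-b", 7415, healthy = false),
  ClickhouseHost("host-c", 7415, healthy = true)
)
// host-b is skipped while it is failing its health checks;
// successive requests alternate between host-a and host-c.
```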
The host and the port will be used to continually update the list of ClickHouse nodes by querying the system.clusters
ClickHouse table and using its host_name column (check scanning-interval
).
You can specify a specific ClickHouse cluster to run queries only on the respective cluster.
Please do note that this connection type defaults to port 8123 for all nodes.
crobox.clickhouse.client {
  connection: {
    type = "cluster-aware"
    host = "localhost"
    port = 8123
    cluster = "cluster" # use only hosts which belong to the "cluster" cluster
    health-check {
      interval = 5 seconds
      timeout = 1 second
    }
    scanning-interval = 10 seconds # minimum interval between queries to refresh the host list from the system.clusters table
  }
}
Inserting into ClickHouse is done using a Pekko stream. All the settings are applied on a per-table basis.
One insert is issued when the batch reaches the maximum number of items (batch-size) or the maximum time has elapsed (flush-interval), whichever comes first. Based on concurrent-requests, multiple inserts can run in parallel for the same table.
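The flush trigger can be sketched in plain Scala (a simplified model of the condition, not the indexer's actual code; the defaults mirror the configuration values shown in this section):

```scala
import scala.concurrent.duration._

// An insert is flushed when either threshold is crossed, whichever comes first:
// the buffer has reached batch-size items, or flush-interval has elapsed.
def shouldFlush(buffered: Int,
                sinceLastFlush: FiniteDuration,
                batchSize: Int = 10000,
                flushInterval: FiniteDuration = 5.seconds): Boolean =
  buffered >= batchSize || sinceLastFlush >= flushInterval
```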
crobox.clickhouse {
  indexer {
    batch-size = 10000
    concurrent-requests = 1
    flush-interval = 5 seconds
    fast-indexer {
      flush-interval = 1 second
      batch-size = 1000
    }
  }
}
To set authentication or a settings profile for the client, you can update the following configs. You can also set custom settings as described in the ClickHouse documentation.
crobox.clickhouse.client {
  settings {
    authentication {
      user = "default"
      password = ""
    }
    profile = "default"
    http-compression = false
    // https://clickhouse.yandex/docs/en/operations/settings/settings/
    custom {
      distributed_product_mode = "local"
    }
  }
}
Read only queries
val client: ClickhouseClient
client.query("SELECT 1").map(result => println(result))
Write queries
val client: ClickhouseClient
client.execute("ALTER TABLE my_table DELETE WHERE id = 'deleted'").map(result => println(result))
Streaming delimited result (by new line)
val client: ClickhouseClient
client.source("SELECT * FROM my_table").runWith(Sink.foreach(line => println(line)))
Streaming raw result (ByteString)
val client: ClickhouseClient
client.sourceByteString("SELECT * FROM my_table").runWith(Sink.foreach(byteString => println(byteString)))
Sink streaming body
val client: ClickhouseClient
client.sink("INSERT INTO my_table", Source.single(ByteString("el1"))).map(result => println(result))
@Experimental - might not be complete
We only expose progress for read only queries. The current implementation is recommended only for long running queries whose result is relatively small in size (fits easily in memory). The returned source is materialized with the query result.
When running queries with progress, we set a custom client transport for the super pool the client uses to run queries. Due to a limitation in the Pekko implementation, which does not allow headers to be streamed, we parse the raw HTTP output and intercept the HTTP headers to receive the progress.
We expose multiple events for the progress:
val client: ClickhouseClient
client.queryWithProgress("SELECT uniq(timestamps), uniq(mosquito_name) FROM mosquito_bites")
  .toMat(Sink.foreach(progress => println(progress)))(Keep.left)
  .run()
  .map(result => println(result))
Every call to the client accepts an implicit QuerySettings
object which can override settings for that specific query.
This works both for indexing using the ClickhouseSink
(Indexer) and for running queries using client.query/client.execute.
val client: ClickhouseClient
implicit val settings = QuerySettings(queryId = Some("expensive_query"),settings = Map("replace_running_query" -> "1"))
client.query("SELECT uniq(expensive) FROM huge_table")//start query
client.query("SELECT uniq(expensive) FROM huge_table")//replaces existing query
Query retrying takes advantage of host balancing and will target another host for each retry.
Queries that use the client API source
or sink
are not retried.
All read only queries are considered idempotent and are retried up to a configurable maximum number of times (3 by default, so 4 executions in total: the initial execution plus 3 retries).
crobox.clickhouse.retries = 3
By using the ClickhouseSink
you can also retry inserts by setting the idempotent
flag to true on the query settings.
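A sketch of opting an insert into retries, assuming QuerySettings exposes idempotent as an Option[Boolean] and that the sink picks the settings up implicitly (check the QuerySettings source for the exact signature):

```scala
// Hypothetical sketch: mark inserts flowing into the sink as idempotent
// so the sink is allowed to retry them on failure.
implicit val settings: QuerySettings = QuerySettings(idempotent = Some(true))

val sink = ClickhouseSink.insertSink(config, client)
Source.single(Insert("clicks", """{"some_column": 3}""")).runWith(sink)
```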
Typed/composable DSL that is interpreted and parsed into queries, with of course full seamless integration into the driver.
For more information see the wiki
We also expose a utility test kit which provides a helpful spec with testing utilities. It automatically creates a single-use database before all tests and drops it afterwards.
// https://mvnrepository.com/artifact/com.crobox/clickhouse-scala-client_2.12
libraryDependencies += "com.crobox.clickhouse" %% "testkit" % <latest_version>
Check the spec for more details.