ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on the DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

CHClientException: Unknown cluster: {cluster} #362

Open maver1ck opened 1 month ago


Hi, I'm trying to query data on my ClickHouse cluster through the Spark connector. The query fails with a CHClientException:

Py4JJavaError: An error occurred while calling o43.sql.
: com.clickhouse.spark.exception.CHClientException:  [-1] Unknown cluster: {cluster}
    at com.clickhouse.spark.spec.TableEngineUtils$.$anonfun$resolveTableCluster$2(TableEngineUtils.scala:34)
    at scala.Option.getOrElse(Option.scala:189)
    at com.clickhouse.spark.spec.TableEngineUtils$.resolveTableCluster(TableEngineUtils.scala:34)
    at com.clickhouse.spark.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:145)
    at com.clickhouse.spark.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:44)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.getTable(CatalogV2Util.scala:363)
    at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:337)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$5(Analyzer.scala:1315)
    at scala.Option.orElse(Option.scala:447)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$resolveRelation$1(Analyzer.scala:1311)
    at scala.Option.orElse(Option.scala:447)
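
The trace points at a cluster lookup in `TableEngineUtils.resolveTableCluster` that falls through to `getOrElse` and throws. As a rough illustration only (not the connector's actual code), the failure is consistent with an exact-name match of the Distributed engine's cluster argument against the clusters known to the server, with no macro expansion in between. The names `resolve_table_cluster`, `UnknownClusterError`, and the contents of `known` are hypothetical.

```python
class UnknownClusterError(Exception):
    """Hypothetical stand-in for the connector's CHClientException."""


def resolve_table_cluster(engine_cluster, known_clusters):
    """Return the cluster whose name equals engine_cluster exactly.

    Assumption: the lookup compares raw strings, so a macro placeholder
    such as '{cluster}' is never expanded before the comparison.
    """
    for name in known_clusters:
        if name == engine_cluster:
            return name
    raise UnknownClusterError(f"Unknown cluster: {engine_cluster}")


# Hypothetical cluster names, as they might appear in system.clusters.
known = ["default"]

try:
    resolve_table_cluster("{cluster}", known)
except UnknownClusterError as exc:
    message = str(exc)

print(message)
```

If this reading is right, the literal string `{cluster}` stored in the table's engine definition is what gets compared, which would explain why the placeholder shows up verbatim in the error message.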

These are the table definitions:

CREATE TABLE telemetry.reference_peaks_shard ON CLUSTER '{cluster}'
(
    hash_1 String,
    hash_2 String,
    ts DateTime,
    offset Int32,
    channel Int32,
    station_id String,
    tts DateTime,
    batchid Int32,
    org_ts DateTime
)
ENGINE = MergeTree
ORDER BY ts;

CREATE TABLE telemetry.reference_peaks ON CLUSTER '{cluster}' AS telemetry.reference_peaks_shard
ENGINE = Distributed('{cluster}', 'telemetry', 'reference_peaks_shard', channel);
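
Both statements use the `{cluster}` macro, which ClickHouse substitutes on the server side from its configured macros (visible in the `system.macros` table as `macro` / `substitution` pairs). A minimal sketch of that substitution, assuming a hypothetical substitution value `my_cluster`, shows what the Distributed engine clause would look like with the macro resolved. `expand_macros` is an illustrative helper, not part of ClickHouse or the connector.

```python
import re

# Matches placeholders of the form {name}.
MACRO = re.compile(r"\{(\w+)\}")


def expand_macros(text, macros):
    """Replace each {name} in text with macros[name]; fail on unknown names."""
    def substitute(match):
        name = match.group(1)
        if name not in macros:
            raise KeyError(f"No substitution for macro {{{name}}}")
        return macros[name]

    return MACRO.sub(substitute, text)


# Hypothetical macro table; the real values come from the server configuration.
macros = {"cluster": "my_cluster"}

engine = "Distributed('{cluster}', 'telemetry', 'reference_peaks_shard', channel)"
expanded = expand_macros(engine, macros)
print(expanded)
```

Comparing the real substitution for `cluster` in `system.macros` with the cluster names listed in `system.clusters` may help confirm whether the unexpanded placeholder is the cause.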

Spark Code:

from pyspark.sql import SparkSession

# Set up the SparkSession to include ClickHouse as a custom catalog
spark = SparkSession.builder \
    .appName("ClickHouse Catalog Example") \
    .config("spark.jars.packages", "com.clickhouse.spark:clickhouse-spark-runtime-3.5_2.12:0.8.0,com.clickhouse:clickhouse-jdbc:0.6.5,org.apache.httpcomponents.client5:httpclient5:5.3.1") \
    .config("spark.sql.catalog.clickhouse", "com.clickhouse.spark.ClickHouseCatalog") \
    .config("spark.sql.catalog.clickhouse.host", "clickhouse-kim.clickhouse.svc") \
    .config("spark.sql.catalog.clickhouse.http_port", "8123") \
    .config("spark.sql.catalog.clickhouse.database", "telemetry") \
    .config("spark.sql.catalog.clickhouse.driver", "com.clickhouse.jdbc.ClickHouseDriver") \
    .config("spark.sql.catalog.clickhouse.user", "admin") \
    .config("spark.sql.catalog.clickhouse.password", "admin") \
    .getOrCreate()

spark.sql("select * from clickhouse.telemetry.reference_peaks where channel = 1").count()