ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

Usage with SparkConnect #340

Closed: mohaidoss closed this issue 1 month ago

mohaidoss commented 1 month ago

Following the spark-shell example, I get the error below when connecting to a remote Spark cluster using Spark Connect:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 12) (10.2.3.29 executor 3): java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition.inputPartitions of type scala.collection.Seq in instance of org.apache.spark.sql.execution.datasources.v2.DataSourceRDDPartition
        at java.base/java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2096)
        at java.base/java.io.ObjectStreamClass$FieldReflector.checkObjectFieldValueTypes(ObjectStreamClass.java:2060)
        at java.base/java.io.ObjectStreamClass.checkObjFieldValueTypes(ObjectStreamClass.java:1347)
        at java.base/java.io.ObjectInputStream$FieldValues.defaultCheckFieldValues(ObjectInputStream.java:2679)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2486)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
        at java.base/java.io.ObjectInputStream$FieldValues.<init>(ObjectInputStream.java:2606)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2457)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2257)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:509)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:467)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:579)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.r..

Code snippet to reproduce

from pyspark.sql import SparkSession

spark: SparkSession = (
    SparkSession.builder
    .appName("spark-ch")
    .remote("sc://spark-connect.ns")
    .getOrCreate()
)

spark.sql("select * from `clickhouse`.db.tablename").show()

Spark version: 3.5.0

pan3793 commented 1 month ago

How did you set up your Connect server? e.g. version, jars, start commands, etc.

mohaidoss commented 1 month ago

@pan3793 Version is the same as the client: 3.5.0. The jars used are pulled in via spark.jars.packages below.

Properties details:

    spark.jars.packages com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3,com.clickhouse:clickhouse-jdbc:0.6.0
    spark.hadoop.parquet.block.size 33554432
    spark.connect.grpc.binding.port 15002
    spark.databricks.delta.merge.repartitionBeforeWrite.enabled true
    spark.databricks.delta.optimize.repartition.enabled true
    spark.databricks.delta.properties.defaults.dataSkippingNumIndexedCols -1
    spark.databricks.delta.replaceWhere.constraintCheck.enabled false
    spark.databricks.delta.replaceWhere.dataColumns.enabled true
    spark.databricks.delta.schema.autoMerge.enabled false
    spark.decommission.enabled true
    spark.delta.logStore.s3.impl io.delta.storage.S3DynamoDBLogStore
    spark.delta.logStore.s3a.impl io.delta.storage.S3DynamoDBLogStore
    spark.driver.cores 1
    spark.driver.extraClassPath /opt/spark/jars/*
    spark.driver.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
    spark.driver.extraLibraryPath /opt/hadoop/lib/native
    spark.driver.host ${SPARK_DRIVER_HOST}
    spark.driver.maxResultSize 4g
    spark.driver.memory 2048m
    spark.driver.memoryOverhead 384m
    spark.dynamicAllocation.cachedExecutorIdleTimeout 600s
    spark.dynamicAllocation.enabled true
    spark.dynamicAllocation.executorAllocationRatio 1
    spark.dynamicAllocation.executorIdleTimeout 30s
    spark.dynamicAllocation.maxExecutors 4
    spark.dynamicAllocation.minExecutors 0
    spark.dynamicAllocation.schedulerBacklogTimeout 1s
    spark.dynamicAllocation.shuffleTracking.enabled true
    spark.dynamicAllocation.shuffleTracking.timeout 600s
    spark.dynamicAllocation.sustainedSchedulerBacklogTimeout 5s
    spark.eventLog.enabled false
    spark.executor.cores 8
    spark.executor.extraClassPath /opt/spark/jars/*:/home/spark/*
    spark.executor.extraJavaOptions -XX:+ExitOnOutOfMemoryError -XX:+UseCompressedOops -XX:+UseG1GC
    spark.executor.extraLibraryPath /opt/hadoop/lib/native:/home/spark
    spark.executor.memory 4096m
    spark.executor.memoryOverhead 512m
    spark.hadoop.aws.region eu-west-1
    spark.hadoop.delta.enableFastS3AListFrom true
    spark.hadoop.fs.s3.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hadoop.fs.s3a.aws.credentials.provider com.amazonaws.auth.WebIdentityTokenCredentialsProvider
    spark.hadoop.fs.s3a.experimental.input.fadvise random
    spark.hadoop.fs.s3a.fast.upload true
    spark.hadoop.fs.s3a.fast.upload.default true
    spark.hadoop.fs.s3a.impl org.apache.hadoop.fs.s3a.S3AFileSystem
    spark.hive.imetastoreclient.factory.class com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory
    spark.jars.ivy /tmp/.ivy
    spark.kubernetes.allocation.driver.readinessTimeout 60s
    spark.kubernetes.authenticate.driver.serviceAccountName spark-connect
    spark.kubernetes.authenticate.executor.serviceAccountName spark-connect
    spark.kubernetes.authenticate.submission.caCertFile /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
    spark.kubernetes.authenticate.submission.oauthTokenFile /var/run/secrets/kubernetes.io/serviceaccount/token
    spark.kubernetes.container.image.pullPolicy Always
    spark.kubernetes.driver.pod.name ${SPARK_DRIVER_POD_NAME}
    spark.kubernetes.executor.annotation.eks.amazonaws.com/role-arn ${SPARK_AWS_ROLE}
    spark.kubernetes.executor.container.image sebastiandaberdaku/spark-glue-python:spark-v3.5.0-python-v3.10.12
    spark.kubernetes.executor.podTemplateFile /opt/spark/executor-pod-template.yaml
    spark.kubernetes.executor.request.cores 4000m
    spark.kubernetes.local.dirs.tmpfs false
    spark.kubernetes.namespace ns
    spark.local.dir /tmp
    spark.master k8s://https://kubernetes.default.svc.cluster.local:443
    spark.network.timeout 300s
    spark.serializer org.apache.spark.serializer.KryoSerializer
    spark.sql.catalog.spark_catalog org.apache.spark.sql.delta.catalog.DeltaCatalog
    spark.sql.catalogImplementation hive
    spark.sql.execution.arrow.pyspark.enabled true
    spark.sql.execution.arrow.pyspark.fallback.enabled true
    spark.sql.extensions io.delta.sql.DeltaSparkSessionExtension
    spark.sql.files.maxPartitionBytes 128MB
    spark.sql.hive.metastore.jars builtin
    spark.sql.parquet.datetimeRebaseModeInWrite CORRECTED
    spark.ui.port 4040
    spark.ui.dagGraph.retainedRootRDDs 100
    spark.ui.retainedJobs 100
    spark.ui.retainedStages 100
    spark.ui.retainedTasks 100
    spark.worker.ui.retainedExecutors 100
    spark.worker.ui.retainedDrivers 10
    spark.sql.ui.retainedExecutions 100
    spark.streaming.ui.retainedBatches 100
    spark.ui.retainedDeadExecutors 10

    spark.sql.catalog.clickhouse xenon.clickhouse.ClickHouseCatalog
    spark.sql.catalog.clickhouse.host ${CLICKHOUSE_HOST}
    spark.sql.catalog.clickhouse.protocol http
    spark.sql.catalog.clickhouse.http_port ${CLICKHOUSE_HTTP_PORT}
    spark.sql.catalog.clickhouse.user ${CLICKHOUSE_USER}
    spark.sql.catalog.clickhouse.password ${CLICKHOUSE_PASSWORD}
    spark.sql.catalog.clickhouse.database default

pan3793 commented 1 month ago

com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3 is only applicable to Spark 3.4, and 0.8.0 has not been released yet. Please clone the main branch and build it with the following command:

./gradlew clean build -x test -Dspark_binary_version=3.5 -Dscala_binary_version=2.12

(JDK 8 is required)
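
Once the build succeeds, the resulting runtime jar (rather than the 0.7.3 package) then needs to be put on the Connect server's classpath. A minimal sketch of the corresponding change to the server properties, assuming a hypothetical location and file name for the built snapshot jar (the actual path and version depend on your build and image layout):

    spark.jars /opt/spark/jars/clickhouse-spark-runtime-3.5_2.12-0.8.0-SNAPSHOT.jar
    spark.jars.packages com.clickhouse:clickhouse-jdbc:0.6.0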

pan3793 commented 1 month ago

BTW, if you are going to access ClickHouse server 23.10+ (or ClickHouse Cloud), please upgrade your ClickHouse Java client too; see details at https://github.com/ClickHouse/spark-clickhouse-connector/pull/342

pan3793 commented 1 month ago

Closing, as I suppose the issue is resolved.