ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on the DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

Support Struct Data Type #337

Open BentsiLeviav opened 1 week ago

BentsiLeviav commented 1 week ago

Is your feature request related to a problem? Please describe.

When trying to insert a Spark DataFrame that contains a StructType column, the following exception is thrown:

Exception in thread "main" xenon.clickhouse.exception.CHClientException:  [-1] Unsupported type: StructType(StructField(name,StringType,true),StructField(age,IntegerType,true))
    at org.apache.spark.sql.clickhouse.SchemaUtils$.toClickHouseType(SchemaUtils.scala:92)
    at org.apache.spark.sql.clickhouse.SchemaUtils$.$anonfun$toClickHouseSchema$1(SchemaUtils.scala:108)
    at scala.collection.ArrayOps$.map$extension(ArrayOps.scala:929)
    at org.apache.spark.sql.clickhouse.SchemaUtils$.toClickHouseSchema(SchemaUtils.scala:107)
    at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:221)
    at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:36)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:179)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:195)
    at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:213)
    at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:133)
    at org.example.Main.main(Main.java:54)
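
For reference, a minimal Scala sketch that reproduces the failure; the catalog settings, table name, and column names are illustrative, not taken from the issue:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

// Illustrative local setup; the catalog name "clickhouse" and the
// host/protocol/port keys follow the connector's documented configuration.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
  .config("spark.sql.catalog.clickhouse.host", "127.0.0.1")
  .config("spark.sql.catalog.clickhouse.protocol", "http")
  .config("spark.sql.catalog.clickhouse.http_port", "8123")
  .getOrCreate()
import spark.implicits._

val df = Seq(("Alice", 23), ("Bob", 31))
  .toDF("name", "age")
  .select(struct($"name", $"age").as("person")) // StructType column

// Fails: SchemaUtils.toClickHouseType has no case for StructType, so
// createTable throws CHClientException "Unsupported type: StructType(...)"
df.writeTo("clickhouse.default.people").createOrReplace()
```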

Describe the solution you'd like

Implement support for the Struct data type. The changes should be made in https://github.com/ClickHouse/spark-clickhouse-connector/blob/main/spark-3.5/clickhouse-spark/src/main/scala/org/apache/spark/sql/clickhouse/SchemaUtils.scala#L80.
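A natural target would be ClickHouse's named Tuple type, since Spark's StructType carries field names alongside per-field types. Below is a standalone sketch of the mapping such a case could implement; the helper name, the small set of primitive cases, and the integration point are assumptions for illustration, not the connector's actual code. The read path (ClickHouse type parsing) and the row serializers would presumably need the reverse mapping as well.

```scala
import org.apache.spark.sql.types._

// Hypothetical sketch of the StructType case that could be added to
// SchemaUtils.toClickHouseType. Only a few primitive types are covered
// here to keep the example self-contained.
def toClickHouseTypeSketch(dt: DataType, nullable: Boolean = false): String = {
  val base = dt match {
    case StringType  => "String"
    case IntegerType => "Int32"
    case LongType    => "Int64"
    case StructType(fields) =>
      // A named Tuple preserves both field names and per-field types:
      // struct<name:string,age:int> -> Tuple(name String, age Int32)
      fields
        .map(f => s"${f.name} ${toClickHouseTypeSketch(f.dataType, f.nullable)}")
        .mkString("Tuple(", ", ", ")")
    case other => throw new IllegalArgumentException(s"Unsupported type: $other")
  }
  // ClickHouse does not allow Nullable(Tuple(...)), so wrap leaves only.
  if (nullable && !dt.isInstanceOf[StructType]) s"Nullable($base)" else base
}

val person = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("age", IntegerType, nullable = false)))
println(toClickHouseTypeSketch(person)) // Tuple(name String, age Int32)
```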

Describe alternatives you've considered

An alternative would be to use the Map data type, although it is not always equivalent: a ClickHouse Map requires a single value type, while struct fields may each have a different type. A sketch of this workaround follows below.
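
For structs whose fields can share one value type, the Map workaround can be expressed directly in Spark before writing, assuming the connector's existing MapType mapping. This reuses the `df` with the `person` struct column from the reproduction sketch above; note the cast that makes the value types uniform (and loses the integer type of `age`):

```scala
import org.apache.spark.sql.functions.{col, lit, map}

// Flatten the struct into a MapType column before writing. This only
// works when every field can be represented with one value type; here
// `age` is cast to String, so its Int32 type is lost in ClickHouse.
val asMap = df.select(
  map(
    lit("name"), col("person.name"),
    lit("age"),  col("person.age").cast("string")
  ).as("person")
)

asMap.writeTo("clickhouse.default.people").createOrReplace()
```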