ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on the DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

Can't create a table from a DataFrame with a non-nullable map column: the connector composes CREATE TABLE SQL with nullable fields #236

Closed: ivanychev closed this issue 1 year ago

ivanychev commented 1 year ago

Versions:

The following snippet fails to execute:

from pyspark.sql import types as T

df = spark.createDataFrame(
  [
    ('a', {'foo': 'bar'}),
    ('b', {}),
  ],
  schema=T.StructType([
    T.StructField('id', T.StringType(), nullable=False),
    T.StructField('values', T.MapType(T.StringType(), T.StringType()), nullable=False),
  ])
)
df.writeTo("clickhouse.dev.test_table").createOrReplace()

This fails with:

xenon.clickhouse.exception.CHServerException: [HTTP]...}/default [43] Code: 43. DB::Exception: Nested type Map(String, Nullable(String)) cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 23.3.1.30124 (official build))

The Spark session is created as

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("MyApp") \
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog") \
    .config("spark.sql.catalog.clickhouse.host", "...") \
    .config("spark.sql.catalog.clickhouse.protocol", "http") \
    .config("spark.sql.catalog.clickhouse.http_port", "8443") \
    .config("spark.sql.catalog.clickhouse.user", "...") \
    .config("spark.sql.catalog.clickhouse.password", "...") \
    .config("spark.sql.catalog.clickhouse.database", "default") \
    .config("spark.sql.catalog.clickhouse.option.ssl", "true") \
    .config("spark.sql.catalog.clickhouse.option.sslmode", "STRICT") \
    .getOrCreate()

What could be the reason for this failure?

ivanychev commented 1 year ago

Full stack trace:

Py4JJavaError: An error occurred while calling o2651.createOrReplace.
: xenon.clickhouse.exception.CHServerException: [HTTP]...[43] Code: 43. DB::Exception: Nested type Map(String, Nullable(String)) cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 23.3.1.30124 (official build))
, server ClickHouseNode [uri=..., options={sslmode=STRICT}]...
    at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:141)
    at xenon.clickhouse.client.NodeClient.syncQueryAndCheck(NodeClient.scala:151)
    at xenon.clickhouse.client.NodeClient.syncQueryAndCheckOutputJSONEachRow(NodeClient.scala:68)
    at xenon.clickhouse.ClickHouseCatalog.createTable$1(ClickHouseCatalog.scala:243)
    at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:285)
    at xenon.clickhouse.ClickHouseCatalog.createTable(ClickHouseCatalog.scala:36)
    at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:183)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
    at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$2(QueryExecution.scala:229)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:243)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:392)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:188)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:985)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:342)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:229)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$withMVTagsIfNecessary(QueryExecution.scala:214)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:227)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:220)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:99)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:298)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:294)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:220)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:354)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:220)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:174)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:165)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:256)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196)
    at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:215)
    at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:134)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:306)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:195)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:115)
    at java.lang.Thread.run(Thread.java:750)
Caused by: com.clickhouse.client.ClickHouseException: Code: 43. DB::Exception: Nested type Map(String, Nullable(String)) cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 23.3.1.30124 (official build))
, server ClickHouseNode [uri=..., options={sslmode=STRICT}]...
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:275)
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.io.IOException: Code: 43. DB::Exception: Nested type Map(String, Nullable(String)) cannot be inside Nullable type. (ILLEGAL_TYPE_OF_ARGUMENT) (version 23.3.1.30124 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227)
    at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
    at com.clickhouse.client.AbstractClient.sendAsync(AbstractClient.java:161)
    at com.clickhouse.client.AbstractClient.lambda$execute$0(AbstractClient.java:273)
    ... 4 more
ivanychev commented 1 year ago

I noticed that Spark itself casts all columns to nullable, which results in Nullable(Map(...)) in the CREATE TABLE statement the connector generates.

But is there any way at all to write map and array columns using this connector?
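
For context, the inner Nullable(String) is already present in the DataFrame schema itself, since MapType defaults to valueContainsNull=True; the outer Nullable(...) appears to be added later, when Spark's V2 CTAS path relaxes the top-level nullability of the query schema:

# The struct fields are declared non-nullable, but the map's value type
# is nullable by default, which explains the inner Nullable(String).
df.printSchema()
# root
#  |-- id: string (nullable = false)
#  |-- values: map (nullable = false)
#  |    |-- key: string
#  |    |-- value: string (valueContainsNull = true)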

pan3793 commented 1 year ago

That's a known issue; it should work fine if you create the table using native ClickHouse SQL before inserting.
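
For example, something like this (a minimal sketch assuming the clickhouse-connect Python package; the connection details, engine, and ordering key are placeholders, not what the connector itself would generate):

# Create the table up front with explicitly non-nullable columns via
# native DDL, then append through the connector instead of letting it
# run CREATE TABLE AS SELECT.
import clickhouse_connect

client = clickhouse_connect.get_client(
    host="...", port=8443, username="...", password="...", secure=True)
client.command("""
    CREATE TABLE IF NOT EXISTS dev.test_table (
        id     String,
        values Map(String, String)
    )
    ENGINE = MergeTree
    ORDER BY id
""")

df.writeTo("clickhouse.dev.test_table").append()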

pan3793 commented 1 year ago

Another way is to create a Spark extension with a dedicated ClickHouseV2CreateTableAsSelectExec and rules that change the nullability behavior. But that would make the connector depend on many Spark internal implementations, which is something I want to avoid as much as possible.

ivanychev commented 1 year ago

@pan3793 thanks for your answer. Of course I can create the ClickHouse table beforehand, but in that case I would need to implement a "Spark DF schema -> CH CREATE TABLE query" converter, which your connector already does. Just trying to see if there is any other option.

ivanychev commented 1 year ago

Maybe it's possible to specify non-nullable fields as a property during the V2 write? With such a property we could explicitly keep selected fields from being rendered as Nullable(...) in CREATE TABLE, even when the Spark schema marks them nullable.
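
Roughly like this, as a hypothetical sketch: tableProperty is a real DataFrameWriterV2 API, but the non_nullable_columns property name is made up here and not understood by the connector today:

# Hypothetical: name the columns that must stay non-nullable as a table
# property, so the connector could skip wrapping them in Nullable(...).
(
    df.writeTo("clickhouse.dev.test_table")
      .tableProperty("non_nullable_columns", "id,values")
      .createOrReplace()
)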

pan3793 commented 1 year ago

Maybe it's possible to specify non-nullable fields as a property during V2 write?

Considered, but I think it's still not intuitive.

Opened https://github.com/apache/spark/pull/41070 to try to solve this problem upstream.