ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

df.write/df.writeTo cannot create table #286

Closed dolfinus closed 6 months ago

dolfinus commented 9 months ago

Hi.

I'm trying to save a dataframe to a ClickHouse table which does not exist yet (without an explicit CREATE TABLE ... query). But it looks like both DataFrameWriterV2 and DataFrameWriterV1 fail in this case.

  1. Install pyspark:

    pip install pyspark[sql]==3.4.1

  2. Start ClickHouse in a Docker container.

  3. Set up the connection using the v0.7.2 release:

    from pyspark.sql import SparkSession

    packages = [
        "com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.2",
        "com.clickhouse:clickhouse-client:0.4.6",
        "com.clickhouse:clickhouse-http-client:0.4.6",
    ]

    spark = SparkSession.builder.config("spark.jars.packages", ",".join(packages)).getOrCreate()

    spark.conf.set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
    spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
    spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
    spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
    spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
    spark.conf.set("spark.sql.catalog.clickhouse.password", "")
    spark.conf.set("spark.sql.catalog.clickhouse.database", "default")

  4. Create a dataframe:
```python
from pyspark.sql.types import StructField, StructType, IntegerType

schema = StructType(
    [
        StructField("a", IntegerType(), True),
    ],
)

df = spark.createDataFrame([[1], [2], [3]], schema)
```
  5. Try to save it as a new table:
```python
df.writeTo("clickhouse.default.mytable").tableProperty("engine", "Log()").createOrReplace()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/maxim/Repo/onetl/venv/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 1521, in saveAsTable
    self._jwrite.saveAsTable(name)
  File "/home/maxim/Repo/onetl/venv/lib/python3.9/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/home/maxim/Repo/onetl/venv/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 169, in deco
    return f(*a, **kw)
  File "/home/maxim/Repo/onetl/venv/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o133.saveAsTable.
: xenon.clickhouse.exception.CHServerException: [HTTP]default@127.0.0.1:8123}/default [1002] {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}
, server ClickHouseNode [uri=http://127.0.0.1:8123/default, options={async=false,client_name=onetl}]@1094551918
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:141)
        at xenon.clickhouse.client.NodeClient.syncQueryOutputJSONEachRow(NodeClient.scala:62)
        at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:123)
        at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:36)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:580)
        at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:563)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.clickhouse.client.ClickHouseException: {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}
, server ClickHouseNode [uri=http://127.0.0.1:8123/default, options={async=false,client_name=onetl}]@1094551918
        at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.sendOnce(ClickHouseClientBuilder.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.send(ClickHouseClientBuilder.java:294)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.execute(ClickHouseClientBuilder.java:349)
        at com.clickhouse.client.ClickHouseClient.executeAndWait(ClickHouseClient.java:1056)
        at com.clickhouse.client.ClickHouseRequest.executeAndWait(ClickHouseRequest.java:2154)
        at xenon.clickhouse.client.NodeClient.$anonfun$syncQuery$2(NodeClient.scala:138)
        at scala.util.Try$.apply(Try.scala:213)
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
        ... 17 more
Caused by: java.io.IOException: {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}

        at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:184)
        at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:227)
        at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:124)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:280)
        ... 25 more
```

Instead of creating the table automatically (or failing because required properties, such as the engine, are missing), I got a client exception that is not handled by the catalog's tableExists method: https://github.com/apache/spark/blob/v3.4.1/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalog.java#L164

For some reason this comparison is failing: https://github.com/housepower/spark-clickhouse-connector/blob/v0.7.2/spark-3.4/clickhouse-spark/src/main/scala/xenon/clickhouse/ClickHouseCatalog.scala#L124
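
The failure path described above can be sketched in Python. This is a simplified model, not the connector's code: `NoSuchTableError`, `ServerError`, `load_table`, and `table_exists` are hypothetical stand-ins for Spark's `NoSuchTableException`, the connector's `CHServerException`, and the catalog methods. It assumes, as the two linked lines suggest, that `loadTable` turns only error code 60 into "no such table" and that `tableExists` catches only that exception.

```python
UNKNOWN_TABLE = 60        # ClickHouse error code shown in the server message
UNKNOWN_EXCEPTION = 1002  # code the connector reported in the trace above


class NoSuchTableError(Exception):
    """Hypothetical stand-in for Spark's NoSuchTableException."""


class ServerError(Exception):
    """Hypothetical stand-in for the connector's CHServerException."""

    def __init__(self, code, message):
        super().__init__(f"[{code}] {message}")
        self.code = code


def load_table(name, reported_code):
    # Model a lookup of a missing table: the server fails, and the client
    # reports that failure to the catalog with `reported_code`.
    error = ServerError(reported_code, f"Table {name} does not exist")
    if error.code == UNKNOWN_TABLE:
        raise NoSuchTableError(name) from error
    raise error


def table_exists(name, reported_code):
    # Only "no such table" is translated to False; anything else propagates.
    try:
        load_table(name, reported_code)
        return True
    except NoSuchTableError:
        return False
```

In this model `table_exists("default.mytable", UNKNOWN_TABLE)` returns `False`, so the writer could go on to create the table, while a reported code of 1002 lets `ServerError` escape, which matches the traceback in this report.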

pan3793 commented 9 months ago

I had a quick look; the issue may be caused by the ClickHouse Java client.

I see the following message, where 1002 means UNKNOWN_EXCEPTION:

xenon.clickhouse.exception.CHServerException: [HTTP]default@127.0.0.1:8123}/default [1002] {"exception": "Code: 60. ...

https://github.com/housepower/spark-clickhouse-connector/blob/v0.7.2/clickhouse-core/src/main/java/xenon/clickhouse/exception/ClickHouseErrCode.java#L629

So I suspect that com.clickhouse.client.ClickHouseException#getErrorCode returns a different error code even though the real error is "Code: 60 ... (UNKNOWN_TABLE)"

Sorry it's too late in my timezone now, I haven't set up an environment to verify the above thought.

pan3793 commented 9 months ago

cc @zhicwu

pan3793 commented 9 months ago

0.7.3 is available now. It supports ClickHouse Java client 0.4, 0.5 and 0.6. Would you like to give it a try?

dolfinus commented 9 months ago

Same:

packages = [
    "com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3",
    "com.clickhouse:clickhouse-jdbc:0.6.0",
    "com.clickhouse:clickhouse-http-client:0.6.0",
    "org.apache.httpcomponents.client5:httpclient5:5.3.1",
]
An error occurred while calling o86.replace.
: xenon.clickhouse.exception.CHServerException: [HTTP]default@127.0.0.1:8123}/default [1002] {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}
, server ClickHouseNode [uri=http://127.0.0.1:8123/default, options={async=false,client_name=onetl}]@-534850370
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:141)
        at xenon.clickhouse.client.NodeClient.syncQueryOutputJSONEachRow(NodeClient.scala:62)
        at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:123)
        at xenon.clickhouse.ClickHouseCatalog.loadTable(ClickHouseCatalog.scala:36)
        at org.apache.spark.sql.connector.catalog.TableCatalog.tableExists(TableCatalog.java:163)
        at xenon.clickhouse.ClickHouseCatalog.tableExists(ClickHouseCatalog.scala:36)
        at org.apache.spark.sql.execution.datasources.v2.ReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:172)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
        at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:195)
        at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:207)
        at org.apache.spark.sql.DataFrameWriterV2.replace(DataFrameWriterV2.scala:129)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.lang.Thread.run(Thread.java:748)
Caused by: com.clickhouse.client.ClickHouseException: {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}
, server ClickHouseNode [uri=http://127.0.0.1:8123/default, options={async=false,client_name=onetl}]@-534850370
        at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:168)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.sendOnce(ClickHouseClientBuilder.java:282)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.send(ClickHouseClientBuilder.java:294)
        at com.clickhouse.client.ClickHouseClientBuilder$Agent.execute(ClickHouseClientBuilder.java:349)
        at com.clickhouse.client.ClickHouseClient.executeAndWait(ClickHouseClient.java:878)
        at com.clickhouse.client.ClickHouseRequest.executeAndWait(ClickHouseRequest.java:2154)
        at xenon.clickhouse.client.NodeClient.$anonfun$syncQuery$2(NodeClient.scala:138)
        at scala.util.Try$.apply(Try.scala:213)
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
        ... 45 more
Caused by: java.io.IOException: {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}

        at com.clickhouse.client.http.ApacheHttpConnectionImpl.checkResponse(ApacheHttpConnectionImpl.java:209)
        at com.clickhouse.client.http.ApacheHttpConnectionImpl.post(ApacheHttpConnectionImpl.java:243)
        at com.clickhouse.client.http.ClickHouseHttpClient.send(ClickHouseHttpClient.java:118)
        at com.clickhouse.client.AbstractClient.execute(AbstractClient.java:280)
        ... 53 more
dolfinus commented 9 months ago

I see commit https://github.com/housepower/spark-clickhouse-connector/commit/3b42a617517dce306d2025267da384427ab42448 but I haven't found where fromCode is called

zhicwu commented 9 months ago

As I recall, due to a certain issue that may or may not relate to compatibility, the Java client parses the error message and extracts the error code from there instead of retrieving it from the response header. So it might not work all the time, especially when the error format changes on the server side.

java.io.IOException: {"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. Maybe you meant INFORMATION_SCHEMA.TABLES?. (UNKNOWN_TABLE) (version 24.1.1.2048 (official build))"}

This looks like a JSON based error message returned from the server, which might be something new in 24.1. Can we reproduce the issue using curl?
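
To illustrate why message parsing could yield 1002 here, below is a minimal Python sketch. It is not the Java client's actual parser: the `ANCHORED` pattern encodes the assumption that the code is only recognized when the message starts with `Code: <n>`, and `extract_code`, `ANCHORED`, and `TOLERANT` are hypothetical names.

```python
import re

UNKNOWN_EXCEPTION = 1002  # fallback code seen in the traces above

# Assumption: the parser expects the message to start with "Code: <n>".
ANCHORED = re.compile(r"^Code: (\d+)")
# A more tolerant pattern that finds the prefix anywhere in the message.
TOLERANT = re.compile(r"Code: (\d+)")


def extract_code(message, pattern):
    """Return the numeric code found by `pattern`, or the fallback code."""
    match = pattern.search(message)
    return int(match.group(1)) if match else UNKNOWN_EXCEPTION


plain = "Code: 60. DB::Exception: Table default.mytable does not exist. (UNKNOWN_TABLE)"
wrapped = '{"exception": "' + plain + '"}'
```

Under that assumption, the plain message parses to 60 with either pattern, while the JSON-wrapped message falls back to 1002 with `ANCHORED` and still parses to 60 with `TOLERANT`.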

dolfinus commented 9 months ago

For Clickhouse 23.9 it is fine: Exception: code=60 message=Code: 60. DB::Exception: Table default.mytable does not exist. (UNKNOWN_TABLE) (version 23.9.6.20 (official build))

For Clickhouse 23.10 it is broken: Exception: code=1002 message={"exception": "Code: 60. DB::Exception: Table default.mytable does not exist. (UNKNOWN_TABLE) (version 23.10.1.1976 (official build))"}

But I don't see any changes in response format:

echo "SELECT * FROM unknown" | curl -u "default:" "http://localhost:8123"  --data-binary @- -v
* Host localhost:8123 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:8123...
* Connected to localhost (::1) port 8123
* Server auth using Basic with user 'default'
> POST / HTTP/1.1
> Host: localhost:8123
> Authorization: Basic ZGVmYXVsdDo=
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
> 
< HTTP/1.1 404 Not Found
< Date: Wed, 07 Feb 2024 14:36:06 GMT
< Connection: Keep-Alive
< Content-Type: text/plain; charset=UTF-8
< X-ClickHouse-Server-Display-Name: 2ca6d5aa8570
< Transfer-Encoding: chunked
< X-ClickHouse-Query-Id: ae070ae0-2f92-4fbc-8fd3-3f6ea224c34d
< X-ClickHouse-Timezone: UTC
< X-ClickHouse-Exception-Code: 60
< Keep-Alive: timeout=3
< X-ClickHouse-Summary: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0","result_rows":"0","result_bytes":"0","elapsed_ns":"700664"}
< 
Code: 60. DB::Exception: Table default.unknown does not exist. (UNKNOWN_TABLE) (version 23.9.6.20 (official build))
* Connection #0 to host localhost left intact
* Host localhost:8123 was resolved.
* IPv6: ::1
* IPv4: 127.0.0.1
*   Trying [::1]:8123...
* Connected to localhost (::1) port 8123
* Server auth using Basic with user 'default'
> POST / HTTP/1.1
> Host: localhost:8123
> Authorization: Basic ZGVmYXVsdDo=
> User-Agent: curl/8.5.0
> Accept: */*
> Content-Length: 22
> Content-Type: application/x-www-form-urlencoded
> 
< HTTP/1.1 404 Not Found
< Date: Wed, 07 Feb 2024 14:36:46 GMT
< Connection: Keep-Alive
< Content-Type: text/plain; charset=UTF-8
< X-ClickHouse-Server-Display-Name: 5a8448833d73
< Transfer-Encoding: chunked
< X-ClickHouse-Query-Id: 2572b2c3-0b96-4f06-b80c-66b1742d2fbf
< X-ClickHouse-Timezone: UTC
< X-ClickHouse-Exception-Code: 60
< Keep-Alive: timeout=3
< X-ClickHouse-Summary: {"read_rows":"0","read_bytes":"0","written_rows":"0","written_bytes":"0","total_rows_to_read":"0","result_rows":"0","result_bytes":"0","elapsed_ns":"3839604"}
< 
Code: 60. DB::Exception: Table default.unknown does not exist. (UNKNOWN_TABLE) (version 23.10.1.1976 (official build))
* Connection #0 to host localhost left intact
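
Both responses above carry `X-ClickHouse-Exception-Code: 60`, so a client could read the code from the header rather than from the body. The following is a small Python sketch of that idea using only the standard library; `exception_code` and `query_error_code` are hypothetical helper names, and the URL and credentials are the ones from the curl command.

```python
import urllib.error
import urllib.request

EXCEPTION_CODE_HEADER = "X-ClickHouse-Exception-Code"


def exception_code(headers):
    """Return the server-reported error code, or None if the header is absent."""
    value = headers.get(EXCEPTION_CODE_HEADER)
    return int(value) if value is not None else None


def query_error_code(sql, url="http://localhost:8123"):
    """POST `sql` like the curl command; return the error code, or None on success."""
    request = urllib.request.Request(url, data=sql.encode("utf-8"), method="POST")
    # Basic auth for user "default" with an empty password, as in the curl output.
    request.add_header("Authorization", "Basic ZGVmYXVsdDo=")
    try:
        with urllib.request.urlopen(request):
            return None
    except urllib.error.HTTPError as error:
        return exception_code(error.headers)
```

With a server running, `query_error_code("SELECT * FROM unknown")` would be expected to return 60 on both versions, given the headers shown. Note that the connector's trace goes through `syncQueryOutputJSONEachRow`, so a closer reproduction might append `FORMAT JSONEachRow` to the query; whether that changes the response body on 23.10 is not verified in this thread.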
houxiyao commented 7 months ago

I'm executing the statement that creates the table, but the code that checks if the table exists is executed. The table does not exist and an exception is thrown and the table is not created. Is there something wrong with what I'm using?

pan3793 commented 7 months ago

@houxiyao I believe the whole conversation has explained the issue. Have you read it thoroughly?

dolfinus commented 6 months ago

Fixed in https://github.com/ClickHouse/clickhouse-java/pull/1577. Use clickhouse-java v0.6.0-patch3 or higher with spark-clickhouse-connector v0.7.3:

from pyspark.sql import SparkSession

packages = [
    "com.github.housepower:clickhouse-spark-runtime-3.4_2.12:0.7.3",
    "com.clickhouse:clickhouse-client:0.6.0-patch4",
    "com.clickhouse:clickhouse-http-client:0.6.0-patch4",
    "org.apache.httpcomponents.client5:httpclient5:5.3.1"
]

spark = SparkSession.builder.config("spark.jars.packages", ",".join(packages)).getOrCreate()

spark.conf.set("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
spark.conf.set("spark.sql.catalog.clickhouse.host", "127.0.0.1")
spark.conf.set("spark.sql.catalog.clickhouse.protocol", "http")
spark.conf.set("spark.sql.catalog.clickhouse.http_port", "8123")
spark.conf.set("spark.sql.catalog.clickhouse.user", "default")
spark.conf.set("spark.sql.catalog.clickhouse.password", "")
spark.conf.set("spark.sql.catalog.clickhouse.database", "default")

from pyspark.sql.types import StructField, StructType, IntegerType

schema = StructType(
    [
        StructField("a", IntegerType(), True),
    ],
)

df = spark.createDataFrame([[1], [2], [3]], schema)

df.writeTo("clickhouse.default.mytable").tableProperty("engine", "Log()").createOrReplace()
spark.table("clickhouse.default.mytable").show()
pan3793 commented 6 months ago

@dolfinus Great! Thanks for the information