housepower / ClickHouse-Native-JDBC

ClickHouse Native Protocol JDBC implementation
https://housepower.github.io/ClickHouse-Native-JDBC/
Apache License 2.0
527 stars 145 forks source link

Caused by: com.github.housepower.exception.ClickHouseSQLException: type[String] doesn't support null value #425

Closed swarupsarangi113 closed 2 years ago

swarupsarangi113 commented 2 years ago

Environment

Issue

I am trying to load a dataframe into clickhouse table by using the below code snippet:

    def load_into_clickhouse(self, df):
        df.write \
            .format("jdbc") \
            .mode("overwrite") \
            .option("driver", "com.github.housepower.jdbc.ClickHouseDriver") \
            .option("url", self.parameters["connection_properties"]["url"]) \
            .option("createTableOptions", "engine=MergeTree() order by (ProspectID) primary key (ProspectID)") \
            .option("user", "clickhouse_operator") \
            .option("password", "clickhouse_operator_password") \
            .option("dbtable", "clickhousedb.Prospect_Base_auto") \
            .option("truncate", "true") \
            .option("isolationLevel", "NONE") \
            .save()

I believe the clickhouse table that is dynamically created while writing dataframe, it is not accepting the columns that are having null values. Is there any way to resolve this issue.

Error logs

com.github.housepower.exception.ClickHouseSQLException: type[String] doesn't support null value
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.convertToCkDataType(ClickHousePreparedInsertStatement.java:224)
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.setObject(ClickHousePreparedInsertStatement.java:120)
    at com.github.housepower.jdbc.statement.AbstractPreparedStatement.setNull(AbstractPreparedStatement.java:93)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:698)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:868)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:867)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
22/09/01 12:59:44 WARN TaskSetManager: Lost task 4.0 in stage 8.0 (TID 110) (host.docker.internal executor driver): com.github.housepower.exception.ClickHouseSQLException: type[String] doesn't support null value
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.convertToCkDataType(ClickHousePreparedInsertStatement.java:224)
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.setObject(ClickHousePreparedInsertStatement.java:120)
    at com.github.housepower.jdbc.statement.AbstractPreparedStatement.setNull(AbstractPreparedStatement.java:93)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:698)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:868)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:867)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

22/09/01 12:59:44 ERROR TaskSetManager: Task 4 in stage 8.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "C:\Users\swarup.s\Documents\pythonProject\etl_on_clickhouse.py", line 78, in <module>
    load_into_clickouse.start()
  File "C:\Users\swarup.s\Documents\pythonProject\etl_on_clickhouse.py", line 68, in start
    self.load_into_clickhouse(dataframe.select("ProspectID", "FirstName", "LastName"))
  File "C:\Users\swarup.s\Documents\pythonProject\etl_on_clickhouse.py", line 47, in load_into_clickhouse
    .save()
  File "C:\Spark\spark-3.3.0-bin-hadoop3\python\pyspark\sql\readwriter.py", line 966, in save
    self._jwrite.save()
  File "C:\Spark\spark-3.3.0-bin-hadoop3\python\lib\py4j-0.10.9.5-src.zip\py4j\java_gateway.py", line 1321, in __call__
  File "C:\Spark\spark-3.3.0-bin-hadoop3\python\pyspark\sql\utils.py", line 190, in deco
    return f(*a, **kw)
  File "C:\Spark\spark-3.3.0-bin-hadoop3\python\lib\py4j-0.10.9.5-src.zip\py4j\protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o62.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 8.0 failed 1 times, most recent failure: Lost task 4.0 in stage 8.0 (TID 110) (host.docker.internal executor driver): com.github.housepower.exception.ClickHouseSQLException: type[String] doesn't support null value
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.convertToCkDataType(ClickHousePreparedInsertStatement.java:224)
    at com.github.housepower.jdbc.statement.ClickHousePreparedInsertStatement.setObject(ClickHousePreparedInsertStatement.java:120)
    at com.github.housepower.jdbc.statement.AbstractPreparedStatement.setNull(AbstractPreparedStatement.java:93)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:698)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:868)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:867)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1011)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1011)
    at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2268)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
pan3793 commented 2 years ago

Hi @swarupsarangi113 https://github.com/housepower/spark-clickhouse-connector is recommended for Spark 3.3 users

pan3793 commented 2 years ago

And your error message indicates you are trying to insert NULL to a String column that does not accept NULL values.

"type[String] doesn't support null value"

swarupsarangi113 commented 2 years ago

Hi @pan3793 , I am using Spark 3.3.0 in my system. Could you explain a little bit about your resolution. I couldn't understanding since I am little new to this.

pan3793 commented 2 years ago

ClickHouse has its own type system, String means STRING NOT NULL, and Nullable(String) mean STRING which accepts NULL value.

pan3793 commented 2 years ago

OK, I missed your mention that

dynamically created while writing dataframe

It's the limitation of Spark API, it does not expose the dataframe nullable to the JDBC parts, so the developer just always creates a nullable or not null schema

pan3793 commented 2 years ago

please pre-create the clickhouse table before writing

swarupsarangi113 commented 2 years ago

@pan3793 How to create an empty table with my custom DDL programmatically ?

I tried something below like this:

    def create_table(self):
        self.spark.sql(
            """
            CREATE TABLE clickhousedb.Prospect_Base
                    (
                        ProspectID CHAR(50) not null ,
                        FirstName CHAR(50) null,
                        LastName CHAR(30) null
                    )
                    ENGINE = MergeTree()
                    PRIMARY KEY ProspectID
                    ORDER BY ProspectID
                    SETTINGS index_granularity = 8192;

            """
            )

But it is throwing error

java.lang.NoSuchMethodError: 'java.lang.Object 
org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(org.antlr.v4.runtime.ParserRuleContext, scala.Function0)'
    at io.delta.sql.parser.DeltaSqlAstBuilder.visitSingleStatement(DeltaSqlParser.scala:244)
    at io.delta.sql.parser.DeltaSqlAstBuilder.visitSingleStatement(DeltaSqlParser.scala:146)
    at io.delta.sql.parser.DeltaSqlBaseParser$SingleStatementContext.accept(DeltaSqlBaseParser.java:165)
    at org.antlr.v4.runtime.tree.AbstractParseTreeVisitor.visit(AbstractParseTreeVisitor.java:18)
    at io.delta.sql.parser.DeltaSqlParser.$anonfun$parsePlan$1(DeltaSqlParser.scala:74)
    at io.delta.sql.parser.DeltaSqlParser.parse(DeltaSqlParser.scala:103)
    at io.delta.sql.parser.DeltaSqlParser.parsePlan(DeltaSqlParser.scala:73)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$2(SparkSession.scala:620)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:620)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
    at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
    at java.base/java.lang.reflect.Method.invoke(Method.java:577)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

Is this due to driver incompatibility with delta jar packages ?

pan3793 commented 2 years ago

I don't think the Spark JDBC datasource provides the ability to run native query.

The spark-clickhouse-connector has such ability.

https://housepower.github.io/spark-clickhouse-connector/quick_start/03_play_with_spark_shell/#operations

swarupsarangi113 commented 2 years ago

I was having trouble with connecting to clickhouse from the above connector. Probably something from backend side which I have no control of. meanwhile, I identified the column which was having null values and replaced them with empty strings using df.na.fill("", ["source"])

@pan3793 do you think this type of approach can dismiss the need for the clickhouse-connector for now or will it have other disadvantages ?

pan3793 commented 2 years ago

I was having trouble with connecting to clickhouse from the above connector.

The connector is actively maintained now, you can open issues for specific questions, and you are free to choose the tech stack based on your decision.

... will it have other disadvantages?

If it meets your requirements, it's the best option.