ClickHouse / spark-clickhouse-connector

Spark ClickHouse Connector built on the DataSourceV2 API
https://clickhouse.com/docs/en/integrations/apache-spark
Apache License 2.0

A JsonParseException occurred while calling createOrReplace when using PySpark #179

Closed: sketchmind closed this issue 4 months ago

sketchmind commented 2 years ago

Env:

Code:

from typing import Dict, List, Optional

from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType

from common.sessions import spark

def set_nullable(ori: DataFrame, cols: Optional[List[str]] = None, nullable: bool = False,
                 case_sensitive: bool = False) -> DataFrame:
    if cols is None:
        cols = ori.columns
    if not case_sensitive:
        cols = [col.lower() for col in cols]
    fields = []
    for x in ori.schema:
        if (x.name if case_sensitive else x.name.lower()) in cols:  # parentheses needed: 'in' binds tighter than the conditional expression
            x.nullable = nullable
        fields.append(x)
    schema = StructType(fields)
    return spark.createDataFrame(ori.rdd, schema)

def set_comment(ori: DataFrame, comments: Dict[str, str], case_sensitive: bool = False) -> DataFrame:
    fields = []
    if not case_sensitive:
        comments = {k.lower(): v for k, v in comments.items()}
    for x in ori.schema:
        c = comments.get(x.name if case_sensitive else x.name.lower())
        if c is not None:
            x.metadata.update(comment=c)
        fields.append(x)
    schema = StructType(fields)
    return spark.createDataFrame(ori.rdd, schema)

demo = spark.sql(
    "SELECT * "
    "FROM VALUES (1, 'rice',0.8, 'can eat'), (2, 'vegetable', 1.2, NULL) AS tab(id, food, price, remark)")

df = set_nullable(set_comment(demo, {"food": "food", "price": "price usd"}), ["remark"], True)
df.show()
df.printSchema()

df.writeTo("clickhouse.app.test_schema_nullable") \
    .tableProperty("engine", "MergeTree()") \
    .tableProperty("order_by", "(food,)") \
    .tableProperty("settings.index_granularity", "8192") \
    .createOrReplace()

Console:

py4j.protocol.Py4JJavaError: An error occurred while calling o214.createOrReplace.
: xenon.relocated.com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.clickhouse.client.stream.WrappedInputStream); line: 1, column: 2]
    at xenon.relocated.com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
    at xenon.relocated.com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
    at xenon.relocated.com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
    at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2737)
    at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
    at xenon.relocated.com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
    at xenon.relocated.com.fasterxml.jackson.databind.ObjectReader._bindAndReadValues(ObjectReader.java:2152)
    at xenon.relocated.com.fasterxml.jackson.databind.ObjectReader.readValues(ObjectReader.java:1861)
    at xenon.clickhouse.format.JSONEachRowSimpleOutput$.deserialize(JSONOutputFormat.scala:34)
    at xenon.clickhouse.client.NodeClient.$anonfun$syncQueryAndCheckOutputJSONEachRow$1(NodeClient.scala:68)
    at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
    at xenon.clickhouse.client.NodeClient.syncQueryAndCheck(NodeClient.scala:149)
    at xenon.clickhouse.client.NodeClient.syncQueryAndCheckOutputJSONEachRow(NodeClient.scala:68)
    at xenon.clickhouse.ClickHouseCatalog.initialize(ClickHouseCatalog.scala:69)
    at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
    at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
    at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
    at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
    at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndIdentifier$.unapply(LookupCatalog.scala:123)
    at org.apache.spark.sql.connector.catalog.LookupCatalog$NonSessionCatalogAndIdentifier$.unapply(LookupCatalog.scala:73)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$NonSessionCatalogAndTable$.unapply(ResolveCatalogs.scala:99)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:69)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:33)
    at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:28)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:215)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:209)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:172)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:193)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:88)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:196)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:196)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:88)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
    at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:194)
    at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:211)
    at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:132)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:748)
pan3793 commented 2 years ago

Cannot reproduce.

I'm not familiar with Python, so I translated and simplified your test case into a Scala version. No error occurs.

  test("create or replace table 2") {
    def set_comment(ori: DataFrame, comments: Map[String, String]): DataFrame = {
      val newSchema = ori.schema.map { field =>
        comments.get(field.name).map(comment => field.withComment(comment)).getOrElse(field)
      }
      spark.createDataFrame(ori.rdd, StructType(newSchema))
    }

    val demo = spark.sql(
      """SELECT * FROM VALUES 
        |  (1, 'rice',      0.8, 'can eat'),
        |  (2, 'vegetable', 1.2, NULL     ) AS tab(id, food, price, remark)
        |""".stripMargin
    )
    val df = set_comment(demo, Map("food"-> "food", "price" -> "price usd") )
    spark.sql("create database app")
    df.writeTo("clickhouse.app.test_schema_nullable")
      .tableProperty("engine", "MergeTree()")
      .tableProperty("order_by", "(food,)")
      .tableProperty("settings.index_granularity", "8192")
      .tableProperty("settings.allow_nullable_key", "1")  // it's known issue
      .createOrReplace()
  }
sketchmind commented 2 years ago

I don't have this problem with 0.4.0, but 0.5.0-SNAPSHOT (taken from the snapshot repositories rather than compiled by me) also throws this error.

pan3793 commented 2 years ago

Local test with Spark 3.2.1 and Spark 3.3.0; everything works fine.

camper42 commented 2 years ago

@sketchmind what is your ClickHouse server version?

sketchmind commented 2 years ago

21.3.7.62

camper42 commented 2 years ago

Not the same version as ours, so I can't do a quick test.

BTW, after #177, @pan3793 aligned the Jackson version instead of creating a shaded jar.

xenon.relocated.com.fasterxml.jackson suggests it comes from a snapshot built before #177 and after #153?

Could you try the 0.5.0 stable release?

sketchmind commented 2 years ago

Thanks, let me run a test.

sketchmind commented 2 years ago

0.5.0 stable still has this problem. I will set up a Scala environment soon to debug it.

gfunc commented 2 years ago

I experienced the same error after upgrading to the 0.5.0 release and configuring Spark SQL to use the gRPC port (9100); things worked when I switched to the HTTP port (8123).

Jars used:

Setup:

Error message:

22/08/22 06:37:40 ERROR SparkSQLDriver: Failed in [use clickhouse]
com.fasterxml.jackson.core.JsonParseException: Unexpected character (':' (code 58)): expected a valid value (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (com.clickhouse.client.stream.WrappedInputStream); line: 1, column: 2]
        at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2391)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:735)
        at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:659)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2737)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:902)
        at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:794)
        at com.fasterxml.jackson.databind.ObjectReader._bindAndReadValues(ObjectReader.java:2152)
        at com.fasterxml.jackson.databind.ObjectReader.readValues(ObjectReader.java:1861)
        at xenon.clickhouse.format.JSONEachRowSimpleOutput$.deserialize(JSONOutputFormat.scala:34)
        at xenon.clickhouse.client.NodeClient.$anonfun$syncQueryAndCheckOutputJSONEachRow$1(NodeClient.scala:67)
        at xenon.clickhouse.client.NodeClient.syncQuery(NodeClient.scala:138)
        at xenon.clickhouse.client.NodeClient.syncQueryAndCheck(NodeClient.scala:150)
        at xenon.clickhouse.client.NodeClient.syncQueryAndCheckOutputJSONEachRow(NodeClient.scala:67)
        at xenon.clickhouse.ClickHouseCatalog.initialize(ClickHouseCatalog.scala:69)
        at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
        at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:53)
        at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
        at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:53)
        at org.apache.spark.sql.connector.catalog.LookupCatalog$CatalogAndNamespace$.unapply(LookupCatalog.scala:86)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:33)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs$$anonfun$apply$1.applyOrElse(ResolveCatalogs.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$4(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1228)
        at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1227)
        at org.apache.spark.sql.catalyst.plans.logical.SetCatalogAndNamespace.mapChildren(v2Commands.scala:752)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:175)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning(AnalysisHelper.scala:99)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsWithPruning$(AnalysisHelper.scala:96)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators(AnalysisHelper.scala:76)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperators$(AnalysisHelper.scala:75)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:32)
        at org.apache.spark.sql.catalyst.analysis.ResolveCatalogs.apply(ResolveCatalogs.scala:28)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
        at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
        at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
        at scala.collection.immutable.List.foldLeft(List.scala:91)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
        at scala.collection.immutable.List.foreach(List.scala:431)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:227)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:172)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:223)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:187)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:208)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:207)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:184)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:651)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
        at scala.collection.Iterator.foreach(Iterator.scala:943)
        at scala.collection.Iterator.foreach$(Iterator.scala:943)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:286)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:958)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1046)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1055)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
pan3793 commented 2 years ago

Hi @gfunc, have you set any other configurations?

pan3793 commented 2 years ago

Please disable compression when using gRPC with an old (before 22.3) ClickHouse and see what happens.

spark.clickhouse.write.compression.codec=none
spark.clickhouse.read.compression.codec=none
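
For PySpark users, a minimal sketch of applying these two settings when building the session; the catalog and connection values below are illustrative placeholders drawn from configurations shown elsewhere in this thread, not a verified setup, and the connector jar must already be on the classpath:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", "your-clickhouse-host")  # placeholder host
    .config("spark.sql.catalog.clickhouse.protocol", "grpc")
    .config("spark.sql.catalog.clickhouse.grpc_port", "9100")
    .config("spark.sql.catalog.clickhouse.database", "default")
    # the two settings suggested above: disable compression on both write and read paths
    .config("spark.clickhouse.write.compression.codec", "none")
    .config("spark.clickhouse.read.compression.codec", "none")
    .getOrCreate()
)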
sketchmind commented 2 years ago

I had already set those compression options before; still the same error.

pan3793 commented 2 years ago

I just merged https://github.com/housepower/spark-clickhouse-connector/pull/183, which added ClickHouse 21.3, 21.8, 22.3, and 22.8 to CI for testing. It would be good if you could tune the built-in test cases to reproduce the error in CI.

gfunc commented 2 years ago

The same error occurs even with compression disabled, using the configuration below:

kubectl exec -ti -n ${namespace} ${spark_cluster}-master-0 -- spark-sql \
        --master spark://${spark_cluster}-master-svc:7077 \
        --conf spark.sql.catalog.clickhouse=xenon.clickhouse.ClickHouseCatalog \
        --conf spark.sql.catalog.clickhouse.host=clickhouse-prod.infra.svc.cluster.local \
        --conf spark.sql.catalog.clickhouse.protocol=grpc \
        --conf spark.sql.catalog.clickhouse.grpc_port=9100 \
        --conf spark.clickhouse.write.compression.codec=none \
        --conf spark.clickhouse.read.compression.codec=none \
        --conf spark.sql.catalog.clickhouse.user=${CH_TEST_USER} \
        --conf spark.sql.catalog.clickhouse.password=${CH_TEST_USER_PW} \
        --conf spark.sql.catalog.clickhouse.database=default
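
Since switching from gRPC to the HTTP port reportedly avoids the error, here is a minimal PySpark sketch of that workaround; it reuses only option names shown in this thread, assumes the ClickHouse HTTP interface listens on its default port 8123, and is illustrative rather than a verified setup:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.catalog.clickhouse", "xenon.clickhouse.ClickHouseCatalog")
    .config("spark.sql.catalog.clickhouse.host", "clickhouse-prod.infra.svc.cluster.local")
    .config("spark.sql.catalog.clickhouse.protocol", "http")  # instead of "grpc"
    .config("spark.sql.catalog.clickhouse.database", "default")
    .getOrCreate()
)

spark.sql("use clickhouse")  # the statement that failed when the catalog was initialized over gRPC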
camper42 commented 1 year ago

I noticed that we run our tests against the HTTP protocol only, @pan3793.

We use HTTP to write to the ClickHouse catalog regardless of grpcEnabled.

camper42 commented 1 year ago

https://github.com/ClickHouse/clickhouse-java#features

⚠️ experimental, works with 22.3+, known to have issues with LZ4 compression and may cause high memory usage on the server

mzitnik commented 4 months ago

gRPC will not be supported in the next version of the Java client. We have started to deprecate the package.