housepower / ClickHouse-Native-JDBC

ClickHouse Native Protocol JDBC implementation
https://housepower.github.io/ClickHouse-Native-JDBC/
Apache License 2.0

java.lang.NoClassDefFoundError: io/airlift/compress/zstd/ZstdCompressor #426

Closed: swarupsarangi113 closed this issue 2 years ago

swarupsarangi113 commented 2 years ago

Environment

Jars Used

clickhouse-native-jdbc-2.6.5.jar
delta-core_2.12-1.0.0.jar
hadoop-aws-3.2.2.jar

I am trying to load a DataFrame from a Delta Lake table on S3 into a ClickHouse table. The code works fine when run locally, but when running on an EMR cluster I get the error below.

Error logs

py4j.protocol.Py4JJavaError: An error occurred while calling o119.save.
: java.lang.NoClassDefFoundError: io/airlift/compress/zstd/ZstdCompressor
        at com.github.housepower.buffer.CompressedBuffedWriter.<init>(CompressedBuffedWriter.java:35)
        at com.github.housepower.serde.BinarySerializer.<init>(BinarySerializer.java:36)
        at com.github.housepower.client.NativeClient.connect(NativeClient.java:62)
        at com.github.housepower.client.NativeClient.connect(NativeClient.java:48)
        at com.github.housepower.jdbc.ClickHouseConnection.createNativeContext(ClickHouseConnection.java:321)
        at com.github.housepower.jdbc.ClickHouseConnection.createClickHouseConnection(ClickHouseConnection.java:316)
        at com.github.housepower.jdbc.ClickHouseDriver.connect(ClickHouseDriver.java:58)
        at com.github.housepower.jdbc.ClickHouseDriver.connect(ClickHouseDriver.java:51)
        at com.github.housepower.jdbc.ClickHouseDriver.connect(ClickHouseDriver.java:26)
        at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
        at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProvider$.create(ConnectionProvider.scala:68)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$createConnectionFactory$1(JdbcUtils.scala:62)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:48)
        at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:46)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:90)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
        at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:301)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: io.airlift.compress.zstd.ZstdCompressor
        at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 51 more

My Code

def load_into_clickhouse(self, df):
    df.write \
        .format("jdbc") \
        .mode("overwrite") \
        .option("driver", "com.github.housepower.jdbc.ClickHouseDriver") \
        .option("url", self.parameters["connection_properties"]["url"]) \
        .option("createTableOptions", "engine=MergeTree() order by (prospectid) primary key (prospectid)") \
        .option("user", "clickhouse_operator") \
        .option("password", "clickhouse_operator_password") \
        .option("dbtable", "clickhousedb.Prospect_Base") \
        .option("truncate", "true") \
        .save()


pan3793 commented 2 years ago

You can use clickhouse-native-jdbc-shaded-2.6.5.jar; it is a self-contained (shaded) artifact that bundles the driver's dependencies.
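For example, the shaded jar could be attached when building the Spark session so it ships to the driver and executors. This is a minimal sketch; the S3 path and app name are placeholders for wherever the jar is actually staged:

from pyspark.sql import SparkSession

# "spark.jars" distributes the listed jars to the driver and executor
# classpaths. It must be set before the session (and its SparkContext)
# is created. The path below is a placeholder; point it at a location
# the EMR cluster can read.
spark = (
    SparkSession.builder
    .appName("delta-to-clickhouse")
    .config("spark.jars", "s3://my-bucket/jars/clickhouse-native-jdbc-shaded-2.6.5.jar")
    .getOrCreate()
)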

swarupsarangi113 commented 2 years ago

I tried clickhouse-native-jdbc-shaded-2.6.5.jar, but it threw the same error. However, when I added aircompressor-0.21.jar, the error went away.
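For anyone hitting the same error: io.airlift.compress.zstd.ZstdCompressor lives in the io.airlift:aircompressor library, which the plain (non-shaded) driver declares as a dependency but does not bundle. When jars are handed to a cluster manually, transitive dependencies are not resolved automatically, so aircompressor has to be supplied alongside the driver. A sketch of doing that from the session builder (bucket and paths are placeholders):

from pyspark.sql import SparkSession

# Both jars must reach the driver and executors; the paths are
# placeholders for wherever the jars are actually staged (S3, HDFS,
# or a path present on every node).
jars = ",".join([
    "s3://my-bucket/jars/clickhouse-native-jdbc-2.6.5.jar",
    "s3://my-bucket/jars/aircompressor-0.21.jar",
])

spark = (
    SparkSession.builder
    .config("spark.jars", jars)
    .getOrCreate()
)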