apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Unable to write to iceberg table using spark #8419

Open palanik1 opened 1 year ago

palanik1 commented 1 year ago

Query engine

Setup:
Spark: 3.3.3
Scala: 2.12.15 (OpenJDK 64-Bit Server VM, Java 11.0.20)

sparkConf = (
    SparkConf()
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18")
    .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.defaultCatalog", "iceberg")
    .set("spark.sql.catalog.iceberg.type", "rest")
)
spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

Question

I have a simple PySpark program that uses spark.sql to create a table and insert into it:

df = spark.sql("CREATE TABLE iceberg.db.sample_table (id bigint, data string) USING iceberg")
df = spark.sql("INSERT INTO TABLE iceberg.db.sample_table VALUES (1,'a')")

I get a java.lang.ClassNotFoundException: org.apache.iceberg.spark.source.SparkWrite$WriterFactory error as follows, although org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1 is included in the Spark conf.

23/08/29 18:39:26 WARN TaskSetManager: Lost task 0.0 in stage 6.0 (TID 5) (10.168.113.20 executor 0): java.lang.ClassNotFoundException: org.apache.iceberg.spark.source.SparkWrite$WriterFactory
        at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
        at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
        at java.base/java.lang.Class.forName0(Native Method)
        at java.base/java.lang.Class.forName(Class.java:398)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

23/08/29 18:39:26 ERROR TaskSetManager: Task 0 in stage 6.0 failed 4 times; aborting job
23/08/29 18:39:26 ERROR AppendDataExec: Data source write support IcebergBatchWrite(table=iceberg.db.sample_table, format=PARQUET) is aborting.
23/08/29 18:39:26 ERROR AppendDataExec: Data source write support IcebergBatchWrite(table=iceberg.db.sample_table, format=PARQUET) aborted.
Traceback (most recent call last):
  File "~/examples/spark-example.py", line 93, in <module>
    df = spark.sql("INSERT INTO TABLE iceberg.db.sample_table VALUES (1,'a')")
  File "/root/spark/spark-3.3.3-bin-hadoop3/python/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/root/spark/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1321, in __call__
  File "/root/spark/spark-3.3.3-bin-hadoop3/python/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/root/spark/spark-3.3.3-bin-hadoop3/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py", line 326, in get_return_value

        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:332)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:331)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:244)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:219)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invo
        at java.base/java.lang.Class.forName(Class.java:398)
        at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:71)
        at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2003)
        at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1870)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2201)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readArray(ObjectInputStream.java:2134)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1675)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2496)
        at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2390)
        at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2228)
        at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1687)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:489)
        at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:447)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:87)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:129)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:83)
        at org.apache.spark.scheduler.Task.run(Task.scala:136)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGSchedule

Any thoughts on what I might be missing? Thank you!

maulanaady commented 1 year ago

Have you solved this problem? I have the same issue.

palanik1 commented 1 year ago

What seemed to work for me was to change

sparkConf = (SparkConf()
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18")

to

sparkConf = (SparkConf()
    .set("spark.jars", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18")
di2mot commented 11 months ago

Are you sure it's this way:

sparkConf = (SparkConf()
    .set("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,software.amazon.awssdk:bundle:2.20.18,software.amazon.awssdk:url-connection-client:2.20.18")

and not that way:

sparkConf = (SparkConf()
    .set("spark.jars", "iceberg-spark-runtime-3.3_2.12-1.3.1,bundle:2.20.18,url-connection-client-2.20.18")

?

RussellSpitzer commented 11 months ago

PySpark, I think, has some issues with setting "packages" in the Spark conf, since the py4j execution means the SparkContext has to be started a bit unusually. I would try using --packages on the CLI instead of configuring it within the context to see what happens.
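
A minimal sketch of that suggestion in PySpark form, reusing the packages from the original post; PYSPARK_SUBMIT_ARGS must be set before the first SparkSession is created and must end with "pyspark-shell":

# Hedged sketch: pass --packages before the JVM starts instead of via SparkConf.
import os

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1,"
    "software.amazon.awssdk:bundle:2.20.18,"
    "software.amazon.awssdk:url-connection-client:2.20.18 "
    "pyspark-shell"
)

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

The equivalent when launching with spark-submit is passing the same coordinates via --packages on the command line.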

di2mot commented 11 months ago

This works for me in general:

("spark.jars.packages", "org.apache.iceberg:iceberg-spark3:0.11.0"),
("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.spark.SparkCatalog"),
("spark.sql.catalog.iceberg.catalog.iceberg.type", "hadoop"),
("spark.sql.catalog.iceberg.warehouse", self.path)

But that is when I run locally, not in Docker/Kubernetes. On the server, on Airflow, we use this one without spark.jars.packages:

...
("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"),
("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.spark.SparkCatalog"),
("spark.sql.catalog.iceberg.catalog.iceberg.type", "hadoop"),
("spark.sql.catalog.iceberg.warehouse", self.path)
...

Because we add them in a .yaml file under packages:

ExplorData24 commented 9 months ago

@palanik1 @di2mot @maulanaady @RussellSpitzer @dacort Hello.

I am using Hive Catalog to create Iceberg tables with Spark as the execution engine:

import os

import pyspark
from pyspark.sql import SparkSession

# Define sensitive variables
HIVE_URI = os.environ.get("HIVE_URI", "thrift://hive-metastore:9083")
WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY", "xxxxxxxxxxxxxxxx")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_KEY", "xxxxxxxxxxxxxxxxxxxxxxxxxxxx")
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")

print(AWS_S3_ENDPOINT)
print(HIVE_URI)
print(WAREHOUSE)

conf = (
    pyspark.SparkConf()
    .setAppName('app_name')
    # Packages
    .set('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178')
    # SQL extensions
    .set('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
    # Configure the catalog
    .set('spark.sql.catalog.catalog_hive', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.catalog_hive.type', 'hive')
    .set('spark.sql.catalog.catalog_hive.uri', HIVE_URI)
    .set('spark.sql.catalog.catalog_hive.warehouse.dir', WAREHOUSE)
    .set('spark.sql.catalog.catalog_hive.endpoint', AWS_S3_ENDPOINT)
    .set('spark.sql.catalog.catalog_hive.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
    .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
    .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
)

# Start the Spark session
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

# Create a table
spark.sql("CREATE TABLE catalog_hive.default.tmy_table (name STRING) USING iceberg;").show()

# Insert some data
spark.sql("INSERT INTO catalog_hive.default.my_table VALUES ('ns'), ('nd'), ('Ja')").show()

# Query the data
spark.sql("SELECT * FROM catalog_hive.default.my_table;").show()

When I try to run the CREATE TABLE command it gives me an exception:

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.

Py4JJavaError                             Traceback (most recent call last)
Cell In[4], line 38
     35 print("Spark Running")
     37 # Create a Table
---> 38 spark.sql("CREATE TABLE catalog_hive.default.tmy_table (name STRING) USING iceberg;").show()
     40 # Insert Some Data
     41 spark.sql("INSERT INTO catalog_hive.default.my_table VALUES ('Alex Merced'), ('Dipankar Mazumdar'), ('Jason Hughes')").show()

File ~/.local/lib/python3.10/site-packages/pyspark/sql/session.py:1034, in SparkSession.sql(self, sqlQuery, **kwargs)
   1032     sqlQuery = formatter.format(sqlQuery, **kwargs)
   1033 try:
-> 1034     return DataFrame(self._jsparkSession.sql(sqlQuery), self)
   1035 finally:
   1036     if len(kwargs) > 0:

File ~/.local/lib/python3.10/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File ~/.local/lib/python3.10/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File ~/.local/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o49.sql.
: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 2MBCRA6QRAF6SMBQ, Extended Request ID: s41ibIYx6fFDoMXiRK+8TRNkUT/GsiwqEzR5X2Drq9cY213HQkX19/PxSacQwo+SPX8eAqTNy7k=)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
        at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
        at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:245)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
        at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
        at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
        at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
        at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:9325)
        at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:422)
        at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:267)
        at java.base/sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:341)
        at java.base/sun.nio.cs.StreamEncoder.close(StreamEncoder.java:161)
        at java.base/java.io.OutputStreamWriter.close(OutputStreamWriter.java:255)
        at org.apache.iceberg.TableMetadataParser.internalWrite(TableMetadataParser.java:127)
        at org.apache.iceberg.TableMetadataParser.overwrite(TableMetadataParser.java:110)
        at org.apache.iceberg.BaseMetastoreTableOperations.writeNewMetadata(BaseMetastoreTableOperations.java:162)
        at org.apache.iceberg.hive.HiveTableOperations.doCommit(HiveTableOperations.java:234)
        at org.apache.iceberg.BaseMetastoreTableOperations.commit(BaseMetastoreTableOperations.java:133)
        at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:174)
        at org.apache.iceberg.CachingCatalog$CachingTableBuilder.lambda$create$0(CachingCatalog.java:261)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
        at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
        at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
        at org.apache.iceberg.CachingCatalog$CachingTableBuilder.create(CachingCatalog.java:257)
        at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:192)
        at org.apache.iceberg.spark.SparkCatalog.createTable(SparkCatalog.java:99)
        at org.apache.spark.sql.execution.datasources.v2.CreateTableExec.run(CreateTableExec.scala:45)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:829)

Even if I use a hadoop-type Iceberg catalog, I always get the same error: "Py4JJavaError: An error occurred while calling o49.sql. : software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: 5KRB8NP7R6TJ2JC4, Extended Request ID: aHgN8vSpD8xf5/Mu9u34rRJF7fcKWlupodDS67WAuQJ5+pTiyWqltK51IJADZKYXEcTSqGbgl2Y=)"

For this script:

import os

import pyspark
from pyspark.sql import SparkSession

WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "xxxxxxxxxxxxxxxxxxxxxxx")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "xxxxxxxxxxxxxxxxxxxxx")
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")

conf = (
    pyspark.SparkConf()
    .setAppName("app_name")
    .set("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.0.0,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178")
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg.type", "hadoop")
    .set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.iceberg.warehouse", WAREHOUSE)
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("spark.hadoop.fs.s3a.endpoint", AWS_S3_ENDPOINT)
    .set("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)
    .set("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
    .set("spark.hadoop.fs.s3a.path.style.access", "true")
    .set("spark.hadoop.fs.s3a.requester.pays.enabled", "true")
)

spark = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark Running")

spark.sql("CREATE TABLE iceberg.tab (name string) USING iceberg;")

NB: AWS_ACCESS_KEY, AWS_SECRET_KEY, AWS_S3_ENDPOINT, and s3a://warehouse/ are correct; I have already tested them by reading data from MinIO.
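
One possibility worth ruling out (a hedged sketch, not a confirmed fix): S3FileIO takes its endpoint and credentials from the catalog's own s3.* properties rather than from the spark.hadoop.fs.s3a.* settings, so with only fs.s3a.* configured the Iceberg writer may still be pointed at AWS instead of MinIO, which could produce a 400 like this. Reusing the variables from the script above:

# Hedged sketch: point S3FileIO itself at MinIO via the catalog's s3.* properties.
import os
import pyspark

# Same variables as in the script above; defaults are illustrative.
WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://warehouse/")
AWS_ACCESS_KEY_ID = os.environ.get("AWS_ACCESS_KEY_ID", "xxx")
AWS_SECRET_ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "xxx")
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minioserver:9000/")

conf = (
    pyspark.SparkConf()
    .set("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.iceberg.type", "hadoop")
    .set("spark.sql.catalog.iceberg.warehouse", WAREHOUSE)
    .set("spark.sql.catalog.iceberg.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # S3FileIO-specific properties (distinct from the fs.s3a.* Hadoop settings):
    .set("spark.sql.catalog.iceberg.s3.endpoint", AWS_S3_ENDPOINT)
    .set("spark.sql.catalog.iceberg.s3.path-style-access", "true")
    .set("spark.sql.catalog.iceberg.s3.access-key-id", AWS_ACCESS_KEY_ID)
    .set("spark.sql.catalog.iceberg.s3.secret-access-key", AWS_SECRET_ACCESS_KEY)
)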

Any thoughts on what I might be missing? Thank you!

github-actions[bot] commented 2 weeks ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.