awslabs / amazon-s3-tagging-spark-util


Tagging util does not work with glue 3 #5

Closed vprihoda closed 1 year ago

vprihoda commented 2 years ago

Hello, I have tried the library built for Scala 2.12 (amazon-s3-tagging-spark-util-assembly_2.12-1.0.jar) in a PySpark Glue 3 job. The job reads Parquet files from a given path and should write them back with the given tags.

df = spark_session.read.parquet(path)

df\
    .write\
    .mode("append")\
    .format("s3.parquet")\
    .option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}")\
    .save(write_path)
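
If the write succeeded, the tags could be verified with a plain boto3 call; a minimal sketch (the bucket name and object key below are placeholders, not real paths from our job):

import boto3

s3 = boto3.client("s3")
response = s3.get_object_tagging(
    Bucket="my-bucket",                         # placeholder bucket name
    Key="my-prefix/part-00000.snappy.parquet",  # placeholder object key
)
print(response["TagSet"])  # expected: ProjectTeam=Team-A, FileType=parquet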

The job fails with the exception "The specified key does not exist."

2022-01-14 09:14:29,904 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/data-ingest_a_glue3", line 19, in <module>
    df.write.mode("append").format("s3.parquet").option("tag", "{\"ProjectTeam\": \"Team-A\", \"FileType\":\"parquet\"}").save(write_path)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1109, in save
    self._jwrite.save(path)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
    return f(*a, **kw)
  File "/opt/amazon/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o80.save.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:231)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:195)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:108)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:106)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:131)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:185)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:223)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:220)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:181)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:133)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:989)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:989)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:415)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:293)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 14) (10.114.116.119 executor 1): org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
    at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
    at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
    at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
    ... 9 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2465)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2414)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2413)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2413)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1124)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1124)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2679)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2621)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2610)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:914)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:200)
    ... 38 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:296)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$15(FileFormatWriter.scala:210)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The specified key does not exist. (Service: Amazon S3; Status Code: 404; Error Code: NoSuchKey; Request ID: GGF611NKETEHN64H; S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=; Proxy: null), S3 Extended Request ID: S6EGLJzEClNo76hUUKXd+lDNc2zBXVPFPmkp3nDcC38K0g1j/HuA32WwTYJe+zeRyVNYz45ZiwU=
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1145)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:802)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:770)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:744)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:704)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:686)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:550)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:530)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5259)
    at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:5206)
    at com.amazonaws.services.s3.AmazonS3Client.setObjectTagging(AmazonS3Client.java:1676)
    at com.amazonaws.s3.utils.S3TaggingHelper$.setTags(S3TaggingHelper.scala:46)
    at org.apache.spark.sql.execution.datasources.parquet.S3ParquetOutputWriter.close(S3ParquetOutputWriter.scala:14)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.releaseResources(FileFormatDataWriter.scala:58)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:75)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:280)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:286)
    ... 9 more

I was able to identify that it probably fails when it tries to tag the temporary staging file: the stack trace shows setObjectTagging being called from S3ParquetOutputWriter.close, but by then the staging file no longer exists.

2022-01-14 09:14:29,658 DEBUG [Thread-5] handler.RequestIdLogger (RequestIdLogger.java:afterResponse(36)): https://data-ingest-data-dev.s3.amazonaws.com/?list-type=2&delimiter=%2F&max-keys=1&prefix=data-ingest_ingest_s3_csv_prihodav_curated%2Fmytable%2F.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287%2F&fetch-owner=false x-amz-request-id=GGFBQ5RNRV71MJPD x-amz-id-2=z+jnJHCOTRHqc8aK6xjUiemhOb5Z6Nh7a9M+5GNCTeMrTgdkfrbdNgcVWG7dpP/rim0tiWQyQK0=
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:getFileStatus(531)): getFileStatus could not find key 'data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287'
2022-01-14 09:14:29,658 DEBUG [Thread-5] s3n.S3NativeFileSystem (S3NativeFileSystem.java:delete(384)): Delete called for 's3://data-ingest-data-dev/data-ingest_ingest_s3_csv_prihodav_curated/mytable/.spark-staging-5d886f7b-ac47-47fd-8925-fbedccf6d287' but file does not exist, so returning false
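
One possible interim workaround would be to write with the built-in parquet source and tag the objects afterwards with boto3. A minimal sketch (untested here; "my-bucket" and "my-prefix" are placeholders for the real write path, and df is the DataFrame from the job above):

import boto3

# Write without the tagging util, using the standard parquet source.
df.write.mode("append").format("parquet").save("s3://my-bucket/my-prefix/")

# Then tag every object under the prefix once the commit has finished.
s3 = boto3.client("s3")
tag_set = [
    {"Key": "ProjectTeam", "Value": "Team-A"},
    {"Key": "FileType", "Value": "parquet"},
]
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="my-bucket", Prefix="my-prefix/"):
    for obj in page.get("Contents", []):
        s3.put_object_tagging(
            Bucket="my-bucket",
            Key=obj["Key"],
            Tagging={"TagSet": tag_set},
        )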

Could you please provide support for this issue?

rumeshkrish commented 2 years ago

@vprihoda Thanks for the detailed explanation. I am currently working on it; you can expect a new release in a few weeks.

vprihoda commented 2 years ago

> @vprihoda Thanks for the detailed explanation. I am currently working on it; you can expect a new release in a few weeks.

Hi @rumeshkrish, is there any progress? Can we expect a release in the near future?

crisdo98 commented 1 year ago

@vprihoda @rumeshkrish Does anyone have a workaround for this, please?

rumeshkrish commented 1 year ago

Please check the new [release v2.0](https://github.com/awslabs/amazon-s3-tagging-spark-util/releases/tag/v2.0) page and the README.md. Thanks for your patience!

Closing this issue.

jon-elofson commented 8 months ago

I'm still hitting the same error using amazon-s3-tagging-spark-util-spark_33-scala-2.12-lib-2.0.jar with Glue 4.