aws-samples / amazon-s3-crr-preexisting-objects

Trigger Cross-Region Replication of Pre-Existing Objects with S3 Inventory and Amazon EMR
MIT No Attribution

An error occurred while calling o73.saveAsTextFile #9

Open carcuevas opened 4 years ago

carcuevas commented 4 years ago

Hi,

I tried the method described in: https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

Once the EMR cluster is created and the query against the Athena database has run, the job never starts copying the files, as can be seen below:

only showing top 20 rows

Repartitioning to 1
Traceback (most recent call last):
  File "/home/hadoop/copy_objects.py", line 187, in <module>
    .saveAsTextFile(output_path)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1585, in saveAsTextFile
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o73.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:96)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply$mcV$sp(PairRDDFunctions.scala:1096)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1.apply(PairRDDFunctions.scala:1094)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply$mcV$sp(PairRDDFunctions.scala:1067)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$4.apply(PairRDDFunctions.scala:1032)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply$mcV$sp(PairRDDFunctions.scala:958)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopFile$1.apply(PairRDDFunctions.scala:958)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:957)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply$mcV$sp(RDD.scala:1493)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1472)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1472)
    at org.apache.spark.api.java.JavaRDDLike$class.saveAsTextFile(JavaRDDLike.scala:550)
    at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 39, ip-10-0-1-185.ec2.internal, executor 1): org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1573122651393_0001/container_1573122651393_0001_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1573122651393_0001/container_1573122651393_0001_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2457, in pipeline_func
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2457, in pipeline_func
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 370, in func
  File "/home/hadoop/copy_objects.py", line 150, in <lambda>
  File "/home/hadoop/copy_objects.py", line 125, in copy_rows
  File "/home/hadoop/copy_objects.py", line 69, in copy_object
  File "/home/hadoop/copy_objects.py", line 47, in _get_object_attributes
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/factory.py", line 339, in property_loader
    self.load()
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/factory.py", line 505, in do_action
    response = action(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(**params)
  File "/usr/local/lib/python2.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python2.7/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
ClientError: An error occurred (404) when calling the HeadObject operation: Not Found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1753)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1741)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1740)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1740)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:871)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:871)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1974)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1923)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1912)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:682)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
    at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
    ... 41 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:151)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:79)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1573122651393_0001/container_1573122651393_0001_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main
    process()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1573122651393_0001/container_1573122651393_0001_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2457, in pipeline_func
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2457, in pipeline_func
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 370, in func
  File "/home/hadoop/copy_objects.py", line 150, in <lambda>
  File "/home/hadoop/copy_objects.py", line 125, in copy_rows
  File "/home/hadoop/copy_objects.py", line 69, in copy_object
  File "/home/hadoop/copy_objects.py", line 47, in _get_object_attributes
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/factory.py", line 339, in property_loader
    self.load()
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/factory.py", line 505, in do_action
    response = action(self, *args, **kwargs)
  File "/usr/local/lib/python2.7/site-packages/boto3/resources/action.py", line 83, in __call__
    response = getattr(parent.meta.client, operation_name)(**params)
  File "/usr/local/lib/python2.7/site-packages/botocore/client.py", line 357, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python2.7/site-packages/botocore/client.py", line 661, in _make_api_call
    raise error_class(parsed_response, operation_name)
ClientError: An error occurred (404) when calling the HeadObject operation: Not Found

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:124)
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:123)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1414)
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:135)
    ... 8 more

There are no further logs beyond this point. The configuration I used in the script is:


#####################################################
# The environment variables are specific to the demo.
# Populate the values below to match your setup.
#
# EMR cluster variables
AWS_PROFILE='default'
REGION='us-east-1'
SUBNET_ID='subnet-48fdbb8'
EMR_CLUSTER_NAME='CrrPreexistingDemo'
# NOTE: The Spark application stores its results in the inventory bucket.
#       This script clears the results on each run using s3 rm --recursive.
INVENTORY_BUCKET='mytest2233-s3-inventories'
MASTER_INSTANCE_TYPE='m4.large'
CORE_INSTANCE_TYPE='m4.xlarge'
CORE_INSTANCE_COUNT='1'
EMR_RELEASE='emr-5.17.0'

# EMR job variables
GLUE_DATABASE_NAME='default'
ATHENA_TABLE_NAME='crr_private_files'
INVENTORY_DATE='2019-11-06-00-00'
PARTITIONS='1'
#####################################################

The Athena database and tables are fine (I used different names here), but it seems as if some additional library might need to be installed, or something along those lines?

Thanks very much

carcuevas commented 4 years ago

I also tried with

EMR_RELEASE='emr-5.27.0'

and got the same result.

carcuevas commented 4 years ago

One more detail: could it be that certain files have special characters in their keys that need special handling, for example:

logos/files/000/000/007/original/photo%282%29.JPG

I will try removing those files from the list in case that is the cause...

msambol commented 4 years ago

I think you're right about the special characters. We're looking at this -- thanks for opening the issue! 👍
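
One thing we are checking: when an S3 Inventory report is CSV-formatted, the object key names in it are URL-encoded, so the job may be calling HeadObject with the encoded key (photo%282%29.JPG) instead of the actual key (photo(2).JPG), which would explain the 404. A rough sketch of the kind of decode step that might be needed (illustrative only, this is not what the repo currently does):

try:
    from urllib.parse import unquote  # Python 3
except ImportError:
    from urllib import unquote        # Python 2.7, as on the EMR cluster above

import boto3

s3 = boto3.client('s3')

def head_inventory_object(bucket, inventory_key):
    # Decode the URL-encoded key taken from the inventory report before HeadObject,
    # e.g. 'photo%282%29.JPG' -> 'photo(2).JPG'
    return s3.head_object(Bucket=bucket, Key=unquote(inventory_key))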

carcuevas commented 4 years ago

Thanks for looking into it ;-) By the way, I tried this in copy_objects.py:

def copy_objects(spark, inventory_table, inventory_date, partitions, copy_acls):
    query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND key like '%=%%' ESCAPE '='
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

but I had no luck with the

ESCAPE '='

clause. I got the error below, so I guess I am missing something in the syntax...

Query: 
        SELECT bucket, key
        FROM default.crr_private_files
        WHERE dt = '2019-11-06-00-00'
        AND key like '%=%%' ESCAPE '='
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')

Traceback (most recent call last):
  File "/home/hadoop/copy_objects.py", line 185, in <module>
    copied_objects = copy_objects(spark, inventory_table, inventory_date, partitions, acls)
  File "/home/hadoop/copy_objects.py", line 143, in copy_objects
    crr_failed = spark.sql(query)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
pyspark.sql.utils.ParseException: u'\nmismatched input \'ESCAPE\' expecting <EOF>(line 5, pos 28)\n\n== SQL ==\n\n        SELECT bucket, key\n        FROM default.crr_private_files\n        WHERE dt = \'2019-11-06-00-00\'\n        AND key like \'%=%%\' ESCAPE \'=\'\n----------------------------^^^\n        AND (replication_status = \'""\'\n        OR replication_status = \'"FAILED"\')\n        \n'
carcuevas commented 4 years ago

Somehow I cannot make it work. I guess there is a problem with the query parser; maybe it also doesn't like the % symbols, so I cannot exclude the files with problematic key names...


        SELECT bucket, key
        FROM default.crr_private_files
        WHERE dt = '2019-11-06-00-00'
        AND (replace (key, '%', '@') NOT LIKE '%@%
        AND (replication_status = '""'
        OR replication_status = '"FAILED"'))

Traceback (most recent call last):
  File "/home/hadoop/copy_objects.py", line 185, in <module>
    copied_objects = copy_objects(spark, inventory_table, inventory_date, partitions, acls)
  File "/home/hadoop/copy_objects.py", line 143, in copy_objects
    crr_failed = spark.sql(query)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 767, in sql
  File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 73, in deco
pyspark.sql.utils.ParseException: u'\nmismatched input \'FROM\' expecting <EOF>(line 3, pos 8)\n\n== SQL ==\n\n        SELECT bucket, key\n        FROM default.crr_private_files\n--------^^^\n        WHERE dt = \'2019-11-06-00-00\'\n        AND (replace (key, \'%\', \'@\') NOT LIKE \'%@%\n        AND (replication_status = \'""\'\n        OR replication_status = \'"FAILED"\'))\n        \n'
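
For anyone hitting the same parse errors: Spark 2.x (which the EMR 5.x releases above ship) does not accept LIKE ... ESCAPE (that clause only arrived in Spark 3.0), and the second query is also missing the closing quote after '%@%. A filter that sidesteps the escape syntax entirely, for example with instr(), could look like this (untested sketch, meant as a drop-in for the query built inside copy_objects(), so spark, inventory_table and inventory_date are assumed to be in scope):

# Untested sketch: avoid LIKE ... ESCAPE (unsupported in Spark 2.x) by using instr(),
# which returns 0 when the substring is absent. This keeps only keys without a
# literal '%', i.e. it excludes the objects with encoded special characters.
query = """
    SELECT bucket, key
    FROM {}
    WHERE dt = '{}'
    AND instr(key, '%') = 0
    AND (replication_status = '""'
    OR replication_status = '"FAILED"')
    """.format(inventory_table, inventory_date)
crr_failed = spark.sql(query)
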
msambol commented 4 years ago

@carcuevas Sorry for the delay. We haven't been able to reproduce this, unfortunately. Can you try this branch: head-fix? We think the object may no longer have existed, i.e., it was in the inventory but was deleted afterwards. The branch catches any errors and sets CopyInPlace to FALSE, so you can then filter the results to see which objects failed.
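
Roughly, the idea is a guard like the sketch below (the actual implementation is in the head-fix branch; the function name follows the traceback above, and the argument/result shape here is only illustrative):

import boto3
from botocore.exceptions import ClientError

s3 = boto3.resource('s3')

def copy_object(bucket, key):
    # Sketch: instead of letting a missing object kill the Spark task, catch the
    # ClientError (e.g. the 404 from HeadObject) and record CopyInPlace = FALSE so
    # the failed keys can be filtered out of the results afterwards.
    try:
        obj = s3.Object(bucket, key)
        obj.load()  # HeadObject; raises ClientError (404) if the object no longer exists
        obj.copy_from(CopySource={'Bucket': bucket, 'Key': key},
                      Metadata=obj.metadata,
                      MetadataDirective='REPLACE',
                      StorageClass=obj.storage_class or 'STANDARD')
        return {'Bucket': bucket, 'Key': key, 'CopyInPlace': 'TRUE'}
    except ClientError as e:
        return {'Bucket': bucket, 'Key': key, 'CopyInPlace': 'FALSE', 'Error': str(e)}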

carcuevas commented 4 years ago

Hi @msambol, I am going to give it a try tomorrow morning :) Thanks very much for trying to solve this! :) I'll let you know how it goes...

thanks

avoidik commented 4 years ago

@carcuevas I beg your pardon, but were you able to test it?

zhasulan commented 4 years ago

@carcuevas In the end, how did you solve this problem?