JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0
3.8k stars 707 forks source link

EMRFS Pretrained Model Cache #14072

Closed Tyler-Rendina closed 3 months ago

Tyler-Rendina commented 9 months ago

Description

Problem

  1. EMR Serverless (EMR on EKS too?) has major limitations when it comes to HDFS which prevents loading pretrained models.
  2. Caching in s3 requires passing AWS Access Key ID and AWS Secret Access Key values via spark submit, which is a security risk.

Is it feasible to set spark.jsl.settings.pretrained.cache_folder to an s3 bucket and authenticate using the IAM role specified with spark submit?

Preferred Solution

Instead of using spark.jsl.settings.aws.credentials.access_key_id and spark.jsl.settings.aws.credentials.secret_access_key when setting spark.jsl.settings.pretrained.cache_folder to an s3 location, can a user instead pass an IAM role?

Additional Context

A workaround was attempted, failed, and documented below:

  1. Generate temporary IAM credentials with boto3
  2. Restart the spark context with s3 caching technique examples/python/quick_start_offline.ipynb
Tyler-Rendina commented 9 months ago

@maziyarpanahi to reproduce:

Dockerfile for ECR Image referenced by EMR Serverless Application

FROM public.ecr.aws/emr-serverless/spark/emr-6.14.0:20230928-x86_64
USER root
RUN pip3 install spark-nlp==5.1.4
USER hadoop:hadoop

Spark Submit via Console

{
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "configurations": [],
            "properties": {
                "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
                "spark.sql.hive.convertMetastoreParquet": "false",
                "spark.kryoserializer.buffer.max": "2000M",
                "spark.driver.maxResultSize": "0",
                "spark.jars.packages": "com.johnsnowlabs.nlp:spark-nlp_2.12:5.1.4",
                "spark.jars": "/usr/lib/hudi/hudi-spark-bundle.jar, s3://BUCKET/jars/spark-nlp_2.12-5.1.4.jar",
                "spark.jsl.settings.pretrained.cache_folder": "s3://BUCKET/sparknlp/cache",
                "spark.jsl.settings.aws.region": "REGION"
            }
        }
    ]
}

Sentence Detector DL Model is the point of contention here.

class forEachBatchProcessor:

    def __init__(self):
        document_assembler = (
            DocumentAssembler()
            .setInputCol("content")
            .setOutputCol("document")
            .setCleanupMode("shrink_full")
        )

        cleanUpPatterns = [
            "(https?:\/\/)?([\da-z\.-]+)\.([a-z\.]{2,6})([\/\w\.-]*)|^\.\.\."
        ]

        document_normalizer = (
            DocumentNormalizer()
            .setInputCols("document")
            .setOutputCol("normalizedDocument")
            .setLowercase(False)
            .setAction("clean")
            .setPatterns(cleanUpPatterns)
            .setReplacement(" ")
            .setPolicy("pretty_all")
        )

        sentence_detector_dl = (
            SentenceDetectorDLModel()
            .pretrained()
            .setInputCols(["normalizedDocument"])
            .setOutputCol("sentences")
            .setMinLength(5)
            .setExplodeSentences(False)
        )

        self.pipeline = Pipeline(
            stages=[document_assembler, document_normalizer, sentence_detector_dl]
        )

Ultimately it's a file not found error in /home/hadoop/cache_pretrained which has inspired this feature request.

Tyler-Rendina commented 9 months ago

Stack Trace while using the default cache location (adding permissions and ownership to user hadoop:hadoop included here, same as leaving it out)

3.4.1-amzn-1
5.1.4
Internet is connected.
sentence_detector_dl download started this may take some time.
Approximate size to download 354.6 KB

[ | ]sentence_detector_dl download started this may take some time.
Approximate size to download 354.6 KB
Download done! Loading the resource.

[ / ]
[ — ]
[ \ ]
[ | ]
[ / ]
[ — ]
[ \ ]
[ | ]
[ / ]
[ — ]
An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) ([2600:1f18:3d:a503:edb3:ae2d:502b:9e73] executor 1): java.io.FileNotFoundException: File file:/home/hadoop/cache_pretrained/sentence_detector_dl_en_2.7.0_2.4_1609611052663/metadata/part-00000 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:832)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1153)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:822)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:472)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
    at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
    at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
    at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4865)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:288)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:287)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2974)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2910)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2909)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2909)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1263)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3173)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3101)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2288)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1462)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1435)
    at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1476)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1476)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
    at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:465)
    at com.johnsnowlabs.nlp.FeaturesReader.load(ParamsAndFeaturesReadable.scala:31)
    at com.johnsnowlabs.nlp.FeaturesReader.load(ParamsAndFeaturesReadable.scala:24)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$.downloadModel(ResourceDownloader.scala:518)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$.downloadModel(ResourceDownloader.scala:510)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader$.downloadModel(ResourceDownloader.scala:709)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel(ResourceDownloader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/cache_pretrained/sentence_detector_dl_en_2.7.0_2.4_1609611052663/metadata/part-00000 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:832)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1153)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:822)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:472)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
    at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
    at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
    at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4865)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:288)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:287)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

[OK!]
Traceback (most recent call last):
  File "/tmp/spark-45ccd940-724b-4145-a054-1e60d66da1ab/consolidate.py", line 260, in <module>
    instantiateForEachBatchProcessor = forEachBatchProcessor()
  File "/tmp/spark-45ccd940-724b-4145-a054-1e60d66da1ab/consolidate.py", line 77, in __init__
    SentenceDetectorDLModel()
  File "/usr/local/lib/python3.7/site-packages/sparknlp/annotator/sentence/sentence_detector_dl.py", line 467, in pretrained
    return ResourceDownloader.downloadModel(SentenceDetectorDLModel, name, lang, remote_loc)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/pretrained/resource_downloader.py", line 99, in downloadModel
    raise e
  File "/usr/local/lib/python3.7/site-packages/sparknlp/pretrained/resource_downloader.py", line 96, in downloadModel
    j_obj = _internal._DownloadModel(reader.name, name, language, remote_loc, j_dwn).apply()
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/__init__.py", line 350, in __init__
    name, language, remote_loc)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/extended_java_wrapper.py", line 27, in __init__
    self._java_obj = self.new_java_obj(java_obj, *args)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/extended_java_wrapper.py", line 37, in new_java_obj
    return self._new_java_obj(java_class, *args)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 86, in _new_java_obj
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 169, in deco
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) ([2600:1f18:3d:a503:edb3:ae2d:502b:9e73] executor 1): java.io.FileNotFoundException: File file:/home/hadoop/cache_pretrained/sentence_detector_dl_en_2.7.0_2.4_1609611052663/metadata/part-00000 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:832)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1153)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:822)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:472)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
    at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
    at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
    at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4865)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:288)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:287)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2974)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2910)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2909)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2909)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1263)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3173)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3112)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3101)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2267)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2288)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2307)
    at org.apache.spark.rdd.RDD.$anonfun$take$1(RDD.scala:1462)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.take(RDD.scala:1435)
    at org.apache.spark.rdd.RDD.$anonfun$first$1(RDD.scala:1476)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1476)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:587)
    at org.apache.spark.ml.util.DefaultParamsReader.load(ReadWrite.scala:465)
    at com.johnsnowlabs.nlp.FeaturesReader.load(ParamsAndFeaturesReadable.scala:31)
    at com.johnsnowlabs.nlp.FeaturesReader.load(ParamsAndFeaturesReadable.scala:24)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$.downloadModel(ResourceDownloader.scala:518)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$.downloadModel(ResourceDownloader.scala:510)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader$.downloadModel(ResourceDownloader.scala:709)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.downloadModel(ResourceDownloader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.FileNotFoundException: File file:/home/hadoop/cache_pretrained/sentence_detector_dl_en_2.7.0_2.4_1609611052663/metadata/part-00000 does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:832)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:1153)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:822)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:472)
    at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:160)
    at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:372)
    at org.apache.hadoop.fs.ChecksumFileSystem.lambda$openFileWithOptions$0(ChecksumFileSystem.java:896)
    at org.apache.hadoop.util.LambdaUtils.eval(LambdaUtils.java:52)
    at org.apache.hadoop.fs.ChecksumFileSystem.openFileWithOptions(ChecksumFileSystem.java:894)
    at org.apache.hadoop.fs.FileSystem$FSDataInputStreamBuilder.build(FileSystem.java:4865)
    at org.apache.hadoop.mapred.LineRecordReader.<init>(LineRecordReader.java:115)
    at org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.liftedTree1$1(HadoopRDD.scala:288)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:287)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:245)
    at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Tyler-Rendina commented 9 months ago

Update

I've confirmed step 1 of a workaround with the below

session = Session()
credentials = session.get_credentials()
current_credentials = credentials.get_frozen_credentials()
os.environ['AWS_ACCESS_KEY_ID'] = current_credentials.access_key
os.environ['AWS_SECRET_ACCESS_KEY'] = current_credentials.secret_key
old_spark: SparkSession = (
    SparkSession
    .builder
    .getOrCreate()
)
old_conf = old_spark.sparkContext._conf.getAll()
new_conf = SparkConf()
for key, value in old_conf:
    new_conf.set(key, value)
old_spark.stop()
new_conf.set('spark.jsl.settings.aws.credentials.access_key_id', current_credentials.access_key)
new_conf.set('spark.jsl.settings.aws.credentials.secret_access_key', current_credentials.secret_key)
new_conf.set('spark.jsl.settings.aws.credentials.session_token', current_credentials.token)
new_conf.set('spark.eventLog.overwrite', 'true')
spark = (
    SparkSession.builder
    .config(conf=new_conf)

    .getOrCreate())
print(spark.sparkContext._conf.getAll())

However, after confirming the new session does in fact have the updated config, I still receive this error. Should I be setting Hadoop configs directly instead of with spark.jsl.settings.aws.credentials?

Error: Empty access.key and secret.key hadoop configuration and parameters.

[OK!]
Traceback (most recent call last):
  File "/tmp/spark-73462ad5-6316-40dd-84a3-b8cdc13a856c/consolidate.py", line 281, in <module>
    instantiateForEachBatchProcessor = forEachBatchProcessor()
  File "/tmp/spark-73462ad5-6316-40dd-84a3-b8cdc13a856c/consolidate.py", line 98, in __init__
    SentenceDetectorDLModel()
  File "/usr/local/lib/python3.7/site-packages/sparknlp/annotator/sentence/sentence_detector_dl.py", line 467, in pretrained
    return ResourceDownloader.downloadModel(SentenceDetectorDLModel, name, lang, remote_loc)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/pretrained/resource_downloader.py", line 96, in downloadModel
    j_obj = _internal._DownloadModel(reader.name, name, language, remote_loc, j_dwn).apply()
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/__init__.py", line 350, in __init__
    name, language, remote_loc)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/extended_java_wrapper.py", line 27, in __init__
    self._java_obj = self.new_java_obj(java_obj, *args)
  File "/usr/local/lib/python3.7/site-packages/sparknlp/internal/extended_java_wrapper.py", line 37, in new_java_obj
    return self._new_java_obj(java_class, *args)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 86, in _new_java_obj
  File "/usr/lib/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1323, in __call__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py", line 175, in deco
pyspark.errors.exceptions.captured.IllegalArgumentException: requirement failed: Was not found appropriate resource to download for request: ResourceRequest(sentence_detector_dl,Some(en),public/models,5.1.2,3.4) with downloader: com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader@25901693
Tyler-Rendina commented 9 months ago

Final Note

The request stands, my comments were an exercise to find a workaround.

@maziyarpanahi the request can be more concisely articulated to "EMRFS pretrained model cache". What do you think?

I don't believe there is a valid workaround for my use case as cross account hudi queries (documentation implies any data pulled across EMRFS) now fail. I believe it is because the spark context reset forces s3a instead of EMRFS. Redacted stdout:

py4j.protocol.Py4JJavaError: An error occurred while calling o707.load.
: org.apache.hudi.exception.HoodieIOException: Could not check if s3a:///is a valid table
    at org.apache.hudi.exception.TableNotFoundException.checkTableValidity(TableNotFoundException.java:59)
    at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:137)
    at org.apache.hudi.common.table.HoodieTableMetaClient.newMetaClient(HoodieTableMetaClient.java:689)
    at org.apache.hudi.common.table.HoodieTableMetaClient.access$000(HoodieTableMetaClient.java:81)
    at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:770)
    at org.apache.hudi.DefaultSource.sourceSchema(DefaultSource.scala:193)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
    at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:211)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a:////.hoodie: com.amazonaws.services.s3.model.AmazonS3Exception: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID:; S3 Extended Request ID:/=; Proxy: null), S3 Extended Request ID:/=:400 Bad Request: Bad Request (Service: Amazon S3; Status Code: 400; Error Code: 400 Bad Request; Request ID:; S3 Extended Request ID:/=; Proxy: null)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:249)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3797)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3689)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getFileStatus$24(S3AFileSystem.java:3557)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2338)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2357)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:3555)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.lambda$getFileStatus$17(HoodieWrapperFileSystem.java:410)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.executeFuncWithTimeMetrics(HoodieWrapperFileSystem.java:114)
    at org.apache.hudi.common.fs.HoodieWrapperFileSystem.getFileStatus(HoodieWrapperFileSystem.java:404)
    at org.apache.hudi.exception.TableNotFoundException.checkTableValidity(TableNotFoundException.java:51)
github-actions[bot] commented 3 months ago

This issue is stale because it has been open 180 days with no activity. Remove stale label or comment or this will be closed in 5 days