JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

spark-nlp won't download pretrained model on Hadoop Cluster #5676

Closed DanielOX closed 3 years ago

DanielOX commented 3 years ago

Description

I am using the code below to get sentence embeddings with the BERT-based LaBSE model.

from pyspark.sql import SparkSession  # needed for SparkSession.builder below; missing in the original snippet
from pyspark.ml import Pipeline       # needed for Pipeline(stages=...); missing in the original snippet
from sparknlp.pretrained import PretrainedPipeline
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.locality.wait", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark23_2.11:3.0.0")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")\
    .getOrCreate()

document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setLazyAnnotator(False)

embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
      .setInputCols("sentence") \
      .setOutputCol("embeddings")
nlp_pipeline = Pipeline(stages=[document_assembler, sentence_detector, embeddings])
pipeline_model = nlp_pipeline.fit(spark.createDataFrame([[""]]).toDF("text"))
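
For context, here is a minimal sketch of how the fitted pipeline is then used to produce embeddings (the sample text is illustrative):

data = spark.createDataFrame([["Distributed NLP on YARN."]]).toDF("text")
result = pipeline_model.transform(data)
# each row of "embeddings" is an array of annotation structs whose
# "embeddings" field holds the sentence vector
result.select("embeddings.embeddings").show(truncate=False)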

The script works great in Spark local development mode, but when I deploy it on the Hadoop cluster (using YARN as the resource manager) I get the following error:

labse download started this may take some time.
Traceback (most recent call last):
  File "testing_bert_hadoop.py", line 138, in <module>
    embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
  File "/usr/local/lib/python3.6/site-packages/sparknlp/annotator.py", line 1969, in pretrained
    return ResourceDownloader.downloadModel(BertSentenceEmbeddings, name, lang, remote_loc)
  File "/usr/local/lib/python3.6/site-packages/sparknlp/pretrained.py", line 32, in downloadModel
    file_size = _internal._GetResourceSize(name, language, remote_loc).apply()
  File "/usr/local/lib/python3.6/site-packages/sparknlp/internal.py", line 192, in __init__
    "com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize", name, language, remote_loc)
  File "/usr/local/lib/python3.6/site-packages/sparknlp/internal.py", line 129, in __init__
    self._java_obj = self.new_java_obj(java_obj, *args)
  File "/usr/local/lib/python3.6/site-packages/sparknlp/internal.py", line 139, in new_java_obj
    return self._new_java_obj(java_class, *args)
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0016/container_e199_1623058160826_0016_01_000001/pyspark.zip/pyspark/ml/wrapper.py", line 63, in _new_java_obj
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0016/container_e199_1623058160826_0016_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0016/container_e199_1623058160826_0016_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0016/container_e199_1623058160826_0016_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize.
: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
    at com.johnsnowlabs.nlp.pretrained.ResourceMetadata$.parseJson(ResourceMetadata.scala:61)
    at com.johnsnowlabs.nlp.pretrained.ResourceMetadata$$anonfun$readResources$1.applyOrElse(ResourceMetadata.scala:90)
    at com.johnsnowlabs.nlp.pretrained.ResourceMetadata$$anonfun$readResources$1.applyOrElse(ResourceMetadata.scala:89)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at scala.collection.Iterator$$anon$14.next(Iterator.scala:541)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:183)
    at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:45)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
    at scala.collection.AbstractIterator.to(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.toList(TraversableOnce.scala:294)
    at scala.collection.AbstractIterator.toList(Iterator.scala:1336)
    at com.johnsnowlabs.nlp.pretrained.ResourceMetadata$.readResources(ResourceMetadata.scala:92)
    at com.johnsnowlabs.nlp.pretrained.ResourceMetadata$.readResources(ResourceMetadata.scala:84)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader.downloadMetadataIfNeed(S3ResourceDownloader.scala:70)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader.resolveLink(S3ResourceDownloader.scala:81)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader.getDownloadSize(S3ResourceDownloader.scala:159)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$.getDownloadSize(ResourceDownloader.scala:399)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader$.getDownloadSize(ResourceDownloader.scala:496)
    at com.johnsnowlabs.nlp.pretrained.PythonResourceDownloader.getDownloadSize(ResourceDownloader.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

I tried manually updating the jars json4s-native, json4s-scalap, and many others, but the error still persists.

Expected Behavior

The pretrained model should be downloaded and the fitted pipeline loaded into the pipeline_model variable.

Current Behavior

The above-mentioned error is thrown when running on the Hadoop cluster.

Possible Solution

I tried manually updating the jars json4s-native, json4s-scalap, and many others, but the error still persists. Maybe I am lacking some knowledge or misunderstanding the problem.

Context

I was trying to get sentence embeddings using the LaBSE model for a classification problem.

Your Environment

Please let me know if you need any more info. Thanks.

maziyarpanahi commented 3 years ago

Hi @DanielOX

Thanks for opening this issue. The automatic download of pretrained models and pipelines relies on a valid and accessible FileSystem. In local mode, it will look for fs:/// and then the home directory of that user, and it downloads/extracts/loads from ~/cache_pretrained.

However, in a cluster, it will look for a distributed fileSystem such as HDFS, DBFS, S3, etc. The distributed fileSystem must be known and configured for Apache Spark, meaning that if you reference hdfs:///tmp inside Spark, it knows which master node HDFS is on, etc.

It seems you either don't have a distributed FileSystem, or it's not configured correctly, or the user running your Spark application via Livy doesn't have enough permission to access/create/extract models/pipelines. Unfortunately, there is not much to go on, since you only said Spark comes by default with the Hadoop installation, so I am not sure what the actual cluster setup is. In any case, you can always download the models/pipelines and load them offline (the models/pipelines must be on a distributed FileSystem): https://github.com/JohnSnowLabs/spark-nlp#offline
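
For illustration, a minimal offline-loading sketch, assuming the LaBSE model archive has already been downloaded from the Models Hub, extracted, and copied to HDFS (the hdfs:///models/labse_xx path is hypothetical):

from sparknlp.annotator import BertSentenceEmbeddings

# Load the extracted model directly from HDFS instead of downloading it;
# the path below is a placeholder for wherever the archive was extracted.
embeddings = BertSentenceEmbeddings.load("hdfs:///models/labse_xx") \
    .setInputCols("sentence") \
    .setOutputCol("embeddings")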

DanielOX commented 3 years ago

Hi @maziyarpanahi

I have existing Spark jobs which do not use Spark NLP, and they all read/save/update Spark ML pipelines on HDFS. I am only facing this issue with Spark NLP. I am not using any environment manager like conda; Hadoop uses Maven by default. I am new to this distributed environment. When the Spark job is running, Hadoop runs it as the yarn user, and I can assure you the permissions were given to yarn properly, as we faced a related issue in our previous milestone. The problem I am stuck with is java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z. How can I resolve this?

Thanks

maziyarpanahi commented 3 years ago

OK, it's great that YARN and Spark are correctly configured and work together without any issue regarding HDFS. In order to see what could have gone wrong with the Spark NLP .pretrained() functionality, could you please share the details of your cluster setup?

The more info we have, the easier it is to set up a similar environment on our side and quickly try to reproduce a similar application to see what config is missing and where.

Many thanks

DanielOX commented 3 years ago
NodeName         Count   RAM (each)   Cores (each)
Master Node      1       38 g         8
Secondary Node   1       38 g         8
Worker Nodes     4       24 g         4
Total            6       172 g        32
maziyarpanahi commented 3 years ago

Many thanks, this is very useful! One last thing, and then we can get to work debugging this:

On your node, you can run this command to enter the interactive pyspark shell:

pyspark \
--master "yarn" \
--deploy-mode "client" \
--driver-memory 16g \
--executor-cores 5 \
--executor-memory 5g \
--conf spark.kryoserializer.buffer.max=2000M \
--packages com.johnsnowlabs.nlp:spark-nlp_2.11:3.0.0

Once you are in that interactive pyspark shell, you can run the following code:

from sparknlp.annotator import *
from sparknlp.base import *

document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setLazyAnnotator(False)

embeddings = BertSentenceEmbeddings.pretrained() \
      .setInputCols("sentence") \
      .setOutputCol("embeddings")

This should be enough as a test to be sure the error is not related to the whole cluster setup but rather to the Livy configuration. This test would be very helpful to rule some things out.

DanielOX commented 3 years ago

Okay @maziyarpanahi, just give me a minute.

DanielOX commented 3 years ago

I tried running the command

pyspark \
--master "yarn" \
--deploy-mode "client" \
--driver-memory 16g \
--executor-cores 5 \
--executor-memory 5g \
--conf spark.kryoserializer.buffer.max=2000M \
--packages com.johnsnowlabs.nlp:spark-nlp_2.11:3.0.0

but it gave me this error

SPARK_MAJOR_VERSION is set to 2, using Spark2
Picked up _JAVA_OPTIONS: -Xmx8192m
Python 3.6.8 (default, Nov 16 2020, 16:55:22) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Picked up _JAVA_OPTIONS: -Xmx8192m
Picked up _JAVA_OPTIONS: -Xmx8192m
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.5.1175-1/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.johnsnowlabs.nlp#spark-nlp_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
:: resolution report :: resolve 2490ms :: artifacts dl 0ms
    :: modules in use:
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   1   |   0   |   0   |   0   ||   0   |   0   |
    ---------------------------------------------------------------------

:: problems summary ::
:::: WARNINGS
        module not found: com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0

    ==== local-m2-cache: tried

      file:/root/.m2/repository/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.pom

      -- artifact com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0!spark-nlp_2.11.jar:

      file:/root/.m2/repository/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.jar

    ==== local-ivy-cache: tried

      /root/.ivy2/local/com.johnsnowlabs.nlp/spark-nlp_2.11/3.0.0/ivys/ivy.xml

      -- artifact com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0!spark-nlp_2.11.jar:

      /root/.ivy2/local/com.johnsnowlabs.nlp/spark-nlp_2.11/3.0.0/jars/spark-nlp_2.11.jar

    ==== central: tried

      https://repo1.maven.org/maven2/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.pom

      -- artifact com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0!spark-nlp_2.11.jar:

      https://repo1.maven.org/maven2/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.jar

    ==== spark-packages: tried

      http://dl.bintray.com/spark-packages/maven/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.pom

      -- artifact com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0!spark-nlp_2.11.jar:

      http://dl.bintray.com/spark-packages/maven/com/johnsnowlabs/nlp/spark-nlp_2.11/3.0.0/spark-nlp_2.11-3.0.0.jar

        ::::::::::::::::::::::::::::::::::::::::::::::

        ::          UNRESOLVED DEPENDENCIES         ::

        ::::::::::::::::::::::::::::::::::::::::::::::

        :: com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0: not found

        ::::::::::::::::::::::::::::::::::::::::::::::

:: USE VERBOSE OR DEBUG MESSAGE LEVEL FOR MORE DETAILS
Exception in thread "main" java.lang.RuntimeException: [unresolved dependency: com.johnsnowlabs.nlp#spark-nlp_2.11;3.0.0: not found]
    at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1297)
    at org.apache.spark.deploy.DependencyUtils$.resolveMavenDependencies(DependencyUtils.scala:53)
    at org.apache.spark.deploy.SparkSubmit$.doPrepareSubmitEnvironment(SparkSubmit.scala:363)
    at org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:249)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:136)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/shell.py", line 38, in <module>
    SparkContext._ensure_initialized()
  File "/usr/hdp/current/spark2-client/python/pyspark/context.py", line 289, in _ensure_initialized
    SparkContext._gateway = gateway or launch_gateway(conf)
  File "/usr/hdp/current/spark2-client/python/pyspark/java_gateway.py", line 95, in launch_gateway
    raise Exception("Java gateway process exited before sending the driver its port number")
Exception: Java gateway process exited before sending the driver its port number
maziyarpanahi commented 3 years ago

Sorry @DanielOX, I totally forgot you are on Spark 2.3. The right package is spark-nlp-spark23_2.11:

pyspark \
--master "yarn" \
--deploy-mode "client" \
--driver-memory 16g \
--executor-cores 5 \
--executor-memory 5g \
--conf spark.kryoserializer.buffer.max=2000M \
--packages com.johnsnowlabs.nlp:spark-nlp-spark23_2.11:3.0.0
DanielOX commented 3 years ago

@maziyarpanahi Don't worry man! happens to me all the time :-v

DanielOX commented 3 years ago

@maziyarpanahi the pyspark command worked; here is the output:

SPARK_MAJOR_VERSION is set to 2, using Spark2
Picked up _JAVA_OPTIONS: -Xmx8192m
Python 3.6.8 (default, Nov 16 2020, 16:55:22) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Picked up _JAVA_OPTIONS: -Xmx8192m
Picked up _JAVA_OPTIONS: -Xmx8192m
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/hdp/2.6.5.1175-1/spark2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.johnsnowlabs.nlp#spark-nlp-spark23_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
    confs: [default]
    found com.johnsnowlabs.nlp#spark-nlp-spark23_2.11;3.0.0 in central
    found com.typesafe#config;1.3.0 in central
    found org.rocksdb#rocksdbjni;6.5.3 in central
    found com.amazonaws#aws-java-sdk-bundle;1.11.603 in central
    found com.github.universal-automata#liblevenshtein;3.0.0 in central
    found com.google.code.findbugs#annotations;3.0.1 in central
    found net.jcip#jcip-annotations;1.0 in central
    found com.google.code.findbugs#jsr305;3.0.1 in central
    found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
    found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
    found com.google.code.gson#gson;2.3 in central
    found it.unimi.dsi#fastutil;7.0.12 in central
    found org.projectlombok#lombok;1.16.8 in central
    found org.slf4j#slf4j-api;1.7.21 in central
    found com.navigamez#greex;1.0 in central
    found dk.brics.automaton#automaton;1.11-8 in central
    found org.json4s#json4s-ext_2.11;3.5.3 in central
    found joda-time#joda-time;2.9.5 in central
    found org.joda#joda-convert;1.8.1 in central
    found com.johnsnowlabs.nlp#tensorflow-cpu_2.11;0.2.2 in central
    found net.sf.trove4j#trove4j;3.0.3 in central
:: resolution report :: resolve 710ms :: artifacts dl 18ms
    :: modules in use:
    com.amazonaws#aws-java-sdk-bundle;1.11.603 from central in [default]
    com.github.universal-automata#liblevenshtein;3.0.0 from central in [default]
    com.google.code.findbugs#annotations;3.0.1 from central in [default]
    com.google.code.findbugs#jsr305;3.0.1 from central in [default]
    com.google.code.gson#gson;2.3 from central in [default]
    com.google.protobuf#protobuf-java;3.0.0-beta-3 from central in [default]
    com.google.protobuf#protobuf-java-util;3.0.0-beta-3 from central in [default]
    com.johnsnowlabs.nlp#spark-nlp-spark23_2.11;3.0.0 from central in [default]
    com.johnsnowlabs.nlp#tensorflow-cpu_2.11;0.2.2 from central in [default]
    com.navigamez#greex;1.0 from central in [default]
    com.typesafe#config;1.3.0 from central in [default]
    dk.brics.automaton#automaton;1.11-8 from central in [default]
    it.unimi.dsi#fastutil;7.0.12 from central in [default]
    joda-time#joda-time;2.9.5 from central in [default]
    net.jcip#jcip-annotations;1.0 from central in [default]
    net.sf.trove4j#trove4j;3.0.3 from central in [default]
    org.joda#joda-convert;1.8.1 from central in [default]
    org.json4s#json4s-ext_2.11;3.5.3 from central in [default]
    org.projectlombok#lombok;1.16.8 from central in [default]
    org.rocksdb#rocksdbjni;6.5.3 from central in [default]
    org.slf4j#slf4j-api;1.7.21 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   21  |   0   |   0   |   0   ||   21  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
    confs: [default]
    0 artifacts copied, 21 already retrieved (0kB/15ms)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/08 04:59:58 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.johnsnowlabs.nlp_spark-nlp-spark23_2.11-3.0.0.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.typesafe_config-1.3.0.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/org.rocksdb_rocksdbjni-6.5.3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.amazonaws_aws-java-sdk-bundle-1.11.603.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.github.universal-automata_liblevenshtein-3.0.0.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.navigamez_greex-1.0.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/org.json4s_json4s-ext_2.11-3.5.3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.johnsnowlabs.nlp_tensorflow-cpu_2.11-0.2.2.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/net.sf.trove4j_trove4j-3.0.3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.code.findbugs_annotations-3.0.1.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.protobuf_protobuf-java-util-3.0.0-beta-3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.protobuf_protobuf-java-3.0.0-beta-3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/it.unimi.dsi_fastutil-7.0.12.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/org.projectlombok_lombok-1.16.8.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/org.slf4j_slf4j-api-1.7.21.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/net.jcip_jcip-annotations-1.0.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.code.findbugs_jsr305-3.0.1.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/com.google.code.gson_gson-2.3.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/dk.brics.automaton_automaton-1.11-8.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/joda-time_joda-time-2.9.5.jar added multiple times to distributed cache.
21/06/08 05:00:05 WARN Client: Same path resource file:///root/.ivy2/jars/org.joda_joda-convert-1.8.1.jar added multiple times to distributed cache.
maziyarpanahi commented 3 years ago

Perfect, and if you run the snippet code inside this interactive shell?

from sparknlp.annotator import *
from sparknlp.base import *

document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setLazyAnnotator(False)

embeddings = BertSentenceEmbeddings.pretrained() \
      .setInputCols("sentence") \
      .setOutputCol("embeddings")

This helps us see whether Livy is the culprit or whether there is a json4s.jackson in this cluster that is not compatible with our Spark 2.3.x release.

DanielOX commented 3 years ago

Yes, I ran it. It's downloading the model and taking some time; I will share the complete logs with you in a minute.

DanielOX commented 3 years ago

I got this exception:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.1175-1
      /_/

Using Python version 3.6.8 (default, Nov 16 2020 16:55:22)
SparkSession available as 'spark'.
>>> from sparknlp.annotator import *
>>> from sparknlp.base import *
>>> 
>>> document_assembler = DocumentAssembler()\
...     .setInputCol("text")\
...     .setOutputCol("document")
>>> 
>>> sentence_detector = SentenceDetector() \
...     .setInputCols(["document"]) \
...     .setOutputCol("sentence") \
...     .setLazyAnnotator(False)
>>> 
>>> embeddings = BertSentenceEmbeddings.pretrained() \
...       .setInputCols("sentence") \
...       .setOutputCol("embeddings")
sent_small_bert_L2_768 download started this may take some time.
21/06/08 05:00:23 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
Approximate size to download 139.6 MB
[ | ]sent_small_bert_L2_768 download started this may take some time.
java.lang.NoSuchMethodError: com.amazonaws.services.s3.AmazonS3Client.doesObjectExist(Ljava/lang/String;Ljava/lang/String;)Z
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader$$anonfun$download$1.apply(S3ResourceDownloader.scala:99)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader$$anonfun$download$1.apply(S3ResourceDownloader.scala:95)
    at scala.Option.flatMap(Option.scala:171)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader.download(S3ResourceDownloader.scala:94)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$$anonfun$9.apply(ResourceDownloader.scala:314)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$$anonfun$9.apply(ResourceDownloader.scala:313)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
DanielOX commented 3 years ago

but the download spinner is still rotating after the exception

DanielOX commented 3 years ago

@maziyarpanahi It's been an hour and the model is still stuck downloading:

[ | ]sent_small_bert_L2_768 download started this may take some time.
[ / ]Approximate size to download 139.6 MB
[ \ ]java.lang.NoSuchMethodError: com.amazonaws.services.s3.AmazonS3Client.doesObjectExist(Ljava/lang/String;Ljava/lang/String;)Z
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader$$anonfun$download$1.apply(S3ResourceDownloader.scala:99)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader$$anonfun$download$1.apply(S3ResourceDownloader.scala:95)
    at scala.Option.flatMap(Option.scala:171)
    at com.johnsnowlabs.nlp.pretrained.S3ResourceDownloader.download(S3ResourceDownloader.scala:94)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$$anonfun$9.apply(ResourceDownloader.scala:314)
    at com.johnsnowlabs.nlp.pretrained.ResourceDownloader$$anonfun$9.apply(ResourceDownloader.scala:313)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[ / ]
maziyarpanahi commented 3 years ago

@DanielOX Yeah, that crashed immediately. The model is around 20-40 MB, so it should take about 10 seconds to download.

If you don't mind doing another test, that would be great.

On the same node, let's launch the pyspark shell via the Fat JAR:

pyspark \
--master "yarn" \
--deploy-mode "client" \
--driver-memory 16g \
--executor-cores 5 \
--executor-memory 5g \
--conf spark.kryoserializer.buffer.max=2000M \
--jars https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar

Then we can run this code:

from sparknlp.annotator import *
from sparknlp.base import *

document_assembler = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence_detector = SentenceDetector() \
    .setInputCols(["document"]) \
    .setOutputCol("sentence") \
    .setLazyAnnotator(False)

embeddings = BertSentenceEmbeddings.pretrained() \
      .setInputCols("sentence") \
      .setOutputCol("embeddings")

I would like to see if the incompatibility with the existing dependencies in your environment, such as Amazon/Jackson, can be fixed by using a Fat JAR that already has everything inside.

Many thanks

DanielOX commented 3 years ago

@maziyarpanahi okay gimme a minute

maziyarpanahi commented 3 years ago

@DanielOX Thank you. After that, we will try the offline .load() to narrow the issue down to just the online download, which uses AWS and Jackson.

DanielOX commented 3 years ago

@maziyarpanahi okay. BTW, this one is taking time to start the shell, but no error yet; the file is 378 MB, so I think that is why.

maziyarpanahi commented 3 years ago

> @maziyarpanahi okay. BTW, this one is taking time to start the shell, but no error yet; the file is 378 MB, so I think that is why.

Yes, if this way works, then what I usually do myself is put the jar on HDFS and reference it in the config as --jars hdfs:///tmp/spark-nlp-spark23-assembly-3.0.0.jar. It's super fast, and Spark even skips copying it to HDFS since it's already there. If this fails too, then we will download, extract, and push the model to HDFS for offline use via .load(), and we can move the jar there too.

DanielOX commented 3 years ago

@maziyarpanahi it worked this time


 pyspark \
> --master "yarn" \
> --deploy-mode "client" \
> --driver-memory 16g \
> --executor-cores 5 \
> --executor-memory 5g \
> --conf spark.kryoserializer.buffer.max=2000M \
> --jars https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar
SPARK_MAJOR_VERSION is set to 2, using Spark2
Picked up _JAVA_OPTIONS: -Xmx8192m
Python 3.6.8 (default, Nov 16 2020, 16:55:22) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Picked up _JAVA_OPTIONS: -Xmx8192m
Picked up _JAVA_OPTIONS: -Xmx8192m

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/06/08 07:59:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.2.6.5.1175-1
      /_/

Using Python version 3.6.8 (default, Nov 16 2020 16:55:22)
SparkSession available as 'spark'.
>>> 
>>> from sparknlp.annotator import *
>>> from sparknlp.base import *
>>> 
>>> document_assembler = DocumentAssembler()\
...     .setInputCol("text")\
...     .setOutputCol("document")
>>> 
>>> sentence_detector = SentenceDetector() \
...     .setInputCols(["document"]) \
...     .setOutputCol("sentence") \
...     .setLazyAnnotator(False)
>>> 
>>> embeddings = BertSentenceEmbeddings.pretrained() \
...       .setInputCols("sentence") \
...       .setOutputCol("embeddings")
sent_small_bert_L2_768 download started this may take some time.
21/06/08 08:00:21 WARN SparkSession$Builder: Using an existing SparkSession; some configuration may not take effect.
Approximate size to download 139.6 MB
[ | ]sent_small_bert_L2_768 download started this may take some time.
[ / ]Approximate size to download 139.6 MB
[ \ ]Download done! Loading the resource.
[ / ]2021-06-08 08:04:11.447145: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN)to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
maziyarpanahi commented 3 years ago

Fabulous! This means it downloaded, extracted, and loaded successfully!

So the issue is as follows: dependencies already on your cluster's classpath (such as the AWS SDK and Jackson) conflict with the ones pulled in via --packages, and the Fat JAR, which bundles everything inside, avoids that conflict.

You can find Fat JARs at the bottom of each release: https://github.com/JohnSnowLabs/spark-nlp/releases

Also, you can put that Fat JAR somewhere on your HDFS, which speeds up Spark session creation by avoiding both the download and the extra copy to HDFS.
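
For instance, a minimal session sketch referencing the Fat JAR already stored on HDFS (the hdfs:///tmp path is illustrative):

from pyspark.sql import SparkSession

# Point spark.jars at the copy already on HDFS so Spark does not
# re-download or re-upload the 378 MB jar on every session start.
spark = SparkSession.builder \
    .master("yarn") \
    .config("spark.jars", "hdfs:///tmp/spark-nlp-spark23-assembly-3.0.0.jar") \
    .getOrCreate()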

maziyarpanahi commented 3 years ago

So all that's left is trying it via Livy to see whether Jackson will be fine or it's going to fail:

spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.locality.wait", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars", "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")\
    .getOrCreate()
DanielOX commented 3 years ago

@maziyarpanahi I was literally stuck on this problem for 2 days; I didn't know the Fat JAR was the answer. I really appreciate your support. From here I can take my script onwards.

maziyarpanahi commented 3 years ago

You are very welcome! Have fun with Spark and Spark NLP 🚀

DanielOX commented 3 years ago

@maziyarpanahi much love and blessings to SPARK-NLP Team

DanielOX commented 3 years ago

Hi @maziyarpanahi, sorry for re-opening the issue, but the problem with Livy still persists. I am doing the above-mentioned, but it gives me the same error:

: java.lang.NoSuchMethodError: org.json4s.jackson.JsonMethods$.parse$default$3()Z
spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.locality.wait", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars", "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")\
    .getOrCreate()

I also saved the spark-nlp-spark23-assembly-3.0.0.jar file to HDFS and hard-coded SPARK_SUBMIT_OPTIONS=--jars hdfs://tmp/spark-nlp-spark23-assembly-3.0.0.jar in .bashrc on every worker.

maziyarpanahi commented 3 years ago

Hi @DanielOX. Yes, as I mentioned in my last comment, if Livy gives you that error, it means Livy is loading other packages/jars with org.json4s.jackson dependencies into the classpath, and they are not compatible with Spark/Spark NLP.

Test 1:

Add the following to enforce it in SparkSession:

spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.locality.wait", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars", "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar")\
    .config("spark.jars.packages", "org.json4s:json4s-jackson_2.11:3.5.3")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")\
    .getOrCreate()

If this fails, I would look very carefully into the Livy configs and every single jar and package set to be loaded once the Livy session is created. You might be able to spot what doesn't look right: something meant for Spark 2.4.x being loaded into your Spark 2.3.x, etc. In fact, if possible, comment them all out or remove them from the Ambari UI so nothing extra is loaded at all, see if that helps, and then narrow down the issue by adding them back one by one.

DanielOX commented 3 years ago

Okay @maziyarpanahi, let me try and get back to you.

DanielOX commented 3 years ago

@maziyarpanahi It gave the same error.

Below is a screenshot of the loaded configuration:

[screenshot: loaded configuration]

maziyarpanahi commented 3 years ago

It may not be in the logs; it can be in the config files where some libraries are loaded into the classpath. I would look into the Livy conf file, the Livy version itself, etc.

Unfortunately, I am not sure your Livy is compatible. The very last option is to download the model/pipeline manually, put it on HDFS, and load it offline via .load().

Hopefully, what is needed inside Spark NLP stays there and the rest in your Livy won't have any conflict. (🤞)

DanielOX commented 3 years ago

Thanks @maziyarpanahi, I will try and let you know.

DanielOX commented 3 years ago

@maziyarpanahi, I will try to download and save the pretrained pipeline on HDFS and load it using PipelineModel to see if that works. My guess is that the org.json4s version is the culprit here. Thanks again for the support, @maziyarpanahi.
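
For reference, a minimal sketch of that offline approach, assuming a running Spark session and that the extracted pipeline directory was already copied to HDFS (the hdfs:///models/labse_pipeline path is hypothetical):

from pyspark.ml import PipelineModel

# Load the previously extracted pipeline directly from HDFS,
# bypassing the online downloader (and its AWS/Jackson code path) entirely.
pipeline_model = PipelineModel.load("hdfs:///models/labse_pipeline")

df = spark.createDataFrame([["Some text to embed."]]).toDF("text")
result = pipeline_model.transform(df)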

DanielOX commented 3 years ago

@maziyarpanahi, the procedure worked; it loaded the model into RAM from HDFS. It took some time, but then it returned this error:

"/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000001/pyspark.zip/pyspark/sql/dataframe.py", line 350, in show
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000001/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000001/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000001/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o432.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.0 failed 4 times, most recent failure: Lost task 0.3 in stage 32.0 (TID 859, worker.local.com, executor 7): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAnnotate$1: (array<array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>>) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000008/tmp/b29de3b9c8fa_ner8143839515809426224/variables
     [[{{node save/RestoreV2}}]]
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings.getModelIfNotSet(BertSentenceEmbeddings.scala:195)
    at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings.annotate(BertSentenceEmbeddings.scala:233)
    at com.johnsnowlabs.nlp.AnnotatorModel$$anonfun$dfAnnotate$1.apply(AnnotatorModel.scala:35)
    at com.johnsnowlabs.nlp.AnnotatorModel$$anonfun$dfAnnotate$1.apply(AnnotatorModel.scala:34)
    ... 19 more
Caused by: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000008/tmp/b29de3b9c8fa_ner8143839515809426224/variables
     [[{{node save/RestoreV2}}]]
    at org.tensorflow.Session.run(Native Method)
    at org.tensorflow.Session.access$100(Session.java:48)
    at org.tensorflow.Session$Runner.runHelper(Session.java:326)
    at org.tensorflow.Session$Runner.run(Session.java:276)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper$.read(TensorflowWrapper.scala:325)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper.readObject(TensorflowWrapper.scala:248)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$8.apply(TorrentBroadcast.scala:308)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:309)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:235)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1343)
    ... 28 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:363)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2484)
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2698)
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:254)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$dfAnnotate$1: (array<array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>>) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.io.IOException: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000008/tmp/b29de3b9c8fa_ner8143839515809426224/variables
     [[{{node save/RestoreV2}}]]
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1350)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:207)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:66)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:96)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings.getModelIfNotSet(BertSentenceEmbeddings.scala:195)
    at com.johnsnowlabs.nlp.embeddings.BertSentenceEmbeddings.annotate(BertSentenceEmbeddings.scala:233)
    at com.johnsnowlabs.nlp.AnnotatorModel$$anonfun$dfAnnotate$1.apply(AnnotatorModel.scala:35)
    at com.johnsnowlabs.nlp.AnnotatorModel$$anonfun$dfAnnotate$1.apply(AnnotatorModel.scala:34)
    ... 19 more
Caused by: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000008/tmp/b29de3b9c8fa_ner8143839515809426224/variables
     [[{{node save/RestoreV2}}]]
    at org.tensorflow.Session.run(Native Method)
    at org.tensorflow.Session.access$100(Session.java:48)
    at org.tensorflow.Session$Runner.runHelper(Session.java:326)
    at org.tensorflow.Session$Runner.run(Session.java:276)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper$.read(TensorflowWrapper.scala:325)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper.readObject(TensorflowWrapper.scala:248)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1909)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2018)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1942)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1808)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1353)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:373)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$8.apply(TorrentBroadcast.scala:308)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:309)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1$$anonfun$apply$2.apply(TorrentBroadcast.scala:235)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:211)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1343)

From what I can infer, Livy is messing up some TensorFlow files which Spark caches for later use.

maziyarpanahi commented 3 years ago

Yes, it seems that explicitly asking for HDFS in your Spark applications works, but by default, for HDFS and the local fileSystem, it goes its own way. I used to use Livy with Cloudera 5 and Spark NLP before ditching it in 6.x, since multi-user Zeppelin wasn't really stable on Livy. I can look again and hope I face the same issue so we can find a way. But in the meantime, spark-shell, spark-submit, and pyspark should be fine, as we tested.

DanielOX commented 3 years ago

Yes @maziyarpanahi, the rest is working great. I found a catch, if you can verify (as you are the expert): when I try to update the conf in the Spark script, like this,

spark = SparkSession.builder\
    .master("yarn")\
    .config("spark.locality.wait", "0")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars", "https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/jars/spark-nlp-spark23-assembly-3.0.0.jar")\
    .config("spark.jars.packages", "org.json4s:json4s-jackson_2.11:3.5.3")\
    .config("spark.sql.autoBroadcastJoinThreshold", -1)\
    .config("spark.sql.codegen.aggregate.map.twolevel.enabled", "false")\
    .getOrCreate()

YARN didn't override the settings, but when I set the same things using the Livy conf parameters, it did override them. Now I have changed the stagingDir to HDFS and the container caching to HDFS. I'm running the script now; let's see what happens. Fingers crossed!
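
To make that concrete, a hedged sketch of the staging-dir change as a Spark config (spark.yarn.stagingDir is the standard Spark-on-YARN property; the exact Livy conf keys used here were not shown in the thread, and the HDFS path is illustrative):

from pyspark.sql import SparkSession

# Assumption: explicitly pointing the YARN staging directory at HDFS.
spark = SparkSession.builder \
    .master("yarn") \
    .config("spark.yarn.stagingDir", "hdfs:///tmp/spark-staging") \
    .getOrCreate()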

DanielOX commented 3 years ago

@maziyarpanahi man, it loads the model from HDFS now, but when I performed .transform(df) it threw:

Caused by: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /hadoop/yarn/local/usercache/livy/appcache/application_1623058160826_0516/container_e199_1623058160826_0516_01_000008/tmp/b29de3b9c8fa_ner8143839515809426224/variables

I am stuck on both sides; I can't get it to work with

embeddings = BertSentenceEmbeddings.pretrained("labse", "xx") \
      .setInputCols("sentence") \
      .setOutputCol("embeddings")

nor by loading the model from HDFS.

maziyarpanahi commented 3 years ago

Yes, it's the same, the Livy is taking over the local fileSystem where we put stuff for /tmp. I am afraid not much can be done at this point since the Spark NLP works with your cluster without any issue, but this Livy on Spark 2.3 can be a bug, a bad config or something that is missing. I've experienced this some time ago in EMR Jupyter but it was an impersonation bug by Livy.

DanielOX commented 3 years ago

@maziyarpanahi, after extensive research, I would like to propose two possible solutions.

NFS

Since Livy is storing usercache/filecache/appcache locally on each executor, there is an attribute yarn.nodemanager.local-dirs which Spark aliases as spark.local.dir. Upon further research, spark.local.dir does not support the hdfs:/// or file:/// protocols; one has to specify a local path starting with /. The proposed solution here is NFS: create a shared directory on each executor and mount HDFS on it. That way the path Spark sees is a local path, while the cached files effectively live on the shared mount; see the sketch below.
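
A rough sketch of that idea (the /mnt/hdfs-nfs mount point is hypothetical, and note that on YARN the executor local dirs ultimately come from yarn.nodemanager.local-dirs, which can override spark.local.dir):

from pyspark.sql import SparkSession

# Assumption: an NFS gateway exposes HDFS at /mnt/hdfs-nfs on every node,
# so a plain local path can be handed to Spark while the data lives on HDFS.
spark = SparkSession.builder \
    .master("yarn") \
    .config("spark.local.dir", "/mnt/hdfs-nfs/spark-local") \
    .getOrCreate()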

Hugging Face

It provides a lot of pretrained models. First, you load the model into an arbitrary variable and broadcast it; later, you can use it inside a UDF or rdd.map() for it to take effect, as in the sketch below.
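
A hedged sketch of that broadcast pattern, assuming the sentence-transformers package is installed on every worker (the LaBSE model name and column names are illustrative, not from this thread):

from pyspark.sql import functions as F, types as T
from sentence_transformers import SentenceTransformer

# Load once on the driver and broadcast, so each executor deserializes
# the model a single time instead of once per row.
model = SentenceTransformer("sentence-transformers/LaBSE")
bc_model = spark.sparkContext.broadcast(model)

@F.udf(returnType=T.ArrayType(T.FloatType()))
def embed(text):
    # Encode one sentence with the broadcast model.
    return [float(x) for x in bc_model.value.encode(text)]

df = df.withColumn("embeddings", embed(F.col("text")))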

@maziyarpanahi, thanks for your amazing support. I think we should close this issue, since it's a problem with Livy, not Spark NLP.

Thanks.