JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0
3.88k stars 712 forks source link

NerDLApproach doesnt work with HDFS path for graph folder #13198

Closed kuopching closed 1 year ago

kuopching commented 1 year ago

Description

I am training NerDL model using spark-nlp 4.2.4 in Spark Standalone mode with 1 worker. I am not able to acces custom graph in hdfs storage. When i try: NerDLApproach nerTagger = new NerDLApproach(); nerTagger.setGraphFolder("hdfs://localhost:19000/graph");

then I get error: ERROR Instrumentation: java.lang.IllegalArgumentException: Pathname /C:/Users/USER1/AppData/Local/Temp/sparknlp_tmp_11909547575733209894/graph from C:/Users/USER1/AppData/Local/Temp/sparknlp_tmp_11909547575733209894/graph is not a valid DFS filename.

Expected Behavior

NerDLApproach().setGraphFolder should be able to access custom graphs in the HDFS file system

Current Behavior

Loading graph from local filesystem works with "embedded spark" - com.johnsnowlabs.nlp.SparkNLP.start(). Loading training dataset and WordEmbeding from hdfs works.

Loading custom graph from hdfs doesnt work as expected. Folder /graph contains file blstm_17_300_128_237.pb

Exception is thrown:

ERROR Instrumentation: java.lang.IllegalArgumentException: Pathname /C:/Users/USER1/AppData/Local/Temp/sparknlp_tmp_11909547575733209894/graph from C:/Users/USER1/AppData/Local/Temp/sparknlp_tmp_11909547575733209894/graph is not a valid DFS filename.
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:257)
    at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:1269)
    at org.apache.hadoop.hdfs.DistributedFileSystem$DirListingIterator.<init>(DistributedFileSystem.java:1256)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1201)
    at org.apache.hadoop.hdfs.DistributedFileSystem$25.doCall(DistributedFileSystem.java:1197)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listLocatedStatus(DistributedFileSystem.java:1215)
    at org.apache.hadoop.fs.FileSystem.listLocatedStatus(FileSystem.java:2162)
    at org.apache.hadoop.fs.FileSystem$5.<init>(FileSystem.java:2288)
    at org.apache.hadoop.fs.FileSystem.listFiles(FileSystem.java:2285)
    at com.johnsnowlabs.nlp.util.io.ResourceHelper$.listLocalFiles(ResourceHelper.scala:646)
    at com.johnsnowlabs.nlp.annotators.ner.dl.WithGraphResolver.$anonfun$getFiles$3(NerDLApproach.scala:691)
    at scala.Option.map(Option.scala:230)
    at com.johnsnowlabs.nlp.annotators.ner.dl.WithGraphResolver.getFiles(NerDLApproach.scala:689)
    at com.johnsnowlabs.nlp.annotators.ner.dl.WithGraphResolver.searchForSuitableGraph(NerDLApproach.scala:605)
    at com.johnsnowlabs.nlp.annotators.ner.dl.WithGraphResolver.searchForSuitableGraph$(NerDLApproach.scala:599)
    at com.johnsnowlabs.nlp.annotators.ner.dl.NerDLApproach$.searchForSuitableGraph(NerDLApproach.scala:705)
    at com.johnsnowlabs.nlp.annotators.ner.dl.NerDLApproach.train(NerDLApproach.scala:530)
    at com.johnsnowlabs.nlp.annotators.ner.dl.NerDLApproach.train(NerDLApproach.scala:180)
    at com.johnsnowlabs.nlp.AnnotatorApproach._fit(AnnotatorApproach.scala:69)
    at com.johnsnowlabs.nlp.AnnotatorApproach.fit(AnnotatorApproach.scala:75)
    at org.apache.spark.ml.Pipeline.$anonfun$fit$5(Pipeline.scala:151)
    at org.apache.spark.ml.MLEvents.withFitEvent(events.scala:130)
    at org.apache.spark.ml.MLEvents.withFitEvent$(events.scala:123)
    at org.apache.spark.ml.util.Instrumentation.withFitEvent(Instrumentation.scala:42)
    at org.apache.spark.ml.Pipeline.$anonfun$fit$4(Pipeline.scala:151)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.ml.Pipeline.$anonfun$fit$2(Pipeline.scala:147)
    at org.apache.spark.ml.MLEvents.withFitEvent(events.scala:130)
    at org.apache.spark.ml.MLEvents.withFitEvent$(events.scala:123)
    at org.apache.spark.ml.util.Instrumentation.withFitEvent(Instrumentation.scala:42)
    at org.apache.spark.ml.Pipeline.$anonfun$fit$1(Pipeline.scala:133)
    at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
    at scala.util.Try$.apply(Try.scala:213)
    at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
    at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:133)

Possible Solution

Steps to Reproduce

SparkSession spark =SparkSession.builder().appName("Spark NLP")
.master("spark://192.168.2.151:7077")
.config("spark.driver.memory", "1024")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryoserializer.buffer.max", "2000M")
.config("spark.driver.maxResultSize", "0")
.config("spark.jars", "hdfs://localhost:19000/tmp/spark-nlp-assembly-4.2.4.jar")
.config("spark.jsl.settings.pretrained.cache_folder","hdfs://localhost:19000/cache")
.config("spark.jsl.settings.annotator.log_folder", "log")
.config("spark.submit.deployMode", "cluster")
.config("spark.jsl.settings.storage.cluster_tmp_dir", "/tmp").getOrCreate();

SparkContext sc = spark.sparkContext();
sc.hadoopConfiguration().set("fs.defaultFS", "hdfs://localhost:19000");
sc.hadoopConfiguration().set("hadoop.http.staticuser.user", "root");
sc.hadoopConfiguration().set("dfs.client.use.datanode.hostname", "true");
sc.hadoopConfiguration().set("dfs.datanode.use.datanode.hostname", "true");
sc.hadoopConfiguration().set("dfs.namenode.datanode.registration.ip-hostname-check", "false");
sc.hadoopConfiguration().set("dfs.permissions.enabled", "true");
sc.hadoopConfiguration().set("dfs.replication", "1");

CoNLL conll =  new CoNLL("document", "sentence", "token", "pos",3,1,"text","label",true," ");
Dataset<?> ds = conll.readDataset(spark, "hdfs://localhost:19000/conll.txt", ReadAs.TEXT().toString(), 0, null);

WordEmbeddingsModel.load("hdfs://localhost:19000/w2v_cc_300d_cs_3.4.1_3.0_1647292556306");

DocumentAssembler document = new DocumentAssembler();
document.setInputCol("text");
document.setOutputCol("document");

SentenceDetector sentenceDetector = new SentenceDetector();
sentenceDetector.setInputCols(new String[]{"document"});
sentenceDetector.setOutputCol("sentence");

Tokenizer tokenizer = new Tokenizer();
tokenizer.setInputCols(new String[]{"sentence"});
tokenizer.setOutputCol("token");

WordEmbeddingsModel wem = new WordEmbeddingsModel();
wem.setInputCols(new String[] {"document", "token"});
wem.setOutputCol("embeddings");
wem.setStorageRef("w2v_cc_300d_cs");
wem.setDimension(300);

NerDLApproach nerTagger = new NerDLApproach();
nerTagger.setInputCols(new String[] {"sentence", "token", "embeddings"});
nerTagger.setLabelColumn("label");
nerTagger.setOutputCol("ner");
nerTagger.setMaxEpochs(2);
nerTagger.setGraphFolder("hdfs://localhost:19000/graph"); 
nerTagger.setLr(0.003f);
nerTagger.setBatchSize(32);
nerTagger.setRandomSeed(0);
nerTagger.setVerbose(2);
nerTagger.setValidationSplit(0.2f);
nerTagger.setEvaluationLogExtended(true); 
nerTagger.setEnableOutputLogs(true);
nerTagger.setIncludeConfidence(true);
nerTagger.setOutputLogsPath("ner_logs");

Pipeline pipeline = new Pipeline();
pipeline.setStages(new PipelineStage[] {document,sentenceDetector,tokenizer,wem, nerTagger});
PipelineModel ner_model = pipeline.fit(ds);

Context

Unable to train an NerDL model with custom graph

Your Environment

maziyarpanahi commented 1 year ago

Hi @kuopching

Could you please share a screenshot for the Spark UI Executors tab? I have a feeling you launch your Spark application from a local machine to a remote cluster, but the Driver becomes the local machine. All the training happens in the Driver, so when it is going to copy/move data it will do it locally while the graph is remote actually.

This is certainly an edge case, but your Spark UI screenshots will help us to set up the env to reproduce this better.

PS: I forgot to ask, we also need a complete description of your env, we see the IP address, localhost, etc. Is this a YARN cluster in a Docker while you are launching your Spark app from a local Windows? Would be great if we can have all the info to reproduce it, this is a very unique case.

kuopching commented 1 year ago

Hi @maziyarpanahi , thank you for quick response.

Spark UI screenshot: sparknlp-issue

I'am starting spark cluster manually without YARN. I'm launching Spark/Hadoop from local Windows machine with cmd:

C:\spark\bin>spark-class org.apache.spark.deploy.master.Master --host 192.168.2.151
C:\spark\bin>spark-class org.apache.spark.deploy.worker.Worker 192.168.2.151
C:\hadoop>sbin\start-dfs.cmd

Spark conf: default from clean installation

Hadoop conf: core-site.xml

<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:19000</value>
    </property>
</configuration>
maziyarpanahi commented 1 year ago

Thanks for the extra information. So this is not really a cluster, it's more of a master/slave situation. I am not sure if this is possible in a not managed cluster. (if it was managed like YARN or K8s this would work as it has all the info).

Since it's a standalone cluster and the training (Approach) happens in the Driver, I think it is confusing the Driver/App being your local machine when it is trying to do some File related ops. Is it possible to have your App/Driver also part of the cluster (like YARN cluster-mode)?

PS: I forgot again to ask, what is 192.168.2.151? Is this a VM? And how does a worker in 192.168.2.151 have access to hdfs://localhost:19000? (shouldn't fs.defaultFS be accessible from all the nodes?)

kuopching commented 1 year ago

192.168.2.151 is IP of my local machine I replaced localhost with 192.168.2.151 in code and config as well and behavior is the same.

I am not sure if this is possible in a not managed cluster

Is it difference between loading graph and loading dataset/wordembedding from hdfs?

Is it possible to have your App/Driver also part of the cluster (like YARN cluster-mode)?

Can you please elaborate this more? Now I'm launching Spark NLP java project from IDE.

maziyarpanahi commented 1 year ago

192.168.2.151 is IP of my local machine I replaced localhost with 192.168.2.151 in code and config as well and behavior is the same.

So this is just a standalone Spark like a local mode local[*] since there is only 1 machine. Why having master and worker set and have .config("spark.submit.deployMode", "cluster")? (it's not really a cluster in a single machine)

I am not sure if this is possible in a not managed cluster

Is it difference between loading graph and loading dataset/wordembedding from hdfs?

Yes, in those cases you are using a native Apache Spark .read and .load which we extended them natively. The Graph file, we have to list that directory, copy it locally, and then read it.

It should be still possible if we tweak a few things, but I want to be sure I understand the use case as why set master/worker and cluster as deplyMode? What's the advantage if everything is in the single machine?

kuopching commented 1 year ago

Yes, You are right. It is not real cluster so far. The cluster I mean cluster of SPARK workers. The goal of this exercise is only POC/testing for now. I'm trying to get it up and running and then spread it to other machines. Add more workers. Anyway I changed .config("spark.submit.deployMode", "cluster") to.config("spark.submit.deployMode", "client")

kuopching commented 1 year ago

Hi, do you need more information from me?

maziyarpanahi commented 1 year ago

Anyway I changed .config("spark.submit.deployMode", "cluster") to.config("spark.submit.deployMode", "client")

Sorry, I thought this was a positive response on your part. But I am assuming deployMode is only available for YARN and K8 clusters and not for master/worker setup.

Would it be possible to use something more realistic for a cluster like https://github.com/maziyarpanahi/docker-spark-yarn-cluster? I use this for cluster development from my local machine. (not exactly how you launch the app though)

It would be great if you can test this and see if you encounter a similar issue, this way we can easily reproduce it.

kuopching commented 1 year ago

Hi @maziyarpanahi , I can confirm that your setup works like charm. Loading graph from hdfs works. Obviously error was the setup thing on my side.

Thank you

maziyarpanahi commented 1 year ago

@kuopching I am glad that worked out and thanks for confirming this, I appreciate it.