JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0

ClassifierDL Model save leads to Unsuccessful TensorSliceReader constructor error #1173

Closed · demongolem closed this issue 3 years ago

demongolem commented 3 years ago

The ClassifierDL is buggy when it comes to either saving models or using them to predict on test data. Issue #857 provides a workaround for a related problem, but that workaround does not work here; this report is effectively a bug in the workaround for that issue.

Description

Train a pipeline with embeddings and the ClassifierDL. Either save that model or try to use it directly to predict on test data and you will get the same error. This is particular to pipelines that combine ClassifierDL with embeddings. It appears that the TensorFlow variables are written to local storage and are unavailable at some point during the save process.

Expected Behavior

The model should be saved to DBFS storage.

Current Behavior

java.io.IOException: org.tensorflow.TensorFlowException: Unsuccessful TensorSliceReader constructor: Failed to find any matching files for /local_disk0/tmp/64d888947012_ner823401808093088544/variables

Possible Solution

See #857

Steps to Reproduce

Train a pipeline with embeddings and the ClassifierDL. Either save that model or try to use it directly to predict on test data and you will get the same error.


import joblib
import os
import pickle
import pyspark
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
import sklearn
import sparknlp
from sparknlp.annotator import *
from sparknlp.base import *

spark.conf.set('spark.serializer', 'org.apache.spark.serializer.KryoSerializer')
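# NOTE: setting the serializer here has no effect, because the SparkSession
# already exists at this point; as discussed in the replies below, the Kryo
# settings must be configured before the session (or cluster) is created.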

# COMMAND ----------

# 1 get data into dataframe

# COMMAND ----------

# File location and type
train_file_location = "/FileStore/tables/train.csv"
train_file_type = "csv"
test_file_location = "/FileStore/tables/test.csv"
test_file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
multi_line = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_train = spark.read.format(train_file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("multiLine", multi_line) \
  .option("escape", '\"') \
  .load(train_file_location)
df_test = spark.read.format(test_file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .option("multiLine", multi_line) \
  .option("escape", '\"') \
  .load(test_file_location)

# load the label-encoder class names, saved on DBFS as a joblib/pickle file
category_label_encoder_file_location = "dbfs:/cat_encoder.joblib"
cat_df = spark.read.format("binaryFile").option("pathGlobFilter", "*.joblib").load(category_label_encoder_file_location)
class_names = pickle.loads(cat_df.first().content)

# COMMAND ----------

# 2 bert version
document_assembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

tokenizer = Tokenizer().setInputCols(["document"])\
  .setOutputCol("token")

use = UniversalSentenceEncoder.pretrained() \
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

word_embeddings = BertEmbeddings.pretrained('bert_base_cased', 'en')\
  .setInputCols(["document", "token"])\
  .setOutputCol("embeddings")

# Note: ClassifierDLApproach expects a single SENTENCE_EMBEDDINGS input column;
# the maintainer's working example below uses only "sentence_embeddings". The
# trailing \r in the label column name suggests the CSV header was read with a
# Windows carriage return.
document_classifier = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings", "embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("category_int\r")\
  .setMaxEpochs(5)\
  .setEnableOutputLogs(True)

bert_pipeline = Pipeline().setStages(
  [
    document_assembler,
    tokenizer,
    use,
    word_embeddings,
    document_classifier
  ]
)

# COMMAND ----------

# 3 train
print(df_train.columns)
nlp_model = bert_pipeline.fit(df_train)
# dbfs:/ or hdfs:/ if you are saving it on distributed file systems
nlp_model.stages[-1].write().overwrite().save('dbfs:/tmp_classifierDL_model')
#nlp_model.save('pac_model_db')

# COMMAND ----------

# 4 test
df_test.show()
processed = nlp_model.transform(df_test)
processed.show()

Context

I am trying to save a model so that I can reload it and use it to classify test data.

Your Environment

Databricks 6.6 (Community Edition), Apache Spark 2.4.5, Scala 2.11; cluster details are in the comments below.

maziyarpanahi commented 3 years ago

Hi,

Thanks for the detailed post. Since you are using Databricks, do you have "Enable autoscaling local storage" turned on? https://github.com/JohnSnowLabs/spark-nlp#databricks-cluster

Any details you can share about your cluster setup, nodes, storage, etc. would help us reproduce this issue on our side.

demongolem commented 3 years ago

Thanks @maziyarpanahi. I have seen that page, but it raises another pertinent point about my Databricks cluster: I am actually using the Community Edition.

"Free 15GB Memory: As a Community Edition user, your cluster will automatically terminate after an idle period of two hours. For more configuration options, please upgrade your Databricks subscription." So some configuration options likely are not available to me.

So when I go to look for "Enable autoscaling local storage" I cannot find it. This page shows where the checkbox should be: https://docs.databricks.com/clusters/configure.html. There it appears under an Autopilot Options section, but my cluster configuration has no Autopilot Options section.

To finish up the details, my Community Optimized cluster has 15.3 GB memory, 2 cores, and 1 DBU. I have already given the product versions: Databricks 6.6, Apache Spark 2.4.5, Scala 2.11.

maziyarpanahi commented 3 years ago

In that case (Community Edition) you don't have the known issue. That only happens in a multi-node cluster, not in Community Edition, since there is only one machine.

I would try the following:

Since you are using UniversalSentenceEncoder, you need these settings in your cluster setup. On a new or existing cluster, add the following under Advanced Options -> Spark tab:

spark.kryoserializer.buffer.max 1000M
spark.serializer org.apache.spark.serializer.KryoSerializer
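
For a local or self-managed setup, the equivalent is to pass these settings to the SparkSession builder before the session is created; calling spark.conf.set afterwards has no effect. A minimal sketch (the app name and master are illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP") \
    .master("local[*]") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.kryoserializer.buffer.max", "1000M") \
    .getOrCreate()

On Databricks the notebook attaches to an already-running cluster, so the settings belong in the cluster's Spark config tab as described above.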

The complete code:

# actual content is inside description column
document = DocumentAssembler()\
    .setInputCol("description")\
    .setOutputCol("document")

use = UniversalSentenceEncoder.pretrained() \
 .setInputCols(["document"])\
 .setOutputCol("sentence_embeddings")

# the classes/labels/categories are in category column
classifierdl = ClassifierDLApproach()\
  .setInputCols(["sentence_embeddings"])\
  .setOutputCol("class")\
  .setLabelColumn("category")\
  .setMaxEpochs(5)\
  .setEnableOutputLogs(True)

pipeline = Pipeline(
    stages = [
        document,
        use,
        classifierdl
    ])

# trainDataset: your training DataFrame with "description" and "category" columns
pipelineModel = pipeline.fit(trainDataset)

# try both
# pipelineModel.stages[-1].write().overwrite().save('./new_classifierDL_model')
pipelineModel.stages[-1].write().overwrite().save('/FileStore/new_classifierDL_model')

Please make sure you restart your cluster and use a fresh notebook, to be sure no leftover state is interfering. I have tested this in Community Edition and it works fine. When you save stages[-1] you are only saving the ClassifierDL, not the embeddings; it is only a few MB. That is why you later need to load it back into a complete pipeline.
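
To illustrate that last point, here is a minimal sketch of loading the saved classifier back into a complete pipeline for inference (the save path follows the example above, and testDataset stands in for a DataFrame with a "description" column; fitting on an empty DataFrame just produces a PipelineModel, since every stage is already trained):

from pyspark.ml import Pipeline
from sparknlp.base import DocumentAssembler
from sparknlp.annotator import UniversalSentenceEncoder, ClassifierDLModel

document = DocumentAssembler()\
    .setInputCol("description")\
    .setOutputCol("document")

use = UniversalSentenceEncoder.pretrained()\
    .setInputCols(["document"])\
    .setOutputCol("sentence_embeddings")

# load the trained classifier back as a ClassifierDLModel (not the Approach)
classifierdl = ClassifierDLModel.load("/FileStore/new_classifierDL_model")\
    .setInputCols(["sentence_embeddings"])\
    .setOutputCol("class")

pipeline = Pipeline(stages=[document, use, classifierdl])

# all stages are pretrained/trained, so fit on an empty DataFrame
empty_df = spark.createDataFrame([[""]]).toDF("description")
pipeline_model = pipeline.fit(empty_df)

# testDataset: a DataFrame with a "description" column
predictions = pipeline_model.transform(testDataset)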

The complete example which comes from Databricks community edition (just in case): https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/jupyter/training/english/classification/ClassifierDL_Train_multi_class_news_category_classifier.ipynb

PS: why does your OS say Windows 10?

demongolem commented 3 years ago

Using the code you gave me, I got the same results. However, when I deleted my cluster and restarted with the two lines above in my cluster setup under the Spark tab, I was able to save to DBFS with the line pipelineModel.stages[-1].write().overwrite().save('dbfs:/new_classifierDL_model'). So it is those settings that fixed it for me.

Now onward to see whether I can use that same model to classify my test data, which was my original objective. But this part, the save function, is now resolved.

demongolem commented 3 years ago

So the same configuration also allowed me to do

processed = nlp_model.transform(df_test)
processed.show()

which had not been working before, so the original #857 is no longer an issue for me either.

maziyarpanahi commented 3 years ago

Thanks for the update.

As a note, the UniversalSentenceEncoder models are very big, so those Spark configs need to be set before the SparkSession is created, regardless of whether you run locally or on a cluster.
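
For reference, when you create the session yourself (outside Databricks), the sparknlp.start() helper builds a SparkSession with the Kryo serializer settings already applied, so you don't have to set them by hand:

import sparknlp

# returns a SparkSession preconfigured with the Kryo serializer and buffer size
spark = sparknlp.start()
print(spark.conf.get("spark.serializer"))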