JohnSnowLabs / spark-nlp

State of the Art Natural Language Processing
https://sparknlp.org/
Apache License 2.0
3.85k stars 711 forks source link

Cannot cast to float #14162

Open ottermegazord opened 8 months ago

ottermegazord commented 8 months ago

Is there an existing issue for this?

Who can help?

@maz

What are you working on?

GTE Small EN 5.0.2 En

Current Behavior

Traceback (most recent call last): File "/path/to/task.py", line 94, in result = pipeline.fit(data).transform(data) File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 217, in transform File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/pipeline.py", line 278, in _transform File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py", line 217, in transform File "/opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py", line 350, in _transform File "/opt/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/java_gateway.py", line 1309, in call File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco File "/opt/spark/python/lib/py4j-0.10.9.2-src.zip/py4j/protocol.py", line 326, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o111.transform. : java.lang.UnsupportedOperationException: Cannot cast to float version 3.2.0.3.2.2 at com.johnsnowlabs.util.Version.toFloat(Version.scala:36) at com.johnsnowlabs.nlp.util.SparkNlpConfig$.getEncoder(SparkNlpConfig.scala:28) at com.johnsnowlabs.nlp.AnnotatorModel._transform(AnnotatorModel.scala:69) at com.johnsnowlabs.nlp.AnnotatorModel.transform(AnnotatorModel.scala:130) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182) at py4j.ClientServerConnection.run(ClientServerConnection.java:106) at java.lang.Thread.run(Thread.java:750)

Expected Behavior

Process should run

Steps To Reproduce

document = DocumentAssembler()\ .setInputCol("text")\ .setOutputCol("document")

tokenizer = Tokenizer()\ .setInputCols(["document"])\ .setOutputCol("token")

embeddings = BertEmbeddings.load("\path\to\local\model\gte_small_en", spark)\ .setInputCols(["document", "token"])\ .setOutputCol("embeddings")

Spark NLP version and Apache Spark

Spark 3.2.0 Spark NLP 5.2.3

Type of Spark Application

spark-submit

Java Version

No response

Java Home Directory

No response

Setup and installation

No response

Operating System and Version

No response

Link to your project (if available)

No response

Additional Information

No response

maziyarpanahi commented 8 months ago

Cloud you please provide some links:

It's not possible to help with what's provided, we do need these links or at least the end-to-end Colab notebook

ottermegazord commented 8 months ago

Give me a moment while I get the information you’ll need :)

vdksoda commented 6 months ago

Facing the same issue. My setup is interactive spark jupyter notebook with sparkmagic via livy.

Here is my setup

Spark version 3.2.2.3.2.2

{
    "driverMemory": "32G",
    "executorMemory": "16G",
    "numExecutors": 20,
    "executorCores": 5,
    "jars": ["/path/to/jars/spark-nlp-assembly-5.3.3.jar"],
    "archives": [
        "/path/to/conda-packs/conda-pack-spark-nlp-mpnetqa.tar.gz#env"
    ],
    "conf": {
        "spark.pyspark.python": "env/bin/python"
    },
    "proxyUser": "some.user"
}

The complete notebook code

%%spark
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Row
import re
import sys
from datetime import datetime,timedelta

import sparknlp
print(sparknlp.version()). # prints 5.3.3

import pandas as pd
# from sparknlp.pretrained import PretrainedPipeline
# from sparknlp.annotator import DocumentAssembler, BertSentenceEmbeddings, SentenceDetector, MPNetForQuestionAnswering
# from sparknlp.base import EmbeddingsFinisher
# from pyspark.ml import Pipeline
# from sparknlp.base import LightPipeline, TokenAssembler

from sparknlp.base import *
from sparknlp.annotator import *
from pyspark.ml import Pipeline

documentAssembler = MultiDocumentAssembler() \
    .setInputCols(["question", "context"]) \
    .setOutputCols(["document_question", "document_context"])

spanClassifier = MPNetForQuestionAnswering.load('/path/to/hdfs/pretrained-models/mpnet_base_question_answering_squad2_en_5.2.4_3.0_1705756189243') \
    .setInputCols(["document_question", "document_context"]) \
    .setOutputCol("answer") \
    .setCaseSensitive(False)

pipeline = Pipeline().setStages([
    documentAssembler,
    spanClassifier
])

data = spark.createDataFrame([["What's my name?", "My name is Clara and I live in Berkeley."]]).toDF("question", "context")
result = pipeline.fit(data).transform(data)
result.select("answer.result").show(truncate=False)

# # Fit the model to an empty data frame so it can be used on inputs.
# empty_df = spark.createDataFrame([['','']]).toDF("question", "context")
# pipeline_model = pipeline.fit(empty_df)
# light_pipeline = LightPipeline(pipeline_model)

# embed_df = light_pipeline.transform(spark.createDataFrame([["What's my name?", "My name is Clara and I live in Berkeley."]]).toDF("question", "context"))
# # embed_df.createOrReplaceTempView("embed_df")

Giving error

An error was encountered:
An error occurred while calling o70.transform.
: java.lang.UnsupportedOperationException: Cannot cast to float version 3.2.2.3.2.2
    at com.johnsnowlabs.util.Version.toFloat(Version.scala:36)
    at com.johnsnowlabs.nlp.util.SparkNlpConfig$.getEncoder(SparkNlpConfig.scala:28)
    at com.johnsnowlabs.nlp.AnnotatorModel._transform(AnnotatorModel.scala:69)
    at com.johnsnowlabs.nlp.AnnotatorModel.transform(AnnotatorModel.scala:130)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:750)
maziyarpanahi commented 6 months ago

Hi all,

We have this toFloat() function here:

def toFloat: Float = {
    val versionString = parts.length match {
      case 1 => parts.head.toString
      case 2 => f"${parts.head.toString}.${parts(1).toString}"
      case 3 => f"${parts.head.toString}.${parts(1).toString}${parts(2).toString}"
      case _ =>
        throw new UnsupportedOperationException(
          f"Cannot cast to float version ${this.toString()}")
    }
    versionString.toFloat
  }

We use it in various places to extract the version of Apache Spark. In this case, here:

def getEncoder(inputDataset: Dataset[_], newStructType: StructType): ExpressionEncoder[Row] = {
    val sparkVersion = Version.parse(inputDataset.sparkSession.version).toFloat
    if (sparkVersion >= 3.5f) {
      val expressionEncoderClass =
        Class.forName("org.apache.spark.sql.catalyst.encoders.ExpressionEncoder")
      val applyMethod = expressionEncoderClass.getMethod("apply", classOf[StructType])
      applyMethod.invoke(null, newStructType).asInstanceOf[ExpressionEncoder[Row]]
    } else {
      try {
        // Use reflection to access RowEncoder.apply in older Spark versions
        val rowEncoderClass = Class.forName("org.apache.spark.sql.catalyst.encoders.RowEncoder")
        val applyMethod = rowEncoderClass.getMethod("apply", classOf[StructType])
        applyMethod.invoke(null, newStructType).asInstanceOf[ExpressionEncoder[Row]]
      } catch {
        case _: Throwable =>
          throw new UnsupportedOperationException(
            "RowEncoder.apply is not supported in this Spark version.")
      }
    }
  }

As you can see, this is required because of Spark not being backward compatible. We either have to drop support for previous versions of Apache Spark, or find a way to adapt conditionally.

It seems your ENV, doesn't have Apache Spark the same pattern as the other ENVs. Could you please do spark.version and show me the result?

cc @danilojsl

vdksoda commented 6 months ago

Thanks for the response.

>>> spark.version
'3.2.2.3.2.2.0-1'

Meanwhile checking with our Spark admin if there is some misconfiguration on our end. Will come back in a few days.

maziyarpanahi commented 6 months ago

Thanks, that must be it! We first noticed this in EMR, where they were adding some stuff to the version. This also looks similar, makes it pretty hard to parse really. (I think we can have pattern for Livy if this is the pattern of showing spark version)

vdksoda commented 6 months ago

Spoke to our Spark administrator and got to know that in our on-premise deployment there are mild changes made to it to suit the on-premise deployment. The in-house Spark team maintains a version number by appending to the original Spark version. Resulting in the spark.version showcased above. They are checking if there is a way to override this version number on a per Spark job / session basis.

On a separate note, would you be open to a change (PR) to consider only the the first three elements in the parts sequence vs attempting to transform the entire parts. The original author of this post also raised the same issue along with the EMR observation in the previous post; so I guess we're not the only ones impacted by the present version checking.

If yes, then I could propose such a PR.

maziyarpanahi commented 6 months ago

Thanks for looking into this on your side. Either way would work for us, if it's a PR as long as it is still backward compatible I will include it in the next release 👍🏼

github-actions[bot] commented 10 hours ago

This issue is stale because it has been open 180 days with no activity. Remove stale label or comment or this will be closed in 5 days