microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark
MIT License

[BUG] Error while using "LightGBM" on Fabric #2242

Open promisinganuj opened 3 months ago

promisinganuj commented 3 months ago

SynapseML version

1.0.4

System information

Runtime 1.2

Describe the problem

I am trying the following tutorial in a Microsoft Fabric notebook: https://learn.microsoft.com/en-us/fabric/data-science/how-to-use-lightgbm-with-synapseml

Step 5 of this sample fails:

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol="Bankrupt?", isUnbalance=True
)
model = model.fit(train_data)

Here is the excerpt of the error:

Py4JJavaError                             Traceback (most recent call last)
Cell In[29], line 1
----> 1 model = model.fit(train_data)

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:573, in safe_patch.<locals>.safe_patch_function(*args, **kwargs)
    571     patch_function.call(call_original, *args, **kwargs)
    572 else:
--> 573     patch_function(call_original, *args, **kwargs)
    575 session.state = "succeeded"
    577 try_log_autologging_event(
    578     AutologgingEventLogger.get_logger().log_patch_function_success,
    579     session,
   (...)
    583     kwargs,
    584 )

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:252, in with_managed_run.<locals>.patch_with_managed_run(original, *args, **kwargs)
    249     managed_run = create_managed_run()
    251 try:
--> 252     result = patch_function(original, *args, **kwargs)
    253 except (Exception, KeyboardInterrupt):
    254     # In addition to standard Python exceptions, handle keyboard interrupts to ensure
    255     # that runs are terminated if a user prematurely interrupts training execution
    256     # (e.g. via sigint / ctrl-c)
    257     if managed_run:

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/pyspark/ml/__init__.py:1140, in autolog.<locals>.patched_fit(original, self, *args, **kwargs)
   1138 if t.should_log():
   1139     with _AUTOLOGGING_METRICS_MANAGER.disable_log_post_training_metrics():
-> 1140         fit_result = fit_mlflow(original, self, *args, **kwargs)
   1141     # In some cases the `fit_result` may be an iterator of spark models.
   1142     if should_log_post_training_metrics and isinstance(fit_result, Model):

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/pyspark/ml/__init__.py:1126, in autolog.<locals>.fit_mlflow(original, self, *args, **kwargs)
   1124 input_training_df = args[0].persist(StorageLevel.MEMORY_AND_DISK)
   1125 _log_pretraining_metadata(estimator, params, input_training_df)
-> 1126 spark_model = original(self, *args, **kwargs)
   1127 _log_posttraining_metadata(estimator, spark_model, params, input_training_df)
   1128 input_training_df.unpersist()

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:554, in safe_patch.<locals>.safe_patch_function.<locals>.call_original(*og_args, **og_kwargs)
    551         original_result = original(*_og_args, **_og_kwargs)
    552         return original_result
--> 554 return call_original_fn_with_event_logging(_original_fn, og_args, og_kwargs)

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:489, in safe_patch.<locals>.safe_patch_function.<locals>.call_original_fn_with_event_logging(original_fn, og_args, og_kwargs)
    480 try:
    481     try_log_autologging_event(
    482         AutologgingEventLogger.get_logger().log_original_function_start,
    483         session,
   (...)
    487         og_kwargs,
    488     )
--> 489     original_fn_result = original_fn(*og_args, **og_kwargs)
    491     try_log_autologging_event(
    492         AutologgingEventLogger.get_logger().log_original_function_success,
    493         session,
   (...)
    497         og_kwargs,
    498     )
    499     return original_fn_result

File ~/cluster-env/trident_env/lib/python3.10/site-packages/mlflow/utils/autologging_utils/safety.py:551, in safe_patch.<locals>.safe_patch_function.<locals>.call_original.<locals>._original_fn(*_og_args, **_og_kwargs)
    543 # Show all non-MLflow warnings as normal (i.e. not as event logs)
    544 # during original function execution, even if silent mode is enabled
    545 # (`silent=True`), since these warnings originate from the ML framework
    546 # or one of its dependencies and are likely relevant to the caller
    547 with set_non_mlflow_warnings_behavior_for_current_thread(
    548     disable_warnings=False,
    549     reroute_warnings=False,
    550 ):
--> 551     original_result = original(*_og_args, **_og_kwargs)
    552     return original_result

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File ~/cluster-env/trident_env/lib/python3.10/site-packages/synapse/ml/lightgbm/LightGBMClassifier.py:2148, in LightGBMClassifier._fit(self, dataset)
   2147 def _fit(self, dataset):
-> 2148     java_model = self._fit_java(dataset)
   2149     return self._create_model(java_model)

File /opt/spark/python/lib/pyspark.zip/pyspark/ml/wrapper.py:378, in JavaEstimator._fit_java(self, dataset)
    375 assert self._java_obj is not None
    377 self._transfer_params_to_java()
--> 378 return self._java_obj.fit(dataset._jdf)

File ~/cluster-env/trident_env/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/spark/python/lib/pyspark.zip/pyspark/errors/exceptions/captured.py:169, in capture_sql_exception.<locals>.deco(*a, **kw)
    167 def deco(*a: Any, **kw: Any) -> Any:
    168     try:
--> 169         return f(*a, **kw)
    170     except Py4JJavaError as e:
    171         converted = convert_exception(e.java_exception)

File ~/cluster-env/trident_env/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o7876.fit.
: java.lang.Exception: Dataset create from samples call failed in LightGBM with error: Feature (Column_) appears more than one time.
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMUtils$.validate(LightGBMUtils.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.dataset.ReferenceDatasetUtils$.createReferenceDatasetFromSample(ReferenceDatasetUtils.scala:47)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.calculateRowStatistics(LightGBMBase.scala:545)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:425)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:62)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:163)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:160)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit(SynapseMLLogging.scala:153)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logFit$(SynapseMLLogging.scala:152)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logFit(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:829)

Code to reproduce issue

from pyspark.sql import SparkSession

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

from synapse.ml.core.platform import *
from synapse.ml.lightgbm import LightGBMClassifier
from pyspark.ml.feature import VectorAssembler

df = (
    spark.read.format("csv")
    .option("header", True)
    .option("inferSchema", True)
    .load(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/company_bankruptcy_prediction_data.csv"
    )
)
# print dataset size
print("records read: " + str(df.count()))

train, test = df.randomSplit([0.85, 0.15], seed=1)

feature_cols = df.columns[1:]
featurizer = VectorAssembler(inputCols=feature_cols, outputCol="features")
train_data = featurizer.transform(train)["Bankrupt?", "features"]
test_data = featurizer.transform(test)["Bankrupt?", "features"]

display(train_data.groupBy("Bankrupt?").count())

model = LightGBMClassifier(
    objective="binary", featuresCol="features", labelCol="Bankrupt?", isUnbalance=True
)

model = model.fit(train_data)

Other info / logs

No response

promisinganuj commented 3 months ago

Please refer to https://github.com/microsoft/LightGBM/issues/6492, which was initially raised with LightGBM.

Illuminae commented 3 months ago

Tagging on to this; I am seeing the same error in the same Fabric Spark runtime. EDIT: on runtime 1.1 it works for me.

ttruong001 commented 3 months ago

I've got the same issue when running the following script in Synapse Studio:

%%configure -f
{
  "name": "synapseml",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:1.0.4",
    "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
    "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
    "spark.yarn.user.classpath.first": "true",
    "spark.sql.parquet.enableVectorizedReader": "false"
  }
}

train, test = (
    spark.read.parquet(
        "wasbs://publicwasb@mmlspark.blob.core.windows.net/BookReviewsFromAmazon10K.parquet"
    )
    .limit(1000)
    .cache()
    .randomSplit([0.8, 0.2])
)

display(train)

from pyspark.ml import Pipeline
from synapse.ml.featurize.text import TextFeaturizer
from synapse.ml.lightgbm import LightGBMRegressor

model = Pipeline(
    stages=[
        TextFeaturizer(inputCol="text", outputCol="features"),
        LightGBMRegressor(featuresCol="features", labelCol="rating"),
    ]
).fit(train)

Update: it works if we use Spark 3.3 pools:

%%configure -f
{
  "name": "synapseml",
  "conf": {
    "spark.jars.packages": "com.microsoft.azure:synapseml_2.12:0.11.4-spark3.3",
    "spark.jars.repositories": "https://mmlspark.azureedge.net/maven",
    "spark.jars.excludes": "org.scala-lang:scala-reflect,org.apache.spark:spark-tags_2.12,org.scalactic:scalactic_2.12,org.scalatest:scalatest_2.12,com.fasterxml.jackson.core:jackson-databind",
    "spark.yarn.user.classpath.first": "true",
    "spark.sql.parquet.enableVectorizedReader": "false"
  }
}

memoryz commented 3 months ago

We're running into the same issue with this configuration from Azure Synapse Spark 3.4 pool.

mhamilton723 commented 3 months ago

Hi all, thank you for your patience. We have tried to get a simple repro of this and found that something funky is going on with the natives. To work around this for now, set the data transfer mode to "bulk" on the estimator:

...
.setDataTransferMode("bulk")

memoryz commented 3 months ago

Confirming that the setDataTransferMode("bulk") workaround works in a Synapse Spark 3.4 pool. Thank you @mhamilton723 for the mitigation.

Jens-automl commented 3 months ago

Can you elaborate on how to use the workaround? Thank you!

reubster commented 2 months ago

@Jens-automl - it's just a setter method on the LightGBMClassifier

https://mmlspark.blob.core.windows.net/docs/1.0.4/pyspark/synapse.ml.lightgbm.html
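
For anyone else asking the same question, here is a minimal sketch of how the workaround might be applied to the classifier from the original report. The helper names `use_bulk_transfer` and `fit_with_workaround` are hypothetical and only there for illustration. The one SynapseML call it relies on is `setDataTransferMode("bulk")`, as quoted earlier in this thread. It assumes a running Spark session with SynapseML installed and a `train_data` DataFrame built as in the reproduction code.

```python
def use_bulk_transfer(estimator):
    """Switch a SynapseML LightGBM estimator to bulk data transfer.

    Hypothetical helper: it only calls setDataTransferMode("bulk"),
    the setter named in the workaround from this thread.
    """
    # Spark ML setters conventionally return the estimator itself;
    # fall back to the original object if the setter returns None.
    updated = estimator.setDataTransferMode("bulk")
    return updated if updated is not None else estimator


def fit_with_workaround(train_data):
    """Fit the classifier from the original report with the workaround applied."""
    # Imported lazily so this file can be loaded outside a Spark session.
    from synapse.ml.lightgbm import LightGBMClassifier

    classifier = LightGBMClassifier(
        objective="binary",
        featuresCol="features",
        labelCol="Bankrupt?",
        isUnbalance=True,
    )
    return use_bulk_transfer(classifier).fit(train_data)
```

In a notebook, `model = fit_with_workaround(train_data)` would stand in for the failing `model.fit(train_data)` call. The same setter presumably applies to `LightGBMRegressor` as well, though that is an assumption rather than something spelled out above.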