microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark

[LightGBM] Classifier hangs on fit most of the time #1967

Open donallw opened 1 year ago

donallw commented 1 year ago

SynapseML version

0.11.1

System information

Describe the problem

We have been attempting to fit a dataset of about 18 million rows and 225 columns with the SynapseML LightGBM classifier. This is a binary classification problem. Each feature is a continuous float value.

When we run our hyperparameter tuning (handled by hyperopt), the model hangs on the fitting stage most of the time. Occasionally, if we are “lucky”, it fully trains the model and sometimes even runs through a parameter search completely. Specifically, it hangs on the collect call on line 599 in this code: https://github.com/microsoft/SynapseML/blob/14213e25147b99611ee56946933208f8d7520560/lightgbm/src/main/scala/com/microsoft/azure/synapse/ml/lightgbm/LightGBMBase.scala

We identified this by going into the spark UI and looking at the specific operations it was running.

We have tried using barrier execution mode to no avail, as well as setting a variety of other options. The parameters we have tested for the classifier are listed near the end.

We are running Spark through PySpark in a Jupyter notebook, and we installed SynapseML via Spark packages. Our Spark master/workers are hosted on a Kubernetes cluster. The data is stored in Parquet format on distributed storage.

Note that we are also running Spark on spot instances; we scale nodes up and down as we need the resources. We have tried pinning the cluster to a fixed number of workers, but we have not tried disabling spot instances entirely.

Code to reproduce issue

Below is how we set up our Spark session, including only what may be relevant to the issue:


from pyspark.sql import SparkSession

packages = [
    'com.microsoft.azure:synapseml_2.12:0.11.1',
]

spark = SparkSession.builder \
        .appName("ModelTraining") \
        .master(SPARK_MASTER) \
        .config("spark.kubernetes.namespace", "spark") \
        .config("spark.kubernetes.container.image", "apache/spark-py:v3.4.0") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.executor.memory", "32g") \
        .config("spark.executor.cores", "12") \
        .config("spark.driver.memory", "15g") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.driver.bindAddress", "0.0.0.0") \
        .config("spark.driver.host", driver_host) \
        .config("spark.driver.port", "31137") \
        .config("spark.blockManage r.port", "7777") \
        .config("spark.storage.replication", "1") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
        .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN") \
        .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
        .config("spark.driver.extraJavaOptions", "-Xss4M -XX:+UseG1GC") \
        .getOrCreate()

This is how we are preparing our data:

from pyspark.ml.feature import VectorAssembler

# Repartition data
feature_df = feature_df.repartition(72)

# Split the feature DataFrame into train, validation, and test sets
train, val, test = feature_df.randomSplit([0.8, 0.1, 0.1], seed=1)

# Find numeric columns for feature assembler
# Uses a helper function from our preprocessing code that simply gets the feature columns
feature_generation = FeatureGeneration()
feat_columns = feature_generation.generate_feat(feature_df)

# unpersist the source DataFrame; it is no longer needed
feature_df.unpersist()

# pull out target feature columns
featurizer = VectorAssembler(inputCols=feat_columns, outputCol="features")

# Create the train and test data based on the features and split data
train_data = featurizer.transform(train)["label", "features"]
val_data = featurizer.transform(val)["label", "features"]
test_data = featurizer.transform(test)["label", "features"]

# force DAG
train_data.cache()
train_data.head()
val_data.cache()
val_data.head()
test_data.cache()
test_data.head()

And lastly, tuning and fitting the model. We have tried a wide range of combinations of the settings below. Note this is not exactly how we run it, since we actually pass this parameter space to hyperopt, but we left that out for simplicity (a rough sketch of the driver loop follows the snippet below).

import numpy as np
from hyperopt import hp
from hyperopt.pyll import scope
from synapse.ml.lightgbm import LightGBMClassifier

num_eval = 10
boostingTypes = ['gbdt']
objectives = ['binary']
binSampleCounts = [10]

param_hyperopt= {
    'featuresCol' : hp.choice('featuresCol', ["features"]),
    'labelCol' : hp.choice('labelCol', ["label"]),
    'isUnbalance' : hp.choice('isUnbalance', [True]),
    'executionMode' : hp.choice('executionMode', ['streaming']),
    'useBarrierExecutionMode' : hp.choice('useBarrierExecutionMode', [False]),
    'boostingType': hp.choice('boostingType', boostingTypes),
    'objective': hp.choice('objective', objectives),
    'learningRate': hp.loguniform('learningRate', np.log(0.01), np.log(1)),
    'maxDepth': scope.int(hp.quniform('maxDepth', 5, 10, 1)),
    'numIterations': scope.int(hp.quniform('numIterations', 100, 250, 1)),
    'numLeaves': scope.int(hp.quniform('numLeaves', 5, 50, 1)),
    # 'numThreads' : hp.choice('numThreads', [70]),
    # 'colSampleByTree': hp.uniform('colsample_bytree', 0.6, 1.0),
    # 'regLambda': hp.uniform('reg_lambda', 0.0, 1.0),
    # 'binSampleCount' : hp.choice('binSampleCount', binSampleCounts),
    #'useSingleDatasetMode' : hp.choice('useSingleDatasetMode', [True]),
}

model = LightGBMClassifier(**param_hyperopt)
model = model.fit(train_data) # <- This will hang on the collect of line 599
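For context, the way we feed this space to hyperopt looks roughly like the sketch below. This is illustrative only: the objective, evaluator, and fmin settings are simplified stand-ins for our actual tuning code, and it reuses param_hyperopt, num_eval, LightGBMClassifier, train_data, and val_data from above.

from hyperopt import STATUS_OK, Trials, fmin, tpe
from pyspark.ml.evaluation import BinaryClassificationEvaluator

def objective(params):
    # Fit one model with the sampled parameters and score it on the validation
    # split; hyperopt minimizes the returned loss (negative AUC here).
    model = LightGBMClassifier(**params).fit(train_data)
    auc = BinaryClassificationEvaluator(labelCol="label").evaluate(model.transform(val_data))
    return {'loss': -auc, 'status': STATUS_OK}

trials = Trials()
best_params = fmin(fn=objective, space=param_hyperopt, algo=tpe.suggest,
                   max_evals=num_eval, trials=trials)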

Other info / logs

If we use the default executor heartbeat interval, we hit it and receive a missed-heartbeat error. When we change it (and the network timeout) to a very large value, we see a different error, a fairly typical Spark crash with not much information. The executors do not show errors in their logs.
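For reference, those overrides amount to something like this in the session builder (a minimal sketch with example values; only the two relevant settings are shown):

from pyspark.sql import SparkSession

# Sketch: raise the executor heartbeat interval and keep the network timeout
# at least as large (Spark requires spark.network.timeout >= spark.executor.heartbeatInterval).
spark = (SparkSession.builder
         .appName("HeartbeatTimeoutSketch")
         .config("spark.executor.heartbeatInterval", "1800s")
         .config("spark.network.timeout", "1801s")
         .getOrCreate())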

[Stage 22:================>                                       (7 + 17) / 24]

23/05/31 17:12:31 ERROR TaskSchedulerImpl: Lost executor 0 on 10.244.3.14: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 10.0 in stage 22.0 (TID 433) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 13.0 in stage 22.0 (TID 436) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 5.0 in stage 22.0 (TID 427) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 2.0 in stage 22.0 (TID 421) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 15.0 in stage 22.0 (TID 438) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 7.0 in stage 22.0 (TID 429) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 3.0 in stage 22.0 (TID 423) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 11.0 in stage 22.0 (TID 435) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 14.0 in stage 22.0 (TID 437) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 0.0 in stage 22.0 (TID 419) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 9.0 in stage 22.0 (TID 431) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN TaskSetManager: Lost task 4.0 in stage 22.0 (TID 425) (10.244.3.14 executor 0): ExecutorLostFailure (executor 0 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_49 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_29 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_68 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_21 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_65 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_2 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_3 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_19 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_26 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_20 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_55 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_27 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_70 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_66 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_32 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_59 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_36 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_30 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_64 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_6 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_31 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_24 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_56 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_67 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_61 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_62 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_57 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_46 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_0 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_33 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_58 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_62_0 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_63 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_41 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_15 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_34 !
23/05/31 17:12:31 WARN BlockManagerMasterEndpoint: No more replicas available for rdd_32_28 !
23/05/31 17:12:33 WARN TaskSetManager: Lost task 0.1 in stage 22.0 (TID 445) (10.244.19.3 executor 2): FetchFailed(null, shuffleId=2, mapIndex=-1, mapId=-1, reduceId=0, message=
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 2 partition 0
    at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1705)
    at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1652)
    at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1651)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.MapOutputTracker$.convertMapStatuses(MapOutputTracker.scala:1651)
    at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorIdImpl(MapOutputTracker.scala:1294)
    at org.apache.spark.MapOutputTrackerWorker.getMapSizesByExecutorId(MapOutputTracker.scala:1256)
    at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:140)
    at org.apache.spark.shuffle.ShuffleManager.getReader(ShuffleManager.scala:63)
    at org.apache.spark.shuffle.ShuffleManager.getReader$(ShuffleManager.scala:57)
    at org.apache.spark.shuffle.sort.SortShuffleManager.getReader(SortShuffleManager.scala:73)
    at org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:208)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1523)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1450)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1514)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1337)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:177)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:591)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

What component(s) does this bug affect?

What language(s) does this bug affect?

What integration(s) does this bug affect?

github-actions[bot] commented 1 year ago

Hey @donallw :wave:! Thank you so much for reporting the issue/feature request :rotating_light:. Someone from SynapseML Team will be looking to triage this issue soon. We appreciate your patience.

donallw commented 1 year ago

Seeing improved performance with a significantly reduced number of partitions. Before, I was working with 32-650 partitions on my data; I've been experimenting with only 1-2 and seeing much better results.

svotaw commented 1 year ago

Yes, the fewer partitions, the lower the likelihood of network failures. There were also some more recent improvements to streaming mode that went in after 11.1, if you'd like to try those.

Also, try setting the config "spark.dynamicAllocation.enabled" to "false" to turn off dynamic allocation of executors (the LightGBM algorithm does not handle dynamic scaling).
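For example, in a PySpark session builder that would look roughly like this (minimal sketch; only the relevant setting is shown):

from pyspark.sql import SparkSession

# Sketch: disable dynamic executor allocation so the executor set stays fixed
# for the lifetime of the LightGBM training job.
spark = (SparkSession.builder
         .appName("FixedExecutors")
         .config("spark.dynamicAllocation.enabled", "false")
         .getOrCreate())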

svotaw commented 1 year ago

We have released 11.2, which has some more improvements.

yungcero commented 1 year ago

11.2 has been somewhat more stable for training from what I've seen so far. I believe there is still a problem with fit, though, as it pertains to data size and potentially how it partitions. Here are some logs from a job of mine that failed. I am retraining a model and it works, but it sometimes fails, so it is unreliable:

Py4JJavaError: An error occurred while calling o9057.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 648.1 failed 4 times, most recent failure: Lost task 0.3 in stage 648.1 (TID 1772) (10.244.2.3 executor 3): java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

AllardJM commented 1 year ago

I am using 11.2 and experiencing similar issues. Fit hangs before starting. It will run on really small datasets but fail on anything larger.

yungcero commented 1 year ago

Same here, @AllardJM. I'm trying to find what the "cap" for my data is, but it fluctuates from time to time. The eval at which it fails can also vary (for instance, running 15 evals it can fail on eval 10 or 14, etc.).

svotaw commented 1 year ago

@yungcero In your stack trace above, there was a bug in logging that prevented useful information from being logged (basically, an exception thrown while logging an exception). I fixed that and smoothed out some of the logging load, so if you want to try a newer version with that fix, use the version listed at the end of this comment.

Also, the 11.2 docs have some suggestions for handling dynamic scaling issues when using LightGBM in Spark. If I interpret you correctly, you seem to indicate that your fit() call fails during training iterations (which means the issue is not in the new streaming code but in the LightGBM native code itself). Without logs from the executors, my best guess is that an executor crashed and/or there was a networking problem. See the docs here: https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/LightGBM/Overview/#dynamic-allocation-limitations

If you have data on the number of executors and number of partitions, let me know. Generally, to reduce the random network errors that cause flakiness in LightGBM, it's better to use fewer executors, or at least to match the partition count to the number of executors. By default, SynapseML repartitions the data to be better matched, but you can override that. I can't tell from the logs you pasted what the counts (partitions, executors) are.
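As a rough illustration of matching the partition count to the executors, something like the sketch below could be done on the caller's side. numExecutors is an assumption here (substitute the real count from the Spark UI), and train_data is the cached training DataFrame from earlier in the thread:

from synapse.ml.lightgbm import LightGBMClassifier

numExecutors = 2  # assumption: roughly what the autoscaler settles on
train_data = train_data.repartition(numExecutors)  # one partition per executor

model = LightGBMClassifier(objective="binary").fit(train_data)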


SynapseML Build and Release Information

Maven Coordinates: com.microsoft.azure:synapseml_2.12:0.11.2-23-54a8c7f2-SNAPSHOT

Maven Resolver: https://mmlspark.azureedge.net/maven
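Wiring that snapshot build into a session looks roughly like this (sketch; only the package-related settings are shown):

from pyspark.sql import SparkSession

# Sketch: pull the snapshot coordinate above from the SynapseML Maven resolver.
spark = (SparkSession.builder
         .appName("SnapshotTest")
         .config("spark.jars.packages",
                 "com.microsoft.azure:synapseml_2.12:0.11.2-23-54a8c7f2-SNAPSHOT")
         .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
         .getOrCreate())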

donallw commented 1 year ago

FYI, @yungcero and I are working on the same project.

Number of partitions: for the tests below, we used the default partition count of ~330. We experimented with low partition counts (1-10) and saw similar issues.

In terms of executor count, we have been using a Kubernetes-managed autoscaler that creates new executors/workers as resource demand ramps up. Generally, for these training tasks it has not scaled to more than 3 executors, typically sitting at 1 or 2.

Updated to 11.2. Still facing a similar issue. There appears to be a threshold past which training becomes generally unreliable. Using the same data (225 columns of float-32 values), we selected subsets of the first n rows and trained LightGBM binary classifiers on them using the script below, keeping as many defaults as possible to help isolate the issue. For us, the threshold is around ~12.5M rows.

import time

from pyspark.ml.feature import VectorAssembler
from synapse.ml.lightgbm import LightGBMClassifier

# Get the number of rows in the DataFrame
num_rows = df.count()
print(f'num rows: {num_rows}')

n = int(12.5e6)

# Take the first n rows of the DataFrame
df = df.limit(n)

# Find numeric columns for feature assembler
feature_generation = FeatureGeneration()
new_numeric_columns = feature_generation.find_numerical_columns(df)

# Change to pull out correct feature columns not just numeric
featurizer = VectorAssembler(inputCols=new_numeric_columns, outputCol="features")

# Create the train and test data based on the features and split data
df = featurizer.transform(df)["label", "features"]
df.cache()
count = df.count()
print(count)

time1 = time.time()
model = LightGBMClassifier(executionMode="streaming", objective='binary')
model = model.fit(df)
print(f'{n} time taken: {time.time() - time1}')

Here is how long it took for various dataset sizes:

8000000 time taken: 146.64714694023132
8100000 time taken: 146.86330699920654
8500000 time taken: 155.5053837299347
8500000 time taken: 154.26885294914246
8600000 time taken: 158.38563346862793
8700000 time taken: 152.80049800872803
8800000 time taken: 163.29340744018555
8900000 time taken: 168.76366710662842
8900000 time taken: 163.02760076522827
9000000 time taken: 162.13767838478088
9100000 time taken: 161.89039850234985
9200000 time taken: 167.836364030838
10000000 time taken: 171.2624990940094 [much more warnings/executor losses]
10000000 time taken: 180.3660273551941
10000000 time taken: 180.1025731563568
10500000 time taken: 214.8800437450409
11000000 time taken: 216.96116757392883
12000000 time taken: 240.2725555896759
12500000 time taken: 260.7602264881134
12500000 Failed
12750000 Failed
13000000 Failed
15000000 Failed
20000000 Failed

As you can see, the time taken for each of these is not egregious. Also, at 12.5M rows the fit sometimes fails and sometimes passes. It appears that crossing a certain size threshold introduces the issues. Again, these runs used a larger number of partitions (~330).
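For completeness, a minimal sketch of one way to drive a sweep like the one above (assuming df is the featurized, not-yet-limited DataFrame and the row counts are an illustrative subset):

import time

from synapse.ml.lightgbm import LightGBMClassifier

for n in [8_000_000, 10_000_000, 12_500_000, 15_000_000, 20_000_000]:
    subset = df.limit(n).cache()
    subset.count()  # materialize the cache before timing the fit
    start = time.time()
    try:
        LightGBMClassifier(executionMode="streaming", objective="binary").fit(subset)
        print(f"{n} time taken: {time.time() - start}")
    except Exception as err:
        print(f"{n} Failed: {err}")
    finally:
        subset.unpersist()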

It’s worth noting that we are able to run prediction fairly quickly on the full dataset of 48M rows. It is just the training that has been failing.
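(For reference, scoring just uses the standard transform API; the sketch below assumes trained_model is a previously fitted model and full_data is the featurized 48M-row DataFrame.)

# Sketch: batch prediction on the full featurized dataset.
predictions = trained_model.transform(full_data)
predictions.select("label", "prediction", "probability").show(5)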

Here is the current spark config we are using:

packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.3.2',
    'com.microsoft.azure:synapseml_2.12:0.11.2',
    'com.azure:azure-storage-blob:12.22.2',
    'org.apache.hadoop:hadoop-azure:3.3.5',
]

spark = SparkSession.builder \
        .appName("ModelTraining") \
        .master(SPARK_MASTER) \
        .config("spark.kubernetes.namespace", "spark") \
        .config("spark.kubernetes.container.image", "apache/spark-py:v3.4.0") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.executor.podNamePrefix", "training-spark-executor") \
        .config("spark.dynamicAllocation.enabled", False) \
        .config("spark.network.timeout", "1801s") \
        .config("spark.executor.memory", "32g") \
        .config("spark.executor.cores", "12") \
        .config("spark.executor.heartbeatInterval", "1800s") \
        .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
        .config("spark.driver.extraJavaOptions", "-Xss4M -XX:+UseG1GC") \
        .config("spark.driver.memory", "15g") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.driver.bindAddress", "0.0.0.0") \
        .config("spark.driver.host", driver_host) \
        .config("spark.driver.port", "31137") \
        .config("spark.blockManager.port", "7777") \
        .config("spark.storage.replication", "1") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
        .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN") \
        .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \
        .config("spark.hadoop.fs.azure.account.key.sparkmlblob.blob.core.windows.net", AZURE_BLOB_API_KEY) \
        .getOrCreate()

I just tested again using your snapshot release (everything noted above was with com.microsoft.azure:synapseml_2.12:0.11.2) and got the logs below. There never appeared to be more than 1 executor at any given moment. You can see the executor chart from the Spark UI here. This is with the same data, cut to 20M rows, with ~330 partitions:

23/08/02 18:22:49 WARN TaskSetManager: Lost task 0.1 in stage 32.2 (TID 1388) (10.244.2.3 executor 2): java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

23/08/02 18:23:03 WARN TaskSetManager: Lost task 0.2 in stage 32.2 (TID 1389) (10.244.2.3 executor 2): java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

23/08/02 18:23:17 WARN TaskSetManager: Lost task 0.3 in stage 32.2 (TID 1390) (10.244.2.3 executor 2): java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

23/08/02 18:23:17 ERROR TaskSetManager: Task 0 in stage 32.2 failed 4 times; aborting job
23/08/02 18:23:17 ERROR LightGBMClassifier: {"buildVersion":"0.11.2-23-54a8c7f2-SNAPSHOT","className":"class com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier","columns":null,"method":"train","uid":"LightGBMClassifier_0892c9b1a129"}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.2 failed 4 times, most recent failure: Lost task 0.3 in stage 32.2 (TID 1390) (10.244.2.3 executor 2): java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3120)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3120)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:623)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:598)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:446)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:62)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:93)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:90)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain(SynapseMLLogging.scala:84)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain$(SynapseMLLogging.scala:83)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.parseExecutorPartitionList(NetworkManager.scala:178)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$2(NetworkManager.scala:167)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.usingMany(StreamUtilities.scala:18)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getNetworkTopologyInfoFromDriver$1(NetworkManager.scala:137)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:134)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:120)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:115)
    at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
    at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:111)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
    at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:615)
    at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:201)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In [17], line 5
      3 time1 = time.time()
      4 model = LightGBMClassifier(executionMode="streaming",objective='binary')
----> 5 model = model.fit(df)
      6 print(f'{n} time taken: {times[-1]}')

File /opt/conda/lib/python3.9/site-packages/pyspark/ml/base.py:205, in Estimator.fit(self, dataset, params)
    203         return self.copy(params)._fit(dataset)
    204     else:
--> 205         return self._fit(dataset)
    206 else:
    207     raise TypeError(
    208         "Params must be either a param map or a list/tuple of param maps, "
    209         "but got %s." % type(params)
    210     )

File /opt/conda/lib/python3.9/site-packages/synapse/ml/lightgbm/LightGBMClassifier.py:2148, in LightGBMClassifier._fit(self, dataset)
   2147 def _fit(self, dataset):
-> 2148     java_model = self._fit_java(dataset)
   2149     return self._create_model(java_model)

File /opt/conda/lib/python3.9/site-packages/pyspark/ml/wrapper.py:380, in JavaEstimator._fit_java(self, dataset)
    377 assert self._java_obj is not None
    379 self._transfer_params_to_java()
--> 380 return self._java_obj.fit(dataset._jdf)

File /opt/conda/lib/python3.9/site-packages/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /opt/conda/lib/python3.9/site-packages/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /opt/conda/lib/python3.9/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o342.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.2 failed 4 times, most recent failure: Lost task 0.3 in stage 32.2 (TID 1390) (10.244.2.3 executor 2): java.lang.NullPointerException
    [same java.lang.NullPointerException stack trace as above, from NetworkManager$.parseExecutorPartitionList down to java.lang.Thread.run]

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
    at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
    at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:3120)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
    at org.apache.spark.sql.Dataset.collect(Dataset.scala:3120)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executePartitionTasks(LightGBMBase.scala:623)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.executeTraining(LightGBMBase.scala:598)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.trainOneDataBatch(LightGBMBase.scala:446)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$train$2(LightGBMBase.scala:62)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb(SynapseMLLogging.scala:93)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logVerb$(SynapseMLLogging.scala:90)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logVerb(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain(SynapseMLLogging.scala:84)
    at com.microsoft.azure.synapse.ml.logging.SynapseMLLogging.logTrain$(SynapseMLLogging.scala:83)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.logTrain(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train(LightGBMBase.scala:64)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.train$(LightGBMBase.scala:36)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier.train(LightGBMClassifier.scala:27)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:151)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:115)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    [same NullPointerException stack trace as above]
donallw commented 1 year ago

Matching the number of partitions to the number of executors now (so just 1 partition, then?) and testing again; will post updates here.
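
Roughly what I'm doing to force that, as a sketch (the partition count of 1 is just what this particular test uses, not a recommendation):

# Collapse the training DataFrame to one partition so it matches the single executor in this test
df = df.repartition(1)
print(df.rdd.getNumPartitions())  # expect 1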

donallw commented 1 year ago

Here is the output with just 1 partition and 20 million rows:

23/08/02 18:49:44 WARN TaskSetManager: Lost task 0.0 in stage 32.1 (TID 1046) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

23/08/02 18:49:59 WARN TaskSetManager: Lost task 0.1 in stage 32.1 (TID 1047) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

23/08/02 18:50:14 WARN TaskSetManager: Lost task 0.2 in stage 32.1 (TID 1048) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

23/08/02 18:50:28 WARN TaskSetManager: Lost task 0.3 in stage 32.1 (TID 1049) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

23/08/02 18:50:28 ERROR TaskSetManager: Task 0 in stage 32.1 failed 4 times; aborting job
23/08/02 18:50:28 ERROR LightGBMClassifier: {"buildVersion":"0.11.2-23-54a8c7f2-SNAPSHOT","className":"class com.microsoft.azure.synapse.ml.lightgbm.LightGBMClassifier","columns":null,"method":"train","uid":"LightGBMClassifier_0f1d4da80647"}
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.1 failed 4 times, most recent failure: Lost task 0.3 in stage 32.1 (TID 1049) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

Driver stacktrace:
    [driver stack trace essentially identical to the one above]
Caused by: java.lang.NullPointerException
    [same NullPointerException stack trace as above]

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
[same Python traceback as above: model.fit(df) → Estimator.fit → LightGBMClassifier._fit → JavaEstimator._fit_java → py4j get_return_value]

Py4JJavaError: An error occurred while calling o343.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 32.1 failed 4 times, most recent failure: Lost task 0.3 in stage 32.1 (TID 1049) (10.244.2.3 executor 1): java.lang.NullPointerException
    [same NullPointerException stack trace as above]

Driver stacktrace:
    [driver stack trace essentially identical to the one above]
Caused by: java.lang.NullPointerException
    [same NullPointerException stack trace as above]
svotaw commented 1 year ago

You are still getting a NullRef, so I'll need to look again. Will take a look in a little bit.

Also, you said you are using an auto-scaler? LightGBM does not work with dynamic scaling. It's a limitation of the LightGBM team's implementation of the algorithm. See https://microsoft.github.io/SynapseML/docs/Explore%20Algorithms/LightGBM/Overview/#dynamic-allocation-limitations
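For anyone hitting this, a minimal sketch of what "no dynamic scaling" means for a PySpark session config (the instance count below is an illustrative assumption, not a recommendation):

from pyspark.sql import SparkSession

# Sketch: keep the executor set fixed for the whole fit() call.
# LightGBM sets up a static network of workers when training starts, so executors
# must not join or leave mid-training. The instance count here is an assumption.
spark = SparkSession.builder \
        .appName("LightGBMTraining") \
        .config("spark.dynamicAllocation.enabled", "false") \
        .config("spark.executor.instances", "3") \
        .getOrCreate()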

svotaw commented 1 year ago

Can you actually send logs from BEFORE the exception? It's weird that the line the error is on is a comment :/

Are you sure you were using the snapshot version?

svotaw commented 1 year ago

I am going to assume the above traces are not from the snapshot version, since the line numbers look like the original 0.11.2 release. (The snapshot with the NullRef fix, pasted above, is com.microsoft.azure:synapseml_2.12:0.11.2-23-54a8c7f2-SNAPSHOT.)

donallw commented 1 year ago

Thanks for the tip on the autoscaler. We turned it off and saw improvements in stability, but we are still facing the NullRef. I can confirm this is with the snapshot version. I ran this on 20M rows, repartitioned into just 1 partition, as I was running with 1 Spark worker. The Spark worker resource allocation is shown below.

The updated spark config with the snapshot build:

packages = [
    'org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.2',
    'org.apache.kafka:kafka-clients:3.3.2',
    'com.microsoft.azure:synapseml_2.12:0.11.2-23-54a8c7f2-SNAPSHOT',
    'com.azure:azure-storage-blob:12.22.2',
    'org.apache.hadoop:hadoop-azure:3.3.5',
]

spark = SparkSession.builder \
        .appName("ModelTraining") \
        .master(SPARK_MASTER) \
        .config("spark.kubernetes.namespace", "spark") \
        .config("spark.kubernetes.container.image", "apache/spark-py:v3.4.0") \
        .config("spark.kubernetes.authenticate.driver.serviceAccountName", "spark") \
        .config("spark.kubernetes.executor.podNamePrefix", "training-spark-executor") \
        .config("spark.dynamicAllocation.enabled", False) \
        .config("spark.network.timeout", "1801s") \
        .config("spark.executor.memory", "32g") \
        .config("spark.executor.cores", "12") \
        .config("spark.executor.heartbeatInterval", "1800s") \
        .config("spark.executor.extraJavaOptions", "-XX:+UseG1GC") \
        .config("spark.driver.extraJavaOptions", "-Xss4M -XX:+UseG1GC") \
        .config("spark.driver.memory", "15g") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.driver.bindAddress", "0.0.0.0") \
        .config("spark.driver.host", driver_host) \
        .config("spark.driver.port", "31137") \
        .config("spark.blockManager.port", "7777") \
        .config("spark.storage.replication", "1") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
        .config("spark.sql.mapKeyDedupPolicy", "LAST_WIN") \
        .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem") \
        .config("spark.hadoop.fs.azure.account.key.sparkmlblob.blob.core.windows.net", AZURE_BLOB_API_KEY) \
        .getOrCreate()

The following Pastebin link contains the output of the code below, which is just a basic fit on the 20M-row DataFrame.

from synapse.ml.lightgbm import LightGBMClassifier

model = LightGBMClassifier(executionMode="streaming", objective="binary")
model = model.fit(df)

https://pastebin.com/cuS05GAF
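(For context, df above is just the feature DataFrame read from parquet and repartitioned to match the single worker; the path below is a placeholder.)

df = spark.read.parquet("path/to/feature_data")  # placeholder path for the 20M-row feature set
df = df.repartition(1)  # one partition to match the single-worker run described above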

donallw commented 1 year ago

Here is a Pastebin link for the exact same run, but repartitioned into 3 partitions and with 3 workers set.

https://pastebin.com/1xHdWjJ1

yungcero commented 10 months ago

@svotaw any update or thoughts on this?

svotaw commented 10 months ago

Sorry for the slow response... I don't work on SynapseML much anymore. It's handled by a separate dedicated group now.

Without logs, it's hard to tell what happened. I need the log lines right before the exception. :) The general problem is that the driver is supposed to notify all executors with some information, but that information is null for some reason. I could also use the full driver logs to see what the driver thinks it sent. Any chance I can get access to a full set of logs?

I guess I can make a change to include the info I need in the exception message itself, but these stack traces aren't useful as-is. I need actual info-level logs from the code you are currently running.

Note that we just released version 1.0, so the library is going "public", so to speak. I will update when I have a new snapshot for you to test.

svotaw commented 10 months ago

Specifically, I am looking for the following logs (if you can find them in failed runs):

driver:
- driver expecting $numTasks connections
- driver received load-only status from task
- driver received socket from task
- driver writing back network topology to...
- driver writing back partition topology to...
- driver closing all sockets and server socket

executors:
- task $taskId, partition $partitionId received partition topology:
- task $taskId, partition $partitionId received nodes for network init:

svotaw commented 10 months ago

I first need to diagnose whether this is a direct bug of some kind in the network transfer payloads, or just some general timeout issue where a socket times out while listening and returns a null string.

Here's a snapshot version that improves the logging a little: com.microsoft.azure:synapseml_2.12:1.0.1-3-eae787ce-SNAPSHOT
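To pick it up, the only change needed is swapping the Maven coordinate in the packages list; a sketch reusing the repository config already shown earlier in this thread:

packages = [
    'com.microsoft.azure:synapseml_2.12:1.0.1-3-eae787ce-SNAPSHOT',  # snapshot with improved logging
]

spark = SparkSession.builder \
        .appName("ModelTraining") \
        .config("spark.jars.packages", ",".join(packages)) \
        .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven") \
        .getOrCreate()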

svotaw commented 10 months ago

So fundamentally, the LightGBM library's distributed mode was not designed for environments like Spark/Kubernetes. It was designed for a fixed cluster of dedicated VMs. SynapseML tries to shoehorn LightGBM so it can be used over Spark (and seamlessly alongside other notebook functions), but the fundamental limitation of the native library is still there.

Also, think about your resources. If you split a million rows across N partitions, it doesn't really help if you only have 1 executor running over 1 VM (or pod over a VM). All the rows get aggregated onto that 1 compute/memory resource, regardless of how you partition them. I don't know for sure, but given that it works fine up to a point, I'm assuming you are likely exhausting some resource (memory or compute).
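In other words, something like the sketch below, where the partition count is matched to the executors that are actually up (3 here, mirroring the 3-worker run above; the numbers are assumptions):

from synapse.ml.lightgbm import LightGBMClassifier

num_executors = 3                   # however many fixed executors/pods are actually running
df = df.repartition(num_executors)  # one partition per executor so the compute is spread out

model = LightGBMClassifier(executionMode="streaming", objective="binary").fit(df)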

I am not as experienced with Kubernetes (SynapseML is typically used in Databricks or Synapse/Fabric), but if I were you I'd try to find a way to get a fixed number of X pods running before you call fit(). As you add more rows, X needs to be bigger to spread out the compute. It sounds to me like fit() succeeds when you only need 1 pod, but once you need 2+, resources get limited (or scaling kicks in, which breaks LightGBM). Calling fit() takes a lot more than prediction, which can run fairly independently and in parallel.
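One hedged way to do that "wait for a fixed number of pods before fit()" check from the driver is to poll the executor registry. This goes through the internal _jsc handle to SparkContext.getExecutorMemoryStatus, which is an unofficial but commonly used trick, so treat it as a sketch:

import time

def wait_for_executors(spark, target, timeout_s=600, poll_s=5):
    # Block until `target` executors have registered with the driver.
    # getExecutorMemoryStatus includes the driver itself, hence the -1.
    n = 0
    deadline = time.time() + timeout_s
    while time.time() < deadline:
        n = spark.sparkContext._jsc.sc().getExecutorMemoryStatus().size() - 1
        if n >= target:
            return n
        time.sleep(poll_s)
    raise TimeoutError(f"only {n} of {target} executors registered after {timeout_s}s")

# e.g. wait_for_executors(spark, target=3) before calling fit(df)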