Open andrejfd opened 1 year ago
Hey @andrejfd :wave:! Thank you so much for reporting the issue/feature request :rotating_light:. Someone from SynapseML Team will be looking to triage this issue soon. We appreciate your patience.
@svotaw / @mhamilton723 Can you please help look into the lightgbm.fit out-of-memory issue?
Yes, the current version of the SynapseML LightGBM wrapper (bulk mode) suffers from excessive memory usage, and there's not much you can do about it. The fix is to use the newer "streaming" mode, which is checked in but not in 0.10.2. We should be cutting a new version soon (within a month, hopefully sooner). If you'd like to try to get a snapshot version working now, let me know and I'll send you a Maven coordinate.
@svotaw that sounds good I would love a snapshot version. Will the snapshot version carry into the new release or could it expire before then?
Here's the last build I saw from main.
SynapseML Build and Release Information
Maven Coordinates: com.microsoft.azure:synapseml_2.12:0.10.2-86-49d457f8-SNAPSHOT
Maven Resolver: https://mmlspark.azureedge.net/maven
The streaming mode has not been released yet, so it hasn't seen a lot of public testing. If you hit issues, let me know. Use setExecutionMode("streaming").
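For reference, a minimal PySpark sketch of what switching the execution mode might look like (a sketch only; train_df and the column names are placeholders, not from this thread):

```
from synapse.ml.lightgbm import LightGBMRegressor

# Assumes train_df already has an assembled "features" vector column and a "label" column.
model = (LightGBMRegressor()
         .setExecutionMode("streaming")   # opt into streaming mode instead of the default bulk mode
         .setFeaturesCol("features")
         .setLabelCol("label"))

trained = model.fit(train_df)
```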
Thanks @svotaw , any other helpful tips for success when using lightgbm with streaming execution mode? or do most of the parameters remain the same as in bulk execution mode?
Still doesn't seem to help the memory footprint, but maybe my implementation is wrong.
I have about 80MM records now with 40 columns in the array (double).
I have 8 workers with 128 GB and 16 CPU cores each.
My driver is same spec.
```
at java.io.ObjectInputStream$HandleTable.grow(ObjectInputStream.java:3995)
at java.io.ObjectInputStream$HandleTable.assign(ObjectInputStream.java:3801)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2086)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:76)
at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:432)
at org.apache.spark.broadcast.TorrentBroadcast$$$Lambda$763/113317936.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:434)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:317)
at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$692/611180653.apply(Unknown Source)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:291)
at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$678/1413736270.apply(Unknown Source)
at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:286)
at org.apache.spark.broadcast.TorrentBroadcast$$Lambda$667/2000516935.apply(Unknown Source)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1647)
```
This is the issue I run into, where each executor in the Spark UI says:
266.9 MiB / 230 (record size/record)
----
Any suggestions?
Hmmm, can you try setting binSampleCount lower (the default is 200K)? This doesn't seem to be the whole error, but it involves broadcasted data, which is what we do with the sample data. I am actually working on a fix for broadcasting sample data, but LightGBM only approved my PR last week, so it'll be a while before we can use it on the SynapseML side. Also, is this a different memory error than you were seeing before?
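A hedged sketch of what lowering the sample count might look like (the value and column names are illustrative placeholders, not recommendations):

```
from synapse.ml.lightgbm import LightGBMRegressor

# A smaller binSampleCount means a smaller sample is broadcast from the driver.
model = (LightGBMRegressor()
         .setExecutionMode("streaming")
         .setBinSampleCount(50000)        # down from the 200K default
         .setFeaturesCol("features")
         .setLabelCol("label"))
```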
Apologies for the delayed response, but yes that worked.
Is my intuition correct that, given a dataset of size X, increasing the number of partitions may hurt performance for this model?
Since the "batch" size becomes much smaller? I.e., should we try to minimize the number of partitions, all else being equal?
The overall parameters are not really different for streaming mode.
Yes, LightGBM requires all partitions to be acted upon at once (losing the overall Spark reliability gains from normally being able to retry failed partitions), and there are extensive network interactions between every "batch", as you call it. So the more partitions you have, the more risk of a network failure breaking something; it's kind of a combinatorial risk that way. As far as perf, it can be more performant to use fewer partitions since there is less network traffic, but it's not a simple linear relationship. Obviously you can't always shrink it down to 1 batch. :) That's the point of being able to distribute data.
But of course, the fewer partitions you have, the bigger the data being processed per executor and the risk of having things like OOM happen. This is where streaming mode can help because you can pack more data on one node without running out of space. Bulk mode is VERY inefficient (more than an order of magnitude more than streaming). But bulk vs streaming only affects dataset loading, and not the networking or training.
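To make the partition tradeoff concrete, a minimal sketch (train_df, the column names, and the partition count are placeholders):

```
from synapse.ml.lightgbm import LightGBMRegressor

# Fewer partitions means fewer LightGBM workers and less network traffic,
# but more rows (and more memory pressure) per executor task.
train_df = train_df.repartition(16)   # illustrative count; tune to the cluster

model = (LightGBMRegressor()
         .setExecutionMode("streaming")
         .setFeaturesCol("features")
         .setLabelCol("label"))
trained = model.fit(train_df)
```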
Curious, are you saying that reducing the sample size fixed your issue? This is unrelated to partitioning. This is just an issue where we now calculate sample data on the driver (to be consistent and reproducible) and broadcast it to the workers. This is inefficient, so we are making a change to use the sample data only where it's needed, on the driver, and no longer broadcast it. This change is likely weeks or more away though. We need to update our LightGBM build to use the latest check-in, and that is a manual task for us (building and publishing a new LightGBM jar package).
Yes, and for bulk execution mode I was able to tweak the Spark config to get the dataset to fit.
Decreasing Spark off-heap memory and increasing executor memory. It definitely seems I have maxed out the cluster.
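For context, the kind of Spark settings being tweaked look roughly like this (illustrative values only; on Databricks these would normally go in the cluster's Spark config rather than in code):

```
from pyspark.sql import SparkSession

# Illustrative only: larger driver result size and executor heap, off-heap disabled.
spark = (SparkSession.builder
         .config("spark.driver.maxResultSize", "64g")
         .config("spark.executor.memory", "100g")
         .config("spark.memory.offHeap.enabled", "false")
         .getOrCreate())
```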
Out of curiosity, how come LightGBM is not fault tolerant? If all the info is on the driver and a task in a worker fails, why can it not recover, even if it means recalculating ops, rather than failing fit?
You'd have to ask LightGBM developers. We are just a wrapper around it. But we (SynapseML) are wrapping their "distributed" mode, which can work in any distributed environment. We happen to implement it in Spark. But their algorithm requires repeated communication between all nodes during training, so I guess their performance optimizations require this. We actually do a little trick in Spark where we initialize every partition, literally turn networking over to the native library, and then take back control when it is done. This is how we've made it work inside Spark, which is normally a more fault-tolerant environment.
Awesome. Regardless, thank you, and congrats on getting streaming execution mode out; it seems to be a major upgrade for the library.
@svotaw
Tried to bump up synapse to 0.11.
Same dataset size, same parameters (I did try switching barrierExecution off and on again), same cluster spec as I was using successfully with bulk execution mode, and now the job fails.
From looking at the executor logs, it looks like some task begins to hang.
Any idea what might be the culprit? Happy to provide additional details.
No, I haven't made changes in months, so not sure what would be wrong. Did you try "streaming" mode?
Please provide more details.
@svotaw Yes I am using streaming mode. It works on bulk but not on streaming.
100MM train, 20MM val, I am using a valIndicator column. Roughly 40-45 features.
executionMode = "streaming"
barrierExecution = False ## tried True as well.
numTasks = 0 # default
useSingleDatasetMode = True
I've tried toying with the number of partitions from 32 which I was using in bulk, to 256.
I am using 4 r5d.8xlarge DB workers. (256 GB, 32 Cores per worker).
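Put together, the setup described above might be expressed roughly as follows (a sketch only; df, the column names, and the partition count are placeholders):

```
from synapse.ml.lightgbm import LightGBMRegressor

# Sketch of the configuration described in this comment.
model = (LightGBMRegressor()
         .setExecutionMode("streaming")
         .setUseBarrierExecutionMode(False)     # also tried True
         .setNumTasks(0)                        # default
         .setUseSingleDatasetMode(True)
         .setValidationIndicatorCol("is_val")   # marks the 20MM validation rows
         .setFeaturesCol("features")
         .setLabelCol("label"))

df = df.repartition(32)   # tried 32 (as in bulk) up to 256
trained = model.fit(df)
```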
Executor errors are pretty vague.
Either
```
java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:613)
at java.net.Socket.connect(Socket.java:561)
at java.net.Socket.<init>(Socket.java:457)
at java.net.Socket.<init>(Socket.java:234)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getNetworkTopologyInfoFromDriver(NetworkManager.scala:129)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$2(NetworkManager.scala:116)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:24)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.core.utils.FaultToleranceUtils$.retryWithTimeout(FaultToleranceUtils.scala:29)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.$anonfun$getGlobalNetworkInfo$1(NetworkManager.scala:111)
at com.microsoft.azure.synapse.ml.core.env.StreamUtilities$.using(StreamUtilities.scala:28)
at com.microsoft.azure.synapse.ml.lightgbm.NetworkManager$.getGlobalNetworkInfo(NetworkManager.scala:107)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.initialize(BasePartitionTask.scala:197)
at com.microsoft.azure.synapse.ml.lightgbm.BasePartitionTask.mapPartitionTask(BasePartitionTask.scala:132)
at com.microsoft.azure.synapse.ml.lightgbm.LightGBMBase.$anonfun$executePartitionTasks$1(LightGBMBase.scala:591)
at org.apache.spark.sql.execution.MapPartitionsExec.$anonfun$doExecute$3(objects.scala:228)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:903)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:903)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:156)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
```
or
ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
Shuffle read size / records per executor looks around 200 MB
The bulk task gets done in around 1 hr on the cluster; at 2 hours, streaming starts to flunk out and I begin to see WARN StreamingPartitionTask: in the logs.
Thanks for reporting. This is the first example of streaming not working and bulk working that I've seen, so hopefully we can use it to work a bug out. Are you doing a training? Or a fit?
I suspect it's related to the sample data broadcast we currently do, but I would need to confirm. I'm waiting on a new build of LightGBM from someone, and then I can fix the sample data issue; we won't need to broadcast the data anymore. Hopefully within the next few weeks. As soon as I get the build, it should only take a day or two to use the new features and check in.
It's working in bulk mode for you now right?
This is for a training (but calls model.fit(train) if that's what you are referring to)
It works in bulk mode for me currently yes. Making some tweaks to spark config allowed bulk mode to work pretty smoothly.
Hi @svotaw , circling back on this issue to see if there was a fix, or to see if there is any way I can help you fix it?
We have released 0.11.2, which finally has the last of the planned streaming features.
SynapseML version
0.10.2
System information
Describe the problem
Training a LightGBM regressor in bulk execution mode is extremely finicky with the Spark configuration.
I mentioned this briefly in https://github.com/microsoft/SynapseML/issues/1761 but wanted to seek additional assistance.
I am training on Databricks runtime on a dataset with approximately 200MM data points and 30 features of DoubleType for simplicity.
If I operate on ~100M data points everything runs great in a reasonable amount of time; however, scaling up the data/cluster seems to push the default Spark configs out of proportion.
I have had to expand spark.driver.maxResultSize to 64g to get around the first set of memory constraints, but I quickly run into java heap OOM issues the more I increase the dataset. I have tried up to r5d.16xlarge (64 cores, 512 GB mem) as my worker type to accommodate these issues, but still to no avail. Additionally, when monitoring executor usage in the Spark UI it does not appear I am utilizing anywhere near max memory even on smaller cluster sizes, leading me to believe there is something in my configuration setup I could change.

I have tried tinkering with the numBatches parameter but find that performance significantly declines when using it, so I have continued with useSingleDatasetMode = true.

The most common issue I get after I adjust maxResultSize is java.lang.OutOfMemoryError: Java heap space. I have tried to increase executor memory and decrease the off-heap memory, but this seems like it could be headed down the wrong path. Does lightgbm.fit really require that much more memory than I can fit in the huge executors I am using? If so, how come I don't see these types of metrics in the Ganglia UI I look at?
Here is part of the common trace I receive.
I'd be more than happy to provide additional info if you could help point me in the right direction. Thanks.
Code to reproduce issue
I cannot share the data, but I have listed above the sizes of the data I work with.
Other info / logs
No response
What component(s) does this bug affect?
area/cognitive: Cognitive project
area/core: Core project
area/deep-learning: DeepLearning project
area/lightgbm: Lightgbm project
area/opencv: Opencv project
area/vw: VW project
area/website: Website
area/build: Project build system
area/notebooks: Samples under notebooks folder
area/docker: Docker usage
area/models: models related issue

What language(s) does this bug affect?
language/scala: Scala source code
language/python: Pyspark APIs
language/r: R APIs
language/csharp: .NET APIs
language/new: Proposals for new client languages

What integration(s) does this bug affect?
integrations/synapse: Azure Synapse integrations
integrations/azureml: Azure ML integrations
integrations/databricks: Databricks integrations