microsoft / SynapseML

Simple and Distributed Machine Learning
http://aka.ms/spark
MIT License

WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests #563

Closed edsonaoki closed 5 years ago

edsonaoki commented 5 years ago

Again, a CentOS 7 problem, this time with the mmlspark_2.11:0.17.dev1+1.g5e0b2a0 version that @imatiach-msft posted on https://github.com/Azure/mmlspark/issues/514. I am running PySpark 3 in yarn-cluster mode on Cloudera (LightGBM does not work at all in yarn-client mode).

Code:

from pyspark.sql import SparkSession
from mmlspark import LightGBMClassifier

spark = SparkSession.builder.getOrCreate()

# ... 
# code to load df_data_featurized from a parquet file and to set label_col variable...
# ...

# Strange mechanism needed to prevent LightGBM from using a very small number of
# partitions in training, and a subsequent NullPointerException
cnt = df_data_featurized.count()
df_data_featurized = df_data_featurized.limit(cnt)

train, test = df_data_featurized.randomSplit([0.80, 0.20], seed=1)
model = LightGBMClassifier(labelCol=label_col, weightCol='weight').fit(train)

Dataframe size about 3GB. Spark configuration:

                            "spark.dynamicAllocation.maxExecutors" : "3",
                            "spark.driver.memory" : "3G",
                            "spark.executor.memory" : "25G",
                            "spark.default.parallelism": "1000",
                            "spark.network.timeout": "3600s",
                            "spark.executor.heartbeatInterval": "120s"

A lot of strange things happen, so let me go through them one by one:

  1. Code works normally when applied to a small set of data (i.e. much smaller than 3GB)

  2. The piece of code

    cnt = df_data_featurized.count()
    df_data_featurized = df_data_featurized.limit(cnt)

    looks completely useless. However, if I don't use it, LightGBM will start training the model with a very small number of partitions, eventually leading to the same NullPointerException error described in https://github.com/Azure/mmlspark/issues/514

  3. The training seems to start with a "reduce" operation over 0.8 x 1000 = 800 partitions, as prescribed. However, afterwards it invariably starts another "reduce" operation with a single partition, feeding the entire 3GB of data to one executor. This is why I used a small number of executors with a large executor memory; otherwise I would get an OutOfMemory error.

  4. The last lines of the log from the executor that runs the single partition are:

19/05/09 13:25:18 INFO Executor: Finished task 788.0 in stage 4.0 (TID 2456). 1460 bytes result sent to driver
19/05/09 13:25:22 INFO CoarseGrainedExecutorBackend: Got assigned task 2602
19/05/09 13:25:22 INFO Executor: Running task 0.0 in stage 5.0 (TID 2602)
19/05/09 13:25:22 INFO MapOutputTrackerWorker: Updating epoch to 2 and clearing cache
19/05/09 13:25:22 INFO TorrentBroadcast: Started reading broadcast variable 6
19/05/09 13:25:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 48.4 KB, free 12.1 GB)
19/05/09 13:25:22 INFO TorrentBroadcast: Reading broadcast variable 6 took 9 ms
19/05/09 13:25:22 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 176.8 KB, free 12.1 GB)
19/05/09 13:25:22 INFO CodeGenerator: Code generated in 103.583424 ms
19/05/09 13:25:22 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 2, fetching them
19/05/09 13:25:22 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTracker@ <omitted>:39108)
19/05/09 13:25:22 INFO MapOutputTrackerWorker: Got the output locations
19/05/09 13:25:22 INFO ShuffleBlockFetcherIterator: Getting 200 non-empty blocks out of 800 blocks
19/05/09 13:25:22 INFO TransportClientFactory: Successfully created connection to <omitted> after 2 ms (0 ms spent in bootstraps)
19/05/09 13:25:22 INFO ShuffleBlockFetcherIterator: Started 2 remote fetches in 18 ms
19/05/09 13:25:22 INFO CodeGenerator: Code generated in 211.764621 ms
19/05/09 13:25:22 INFO CodeGenerator: Code generated in 13.585673 ms
19/05/09 13:25:50 INFO TransportClientFactory: Successfully created connection to <omitted> after 1 ms (0 ms spent in bootstraps)
19/05/09 13:25:54 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 12.1 GB to disk (0  time so far)
19/05/09 13:26:38 INFO UnsafeExternalSorter: Thread 89 spilling sort data of 12.1 GB to disk (1  time so far)
19/05/09 13:27:11 INFO LightGBMClassifier: Successfully bound to port 12432
19/05/09 13:27:11 INFO LightGBMClassifier: LightGBM worker connecting to host:  <omitted> and port: 42451
19/05/09 13:27:11 INFO LightGBMClassifier: send current worker info to driver:  <omitted>:12432 
19/05/09 13:27:11 INFO LightGBMClassifier: LightGBM worker got nodes for network init:  <omitted>:12432
19/05/09 13:27:11 INFO LightGBMClassifier: LightGBM worker listening on: 12432

Afterwards, it would run into a connectivity timeout error of some sort (that is why I increased "spark.network.timeout" and "spark.executor.heartbeatInterval"). With both timeouts increased, it finally fails with:

19/05/09 13:39:07 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.

Note that from the driver side, the final lines of the log are:

19/05/09 14:51:54 INFO DAGScheduler: ShuffleMapStage 4 (reduce at LightGBMBase.scala:68) finished in 50.110 s
19/05/09 14:51:54 INFO DAGScheduler: looking for newly runnable stages
19/05/09 14:51:54 INFO DAGScheduler: running: Set()
19/05/09 14:51:54 INFO DAGScheduler: waiting: Set(ResultStage 5)
19/05/09 14:51:54 INFO DAGScheduler: failed: Set()
19/05/09 14:51:54 INFO DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[37] at reduce at LightGBMBase.scala:68), which has no missing parents
19/05/09 14:51:54 INFO YarnAllocator: Driver requested a total number of 1 executor(s).
19/05/09 14:51:55 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 176.8 KB, free 1456.6 MB)
19/05/09 14:51:55 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 48.4 KB, free 1456.5 MB)
19/05/09 14:51:55 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 10.70.22.110:35547 (size: 48.4 KB, free: 1458.3 MB)
19/05/09 14:51:55 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006
19/05/09 14:51:55 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[37] at reduce at LightGBMBase.scala:68) (first 15 tasks are for partitions Vector(0))
19/05/09 14:51:55 INFO YarnClusterScheduler: Adding task set 5.0 with 1 tasks
19/05/09 14:51:55 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 2602, <omitted>, executor 1, partition 0, NODE_LOCAL, 4991 bytes)
19/05/09 14:51:55 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on <omitted>:46663 (size: 48.4 KB, free: 12.2 GB)
19/05/09 14:51:55 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to <omitted>:60648
19/05/09 14:51:55 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 750 bytes

Any help would be appreciated.

imatiach-msft commented 5 years ago

@edsonaoki this debug line here indicates you are running everything on just one node:

19/05/09 13:27:11 INFO LightGBMClassifier: LightGBM worker got nodes for network init:  <omitted>:12432

You should see multiple comma-separated nodes here. Maybe we could debug over Skype (or some other conference-call system where you can share your desktop)? I'm sure it's a bug that we should fix, but with different cluster configurations it's difficult to reproduce some issues. You definitely shouldn't need to use this code:

cnt = df_data_featurized.count()
df_data_featurized = df_data_featurized.limit(cnt)

I also see this in the logs:

19/05/09 14:51:54 INFO YarnAllocator: Driver requested a total number of 1 executor(s).

However, you requested a maximum of 3 executors. I wonder why you are only getting one.

edsonaoki commented 5 years ago

Hi @imatiach-msft, the reason it only gets one executor is that the reduce operation has only 1 partition. However, when the training starts, it uses the prescribed number of partitions (800). Is there some step in LightGBM that involves shuffling / coalescing / repartitioning / collecting that would explain the number of partitions dropping from 800 to 1?

Meanwhile I will try to extract some additional logs / screenshots. Unfortunately I cannot show the full logs as they contain confidential information.

edsonaoki commented 5 years ago

By the way, my data is extremely unbalanced (say, 1 positive label for hundreds of negative labels); does this have any impact on how LightGBM generates the partitions?

edsonaoki commented 5 years ago

Another piece of information: if I repartition the training dataset immediately before calling the fit function, LightGBM will use more than 1 partition (but still far fewer than the repartition number).

imatiach-msft commented 5 years ago

@edsonaoki yes, there is a known issue where lightgbm gets stuck in training if some partition has only a single label: https://github.com/Azure/mmlspark/issues/542 The workers whose partition has only a single label exit early while the other workers wait, which eventually leads to a timeout as the training gets stuck. To mitigate, you have to shuffle your data or ensure at least one positive and one negative example exists in each partition (e.g. by using a stratified split across partitions). However, in your case that doesn't seem to be the issue, because the logs show only a single worker being initialized.
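A minimal PySpark sketch of the shuffle mitigation (the salt column, seed, and partition count are illustrative; a random shuffle makes single-label partitions unlikely but does not strictly guarantee both labels in every partition):

from pyspark.sql import functions as F

# Spread rows randomly across a fixed number of partitions so that each
# partition very likely contains both positive and negative examples.
n_partitions = 200  # illustrative value
train = (train
  .withColumn("salt", (F.rand(seed=1) * n_partitions).cast("int"))
  .repartition(n_partitions, "salt")
  .drop("salt"))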

imatiach-msft commented 5 years ago

@edsonaoki ah, I should have noticed this earlier. I know this issue: you are using the limit function from above. I just heard about this a couple of weeks ago from another dev: "yeah... so the limit seems to pull everything into a single partition", e.g. see https://stackoverflow.com/questions/49143266/repartitioning-dataframe-from-spark-limit-function. It's not advisable to use the limit function in Spark, as it puts everything into one partition. Can you try to avoid the limit function? What do you see if you don't use it?
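An illustrative check (the partition counts are examples, not taken from your logs):

# limit() typically collapses the data into a single partition,
# while an explicit repartition() restores parallelism.
print(df_data_featurized.rdd.getNumPartitions())                   # e.g. 800
limited = df_data_featurized.limit(df_data_featurized.count())
print(limited.rdd.getNumPartitions())                              # typically 1
print(df_data_featurized.repartition(800).rdd.getNumPartitions())  # 800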

imatiach-msft commented 5 years ago

@edsonaoki it sounds like the real problem we need to resolve is the null pointer exception you are getting

edsonaoki commented 5 years ago

@imatiach-msft you are right, I will take out the limit and do the label redistribution you suggested. However, note that even when I repartition just before the training, the number of partitions used by LightGBM is much smaller than the number of partitions of the dataframe. Is there any explanation for that?

imatiach-msft commented 5 years ago

@edsonaoki yes, that is by design, since lightgbm has to run all workers at the same time: https://github.com/Azure/mmlspark/blob/master/src/lightgbm/src/main/scala/LightGBMBase.scala#L45 We coalesce the dataset to the number of workers. We use the native lightgbm code, which significantly reduces network communication during training, but it also means we can't run on a few partitions/tasks at a time asynchronously, unlike typical spark estimators and transformers. I have thought about adding such a mode, but Guolin, the main author of lightgbm, said this would be the best way to integrate lightgbm into mmlspark. However, if you can't fit the full dataset into memory, you can run lightgbm in batches: we split the dataset into multiple chunks, train on one chunk at a time, and reinitialize a new learner for each chunk, but accuracy may suffer compared to training on the full dataset at once without batching: https://github.com/Azure/mmlspark/blob/master/src/lightgbm/src/main/scala/LightGBMParams.scala#L168
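For example, assuming the batching option is exposed to the Python API as numBatches (the parameter name and availability depend on the mmlspark version you are using), batched training would look roughly like this:

# Rough sketch: train on 4 sequential chunks of the data so each chunk fits in
# executor memory; accuracy may be lower than training on all the data at once.
model = LightGBMClassifier(labelCol=label_col, weightCol='weight',
                           numBatches=4).fit(train)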

edsonaoki commented 5 years ago

That was useful, I will try again, thanks!

edsonaoki commented 5 years ago

@imatiach-msft I removed the "limit" and split the dataset into 240 partitions (call this partition set A), making sure all partitions have the same fraction (about 1/600) of positive and negative labels. When LightGBM starts the training, it coalesces to 224 partitions (call this partition set B). Assuming it is an actual coalesce and not a repartition, each partition of set B should contain one or more partitions of set A, and thus positive and negative labels in every partition. Could you confirm this understanding?

Unfortunately, the executors still get "frozen" after the following debug messages:

19/05/13 14:20:08 INFO PythonRunner: Times: total = 515, boot = 326, init = 189, finish = 0
19/05/13 14:20:08 INFO CodeGenerator: Code generated in 119.256145 ms
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 5 ms
19/05/13 14:20:08 INFO PythonRunner: Times: total = 224, boot = 15, init = 209, finish = 0
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 16 ms
19/05/13 14:20:08 INFO PythonRunner: Times: total = 136, boot = 4, init = 132, finish = 0
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 6 ms
19/05/13 14:20:08 INFO PythonRunner: Times: total = 303, boot = 26, init = 277, finish = 0
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:08 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 3 ms
19/05/13 14:20:09 INFO PythonRunner: Times: total = 322, boot = 64, init = 258, finish = 0
19/05/13 14:20:09 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:09 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 4 ms
19/05/13 14:20:09 INFO PythonRunner: Times: total = 190, boot = 14, init = 176, finish = 0
19/05/13 14:20:09 INFO ShuffleBlockFetcherIterator: Getting 277 non-empty blocks out of 422 blocks
19/05/13 14:20:09 INFO ShuffleBlockFetcherIterator: Started 13 remote fetches in 41 ms
19/05/13 14:20:13 INFO LightGBMClassifier: Successfully bound to port 12416
19/05/13 14:20:13 INFO LightGBMClassifier: LightGBM worker connecting to host: 10.64.22.3 and port: 46213
19/05/13 14:20:13 INFO LightGBMClassifier: send current worker info to driver: 10.64.22.31:12416 

From the driver's side, nothing interesting either:

19/05/13 14:20:07 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.116:12528
19/05/13 14:20:07 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:07 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.114:12544
19/05/13 14:20:07 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:08 INFO LightGBMClassifier: driver received socket from worker: 10.70.21.46:12592
19/05/13 14:20:08 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:09 INFO LightGBMClassifier: driver received socket from worker: 10.70.21.46:12576
19/05/13 14:20:09 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:09 INFO LightGBMClassifier: driver received socket from worker: 10.70.21.46:12608
19/05/13 14:20:09 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:11 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.13:12496
19/05/13 14:20:11 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:12 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.112:12512
19/05/13 14:20:12 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:12 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.122:12432
19/05/13 14:20:12 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:13 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.120:12624
19/05/13 14:20:13 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:13 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.120:12560
19/05/13 14:20:13 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:13 INFO LightGBMClassifier: driver received socket from worker: 10.64.22.31:12416
19/05/13 14:20:13 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:13 INFO LightGBMClassifier: driver received socket from worker: 10.64.22.31:12480
19/05/13 14:20:13 INFO LightGBMClassifier: driver accepting a new connection...
19/05/13 14:20:14 INFO LightGBMClassifier: driver received socket from worker: 10.64.22.31:12464
19/05/13 14:20:14 INFO LightGBMClassifier: driver accepting a new connection...

Configuration is as follows:

"spark.dynamicAllocation.maxExecutors" : "15",
"spark.driver.memory" : "9G",
"spark.executor.memory" : "6G",
"spark.default.parallelism": "480",
"spark.network.timeout": "900s",
"spark.executor.heartbeatInterval": "60s"

You mentioned that LightGBM would create 1 partition per executor, but in this case it created 224 partitions when there are only 15 executors. Is that abnormal?

imatiach-msft commented 5 years ago

@edsonaoki it coalesces to the number of executor cores, so if you have 15 executors then there must be about 15 or 16 cores per executor. I'm not exactly sure how the 224 is calculated in your case. You may also be running into a special case where you have some empty partitions, which was fixed in this commit and should be in v0.17, so it shouldn't be a problem: https://github.com/Azure/mmlspark/commit/572b2f89c16e168cad5bfbf9dfd1727753e3b1ba Could you post the rest of the logs from the driver? From the logs you sent above, it looks like training didn't even start. Also, I've found it much easier and quicker to debug over something like Skype; there are so many different logs in a spark cluster, and it's a real process to figure out what went wrong (e.g. looking at the YARN UI to see the errors from all workers, then looking at a specific worker, etc.).

edsonaoki commented 5 years ago

hi @imatiach-msft sorry for the delay. I ran a test today with verbosity=4; here is what I get from the executor's side:

19/05/27 09:12:40 INFO CodeGenerator: Code generated in 26.890796 ms
19/05/27 09:12:40 INFO CodeGenerator: Code generated in 11.146674 ms
19/05/27 09:12:40 INFO InternalParquetRecordReader: RecordReader initialized will read a total of 10185 records.
19/05/27 09:12:40 INFO InternalParquetRecordReader: at row 0. reading next block
19/05/27 09:12:40 INFO InternalParquetRecordReader: block read in memory in 12 ms. row count = 10185
19/05/27 09:12:40 INFO LightGBMClassifier: Successfully bound to port 12416
19/05/27 09:12:40 INFO LightGBMClassifier: LightGBM worker connecting to host: 10.64.22.3 and port: 38415
19/05/27 09:12:40 INFO LightGBMClassifier: send current worker info to driver: 10.64.22.30:12416 
19/05/27 09:12:44 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:12:49 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:13:19 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:13:24 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:13:54 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:13:59 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:14:29 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:14:34 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:15:04 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:15:09 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:15:39 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:15:44 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:16:14 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:16:19 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.

And from the driver's side:

19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.127:12672
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.125:12576
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.112:12496
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.112:12448
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.112:12480
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:50 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.112:12432
19/05/27 09:12:50 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:51 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.118:12688
19/05/27 09:12:51 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:51 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.118:12656
19/05/27 09:12:51 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:51 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.118:12608
19/05/27 09:12:51 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:51 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.118:12624
19/05/27 09:12:51 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:12:51 INFO LightGBMClassifier: driver received socket from worker: 10.70.22.121:12720
19/05/27 09:12:51 INFO LightGBMClassifier: driver accepting a new connection...
19/05/27 09:13:03 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:13:08 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:13:38 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:13:38 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:14:08 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:14:08 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:14:38 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:14:38 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:15:08 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:15:08 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:15:38 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:15:43 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.
19/05/27 09:16:13 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:16:18 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.

Spark configuration (Cloudera stack):

spark.authenticate=false
spark.dynamicAllocation.enabled=true
spark.dynamicAllocation.executorIdleTimeout=3600
spark.dynamicAllocation.minExecutors=0
spark.dynamicAllocation.schedulerBacklogTimeout=1
spark.eventLog.enabled=true
spark.io.encryption.enabled=false
spark.network.crypto.enabled=false
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled=true
spark.shuffle.service.port=7337
spark.ui.enabled=true
spark.ui.killEnabled=true
spark.executor.extraJavaOptions=-javaagent:/opt/cloudera/parcels/UNRAVEL_SENSOR/lib/java/btrace-agent.jar=config=executor,libs=spark-2.2
spark.driver.extraJavaOptions=-javaagent:/opt/cloudera/parcels/UNRAVEL_SENSOR/lib/java/btrace-agent.jar=script=StreamingProbe.btclass,config=driver,libs=spark-2.2
spark.port.maxRetries=100
spark.unravel.server.hostport=<masked>
spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version=2
spark.hadoop.mapreduce.fileoutputcommitter.cleanup-failures.ignored=true
spark.master=yarn
spark.submit.deployMode=cluster
spark.sql.hive.metastore.jars=${env:HADOOP_COMMON_HOME}/../hive/lib/*:${env:HADOOP_COMMON_HOME}/client/*
spark.sql.hive.metastore.version=1.1.0
spark.sql.catalogImplementation=hive
spark.eventLog.dir=hdfs://hanameservice/user/spark/spark2ApplicationHistory
spark.yarn.historyServer.address=<masked>
spark.yarn.jars=local:/opt/cloudera/parcels/SPARK2-2.2.0.cloudera2-1.cdh5.12.0.p0.232957/lib/spark2/jars/*
spark.driver.extraLibraryPath=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib/hadoop/lib/native
spark.executor.extraLibraryPath=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib/hadoop/lib/native
spark.yarn.am.extraLibraryPath=/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib/hadoop/lib/native
spark.hadoop.mapreduce.application.classpath=
spark.hadoop.yarn.application.classpath=
spark.yarn.config.gatewayPath=/opt/cloudera/parcels
spark.yarn.config.replacementPath={{HADOOP_COMMON_HOME}}/../../..
spark.yarn.appMasterEnv.PYSPARK_PYTHON=/opt/cloudera/parcels/Anaconda-4.4.0/bin/python
spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON=/opt/cloudera/parcels/Anaconda-4.4.0/bin/python
spark.driver.memory=6605m
spark.hadoop.hadoop.treat.subject.external=true

edsonaoki commented 5 years ago

I disabled the connection to the Unravel server and the corresponding error messages disappeared:

19/05/27 09:16:13 INFO JsonImpl: Retrying to reach Unravel server.
19/05/27 09:16:18 WARN JsonImpl: Error while sending REST request. Temporary disabling JSON requests.

Unfortunately, the worker nodes are still stuck at "19/05/27 09:12:40 INFO LightGBMClassifier: send current worker info to driver: ...". Note that I also set verbosity to 4.

edsonaoki commented 5 years ago

@imatiach-msft I noticed something else. It seems the driver is not getting all connections it is expecting:

19/05/31 14:17:25 INFO LightGBMClassifier: driver expecting 36 connections...

This is followed by only 25 "INFO LightGBMClassifier: driver received socket from worker: ..." messages.

Do I understand correctly that until the driver gets all 36 connections, it will not get past "driver writing back to all connections: ..." and the task will hang?

edsonaoki commented 5 years ago

It's strange because I set the number of executors to 25 and the number of cores per executor to 1, so only 25 executors are created. However, when I look at the Spark UI it shows 36 tasks, and the driver also expects 36 connections...

edsonaoki commented 5 years ago

I found that the problem is related to Yarn dynamic allocation. When dynamic allocation is turned on, LightGBM for some reason always creates more tasks than the actual number of executors (by default Yarn uses 1 core per executor). As a consequence, the driver hangs forever: with X executors and X+Y tasks, only X tasks can run at the same time, and the driver keeps waiting for connections from the remaining Y tasks, which never start.

When I turn off dynamic allocation, the number of created tasks is not greater than the number of executors, and the training starts.
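For reference, a minimal sketch of the configuration I mean (illustrative values; these settings have to be in place before the SparkContext is created, e.g. via spark-submit --conf or the session builder):

from pyspark.sql import SparkSession

# Fixed allocation: an explicit executor count and one core per executor, so the
# number of LightGBM tasks cannot exceed the number of executors actually running.
spark = (SparkSession.builder
  .config("spark.dynamicAllocation.enabled", "false")
  .config("spark.executor.instances", "25")
  .config("spark.executor.cores", "1")
  .getOrCreate())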

imatiach-msft commented 5 years ago

@edsonaoki sorry, yes, dynamic allocation is not supported; please see this issue: https://github.com/Azure/mmlspark/issues/319 and also this related PR, which I started but which isn't working currently: https://github.com/Azure/mmlspark/pull/369

edsonaoki commented 5 years ago

@imatiach-msft is there a way to not have all the tasks start at the same time, i.e. to keep some tasks queued while others run, as in Spark MLlib gradient boosting? Currently I can't allocate more than a few dozen cores and a few GB of memory per core, which makes it more or less impossible to run LightGBM training over the amount of data that I have if all the tasks start at the same time...

edsonaoki commented 5 years ago

Finally, I managed to run LightGBM on a reasonably large dataset. These are my guidelines (for a Cloudera Spark 2.2 + Yarn + CentOS 7 environment):

  1. Use the mmlspark_2.11:0.17.dev1+1.g5e0b2a0 version
  2. Launch the Spark job in cluster mode (client mode won't work, probably because of some networking issue involving executors connecting to the driver)
  3. Turn off dynamic allocation
  4. Explicitly set the exact number of executors that you are going to use, and set one core per executor. The job may fail if Yarn cannot provide the requested number of executors
  5. Split your training dataset into a number of partitions smaller than the number of executors (I can't tell exactly how much smaller; this is needed because, apparently, LightGBM running on Yarn sometimes creates a few more tasks than the number of partitions). For classification, make sure that each partition contains at least one row of each label. In PySpark I used the function "repartition_df_equal_label_fractions" below to do so:
def split_df_equal_label_fractions(spark, df, n_partitions, label_col):
  # Builds a list of n_partitions dataframes, each containing roughly
  # 1/n_partitions of the rows of every label value.
  def get_single_partitition_rdd(idx, iterator, select_idx):
    # Keep only the rows of the partition whose index matches select_idx.
    if idx == select_idx:
      yield list(iterator)
  pdf_label_info = df.select(label_col).distinct().toPandas()
  labels = pdf_label_info[label_col].values
  # One dataframe per label value, each repartitioned into n_partitions partitions.
  single_label_dfs = []
  for label in labels:
    if isinstance(label, str):
      single_label_dfs.append(df.where("%s='%s'" % (label_col, label)))
    else:
      single_label_dfs.append(df.where("%s=%d" % (label_col, label)))
    single_label_dfs[-1] = single_label_dfs[-1].repartition(n_partitions)
  # For each partition index, take that partition from every per-label dataframe
  # and union them, so each resulting dataframe contains rows of every label.
  single_partition_dfs = []
  for j in range(n_partitions):
    single_partitionlabel_dfs = []
    for i in range(len(single_label_dfs)):
      # Bind j via a default argument so the lambda keeps the current loop value.
      single_partitionlabel_rdd = single_label_dfs[i].rdd.mapPartitionsWithIndex(
          lambda idx, iterator, j=j: get_single_partitition_rdd(idx, iterator, j))\
        .flatMap(lambda x: x)
      single_partitionlabel_dfs.append(spark.createDataFrame(single_partitionlabel_rdd, df.schema))
    single_partition_df = single_partitionlabel_dfs[0]
    for single_partitionlabel_df in single_partitionlabel_dfs[1:]:
      single_partition_df = single_partition_df.union(single_partitionlabel_df)
    single_partition_dfs.append(single_partition_df)
  return single_partition_dfs

def repartition_df_equal_label_fractions(spark, df, n_partitions, label_col):
  # Collapse each per-index dataframe to a single partition and union them, so
  # every partition of the result contains rows of every label.
  single_partition_dfs = split_df_equal_label_fractions(spark, df, n_partitions, label_col)
  new_df = single_partition_dfs[0].coalesce(1)
  for single_partition_df in single_partition_dfs[1:]:
    new_df = new_df.union(single_partition_df.coalesce(1))
  return new_df
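Usage is roughly as follows (a sketch; the partition count is illustrative and should stay below the number of executors, per item 5):

# Illustrative: 20 partitions for, say, 25 executors, per item 5 above.
train = repartition_df_equal_label_fractions(spark, train, 20, label_col)
model = LightGBMClassifier(labelCol=label_col, weightCol='weight').fit(train)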

imatiach-msft commented 5 years ago

hi @edsonaoki, really sorry for all the trouble you had to go through to make this work. Regarding item 4, "Explicitly set the exact number of executors that you are going to use and set one core per executor. The job may fail if Yarn cannot provide the requested number of executors":

I recently fixed an issue where, if there are multiple cores per executor, the driver may expect more workers than there are and wait until a timeout error, which surfaces as a java.lang.NullPointerException. The fix is not yet released; there is a private build here:

#586

--packages com.microsoft.ml.spark:mmlspark_2.11:0.17.dev21+1.ga443f29 and --repositories https://mmlspark.azureedge.net/maven

with the fix for this issue.

Regarding "Split your training data set in a number of partitions smaller than the number of executors": there is a different code path that basically runs mapPartitions twice if your number of partitions is less than the number of executors, which is something that should really be avoided for performance reasons. It indicates that the logic that estimates the number of workers is not working here: https://github.com/Azure/mmlspark/blob/master/src/lightgbm/src/main/scala/LightGBMUtils.scala#L213

blockManager.master.getMemoryStatus.toList.flatMap({ case (blockManagerId, _) =>
  if (blockManagerId.executorId == "driver") None
  else Some((blockManagerId.executorId.toInt, getHostToIP(blockManagerId.host)))
}).toArray

so there seems to be something wrong with that logic on your cluster.

LightGBM needs to know all of the workers ahead of time for the NetworkInit call (and needs their host and port, to form the network ring over which the lightgbm distributed processing logic takes over):

https://github.com/microsoft/LightGBM/blob/master/include/LightGBM/c_api.h#L1000

I'm not quite sure how to fix the issue on your cluster, though. I need to make this initialization logic much more robust across different cluster configurations.

edsonaoki commented 5 years ago

hi @imatiach-msft, thanks, I understand, I will close this issue such that we can focus on other problems.