dmlc / xgboost

Scalable, Portable and Distributed Gradient Boosting (GBDT, GBRT or GBM) Library, for Python, R, Java, Scala, C++ and more. Runs on single machine, Hadoop, Spark, Dask, Flink and DataFlow
https://xgboost.readthedocs.io/en/stable/
Apache License 2.0

[jvm-packages] Does xgboost spark version sync model during iterations? #1708

Closed · ushanbrisk closed this issue 6 years ago

ushanbrisk commented 7 years ago

I ran a training process with xgboost spark, using gblinear with a customized objective function. I added some debug code inside gblinear.cc to print the model's bias in each iteration, and found that different nodes (ranks in rabit) print different values in every iteration. Does this indicate that xgboost spark does not sync boosters during training, and that the boosters across nodes remain different? If so, why does trainWithRDD only return the first booster? Is this a bug, or does the sync only happen after training? Correct me if I am wrong.

CodingCat commented 7 years ago

what do you mean by different values?

after each iteration, boosters sync with each other through the allreduce algorithm offered by rabit library....
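
For readers less familiar with allreduce, here is a minimal conceptual sketch (not rabit's actual implementation, which is native code; the worker count and values are made up) of the idea that every rank contributes a local vector and every rank ends up with the same reduced result:

```scala
// Conceptual sketch only: an allreduce-style sum over per-worker vectors.
// The real sync uses rabit's native allreduce, not Scala code; worker
// counts and vector contents here are invented for illustration.
object AllreduceSketch {
  // Each worker contributes its local vector (e.g. accumulated gradients).
  def allreduceSum(perWorker: Seq[Array[Double]]): Array[Double] = {
    val length = perWorker.head.length
    perWorker.foldLeft(Array.fill(length)(0.0)) { (acc, local) =>
      acc.zip(local).map { case (a, b) => a + b }
    }
  }

  def main(args: Array[String]): Unit = {
    val workerVectors = Seq(
      Array(1.0, 2.0, 3.0), // rank 0
      Array(4.0, 5.0, 6.0), // rank 1
      Array(7.0, 8.0, 9.0)  // rank 2
    )
    // After an allreduce, every rank would see the same reduced vector.
    println(allreduceSum(workerVectors).mkString(", ")) // 12.0, 15.0, 18.0
  }
}
```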

ushanbrisk commented 7 years ago

Thanks for the reply. I ran a 4-node, 50-iteration test on Spark, submitting the job to a local server. In each iteration I added debug code around the gblinear bias update to print the bias before and after the update. For a fixed round number, the bias on all 4 nodes is different (e.g. in round 5, the "bias before" values on nodes 1-4 all differ). For a fixed node, the "bias before" equals the "bias after" of the previous round. Does this mean the model is not synced and the nodes work separately? I also tried collect()-ing all the boosters and dumping them to text, and found they are different.

I looked into the code: there is no explicit Allreduce operation on the model in learner.cc or gblinear.cc; I only found saveRabitCheckpoint() during iterations in XGBoost.java. Is saveRabitCheckpoint for fault-tolerance purposes only, or is an implicit Allreduce done inside saveRabitCheckpoint? If it is done implicitly, through what method is model.weight set? I added debug code in the Load(fi) method and found it is not called.
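
As an aside, a tiny sketch of the kind of per-rank comparison described above (the bias values are hypothetical, not real gblinear output):

```scala
// Sketch of the check described above: given bias values logged per rank
// and per round (hypothetical numbers, not real xgboost output), verify
// whether all ranks see the same bias in a given round.
object BiasTraceCheck {
  // biasTrace(rank)(round) = bias value printed by that rank in that round
  def ranksAgree(biasTrace: Seq[Seq[Double]], round: Int, tol: Double = 1e-12): Boolean = {
    val values = biasTrace.map(_(round))
    values.forall(v => math.abs(v - values.head) <= tol)
  }

  def main(args: Array[String]): Unit = {
    val trace = Seq(
      Seq(0.10, 0.12, 0.15), // rank 0
      Seq(0.09, 0.11, 0.14)  // rank 1: diverges, as observed with gblinear
    )
    (0 until 3).foreach(r => println(s"round $r ranks agree: ${ranksAgree(trace, r)}"))
  }
}
```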

xydrolase commented 7 years ago

My understanding is that the checkpoint only covers the loss of a socket connection: if an executor/worker transiently loses its connection to the tracker, it can restore state once it reconnects.

I've seen cases where an executor is lost. Although Spark will allocate a new executor to replace the lost one, the newly allocated executor cannot restore the previous state, mainly for two reasons:

  1. The Rabit tracker currently does not handle the loss of executors/workers. In fact, there is no heartbeat mechanism in the Rabit protocol, so the Rabit tracker is agnostic to the status of individual workers. When Spark provisions a new executor to replace the dead/lost one, the Rabit client on it will send a start command to the tracker, but since all other workers/nodes have already started, they are not actively listening for incoming connections from the newly created worker. As a result, the replacement worker cannot connect to its neighboring nodes, which is necessary for the Allreduce operation.
  2. The Rabit tracker does not store the current state of the booster. Therefore, if an executor crashes, the replacement executor spawned by Spark cannot restore the partially trained model from the tracker. (See the sketch after this list for the state that is missing.)
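
The sketch below is purely hypothetical: it only names the two pieces of state (worker liveness and a model checkpoint) that a tracker would need for the recovery described above, neither of which the real Rabit tracker keeps.

```scala
// Hypothetical illustration of tracker state that would be needed for the
// recovery scenario above. This is NOT what the Rabit tracker implements;
// the point is precisely that it keeps neither piece of state.
import scala.collection.mutable

object HypotheticalTrackerState {
  final case class WorkerInfo(rank: Int, lastHeartbeatMs: Long)

  // 1. Liveness tracking (missing today: no heartbeat in the Rabit protocol).
  val workers = mutable.Map.empty[Int, WorkerInfo]

  // 2. Latest model checkpoint (missing today: the tracker never sees booster
  //    state, so a fresh executor has nothing to restore from it).
  var latestCheckpoint: Option[Array[Byte]] = None

  def heartbeat(rank: Int, nowMs: Long): Unit =
    workers(rank) = WorkerInfo(rank, nowMs)

  def deadWorkers(nowMs: Long, timeoutMs: Long): Seq[Int] =
    workers.values.collect {
      case w if nowMs - w.lastHeartbeatMs > timeoutMs => w.rank
    }.toSeq
}
```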

I wonder how many people are interested in fault-tolerance support in the above scenarios. I've trained many models on Spark clusters in the past month and have only seen an executor loss once. That said, as the scale of distributed training grows, the probability that some executor fails increases, so implementing robust fault tolerance may be worth it. But it wouldn't be possible without changing the Rabit implementation (both the tracker and the client).

@CodingCat @tqchen Any comments?

CodingCat commented 7 years ago

I nearly forgot this issue... gblinear does not support distributed training; gbtree implements distributed training with TreeSyncer (https://github.com/dmlc/xgboost/blob/master/src/tree/updater_sync.cc#L22)
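
For context, a conceptual sketch of a broadcast-style sync of the kind TreeSyncer performs (assuming it broadcasts the serialized trees from one rank; readers should verify against updater_sync.cc). This is not the actual C++ code, just an illustration that every rank ends the round with identical trees:

```scala
// Conceptual sketch of a "broadcast from rank 0" model sync, not the real
// updater_sync.cc implementation (which is C++ and uses rabit's broadcast).
object TreeSyncSketch {
  final case class Tree(serialized: String) // stand-in for a serialized tree

  // All ranks end the round with rank 0's copy of the trees.
  def broadcastFromRankZero(perRankTrees: Vector[Vector[Tree]]): Vector[Vector[Tree]] = {
    val canonical = perRankTrees(0)
    perRankTrees.map(_ => canonical)
  }

  def main(args: Array[String]): Unit = {
    val before = Vector(
      Vector(Tree("tree-A@rank0")),
      Vector(Tree("tree-A@rank1")) // any per-rank drift is overwritten by the sync
    )
    val after = broadcastFromRankZero(before)
    println(after.forall(_ == after.head)) // true: all ranks identical
  }
}
```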

xydrolase commented 7 years ago

But if an executor is lost, the training still hangs, correct? I may need to double check my previous statement by reading the Rabit code more carefully.

CodingCat commented 7 years ago

@xydrolase I'm just answering the original question

Regarding fault tolerance, how long does xgboost model training usually take? I think it is rare for xgboost to run as long as DL models... for me, to keep things simple, I prefer to restart the complete job once there is a failure

xydrolase commented 7 years ago

It depends. Recently I trained a model with about 100 million rows and about 100 features, with max_depth = 8 and round = 500. Each round took about 20~30 seconds.
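
For reference, a hedged sketch of what a training call in that ballpark might look like with the trainWithRDD API mentioned earlier. The exact signature varied across early xgboost4j-spark releases, and trainingData, numWorkers, the objective and eta below are placeholders/assumptions, so treat this as illustrative only:

```scala
// Sketch of a training call roughly matching the setup above
// (max_depth = 8, 500 rounds). Assumes an early xgboost4j-spark release
// where trainWithRDD takes an RDD of mllib LabeledPoint; check the exact
// signature of your version before using this.
import ml.dmlc.xgboost4j.scala.spark.XGBoost
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

object TrainSketch {
  def train(trainingData: RDD[LabeledPoint], numWorkers: Int) = {
    val params = Map(
      "objective" -> "binary:logistic", // assumption; the thread does not say
      "max_depth" -> 8,
      "eta"       -> 0.1                // assumption
    )
    // 500 boosting rounds across numWorkers workers, as in the example above.
    XGBoost.trainWithRDD(trainingData, params, 500, numWorkers)
  }
}
```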

On our Spark clusters, there appear to be some nodes that run significantly slower than others (for reasons that are currently unknown), so in the worst case the training perhaps takes a day.

Yeah, I kind of agree that given the training time, it's probably easier just to retrain the whole model should one or more nodes fail.

CodingCat commented 7 years ago

@xydrolase if it's allowed, would you mind sharing how eBay uses xgboost4j in production? A guest blog article or a short description in https://github.com/dmlc/xgboost/tree/master/demo would be great

xydrolase commented 7 years ago

@CodingCat Oh, sure. Once our machine learning pipeline settles, I'll be glad to add some description to the demo page.

CodingCat commented 7 years ago

thanks

edi-bice commented 7 years ago

Saying XGBoost supports YARN is misleading, especially since fault tolerance is one of the key features of the latter. Model training is one thing: yes, it may not take that long, and the odds of node failure may be small. But model search via cross-validation is another. I don't use the Spark interface, but I've queued up many CV runs over the weekend (via the dmlc-submit script) only to find out on Monday that the run did not make it past Friday night. Admittedly, my DEV cluster is a mess of ~30 VMs running on the Xen Hypervisor, and due to some networking bug and bad timing, nodes are failing left and right. That said, if dev resources are limited, this is not a deal-breaker feature; I'm working my way around it.