Closed chenqin closed 5 years ago
@chenqin Hi, first of all, I'd like to say thank you. Your work in distributed training has been instrumental in bringing XGBoost to an industrial-grade standard.
I'd like to receive your suggestions about improving the CI infrastructure, i.e. how we should test distributed training. See discussion at #4234.
For example, we should maybe test dmlc/rabit together in dmlc/xgboost, so that no future change of dmlc/rabit would break distributed XGBoost.
Thanks for all the work @chenqin ! Regarding Rabit's future, I was also wondering if it would be possible to introduce a scatter-reduce algorithm like LightGBM uses.
The benefits when using 2^x workers seem to be significant.
For job restores, what is the state that we need to be restoring? I'm not familiar with the term peer restore, does that mean communicating data and models from other nodes instead of a single node reading from HDFS? My assumption is that if a worker fails and we launch a new one, the new one would need to first read its data partition into memory from a distributed FS, then read the current model version as well. Am I missing something here?
Do you think there are ideas we can use from Flink/Spark job recovery?
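For reference, the scatter-reduce idea mentioned above (often implemented as ring allreduce: a reduce-scatter phase followed by an allgather phase) can be sketched as follows. This is an editorial illustration in Python, not rabit's or LightGBM's actual implementation; the function and variable names are made up.

```python
def ring_allreduce(data):
    """Simulate ring allreduce. `data[i]` is worker i's vector, pre-split
    into one chunk per worker (chunk size 1 here for simplicity)."""
    n = len(data)
    assert all(len(chunks) == n for chunks in data)
    # Phase 1: reduce-scatter. Each chunk travels around the ring accumulating
    # partial sums; after n-1 steps, worker (c-1) % n holds the full sum of chunk c.
    for step in range(n - 1):
        for i in range(n):
            c = (i - step) % n                 # chunk worker i forwards this step
            data[(i + 1) % n][c] += data[i][c]
    # Phase 2: allgather. Each fully reduced chunk circulates once more so that
    # every worker ends up with the complete reduced vector.
    for step in range(n - 1):
        for i in range(n):
            c = (i + 1 - step) % n
            data[(i + 1) % n][c] = data[i][c]
    return data

workers = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12], [13, 14, 15, 16]]
print(ring_allreduce(workers))  # every worker now holds the full sum
```

Each worker sends roughly 2*(n-1)/n of the vector in total, independent of the worker count, which is why the benefit over a binary tree-reduce grows with cluster size (especially at powers of two).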
I'd like to receive your suggestions about improving the CI infrastructure, i.e. how we should test distributed training. See discussion at #4234.
Sounds good. Splitting build/test and adding OSX, Linux, and Windows should be straightforward.
For example, we should maybe test dmlc/rabit together in dmlc/xgboost, so that no future change of dmlc/rabit would break distributed XGBoost.
In terms of distributed XGBoost, we can start by including XGBoost4j-Spark tests as a starting point. Next we could possibly add simulated rabit failure tests in XGBoost4j-Spark using a mock engine to verify item 2).
Thanks for all the work @chenqin ! Regarding Rabit's future, I was also wondering if it would be possible to introduce a scatter-reduce algorithm like LightGBM uses.
yes, that should be item 3) once we can verify framework failure recovery works as expected. We can add more functions with good failure resilient recovery support.
The benefits when using 2^x workers seem to be significant.
For job restores, what is the state that we need to be restoring? I'm not familiar with the term peer restore, does that mean communicating data and models from other nodes instead of a single node reading from HDFS? My assumption is that if a worker fails and we launch a new one, the new one would need to first read its data partition into memory from a distributed FS, then read the current model version as well. Am I missing something here?
That's mostly correct for starting a job. Item 2) is mostly about the case of a single worker failure, as opposed to how distributed XGBoost deals with the entire worker fleet. What we have today is to drop all work since the last checkpoint and restart the job (reload all data -> restore the external checkpoint -> continue training). What item 2) tries to solve is restoring only the lost worker (load only the partial data assigned to the lost worker(s) -> restore the last checkpoint from other peers -> catch up on the next allreduce). The catch is that there will be situations where XGBoost loses too many workers in the (most likely tree-reduce) topology, so we need to fall back to what we have right now.
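The difference between the two strategies can be captured in a toy model (editorial sketch only; `ToyCluster`, `peer_restore`, and the other names are hypothetical, not rabit's API):

```python
class ToyCluster:
    """Toy model of item 2): restore only the lost worker from peers
    instead of restarting the whole job from an external checkpoint."""
    def __init__(self, num_workers):
        self.num_workers = num_workers
        self.live = set(range(num_workers))
        self.loaded = {w: True for w in range(num_workers)}  # partitions in memory
        self.checkpoint = {"version": 0}  # replicated on every live worker

    def checkpoint_round(self, version):
        self.checkpoint = {"version": version}

    def fail(self, worker):
        self.live.discard(worker)
        self.loaded[worker] = False

    def full_restart(self):
        # status quo: every worker reloads its data; job resumes from checkpoint
        self.live = set(range(self.num_workers))
        self.loaded = {w: True for w in range(self.num_workers)}
        return self.num_workers  # number of partitions reloaded

    def peer_restore(self, worker):
        # item 2): only the replacement worker reloads its own partition and
        # copies the last checkpoint from a surviving peer, then catches up
        # on the next allreduce
        if not self.live:
            return self.full_restart()  # too many losses: fall back
        self.loaded[worker] = True
        self.live.add(worker)
        return 1  # only one partition reloaded

cluster = ToyCluster(4)
cluster.checkpoint_round(7)
cluster.fail(2)
print(cluster.peer_restore(2))        # 1 partition reloaded, not 4
print(cluster.checkpoint["version"])  # 7: model state came from peers
```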
Do you think there are ideas we can use from Flink/Spark job recovery?
AFAIK, underlying Flink/Spark are the continuous-processing and map-reduce paradigms. In map-reduce, we can retry mappers in case they are lost; a sync (barrier) happens after a map stage completes. In continuous processing, the sync might never happen, hence Flink introduced control messages to coordinate workers with different roles.
In distributed XGBoost, global syncs (ALLREDUCE/BROADCAST) happen frequently to update histograms on a fixed topology (the network topology is fixed after all workers register in the tracker): AllReduce -> Broadcast -> AllReduce ... Unless we can prove mathematically that we can upper-bound the error ratio based on partial histograms, rabit still has to block until each MPI-style collective completes.
What item 2) tries to solve is restoring only the lost worker (load only the partial data assigned to the lost worker(s) -> restore the last checkpoint from other peers -> catch up on the next allreduce).
OK, if I understand correctly: when a worker fails in this scenario, the others continue with the iteration while we launch a new one and it reads its data partition. We complete the current iteration using the limited information from the live workers, and in the next iteration, the new worker gets the model state and we continue as before.
Unless we can prove mathematically that we can upper-bound the error ratio based on partial histograms, rabit still has to block until each MPI-style collective completes.
OK I see what you are saying. We have discussed early termination of histogram creation/communication with @hcho3 in the past, where concentration bounds like the Hoeffding bound can be used to prove convergence, similar to the VFDT tree or this recent work by @arapat and Freund (code here).
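For concreteness, the Hoeffding bound used in VFDT-style early split decisions states that with n observations of a quantity with range R, the empirical mean is within ε of the true mean with probability at least 1 - δ. A minimal sketch of how that enables early termination (assuming only the standard bound, not any specific XGBoost proposal):

```python
import math

def hoeffding_epsilon(value_range, n, delta):
    """epsilon such that |empirical mean - true mean| <= epsilon
    with probability at least 1 - delta (Hoeffding's inequality)."""
    return math.sqrt(value_range ** 2 * math.log(1.0 / delta) / (2.0 * n))

# VFDT-style early termination: commit to the best split as soon as its
# advantage over the runner-up exceeds epsilon (illustrative gain values).
best_gain, second_gain = 0.42, 0.35
eps = hoeffding_epsilon(value_range=1.0, n=20000, delta=0.01)
print(best_gain - second_gain > eps)  # True: safe to decide early
```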
Linking Rabit's paper here that focuses on the fault tolerance mechanism.
I'll try to go through this and find correspondences in the code, and hopefully annotate the code with comments to make it more readable.
What item 2) trying to solve is only restore worker that lost (load only partial data assigned to lost worker(s)-> restore last checkpoint from other peers ->catch up on next allreduce)
OK if I understand correctly then when a worker fails in this scenario, the others continue with the iteration while we launch a new one and it reads its data partition. We complete the current iteration using the limited information from the live workers, and in the next iteration, the new worker gets the model state and we continue as before.
I agree we should trace back to the rabit paper and see how failure recovery was originally designed. At the same time, binary tree-reduce might not be the most performant network topology considering the time cost of {allreduce -> broadcast}: e.g. when we grow tree depth on a large worker cluster, the total number of {allreduce -> broadcast} rounds per iteration goes up exponentially (2^depth), as does the number of network hops (log(number_of_workers)).
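Under the rough cost model in the comment above (one {allreduce -> broadcast} round per new tree level, each traversing about log2(workers) hops in a binary tree-reduce), the totals can be tallied as follows. This just illustrates the comment's arithmetic, not a measured profile:

```python
import math

def rounds_per_iteration(max_depth):
    # number of {allreduce -> broadcast} rounds grows like 2^depth
    return 2 ** max_depth

def hops_per_round(num_workers):
    # a binary tree-reduce traverses ~log2(num_workers) network hops
    return math.ceil(math.log2(num_workers))

depth, workers = 8, 1024
print(rounds_per_iteration(depth))                            # 256
print(hops_per_round(workers))                                # 10
print(rounds_per_iteration(depth) * hops_per_round(workers))  # 2560 hops/iteration
```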
Unless we can prove mathematically that we can upper-bound the error ratio based on partial histograms, rabit still has to block until each MPI-style collective completes.
OK I see what you are saying. We have discussed early termination of histogram creation/communication with @hcho3 in the past, where concentration bounds like the Hoeffding bound can be used to prove convergence, similar to the VFDT tree or this recent work by @arapat and Freund (code here).
Thanks! Let me dig in a bit and I will come back to you later.
Rabit2 design proposal
Keywords: FSM, EPOLL, C10K, data lineage, map-reduce, allreduce, fast recovery, large scale, MPI
@chenqin Is this task complete?
I think we can say the hardening part is complete; the improvement part is still ongoing.
Improvement
I have been working on enabling native rabit checkpoints in YARN/Spark deployments. Here are the tests I run to instrument failures and verify recovery:
https://github.com/chenqin/xgboost/blob/master/tests/cli/runsingle.sh https://github.com/chenqin/xgboost/blob/master/tests/cli/runtests.sh
So far I have identified two issues with the current rabit-based recovery in XGBoost. 1) Saved checkpoints miss some configurations (e.g. max_depth, dsplit), although this can be solved relatively straightforwardly by adding them to the payload.
2) DMatrix::Load calls allreduce before loadcheckpoint, which fails the recovery payload size check. This turns out to be a bit tricky to fix.
In order to construct a partitioned DMatrix and build the learner (which gets injected with loadcheckpoint), we need to sync the number of columns across all workers before loadcheckpoint. https://github.com/chenqin/xgboost/blob/master/src/cli_main.cc#L160
AFAIK, a simple way is to keep this configuration in the tracker (each worker just calls once when it runs or reruns) and expose it via rabit.h, with the following changes: https://github.com/chenqin/rabit/commit/11adddc83fa8ec0041b547fb2fe77721cc072eae https://github.com/chenqin/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py#L271-L279 Here is the code change in XGBoost: https://github.com/chenqin/xgboost/blob/master/src/data/data.cc#L227-L248
Then it passes the failure recovery tests with rabit.
cc @hcho3 @trivialfis @CodingCat
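The tracker-side idea can be mocked like this (hypothetical sketch: `MockTracker` and `set_if_absent` are made-up names for illustration, not the dmlc_tracker API; the column count 10438 is just an example value):

```python
class MockTracker:
    """Mock of a tracker-held key/value store: the first worker to report
    a configuration wins, and restarted workers read the agreed value back
    instead of joining a bootstrap allreduce before loadcheckpoint."""
    def __init__(self):
        self._store = {}

    def set_if_absent(self, key, value):
        # returns the stored value if present, otherwise stores and returns `value`
        return self._store.setdefault(key, value)

tracker = MockTracker()
# initial run: the first worker registers the global column count once
assert tracker.set_if_absent("num_col", 10438) == 10438
# a recovered worker reruns the same call and simply reads the agreed value,
# so DMatrix construction no longer needs an allreduce before loadcheckpoint
assert tracker.set_if_absent("num_col", 0) == 10438
```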
@chenqin Thanks. Honestly I don't quite understand what's happening ... So I will stay out of this.
AFAIK, a simple way is to keep this configuration in the tracker (each worker just calls once when it runs or reruns) and expose it via rabit.h, with the following changes: chenqin/rabit@11adddc https://github.com/chenqin/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py#L271-L279 Here is the code change in XGBoost: https://github.com/chenqin/xgboost/blob/master/src/data/data.cc#L227-L248
Then it passes the failure recovery tests with rabit.
cc @hcho3 @trivialfis @CodingCat
Following your GitHub repo @chenqin, dmlc/rabit#63 is solved, but there is another issue when one worker is stopped and restarted. I added some logging; the error occurred in gmat_.Init():
[09:56:15] INFO: /opt/xgboost/src/learner.cc:215: Tree method is selected to be 'hist', which uses a single updater grow_quantile_histmaker.
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:267: gbm:BoostNewTrees() start
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:255: GBM::InitUpdater(), pstr: 'grow_quantile_histmaker'
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:290: gbm:BoostNewTrees() call up->Update()
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:55: QuantileHistMaker::Update() start
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:58: QuantileHistMaker::Update() call gmat_.Init()
[09:56:16] INFO: /opt/xgboost/src/common/hist_util.cc:177: HIstCutMatrix::Init() call sreducer.Allreduce() start
[debug] allreduce_robust::TryGetResult(), data_size: 6678336, size: 342115888
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber
Can you share the test case you were running?
Following your GitHub repo @chenqin, dmlc/rabit#63 is solved, but there is another issue when one worker is stopped and restarted. I added some logging; the error occurred in
gmat_.Init()
[09:56:15] INFO: /opt/xgboost/src/learner.cc:215: Tree method is selected to be 'hist', which uses a single updater grow_quantile_histmaker.
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:267: gbm:BoostNewTrees() start
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:255: GBM::InitUpdater(), pstr: 'grow_quantile_histmaker'
[09:56:15] INFO: /opt/xgboost/src/gbm/gbtree.cc:290: gbm:BoostNewTrees() call up->Update()
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:55: QuantileHistMaker::Update() start
[09:56:15] INFO: /opt/xgboost/src/tree/updater_quantile_hist.cc:58: QuantileHistMaker::Update() call gmat_.Init()
[09:56:16] INFO: /opt/xgboost/src/common/hist_util.cc:177: HIstCutMatrix::Init() call sreducer.Allreduce() start
[debug] allreduce_robust::TryGetResult(), data_size: 6678336, size: 342115888
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber
My test is as follows:
Run a tracker, using https://github.com/chenqin/dmlc-core/blob/master/tracker/dmlc_tracker/tracker.py, as python tracker.py --num-workers 2
Start 2 workers, using https://github.com/dmlc/xgboost/blob/master/tests/distributed/test_basic.py
After training has run for a few rounds, kill one worker, then restart it.
The config:
{
"silent": false,
"n_estimators": 500,
"max_depth": 5,
"nthread": 10,
"learning_rate": 0.07,
"early_stopping_rounds": 50,
"subsample": 0.8,
"tree_method": "hist",
"verbosity": 3,
"eval_metric": "auc"
}
The above error occurred with tree_method=hist, but with tree_method=approx it worked without any error. So it seems there is still some problem with tree_method=hist.
Thanks for sharing, will take a look!
Update: there is a much simpler way of backfilling the allreduce/broadcast result buffer from other workers, which should address this issue.
Update 4th July: both hist and approx work with the latest rabit recovery cache (orthogonal to "same iteration checkpointing"-based recovery).
Thank you, @chenqin for all the work. We are experimenting with XGBoost on a large scale and I am very interested in working on making xgboost-spark training reliable.
I read this thread and I am a bit confused. As far as I understand, xgboost-spark is not reliable. At least it has a TaskFailedListener which kills the job if one of the tasks fails. On the other hand, you have a test showing that distributed XGBoost without Spark can recover from a node failure (great achievement!)
Do I understand correctly that distributed training in xgboost-spark is still not reliable (in 0.9)? What should we do to make it reliable? It seems that the underlying tech supports fault tolerance. I would gladly hear your opinion.
@trams xgb-spark is reliable and able to do checkpoint restore; we run a large number of jobs daily with deployments of hundreds of containers and thousands of cores. This issue is tracking work around rabit (the worker synchronization layer). As we speak, XGBoost-Spark works fine with remote checkpoints and job restart/resume.
The goal of the improvement is to allow distributed XGBoost to tolerate a few failures without restarting/resuming the entire training job. It can also help move larger XGBoost workloads onto cheaper preemptible instances, which may only last a few iterations, at much lower cost.
Thank you, @chenqin for all the work. We are experimenting with XGBoost on a large scale and I am very interested in working on making xgboost-spark training reliable.
I read this thread and I am a bit confused. As far as I understand, xgboost-spark is not reliable. At least it has a TaskFailedListener which kills the job if one of the tasks fails. On the other hand, you have a test showing that distributed XGBoost without Spark can recover from a node failure (great achievement!)
Do I understand correctly that distributed training in xgboost-spark is still not reliable (in 0.9)? What should we do to make it reliable? It seems that the underlying tech supports fault tolerance. I would gladly hear your opinion.
@trams fail-all-and-make-a-checkpoint on any error is a very typical strategy for machine learning systems.
The very original reason to fail everything on any error in the Spark layer is that rabit's fault tolerance was a bit off here and there, leading to stuck training processes. Fixing those "offs" is the topic of this thread. In the future, we should provide two types of fault tolerance strategies: the automatic task recovery functionality from this thread, and checkpointing in Spark (to handle the situation where the whole job goes down for some reason).
I am glad to hear about your plans on making distributed xgboost more reliable. And thank you for a quick response
As for checkpointing, it works, but https://github.com/dmlc/xgboost/issues/3946 prevents us from using it to its full extent.
@wstian can you try my latest master next week and see if it passes your previous tests? I can see it pass https://github.com/chenqin/xgboost/blob/master/tests/cli/runhist.sh
Rabit one-off recovery cache: https://github.com/dmlc/rabit/pull/98
@wstian can you try my latest master next week and see if it passes your previous tests? I can see it pass https://github.com/chenqin/xgboost/blob/master/tests/cli/runhist.sh
Sorry for the late response.
I tried with your latest master:
https://github.com/chenqin/xgboost.git @807ff491d666a93586bcb2aa1e4d6f1769fc402d
https://github.com/chenqin/rabit.git @ 87c90639f3594046697179fd8dd56ee8fbaba954
and repeated the previous tests, which are:
Start a distributed training with worker-0 and worker-1.
After data is loaded and training has started, shut down worker-0.
After the tracker receives the recover signal from worker-1, restart worker-0.
After the shut-down worker-0 restarted, worker-0 hit an error and exited:
[16:22:39] XGBoost distributed mode detected, will split data among workers
[16:22:39] Load part of data 0 of 2 parts
[16:22:44] 100017x10438 matrix with 330975794 entries loaded from train.200k.libsvm
Too many nodes went down and we cannot recover.., shutting down process
Worker-1 also came across this error and exited.
Did I miss any dependency update, or am I using the wrong commits?
@wstian I added more logging: https://github.com/chenqin/rabit/commit/2bd993a8d0483ddb8a4661561cf2bfd16bcd8a91 Please paste the log from your side.
The error you saw might be a long-existing rabit bug. In my tests, it means checkpoint/loadcheckpoint collective calls sometimes unexpectedly interact with allreduce/broadcast calls from healthy nodes, which causes trygetresult to fail due to a lack of the necessary roles to operate (KRequestData, KHaveData).
@wstian I added more logging: chenqin/rabit@2bd993a Please paste the log from your side.
The error you saw might be a long-existing rabit bug. In my tests, it means checkpoint/loadcheckpoint collective calls sometimes unexpectedly interact with allreduce/broadcast calls from healthy nodes, which causes trygetresult to fail due to a lack of the necessary roles to operate (KRequestData, KHaveData).
Tried with chenqin/rabit@2bd993a:
Worker-0 shut down when restarted; log below:
2019-07-09 09:54:45,298 - start-worker.py[line:92] - INFO: load train: train.200k.libsvm
[09:54:45] XGBoost distributed mode detected, will split data among workers
[09:54:45] Load part of data 0 of 2 parts
[09:54:50] 100017x10438 matrix with 330975794 entries loaded from train.200k.libsvm
[0] RecoverExec caller GetCache seq 0
[0] caller GetCache requester 1
[0] checkpoint req - |0|0|0|1|0| - |0|1|0|
[0] checkpoint ack - |0|1|0|1|1| - |33554432|0|1|
[0] role is 1 in trygetresult seqno 0 seq_counter 0
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber, shutting down process
Worker-1 is still running (not shut down this time); log below:
[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] checkpoint req - |33554432|1|0|0|0| - |33554432|0|0|
[1] checkpoint ack - |0|1|0|1|1| - |33554432|0|1|
[1] role is 0 in trygetresult seqno 0 seq_counter 9
@wstian from a quick look at the log you posted, I think it is the same issue I saw before.
What happens is worker-1 calls into the checkpoint procedure while worker-0 is still in bootstrap/allreduce. https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L362
It forces worker-1 and worker-0 into a collective checkpoint state. https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L1018
I guess the initial intention of this code was to check whether any nodes are missing the latest checkpoint payload before collective checkpointing. The problem you saw is that the worker-0 caller passed a buffer pointer and size intended for allreduce/broadcast; when the actual checkpoint payload recovery starts, the payload sizes mismatch.
Working on a fix.
Update: here is a fix candidate: https://github.com/chenqin/rabit/commit/1e82c3369edd83c41289c0e497caefd49f831cb6
Test output with the mock error injected:
[0]@@@Hit Mock Error:CheckPoint
2019-07-09 21:23:53,771 INFO [21:23:53] [3] test-rmse:0.007253
[21:23:54] start instance-xgboost:0
[21:23:54] Load part of data 0 of 10 parts
[21:23:54] 650x127 matrix with 14300 entries loaded from ../../demo/data/agaricus.txt.train
[21:23:54] Load part of data 0 of 10 parts
[21:23:54] 161x127 matrix with 3542 entries loaded from ../../demo/data/agaricus.txt.test
[21:23:54] Tree method is specified to be 'hist' for distributed training.
2019-07-09 21:23:54,947 INFO [21:23:54] [4] test-rmse:0.006189
2019-07-09 21:23:55,044 INFO [21:23:55] [5] test-rmse:0.005330
2019-07-09 21:23:55,143 INFO [21:23:55] [6] test-rmse:0.004574
2019-07-09 21:23:55,239 INFO [21:23:55] [7] test-rmse:0.003929
2019-07-09 21:23:55,335 INFO [21:23:55] [8] test-rmse:0.003416
2019-07-09 21:23:55,431 INFO [21:23:55] [9] test-rmse:0.002966
2019-07-09 21:23:55,527 INFO [21:23:55] [10] test-rmse:0.002581
2019-07-09 21:23:55,531 INFO @tracker All nodes finishes job
2019-07-09 21:23:55,531 INFO @tracker 2.28832602501 secs between node start and job finish
Tried chenqin/rabit@1e82c33:
Worker-0 shut down with the following log:
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] load checkpoint global 29235 version 23
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] RecoverExec caller CheckPoint_ seq 2
[0] RecoverExec caller CheckPoint_ seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |1|0|0|0|1| - |33699152|0|1|
[0] pre trygetresult seqno 1 requester 1
[0] role is 1 in trygetresult seqno 1 seq_counter 1
Allreduce Recovered data size do not match the specification of function call.
Please check if calling sequence of recovered program is the same the original one in current VersionNumber, shutting down process
Worker-1 log:
[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] RecoverExec caller CheckPoint_ seq 9
[1] caller CheckPoint_ requester 0
[1] RecoverExec caller CheckPoint_ seq 9
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 1
[1] RecoverExec caller CheckPoint_ seq 2
[1] RecoverExec caller CheckPoint_ seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0
[1] RecoverExec caller Allreduce seq 0
Worker-0 shut down with the following log:
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller LoadCheckPoint seq 0
[0] load checkpoint global 17351 version 13
[0] RecoverExec caller LoadCheckPoint seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] RecoverExec caller CheckPoint_ seq 2
[0] RecoverExec caller CheckPoint_ seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller GetCache seq 0
[0] RecoverExec caller Allreduce seq 0
[0] RecoverExec caller Allreduce seq 1
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
[0] pre trygetresult seqno -55050240 requester 0
AssertError:likely minimal seqno overflow, shutting down process
Thank you for the quick response! This is also a known issue where I put an assert to avoid an unexpected state: https://github.com/chenqin/rabit/blob/test/src/allreduce_robust.cc#L932 Could you also post the log from worker-1 as well? -55050240 is a really weird seqno.
Also, I tried to avoid sequence number overflow by reducing the specialOp value. Could you give this another try as well? https://github.com/chenqin/rabit/commit/aa2153fcc36f943e1429537d2b0c1caf4a38d80c
Tried chenqin/rabit@aa2153f:
Worker-0 shut down; log below:
[0] load checkpoint global 17351 version 13
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
AssertError:likely minimal seqno overflow, shutting down process
Worker-1 is alive and waiting, but there is no other log.
Hmm, one last try: adding an assertion on negative seq no (as in "recv ack - |-55050240|") as well as flag logging.
It passes multiple runs of https://github.com/chenqin/xgboost/blob/master/tests/cli/runxgbtests.sh
@wstian will you be able to help run one last time against https://github.com/chenqin/rabit/commit/0a522a77829162f2dbea8c99487819b8b50b47c2
Let's document the "expected" fault recovery behavior in the current rabit design, so we are all on the same page with reasonable expectations.
allreduce/broadcast has to happen between loadcheckpoint and checkpoint calls.
In order to let a failed node catch up, no "bootstrap" allreduce/broadcast (before loadcheckpoint) is allowed. Otherwise, other nodes will try to recover allreduce/broadcast ops issued before loadcheckpoint using the allreduce/broadcast results from after the last checkpoint.
In case too many nodes are down, it will recover from the very beginning.
Example of a supported use case:
Example of an unsupported use case:
If a node fails and the recovered node runs allreduce before loadcheckpoint, other nodes will not be able to recover the results. A successful checkpoint also instructs all nodes to clear allreduce/broadcast results from the corresponding iteration, hence the recovering node will not be able to pick up the right results from other nodes. This also applies to histogram init within the first iteration after loadcheckpoint-0: once nodes pass the first iteration, those results will be lost.
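The ordering rules above can be captured as a tiny state machine (editorial sketch for illustration, not rabit code; all names are made up):

```python
class CallOrderChecker:
    """Enforces the documented rules: collectives must sit between
    loadcheckpoint and checkpoint, and no bootstrap collectives are allowed."""
    def __init__(self):
        self.checkpoint_loaded = False

    def load_checkpoint(self):
        self.checkpoint_loaded = True

    def allreduce(self):
        if not self.checkpoint_loaded:
            # bootstrap collective: peers cannot replay its result for a
            # recovered node once a later checkpoint clears their caches
            raise RuntimeError("allreduce/broadcast before loadcheckpoint")

    broadcast = allreduce  # same rule applies to broadcast

    def checkpoint(self):
        if not self.checkpoint_loaded:
            raise RuntimeError("checkpoint before loadcheckpoint")

# supported sequence: loadcheckpoint -> collectives -> checkpoint
ok = CallOrderChecker()
ok.load_checkpoint(); ok.allreduce(); ok.broadcast(); ok.checkpoint()

# unsupported sequence: DMatrix::Load-style allreduce before loadcheckpoint
bad = CallOrderChecker()
try:
    bad.allreduce()
except RuntimeError as e:
    print(e)  # allreduce/broadcast before loadcheckpoint
```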
Worker-0 shut down; log below:
[0] load checkpoint global 17351 version 13
[0] send req - |1|0|0|0|0| - |4|0|0|
[0] recv ack - |-55050240|0|0|0|1| - |33716452|0|1|
AssertError:likely minimal seqno overflow, shutting down process
Worker-1 is alive and waiting, but there is no other log.
So I found the root cause of the negative overflow: it was caused by introducing an additional bit and updating the left shift to 5 bits, which made kSpecialOp << 5 + flags exceed the signed int32 range. I submitted a fix which uses unsigned int32 instead of the signed one: https://github.com/chenqin/rabit/pull/4/files#diff-5404c32553915533fabb6d48e626e98fR274 The issue should be fixed.
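The overflow can be demonstrated in isolation. The constant below is illustrative (rabit's real kSpecialOp value lives in its source); the point is that a left shift by 5 can push the tagged value past INT32_MAX, so reinterpreting it as a signed 32-bit integer wraps negative, like the weird seqno in the log above, while an unsigned 32-bit read stays positive:

```python
def as_int32(x):
    """Reinterpret the low 32 bits of x as a signed int32 (C-style wraparound)."""
    x &= 0xFFFFFFFF
    return x - (1 << 32) if x >= 0x80000000 else x

def as_uint32(x):
    """Reinterpret the low 32 bits of x as an unsigned int32."""
    return x & 0xFFFFFFFF

K_SPECIAL_OP = 1 << 26                 # hypothetical sentinel near the int32 limit
FLAGS = 3
tagged = (K_SPECIAL_OP << 5) + FLAGS   # 2**31 + 3: just past INT32_MAX

print(as_int32(tagged))   # -2147483645: wraps negative when read as signed int32
print(as_uint32(tagged))  # 2147483651: fine when stored as unsigned int32
```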
@chenqin Can we close this one?
Yes please
Over the course of the last half, I have been working on some of the largest XGBoost jobs. We often face weird issues while running thousands of workers over billions of rows of training data. A minor set of PRs has been submitted: https://github.com/dmlc/dmlc-core/pull/512 https://github.com/dmlc/rabit/pull/81 https://github.com/dmlc/rabit/pull/73
As distributed training moves into production with hundreds of production models running on top of XGBoost, there is a need to harden the rabit framework with better code layout/documentation as well as improved functional test coverage. Here is the list of work I am planning to execute; feedback from the community is welcome.
Increase test coverage with a split of the _robust implementations; add test coverage for, and harden, ring-based restore (I haven't been able to find where that code path is used). There should be a waterfall flow where filesystem-based checkpointing and job restart are executed only when ring-based recovery can't restore.
Switch distributed XGBoost checkpoint restore from external checkpoints (HDFS/S3) to peer restore. Redistributing the training dataset can take 30-40 minutes pulling from HDFS; restoring a single worker failure (more likely as the cluster gets larger) without restarting the entire training stage can save a significant amount of time. This is particularly interesting when workers run on top of preemptible instances: if a small percentage of tasks get rescheduled because an executor host is preempted, the job can restore quickly. AFAIK, no other MPI framework has built such a recovery mechanism to run on top of preemptible instances.
More MPI features, primitive metrics, and profiling capability. Investigating job failures can be burdensome; primitive metrics report aggregation could improve the user experience a lot.
Thanks, Chen