Open CodingCat opened 5 years ago
@trams feedback is welcome
Can we test if mod based partition offers approx uniform data distribution?
partitionId = math.abs(preventOverflow(row.getHashCode + row.getAs[Float](math.abs(row.getHashCode) % number_non_zero_features)) % numWorkers)
No, it will not without any injected randomization; mod-based partitioning is known to be subject to data skew.
However, the proposed strategy here is based on the row's hash code plus a pseudo-randomly selected feature value, which creates a certain level of randomness for load balancing.
First, I want to thank you for a comprehensive RFC. I like it.
Second, let me quickly reiterate your proposition to make sure I understand correctly.
Reiteration
- You propose to make partitioning deterministic by using some kind of "hash" function whose result is based purely on the data values.
yes
- You propose to add to XGBoost4J the ability to save a booster to an OutputStream. This will enable us to store boosters in HDFS. It will also permit us to move checkpoint saving to the executors (it can be performed by task 0, or by a task whose number is determined by the checkpoint number, to spread the load).
yes
This would in turn allow us to launch only one Spark job to train the complete model, and we would not suffer any performance degradation from cache loss in the no-failure case, because we would not be restarting the training (as you mentioned in #4785, this is one of the alternative solutions I mentioned)
yes
- When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.
yes
Tell me if I've got something wrong.
And now I want to share my comments point by point
My comments
- "Deterministic partitioning" I am all for! I am just a bit confused about the proposed partitioning function; could you elaborate a bit? More specifically, I am confused about the following:
- the resulting function returns a Float. Should we truncate, or is that implied?
- I guess I do not fully understand why the current partitioning, which is done using HashPartitioner (correct me if I am wrong), is bad. Row.hashCode is supposed to be quite good, and it is based on MurmurHash. But if that hash value is not deterministic (your observation implies it), then I would like to understand why, and whether it is possible to fix it. Maybe it is because of the Vector class we use... (I would like to hear your opinion)
the current partitioning, triggered by repartition(N), is not based on HashPartitioner; it works more like round-robin with a randomized start, check https://github.com/apache/spark/blob/02a0cdea13a5eebd27649a60d981de35156ba52c/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L469-L487
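The randomized-start behavior can be sketched without Spark. This is an illustrative simplification (not Spark's actual code) of how repartition(N) distributes rows: each input partition picks a random starting position and then assigns rows round-robin, so the same row can land in a different output partition on every run.

```scala
import scala.util.Random

object RoundRobinSketch {
  // Assign rows to numPartitions buckets, round-robin from a random start.
  // The seed stands in for the per-task randomness Spark injects.
  def assign(rows: Seq[String], numPartitions: Int, seed: Long): Map[Int, Seq[String]] = {
    var position = new Random(seed).nextInt(numPartitions) // randomized start
    rows.map { row =>
      position = (position + 1) % numPartitions
      (position, row)
    }.groupBy(_._1).map { case (p, pairs) => (p, pairs.map(_._2)) }
  }
}
```

With a different random start on each run, the row-to-partition mapping changes even though the data is identical, which is exactly why this scheme cannot support deterministic checkpoint recovery.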
Either way, I would like to see our partitioning become deterministic, and I would gladly help to do this.
- You propose to add to XGBoost4J an ability to save a booster to an OutputStream.
Things we must be very careful about.
- Spark discourages writing to HDFS directly from executors, but I googled it and it is possible (slightly tricky but not complex)
I don't know why one is not supposed to call the HDFS API directly from a task
- I suggest saving a checkpoint to a temp file and then committing it atomically using an atomic move.
adding this is not complicated and would be good to have, but I am not doing it currently and haven't seen any issue; it's just a checkpoint file... I will add it when upstreaming this part
- I suggest to expose enough logging to ease debugging
yes
- I suggest considering exposing some information about training progress through custom Spark Accumulators, but I need to think about it a bit
yes, but I don't think it belongs in this RFC. AnalyticsZoo does use this strategy to report many metrics; however, it relies on the fact that it runs one job per micro-batch
I had some time to think about your comment here. I guess I just have irrational fears about Spark suddenly using speculative execution or restarting my tasks, but this has nothing to do with checkpoints, because xgboost-spark does not work with speculative execution or task restarts even without checkpoints.
Overall, I think it is definitely much easier than exposing the "state" of the training, and I agree that exposing caches may not be a good idea design- and API-wise (the cache is an implementation detail)
- When we restart the training from a checkpoint we rebuild caches so only the first step will pay a performance penalty.
I agree with your analysis here
Thank you for your response. Here are some comments to clarify what I meant.
the current partitioning, triggered by repartition(N), is not based on HashPartitioner; it works more like round-robin with a randomized start, check https://github.com/apache/spark/blob/02a0cdea13a5eebd27649a60d981de35156ba52c/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L469-L487
I did not know that. Thank you for the pointer
I don't know why one is not supposed to call the HDFS API directly from a task
Just to make it clear, I am not against writing to HDFS from the task directly. I was merely pointing out that it makes me uneasy because it breaks Spark's computational model (tasks should not have side effects). That being said, xgboost-spark breaks it by design :)
Also, that is the reason why I suggested "committing" the checkpoint atomically. My concern is an undetected corrupted checkpoint in the case of a user (or a script) killing the job. My use case is a restart of the machine which launches the job. I understand that this use case is far-fetched, and we are probably safeguarded by the fact that it is probably impossible to write a booster file partially and still get a valid booster.
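The atomic-commit idea above can be sketched as "write to a temp file, then rename". This local-filesystem sketch uses java.nio; on HDFS the same pattern would be a create on a temporary path followed by a rename. The object and method names here are illustrative, not from xgboost.

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object AtomicCheckpoint {
  // Write the checkpoint payload to a temp file, then atomically move it
  // into place: a killed job can never leave a half-written checkpoint
  // under the final name.
  def commit(dir: String, name: String, payload: Array[Byte]): Unit = {
    val tmp  = Paths.get(dir, name + ".tmp")
    val dest = Paths.get(dir, name)
    Files.write(tmp, payload)                             // may die half-way, harmless
    Files.move(tmp, dest, StandardCopyOption.ATOMIC_MOVE) // the commit point
  }
}
```

A reader that only looks for the final name either sees the previous complete checkpoint or the new complete one, never a partial file.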
You propose to make partitioning deterministic by using some kind of "hash" function whose result is based purely on the data values.
Hmm, is this a good thing for data diversity in learning? Doesn't it introduce some bias into the data distribution?
it will not:
(1) the input to the partitioner is not a single feature but all features, so if we have [0, 0, 1] in the first row and [0, 0, 2] in the second row, in most cases they will land in different partitions
(2) there is a step in XGBoost that computes a global data sketch, so if you have [0,0,1] in worker 1 and [1,1,1] in worker 2, the distribution of each feature will eventually get synced
Hi @CodingCat, in XGBoost on Spark, after the change made here 1, which (if I understood correctly) enables deterministic data partitioning when checkpointing is enabled, should the model be deterministic after running several times with the same input? I am using the same input data, coalescing the data in my Python script (df = df.coalesce(…)), setting spark.task.cpus to 1 in the Spark config, fixing the random seed, setting nthread=1 in XGBoostClassifier, and setting the new needDeterministicRepartitioning flag to true. After all of this, I still get different models when using more than 2 workers. Is that expected? Why? With nworkers <= 2, I get the same models. Please correct me if I have misunderstood anything about needDeterministicRepartitioning or the algorithm's behavior. I asked this question in an XGBoost discussion topic, but I believe here is the right place. Thanks!
@merleyc here is my feedback. I am no expert here, so I may be wrong (please correct me if I am). .coalesce is not deterministic. Also, which tree method do you use, hist or approx?
Checkpoint Cleanup
The current implementation in XGBoost-Spark does not clean up the checkpoint file after a successful training. As a result, the user may get a wrong training process without a careful setup. For instance, the following process will lead to a wrongly-formed model in the second run.
The second training job will finish “too fast” for two reasons:
- The checkpoint built at the 8th iteration of the first run was left in the checkpoint directory
- Without a different checkpoint directory setup, the second run will load the leftover checkpoint file and run for only 2 iterations
Therefore, we propose to always clean up the checkpoint directory after a successful training.
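The proposed cleanup amounts to deleting leftover checkpoint files once training succeeds. A minimal local-filesystem sketch follows; the real fix operates on the HDFS checkpoint path inside XGBoost-Spark, and the object name here is illustrative.

```scala
import java.io.File

object CheckpointCleanup {
  // After a successful training run, remove leftover checkpoint files so a
  // later run pointing at the same checkpoint directory cannot silently
  // resume from a stale booster.
  def cleanUp(checkpointDir: String): Unit = {
    val dir = new File(checkpointDir)
    if (dir.isDirectory) dir.listFiles().foreach(_.delete())
  }
}
```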
this has been merged with https://github.com/dmlc/xgboost/commit/7b5cbcc8468448245a1c2ab698d07cb05be75e94
Deterministic Partitioning
Based on the definition of gradient boosting, we should ensure that the input dataset to the booster is fixed for each iteration. However, this is hard to achieve in distributed training with Spark.
The current partitioning mechanism in XGBoost-Spark is to ensure that the input data can be dispatched to the correct number of partitions. The number of partitions is controlled by the user’s configuration on parameter numWorkers.
The design of deterministic partitioning is to guarantee that, given a fixed input dataset, the resulting partitioning is always the same. This goal is interpreted as follows:
The current partitioning strategy in XGBoost-Spark cannot achieve the second part of this goal. The current strategy is based on the repartition API in Spark RDD, which is implemented to ensure an even distribution. The current partitioning executes as follows:
Because of the random start, we cannot guarantee a fixed partitioning for each checkpoint interval or for each recovery from a failure.
We propose the following mechanism to achieve deterministic partitioning and best-effort load balancing:
partitionId = math.abs(preventOverflow(row.getHashCode + row.getAs[Float](math.abs(row.getHashCode) % number_non_zero_features)) % numWorkers)
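The formula above can be made concrete in a small runnable sketch. The partition id is a pure function of the row's contents: its hash code plus one pseudo-randomly selected feature value, folded into [0, numWorkers). The preventOverflow implementation and the Float-to-Long truncation here are assumptions about the pseudocode's intent, not the merged implementation.

```scala
object DeterministicPartitionSketch {
  // Map any Long into a non-negative value by clearing the sign bit.
  private def preventOverflow(x: Long): Long = x & Long.MaxValue

  def partitionId(features: Array[Float], numWorkers: Int): Int = {
    val hash = java.util.Arrays.hashCode(features) // stands in for Row.getHashCode
    // Pick one feature index from the hash; double-mod keeps the index
    // non-negative even when hash is Int.MinValue.
    val idx = ((hash % features.length) + features.length) % features.length
    // The selected feature value is truncated to Long before mixing.
    val mixed = preventOverflow(hash.toLong + features(idx).toLong)
    (mixed % numWorkers).toInt
  }
}
```

Because every quantity is derived from the row itself, re-running the partitioner on the same data always yields the same layout, while the feature-value term injects enough variation across rows for load balancing.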
Avoid Multiple Jobs for Checkpointing
The current checkpointing approach collects the booster produced at the last iteration of each checkpoint interval to the Driver and persists it in HDFS. The major issue with this approach is that it needs to re-perform the data preparation for training if the user didn't choose to cache the training dataset.
The proposed change is to build the external-memory checkpoint in the XGBoost4J layer as well, so that the XGBoost-Spark layer can instruct XGBoost4J to save the checkpoint content to HDFS after moving forward a specified number of iterations.
From an engineering perspective, XGBoost-Spark passes three variables to XGBoost4J: buildExternalCache, interval, and an OutputStream. Then, in the XGBoost4J layer, we simply feed the booster to the OutputStream in the partition where buildExternalCache is true once it has moved forward interval iterations.
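The three-variable handshake described above might look roughly like the following. SerializableBooster is a stand-in trait, not the real ml.dmlc.xgboost4j API: the training loop in the partition with buildExternalCache set streams the booster to the supplied OutputStream every interval iterations.

```scala
import java.io.OutputStream

// Stand-in for the real booster type; assumes only that a booster can be
// serialized to bytes.
trait SerializableBooster { def toByteArray: Array[Byte] }

object CheckpointHook {
  // Called once per iteration inside the training loop of each partition.
  // Only the designated partition (buildExternalCache == true) writes, and
  // only at checkpoint boundaries.
  def maybeCheckpoint(booster: SerializableBooster, iteration: Int, interval: Int,
                      buildExternalCache: Boolean, out: OutputStream): Unit = {
    if (buildExternalCache && iteration > 0 && iteration % interval == 0) {
      out.write(booster.toByteArray)
      out.flush()
    }
  }
}
```

Since the write happens inside the (single) training job, no extra Spark job is launched per checkpoint, which is the point of this section.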
Prediction Cache Building
The key factor leading to the performance drop with the checkpoint mechanism is that we lose the prediction cache after the first checkpoint is made, so we need to go through every tree to calculate the residuals, even though we performed the same computation for evaluation in the previous round.
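A toy sketch of what the prediction cache buys us: with the cache, we add only the newest tree's output to a stored margin; after losing the cache, we must re-walk every tree built so far. Trees are simplified here to plain scoring functions; names are illustrative.

```scala
object PredictionCacheSketch {
  // A "tree" reduced to its essence: a function from features to a score.
  type Tree = Array[Float] => Float

  // No cache: every prediction walks all trees built so far, O(#trees) per row.
  def predictFromScratch(trees: Seq[Tree], x: Array[Float]): Float =
    trees.map(_(x)).sum

  // With cache: only the newest tree is evaluated, O(1) per row per iteration.
  def updateCachedMargin(cachedMargin: Float, newTree: Tree, x: Array[Float]): Float =
    cachedMargin + newTree(x)
}
```

Both paths produce the same margin; the cache only changes how much work each iteration repeats, which is why rebuilding it after a checkpoint restart costs one expensive first step.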
There are two potential fixes to address this problem:
Based on the above analysis, and because the second approach conflicts with "Avoid Multiple Jobs for Checkpointing", we adopt the first approach as the better design.