Open gdubs89 opened 1 month ago
Thank you for sharing your experience. Based on your description, you are trying to find some "best practices" for distributed training. I will try to do some more experiments and come up with something more comprehensive like a blog post. But for now, let's start with some checks:
allreduce
during tree build, if every synchronization requires waiting, the total training time can be extremely slow. I recently observed on a 48-GPU cluster that a 5-second iteration can be stalled to above 20 minutes due to imbalanced data (2TB of data, BTW). An imbalanced dataset also causes OOM errors.
On your first point, how would you suggest I dig into this a bit more? I've looked at three things:
dask-databricks
, this works no problem)htop
on a web terminal (presumably this only tells me about the driver node) Some screenshots from the dask dashboard (I'm new to this so not entirely sure what I'm looking for)
Databricks cluster dashboard (they're all hovering around 1.8%, which is less than 1/32)
Htop shows generally one CPU near 100% on the driver node, it occasionally bounces between different CPUs, but never more than one being engaged.
Also note from my previous point, when I used a cluster of 8 i3.4xlarge machines, which should collectively have easily enough memory to handle this data, it crashes. So I have some concerns that a lot/all of the data is being pulled onto the driver/a single worker node, but I'm not sure how to debug this further.
And yes, to your last point, I'm not expecting to be able to scale this infinitely, training on 3 trillion rows of data is clearly a pipe dream. But given I can train on ~100 million rows on a single machine, I'd be very surprised if it weren't feasible to increase by 1, if not 2 orders of magnitude by going distributed.
Maybe start by observing the CPU/memory usage across workers; there's a "workers" tab in the dask dashboard. I can't provide a definite answer on why a specific run is slow without running it myself, but in general, it's the data balance issue. On GPU, sometimes I just repartition the data and the problem is mitigated.
The XGBoost train function in the dashboard task view is actually a lambda function, one for each worker, do you see dask waiting for them to finish?
If you are using the default tree method (hist), consider using the QuantileDMatrix instead of the DMatrix for the training dataset; the former is much more memory efficient.
I'm also working on improving the scaling at the moment and will add some logging facilities to XGBoost to help debug.
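A minimal sketch of that suggestion on the dask side, assuming `X` and `y` are dask collections that are already partitioned sensibly. The function name is made up for illustration; it persists the data and waits for it before building the quantile matrix, so that loading problems show up before training starts.

```python
def build_training_matrix(client, X, y, max_bin: int = 256):
    """Persist the collections, wait for them, then build a DaskQuantileDMatrix."""
    # Imported lazily so this sketch can be read without a cluster at hand.
    from distributed import wait
    from xgboost import dask as dxgb

    X, y = client.persist([X, y])  # materialise the partitions on the workers
    wait([X, y])                   # block until loading has actually finished
    return dxgb.DaskQuantileDMatrix(
        client, X, y, max_bin=max_bin, enable_categorical=True
    )
```

For a validation set built the same way, passing `ref=dtrain` to `DaskQuantileDMatrix` lets it reuse the training set's quantile cuts.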
OK, I've got the vanilla dask dashboard working in the meantime (previously only had it working when using dask-databricks), so will now provide a detailed comparison of the dashboards in a variety of cases:
Scenario 1: ~50 million rows, 19 columns
First hardware setup: I used a "small, fat" setup, i.e. a small fleet of large machines, driver + 3 workers all of type r5d.16xlarge (total 256 cores, ~2TB memory)
dask-databricks: The whole process of reading in the data, categorizing, creating dmatrices and training for 50 epochs takes about 1 minute, of which ~45 seconds are spent training (and 15 seconds on data prep). When using dask-databricks, I don't get any verbose training output, but based on adding some print statements to the progress, here's a screenshot of the dask dashboard at a point when I'm fairly sure training is going on
so we're getting decent CPU utilisation, but still nowhere near full utilisation (which would be 6400%). Also, this amount of data is using a tiny fraction of the memory, making me think I should with this cluster be easily able to scale up by an order of magnitude, and would only need to scale the cluster a bit to get to two orders of magnitude
vanilla dask:
Pretty similar CPU and memory utilisation, I also get verbose training output. Data prep takes circa 15 seconds, training about 60 seconds.
As a benchmark, this dataset also easily fits in memory on a 6d1d.36xlarge (128 cores, 1 GB RAM). In-memory training for 50 epochs (same process of creating dmatrices and fitting via the learning API) takes about 1 minute.
Scenario 2: ~50 million rows, 49 columns
dask-databricks: Casting to categoricals and categorizing starts to take significant time here (100s). CPU utilisation is very poor
Creating DMatrices takes almost 3 minutes, CPU utilisation also poor. We do see all workers engaged and more than one CPU, but nowhere near full CPU engagement
Training starts to get quite squiffy, screenshot doesn't really capture it well. You tend to see at any one point, that one worker might have quite high (20+ cpus) CPU utilisation and another 1 or 2 workers have close to 0%
Memory utilisation is also not very symmetric in that 3 machines are much more engaged than the 4th. Also quite surprising how much more memory this dataset is taking up than the previous one. All of the extra columns are integer rather than dense/float, but perhaps the categorical type is more data hungry.
vanilla-dask: Worker utilisation while categorizing:
Takes about 80s
Then we get into dmatrix creation
Dmatrices take almost 4 minutes to create.
Things get a bit squiffy when we get to training. The CPU utilisation fluctuates a lot, so a screenshot isn't that helpful, but in one I did take here, we see one of the workers close to maxed out (29/32 CPUs fully engaged) but two of them doing almost nothing. So the hypothesis of imbalanced worker utilisation seems fair
taken at another point in training
Training, once it finally started, took about 6 minutes to complete
In-memory training goes off without a hitch, the actual training stage takes 90s. Categorizing+dmatrix creation takes about 40s, obviously the reading from spark into pandas stage takes a bit longer (like 3 minutes)
I also tried using QuantileDmatrices for the larger dataset, just in the dask-databricks setup. QuantileDmatrices took about 4 minutes to create again.
Here's a training screenshot where it does look like 2 machines are being well-utilised and one is doing basically nothing, but here's another where it looks quite poor
If you watch the dashboard for a while, it's a little hard to discern what's going on. Basically fluctuation between fairly high utilisation across the board, to times when there's low utilisation across the board, to times when some are highly utilised and some not. Training took 8 minutes
I should also add, that in the larger dataset case, both on vanilla dask and dask-databricks, the whole thing was pretty shonky and inconsistent. It probably fails around 2/3 of the time for various reasons. When it fails, I just detach and re-attach the spark cluster, start a fresh dask cluster, and just run the code again, nothing else changed. Sometimes it works, sometimes it doesn't. One failure mode (this is easier to ascertain in vanilla dask than dask-databricks as you get more output) seems to be that DMatrix creation causes a worker to be terminated and restarted, which in turn means that training has the wrong IP addresses for the workers so fails. But I've also had more esoteric messages, and one case where nothing went wrong but training losses started to blow up, which is very odd indeed.
Finally, in the vanilla dask setup, I tried scaling this up to the ~600 million row dataset (full 49 columns). This didn't even make it past the read-in and partition stage; it's been 25 minutes and I'm getting almost no CPU utilisation, so I think I'm going to shut this off
In conclusion, for this task which can still be done pretty comfortably in-memory on a single big EC2 machine, when using Dask I find:
dd.read_parquet(..).repartition()
doesn't seem to work, which admittedly isn't an xgboost problem per se, but perhaps plays into my original post of "still looking for the canonical way to scale xgboost beyond in-memory training"
I will also try to do all of the above with differently shaped clusters which use more and smaller workers, and will post updates tomorrow.
Same analysis using 12 r4.4xlarge workers (+ same driver). This gives us about 200 cores and 1.5TB of RAM, but with a much "more distributed" setup
Vanilla Dask
Process takes 5 minutes end to end. CPU utilisation is extremely low, as is memory utilisation (again making it rather implausible that I'm gonna get any OOM error when scaling up to more columns or even scaling up the number of rows by an order of magnitude)
On the larger dataset (i.e. same number of rows but more columns, where all the additional columns are of categorical type), the first time, it failed at the dmatrix creation stage. The second time, it managed to create the dmatrices, but I'm still getting some weird output that suggests it failed to distribute them quite as intended. As before, when it gets to training, we see a highly heterogeneous distribution of CPU utilisation
The training then results in the loss starting to explode before it fails (I've partially redacted that output as it contained some information about the training data)
Dask Databricks Noticeably better performance on the smaller dataset. End to end training the read-in + train process took 35s, and CPU utilisation, while far from full, was much better
When we get to the bigger dataset, again things are just far more variable than with the smaller one and a screenshot doesn't fully capture it, but what is clear is that
However, end to end, the process completed in 4 minutes, and the actual training only took approx 1 of those minutes. The memory utilisation again is so low, that I figured why not try this on a dataset that's 10x larger than this.
As before, when I try this, it seems to just get stuck at data read-in/repartition stage, with almost zero CPU utilisation
So in summary:
This is a combination of dask data balancing issues, dask memory usage and data spilling issues (the read/partition), XGBoost training performance issues, and optimization in Databricks. Let's get some of the easy issues resolved first.
client.persist
and distributed.wait
before DMatrix.DMatrix
than dask as it has this weird design that needs to construct and iterate through n_samples
numpy arrays instead of a few partitions. On the other hand, the data distribution for PySpark is more stable and well-balanced.
I would suggest that the first thing that needs to be done is to ensure the data is well-balanced based on these hints from the screenshots:
cc @fjetter @mrocklin for awareness.
Wow OK, I think I've mostly solved the issue of it just "not doing anything" when I try to scale to data sizes which cannot be trained in memory.
The problem was that the raw dataset is ~50billion rows and is spread over ~120k partitions. I am doing a downsampling in pyspark (before starting the dask cluster) and writing a downsampled ~500 million row dataset to disk. Little did I know that spark was retaining the original partitioning and thus writing this 500 million row dataset to disk over 120k partitions, which is A) a lot of partitions and B) extremely fragmented. Either way, it seems this just totally overwhelmed dask.
If I do a repartition in spark before writing to disk, dask can handle it and I now seem to be successfully training on 500 million rows of data, only using ~15% of the RAM on the aforementioned cluster, so I'm gonna scale this up to ~2billion rows and see how we go. So I've successfully moved beyond a data regime which could be trained in-memory 🚀
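For anyone hitting the same wall, a rough sketch of the downsample-then-repartition step described above. The helper names and the one-million-rows-per-partition target are assumptions chosen for illustration, not a recommendation from Spark, dask or XGBoost; `spark` is an existing SparkSession and `src`/`dst` are placeholder paths.

```python
import math


def target_partitions(n_rows: int, rows_per_partition: int = 1_000_000) -> int:
    """Pick a partition count so each file holds roughly rows_per_partition rows."""
    return max(1, math.ceil(n_rows / rows_per_partition))


def write_downsampled(spark, src: str, dst: str, fraction: float,
                      n_rows_expected: int) -> None:
    """Sample in Spark, then repartition before writing, so the sample is not
    written out with the source dataset's original partition count."""
    n_parts = target_partitions(n_rows_expected)
    (
        spark.read.parquet(src)
        .sample(fraction=fraction)
        .repartition(n_parts)       # collapse the ~120k inherited partitions
        .write.mode("overwrite")
        .parquet(dst)
    )


print(target_partitions(500_000_000))  # partition count for a 500 million row sample
```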
CPU utilisation is still a bit heterogeneous at times, but you do get moments of beauty like this one if you watch the dashboard for long enough👌
Will create some somewhat larger datasets and see how far I can scale this paradigm and report back.
In the meantime, one question around partitioning of the train and eval sets. Is it recommended/necessary for the train and eval sets to have the same number of partitions? I'm a little unclear on what's done with each partition under the hood. My concern would be that if you don't have the same number of partitions (i.e. use fewer for the eval set because it's smaller), one or more of the workers might not get an eval set. When I originally had the eval set partitioned with fewer partitions than the train set (in ratio to how big they are), I got a rather difficult to parse error about empty dmatrices.
Excellent progress! No, it's not required to have the same number of partitions. Preferably both of them have partitions for all workers (no worker is being starved for either dataset).
XGBoost takes what's given; it doesn't move data or anything. Internally, it just iterates over partitions for each dataset independently. As long as partitions within each dataset are aligned (they come from the same dataframe, for instance), then it's fine.
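One way to check the "no worker is starved" condition is sketched below. The helper is hypothetical; it assumes you have a mapping from partition key to the worker addresses holding it, which is the shape `distributed.Client.who_has()` returns for persisted data, and a list of worker addresses from wherever you track your cluster.

```python
def starved_workers(all_workers, who_has):
    """Return the workers that hold no partition of a dataset.

    who_has maps each partition key to the addresses of the workers holding it.
    """
    holders = {w for workers in who_has.values() for w in workers}
    return sorted(set(all_workers) - holders)


# Toy example: three workers, but the small eval set only landed on two of them.
workers = ["tcp://w1", "tcp://w2", "tcp://w3"]
eval_who_has = {"eval-part-0": ["tcp://w1"], "eval-part-1": ["tcp://w2"]}
print(starved_workers(workers, eval_who_has))
```

If the returned list is non-empty for either the train or the eval set, repartitioning that dataset into at least as many partitions as there are workers is the obvious first thing to try.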
So I have managed to scale to 2 billion rows, but this does seem to be the point where it started to struggle. Dask started to complain about the size of the graph
What's interesting is that the graph seems to be getting larger. The origin of this is probably that I'm doing some sequential training where I increase the learning rate (to avoid the situation where you end up training for 800 rounds before early-stopping, but the last 500 rounds are only giving very marginal improvements). The error in fact links to here . While this isn't what I'm doing, I wonder whether the fact that I do have a for loop of learning rates, and within the loop I'm doing model = dxgb.train(..., xgb_model=model['booster']), is causing some highly nested dask graph to be built, which in turn is probably making the later training rounds less efficient.
Any ideas how to mitigate this? [edit: one idea I had to mitigate this was to write the model to disk and then load it back in, as that might break the graph. And it did, in the sense that the warning now said my graph was only 13Mb compared to 54mb previously (and it kicked in after more loops of the training procedure), but it doesn't seem to have solved the problem, as in I'm still getting a warning about large dask graphs]
Second edit: Interestingly, increasing the max_depth seems to significantly increase the size of the graph in the warning
Any ideas how to mitigate this
My first guess is the booster object is too large, and dask complains about it. If that's the case, the warning is harmless, and there's no need to work around it. We have to transfer the booster somehow. Splitting it up can disable the warning but creates no benefit. If that's not the cause, please share your code. Or at least a gist of the code and the hyper-parameters you use.
Code is unchanged from original post, other than that I've created a loop to increase the learning rate:
    from xgboost import dask as dxgb

    # When it gets to the final learning rate, we want the number of boosting rounds
    # to be functionally infinite and let early stopping determine how long we train for.
    lr_rounds = {0.1: 100, 0.2: 100, 0.4: 100, 0.8: 100_000}

    model = None  # no booster yet on the first pass
    for LR in sorted(lr_rounds):
        params['learning_rate'] = LR
        if model is None:
            model = dxgb.train(
                client=client,
                params=params,
                dtrain=dtrain,
                num_boost_round=lr_rounds[LR],
                early_stopping_rounds=10,
                evals=[(dvalid, 'eval')],
            )
        else:
            model = dxgb.train(
                client=client,
                params=params,
                dtrain=dtrain,
                num_boost_round=lr_rounds[LR],
                early_stopping_rounds=10,
                evals=[(dvalid, 'eval')],
                # dxgb.train returns a dict; continue training from its booster
                xgb_model=model['booster'],
            )
(I added a few more bells and whistles to make sure that if it early stops for one of the learning rates before 0.8, that the next boosting round starts from the optimal model rather than the final one, but I don't think that should affect anything)
Broadly speaking this won't train for more than 400 rounds for the data I have, and I'm exploring maxdepths from ~8-16. So 400 trees of depth 16 are not trivial in terms of memory consumption, but also still a fraction of the data volume being handled on each worker.
I'm happy to accept this as harmless if you don't think this is a problem. I haven't had any more problems with training failing or being erratic.
Thank you for sharing! The code looks fine.
but also still a fraction of the data volume being handled on each worker
Dask doesn't usually send large objects across workers, which can hurt performance due to network constraints. But gathering a single booster for the client process should be fine.
Feel free to close the issue if you have no further questions. ;-)
I'm wondering if we can have a doc for running xgboost with spark/dask on different cloud environments? @trivialfis
small update on this (not sure it's worth re-opening the issue over), I am struggling when the tree maxdepths are increased. The aforementioned warning about graph sizes is exacerbated by having a higher maxdepth, which might be related (potentially the trees themselves taking up more room in memory?)
The problem is that I would anticipate that the way you're going to get more performance out of scaling training to bigger datasets is precisely that optimal performance will happen at higher maxdepths, i.e. due to having more data, you can fit more complex models before you overfit. (I'm also seeing this empirically: the deepest maxdepth I've managed to successfully train with has got the best performance on a test set.)
I tried to reconfigure my cluster to have roughly the same amount of overall memory (an amount which comfortably allowed me to train xgboost with a maxdepth of say 8 without being anywhere near the memory limit according to the dask dashboard, roughly around 50% utilisation across the board) but with fewer, larger workers (in itself not cost ideal, this increased my DBU/hr in databricks by about 20%), but still ran into the error.
Furthermore, it happens deep into training, often after like 3 hours, so it's not really feasible from a time or cost perspective, to experiment with lots of little hyperparameter tweaks (e.g. introduce some gamma-reg so that not all trees bottom out, perhaps up the learning rate a little so we have fewer trees, start training with high depths and then bring down the maxdepth and up the learning rate for later training rounds, etc).
And I've also not been able to get verbose training output to work in dask-databricks (annoyingly it works in vanilla dask, but I found vanilla dask on databricks to be extremely unreliable), but verbose output would help a little bit in terms of being able to understand quickly whether a set of hyperparameters was going to give acceptable performance from an ML metrics perspective and is thus worth pursuing.
XGBoost pyspark in Databricks is quite stable, so maybe you can try xgboost pyspark. It's also quite fast if using GPUs.
For the graph size warning, I suggest looking for issues such as the partition size being too small, which generates a larger operation graph and slows performance. On the other hand, if it's caused by the XGBoost booster model, which happens only at the end of training, then please ignore it.
Thus far, the only critical thing for XGBoost to achieve good distributed training performance is data balancing. It can be mad slow if the data size is skewed. The latency caused by waiting accumulates instead of simply being bottlenecked by the slowest worker.
The training function and the estimators accept a callback function. You can define your own logger using callbacks. Please find an example in the demo/guide-python directory. With a callback, you have full control over logging. Maybe writing the results to a remote file server?
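As a rough illustration of the callback idea (not the demo from the repository), here is a sketch of a logger that appends one line per boosting round to a file. The names `format_eval_line` and `make_file_logger` and the log path are made up; with dask the callback presumably runs on a worker, so the path would need to point somewhere you can read afterwards.

```python
def format_eval_line(epoch: int, evals_log: dict) -> str:
    """Render the latest value of every metric as one log line."""
    parts = [
        f"{data}-{metric}:{values[-1]:.5f}"
        for data, metrics in evals_log.items()
        for metric, values in metrics.items()
    ]
    return f"[{epoch}] " + " ".join(parts)


def make_file_logger(path: str):
    """Build a callback that appends one line per boosting round to `path`."""
    import xgboost as xgb  # imported lazily so the helper above has no dependencies

    class FileLogger(xgb.callback.TrainingCallback):
        def after_iteration(self, model, epoch, evals_log):
            with open(path, "a") as fh:
                fh.write(format_eval_line(epoch, evals_log) + "\n")
            return False  # returning False means "keep training"

    return FileLogger()


print(format_eval_line(3, {"eval": {"logloss": [0.5, 0.41234]}}))
```

The callback would then be passed as `callbacks=[make_file_logger(...)]` to the training call.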
@wbo4958 : I believe you're talking about the SparkXGBClassifier? If I'm not mistaken, it neither accepts spark's sparse feature representation as inputs, nor does it natively support categorical variables, and consequently I don't think it's really suitable for doing large-scale ML with many high-cardinality categoricals.
@trivialfis : I can take another look at the partitioning strategy, but the issue here is that it trains just fine on the data I've prepared, on the cluster I set up (never getting north of ~50% memory utilisation on any worker) until I start pushing up the maxdepths. So this suggests something to do with tree size, but I also find it rather dubious that the issue would be that the ensemble of trees itself is taking up too much memory, because I'm being relatively aggressive about upping the learning rate, so we're never getting to more than say 500 trees, and 500 trees of maxdepth 14...back of the envelope says that's gonna be a small fraction of the size of the full training data (not sure whether the full tree is being copied to all workers, but even then I'd be surprised if it would be taking up more than single-digit % of each worker's RAM)
Thanks for the tip on logging, I'll have a look.
So this suggests something to do with tree size
It makes sense. Dask prefers small data transfer, which is unrelated to the total size of the data. It raises a warning if it needs to send large objects or graphs across the network.
issue would be that the ensemble of trees itself is taken up too much memory
It's not about taking up too much memory; you can push the memory usage to its limit without seeing the dask warning. Rather, dask considers sending large objects across the network, or using complex graphs, inefficient, and warns users to look for potential causes and optimizations. It's a performance warning.
Having said that, I will ping @rjzamora for better insight. I'm not familiar with the dynamics inside dask.
Having said that, I will ping @rjzamora for better insight. I'm not familiar with the dynamics inside dask.
I'm not sure if this is helpful at all, but the UserWarning: Sending large graph ... you get from dask is entirely related to the size of the graph when it is pickled on the client process and sent over the wire to the scheduler. Although the size of the graph is proportional to the number of tasks (which is proportional to the number of partitions), this warning is rarely caused by the number of tasks being huge (although that can certainly happen).
In practice, the large-graph warning usually means that you are constructing a graph on the client that contains data that it probably shouldn't contain. For example, if each of your tasks will be operating on a distinct partition of Array data, you wouldn't want to pass that data to the workers through the graph. Rather, you would want your tasks to include the necessary logic to read that data from disk when it executes on the worker.
In the case of XGB, it does seem possible that dask is just complaining about the size of the booster when you send it to the cluster. @trivialfis is correct that the model needs to get to the workers somehow. As long as you aren't explicitly passing a copy of the booster in every training task within the graph, then there is probably not a "better" option.
Not sure what is meant by "explicitly passing a copy of the booster in every training task within the graph" ? What could cause this to happen?
The only perhaps non-standard thing I'm doing is dynamically updating the learning rate using code very similar to above. Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?
When you say dask might be complaining about the size of the booster (I believe it is btw, because I can see that if I up the max depth, the graph size in the warning message increases), is there some reason why a large graph might cause problems long before I physically run out of memory? I was using a cluster of r5d.8xlarge, i.e. 256 gb of memory on each, and early in training they were at circa 50% memory. Back of the envelope, there's no way the booster itself is gonna start taking up on the order of 100gb, and also the number in the warning message is more of the order of 5gb.
Not sure what is meant by "explicitly passing a copy of the booster in every training task within the graph" ? What could cause this to happen?
I don't think you are doing this. My intention was more-so to explain "who" this warning is intended for. My general impression that the large-graph warning itself shouldn't be a concern to you. With that said, I am concerned if the size of the graph grows with every iteration and you plan on doing this for many iterations.
Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?
I honestly don't know off hand - Are you planning to run many iterations?
When you say dask might be complaining about the size of the booster (I believe it is btw, because I can see that if I up the max depth, the graph size in the warning message increases), is there some reason why a large graph might cause problems long before I physically run out of memory?
I'm definitely not 100% sure about this, but my impression is that the graph is large-enough to meet Dask's simple heuristic to warn the user, but not actually large enough to cause problems. A large graph can (temporarily) slow down the scheduler if there is a huge number of tasks or it is overwhelmed with serialization/deserialization.
is there some reason why a large graph might cause problems long before I physically run out of memory
My understanding is that dask simply considers the graph size abnormal, which is unrelated to the total amount of memory available in the system.
Is it possible that this is causing dask to copy not just the latest version of the booster (containing all the trees) but also previous versions?
I honestly don't know off hand - Are you planning to run many iterations?
By iterations, do you mean separate training runs with new learning rates? If so, not really. I think I ran 100 boosting rounds at a learning rate of 0.1, another 100 at a learning rate of 0.2, another 100 at a learning rate of 0.4, and then finally set the learning rate to 0.8 and let it train until it early stops.
My best estimate is that the whole ensemble would come in at south of 1000 trees, and as the maxdepth increases, the optimal stopping point will come forward and we'll end up with fewer trees, but presumably overall more terminal nodes/a booster that consumes more memory.
I could of course just not do this, but in practice I've found that you need a decently low learning rate to get good performance, but low learning rates lead to extremely slow convergence and you need to up the learning rate after a while if you don't want it to train for thousands of rounds.
If we think the iterative training is likely to be the issue, I suppose one mitigation would be to just train for 200 boosting rounds with a learning rate of 0.1, and then set the learning rate to maybe 0.6 and train until we early stop. This would likely end up with more trees in the ensemble overall, and perhaps marginally worse generalisation, but nothing too catastrophic. If even this is likely to cause problems and I need to train in one shot until we early stop, I think it could get quite tricky to choose a learning rate that would generalise well but would not take unacceptably long to train.
I guess an option of desperation (the thing I always tell people not to do) would be to just estimate what a good number of boosting rounds is, and do away with early stopping (could then repurpose the eval set for more training data)
The booster is transferred once per training session. If you run train(client, {"learning_rate": 0.1}, ..., num_boost_round=200), the booster is transferred at the end of the train function. Meaning, there's only one transfer in this function. If you try three different learning rates with three different calls to train, then there are three transfers of the booster object.
My suggestion is still to just ignore the graph size warning. There's no effective solution to it yet and it doesn't affect your model training.
Yeah sorry, my bad: I looked back through the messages and I didn't make this clear when I said "I'm struggling". The issue is not just the graph warning; training is actually failing when I increase the maxdepth. It's also taking a long time before it fails (>1 hour), so it's quite difficult to iterate. So with the same train/eval set, on a given cluster configuration, I'm able to train with maxdepth<=10, but it fails for maxdepth=12.
Annoyingly I didn't make a note of the traceback at the time, but I convinced myself it seemed memory related. I can burn some more compute hours and reproduce if useful.
max_depth=12 sounds fine; the memory usage of really deep models is dominated by the histogram during training. For a dataset with 50 features (yours), and with max_bin=256 (default), the gradient histogram is 16 * 50 * 256 * (2^13 - 1) bytes (f64 grad + f64 hess = 16 bytes), which is roughly 1.56 GB, not a lot. I don't know what's causing the failure. It might be true that the cluster is already on the brink of an OOM error and the extra depth is the last straw. But some more investigation can be helpful.
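To make the estimate above easy to re-run for other depths, here is a small sketch of the same arithmetic. It assumes, as the formula does, one histogram per node of a full binary tree and 16 bytes per bin; the function name is made up for illustration and is not an XGBoost API.

```python
def histogram_bytes(n_features: int, max_bin: int, max_depth: int,
                    bytes_per_bin: int = 16) -> int:
    """Upper-bound estimate: one gradient histogram per node of a full tree."""
    n_nodes = 2 ** (max_depth + 1) - 1  # node count of a full binary tree
    return bytes_per_bin * n_features * max_bin * n_nodes


for depth in (8, 10, 12, 14):
    gib = histogram_bytes(50, 256, depth) / 2**30
    print(f"max_depth={depth}: {gib:.2f} GiB")
```

Each extra level roughly doubles the figure, so going from depth 12 to depth 14 multiplies the estimate by about four.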
seeing as it took a few hours before it failed, I wasn't watching the dask dashboard at the time of failure, but ~20 minutes into training, all workers were at circa 50% memory usage, which is why I was saying previously that it seemed incredibly unlikely that a large learner size was pushing it over the edge.
I'll re-run and post a detailed error message
hmmm OK, upon re-running it, I've been keeping an eye on the dask dashboard, checking in every 20 minutes or so.
For the first ~2 hours (which roughly was the first two sets of 100 boosting rounds at LR=0.1 and then LR=0.2), memory utilisation across the cluster ranged from 30-60%, but it was very stable (as in there, was one machine with 60% utilisation but I checked every 10-20 minutes for 2 hours and the number barely moved)
But then during the 3rd set of 100 boosting rounds at LR=0.4, I get an OOM error. Now I wasn't watching the dask dashboard constantly, so I can't confirm that the worker with the highest memory load never exceeded 60%, but it seems improbable or at least unclear why this would have happened, given it was so stable, hovering around 60% for a few hours/many training iterations.
Here's the traceback
(bottom few lines are the output I print to screen every time we start/finish a training run)
Edit: This is technically from a different run, I decreased the MaxDepth by 1, but I'm pretty sure the same happened last time, i.e. currently it hasn't technically failed yet, but the dask dashboard is showing that nothing is happening, and there's been a tonne of spill but I believe it will eventually fail with an OOM error, but that takes a long time to happen, it happens long after the dask dashboard is able to ascertain that something has gone wrong
Hmm, thank you for sharing. I see a severe imbalance there, with a few workers having significantly less data than others.
Yeah, let me try and get a screengrab of what it looks like early on in training when things are healthy
We will try to work on automated data balancing. Without it, it isn't easy to control the memory usage.
so this is like 2 hours into training with a lower max depth, so it does seem to be being caused/exacerbated by upping the maxdepth 🤔
@gdubs89 Could you please share the following information:
Please share the version of the above information closest to the version that causes issues. I will try to reproduce and profile next week.
Sure:
I'm using a cluster of 16 r5d.8xlarge machines (+ same driver), so ~4TB memory and 512 cores
My training params are set by:
    params = {
        "objective": "binary:logistic",
        "max_depth": max_depth,  # this is where the problem arises, whether I set this to 8 or 14
        'monotone_constraints': {'continuous_feature_x': 1, 'continuous_feature_y': 1},
        'eval_metric': 'logloss',
        'tree_method': 'hist'
    }  # so I'm mostly using default hyperparameter values
As per above, I start with a low learning rate and then turn it up, my learning rate to boosting rounds dictionary is {0.1: 100, 0.2: 100, 0.4: 100, 0.8: 20_000}
When it comes to the data:
int and float. Not entirely sure what happens under the hood in dask when I read them in. And just to reiterate, I'm not explicitly using a sparse encoding for the categoricals, but I think this is what happens when I cast columns as categorical types in dask and then convert to dmatrix with enable_categorical=True?
And I'm running databricks runtime 15.4 LTS, dask_databricks==0.3.2 and xgboost==2.1.1
An extra datapoint to add to this, I did manage to train a tree with maxdepth=14, when I just did it in one training call with early stopping, rather than sequentially training and turning up the learning rate. Unfortunately, even with a relatively high learning rate of 0.3, this took ~8 hours until it early-stopped (as opposed to more like 3-4 hours at maxdepth=12 with iteratively increasing learning rates).
So while this training in steps is causing some issues, I would regard this as more than just a nice to have (especially if I wanted to increase the training data size/cluster by an order of magnitude)
Thank you for the detailed information. I'm working on the dask interface now. This PR should help with the issue of retrieving evaluation logs from databricks: https://github.com/dmlc/xgboost/pull/10942 .
Will look into memory usage.
I'm currently training a binary classifier using a tiny sample of a dataset. The dataset is approx 50bn rows per day, and we persist the data for ~60 days, so in theory I could be training on up to ~3tn rows of data. Of course that's probably a little excessive, but currently I'm training on a 0.1% sample of a day's data, i.e. approx 50 million rows.
I do this by doing
df = spark.read.parquet('s3://bucketname/data.pq').sample(fraction=0.001).toPandas()
I can play with this fraction a little bit, I've pushed it as far as 100 million rows and might be able to push it a bit further, but fundamentally the approach of pulling everything into a massive driver node and training in memory is not scalable and it's never going to allow me to train on 1 billion rows, or 10 billion rows, or more.
To that end, I've been looking for the canonical way to scale xgboost, i.e. do distributed training on databricks. I'm open to doing GPU training but my strong suspicion is that I'm far more memory-limited than compute-limited (when training on 50 million rows on a single EC2 machine, once the data has been read in and converted to dmatrices, the actual training is a breeze and takes 10-15 minutes), so my instinct is to try distributed CPU training.
Also, I'm using the following bells & whistles which I'll need any distributed training to support
For the sake of benchmarking, I've prepared the following 4 datasets:
I first tried to do this using xgboost-dask. This is the solution I landed on:
This "worked" when I used dataset 3 described above, but failed when I used dataset 2, i.e. 50 million rows and ~20 columns worked but 50 million rows and ~50 columns was too much. I was also a little suspicious that dask wasn't utilising the worker nodes. I can't connect to the dask dashboard; I think it's something I'd need to talk to our databricks admin about (I tried to SSH into the driver but my connection timed out; to my best understanding, we'd need to unblock some port). But the databricks cluster dashboard only ever showed the driver node being engaged (in retrospect, it could also have been just one worker being engaged; if this is deemed relevant I can re-run and check). Note that when I do print(client), it tells me I have 128 threads (8*16, i.e. the number of worker cores) and ~500GB of RAM, but they don't seem to be engaged by the training process.

If only one machine is being engaged, each of these machines has significantly less memory than the machine I used to train on the 50 million row dataset in memory, so it's not entirely surprising that this fell over at the point where it did. I tested this by firing up a "wonky" cluster, comprising two r5d.16xlarge workers and a driver of the same type. This worked, but again only one machine was being engaged, so we've not gained anything over just training on a single large machine.

So my suspicion here is that raw dask doesn't play very well with databricks/spark, so instead I decided to try dask-databricks. So basically in the above code, replace
with

Same deal: when I print(client), I see the number of threads/amount of memory I expect. However, when running on a cluster of 8 i3.4xlarge workers, I have the same scaling issues as previously: I can run on the 50 million row dataset with ~20 columns, but when I try the set with ~50 columns, it falls over.

I'm now running a cluster of 12 r5d.8xlarge machines (I should have used r5d.16xlarge like I did before for reproducibility), and the training run for the 50 million row dataset with 50 columns hasn't technically crashed, but it's been running for 50 minutes now (which is bad, given how big this cluster is compared to the single machine on which I can train this in memory in ~10-15 minutes). When using dask-databricks, I can access the dask dashboard, and while I'm no expert on how to read it, it looks like all CPUs are being used, but only like 1.5/32 cores are being used per worker. This is in line with what the databricks cluster's dashboard is telling me.

I also get a warning
which I don't fully know what to do with.
The cluster I'm currently using has at least 3x more RAM and 4x more cores than the largest single EC2 machine, the one I've been using to train on 50 million rows/50 columns (and that I've shown can be pushed a little bit further, at least to 100 million rows, maybe to 150m, probably not as far as 200m). I would also have hoped that distributed training in dask would handle the data much more memory-efficiently than pulling it into pandas. And yet I'm not even getting close to replicating the performance I get with a single EC2 instance, which does not seem to bode well for scaling up to 500 million rows and beyond.
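A back-of-the-envelope estimate of the raw data size can help frame these numbers. The sketch below assumes a fully dense table with a fixed number of bytes per value (8 for float64, 4 for float32) and ignores everything else: intermediate copies, pandas or dask overhead, and XGBoost's own internal structures. `dense_size_gib` is a hypothetical helper, not part of any library.

```python
def dense_size_gib(n_rows, n_cols, bytes_per_value=8):
    """Raw size in GiB of a dense n_rows x n_cols table of fixed-width values."""
    return n_rows * n_cols * bytes_per_value / 2**30


# The two dataset shapes discussed above: 50 million rows, ~20 and ~50 columns.
for n_cols in (20, 50):
    size = dense_size_gib(50_000_000, n_cols)
    print(f"50M rows x {n_cols} cols: about {size:.1f} GiB as float64")
```

If this estimate is roughly right, the raw data is far smaller than the combined cluster memory, which would be consistent with the data being concentrated on one node or copied several times rather than being too large in aggregate.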
Help either with this, or other ways to scale XGBoost beyond in-memory training would be greatly appreciated. I was hoping there would be an accepted way to do distributed xgboost training but alas, it doesn't seem that there is an accepted wisdom on how to do this.
Other notes: