Closed: pinjutien closed this issue 7 years ago.
From the previous comments by @jcrist it appears that graph size is ~ O(grid_points * cv). In this case we are looking at O(2000 * 2), for cv = 2.
Each dask-worker's child worker process will get a copy of the graph, but only one copy, not a copy per cv fold.
Each worker process gets only the parts of the graph on which it computes.
Apologies for the lack of comment here. I'm working on improving the graph build timings (which is proving doable, but not straightforward).
Yes, the size of the graph is approximately n_splits * n_candidates (plus some additional work for setup and scoring). This is because an estimator is fit for each split & candidate combination.
I'm not sure what the failure case is here (and I haven't had a chance to run it). Theoretically each worker should hold at most n_splits * (X.nbytes + y.nbytes) of data in memory, plus however much memory each estimator takes up. The estimators shouldn't stick around for long though, as they're immediately scored and then dropped. I should hopefully finish up the performance improvements over the coming days.
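A quick, non-authoritative sanity check of those estimates; the grid and array shapes below are stand-ins chosen to roughly match the ~2000 candidates and ~27 MB training set mentioned later in this thread:

import numpy as np
from sklearn.model_selection import ParameterGrid

# Stand-in grid with 20 * 100 = 2000 candidates (assumed, not the real grid).
param_grid = {"max_depth": list(range(1, 21)),
              "min_samples_split": list(range(2, 102))}
n_candidates = len(ParameterGrid(param_grid))
n_splits = 3

approx_graph_size = n_splits * n_candidates     # plus some setup/scoring tasks

# Stand-in arrays roughly the size of the raw training data (~27 MB).
X = np.random.rand(100000, 34)
y = np.random.randint(0, 2, size=100000)
max_data_per_worker_gb = n_splits * (X.nbytes + y.nbytes) / 1e9

print(approx_graph_size, max_data_per_worker_gb)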
@jcrist, thanks for the reply! Below is the plot of running time vs. the number of workers (cv = 3).
The running time increases when the number of workers goes from 5 to 7, perhaps because of the increasing memory usage.
I also have a memory-usage snapshot at cv = 3. You can see that every worker uses a similar amount of memory.
It looks like, with cv = 3 fixed, ~2000 grid-search points, and a ~27 MB training set, more workers generate a larger total memory load:
1 worker: memory load ~3.6 GB.
7 workers: each worker uses ~3.0 GB (instead of ~3.6 GB / 7), so the total memory load is ~3.0 GB x 7 = ~21 GB. This appears to be the main reason for the memory error.
Please let me know if you have any thoughts on this.
Thank you so much!
Apologies, just had a chance to look at this now. This is actually expected, and there's not much that can be done about it. Here's the rundown:
- The processed training data ends up being ~276 MB (not the ~27 MB of the raw data). It is split into n_splits chunks, which are then recombined to form the test/train matrices. This effectively duplicates the data in memory n_splits times, resulting in ~0.8 GB per worker, or roughly .276 * n_splits * n_workers GB of memory usage across the cluster.
- On top of that comes the overhead of fit and the memory usage of each estimator. Using a single thread, I was able to hit ~1.2 GB for the DecisionTreeClassifier per worker (indicating ~.4 GB of overhead for fitting and estimator memory usage).
- More threads -> more fitting happening at once -> more memory usage in each worker. Without knowing how you set up your cluster, I can't say if this led to what you saw, but I suspect you probably had several threads per worker. Still, I was unable to replicate the 3 GB per worker you saw. Running the provided script, everything completed fine on my laptop with 8 workers, each running a single thread. My memory usage peaked at around 9 GB for all Python processes combined.
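Plugging the numbers from this rundown back into the formula, as a worked check rather than a new measurement:

data_gb = 0.276                  # processed training data, per the rundown above
n_splits = 3
n_workers = 7

per_worker_data = data_gb * n_splits                 # ~0.83 GB of duplicated splits per worker
fit_overhead = 0.4                                   # ~GB observed for fitting + estimator (single thread)
per_worker_peak = per_worker_data + fit_overhead     # ~1.2 GB per single-threaded worker
cluster_data_total = data_gb * n_splits * n_workers  # ~5.8 GB of data duplication across the cluster

print(per_worker_data, per_worker_peak, cluster_data_total)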
One option that may lead to slightly lower memory usage is to perform the cv split extraction on every call to fit. This is what scikit-learn does with joblib. This would lead to nbytes(original_data) * (n_threads + 1) memory overhead per worker. When n_threads < n_splits, this would result in smaller memory usage than the current way. However, it would have negative consequences when n_threads > n_splits. It would also perform worse using the threaded scheduler, as we'd be performing the same cv extraction many times. Given that the memory gains here are small at best, I don't think this is worth it.
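For comparison, the two overhead formulas work out as follows with the numbers from this thread; a rough illustration of the trade-off described above, not a benchmark:

data_gb = 0.276
n_splits = 3

for n_threads in (1, 2, 4, 8):
    cached = data_gb * n_splits              # current approach: extract the splits once, keep them
    per_fit = data_gb * (n_threads + 1)      # proposed approach: re-extract on every fit call
    print(n_threads, round(cached, 2), round(per_fit, 2))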
Raises the question "what is the distribution of values for n_splits?"
The default is 3, but from some issue comments it sounds like people wish it were 5. Not sure what the usage tends to be in practice, though.
@jcrist Thanks for the detailed explanation. I did not notice that the processed training data becomes ~276 MB, rather than the ~27 MB of the raw data. The following is how I set up dask-scheduler and dask-worker; I am not sure if it leads to several threads per worker.
dask-scheduler --http-port 8899 --port 8877 --bokeh-port 8866 &
dask-worker 173.208.222.74:8877 &
I will try the following to limit the number of threads per worker:
dask-worker 173.208.222.74:8877 --nthreads 1 &
Thank you!
By default, dask-worker will set up a single worker process with the same number of threads as your machine has cores (so yes, each of your worker processes had as many threads as your machine has cores). Note that you can also specify the number of processes to dask-worker. This makes it easy to set up a local cluster with multiple worker processes in a single call:
dask-worker --nthreads 1 --nprocs 8 address
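For a purely local setup, roughly the same configuration can also be created from Python via distributed's LocalCluster; a sketch, with the worker and thread counts as placeholders:

from distributed import Client, LocalCluster

# 8 single-threaded worker processes, analogous to: dask-worker --nthreads 1 --nprocs 8 <address>
cluster = LocalCluster(n_workers=8, threads_per_worker=1)
client = Client(cluster)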
One option that may lead to slightly lower memory usage is to perform the cv split extraction on every call to fit.
I found an easy way to support both options in the code, so I'm planning on just exposing this as a configuration switch. Benchmarking showed memory reductions on the order expected, but also considerable slowdowns (repeatedly extracting non-contiguous subsets of the arrays has a cost).
In #37 I added a way to choose between caching the splitting operations once on each worker (which is faster) and always splitting each time (which uses less memory if n_threads < n_splits). The default is to cache. To change this, set cache_cv=False in the init of the Dask*SearchCV classes.
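A minimal sketch of what that looks like; the import path and class name below are assumptions based on the Dask*SearchCV naming above, not copied from the library:

import numpy as np
from sklearn.tree import DecisionTreeClassifier
from dklearn.grid_search import DaskGridSearchCV  # hypothetical import path, adjust to where the Dask*SearchCV classes live

X = np.random.rand(1000, 20)
y = np.random.randint(0, 2, size=1000)

param_grid = {"max_depth": [2, 4, 8, None]}
search = DaskGridSearchCV(DecisionTreeClassifier(), param_grid, cv=3,
                          cache_cv=False)  # re-split on every fit: lower memory, but slower
search.fit(X, y)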
@jcrist this is awesome work; we will try many more cores/machines. dask-worker's memory is also tight. I guess the estimator copy and the fit process happen entirely on the worker side and are released after fit and predict? Only the parameters and scores need to be passed around. I will give more updates later.
If the sklearn operation releases the GIL you might consider using one dask worker per machine. That worker process could have multiple threads. This would reduce the need to replicate data.
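A sketch of that per-machine configuration when started from Python on a single machine, assuming the estimator's fit releases the GIL; the thread count is a placeholder for the machine's core count:

from distributed import Client, LocalCluster

# One worker process with many threads per machine, so the training data is
# held once per machine instead of once per worker process.
cluster = LocalCluster(n_workers=1, threads_per_worker=8)
client = Client(cluster)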
Hello,
I encounter a memory error when I increase the number of workers. This example uses the code here.
Invoked by: python dklearn_grid_search_script.py
When the number of workers is around 7 or 8, the memory error shows up, such as:
To understand this in more detail, I would like to know what the memory load is for every worker. With a single worker, I tried different values of cv:
number of workers: 1, number of grid points: 2000
cv = 2: running time 237 s, size of graph 7322
cv = 3: running time 359 s, size of graph 10980 ~ 7322 * 1.5
cv = 4: running time 478 s, size of graph 14638 ~ 7322 * 2
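A quick check, using only the numbers above, that the graph size grows linearly with cv:

sizes = {2: 7322, 3: 10980, 4: 14638}           # cv -> observed graph size
per_fold = (sizes[4] - sizes[2]) / 2            # ~3658 tasks added per extra fold
base = sizes[2] - 2 * per_fold                  # ~6 fixed setup/scoring tasks
for cv, observed in sizes.items():
    print(cv, observed, base + per_fold * cv)   # predicted matches observed for every cv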
So far so good: as we increase cv, the running time scales linearly. Each dask-worker creates a process that does the work. I took a snapshot of top to capture this worker's memory usage at cv = 2 and cv = 4: at cv = 2 it was ~3.178 GB at peak, and at cv = 4 it was ~5.2 GB at peak.
So, a few questions.
When cv = 2, the single worker's memory grows to ~3.178 GB. What is the cause of this?
Why does every worker need a complete copy of the graph as many times as cv? Is it because every worker behaves like a process, and it is hard for processes to share data with each other?
For example, in this comment @jcrist pointed out that the size of the graph is linear in cv. Does that mean that if cv = 2, each dask-worker copies two graphs? In my understanding, cv just decides how many sub-training sets the data is divided into, and dask-worker applies a graph to every sub-training set.
Please let me know if my understanding is correct. I appreciate any thoughts on this.
Thank you! Pin-Ju