cooperative-computing-lab / cctools

The Cooperative Computing Tools (cctools) enable large scale distributed computations to harness hundreds to thousands of machines from clusters, clouds, and grids.
http://ccl.cse.nd.edu

Vine: Prune Files in Dask Graph during Evaluation #3851

Open cmoore24-24 opened 1 month ago

cmoore24-24 commented 1 month ago

Issue: I am currently running into an issue when trying to run a process with Task/Dask Vine, wherein the process inevitably fails when the input dataset is too large. The symptoms are:

Use Case: Using Dask Vine to perform large calculations on QCD sample root files, and output the results into parquet files. The total size of the dataset(s) that fail is ~170 GB. I have independently worked around this issue by manually cutting the datasets into batches; doing one-quarter of the dataset at a time has been a successful workaround thus far.
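
The batching workaround described above can be sketched roughly as follows. This is only an illustration: the file names are hypothetical placeholders, and `split_into_batches` is a helper invented here, not part of TaskVine or Dask. Each batch would then be submitted as its own, smaller workflow.

```python
from typing import Iterator, List, Sequence


def split_into_batches(paths: Sequence[str], n_batches: int) -> Iterator[List[str]]:
    """Yield n_batches contiguous slices of paths whose sizes differ by at most one."""
    if n_batches < 1:
        raise ValueError("n_batches must be at least 1")
    base, extra = divmod(len(paths), n_batches)
    start = 0
    for i in range(n_batches):
        # The first `extra` batches absorb the remainder, one file each.
        size = base + (1 if i < extra else 0)
        yield list(paths[start:start + size])
        start += size


# Hypothetical input list; the real dataset is a set of QCD ROOT files.
files = [f"qcd_{i:03d}.root" for i in range(10)]
batches = list(split_into_batches(files, 4))

for number, batch in enumerate(batches, start=1):
    # In the real workflow, each batch would be processed in a separate run.
    print(f"batch {number}: {len(batch)} files")
```

Splitting by file count is the simplest choice; if file sizes vary widely, splitting by cumulative bytes would give more even batches.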

Manager Options:

I will work on recreating this scenario on my end so I can provide the vine-run-info logs.

dthain commented 1 month ago

@BarrySlyDelgado I think you mentioned that the distributed storage plot may come in handy here, is that present in our repo so that Connor can try it out?

BarrySlyDelgado commented 1 month ago

Once the logs are generated, we should see which tasks are failing and which workers are failing (if any). I currently suspect that the accumulation of intermediate results may cause workers to fail. This is somewhat of a knock-on effect of TaskVine scheduling tasks to the workers that hold the most bytes of data dependencies, without checking whether data transfers will cause that worker to exceed its allocation. The tool that generates the distributed storage plot is not currently present in the repo, but I'll add it.

cmoore24-24 commented 1 month ago

Apologies for taking so long to get logs-- for the past few days, I haven't been able to get more than ~20 workers at a time, probably due to a combination of my bad priority and heavy use by other users. I will keep trying to make a faithful recreation of the issue for the purposes of logs.

dthain commented 1 month ago

Hmm, does the crash occur when using the large dataset with a small number of workers? (Our first rough hypothesis would suggest that a crash is more likely with fewer workers...)

cmoore24-24 commented 1 month ago

I haven't let it go long enough to get to a crash, but I can tell you the behavior where essentially countless recovery tasks are produced occurs almost immediately.

cmoore24-24 commented 1 month ago

Hi @BarrySlyDelgado, it looks like I'm going to have trouble getting a faithful recreation; even backing off on the memory and disk I request, I can't get any more than 70 workers. That said, I did let a run go to completion after discussing with Doug above. There were quite a few task failures, and many recovery tasks, but this run actually managed to complete, which greatly shocked me. I've never been able to get this program to run all the way through when I was requesting 150+ workers-- could there be such a thing as too many workers?

Either way, I've attached the logs from this run here in case they would be helpful. logs.zip

BarrySlyDelgado commented 1 month ago

At the time of writing, there seems to be a limited number of machines in the condor pool, which would explain why you were not getting your requested number of workers. I'll take a look at the logs to see what I can get from them. Also, do you have the code which writes the parquet files?

cmoore24-24 commented 1 month ago

Sure, here's the code that does the skimming: https://github.com/cmoore24-24/hgg/blob/main/notebooks/skims/skimmer/skimmer.py

BarrySlyDelgado commented 1 month ago

I have some potential insights that will probably need to be addressed further. I've plotted the average task runtime on the left y-axis (seconds) and the number of tasks of a specific category on the right y-axis:

[Screenshot from 2024-06-04 14-57-40]

There are roughly 500 tasks within the workflow that take roughly an hour on average to complete. It may be possible that some workers are getting evicted in that time period, possibly causing some of the retries. For the most part, each worker has sufficient cache space for the data that it's handling, and we see a relatively small number of worker stoppages during workflow execution (among those that have been scheduled tasks) in this graph:

[Screenshot from 2024-06-04 15-30-13]

Here, each + represents when a task is scheduled to a worker, and the color represents the category. What is strange to see is that most of the purple tasks accumulate on a single worker, raising its cache utilization near its limit. If this also occurs on other runs, it's possible that this could be a point of failure.

To go forward, I think we should investigate what the long-running red tasks are and why the purple tasks seem to be less distributed across workers.

FYI @dthain

dthain commented 1 month ago

Very interesting, the plots really help here!

It would not be hard to imagine a case where the scheduler prefers to run jobs on the nodes that already have the most data, and thus prefers the same node over and over again until it is full.

The scheduler certainly has access to quite a lot of information about storage utilization, but is not currently making use of it. We could do a better job there.
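
As a rough illustration of what making use of storage information could look like, the sketch below picks the worker with the most locally cached input bytes, but skips any worker whose disk would overflow once the missing inputs are transferred. This is not the TaskVine scheduler: the `Worker` class, `pick_worker`, and all sizes are hypothetical, and task outputs are ignored for simplicity.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Optional


@dataclass
class Worker:
    """Hypothetical view of a worker: disk capacity and cached file sizes (bytes)."""
    name: str
    disk_capacity: int
    cached: Dict[str, int] = field(default_factory=dict)

    @property
    def disk_used(self) -> int:
        return sum(self.cached.values())


def pick_worker(workers: Iterable[Worker], inputs: Dict[str, int]) -> Optional[Worker]:
    """Prefer the worker holding the most input bytes, among those with room."""
    total = sum(inputs.values())
    best, best_local = None, -1
    for w in workers:
        # Bytes of this task's inputs that are already cached on the worker.
        local = sum(size for name, size in inputs.items() if name in w.cached)
        to_transfer = total - local
        if w.disk_used + to_transfer > w.disk_capacity:
            continue  # transferring the rest would exceed this worker's disk
        if local > best_local:
            best, best_local = w, local
    return best


a = Worker("a", disk_capacity=100, cached={"x": 60, "z": 30})
b = Worker("b", disk_capacity=100)
task_inputs = {"x": 60, "y": 30}

# A locality-only policy would choose "a"; the capacity check redirects to "b".
chosen = pick_worker([a, b], task_inputs)
print(chosen.name if chosen else "no worker has room")
```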

dthain commented 3 weeks ago

(Giving Barry a break on this one while he is away for the summer.)

@JinZhou5042 is going to start looking into this matter. The key issue as we understand it is that the DaskVine executor is not cleaning up intermediate files as they are consumed. We need two things to make this work:

1 - The TaskVine API needs a way to indicate that a file's data is no longer needed, without forgetting the logical existence of the file. This looks something like m.prune_file(f) to delete all of the replicas. However, the manager will still be capable of recreating/replicating the file if failures make it necessary.

2 - The DaskVine executor then needs to invoke prune_file to clean up replicas once they are fully consumed by all known tasks. It should only undeclare_file at the very end when nothing more is needed.
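
One way the executor side of this could work is consumer reference counting: count how many tasks read each intermediate file, decrement as consumers finish, and prune at zero. The sketch below is an assumption-laden illustration, not the DaskVine implementation. `StubManager` is a stand-in that only records calls, `PruneTracker` is a hypothetical helper, `prune_file` is the method proposed in this thread, and files are identified by the key of the task that produces them.

```python
from collections import defaultdict
from typing import Dict, Iterable, List


class StubManager:
    """Stand-in for a manager (NOT the real TaskVine API); it only records calls."""

    def __init__(self) -> None:
        self.pruned: List[str] = []
        self.undeclared: List[str] = []

    def prune_file(self, f: str) -> None:
        self.pruned.append(f)

    def undeclare_file(self, f: str) -> None:
        self.undeclared.append(f)


class PruneTracker:
    """Prune a task's output once every task that consumes it has finished."""

    def __init__(self, manager, graph: Dict[str, Iterable[str]],
                 keep: Iterable[str] = ()) -> None:
        # graph maps a task key to the keys of the tasks whose outputs it reads.
        self.manager = manager
        self.graph = {k: list(dict.fromkeys(deps)) for k, deps in graph.items()}
        self.keep = set(keep)  # final outputs that must never be pruned
        self.pending = defaultdict(int)
        for deps in self.graph.values():
            for d in deps:
                self.pending[d] += 1

    def task_done(self, key: str) -> None:
        """Call when task `key` completes successfully."""
        for d in self.graph[key]:
            self.pending[d] -= 1
            if self.pending[d] == 0 and d not in self.keep:
                self.manager.prune_file(d)

    def finish(self) -> None:
        """At the very end, forget every file entirely."""
        for k in self.graph:
            self.manager.undeclare_file(k)


m = StubManager()
graph = {"a": [], "b": [], "c": ["a", "b"], "d": ["a", "c"]}
tracker = PruneTracker(m, graph, keep={"d"})
for key in ["a", "b", "c", "d"]:  # a valid completion order for this graph
    tracker.task_done(key)
tracker.finish()
print(m.pruned, m.undeclared)
```

A real executor would also need to handle recovery: if a pruned file has to be recreated after a failure, the counts of its own inputs would need to be restored first.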

dthain commented 2 weeks ago

@JinZhou5042 please set up a time to talk with @btovar to discuss DaskVine and make sure you understand what is going on there. Then, you can put in m.prune_file(f) where appropriate.