esi-neuroscience / acme

Asynchronous Computing Made ESI
https://esi-acme.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Jobs 'Killed' with no error message #24

Closed KatharineShapcott closed 3 years ago

KatharineShapcott commented 3 years ago

Hello! My code executes fine when running from an interactive node, but from a SLURM interactive session in the 8GBVIS queue it doesn't work. Here's the output in my terminal when it fails:

[shapcottk@esi-svhpc6 ~]$ /mnt/gs/home/shapcottk/.conda/envs/acme/bin/python /mnt/gs/home/shapcottk/python/filter_net_paper/scripts/filter_net_paper.py 
/mnt/pns/home/shapcottk/python/filter_net_paper/scripts/acme/__init__.py:29: ImportWarning: <ACME> Package is not installed in site-packages nor cloned via git. Please consider obtaining ACME sources from supported channels. 
  warnings.showwarning(msg, ImportWarning, __file__, inspect.currentframe().f_lineno)
<esi_cluster_setup> Requested job-count 758 exceeds waiter threshold 400: waiting for `n_jobs_startup` jobs to come online, then proceed
<esi_cluster_setup> SLURM workers ready: 26/400 [elapsed[elapsed time 00:25 | timeout at 01:00]c
<esi_cluster_setup> SLURM workers ready: 26/400     [elapsed time 01:00 | timeout at 01:00]
<esi_cluster_setup> SLURM jobs could not be started within given time-out interval of 60 seconds
Time remaining: 119  <esi_cluster_setup> Do you want to [k]eep waiting for 60s, [a]bort or [c]ontinue with 26 workers?[Default: 'c'] 
<esi_cluster_setup> Cluster dashboard accessible at http://                .6:8787/status
Calc_nfilters with trainfrac0.02
<ParallelMap> INFO: Attaching to global parallel computing client <Client: 'tcp://                .6:35317' processes=26 threads=26, memory=208.00 GB>
<ParallelMap> INFO: Preparing 758 parallel calls of `comparison_preprocessing` using 758 workers
<ParallelMap> INFO: Log information available at /mnt/hpx/slurm/shapcottk/shapcottk_20210310-093026
100% |█████| 758/758 [00:33<00:00]
<ParallelMap> INFO: SUCCESS! Finished parallel computation. Results have been saved to /mnt/hpx/home/shapcottk/ACME_20210310-093129-181328
Calc_nfilters with trainfrac0.08
<ParallelMap> INFO: Attaching to global parallel computing client <Client: 'tcp://                 .6:35317' processes=26 threads=26, memory=208.00 GB>
<ParallelMap> INFO: Preparing 758 parallel calls of `comparison_preprocessing` using 758 workers
<ParallelMap> INFO: Log information available at /mnt/hpx/slurm/shapcottk/shapcottk_20210310-093026
  0% |                                                                                                                                    | 0/758 [00:06<?]
Killed

It hangs, and then after a few minutes I see the 'Killed' message and the jobs end. If I press Ctrl+C while it's hanging (see #23), the jobs do not actually end. This happens repeatedly when I try to reuse the same client for a second batch of jobs (as I did in the code above).

I now managed to get the same behaviour using some test code where I'm sending a large array as an input to the function. This causes issues from both SLURM and the interactive node:

import numpy as np
from acme import ParallelMap, esi_cluster_setup
from time import sleep

def f(x, y, z=3, w=np.zeros((3, 1)), **kwargs):
    print('Something from F')
    return (sum(x) + y) * z * w.max()

n_jobs = 200
x = np.random.randint(0,100,(70000,785)) #large array
z = range(n_jobs)
w = np.ones((8, 1))

client = esi_cluster_setup(partition="8GBXS", n_jobs=n_jobs, n_jobs_startup=18)

pmap = ParallelMap(f, x, np.random.rand(n_jobs), z=z, w=w, n_inputs=n_jobs)
with pmap as p:
    p.compute()

Here's the output this code causes:

[shapcottk@esi-svhpc6 ~]$ /mnt/gs/home/shapcottk/.conda/envs/acme/bin/python /mnt/gs/home/shapcottk/python/filter_net_paper/scripts/test_acme.py 
/mnt/pns/home/shapcottk/python/filter_net_paper/scripts/acme/__init__.py:29: ImportWarning: <ACME> Package is not installed in site-packages nor cloned via git. Please consider obtaining ACME sources from supported channels. 
  warnings.showwarning(msg, ImportWarning, __file__, inspect.currentframe().f_lineno)
<esi_cluster_setup> Requested job-count 200 exceeds waiter threshold 18: waiting for `n_jobs_startup` jobs to come online, then proceed
<esi_cluster_setup> SLURM workers ready: 26/None elapsed[elapsed time 00:06 | timeout at 01:00]
<esi_cluster_setup> Cluster dashboard accessible at http://               .6:8787/status
<ParallelMap> INFO: Attaching to global parallel computing client <Client: 'tcp://              .6:41391' processes=26 threads=26, memory=208.00 GB>
<ParallelMap> INFO: Preparing 200 parallel calls of `f` using 200 workers
<ParallelMap> INFO: Log information available at /mnt/hpx/slurm/shapcottk/shapcottk_20210310-105935
  0% |             | 0/200 [00:05<?]
Killed

Here's an example slurm output: slurm-3350545.out.txt

pantaray commented 3 years ago

Hi! Thanks for the detailed bug report! This is really strange - running your code locally on my workstation hangs and then crashes the entire session:

<ParallelMap> INFO: Log information available at http://127.0.0.1:8787/info/main/workers.html
  0%  0/200 [00:19<?]Killed
$ /opt/software/conda/envs/acme/lib/python3.8/multiprocessing/resource_tracker.py:216: UserWarning: resource_tracker: There appear to be 24 leaked semaphore objects to clean up at shutdown

So the problem does not seem to be SLURM specific. I'll do some digging to find out what goes wrong here...

KatharineShapcott commented 3 years ago

It's definitely something to do with that "large array", which isn't that big, which is why I was sending it. I wonder if it depends on how many jobs you have running? Maybe it's multiplying the size of the array by the number of jobs in local memory or something? Worst case scenario, you could detect the input sizes and tell users to load arrays over size X from disk.

But what's also odd is that for my code at least this wasn't happening a few months ago. It worked fine to send this size of array.

pantaray commented 3 years ago

Hm, this is really strange. The offending part is in compute (in backend.py): firstArg = db.from_sequence(...). This takes forever and eventually crashes if the number of workers is large enough. Initially I thought that maybe enforcing npartitions=ncalls in the creation of the dask bag might be the issue (cf. #6), but letting dask partition the bag on its own does not change anything. I've also tried using generators in the from_sequence call (same result) and started playing around with from_delayed (which behaves very unintuitively for 2d arrays...). What puzzles me the most is that it worked before, as you said. Do you happen to know which version of ACME you used (commit hash)?

So no concrete solution for now, but at least copying input args ncalls times and shoving them into lists is less than optimal. This might be okay for scalars (which were my one and only test case...), but even moderately sized NumPy arrays can overflow the memory of the caller if copied often enough (take a 4MB array and scale it to 5000 workers; suddenly the ACME caller has to deal with a 20GB input...). I will definitely modify this to not have lists hanging around there. The dask-bag situation requires some additional meditating, though...
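The arithmetic behind that estimate can be checked with a few lines of plain Python. This is only a back-of-the-envelope sketch: the helper `replicated_input_bytes` is a hypothetical name, and the 8 bytes per element for the test array is an assumption (the usual default integer size of `np.random.randint` on 64-bit Linux), not something measured on the cluster.

```python
def replicated_input_bytes(array_bytes: int, n_calls: int) -> int:
    """Bytes the caller holds if every call gets its own copy of one array."""
    return array_bytes * n_calls


GB = 10**9  # decimal gigabytes, matching the rough figures in the comment

# Example from the comment above: a 4 MB array copied for 5000 workers.
small_case_gb = replicated_input_bytes(4 * 10**6, 5000) / GB

# Array from the test script: 70000 x 785 elements.
# Assumption: 8 bytes per element (64-bit integers).
test_array_bytes = 70000 * 785 * 8
test_case_gb = replicated_input_bytes(test_array_bytes, 200) / GB

print(f"4 MB x 5000 calls: {small_case_gb:.2f} GB")
print(f"test array: {test_array_bytes / 10**6:.1f} MB per copy, "
      f"{test_case_gb:.2f} GB for 200 calls")
```

Under these assumptions the test array alone would need far more memory than a single node offers once it is replicated per call, which would be consistent with the process being killed.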

KatharineShapcott commented 3 years ago

Huh that's really odd... Okay good to know that, I was kinda thinking dask might have done some clever memory sharing thing between its jobs but I guess it doesn't.

My version of acme is old. Did you change something in the cluster that could cause an issue with fewer jobs than before? Because I would have sworn this was working just before I went on holiday at the beginning of February with up to 400 jobs, and now 26 doesn't even work...

ACME:

commit 95403e4632fd878e69c6fdd0ff82f0b787765044 (HEAD -> main, origin/main, origin/HEAD)
Merge: 1d24c6f 3979789
Author: Stefan Fuertinger <pantaray@users.noreply.github.com>
Date:   Tue Dec 29 14:18:14 2020 +0100

My dask stuff is: image

pantaray commented 3 years ago

Hi Katharine! I just pushed a commit that uses a completely revamped argument distribution mechanic: instead of relying on bone-headed copies of variables that are needed by all workers, we now broadcast the respective variable to the entire worker-cluster and create a list of references to the resulting future that gets distributed when calling a user-provided function. I've tested the new mechanic with the minimal (not-)working example you provided (thank you again for cooking this up - that was tremendously helpful!): ACME processes the example both with SLURM and using a LocalCluster without hiccups or memory flooding. If you have a chance, please feel free to test-drive the new version (commit 9791cb7 in the dev branch). If this works for you, I'll merge into main.

pantaray commented 3 years ago

Hi Katharine! Did you have a chance already to test the new version of ACME? I'm also almost done with the emergency pickling patch. As soon as we're sure everything works, I'll merge into main :)

KatharineShapcott commented 3 years ago

Sorry I haven't yet because I rewrote all my code to avoid sending arrays! I'll try to squeeze it in ASAP.

pantaray commented 3 years ago

All good, thank you for testing!

KatharineShapcott commented 3 years ago

Hi Stefan, my jobs crashed on the dev branch with the same code that works when I use it on the main branch. It looks like the first job of a worker returns successfully and then every job called after that on the same worker crashes. I'm using the modified code where I load in a different subset of the data each time (i.e. the arrays aren't being sent via SLURM) and it doesn't seem to like it.

Are you maybe making some assumptions about the size of the output now?

First error message in slurm log:

distributed.worker - WARNING -  Compute Failed
Function:  func_wrapper
args:      (MFProbabilistic())
kwargs:    {'dataset': 'reuters', 'train_size': 0.7935033498304951, 'outDir': '/mnt/hpx/home/shapcottk/ACME_20210421-094302-881385', 'outFile': 'comparison_classifier_157.h5', 'taskID': 157, 'userFunc': <function comparison_classifier at 0x7feb9731d5e0>}
Exception: ValueError('operands could not be broadcast together with shapes (1296,) (18368,) ')

This is then repeated, as if the first 1296 array was never cleared:

Exception: ValueError('operands could not be broadcast together with shapes (1296,) (12678,) ')
Exception: ValueError('operands could not be broadcast together with shapes (1296,) (6987,) ')
Exception: ValueError('operands could not be broadcast together with shapes (1296,) (10174,) ')

On another worker it looks like this (also repeated):

Exception: ValueError('operands could not be broadcast together with shapes (1979,) (7670,) ')

Acme error:

<esi_cluster_setup> SLURM workers ready: 0/35   [elapsed time 00:00 | timeout at 01:00]
<esi_cluster_setup> Requested job-count 50 exceeds `n_jobs_startup`: waiting for 35 jobs to come online, then proceed
<esi_cluster_setup> SLURM workers ready: 50/None    [elapsed time 00:07 | timeout at 01:00]
<ParallelMap> INFO: Attaching to global parallel computing client <Client: 'tcp://              .4:45339' processes=50 threads=50, memory=400.00 GB>
<esi_cluster_setup> Cluster dashboard accessible at http://            .4:8787/status
<ParallelMap> INFO: Preparing 200 parallel calls of `comparison_classifier` using 50 workers
<ParallelMap> INFO: Log information available at /mnt/hpx/slurm/shapcottk/shapcottk_20210421-094254
 25% |██▌       | 50/200 [01:20<04:00]
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
~/python/filter_net_paper/scripts/filter_net_figure7.py in <module>
     11 with ParallelMap(comparison_classifier, clf, dataset=dataset, train_size=train_sizes, 
     12                     n_inputs=n_trys, write_worker_results=write) as pmap:
---> 13     results = pmap.compute()

/mnt/pns/home/shapcottk/python/filter_net_paper/scripts/acme/backend.py in compute(self, debug)
    422 
    423             # Finally, raise an error and get outta here
--> 424             raise RuntimeError(msg)
    425 
    426         # If wanted (not recommended) collect computed results in local memory

RuntimeError: <ACMEdaemon> Parallel computation failed: 150/200 tasks failed or stalled.
Concurrent computing scheduler log below: 

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://                :45339
distributed.scheduler - INFO -   dashboard at:                     :8787
distributed.scheduler - INFO - Register worker <Worker 'tcp://               .22:43819', name: 19, memory: 3, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://             .22:43819
...

KatharineShapcott commented 3 years ago

Another possibility is that the MFProbabilistic() arg persists between calls, that would also cause a crash when used with different input sizes.
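A minimal sketch of this failure mode, assuming a classifier that allocates its internal arrays on the first fit and keeps them afterwards. `StatefulModel` is a hypothetical stand-in written for illustration, not the real MFProbabilistic, and the sizes are taken from the log above:

```python
class StatefulModel:
    """Hypothetical stand-in for a classifier that keeps state between fits."""

    def __init__(self):
        self.weights = None

    def fit(self, n_features):
        # Allocate weights only on the first call; later calls reuse them.
        if self.weights is None:
            self.weights = [0.0] * n_features
        if len(self.weights) != n_features:
            raise ValueError(
                f"shape mismatch: ({len(self.weights)},) vs ({n_features},)"
            )
        return len(self.weights)


sizes = [1296, 18368, 12678]

# One shared instance reused for every call: only the first fit succeeds.
shared = StatefulModel()
outcomes = []
for n in sizes:
    try:
        outcomes.append(shared.fit(n))
    except ValueError as exc:
        outcomes.append(str(exc))

# A fresh instance per call: every fit succeeds.
fresh_outcomes = [StatefulModel().fit(n) for n in sizes]

print(outcomes)
print(fresh_outcomes)
```

With the shared instance, every failure reports the size of the first fit on the left-hand side, which matches the pattern of repeated (1296,) shapes in the worker log.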

pantaray commented 3 years ago

Hi Katharine!

Thanks so much for testing this! I think your hunch is right here: the MFProbabilistic() arg might be the culprit. The updated code does not use dask bags any more for argument distribution (to avoid memory overflow) but instead broadcasts (presumably identical) args to the worker cluster for use by all workers (i.e., by reference, no copies are generated). The relevant code is:

https://github.com/esi-neuroscience/acme/blob/9791cb7e06186141830bc4a6696f0dfcd63cd094/acme/backend.py#L318-L330

I assume MFProbabilistic() is called repeatedly to randomly select data? The above snippet would not do that.

KatharineShapcott commented 3 years ago

Hi Stefan, yes, MFProbabilistic() was the problem. It's a classifier that was then being fit multiple times (I need to handle that correctly in the future).

Looks like it's working! I sent a very large array of 60000x784 to both 50 and 189 workers from the 8GBVIS queue and had no problems at all. Thanks so much for fixing that! Best, Katharine

pantaray commented 3 years ago

Hey Katharine! Thank you for testing this! Great to hear the changes work :) I think I'll include a warning in the code: if ParallelMap is launched with a single callable object as input arg to the user-provided function, the callable will only be executed once for all workers. I think there are scenarios where this makes sense (e.g., loading a file with global parameters) but there are other circumstances (like the one you ran into) where this behavior leads to problematic side effects. I don't think there is a "right" or "wrong" way here - unless you have any objections, I'd leave the default as it is (single callable is called once). If it is to be called separately by each worker, it has to be specifically provided like [MFProbabilistic] * n_jobs (although I'm not sure if NumPy would then use the same seed for all workers...).
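Two details of the per-call variant can be illustrated in plain Python. The sketch below is independent of ACME: `Sampler` is a hypothetical class, and deriving one seed per call from its index is an assumption made for illustration, not something ACME does. It shows that repeating an already created instance with `*` yields references to a single object, and that separate instances only produce different random draws if they are seeded differently.

```python
import random


class Sampler:
    """Hypothetical per-call object that owns its random generator."""

    def __init__(self, seed):
        self.rng = random.Random(seed)

    def draw(self):
        return self.rng.random()


n_jobs = 4

# Repeating an instance gives n_jobs references to the same object.
shared = [Sampler(0)] * n_jobs
n_shared_objects = len({id(s) for s in shared})

# A comprehension creates n_jobs independent instances.
# Assumption: one distinct seed per call, derived from the call index.
separate = [Sampler(seed) for seed in range(n_jobs)]
n_separate_objects = len({id(s) for s in separate})

# Independent instances with the same seed still draw identical numbers.
same_seed_draws = [Sampler(0).draw() for _ in range(n_jobs)]
distinct_seed_draws = [s.draw() for s in separate]

print(n_shared_objects, n_separate_objects)
print(len(set(same_seed_draws)), len(set(distinct_seed_draws)))
```

For NumPy-based code, `numpy.random.SeedSequence(...).spawn(n)` is one way to obtain independent per-call seeds; whether the classifier in question accepts a seed is not known from this thread.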