nipy / nipype

Workflows and interfaces for neuroimaging packages
https://nipype.readthedocs.org/en/latest/

Idea to minimize memory fingerprint #2776

Open oesteban opened 5 years ago

oesteban commented 5 years ago

Summary

Large workflows are prone to getting killed by the OOM killer on Linux and by similar mechanisms on other systems. The common denominator is that the RSS memory footprint is reasonable (not too big), but virtual memory skyrockets when the workflow has many nodes.

Actual behavior

Except for pure-Python interfaces, all nipype interfaces start new processes via subprocess.Popen. All of these processes are created with fork, so each one momentarily claims as much virtual memory as was already allocated before forking (i.e., the allocation is doubled). This, combined with Python's sluggishness in garbage-collecting after the process finishes, leads to overcommitting memory (and to processes being killed on systems that do not allow overcommit).

Expected behavior

Less memory consumption

How to replicate the behavior

Run fmriprep on a large dataset, disallowing overcommitting and with a low memory limit (e.g. 8GB)

Idea:

Patch subprocess.Popen with multiprocessing.context.Popen. In theory, all these processes would then be forked from the server process (which should have a constant, small memory footprint).
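A minimal sketch of the intent (illustrative only, not nipype code), assuming a forkserver context so that command lines are launched from a small, dedicated server process instead of the large main process:

    import multiprocessing as mp
    import subprocess

    def _run_cmd(cmd):
        # Runs inside a forkserver worker; the worker's memory footprint is
        # small, so the fork+exec of the command duplicates very little.
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return proc.returncode

    if __name__ == '__main__':
        ctx = mp.get_context('forkserver')
        with ctx.Pool(processes=1) as pool:
            retcode = pool.apply(_run_cmd, (['echo', 'hello'],))
        print('exit code:', retcode)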

WDYT @satra @effigies @chrisfilo ?

BTW, this solution might only be possible with Python >= 3.4. As per https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods:

Changed in version 3.4: spawn added on all unix platforms, and forkserver added for some unix platforms. Child processes no longer inherit all of the parents inheritable handles on Windows.

chrisgorgo commented 5 years ago

Sounds reasonable. By "Patch" you mean replace?

effigies commented 5 years ago

Seems worth trying.

satra commented 5 years ago

let's try it out.

are we the only ones struggling with memory? how does dask handle this?

oesteban commented 5 years ago

@chrisfilo: By "Patch" you mean replace? @effigies: Seems worth trying. @satra let's try it out.

Yes, although I'm checking it and such a replacement is not as easy as I first thought: the interface of Popen in multiprocessing is not the same as the one in subprocess.

@satra: are we the only ones struggling with memory? how does dask handle this?

I think this is heavily related to https://bugs.python.org/issue8713#msg105719, which gave rise to the addition of the forkserver mode in multiprocessing.

I think there is one big difference w.r.t. dask: they don't ubiquitously use subprocess.Popen (we do, for every CommandLine interface), which relies on Linux's fork+exec. Since dask only runs pickleable functions that are sent to the workers, the sub-processes already exist and are only re-created when maxtasksperchild is reached. If we were able to run fmriprep with dask, I'd say we would see the same issues.

My impression is that this issue only happens on Linux.

oesteban commented 5 years ago

related: https://bugs.python.org/issue20104#msg222570

EDIT: this post in the same thread may also apply https://bugs.python.org/issue20104#msg289758

oesteban commented 5 years ago

This would be an alternative (Python 3.8; not sure how easy it would be to backport): https://docs.python.org/3.8/library/os.html#os.posix_spawn
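A rough example of what that would look like (a sketch, assuming Python >= 3.8 on a POSIX system):

    import os

    # posix_spawn launches the child directly, without first duplicating the
    # parent's address space the way fork does.
    pid = os.posix_spawn('/bin/echo', ['echo', 'hello'], dict(os.environ))
    _, status = os.waitpid(pid, 0)
    print('exit status:', os.WEXITSTATUS(status))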

effigies commented 5 years ago

There is also the 'spawn' multiprocessing context. Maybe the context could become a plugin argument, so that heavy pipelines like fMRIPrep could use it without significantly changing the performance of existing pipelines.
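For instance (a hypothetical sketch; 'mp_context' is not an existing nipype plugin argument):

    import multiprocessing as mp

    # Hypothetical plugin arguments; 'mp_context' is illustrative only.
    plugin_args = {'n_procs': 8, 'mp_context': 'spawn'}

    ctx = mp.get_context(plugin_args.get('mp_context', 'fork'))
    pool = ctx.Pool(processes=plugin_args['n_procs'])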

oesteban commented 5 years ago

Yep, the problem is that subprocess.Popen is totally independent of multiprocessing.

I would not expect a great difference between forkserver and spawn if we managed to create such a patch. Although, the more I read, the clearer it becomes that we really need spawn here instead of fork.

oesteban commented 5 years ago

This might be too big to try. Just in case, I filed this bpo: https://bugs.python.org/issue35238#msg329868. Let's see the reactions.

oesteban commented 5 years ago

I have a functional implementation of nipype running commands through posix_spawn.

Patching it into a custom fmriprep Docker image**, I've gotten the following virtual memory profile: [figure: nipype-fmriprep-spawn]

Using the same Docker image without patching nipype (i.e., using fork_exec): [figure: nipype-fmriprep-fork]

** The Docker image was built from the latest fmriprep master, with the addition of memory_profiler. The tool was modified to measure VMS instead of RSS.

Conclusion: either I'm measuring things wrongly*** or changing to spawn does not have any effect. Another possibility is that ds000005/sub-01 is not a big enough dataset to trigger memory problems.

*** It could be that my patch to memory_profiler is faulty, or that memory_profiler is simply not the right tool for measuring this. Or that sampling every 0.01 s is not fast enough to capture the memory duplication of fork, so the allocated memory is freed before it can even be measured.

Extra thought: I'll dig deeper into a peak of 200GB that happens soon after starting fmriprep, as seen on the same profiles without zooming in:

Spawn: [figure: nipype-fmriprep-spawn-zoomout]

Fork: [figure: nipype-fmriprep-fork-zoomout]

oesteban commented 5 years ago

Apparently, switching to Python 3.7 (incl. #2778, thus with a forkserver-mode process pool) seems to reduce memory usage by 1-2 GiB in this test case:

[figure: nipype-mem-fork-py37]

UPDATE: the corresponding plot using posix_spawn: [figure: nipype-mem-spawn-37]

effigies commented 5 years ago

So spawn appears to do very little. It looks like it's the fork of the main process that's killing us, not the secondary Popen fork. Correct?

And spawn looks noticeably slower. Note that the transitions that happen around 2000s and 4000s with Popen occur around 2200s and 4800s with spawn.

oesteban commented 5 years ago

Correct

EDIT: Correct, iff we are not missing memory allocations of fork_exec because they happen very fast (well below the 0.01 s sampling interval) or because they do not add to the VMS.

effigies commented 5 years ago

Ah, good point. Maybe we can test by allocating ~1/4 of available memory in the base process and setting the overcommit policy to disallow?
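Something along those lines (a sketch; the sysctl call and the exact fraction are assumptions to be tuned):

    import psutil

    # Disallow overcommit first, e.g.:  sudo sysctl vm.overcommit_memory=2
    # Then pin down ~1/4 of total memory in the base process before running
    # the workflow, so any fork that duplicates it should fail loudly.
    ballast = bytearray(psutil.virtual_memory().total // 4)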

oesteban commented 5 years ago

I'll try that tomorrow.

oesteban commented 5 years ago

Okay, since I wasn't convinced by the memory_profiler measurements, I managed to run fmriprep under valgrind-massif with the following command line: valgrind --tool=massif --massif-out-file=massif.out.%p --pages-as-heap=yes --suppressions=/usr/local/miniconda/share/valgrind-python.supp /usr/local/miniconda/bin/fmriprep - more details here: https://github.com/poldracklab/fmriprep/compare/master...oesteban:ehn/include-profiler

This is what happens with vanilla nipype (using fork):

[figure: fmriprep-valgrind-fork]

And this with spawn:

[figure: fmriprep-valgrind-spawn]

By using the option --pages-as-heap=yes I hoped to also capture virtual memory allocations. However, it is surprising that the graph shows about 10 times less memory usage than memory_profiler did. Since valgrind is pretty mature, I'm guessing that something is off with memory_profiler or its plotting function. Other than that, my impression is that the only difference is the sampling density (faster for memory_profiler).

My next step will be running fmriprep on TACC with Russ' parameters under valgrind, since we know that setup crashes fmriprep due to memory. WDYT?

I can also play with when valgrind takes snapshots. There is a mode where snapshots are triggered by memory allocations and frees; that could be a better way of measuring.

chrisgorgo commented 5 years ago

Testing on TACC might lead to a long turnaround time - last time my job spent 25h in the queue.

oesteban commented 5 years ago

Okay, I'll test on My Connectome locally then. Using memory_profiler I can Ctrl-C at any time and still keep the memory traces.

The hypothesis is: we seem to allocate a lot of memory right at the beginning and then keep it throughout fMRIPrep. Let's see what the memory slope looks like for My Connectome, and then assume it enters the plateau when execution reaches the t1_2_mni node.

satra commented 5 years ago

if you know a consistent crash pattern @mgxd could test on our slurm cluster with a memory limit on the process as well.

oesteban commented 5 years ago

@satra I'll try to think of something we can hand over to @mgxd to replicate, but I don't want to waste anyone else's time.

One quick hack we can try is creating the MultiProc runner object at the very beginning of the execution (so the workers have a minimal memory footprint) and passing it on to the workflow.run method. That should work, shouldn't it?
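Something like this (a sketch; build_workflow stands in for however fMRIPrep actually constructs its workflow):

    from nipype.pipeline.plugins import MultiProcPlugin

    # Create the runner while the main process is still small...
    runner = MultiProcPlugin(plugin_args={'n_procs': 8})

    # ...then build the (heavy) workflow and hand it the pre-created runner.
    workflow = build_workflow()  # hypothetical helper
    workflow.run(plugin=runner)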

satra commented 5 years ago

that would work. we do create it pretty soon after expanding iterables.

oesteban commented 5 years ago

Okay, so I'm trying to simplify and I'm starting to see interesting things. First, I stopped running these tests within docker.

I wanted to see how the forkserver behaves. To minimize the size of the workers, I initialize a MultiProcPlugin instance at the very beginning of fmriprep, right after parsing the inputs, on Python 3.7.1 (the only customization of nipype was #2786). I ran two tests:

MultiProc, 8 processors, 8 omp-nthreads: [figure: multiproc_nocontainer]

LegacyMultiProc, 8 processors, 8 omp-nthreads: [figure: legacymultiproc_nocontainer]

Conclusions:

oesteban commented 5 years ago

One thing I've been chatting about with @effigies:

Python 3.7.1 | packaged by conda-forge | (default, Nov 13 2018, 18:15:35) 
[GCC 4.8.2 20140120 (Red Hat 4.8.2-15)] :: Anaconda custom (64-bit) on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import psutil
>>> p = psutil.Process()
>>> p.memory_info().vms / (1024**2)
71.33984375
>>> from multiprocessing import set_start_method
>>> set_start_method('forkserver')
>>> p.memory_info().vms / (1024**2)
75.765625
>>> from nipype.pipeline.plugins import legacymultiproc as lmpc
>>> plugin = lmpc.LegacyMultiProcPlugin(plugin_args={'nprocs': 8})
>>> p.memory_info().vms / (1024**2)
1066.44140625

So basically, importing nipype takes about 850 MB, and starting a LegacyMultiProcPlugin adds some 150 MB on top.

From a different point of view:

>>> import sys
>>> a = set(sys.modules)
>>> import nipype
>>> b = set(sys.modules)
>>> len(b - a)
1628

Nipype might be importing too many things.

effigies commented 5 years ago

Posted this on the slack, but might as well do it here, too. Here's a more thorough profiling script and the results: https://gist.github.com/effigies/183898aa6ef047eb8ef86f7e63f5528a

chrisgorgo commented 5 years ago

Is it worth focusing on constants when the memory issue seems to scale with the size of the workflow?

effigies commented 5 years ago

If we're running with a forkserver, the base memory footprint needed to run each node is going to make a difference. We're currently looking at 850MB for a bare interpreter + nipype, which is what each worker needs to load in order to un-pickle the interface and run it, so 8-core machines will use 6.8GB at rest, apart from the workflow.

Of course, this only makes sense with a forkserver; if we're forking from a process that contains the workflow, we're always going to get destroyed by a big node count.

Though yes, we should try to reduce node size. Oscar's pushing from that end.

oesteban commented 5 years ago

Both are related. The 850MB offset of the nipype import is pretty huge. It is even worse for the main process (where the forkserver is started), where the offset of nipype + pool is 1GB.

ds000005 is not a particularly large dataset, and it adds some 700MB of workflow and node objects on top. So the real footprint is Chris' baseline of 6.8GB for the workers plus 1.7GB for a normal dataset.

For My Connectome I'm sure that the workflow overhead is much more than 700MB.

On the imports front, just with this change: https://github.com/nipy/nipype/compare/master...oesteban:enh/minimize-node-memory-2 I've gotten to the following:

>>> import psutil
>>> p = psutil.Process()
>>> p.memory_info().vms / (1024**2)
71.34765625
>>> import nipype
>>> p.memory_info().vms / (1024**2)
455.28515625
>>> from nipype.interfaces import base
>>> p.memory_info().vms / (1024**2)
781.16015625

On the Node, Workflow and Interface instance front, I've managed to greatly reduce the size of individual objects. By pickling interfaces (which must be pickled anyway) and keeping track of the pickled objects on disk, we could get a much lighter workflow object on which to call run(), bringing that offset down from 1.7GB to something close to the 1GB baseline.

However, I'm running into a problem: the config object is copied wholesale into every Node and Workflow. For Workflows that copy accounts for most of the object size (discounting Nodes), and you usually have at most a few hundred Workflows. But for Nodes it is a problem, because this copy precludes using __slots__ efficiently (as in #2805 for the runtime object). Right now I'm weighing whether it is worth transitioning the config to something like django.conf.settings. As a module attribute it is a singleton, loaded into virtual memory the first time you import it, and much more lightweight than keeping (and, most importantly, copying over and over again) the whole configparser object.
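A sketch of that direction (purely illustrative, not nipype's actual config): a module-level settings object that every Node and Workflow references instead of carrying its own copy:

    # settings.py -- imported once per process; Nodes/Workflows would do
    # "from settings import get_option" instead of storing a deep copy of a
    # configparser object.
    _OPTIONS = {
        'execution': {'stop_on_first_crash': 'false'},
        'logging': {'workflow_level': 'INFO'},
    }

    def get_option(section, option, default=None):
        return _OPTIONS.get(section, {}).get(option, default)

    def set_option(section, option, value):
        _OPTIONS.setdefault(section, {})[option] = value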

Chris and I have discussed that nodes could have a proxy for the input traits, which are the heaviest part of Interface objects. I can see clear links to the flat file system representation of hashes.

oesteban commented 5 years ago

One more picture. I've run fmriprep with the following snippet (it relies on psutil and objgraph) inserted at several initialization points:

    import psutil
    import objgraph

    thisproc = psutil.Process()
    mem = thisproc.memory_info()
    print('Memory usage <where> - '
          'RSS %.1f MB / VM %.f MB' % (mem.rss / (1024**2), mem.vms / (1024**2)))
    print('Object usage:')
    objgraph.show_most_common_types()

This is the output:

Memory usage at entering main - RSS 136.7 MB / VM 870 MB
Object usage:
function                   37098
dict                       17940
tuple                      16195
weakref                    6844
list                       6077
cell                       5701
getset_descriptor          5137
builtin_function_or_method 4619
type                       4501
wrapper_descriptor         3005

Memory usage after parsing arguments - RSS 137.8 MB / VM 872 MB
Object usage:
function                   37099
dict                       18016
tuple                      15821
weakref                    6844
list                       6168
cell                       5701
getset_descriptor          5137
builtin_function_or_method 4619
type                       4501
wrapper_descriptor         3005

Memory usage before plugin initialization - RSS 138.8 MB / VM 872 MB
Object usage:
function                   37290
dict                       18042
tuple                      15969
weakref                    6937
list                       6100
cell                       5711
getset_descriptor          5160
builtin_function_or_method 4684
type                       4527
wrapper_descriptor         3005

Memory usage after plugin initialization - RSS 139.2 MB / VM 1064 MB
Object usage:
function                   37378
dict                       18134
tuple                      16159
weakref                    6977
list                       6122
cell                       5712
getset_descriptor          5174
builtin_function_or_method 4699
type                       4540
wrapper_descriptor         3005

Memory usage before sending workflow initialization to worker - RSS 139.2 MB / VM 1064 MB
Object usage:
function                   37378
dict                       18134
tuple                      15962
weakref                    6977
list                       6122
cell                       5712
getset_descriptor          5174
builtin_function_or_method 4699
type                       4540
wrapper_descriptor         3005

181128-09:34:55,706 nipype.workflow IMPORTANT:

    Running fMRIPREP version 1.2.3+5.gae4c45ce.dirty:
      * BIDS dataset path: /oak/stanford/groups/russpold/data/openfmri/ds000005.
      * Participant list: ['01'].
      * Run identifier: 20181128-093455_50ee435e-a833-429c-9ee9-17cce60809a6.

181128-09:36:03,35 nipype.workflow IMPORTANT:
     Creating bold processing workflow for "/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-01_bold.nii.gz" (0.02 GB / 240 TRs). Memory resampled/largemem=0.06/0.10 GB.
181128-09:36:03,51 nipype.workflow IMPORTANT:
     No single-band-reference found for sub-01_task-mixedgamblestask_run-01_bold.nii.gz
181128-09:36:03,602 nipype.workflow WARNING:
     SDC: no fieldmaps found or they were ignored (/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-01_bold.nii.gz).
181128-09:36:04,471 nipype.workflow IMPORTANT:
     Creating BOLD surface-sampling workflow.
181128-09:36:06,767 nipype.workflow IMPORTANT:
     Creating bold processing workflow for "/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-02_bold.nii.gz" (0.04 GB / 240 TRs). Memory resampled/largemem=0.14/0.23 GB.
181128-09:36:06,782 nipype.workflow IMPORTANT:
     No single-band-reference found for sub-01_task-mixedgamblestask_run-02_bold.nii.gz
181128-09:36:07,267 nipype.workflow WARNING:
     SDC: no fieldmaps found or they were ignored (/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-02_bold.nii.gz).
181128-09:36:08,129 nipype.workflow IMPORTANT:
     Creating BOLD surface-sampling workflow.
181128-09:36:10,475 nipype.workflow IMPORTANT:
     Creating bold processing workflow for "/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-03_bold.nii.gz" (0.04 GB / 240 TRs). Memory resampled/largemem=0.14/0.23 GB.
181128-09:36:10,490 nipype.workflow IMPORTANT:
     No single-band-reference found for sub-01_task-mixedgamblestask_run-03_bold.nii.gz
181128-09:36:10,967 nipype.workflow WARNING:
     SDC: no fieldmaps found or they were ignored (/oak/stanford/groups/russpold/data/openfmri/ds000005/sub-01/func/sub-01_task-mixedgamblestask_run-03_bold.nii.gz).
181128-09:36:11,830 nipype.workflow IMPORTANT:
     Creating BOLD surface-sampling workflow.

Memory usage after sending workflow initialization to worker - RSS 235.6 MB / VM 1609 MB
Object usage:
function                   52796
dict                       40938
tuple                      24878
list                       16669
weakref                    10688
cell                       7794
getset_descriptor          6870
type                       6507
CTrait                     6258
builtin_function_or_method 5651

181128-09:36:35,238 nipype.workflow INFO:
     Workflow fmriprep_wf settings: ['check', 'execution', 'logging', 'monitoring']
181128-09:36:35,884 nipype.workflow INFO:
     Running in parallel.
181128-09:36:35,908 nipype.workflow INFO:
     [LegacyMultiProc] Running 0 tasks, and 6 jobs ready. Free memory (GB): 28.19/28.19, Free processors: 8/8.
181128-09:36:36,53 nipype.workflow INFO:
     [Node] Setting-up "fmriprep_wf.single_subject_01_wf.about" in "/home/oesteban/tmp/nipype-mem-fork/work/fmriprep_wf/single_subject_01_wf/about".

...

In summary, we start at RSS 136.7 MB / VM 870 MB, as expected from just importing nipype and argparse. Then there is a blip up to RSS 139.2 MB / VM 1064 MB after instantiating the LegacyMultiProcPlugin. Finally, a last jump to RSS 235.6 MB / VM 1609 MB happens when the workflow (which is created in a different process, although I am guessing that does not help much) is returned to the main process.

We should probably try to get nipype not to import pandas (of course) and numpy (which is imported just to check its version in utils/config.py). Ideally, I'd say the interfaces could be imported up front and the pipeline modules left until after the workers are started.

All in all, I don't think we can reduce the initial VM footprint below ~500MB. But with, e.g., 8 workers, that means starting to fork without duplicating 300MB per worker.

Using the legacy MultiProc plugin with maxtasksperchild is a must at this point.
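For reference, a minimal illustration of the mechanism LegacyMultiProc relies on (the numbers are arbitrary):

    import multiprocessing as mp

    # Workers are recycled after `maxtasksperchild` tasks, so whatever memory
    # they accumulate while running interfaces is periodically returned to
    # the OS.
    pool = mp.get_context('forkserver').Pool(processes=8, maxtasksperchild=10)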

WDYT?

oesteban commented 5 years ago

Oh, I forgot to note a couple of details:

effigies commented 5 years ago

Looking at that RSS/VMS disparity during workflow building (+100MB RSS / +550MB VMS), I think we should check the modules list to see how much of the virtual memory increase comes from imports. It may be even more critical to confine as many imports as possible to code that will not run until inside the worker.
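For example (a sketch; run_interface is a hypothetical worker-side function), deferring a heavy import so that only the worker pays for it:

    def run_interface(pickled_interface):
        # Heavy imports deferred to the worker process: the main process (and
        # anything forked from it) never maps these libraries into memory.
        import pickle
        import pandas as pd  # noqa: F401 -- stand-in for a heavy dependency

        interface = pickle.loads(pickled_interface)
        return interface.run()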

oesteban commented 5 years ago

>>> import psutil
>>> p = psutil.Process()
>>> a = p.memory_info().vms / (1024**2)
>>> import nipype
>>> b = p.memory_info().vms / (1024**2)
>>> from fmriprep.workflows import anatomical
>>> c = p.memory_info().vms / (1024**2)
>>> from niworkflows.interfaces.segmentation import ReconAllRPT
>>> d = p.memory_info().vms / (1024**2)
>>> print('VM overhead of nipype is %.f MB' % (b - a))
VM overhead of nipype is 778 MB
>>> print('VM overhead of fmriprep.workflows.anatomical is %.f MB' % (c - b))
VM overhead of fmriprep.workflows.anatomical is 580 MB
>>> print('VM now includes niworkflows: %.f MB' % (d - c))
VM now includes niworkflows: 0 MB

In a previous check, I found that from niworkflows.interfaces.segmentation import ReconAllRPT adds 292 MB on top of nipype.