plasma-umass / scalene

Scalene: a high-performance, high-precision CPU, GPU, and memory profiler for Python with AI-powered optimization proposals
Apache License 2.0

Unable to profile dask #280

Open JPvRiel opened 2 years ago

JPvRiel commented 2 years ago

Describe the bug

When attempting to profile a dask dataframe operation, the following error occurs:

AttributeError: Can't pickle local object 'replacement_mp_semlock.<locals>.ReplacementSemLock'

Full stack trace provided at the end.

I wanted to unpack some memory utilization problems with dask, which I accept is likely one of the more complex python frameworks one might try to profile!

The exception seems to repeat once per dask worker process.

To Reproduce

Try to profile Python code that uses dask. The code from dask_profile.py is shown below.

The error is produced when run as python3.9 -m scalene --off dask_profile.py.

import os
import numpy as np
import pandas as pd
import dask.config
import dask.distributed
import dask.dataframe as dd
import scalene

# Make the OS trim free memory more frequently, to try to avoid running out of memory due to unmanaged memory.
os.environ['MALLOC_TRIM_THRESHOLD_'] = '16384'

if __name__ == '__main__':

    # Generate a numpy array with 1 million random integers.
    rng = np.random.default_rng()
    a = rng.integers(low=0, high=100, size=(10_000, 100))
    print(f"Memory use by numpy array: {a.size * a.itemsize / 2 ** 20:.3f} MB.")

    # Pandas dataframe as a baseline
    df = pd.DataFrame(a)
    print(f"Memory use by pandas dataframe: {df.memory_usage(deep=True).sum() / 2 ** 20:.3f} MB")
    print('Profile squaring a pandas dataframe.')
    scalene.scalene_profiler.start()
    df_squared = df ** 2
    df_squared_sum = df_squared.sum()
    print(df_squared_sum)
    scalene.scalene_profiler.stop()
    del df, df_squared

    # dask setup
    dask_custom_conf = {
        'temporary-directory': '/tmp',
        'memory': {
            'target': 0.50,
            'spill': 0.60,
            'pause': 0.70,
            'terminate': 0.80
        }
    }
    dask.config.update(dask.config.config, dask_custom_conf)
    dask_client = dask.distributed.Client(name='demo')

    # Dask dataframe
    ddf = dd.from_array(a)
    scalene.scalene_profiler.start()
    ddf_squared = ddf ** 2
    ddf_squared_sum = ddf_squared.sum()
    df_squared_sum = ddf_squared_sum.compute()
    print(df_squared_sum)
    scalene.scalene_profiler.stop()
    del ddf, ddf_squared, ddf_squared_sum, df_squared_sum

    # cleanup
    input("Press Enter to exit...")
    dask_client.close()

Expected behavior

Scalene is noted as being capable of deeper profiling of multi-process Python programs. However, in the above dummy test, it is unable to profile dask for some reason.


Additional context

pip install --upgrade was used to ensure all packages were the latest available via pip.

Full stack trace:

/usr/local/lib/python3.9/multiprocessing/resource_tracker.py:96: UserWarning: resource_tracker: process died unexpectedly, relaunching.  Some resources might leak.
  warnings.warn('resource_tracker: process died unexpectedly, '
Task exception was never retrieved
future: <Task finished name='Task-17' coro=<_wrap_awaitable() done, defined at /usr/local/lib/python3.9/asyncio/tasks.py:684> exception=AttributeError("Can't pickle local object 'replacement_mp_semlock.<locals>.ReplacementSemLock'")>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/asyncio/tasks.py", line 691, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 685, in start
    await self.process.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/process.py", line 32, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'replacement_mp_semlock.<locals>.ReplacementSemLock'
(The identical "Task exception was never retrieved" traceback was raised twice more, for Task-19 and Task-18.)
Error in program being profiled:
 Can't pickle local object 'replacement_mp_semlock.<locals>.ReplacementSemLock'
Traceback (most recent call last):
  File "/home/enigma/.local/lib/python3.9/site-packages/scalene/scalene_profiler.py", line 1326, in profile_code
    exec(code, the_globals, the_locals)
  File "dask_profile.py", line 46, in <module>
    dask_client = dask.distributed.Client(name='demo')
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/client.py", line 758, in __init__
    self.start(timeout=timeout)
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/client.py", line 940, in start
    sync(self.loop, self._start, **kwargs)
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/utils.py", line 326, in sync
    raise exc.with_traceback(tb)
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/utils.py", line 309, in f
    result[0] = yield future
  File "/home/enigma/.local/lib/python3.9/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/client.py", line 1005, in _start
    self.cluster = await LocalCluster(
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/deploy/spec.py", line 402, in _
    await self._correct_state()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/deploy/spec.py", line 371, in _correct_state_internal
    await w  # for tornado gen.coroutine support
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/core.py", line 283, in _
    await self.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 335, in start
    response = await self.instantiate()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 418, in instantiate
    result = await self.process.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/nanny.py", line 685, in start
    await self.process.start()
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/process.py", line 32, in _call_and_set_future
    res = func(*args, **kwargs)
  File "/home/enigma/.local/lib/python3.9/site-packages/distributed/process.py", line 202, in _start
    process.start()
  File "/usr/local/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
AttributeError: Can't pickle local object 'replacement_mp_semlock.<locals>.ReplacementSemLock'
sternj commented 2 years ago

Can you please post the result of your pip freeze?

sternj commented 2 years ago

Ok, so this is a really weird issue. What is essentially happening is that Dask attempts to use the spawn multiprocessing context, which we disallow in Scalene because of the complications of reinitializing the profiler after a spawn. When we force a fork instead, since Dask runs on a custom asyncio executor, all of the daemon threads just kind of... stop. And when Dask checks whether it needs to add more worker threads, it simply checks for the presence of thread objects rather than their liveness. That tends to work for them because they never have to deal with fork, but here it breaks down. I'm considering starting a discussion with them about this. For the meanwhile, though, the current version of dask with the current version of scalene simply doesn't work.
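
For context, the AttributeError itself is the generic pickle failure for a class defined inside a function body (note the <locals> in its qualified name): pickle serializes classes by importable qualified name, which a local class doesn't have, and the spawn context must pickle the process object. A minimal sketch reproducing the same class of error (the make_replacement_lock function here is made up for illustration; only the ReplacementSemLock name comes from Scalene's error message):

import pickle

def make_replacement_lock():
    # A class defined inside a function: its qualified name contains
    # '<locals>', so pickle cannot serialize it by reference.
    class ReplacementSemLock:
        pass
    return ReplacementSemLock()

try:
    # Spawn-based multiprocessing pickles process state much like this.
    pickle.dumps(make_replacement_lock())
except AttributeError as e:
    # Can't pickle local object 'make_replacement_lock.<locals>.ReplacementSemLock'
    print(e)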

JPvRiel commented 2 years ago

Thanks @sternj, I'd happily try to raise the issue on the dask project, but my Python multiprocessing knowledge is limited. You mentioned:

tends to work for them because they never have to deal with fork

As a random side note about not dealing with forks cleanly: I observed what looked like an accidental recursive fork bomb when I first tried to hack together a benchmark/profile script without if __name__ == "__main__":. I didn't debug the details of why, but dask seems to end up re-executing the parent process code; maybe it somehow re-imports the parent script. I noticed other libs like ray and modin don't have this same accidental trap of re-executing the parent Python process code when used in a flat/simple script omitting if __name__ == "__main__":. Yes, one should check for main to disentangle import from direct execution, but regardless, it's notable that dask is doing something odd with how it sets up its multiprocessing.
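
For illustration, here is a minimal, hypothetical sketch of the trap (file and function names are made up): under the spawn start method each child re-imports the main module, so any unguarded module-level process start runs again in every child.

# flat_spawn.py -- illustrative only
import multiprocessing as mp

def work():
    print("worker running")

# BAD: an unguarded module-level start like the line below is re-executed
# by every spawned child when it re-imports this module, which is what
# triggers the 'bootstrapping phase' RuntimeError (or worse, a fork bomb):
# mp.Process(target=work).start()

# GOOD: children import this module with __name__ == '__mp_main__',
# so the guarded block only runs in the original process.
if __name__ == '__main__':
    mp.set_start_method('spawn')
    p = mp.Process(target=work)
    p.start()
    p.join()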

sternj commented 2 years ago

I'm working up a commit for the Dask project!

Can you explain that fork-bomb-esque thing that you saw, @JPvRiel? I know that with Pyenv in particular Scalene ends up starting some child processes, but I haven't seen them grow seemingly without bound. A minimal example would be much appreciated!

JPvRiel commented 2 years ago

@sternj, sorry for the delayed reply. Maybe we should start a new issue on the dask project to continue this?

TL;DR: dask doesn't gracefully detect when it's being executed from a flat script, and it does seem to stumble into trying to spawn itself too many times. I asked for only 2 workers, but it got stuck in a loop of failed spawns that seems to stop after about 100 attempts...

I tried to recreate a small-scale version of my original bad benchmark/profile script, which was flat and didn't use if __name__ == '__main__':

import logging
import psutil
import numpy as np
import dask.config
import dask.distributed
import dask.dataframe as dd

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(message)s',
    filename='no_main_forked_up.log'
)

def log_proc_info():
    mem_total = psutil.virtual_memory().total
    mem_available = psutil.virtual_memory().available
    proc = psutil.Process()
    children = proc.children(recursive=True)
    child_pids = [c.pid for c in children]
    proc_mem_vms_sum = sum([proc.memory_info().vms] + [c.memory_info().vms for c in children])
    proc_mem_rss_sum = sum([proc.memory_info().rss] + [c.memory_info().rss for c in children])
    logging.info(f'pid={proc.pid}, parent_pid={proc.parent().pid}, child_pids={child_pids}, vss_sum_m={proc_mem_vms_sum/2**20:.0f}, rss_sum_m={proc_mem_rss_sum/2**20:.0f}, avail_g={mem_available/2**30:.3f}, avail_p={mem_available/mem_total:.2%}, cmd="{" ".join(proc.cmdline())}"')

# Sample data
rng = np.random.default_rng()
a = rng.integers(low=0, high=100, size=(1_000_000, 10))

# Dask client created at module scope, not within `if __name__ == "__main__":`, so it will get created again whenever something re-imports this script.
dask_custom_conf = {'temporary-directory': '/tmp'}
dask.config.update(dask.config.config, dask_custom_conf)
dask_client = dask.distributed.Client(n_workers=2)

ddf = dd.from_array(a)
ddf = ddf.persist()  # persist() returns a new collection, so assign it back

logging.info(f'numpy_array_m={a.size * a.itemsize / 2 ** 20:.0f}, dask_dataframe_m={ddf.memory_usage(deep=True).sum().compute()/2**20:.0f}')
log_proc_info()

It doesn't even get to the nice logging bits I try to output at the end...

I had since upgraded dask and python, and I'm unable to reproduce the original behavior I mentioned, which was more silent and resource-exhausting in my first (now refactored/lost) implementation; I recall my system swapping itself to death.

With the newer versions of python (3.9.7) and dask (2021.11.1), python now seems to catch this bad coding practice somehow, or else what I tried to mock up didn't reproduce the issue I stumbled onto the first time. Now it repeats the "An attempt has been made to start a new process before the current process has finished its bootstrapping phase" runtime error many times and just hangs after a while.

  File "/usr/local/lib/python3.9/multiprocessing/spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError: 
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

The _check_not_importing_main runtime error showed up 100 times in my log file.

$ grep -c '_check_not_importing_main' no_main_forked_up.log 
100

Maybe python multiprocessing now has some protection that pauses/stops things once it hits a limit.

So while I don't see the same behavior as before, I'm still sort of seeing that dask ends up trying to spawn a whole lot. But perhaps this has nothing to do with the issue of scalene being unable to profile it?

Anyhow, I've learnt that while ray and modin/pandas don't seem to mind flat scripting without a main guard, dask needs to be run/created from main, or it will somehow respawn/re-import itself multiple times.

I stumbled onto https://github.com/pytorch/pytorch/issues/5858 where, there too, the lib expects to be guarded by only being called from main.

As soon as I put the setup inside the main guard, my logging works as expected:

2021-11-09 22:13:02,246 numpy_array_m=76, dask_dataframe_m=76
2021-11-09 22:13:02,356 pid=21361, parent_pid=5615, child_pids=[21389, 21392, 21395], vss_sum_m=8040, rss_sum_m=2277, avail_g=21.015, avail_p=67.51%, cmd="python3.9 main_no_prob.py"
JPvRiel commented 2 years ago

Okay, wait, quick update. I think I know what happened a while ago. I previously had a much more complex benchmark script that tried to benchmark standard pandas vs modin with ray vs modin with dask, or something of the sort, and when dask executed, it would spawn/re-import the script where all the other pandas/ray benchmark work was done before it, and maybe that's why I ended up with a locked-up system swapped to death. Dask seems to be quite persistent: when it detects that spawning a worker failed, it tries again and again...

Bad idea. I know now.

By the way, I found this useful explanation about fork vs spawn, from "What happens when you start a new process?":

When a process is forked the child process inherits all the same variables in the same state as they were in the parent. Each child process then continues independently from the forking point. ... On the other hand, when a process is spawned, it begins by starting a new Python interpreter. The current module is reimported and new versions of all the variables are created.

Hence, the multiprocessing default of forking might be a bit more forgiving with flat scripts that don't use a main guard, because the child continues executing from the point at which it forks, using copy-on-write for any variable values inherited from the parent, whereas spawn starts a fresh interpreter and fully re-imports everything.
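
A minimal sketch of that difference (hypothetical file name, POSIX only, since fork isn't available everywhere): the forked child sees state mutated after import, while the spawned child re-imports the module and sees only the import-time value.

# fork_vs_spawn.py -- illustrative only
import multiprocessing as mp

STATE = 'set at import time'

def show(method):
    print(f'{method}: child sees STATE = {STATE!r}')

if __name__ == '__main__':
    STATE = 'mutated in the parent after import'
    for method in ('fork', 'spawn'):
        # fork: the child inherits the parent's memory (copy-on-write),
        #       so it sees the mutated value.
        # spawn: the child re-imports this module; the guarded block never
        #        runs there, so it sees only the import-time value.
        ctx = mp.get_context(method)
        p = ctx.Process(target=show, args=(method,))
        p.start()
        p.join()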