Open quasiben opened 4 years ago
It sounds like you're asking for hooks to specify environment variables with preload scripts earlier on in the startup process. Is this correct?
If so, see https://github.com/dask/distributed/pull/3673 and https://github.com/dask/distributed/pull/3678
On Tue, Apr 7, 2020 at 11:45 AM Benjamin Zaitlen notifications@github.com wrote:
cuDF recently changed and now creates a CUDA context at import. This is necessary for a variety of reasons, most importantly validation: cuDF wants to validate that it can run on the device. This change breaks some assumptions we currently make in the dask-gpu ecosystem, namely the dask-cuda-worker context initialization https://github.com/rapidsai/dask-cuda/blob/branch-0.14/dask_cuda/initialize.py which is done via a dask-worker preload script.
The environment variable necessary for our use case is CUDA_VISIBLE_DEVICES which must be set prior to creating a cuda context. Again, this was not an issue in the past because dask-cuda was responsible for creating the context. Now, however, we need to ensure this is set before we create any processes. When using LocalCluster or LocalCUDACluster, the startup looks like the following:
1) start nanny process
2) spawn thread for worker
In the current setup, the environment variable does not get set until step 2). If this is true, we'd like to explore how environment variable setting might happen during step 1). A solution here would also benefit other libraries which may need configurations on main thread such as OpenMP or BLAS (anything which needs to be configured before importing the library)
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/3682, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTGCGT5WMSAF35CUPJDRLNYEDANCNFSM4MDLAYCA .
That is correct, but I think those PRs handle preloading after the original nanny process has started. Additionally, I think there may be a race between multiple workers spawning.
I think what is actually happening can be reduced to the following example. Dask does something like the following:
import os
import rmm
import cudf
import multiprocessing as mp
import time


def f(name, env):
    os.environ.update(env)
    rmm.reinitialize(
        pool_allocator=True, managed_memory=False, initial_pool_size=30_000_000_000
    )
    print(env)
    time.sleep(2)


if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    p1 = ctx.Process(target=f, args=("bob", {"CUDA_VISIBLE_DEVICES": "0,2"}))
    p2 = ctx.Process(target=f, args=("bob", {"CUDA_VISIBLE_DEVICES": "2,0"}))
    p1.start()
    # succeeds when the following line is uncommented
    # os.environ.update({"CUDA_VISIBLE_DEVICES": "2,0"})
    p2.start()
    p1.join()
    p2.join()
The above script fails with cuDF because cuDF now creates a context prior to creating a new process, which makes things rather challenging. The new process will still effectively have some CUDA environment remnants lying around, which is why dask took some care to create contexts only after spawning.
If one uncomments the os.environ.update({"CUDA_VISIBLE_DEVICES": "2,0"}) line, both Python and CUDA are 😊 because we are now changing the environment before spawning new processes.
I am not sure what the best course of action is here. We could contextualize the process creation but this may be challenging. @kkraus14 @pentschev do you have thoughts here?
So for some additional context (pun intended!) cuDF doesn't create a CUDA context on import, but we do initialize the CUDA Driver API, which seems to cache the value of CUDA_VISIBLE_DEVICES for when a runtime context is created.
We also explored using the cudaSetDevice runtime API to set the targeted device as opposed to relying on CUDA_VISIBLE_DEVICES. While using it from various libraries correctly affects other libraries in limited testing with cudf, cupy, and numba, it's somewhat common for libraries to call cudaSetDevice(0), which will then have unintended side effects for us (e.g. xgboost does this, unsure of interaction with UCX).
Also, in general, forking a process (even a C process, no Python) after creating a CUDA context causes bad things, as the CUDA context can't exist in two processes' memory spaces simultaneously, so we can focus our conversation on the spawn method of launching processes.
I also went digging to see if there was a way to have multiprocessing run a piece of code or set environment variables before it starts to recreate the parent namespace or execute the target function, but I didn't see anything.
Thanks for the information gents. I'll take a look tomorrow and see if I can think of anything.
Thanks for the minimal example @quasiben . I think the right way would indeed be to set environment variables just at the time a new process is forked/spawned, before the child has the chance to execute any other code. However, I don't know if that's possible from Python (or even C), so this is something we should explore.
> The environment variable necessary for our use case is CUDA_VISIBLE_DEVICES which must be set prior to creating a cuda context. Again, this was not an issue in the past because dask-cuda was responsible for creating the context. Now, however, we need to ensure this is set before we create any processes
I'm not sure I understand this. Don't we need to set the environment variable just before we make a CUDA context rather than before the process is created? I also don't understand why a preload script running on the Dask worker doesn't work. Is this because UCX or cudf is creating a CUDA context at import time? If so, what is triggering those imports? I would not expect any code within Dask to trigger an import of any library that creates a CUDA context before your preload script has a chance to run.
So what if you set the preload value to the following:
distributed:
  worker:
    preload:
      - "CUDA_VISIBLE_DEVICES=0,2"
      - dask_cuda.initialize
(although presumably you're not using yaml here, but generating this dynamically and passing it directly from within dask-cuda-worker)
Or sorry, I guess it should be the following:
set_env = """
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0,2"
"""
nanny = Nanny(..., preload=[set_env, "dask_cuda.initialize"], ...)
Some docs on preloads, including some features (like accepting raw Python code) that weren't previously advertised: https://github.com/dask/dask/pull/6077
> Is this because UCX or cudf is creating a CUDA context at import time?
Yes, cuDF is.
> If so, what is triggering those imports?
This is done today to ensure that devices are supported and correct CUDA versions are available.
> I would not expect any code within Dask would trigger an import of any library that creates a CUDA context before you have a chance for your preload script to run.
This is happening at import time, so LocalCUDACluster will spawn new processes that will start and immediately import cuDF, always creating a context on GPU 0, before the preload script has a chance to run.
> So what if you set the preload value to the following: ...
That is what we currently do, but the preload script runs too late for the case I mentioned just above. For reference, this is where it happens today: https://github.com/rapidsai/dask-cuda/blob/branch-0.14/dask_cuda/local_cuda_cluster.py#L246-L254.
> Yes, cuDF is.
We actually aren't creating a CUDA context, but we are initializing the driver, which caches the device enumeration from CUDA_VISIBLE_DEVICES.
May I suggest we hop on a video call to have a higher bandwidth discussion about this?
Sure. I'm free most of the afternoon.
@quasiben @kkraus14 @pentschev and I had a quick chat. There is some bizarre behavior in imports happening that I couldn't explain. @quasiben when you have a moment can I ask you to include your minimal example with time.sleep and the foo module?
We were also curious about how to create a Python process with a set of environment variables. This was posted as a Stack Overflow question here: https://stackoverflow.com/questions/61109571/how-do-i-set-environment-variables-in-a-new-python-process
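For reference, a minimal sketch of one way to start a new Python process with a chosen set of environment variables, using the standard library's subprocess module rather than multiprocessing. This is only an illustration of the question, not what Dask does: the helper name run_child_with_env is hypothetical, and subprocess accepts an explicit env mapping whereas multiprocessing.Process does not.

```python
import os
import subprocess
import sys


def run_child_with_env(extra_env):
    """Start a fresh interpreter whose environment includes extra_env.

    Hypothetical helper for illustration only.
    """
    # Build a copy so the parent's os.environ is never modified.
    child_env = {**os.environ, **extra_env}
    code = "import os; print(os.environ.get('CUDA_VISIBLE_DEVICES'))"
    result = subprocess.run(
        [sys.executable, "-c", code],
        env=child_env,  # the environment is in place before any child code runs
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


if __name__ == "__main__":
    print(run_child_with_env({"CUDA_VISIBLE_DEVICES": "0,2"}))
    print(run_child_with_env({"CUDA_VISIBLE_DEVICES": "2,0"}))
```

Because each child gets its own mapping, two children can be started with different values without touching the parent's environment, which is the property that is hard to get from multiprocessing.Process.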
Also, for completeness, here is the script I wrote up to show that the spawn mechanism doesn't carry over imports:
# foo.py
def time_to_import_pandas():
    import time
    start = time.time()
    import pandas
    stop = time.time()
    print(stop - start)

# main script (separate from foo.py)
import pandas
import multiprocessing
ctx = multiprocessing.get_context("spawn")
import foo
proc = ctx.Process(target=foo.time_to_import_pandas)
proc.start()  # prints about 1s, rather than 0s which we would expect if pandas had already been imported
FWIW I looked into the environment variables issue. I haven't figured out a way to set env vars with multiprocessing.Process, but multiprocessing.Pool has an initializer parameter that allows you to run setup code when a process is started. For example,
import os
import multiprocessing


def set_env(*args):
    for name, value in args:
        os.environ[name] = value


def print_env():
    from pprint import pprint
    for name in ["FOO", "BAR"]:
        print(f"{name} = {os.environ.get(name)}")


if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=1, initializer=set_env, initargs=(("FOO", "1"), ("BAR", "2"))) as pool:
        print("In main process:")
        print_env()
        print("In process pool:")
        result = pool.apply_async(print_env)
        result.get()
        print("Back in main process:")
        print_env()
outputs
In main process:
FOO = None
BAR = None
In process pool:
FOO = 1
BAR = 2
Back in main process:
FOO = None
BAR = None
Not sure if that’s directly applicable to this situation, but it seemed at least potentially useful
This still doesn't work, unfortunately. Here's a clearer example from @mrocklin above:
# foo.py
def do_nothing():
    pass

# bar.py
print("I was imported!")

# main.py
import foo
import bar
import multiprocessing

if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    # pass the function itself; foo.do_nothing() would call it in the parent and pass None
    proc = ctx.Process(target=foo.do_nothing)
    proc.start()

# output
I was imported!
I was imported!
Using the same foo and bar above with a pool:
# main.py
import foo
import bar
import multiprocessing

if __name__ == '__main__':
    ctx = multiprocessing.get_context("spawn")
    with ctx.Pool(processes=1, initializer=print, initargs=("initializer",)) as pool:
        # pass the function itself rather than its return value
        result = pool.apply_async(foo.do_nothing)
        result.get()  # wait for the worker so its initializer has run before the pool is torn down

# output
I was imported!
I was imported!
initializer
@pitrou do you have any idea on what might be going on here? This breaks my understanding of how Python's multiprocessing works.
multiprocessing always re-imports the main module in child processes. I'm not sure why that is, but there's probably a reason (it's been like that for as long as I remember).
Do you know of any way to either set environment variables or run some type of user defined code before this happens?
No. However, you'll see that the main module is imported as __mp_main__ in the child process, not as __main__ (a __main__ alias is created afterwards). For example, add this code at the top of your main.py above:
print("I was imported! My name is:", __name__)
(which also means that everything under if __name__ == '__main__': doesn't get executed in the child)
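A self-contained way to observe this, as a sketch: the snippet below writes a small main.py to a temporary directory and runs it in a fresh interpreter, so the spawn start method has a real file to re-import. The wrapper run_demo and the inlined do_nothing target are hypothetical stand-ins for the foo/bar modules above. One line should report __main__ (the parent) and one __mp_main__ (the spawned child).

```python
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# Contents of the throwaway main.py; flush=True keeps parent and child
# output in the order it was produced when stdout is a pipe.
MAIN_PY = textwrap.dedent(
    '''
    import multiprocessing

    print("I was imported! My name is:", __name__, flush=True)


    def do_nothing():
        pass


    if __name__ == "__main__":
        ctx = multiprocessing.get_context("spawn")
        proc = ctx.Process(target=do_nothing)
        proc.start()
        proc.join()
    '''
)


def run_demo():
    """Run the throwaway script and return the lines it printed."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "main.py"
        script.write_text(MAIN_PY)
        result = subprocess.run(
            [sys.executable, str(script)],
            capture_output=True,
            text=True,
            check=True,
        )
    return result.stdout.splitlines()


if __name__ == "__main__":
    for line in run_demo():
        print(line)
```

The top-level print runs twice because the child re-imports the file, while the guarded block runs only in the parent, so any import placed at module level in the main script also runs in the child before the target function does.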
I believe what MP does is serialize the file, spawn a new process, deserialize the file, and then execute the target function. I heard some rumors about how cloudpickle could be used in place of pickle for the serialization process, but this may no longer be available to us. cloudpickle may have been useful here in that we could potentially have it scoped to only serializing the function. Perhaps this helps jog the memory of @pitrou, should he recall these kinds of things.
I've been thinking more about locking. Assuming that we're only locking in-process (presumably os.environ is the protected resource), then if we can arrange things to put a lock around os.environ + process creation that's small enough, I could be in favor of that. Creating many Nannys in one process is relatively rare. We would want to see what the slowdown was like, especially on a system with an NFS. We would probably also want to only do this if env was specified.
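A minimal sketch of what such an in-process lock could look like, under loud assumptions: this is not the Nanny implementation, the names _environ_lock, temporary_environ, launch, and launch_concurrently are hypothetical, and subprocess.Popen (which inherits os.environ when no env is passed) stands in for multiprocessing's spawn so the example stays self-contained. Only the environment change and the process creation are under the lock; waiting on the child happens outside it.

```python
import os
import subprocess
import sys
import threading
from contextlib import contextmanager

_environ_lock = threading.Lock()  # hypothetical: protects os.environ in this process

CHILD_CODE = "import os; print(os.environ.get('CUDA_VISIBLE_DEVICES'))"


@contextmanager
def temporary_environ(env):
    """Hold the lock while os.environ carries env, then restore the old values."""
    with _environ_lock:
        saved = {key: os.environ.get(key) for key in env}
        os.environ.update(env)
        try:
            yield
        finally:
            for key, old in saved.items():
                if old is None:
                    os.environ.pop(key, None)
                else:
                    os.environ[key] = old


def launch(env):
    """Create one child that inherits env; returns what the child printed."""
    # The child receives a copy of the environment at creation time,
    # so the lock can be released as soon as Popen returns.
    with temporary_environ(env):
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD_CODE],
            stdout=subprocess.PIPE,
            text=True,
        )
    out, _ = proc.communicate()
    return out.strip()


def launch_concurrently(values):
    """Start one child per value from separate threads, as a stand-in for many Nannys."""
    results = {}

    def worker(value):
        results[value] = launch({"CUDA_VISIBLE_DEVICES": value})

    threads = [threading.Thread(target=worker, args=(v,)) for v in values]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


if __name__ == "__main__":
    print(launch_concurrently(["0,2", "2,0"]))
```

The cost of this approach is that process creation is serialized across threads, which is the slowdown that would need measuring.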
cc @pentschev
Can this be closed now that #6841 is in?
Thanks @crusaderky for the reminder here. I was doing a bit of testing and I think you're right, we could close this, but we must expose pre_spawn_env to Nanny's constructor first. Allow me to elaborate why.
In Dask-CUDA, specifically LocalCUDACluster, we launch multiple workers and each must have a different value for the environment variable, which is the subject of this thread. This is achieved by returning an updated spec for each worker with the appropriate env kwarg, which is consumed during scale. Because dask.config is global, each call ends up overwriting the previous one, and the last one to run is what gets passed to new processes. By adding a pre_spawn_env kwarg to Nanny we can keep the correct variable passing at the now correct time (pre-spawn).
Are there any objections to implementing that kwarg? If not, I can send a PR later today; otherwise I'm open to suggestions on handling this with dask.config.
The whole reason why we split env and pre-spawn-env is that in the latter you can't have different values for different workers - unless you are very careful to spawn one at a time, at which point you might as well call dask.config.set instead.
Yes, you are right. I actually ran it and was successful, but it's anyway a concurrency game, specifically in https://github.com/dask/distributed/blob/9021d57f338128ea488285d5c37f5dfcc33f6482/distributed/nanny.py#L685-L688 we could have multiple processes overwriting each other's variables. In https://github.com/dask/distributed/issues/6749#issuecomment-1190739588 I attempted to ensure synchronization, but the cost of launching workers becomes far too prohibitive.
Unfortunately then we don't have a solution for this issue with https://github.com/dask/distributed/pull/6841, and we should keep it open.