coiled / feedback

A place to provide Coiled feedback
14 stars 3 forks source link

Making a worker use processes rather than cores #238

Open fingoldo opened 1 year ago

fingoldo commented 1 year ago

Hi, I am trying to run distributed computing on AWS using coiled. My code is pure Python and therefore can not bypass the GIL. When testing with local Dask on my laptop, to activate all cores I had to use threads_per_worker settings like this:

cluster = LocalCluster(processes=True, threads_per_worker=1, n_workers=8)

In fact, I was telling dask to spawn as many worker processes as my node had cores, and use only 1 thread per process. That allowed my laptop to utilize all 8 of its (HyperThreaded, but still) cores, yielding the highest performance. As I get it, in distributed dask and in coiled workers have different meaning - it's basically nodes (which is confusing). I can't pass threads_per_worker to cluster creation code anymore: I get TypeError: init() got an unexpected keyword argument 'threads_per_worker'. I noticed that Coiled.Cluster has worker_options={"nthreads": 1} ability instead, but it was suspicious that documentation mentioned it as means to make computing "synchronous" rather that leverage processes instead of threads. Indeed, after trying it my not-so-great performance dropped even more.

To recap, my problem is that my code is in pure Python with GIL, and coiled only utilizes one core of an AWS machine with 4 cores (which I clearly see from the coiled dashboard and from the overall timing), despite upon submitting of the job dask says " Using backend DaskDistributedBackend with 4 concurrent workers.".

How do I achieve behaviour similar to _LocalCluster with processes=True, threads_perworker=1 cited above with coiled?

ntabris commented 1 year ago

Hi, @fingoldo. We currently run just a single Dask worker process on each VM node. Given that Coiled makes it easy to use different instances types, we typically recommend that if you have GIL blocking code, you'd get better parallelism by running across many small VMs.

For AWS, the smallest balanced non-burstable instance would be the m6i.large (2 "vCPU", which means a single physical core).

So, supposing you want 10 workers, you might try coiled.Cluster(n_workers=10, worker_vm_type=["m6i.large"]). Personally, I'd probably try both with and without worker_options={"nthreads": 1} on some sample to see which gets better performance for your specific case.

fingoldo commented 1 year ago

Thanks a lot for replying Nat, your approach would still mean that out of 2 cores on the smallest instance, my code would run only on 1, right? It would be awkward to pay for 2 but be able to use only 1 (even if the 2nd is a hyperthreading core).

For example, on a 2 vCPU t3.medium/us-east with default settings, my computations batch run in 10min 54s. Same instance/region, nthreads=1, spent 12min 48s. It's a bit weird since there is an obvious improvement, so the second thread is definitely helping (15% time reduction from NOT using nthreads=1), despite being HT and code being GIL-prone.

I tried t3.xlarge with 4 vCPUs (with default settings) a few times, but the time did not go down much (9min 44s and 9min 55s), which meant it did not utilize the 2nd "true" core. In the dask dashboard I was seeing white gaps in the tasks flows on each of 4 "workers", probably indicating GIL blocks. So, adding 2 more threads (one of which was a true core) only helped by another ~13% wheras it could be a 50% win if the cores would be fully utilized.

I checked on my laptop and I do get a benefit from hyperthreading as well, as some part of my computations are integer operations. I have i7-7700HQ with 4 real and 4 HT cores, _processes=True, threads_per_worker=1, nworkers=4 results in 4min 7s and ~70% CPU load, whereas _nworkers=8 runs in 3min 36s with 100% CPU load (13% advantage).

So, your recommendation seems to be valid, thank you for it! Running with 1 process on 2 cores, one of which is HT seems to extract all horsepower from the could instance for my particular workload. But, there might be a lack of small instances at some provider momentarily, or big machines could be more cost-efficient (given that all the cores could be utilized) due to non-linear pricing and demand, or my code could impose some additional memory or GPU requirements.

Plus if I need say 196 cores, instead of using 1 big machine I would have to pay for allocating and using of 98 VM images, which would unnecessarily add to storage costs. Basically, I would have to pay 2 orders of magnitude more for the storage for no reason, which would be not very lucrative. This does not feel right, as the "local" dask can already do the processes parallelization that I need with just a simple flag. The ease if spinning cloud instances through coiled would be not justified by the bigger storage bill and inability to get more diverse or cost-efficient servers. As a customer I'd say you need to look into it, as probably I'm not the only one who has pure python workloads.

I found an older dask article mentioning nthreads (which you already support) and nprocs parameters:

"How to choose nthreads to utilize multithreading When starting dask workers themselves, there are two very important configuration options to play against each other: how many workers and how many threads per worker. You can actually manipulate both on the same worker process with flags, such as in the form dask-worker --nprocs 2 --nthreads 2, though --nprocs simply spins up another worker in the background so it is cleaner configuration to avoid setting --nprocs and instead manipulate that configuration with whatever you use to specify total number of workers. We already talked about how to choose number of workers, but you may modify your decision about that if you change a workers’ --nthreads to increase the amount of work an individual worker can do.

I figured you might also support nprocs, just don't mention it in the docs. Tried looking for possible worker_options, could not find in the docs anything except nthreads. Could not locate the appropriate source codes as well. In the end, tried adding worker_options={"nprocs": 2} and starting a new cluster, it simply did not let the worker start (the scheduler was started though). Do I have to look into some kind of nested joblib or multiprocessing.Pool magic inside the function I need to run on a cluster if I want to use coiled with a bigger machine?

jrbourbeau commented 1 year ago

@fingoldo you can also configure your workers to use a process pool to execute tasks (instead of the default thread pool). Here's a little snippet that demonstrates that in action:

from concurrent.futures import ProcessPoolExecutor

import dask
from dask.distributed import WorkerPlugin
import coiled

# Needed when workers are using ProcessPoolExecutor
dask.config.set({"distributed.worker.daemon": False})

# Spin up a coiled cluster
cluster = coiled.Cluster(n_workers=10)
client = cluster.get_client()

# Swap out the default threadpool for a processpool on workers
class ProcessPoolWorkers(WorkerPlugin):
    def setup(self, worker):
        worker.executors["default"] = ProcessPoolExecutor(max_workers=worker.state.nthreads)

client.register_worker_plugin(ProcessPoolWorkers())

# Run some Dask code (replace with your actual code)
df = dask.datasets.timeseries(
    "2000", "2005", partition_freq="2w"
).persist()
df.groupby("name").aggregate({"x": "sum", "y": "max"}).compute()

I'd be curious to here if this helps with your performance issues in practice

ntabris commented 1 year ago

So, your recommendation seems to be valid, thank you for it! Running with 1 process on 2 cores, one of which is HT seems to extract all horsepower from the could instance for my particular workload.

Great, glad that worked well enough for you!

But, there might be a lack of small instances at some provider momentarily

This would surprise me. The small instances are just a smaller slice of the resources on a very large host. I don't think AWS cares much about how that host is sliced into VMs, so if you can get (say) 1/4 of the host as a single VM, you could just as well get 1/4 of the host as (say) 4 smaller VMs. I'm not sure that's how it works, but that's been my assumption and it fits my experience.

or big machines could be more cost-efficient (given that all the cores could be utilized) due to non-linear pricing and demand, or my code could impose some additional memory or GPU requirements. [...] I would have to pay 2 orders of magnitude more for the storage for no reason, which would be not very lucrative

I agree those are potential issues. In my experience, those haven't been major concerns, and usually optimizing your workload at that level is going to have much less significant payoffs than other ways of optimizing (say, making code more efficient, or getting better utilization of spot instances—esp. since it's usually easier to get smaller rather than larger spot instances). But if you are running at sufficient scale that these sorts of optimizations make sense, lets talk!

fingoldo commented 1 year ago
# Swap out the default threadpool for a processpool on workers
class ProcessPoolWorkers(WorkerPlugin):
    def setup(self, worker):
        worker.executors["default"] = ProcessPoolExecutor(max_workers=worker.state.nthreads)

client.register_worker_plugin(ProcessPoolWorkers())

@jrbourbeau Unfortunately, proposed approach runs into error:

/opt/coiled/env/lib/python3.9/site-packages/distributed/worker.py in execute() /opt/coiled/env/lib/python3.9/multiprocessing/queues.py in _feed() /opt/coiled/env/lib/python3.9/multiprocessing/reduction.py in dumps()

PicklingError: Can't pickle <function my_cluster_func at 0x7f82b6744940>: attribute lookup my_cluster_func on main failed

even though I ensured my_cluster_func definition is placed above if __name__ == "__main__": line of my script.

jrbourbeau commented 1 year ago

It looks like my_cluster_func isn't pickleable (I'm not sure what that function is). You could confirm by seeing if

import cloudpickle
cloudpickle.loads(cloudpickle.dumps(my_cluster_func))

passes or raises.

fingoldo commented 1 year ago

It looks like my_cluster_func isn't pickleable (I'm not sure what that function is). You could confirm by seeing if

import cloudpickle
cloudpickle.loads(cloudpickle.dumps(my_cluster_func))

passes or raises.

it passes, actually.

jrbourbeau commented 1 year ago

Interesting -- thanks for trying that out. If you're still interested in swapping out the threadpool for a processpool, do you have a full traceback / code you could share?

fingoldo commented 1 year ago

Interesting -- thanks for trying that out. If you're still interested in swapping out the threadpool for a processpool, do you have a full traceback / code you could share?

I'll do it right away, thanks a tad! In the meanwhile, may I ask why Coiled devs decided to support in the worker_options nthreads, but not nprocs (as it was before) or nworkers (as it is supported now in dask.distributed)? It would be much easier if one of them would be supported.

jrbourbeau commented 1 year ago

may I ask why Coiled devs decided to support in the worker_options nthreads, but not nprocs (as it was before) or nworkers (as it is supported now in dask.distributed)?

Coiled just forwards worker_options as keyword arguments to whatever worker class you're using. The default distributed.Worker supports nthreads but not nprocs / nworkers (here is a list of all the default worker parameters in distributed). Hopefully that's helpful

fingoldo commented 1 year ago

may I ask why Coiled devs decided to support in the worker_options nthreads, but not nprocs (as it was before) or nworkers (as it is supported now in dask.distributed)?

Coiled just forwards worker_options as keyword arguments to whatever worker class you're using. The default distributed.Worker supports nthreads but not nprocs / nworkers (here is a list of all the default worker parameters in distributed). Hopefully that's helpful

Wow, indeed, there is no such param. But I just recently run a command like "dask worker tcp://192.168.100.30:8786 --nworkers=4 --nthreads=1". and it worked the way I wanted. How does nworkers param works then? Sorry for my silly questions. If I change default worker class to Nanny (I saw it in the docs somewhere), will I be able to pass nworkers in worker_options? Update. Tried


 n_workers=1,
 worker_options={"nworkers": 4, "nthreads": 1}, 
 worker_class="distributed.Nanny"

and the worker still does not start.

jrbourbeau commented 1 year ago

Ah, I see. --nworkers is a dask worker CLI parameter, but not a Worker class parameter. FWIW dask worker <scheduler-address> --nworkers=X is essentially just a shorthand for running dask worker <scheduler-address> X times in a for loop (i.e. it spins up X separate worker processes on the same node).

If I change default worker class to Nanny (I saw it in the docs somewhere), will I be able to pass nworkers in worker_options?

No, Nanny is a wrapper around Worker that more-or-less just forwards kwargs down to the worker.

fingoldo commented 1 year ago

Sorry, it took me a while to prepare a clean reproducer. There are 2 modules, cloud.py with all of the logic. and _coiledtest.py which creates tasks via joblib.delayed and then sends them to a Coiled cluster using with parallel_backend("dask"): construct.

cloud.py

import numpy as np
import math

def my_cloud_func(
    input_array: np.ndarray,
):
    # imagine some non gil-releasing computational load

    total = 0.0
    for x in input_array:
        total += math.sin(x)

    return total

coiled_test.py


from cloud import *
from concurrent.futures import ProcessPoolExecutor
from joblib import Parallel, delayed, parallel_backend
from dask.distributed import WorkerPlugin
import cloudpickle
import logging
import coiled
import dask

def proxy_cloud_func(*args, **kwargs):
    # for the function to be defined in-process. not sure if it's necessary.
    from cloud import my_cluster_func

    return my_cluster_func(*args, **kwargs)

def create_params_grid(
    input_array: np.ndarray,
):

    jobslist = []

    for _ in range(1000):
        jobslist.append(
            delayed(proxy_cloud_func)(
                input_array=input_array,
            )
        )
    return jobslist

if __name__ == "__main__":

    logging.basicConfig(
        level=logging.INFO,
        format=("%(asctime)s - %(levelname)s - line:%(lineno)d - %(message)s"),
    )

    input_array = np.random.random(size=300_000)

    use_dask = True

    logging.info(f"Is my cluster func pickleable? { 'Yes' if cloudpickle.loads(cloudpickle.dumps(my_cluster_func)) else 'No' }")
    logging.info(f"Is the proxy of my cluster func pickleable? { 'Yes' if cloudpickle.loads(cloudpickle.dumps(proxy_cloud_func)) else 'No' }")

    if use_dask:

        logging.info("Creating Coiled software env...")

        coiled.create_software_environment(
            name="minimal-ml-env",
            pip="pandas numba numpy".split(),
        )

        logging.info("Creating Coiled cluster...")

        # Needed when workers are using ProcessPoolExecutor
        dask.config.set({"distributed.worker.daemon": False})

        cluster = coiled.Cluster(
            name="test",
            n_workers=1,
            # worker_options={"nthreads": 1},  
            software="minimal-ml-env",
            use_best_zone=True,
            compute_purchase_option="spot",
            backend_options={"region_name": "us-east-2"},
            scheduler_options={"idle_timeout": "10 minutes"},
            scheduler_vm_types=["t3.medium"],
            worker_vm_types=["m6i.large"],
        )
        # point Dask to remote cluster
        client = cluster.get_client()
        print("Dask scheduler dashboard:", client.dashboard_link)

        # Swap out the default threadpool for a processpool on workers
        class ProcessPoolWorkers(WorkerPlugin):
            def setup(self, worker):
                worker.executors["default"] = ProcessPoolExecutor(max_workers=worker.state.nthreads)

        client.register_worker_plugin(ProcessPoolWorkers())

        client.wait_for_workers(1)
        client.upload_file("cloud.py")

    params_grid = create_params_grid(input_array=input_array)
    logging.info("Start of computing...")

    if use_dask:
        with parallel_backend("dask"):
            res = Parallel(n_jobs=-1)(params_grid)
    else:
        res = Parallel(n_jobs=-1)(params_grid)

    logging.info(f"Done. {res[0]}")

When use_dask is set to True, upon running I get that PicklingError despite the func is deemed to be pickleable.

mrocklin commented 1 year ago

Coming to this issue late, but concurrent.futures.ProcessPoolExecutor isn't very pickle friendly. I think that Loky is recommended for this: https://loky.readthedocs.io/en/stable/ and is a drop-in replacement.

But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious then there are likely other things to think about. I see that you've added things like Spot (🎉 ). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).

fingoldo commented 1 year ago

Coming to this issue late, but concurrent.futures.ProcessPoolExecutor isn't very pickle friendly. I think that Loky is recommended for this: https://loky.readthedocs.io/en/stable/ and is a drop-in replacement.

But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious then there are likely other things to think about. I see that you've added things like Spot (🎉 ). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).

When adjusted for Loky, i.e.

        from loky import get_reusable_executor
         ...
        coiled.create_software_environment(
            name="minimal-ml-env",
            pip="pandas numba numpy loky".split(),
        )
        ...
        # Swap out the default threadpool for a processpool on workers
        class ProcessPoolWorkers(WorkerPlugin):
            def setup(self, worker):
                worker.executors["default"] = get_reusable_executor(max_workers=worker.state.nthreads)

I get the following error:

2023-05-19 13:13:14,966 - INFO - line:1037 - Ensuring network infrastructure is ready... 2023-05-19 13:13:20,420 - INFO - line:1037 - Scheduler: queued for launch Workers: 1 queued for launch (of 1) 2023-05-19 13:13:26,261 - INFO - line:1037 - Scheduler: starting Workers: 1 starting (of 1) 2023-05-19 13:13:31,461 - INFO - line:1037 - Scheduler: starting Workers: 1 running but not ready (of 1) 2023-05-19 13:13:37,946 - INFO - line:1037 - Scheduler: running but not ready Workers: 1 running but not ready (of 1) 2023-05-19 13:13:44,338 - INFO - line:1037 - Scheduler: running but not ready Workers: 1 running but not ready (of 1) 2023-05-19 13:13:49,685 - INFO - line:1037 - Scheduler: running but not ready Workers: 1 downloading software environment (of 1) 2023-05-19 13:13:56,030 - INFO - line:1037 - Scheduler: downloading software environment Workers: 1 downloading software environment (of 1) 2023-05-19 13:14:02,116 - INFO - line:1037 - Scheduler: downloading software environment Workers: 1 downloading software environment (of 1) 2023-05-19 13:14:03,677 - ERROR - line:132 - | Worker Process | test-worker-27ce6eb765 | error at 13:13:59 (Russia TZ 2 Standard Time) | Software environment exited with error code 1 2023-05-19 13:14:03,679 - ERROR - line:132 - | Cluster | test | error at 13:13:59 (Russia TZ 2 Standard Time) | Workers all had error -> Software environment exited with error code 1

fingoldo commented 1 year ago

Apologies, I was pointed out that dask and distributed were missing from the env. Still, after the fix, I am getting

File "C:\ProgramData\Anaconda3\lib\concurrent\futures_base.py", line 446, in result return self.get_result() File "C:\ProgramData\Anaconda3\lib\concurrent\futures_base.py", line 391, in get_result raise self._exception File "/opt/coiled/env/lib/python3.9/site-packages/distributed/worker.py", line 2270, in execute File "/opt/coiled/env/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 255, in run_in_executor File "/opt/coiled/env/lib/python3.9/asyncio/base_events.py", line 819, in run_in_executor File "/opt/coiled/env/lib/python3.9/site-packages/loky/reusable_executor.py", line 225, in submit File "/opt/coiled/env/lib/python3.9/site-packages/loky/process_executor.py", line 1248, in submit File "/opt/coiled/env/lib/python3.9/site-packages/loky/process_executor.py", line 1220, in _ensure_executor_running File "/opt/coiled/env/lib/python3.9/site-packages/loky/process_executor.py", line 1209, in _adjust_process_count File "/opt/coiled/env/lib/python3.9/multiprocessing/process.py", line 118, in start AssertionError: daemonic processes are not allowed to have children 2023-05-19 18:13:11,158 - ERROR - line:1753 - Task was destroyed but it is pending!

no matter if I use dask.config.set({"distributed.worker.daemon": False}) or not.

Re-checked with ProcessPoolExecutor, for it setting deamon to False makes it throw (instead of daemonic processes are not allowed to have children)

2023-05-19 18:17:58,802 - INFO - line:93 - Start of computing... concurrent.futures.process._RemoteTraceback: """ Traceback (most recent call last): File "/opt/coiled/env/lib/python3.9/multiprocessing/queues.py", line 244, in _feed obj = _ForkingPickler.dumps(obj) File "/opt/coiled/env/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps cls(buf, protocol).dump(obj) _pickle.PicklingError: Can't pickle <function proxy_cloud_func at 0x7f80e4cab310>: attribute lookup proxy_cloud_func on main failed

shughes-uk commented 1 year ago

Unrelated but we just pushed a change to our staging deployment that will show you those "dask is missing" errors more directly, instead of having to hunt for them in the system logs! Hopefully deployed pretty soon

mrocklin commented 1 year ago

@fingoldo thought on this comment:

But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious then there are likely other things to think about. I see that you've added things like Spot (🎉 ). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).

From my perspective it sounds like you're focused on using a ProcessPoolExecutor, but this is a very atypical path and, my guess is, not actually all that useful.

Are you really hurting from having two vCPUs cost-wise? I'm curious how much computation costs you're running into if so such that it warrants this focus. Have you tried running your computation on ARM? They don't have vCPUs, so maybe you can stop caring if you're on that architecture.

fingoldo commented 1 year ago

@fingoldo thought on this comment:

But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious then there are likely other things to think about. I see that you've added things like Spot (🎉 ). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).

From my perspective it sounds like you're focused on using a ProcessPoolExecutor, but this is a very atypical path and, my guess is, not actually all that useful.

Are you really hurting from having two vCPUs cost-wise? I'm curious how much computation costs you're running into if so such that it warrants this focus. Have you tried running your computation on ARM? They don't have vCPUs, so maybe you can stop caring if you're on that architecture.

Maybe you are right and I am too concerned with cost. I have not much computations currently (anticipated savings only tens of $$ a day) but I estimate to have more in the coming months. I realized I always have the option of using a Dask cloudprovider which I should be able to fit to my liking without bothering anyone ) Will need to make it accept extra params to run on spot instances though.

Thanks a lot everyone for spending time on this question, if it's not on the rodmap of Coiled, feel free to close this feature request.

mrocklin commented 1 year ago

Have you tried arm=True? Arm supports single-cpu instances.

On Sun, May 21, 2023 at 10:00 AM fingoldo @.***> wrote:

@fingoldo https://github.com/fingoldo thought on this comment:

But really I think that this is probably too much technology for not that big of a benefit. Again, if you're using so much that you're becoming quite cost conscious then there are likely other things to think about. I see that you've added things like Spot (🎉 ). Maybe setting arm=True would give you another boost (and give you access to single-CPU instances anyway).

From my perspective it sounds like you're focused on using a ProcessPoolExecutor, but this is a very atypical path and, my guess is, not actually all that useful.

Are you really hurting from having two vCPUs cost-wise? I'm curious how much computation costs you're running into if so such that it warrants this focus. Have you tried running your computation on ARM? They don't have vCPUs, so maybe you can stop caring if you're on that architecture.

Maybe you are right and I am too concerned with cost. I have not much computations currently (anticipated savings only tens of $$ a day) but I estimate to have more in the coming months. I realized I always have the option of using a Dask cloudprovider which I should be able to fit to my liking without bothering anyone ) Will need to make it accept extra params to run on spot instances though.

Thanks a lot everyone for spending time on this question, if it's not on the rodmap of Coiled, feel free to close this feature request.

— Reply to this email directly, view it on GitHub https://github.com/coiled/feedback/issues/238#issuecomment-1556201085, or unsubscribe https://github.com/notifications/unsubscribe-auth/AACKZTBAQ4GEN3EF4BJSQE3XHIU2NANCNFSM6AAAAAAW7KGER4 . You are receiving this because you commented.Message ID: @.***>

--

https://coiled.io

Matthew Rocklin CEO, Dask Maintainer

fingoldo commented 1 year ago

Have you tried arm=True? Arm supports single-cpu instances.

Wow, I did not know that. Not tried yet as in previous versions ARM required some non-standard environment creation (via docker, I recall). Will definitely give it a try then.

mrocklin commented 1 year ago

Hey @fingoldo , checking in, did using ARM help resolve this issue?