dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Does dask distributed allow a customized worker executor? #1971

Open · collinwo opened 6 years ago

collinwo commented 6 years ago

Hello,

We are in the process of evaluating the possibility of using dask and dask.distributed in our Analytics Platform. However, some legacy constraints force us to customize the worker's executor. I found that WorkerBase allows passing in an executor, but Nanny doesn't. Any suggestions on how to pass in a customized executor?

```python
class WorkerBase(ServerNode):

    def __init__(self, scheduler_ip=None, scheduler_port=None,
                 scheduler_file=None, ncores=None, loop=None, local_dir=None,
                 services=None, service_ports=None, name=None,
                 reconnect=True, memory_limit='auto',
                 executor=None, resources=None, silence_logs=None,
                 death_timeout=None, preload=(), preload_argv=[], security=None,
                 contact_address=None, memory_monitor_interval='200ms', **kwargs):
        ...
        self.executor = executor or ThreadPoolExecutor(self.ncores)
```
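(For illustration, a minimal sketch of passing a custom executor to a worker constructed directly, without a Nanny; the scheduler address and pool size are placeholders, and the start-up pattern follows the setup docs of this era:)

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Thread

from tornado.ioloop import IOLoop
from distributed import Worker

# Run the worker's event loop in a background thread
loop = IOLoop.current()
Thread(target=loop.start, daemon=True).start()

# Any concurrent.futures-style executor works here; ThreadPoolExecutor
# stands in for the legacy executor we actually need to use.
w = Worker('tcp://127.0.0.1:8786', loop=loop,
           executor=ThreadPoolExecutor(4))
w.start()  # Nanny, by contrast, exposes no executor argument
```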

mrocklin commented 6 years ago

You might consider using a --preload script to modify the worker after it starts: http://distributed.readthedocs.io/en/latest/setup.html?highlight=preload#customizing-initialization
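(For example, something like the following sketch; the file name is hypothetical, `dask_setup` is the hook the preload mechanism looks for, and the ThreadPoolExecutor stands in for whatever custom executor you need:)

```python
# custom_executor.py -- pass to the worker with:
#   dask-worker scheduler:8786 --preload custom_executor.py
from concurrent.futures import ThreadPoolExecutor


def dask_setup(worker):
    # Called once after the worker starts; swap in any
    # concurrent.futures-compatible executor here.
    worker.executor = ThreadPoolExecutor(max_workers=worker.ncores)
```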

collinwo commented 6 years ago

Thanks. I will give this solution a try.

collinwo commented 6 years ago

Hello,

Does worker.executor allow a multi-process executor, e.g. ProcessPoolExecutor? I tried to integrate concurrent.futures.ProcessPoolExecutor, but got this error:

    Traceback (most recent call last):
      File ".../lib/python2.7/multiprocessing/queues.py", line 268, in _feed
        send(obj)
    TypeError: expected string or Unicode object, NoneType found

mrocklin commented 6 years ago

Most people who prefer to use processes just use many single-threaded workers instead:

dask-worker --nthreads 1
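(For instance, to get process-based parallelism on one machine; `--nprocs` is the dask-worker flag that starts several worker processes at once, and the scheduler address is a placeholder:)

```python
# Shell command, not Python -- shown here for the thread's context:
#   dask-worker tcp://scheduler:8786 --nprocs 4 --nthreads 1
#
# This starts 4 single-threaded worker processes, so CPU-bound tasks
# run in separate processes with no custom executor needed.
```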


MichaelSchreier commented 6 years ago

I am confronted with the same issue, since I would like to make use of the worker resource system. As there seems to be no way to make workers multiprocessed rather than multithreaded¹, the resource system doesn't work for me (I am facing a heavily CPU-bound task).

Or is there a way to make this work that I'm missing?

For my personal use case the easiest solution would in fact be to make the resource system available via dask.multiprocessing, though I can work with dask.distributed just as well.

¹ Perhaps the current worker argument ncores should be renamed to something like nthreads to avoid confusion.
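(One possible workaround, combining the single-threaded-workers suggestion above with worker resources; a sketch in which the scheduler address, the resource name, and `cpu_bound_task` are placeholders:)

```python
# Launch each worker as a single-threaded process advertising a resource:
#   dask-worker tcp://scheduler:8786 --nthreads 1 --resources "PROCESS=1"
from dask.distributed import Client

client = Client('tcp://scheduler:8786')

# Each task consumes one PROCESS unit, so at most one CPU-bound task
# runs per (single-threaded) worker process at a time.
future = client.submit(cpu_bound_task, some_arg, resources={'PROCESS': 1})
result = future.result()
```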