dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

concurrent.futures compatibility #3695

Open TomAugspurger opened 4 years ago

TomAugspurger commented 4 years ago

While writing up this post, I discovered this discussion on python-ideas: https://mail.python.org/archives/list/python-ideas@python.org/thread/LMTQ2AI6A7UXEFVHRGHKWD33H24FGM6G/#ICJKHZ4BPIUMOPIT2TDTBIW2EH4CPNCP and this BPO issue: https://bugs.python.org/issue39645.

If I've followed those threads correctly, there's a chance that Python 3.9 will include some additions to the concurrent.futures.Future API to make this doable.


Have there been any discussions about using dask.distributed's Client and Futures with top-level concurrent.futures APIs like wait and as_completed? A quick search didn't turn anything up.

My motivation: Say I have a library that can't depend on dask, but supports parallel computation through concurrent.futures. I'd like to add optional support for distributed computation with Dask through the concurrent.futures interface. That means the library can only call concurrent.futures.{wait, as_completed}, not distributed.{wait, as_completed}.

import concurrent.futures

def do_single(x): return x

def do_many(sequence, executor=None):
    executor = executor or concurrent.futures.ThreadPoolExecutor()
    futures = [executor.submit(do_single, x) for x in sequence]
    result = [x.result() for x in concurrent.futures.as_completed(futures)]
    return result

This works with the concurrent.futures executors.

In [3]: do_many([1, 2])
Out[3]: [1, 2]

But it fails with Dask's Client:

In [4]: from distributed import Client

In [5]: client = Client()
In [6]: do_many([1, 2], client)
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-7-f124f8a5a765> in <module>
----> 1 do_many([1, 2], client)

<ipython-input-1-b4c62d29b5e0> in do_many(sequence, executor)
      8     executor = executor or concurrent.futures.ThreadPoolExecutor()
      9     futures = [executor.submit(do_single, x) for x in sequence]
---> 10     result = [x.result() for x in concurrent.futures.as_completed(futures)]
     11     return result

<ipython-input-1-b4c62d29b5e0> in <listcomp>(.0)
      8     executor = executor or concurrent.futures.ThreadPoolExecutor()
      9     futures = [executor.submit(do_single, x) for x in sequence]
---> 10     result = [x.result() for x in concurrent.futures.as_completed(futures)]
     11     return result

~/miniconda3/envs/prophet/lib/python3.7/concurrent/futures/_base.py in as_completed(fs, timeout)
    217     fs = set(fs)
    218     total_futures = len(fs)
--> 219     with _AcquireFutures(fs):
    220         finished = set(
    221                 f for f in fs

~/miniconda3/envs/prophet/lib/python3.7/concurrent/futures/_base.py in __enter__(self)
    144     def __enter__(self):
    145         for future in self.futures:
--> 146             future._condition.acquire()
    147
    148     def __exit__(self, *args):

AttributeError: 'Future' object has no attribute '_condition'

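The failure comes from concurrent.futures.as_completed acquiring each future's private _condition lock, which Dask's Future doesn't have. Supporting this would mean getting into the internals of concurrent.futures, which we perhaps want to stay out of.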
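One way to stay out of those internals is to copy each foreign future's outcome into a real concurrent.futures.Future using only public methods. The following is a minimal sketch of that idea, not Dask's implementation. ForeignFuture is a hypothetical stand-in for any future type that lacks `_condition`, and to_stdlib_future is an illustrative helper name. It assumes the foreign future offers add_done_callback and result, as the stand-in does.

```python
import concurrent.futures
import threading


class ForeignFuture:
    """Hypothetical stand-in for a non-stdlib future (it has no `_condition`)."""

    def __init__(self):
        self._done = threading.Event()
        self._lock = threading.Lock()
        self._callbacks = []
        self._result = None

    def set_result(self, value):
        # Record the value, then fire callbacks outside the lock.
        with self._lock:
            self._result = value
            self._done.set()
            callbacks, self._callbacks = self._callbacks, []
        for callback in callbacks:
            callback(self)

    def add_done_callback(self, fn):
        # Queue the callback if still pending, otherwise call it right away.
        with self._lock:
            if not self._done.is_set():
                self._callbacks.append(fn)
                return
        fn(self)

    def result(self):
        self._done.wait()
        return self._result


def to_stdlib_future(foreign):
    """Mirror a foreign future into a concurrent.futures.Future (illustrative helper)."""
    mirrored = concurrent.futures.Future()
    mirrored.set_running_or_notify_cancel()

    def _copy(done):
        try:
            value = done.result()
        except Exception as exc:
            mirrored.set_exception(exc)
        else:
            mirrored.set_result(value)

    foreign.add_done_callback(_copy)
    return mirrored


foreign = [ForeignFuture() for _ in range(2)]

# Passing the foreign futures directly fails the same way as in the traceback.
try:
    list(concurrent.futures.as_completed(foreign))
    failure = None
except AttributeError as err:
    failure = str(err)

# Mirroring them first lets the top-level stdlib helper work.
mirrored = [to_stdlib_future(f) for f in foreign]
for i, f in enumerate(foreign):
    f.set_result(i)
results = sorted(f.result() for f in concurrent.futures.as_completed(mirrored))
```

The catch is that the caller has to do the wrapping, so a library that only accepts an executor would still need the executor itself to return stdlib futures.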

jakirkham commented 4 years ago

Well, there is ClientExecutor. Not sure if that helps. It appears we don't include it in the docs currently, though adding it seems doable.
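For reference, a ClientExecutor is obtained from Client.get_executor(), and its submit returns concurrent.futures.Future objects rather than Dask futures. Here is a rough sketch of how the do_many example might use it. do_many_on_dask is a hypothetical helper name, and the in-process Client(processes=False) is an assumption made only to keep the sketch small. The Dask import is deferred so the library itself need not depend on it.

```python
import concurrent.futures


def do_single(x):
    return x


def do_many(sequence, executor=None):
    # Works with any executor whose submit() returns stdlib futures.
    executor = executor or concurrent.futures.ThreadPoolExecutor()
    futures = [executor.submit(do_single, x) for x in sequence]
    return [f.result() for f in concurrent.futures.as_completed(futures)]


def do_many_on_dask(sequence):
    # Hypothetical helper: imported lazily so Dask stays an optional dependency.
    from distributed import Client

    # Assumption: a local, thread-based cluster is enough for illustration.
    with Client(processes=False) as client:
        executor = client.get_executor()
        return do_many(sequence, executor)


# The stdlib path needs no Dask installation; completion order is not guaranteed.
local_results = sorted(do_many([1, 2]))
```

With distributed installed, do_many_on_dask([1, 2]) should take the same code path through concurrent.futures.as_completed, since do_many only ever sees the executor interface.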