cooperative-computing-lab / cctools

The Cooperative Computing Tools (cctools) enable large scale distributed computations to harness hundreds to thousands of machines from clusters, clouds, and grids.
http://ccl.cse.nd.edu

vine: `FuturesExecutor` examples do not work #3835

Closed gpauloski closed 1 month ago

gpauloski commented 4 months ago

I am trying to use the FuturesExecutor and have run into a few problems. I've enumerated them in the order that I came across them. I have opened a PR (#3836) for issues 1, 2, 4, and 5.

Issues 3 and 6 are more nuanced so hopefully someone can take a look at those.

Installation

I've installed the latest ndcctools as follows:

conda create -p $PWD/test-env python=3.11
conda install -y -c conda-forge ndcctools

Issues

1. None return type in FuturesExecutor.submit()

The first example in the docs does not work (https://cctools.readthedocs.io/en/stable/taskvine/#futures).

import ndcctools.taskvine as vine

def my_sum(x, y):
    return x + y

m = vine.FuturesExecutor(manager_name='my_manager')

a = m.submit(my_sum, 3, 4)
b = m.submit(my_sum, 5, 2)
c = m.submit(my_sum, a, b)  # note that the futures a and b are
                            # passed as any other argument.

print(c.result())

$ python t.py
Traceback (most recent call last):
  File "/home/jgpaul/workspace/taskvine/t.py", line 13, in <module>
    print(c.result())
          ^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'result'

This is because FuturesExecutor.submit() is missing a return statement for the case where both isinstance checks fail: https://github.com/cooperative-computing-lab/cctools/blob/930509871b6a0cf85194c2561612e6c442c035fa/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py#L68-L69
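To illustrate the failure mode, here is a minimal sketch using hypothetical stand-ins (SketchTask, submit_buggy, submit_fixed), not the actual TaskVine code. Tasks run inline purely for illustration. A submit() that returns only inside an isinstance branch yields an implicit None for a plain callable; normalizing the input first lets a single return cover every path.

```python
from concurrent.futures import Future


class SketchTask:
    """Hypothetical stand-in for a pre-built task object."""

    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args


def submit_buggy(fn, *args):
    # Returns only when a task object is passed in; a plain callable
    # falls off the end of the function and yields None.
    if isinstance(fn, SketchTask):
        future = Future()
        future.set_result(fn.fn(*fn.args))
        return future


def submit_fixed(fn, *args):
    # Normalize the input first so one return statement covers every path.
    task = fn if isinstance(fn, SketchTask) else SketchTask(fn, *args)
    future = Future()
    future.set_result(task.fn(*task.args))  # run inline, for illustration only
    return future


def my_sum(x, y):
    return x + y
```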

The second example works because it creates a FuturePythonTask manually, but it and one other example in the docs contain a typo: they use FutureExecutor instead of FuturesExecutor.

2. Adding a callback to VineFuture fails

The name used here should be _callback_fns: https://github.com/cooperative-computing-lab/cctools/blob/930509871b6a0cf85194c2561612e6c442c035fa/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py#L137-L138

3. Printing a VineFuture fails

While trying to print the futures returned by FuturesExecutor.submit(), you get the following error:

  File ".../lib/python3.11/concurrent/futures/_base.py", line 345, in __repr__
    with self._condition:
         ^^^^^^^^^^^^^^^
AttributeError: 'VineFuture' object has no attribute '_condition'

This is because VineFuture subclasses concurrent.futures.Future but does not call super().__init__(), which initializes attributes that concurrent.futures.Future.__repr__ relies on.

This also means that VineFuture is not compatible with functions that operate on Future (e.g., wait or as_completed), though that may be a more nuanced design decision.
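The behavior can be reproduced with the standard library alone. In this sketch the two subclasses are hypothetical and only mimic the pattern described above: skipping super().__init__() leaves the base-class attributes unset, while calling it makes repr() and the module-level wait() work.

```python
from concurrent.futures import Future, wait


class UninitializedFuture(Future):
    def __init__(self, task_id):
        # Skips super().__init__(), so _condition, _state, etc. never exist.
        self.task_id = task_id


class InitializedFuture(Future):
    def __init__(self, task_id):
        super().__init__()  # sets up the condition, state, and waiter list
        self.task_id = task_id


try:
    repr(UninitializedFuture(1))
    repr_failed = False
except AttributeError:
    repr_failed = True

ok = InitializedFuture(2)
ok.set_result(7)
done, not_done = wait([ok], timeout=1)  # works once the base class is set up
```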

4. VineFuture.result(timeout=None) does not work

The concurrent.futures.Future specification states that timeout=None means there is no limit to the wait time.

It seems VineFuture instead uses "wait_forever" to signal no wait time limit: https://github.com/cooperative-computing-lab/cctools/blob/930509871b6a0cf85194c2561612e6c442c035fa/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py#L134-L135

This makes it annoying to use FuturesExecutor in the same code base that may use other concurrent.futures.Executor implementations. It would be easy to translate None -> "wait_forever" within result.
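The translation could look roughly like the sketch below. The helper name to_vine_timeout is hypothetical, and the only thing taken from the code linked above is the "wait_forever" sentinel string.

```python
WAIT_FOREVER = "wait_forever"  # sentinel string named in this issue


def to_vine_timeout(timeout):
    """Map a concurrent.futures-style timeout onto the sentinel convention."""
    # concurrent.futures uses None for "no limit"; everything else passes through.
    return WAIT_FOREVER if timeout is None else timeout
```

Calling such a helper at the top of result() would let callers use timeout=None as they would with any other executor.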

5. Future callback signature is different

concurrent.futures.Future.add_done_callback(fn) says that "fn will be called, with the future as its only argument".

However, FuturesExecutor calls the callback with the task's output: https://github.com/cooperative-computing-lab/cctools/blob/930509871b6a0cf85194c2561612e6c442c035fa/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py#L184-L185

https://github.com/cooperative-computing-lab/cctools/blob/930509871b6a0cf85194c2561612e6c442c035fa/taskvine/src/bindings/python3/ndcctools/taskvine/futures.py#L277-L278

This is another deviation from the concurrent.futures.Executor spec, and the callback can still access the result from the future if the result is the only thing that is needed.
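For reference, this is the standard-library behavior, shown on a bare concurrent.futures.Future with no TaskVine involved: the callback receives the future itself and reads the value from it.

```python
from concurrent.futures import Future

received = []


def on_done(future):
    # The callback gets the Future itself and can pull the value from it.
    received.append((future, future.result()))


f = Future()
f.add_done_callback(on_done)
f.set_result(7)  # callbacks run here, when the result is set
```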

6. VineFuture callbacks invoked when retrieving result

This is another deviation from the behavior of concurrent.futures.Future. concurrent.futures.Future invokes its callbacks when one of the following happens: a task is cancelled, a result is set, or an exception is set.

VineFuture, instead, invokes the callbacks when VineFuture.result() is called. It caches the result, so later calls to VineFuture.result() return early and do not invoke the callbacks. However, if bug (5) is fixed and the callback accesses the result of the future, the callback will be invoked twice (or infinitely in some cases).

For example, this prints "callback!" indefinitely until a RecursionError is raised.

import ndcctools.taskvine as vine

def my_sum(x, y):
    return x + y

def callback(future):
    print('callback!')
    future.result()

m = vine.FuturesExecutor(manager_name='my_manager')

a = m.submit(my_sum, 3, 4)
a.add_done_callback(callback)

print(a.result())

I'm not sure what the best way to fix this is, but ideally invoking callbacks would be independent of the client code calling VineFuture.result(). Otherwise, that sort of defeats the value of callbacks.
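As a point of comparison, here is the same pattern with ThreadPoolExecutor from the standard library. The callback fires as soon as the result is set, the client never calls result(), and reading the result inside the callback is safe.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

fired = threading.Event()
calls = []


def callback(future):
    calls.append(future.result())  # safe: the result is already set
    fired.set()


def my_sum(x, y):
    return x + y


with ThreadPoolExecutor(max_workers=1) as pool:
    a = pool.submit(my_sum, 3, 4)
    a.add_done_callback(callback)
    # The client never calls a.result(); the callback still fires.
    callback_ran = fired.wait(timeout=5)
```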

BarrySlyDelgado commented 4 months ago

Hello @gpauloski. Thanks for noting some of the issues with the FuturesExecutor. I think PR #3836 should fix issues 1, 2, 4, and 5, bringing them more in line with concurrent.futures. Regarding issue 3, it is an oversight that some of the attributes of concurrent.futures.Future are uninitialized. Once they are initialized, the module functions as_completed and wait can be implemented. I'll make a PR that fixes issue 3.

Issue 6 poses a different challenge with regard to design. TaskVine schedules tasks to workers when wait is called on the Manager (this is the wait function of the TaskVine manager, as opposed to the module function mentioned above). The main challenge is that the TaskVine manager does not schedule tasks, and subsequently return them, unless explicitly asked to via a call to wait. Initially, the most straightforward place to run callback functions was when wait returned the requested task. So wait was only called when result() was called, and the result for callback functions was only available through this method. A simple way to prevent the recursion issue would be to check whether the callback functions have already been executed. However, this would not necessarily make callbacks independent of calling result(). With as_completed and wait implemented as module functions, results would be loaded without calling result(); however, examples like the first one in the issue would still have callbacks depend on result(). Any insight into your use case, or general use cases, would be welcome. We would like to hear what you are working on and whether there's anything we can help with or clarify.
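The "check whether callbacks have been executed" guard could be sketched as follows. LazyFuture is a hypothetical class, not VineFuture, and its compute argument stands in for waiting on the manager. Setting the flag before running the callbacks means a callback that calls result() gets the cached value instead of recursing.

```python
class LazyFuture:
    """Hypothetical future whose value is only fetched inside result()."""

    def __init__(self, compute):
        self._compute = compute  # stand-in for waiting on the manager
        self._done = False
        self._value = None
        self._callbacks = []
        self._callbacks_ran = False

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def result(self):
        if not self._done:
            self._value = self._compute()
            self._done = True
        if not self._callbacks_ran:
            # Flip the flag before invoking so a callback that calls
            # result() returns the cached value instead of recursing.
            self._callbacks_ran = True
            for fn in self._callbacks:
                fn(self)
        return self._value


log = []
f = LazyFuture(lambda: 3 + 4)
f.add_done_callback(lambda fut: log.append(fut.result()))
first = f.result()
second = f.result()  # cached; callbacks are not run again
```

As noted above, this only removes the recursion; the callbacks still do not run until something calls result().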

Thanks again!

dthain commented 4 months ago

@gpauloski we would be interested to learn more about what sort of application you have in mind here.

gpauloski commented 4 months ago

Thanks, @BarrySlyDelgado and @dthain, for the quick reply!

For context, we are compiling some reference applications and want to design them in such a way that we can easily swap out different workflow/task execution engines. We've decided on using the concurrent.futures.Executor interface as it's the closest thing to a "standard" and is supported by a number of other engines we are looking at. (E.g., dask.distributed.Client, GlobusComputeExecutor, ParslPoolExecutor, ThreadPoolExecutor, and ProcessPoolExecutor all provide a similar interface and functionality.)

Within these applications we are using Future.add_done_callback() for signaling (e.g., "task is done so trigger something else") and logging. So here I'm relying on the callback behavior being that callbacks are invoked when a future's result is set, as it could be some extended period of time before the application actually calls future.result().

Regarding (3), I think it's okay for VineFuture to not be a subclass of Future, if that makes more sense because you don't need any of the internal mechanisms provided by Future. For example, Dask's Future is its own type and so Dask provides its own wait() and as_completed() implementations. But, personally, it's convenient to have VineFuture be a subclass of Future. So that is to say there's merit to both approaches.

BarrySlyDelgado commented 4 months ago

Thanks for the feedback! I think there should be a more convenient way to add signaling if wait() and as_completed() are implemented. The general idea I have is to have wait() and as_completed() do something like this:

while not m.empty():
    task = m.wait(5)
    if task:
        print(f"Task {task.id} completed with result {task.output}")

Once tasks are returned, callback functions could be executed. This wouldn't solve issue 6 if those functions are not used, but it could possibly work for your use case.

gpauloski commented 4 months ago

Unfortunately, I don't think this would work for my use case as a common use of callbacks for us is to signal that other asynchronous computations can start (even the submission of another task). If the client is required to intervene to trigger that callback (i.e., call .result(), wait(), or as_completed()), that would delay the start of those later computations.

I understand that TaskVine has other mechanisms that are probably better suited to support this kind of pattern, but here I am bound by needing to support many different task executors so my preference is to stick as close to the concurrent.futures.Executor spec as possible.

A temporary solution for me might be to spawn a thread that waits on "alive" tasks so that the callbacks are triggered closer to when the task finishes. This could use your idea for wait().
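A rough sketch of that workaround, under stated assumptions: poll(timeout) is a hypothetical stand-in for a manager wait call that returns (task_id, output) for a finished task or None on timeout, and a queue plays the role of the task engine. Whether the TaskVine manager can safely be driven from a second thread is not established here.

```python
import queue
import threading
from concurrent.futures import Future


def start_callback_pump(poll, pending, stop):
    """Resolve futures from a background thread.

    poll(timeout) is a hypothetical stand-in for a manager wait call.
    pending maps task_id -> Future awaiting that task.
    """

    def run():
        while not stop.is_set():
            finished = poll(0.1)
            if finished is None:
                continue
            task_id, output = finished
            # set_result() runs the done-callbacks in this thread.
            pending.pop(task_id).set_result(output)

    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread


# Demo with a queue standing in for the task engine.
completions = queue.Queue()


def fake_poll(timeout):
    try:
        return completions.get(timeout=timeout)
    except queue.Empty:
        return None


stop = threading.Event()
signalled = threading.Event()
fut = Future()
fut.add_done_callback(lambda f: signalled.set())
pump = start_callback_pump(fake_poll, {1: fut}, stop)

completions.put((1, 7))            # the "task" finishes
fired = signalled.wait(timeout=5)  # no client call to fut.result()
stop.set()
pump.join(timeout=5)
```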

gpauloski commented 3 months ago

Hi, @BarrySlyDelgado. Is there any update on the status of supporting wait()/as_completed()? I'd like to run some experiments with TaskVine in the next month or so (for a camera ready paper, if it gets accepted).

BarrySlyDelgado commented 3 months ago

Apologies for the delay. I can get an implementation of wait() and as_completed() done by the weekend if that fits within your time frame.

gpauloski commented 3 months ago

Thanks, Barry! By the end of June would be appreciated!

BarrySlyDelgado commented 3 months ago

I've created a PR (#3876) that adds as_completed and wait; it should be merged soon.

gpauloski commented 1 month ago

@BarrySlyDelgado, I think this issue can be closed now. I'm going to open a new issue to track some items that are different from what's mentioned here.