Parsl / parsl

Parsl - a Python parallel scripting library
http://parsl-project.org
Apache License 2.0

Passing a function or `PythonApp` as a kwarg to a `JoinApp` yields a `TypeError` #2791

Closed. Andrew-S-Rosen closed this issue 1 year ago.

Andrew-S-Rosen commented 1 year ago

Describe the bug

I see different behavior when I pass a function or PythonApp to a JoinApp as a kwarg than when the JoinApp picks it up from the enclosing scope.

Disclaimer: I understand that the problem is very likely on my end, not Parsl's, but I just haven't figured out how yet.

To Reproduce

To set the stage, note that the following works perfectly fine.

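import parsl
from parsl import join_app, python_app
from parsl.configs.local_threads import config

parsl.load(config)  # a DataFlowKernel must be loaded before any app is called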

@python_app
def add(a, b):
    return a + b

@python_app
def make_more(val):
    return [val] * 3

@join_app
def workflow(a, b, c):
    future1 = add(a, b)
    future2 = make_more(future1)
    return [add(val, c) for val in future2.result()]

result = workflow(1, 2, 3).result() # [6, 6, 6]

However, if I change the way the add function gets called, I get different behavior.

from parsl import join_app, python_app

def add(a, b):
    return a + b

@python_app
def make_more(val):
    return [val] * [3]

@join_app
def workflow(a, b, c, f=add):
    f_app = python_app(add)
    future1 = f_app(a, b)
    future2 = make_more(future1)
    return [add(val, c) for val in future2.result()]

result = workflow(1, 2, 3).result() # e.g. [6, 6, 6]

This raises the following traceback:

TypeError                                 Traceback (most recent call last)
Cell In[16], line 16
     13     future2 = make_more(future1)
     14     return [add(val, c) for val in future2.result()]
---> 16 result = workflow(1, 2, 3).result() # e.g. [6, 6, 6]

File ~/software/miniconda/envs/parsl/lib/python3.10/concurrent/futures/_base.py:458, in Future.result(self, timeout)
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/software/miniconda/envs/parsl/lib/python3.10/concurrent/futures/_base.py:403, in Future.__get_result(self)
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/dataflow/dflow.py:300, in DataFlowKernel.handle_exec_update(self, task_record, future)
    297     raise RuntimeError("done callback called, despite future not reporting itself as done")
    299 try:
--> 300     res = self._unwrap_remote_exception_wrapper(future)
    302 except Exception as e:
    303     logger.debug("Task {} try {} failed".format(task_id, task_record['try_id']))

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/dataflow/dflow.py:566, in DataFlowKernel._unwrap_remote_exception_wrapper(future)
    564 result = future.result()
    565 if isinstance(result, RemoteExceptionWrapper):
--> 566     result.reraise()
    567 return result

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/app/errors.py:123, in RemoteExceptionWrapper.reraise(self)
    119 logger.debug("Reraising exception of type {}".format(self.e_type))
    121 v = self.get_exception()
--> 123 reraise(t, v, v.__traceback__)

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/six.py:719, in reraise(tp, value, tb)
    717     if value.__traceback__ is not tb:
    718         raise value.with_traceback(tb)
--> 719     raise value
    720 finally:
    721     value = None

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/app/errors.py:146, in wrapper()
    144 from parsl.app.errors import RemoteExceptionWrapper
    145 try:
--> 146     return func(*args, **kwargs)
    147 except Exception:
    148     return RemoteExceptionWrapper(*sys.exc_info())

Cell In[16], line 14, in workflow()
     12 future1 = f_app(a, b)
     13 future2 = make_more(future1)
---> 14 return [add(val, c) for val in future2.result()]

File ~/software/miniconda/envs/parsl/lib/python3.10/concurrent/futures/_base.py:458, in result()
    456     raise CancelledError()
    457 elif self._state == FINISHED:
--> 458     return self.__get_result()
    459 else:
    460     raise TimeoutError()

File ~/software/miniconda/envs/parsl/lib/python3.10/concurrent/futures/_base.py:403, in __get_result()
    401 if self._exception:
    402     try:
--> 403         raise self._exception
    404     finally:
    405         # Break a reference cycle with the exception in self._exception
    406         self = None

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/dataflow/dflow.py:300, in handle_exec_update()
    297     raise RuntimeError("done callback called, despite future not reporting itself as done")
    299 try:
--> 300     res = self._unwrap_remote_exception_wrapper(future)
    302 except Exception as e:
    303     logger.debug("Task {} try {} failed".format(task_id, task_record['try_id']))

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/dataflow/dflow.py:566, in _unwrap_remote_exception_wrapper()
    564 result = future.result()
    565 if isinstance(result, RemoteExceptionWrapper):
--> 566     result.reraise()
    567 return result

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/app/errors.py:123, in reraise()
    119 logger.debug("Reraising exception of type {}".format(self.e_type))
    121 v = self.get_exception()
--> 123 reraise(t, v, v.__traceback__)

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/six.py:719, in reraise()
    717     if value.__traceback__ is not tb:
    718         raise value.with_traceback(tb)
--> 719     raise value
    720 finally:
    721     value = None

File ~/software/miniconda/envs/parsl/lib/python3.10/site-packages/parsl/app/errors.py:146, in wrapper()
    144 from parsl.app.errors import RemoteExceptionWrapper
    145 try:
--> 146     return func(*args, **kwargs)
    147 except Exception:
    148     return RemoteExceptionWrapper(*sys.exc_info())

Cell In[16], line 8, in make_more()
      6 @python_app
      7 def make_more(val):
----> 8     return [val] * [3]

TypeError: can't multiply sequence by non-int of type 'list'

The same error occurs if I pass in python_app(add) as a kwarg directly.

Expected behavior

The same behavior should occur in both cases.

Environment

Distributed Environment

benclifford commented 1 year ago

In your 2nd example, make_more multiplies the list [val] by another list, [3], instead of by an int:

return [val] * [3]

but this sort of thing doesn't work in Python:

>>> [3] * [3]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can't multiply sequence by non-int of type 'list'

That's the very last error in the stack trace you pasted.
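A quick plain-Python check of the fix: sequence repetition needs an int on the other side of `*`, so make_more should use `3` rather than `[3]`. The `@python_app` decorator is dropped here only so the snippet runs without Parsl; the body is otherwise the corrected app body.

```python
def make_more(val):
    # Corrected body: list * int repeats the list
    return [val] * 3


print(make_more(3))  # [3, 3, 3]

# list * list is not defined and raises the TypeError seen in the traceback
error_message = None
try:
    [3] * [3]
except TypeError as exc:
    error_message = str(exc)

print(error_message)  # can't multiply sequence by non-int of type 'list'
```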

Andrew-S-Rosen commented 1 year ago

Oops... you're absolutely right. Okay, closing this because that was a silly error on my part.

I'm trying to create a minimal working example of a more complex error but haven't quite gotten it yet. Apologies for this one!

benclifford commented 1 year ago

There are some other problems in your second example:

benclifford commented 1 year ago

E.g., like this:

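from parsl import join_app, python_app

# workflow is a plain function: it only launches apps and never blocks
def workflow(a, b, c, f=add):
    f_app = python_app(f)  # wrap the function that was passed in, not the global add
    future1 = f_app(a, b)
    future2 = make_more(future1)
    end = add_all(future2, c, f_app=f_app)
    return end

# add_all is the join app: Parsl resolves future2 into a list before it runs
@join_app
def add_all(many, c, f_app):
    return [f_app(val, c) for val in many]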

The workflow function doesn't need to be a @join_app; it can instead be a regular Python function that returns a Future, because it never has to wait on dependencies itself.

The code that waits for future2 to complete is what should be a join app, because it has to wait for that earlier future before it can run.

Andrew-S-Rosen commented 1 year ago

Ah, perfect!! Thank you for reading my mind. How to avoid calling .result() inside the app was exactly what I was trying to sort out. This makes a lot more sense now and will probably resolve some of the other issues I've been running into.