PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Can the task decorator support Object.method? #7198

Open appassionate opened 1 year ago

appassionate commented 1 year ago

Prefect Version

2.x

Describe the current behavior

from prefect import flow as prefect_flow
from prefect import task as prefect_task

class Foo():

    def __init__(self,a=1):
        self.a = a
        #print(a)
    def do_method():
        print(self.a)

def create_foo():

    foo = Foo()
    print("get foo!")
    return foo

def dummy_flow():

    foo = prefect_task(create_foo)()
    prefect_task(foo.do_method)()

prefect_flow(dummy_flow)()
File ~/.conda/envs/prefect/lib/python3.8/inspect.py:1808, in _signature_bound_method(sig)
   1805 params = tuple(sig.parameters.values())
   1807 if not params or params[0].kind in (_VAR_KEYWORD, _KEYWORD_ONLY):
-> 1808     raise ValueError('invalid method signature')
   1810 kind = params[0].kind
   1811 if kind in (_POSITIONAL_OR_KEYWORD, _POSITIONAL_ONLY):
   1812     # Drop first parameter:
   1813     # '(p1, p2[, ...])' -> '(p2[, ...])'

ValueError: invalid method signature
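
The ValueError above is raised inside the standard library's inspect.signature, not by Prefect-specific logic. As a minimal sketch that needs only the standard library, the snippet below suggests the message can be reproduced whenever a bound method's underlying function declares no parameters at all, which is the case for do_method in the example (it has no self). FixedFoo is a hypothetical name introduced here only for comparison; whether Prefect then accepts the bound method is not shown.

```python
import inspect


class Foo:
    def __init__(self, a=1):
        self.a = a

    # As in the report: the method is declared without a `self` parameter.
    def do_method():
        pass


class FixedFoo:  # hypothetical variant, identical except for `self`
    def __init__(self, a=1):
        self.a = a

    def do_method(self):
        return self.a


# inspect.signature() drops the first parameter of a bound method.
# With no parameters to drop, it raises ValueError.
try:
    inspect.signature(Foo().do_method)
    error_message = None
except ValueError as exc:
    error_message = str(exc)

# With `self` declared, the signature of the bound method is simply "()".
fixed_signature = inspect.signature(FixedFoo().do_method)

print(error_message)
print(fixed_signature)
```

If this reading is right, adding self to do_method removes this particular error.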

Describe the proposed behavior

Such a "prefect_task(foo.do_method)" can be successfully executed.

Example Use

I believe the code shown under the current behavior should be executable.

Additional context

Hi, Prefect group! It is exciting that Prefect Orion has broken through the limits of the DAG concept, so that we can build tasks and flows in a nested way. Generating tasks dynamically at run time makes "loop" conditions possible.

To make the task decorator more convenient, could "prefect_task" support decorating an object's methods? I believe that, with the inspect module, we can find the object a method belongs to through "foo.do_method.__self__". Maybe this could be used to implement the feature. I imagine a call like the following, possibly with different parameter names:

prefect_task(foo.do_method, ismethod=True, task_name="balabala")

Many thanks ! : )

zanieb commented 1 year ago

Thanks for the request! We do not recommend using methods for tasks, as managing state during concurrent and distributed execution can be confusing. That said, if someone wants to try it, it should be possible. We should handle this without requiring an ismethod argument on the task decorator; I believe we can inspect whether it is a method. Can you share the full traceback for the failure?

appassionate commented 1 year ago

@madkinsz Thanks for your kind reply.

"tasks as managing state during concurrent and distributed execution can be confusing"

In my case, is the object I created the same one that each Prefect task receives? That is, does the foo instance have the same memory address in every Prefect task? Operating on a shared object does carry risks, but I think that depends on how the user uses the object. One thing to confirm is that such an object should be picklable, that is, it should not hold locks or open I/O resources.
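
The question of whether every task sees the same object can be illustrated without Prefect. The sketch below is library-agnostic and makes no claim about Prefect's internals: it assumes that a call within one process passes the object by reference, and it uses copy.deepcopy as a stand-in for the serialize/deserialize round trip that a distributed runner would typically perform. Counter and the variable names are hypothetical.

```python
import copy


class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count


shared = Counter()

# Same process, passed by reference: both names refer to one object,
# so a mutation through `alias` is visible through `shared`.
alias = shared
alias.increment()

# Stand-in for shipping the object to another worker: the copy starts
# from the current state but diverges from the original afterwards.
shipped = copy.deepcopy(shared)
shipped.increment()

print(shared.count, shipped.count, shipped is shared)
```

Under these assumptions, mutations made by a method running on a copy never reach the original instance, which is one way method-based tasks can become confusing.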

"I believe we can inspect if it is a method"

Yes, I think you are right. The inspect module can tell whether an object is a method or a function, so an extra "ismethod" argument is unnecessary. However, I am not sure whether functions and methods should be treated differently, given the concurrency concerns you mention.
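
As a rough illustration of the detection discussed here, the standard library already distinguishes bound methods from plain functions. The sketch below uses only inspect; the describe helper is a hypothetical name and is not part of Prefect.

```python
import inspect


class Foo:
    def __init__(self, a=1):
        self.a = a

    def do_method(self):
        return self.a


def describe(fn):
    """Return (kind, bound_instance) for a callable (hypothetical helper)."""
    if inspect.ismethod(fn):
        # A bound method carries its instance in __self__.
        return "method", fn.__self__
    if inspect.isfunction(fn):
        return "function", None
    return "other", None


foo = Foo(a=3)
kind_bound, owner = describe(foo.do_method)  # accessed through the instance
kind_plain, _ = describe(Foo.do_method)      # accessed through the class

print(kind_bound, owner is foo, kind_plain)
```

Accessed through the instance, do_method is a bound method whose __self__ is foo. Accessed through the class, it is an ordinary function in Python 3.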

And this is the full error output.

/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/flows.py:199: UserWarning: A flow named 'dummy-flow' and defined at '/tmp/ipykernel_237630/3946684294.py:18' conflicts with another flow. Consider specifying a unique `name` parameter in the flow definition:

 `@flow(name='my_unique_name', ...)`
  warnings.warn(
10:43:07.804 | INFO    | prefect.engine - Created flow run 'ultraviolet-cow' for flow 'dummy-flow'
/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:185: UserWarning: A task named 'create_foo' and defined at '/tmp/ipykernel_237630/3946684294.py:9' conflicts with another task. Consider specifying a unique `name` parameter in the task definition:

 `@task(name='my_unique_name', ...)`
  warnings.warn(
10:43:07.960 | INFO    | Flow run 'ultraviolet-cow' - Created task run 'create_foo-8b412ec3-0' for task 'create_foo'
10:43:07.961 | INFO    | Flow run 'ultraviolet-cow' - Executing 'create_foo-8b412ec3-0' immediately...
10:43:08.035 | INFO    | Task run 'create_foo-8b412ec3-0' - Finished in state Completed()
10:43:08.038 | ERROR   | Flow run 'ultraviolet-cow' - Encountered exception during execution:
Traceback (most recent call last):
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py", line 578, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/tmp/ipykernel_237630/3946684294.py", line 21, in dummy_flow
    foo = prefect_task(foo.do_method)()
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py", line 828, in task
    Task(
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/context.py", line 163, in __register_init__
    __init__(__self__, *args, **kwargs)
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py", line 153, in __init__
    raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/callables.py", line 177, in raise_for_reserved_arguments
    function_paremeters = inspect.signature(fn).parameters
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 3093, in signature
    return Signature.from_callable(obj, follow_wrapped=follow_wrapped)
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 2842, in from_callable
    return _signature_from_callable(obj, sigcls=cls,
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 2228, in _signature_from_callable
    return _signature_bound_method(sig)
  File "/home/ubuntu/.conda/envs/prefect/lib/python3.8/inspect.py", line 1808, in _signature_bound_method
    raise ValueError('invalid method signature')
ValueError: invalid method signature
10:43:08.069 | ERROR   | Flow run 'ultraviolet-cow' - Finished in state Failed('Flow run encountered an exception. ValueError: invalid method signature\n')
get foo!
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In [9], line 23
     20     foo = prefect_task(create_foo)()
     21     foo = prefect_task(foo.do_method)()
---> 23 prefect_flow(dummy_flow)()

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/flows.py:429, in Flow.__call__(self, return_state, *args, **kwargs)
    425 parameters = get_call_parameters(self.fn, args, kwargs)
    427 return_type = "state" if return_state else "result"
--> 429 return enter_flow_run_engine_from_flow_call(
    430     self, parameters, return_type=return_type
    431 )

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:147, in enter_flow_run_engine_from_flow_call(flow, parameters, return_type)
    143 elif in_async_main_thread():
    144     # An event loop is already running and we must create a blocking portal to
    145     # run async code from this synchronous context
    146     with start_blocking_portal() as portal:
--> 147         return portal.call(begin_run)
    148 else:
    149     # An event loop is not running so we will create one
    150     return anyio.run(begin_run)

File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/from_thread.py:283, in BlockingPortal.call(self, func, *args)
    268 def call(
    269     self,
    270     func: Callable[..., Union[Coroutine[Any, Any, T_Retval], T_Retval]],
    271     *args: object
    272 ) -> T_Retval:
    273     """
    274     Call the given function in the event loop thread.
    275 
   (...)
    281 
    282     """
--> 283     return cast(T_Retval, self.start_task_soon(func, *args).result())

File ~/.conda/envs/prefect/lib/python3.8/concurrent/futures/_base.py:439, in Future.result(self, timeout)
    437     raise CancelledError()
    438 elif self._state == FINISHED:
--> 439     return self.__get_result()
    440 else:
    441     raise TimeoutError()

File ~/.conda/envs/prefect/lib/python3.8/concurrent/futures/_base.py:388, in Future.__get_result(self)
    386 def __get_result(self):
    387     if self._exception:
--> 388         raise self._exception
    389     else:
    390         return self._result

File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/from_thread.py:219, in BlockingPortal._call_func(self, func, args, kwargs, future)
    216             else:
    217                 future.add_done_callback(callback)
--> 219             retval = await retval
    220 except self._cancelled_exc_class:
    221     future.cancel()

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/client/utilities.py:47, in inject_client.<locals>.with_injected_client(*args, **kwargs)
     45 async with client_context as new_client:
     46     kwargs.setdefault("client", new_client or client)
---> 47     return await fn(*args, **kwargs)

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:229, in create_then_begin_flow_run(flow, parameters, return_type, client)
    227     return state
    228 elif return_type == "result":
--> 229     return await state.result(fetch=True)
    230 else:
    231     raise ValueError(f"Invalid return type for flow engine {return_type!r}.")

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/states.py:74, in _get_state_result(state, raise_on_failure)
     70 """
     71 Internal implementation for `get_state_result` without async backwards compatibility
     72 """
     73 if raise_on_failure and (state.is_crashed() or state.is_failed()):
---> 74     raise await get_state_exception(state)
     76 if isinstance(state.data, DataDocument):
     77     result = result_from_state_with_data_document(
     78         state, raise_on_failure=raise_on_failure
     79     )

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/engine.py:578, in orchestrate_flow_run(flow, flow_run, parameters, interruptible, client, partial_flow_run_context)
    572             else:
    573                 run_sync = (
    574                     run_sync_in_interruptible_worker_thread
    575                     if interruptible or timeout_scope
    576                     else run_sync_in_worker_thread
    577                 )
--> 578                 result = await run_sync(flow_call)
    580         waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
    581             flow_run_context.task_run_futures, client=client
    582         )
    584 except Exception as exc:

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/asyncutils.py:68, in run_sync_in_worker_thread(__fn, *args, **kwargs)
     57 """
     58 Runs a sync function in a new worker thread so that the main thread's event loop
     59 is not blocked
   (...)
     65 thread may continue running — the outcome will just be ignored.
     66 """
     67 call = partial(__fn, *args, **kwargs)
---> 68 return await anyio.to_thread.run_sync(call, cancellable=True)

File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/to_thread.py:31, in run_sync(func, cancellable, limiter, *args)
     10 async def run_sync(
     11     func: Callable[..., T_Retval],
     12     *args: object,
     13     cancellable: bool = False,
     14     limiter: Optional[CapacityLimiter] = None
     15 ) -> T_Retval:
     16     """
     17     Call the given function with the given arguments in a worker thread.
     18 
   (...)
     29 
     30     """
---> 31     return await get_asynclib().run_sync_in_worker_thread(
     32         func, *args, cancellable=cancellable, limiter=limiter
     33     )

File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:937, in run_sync_in_worker_thread(func, cancellable, limiter, *args)
    935 context.run(sniffio.current_async_library_cvar.set, None)
    936 worker.queue.put_nowait((context, func, args, future))
--> 937 return await future

File ~/.conda/envs/prefect/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:867, in WorkerThread.run(self)
    865 exception: Optional[BaseException] = None
    866 try:
--> 867     result = context.run(func, *args)
    868 except BaseException as exc:
    869     exception = exc

Cell In [9], line 21, in dummy_flow()
     18 def dummy_flow():
     20     foo = prefect_task(create_foo)()
---> 21     foo = prefect_task(foo.do_method)()

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:828, in task(__fn, name, description, tags, version, cache_key_fn, cache_expiration, retries, retry_delay_seconds, persist_result, result_storage, result_serializer)
    745 """
    746 Decorator to designate a function as a task in a Prefect workflow.
    747 
   (...)
    823     >>>     return "hello"
    824 """
    825 if __fn:
    826     return cast(
    827         Task[P, R],
--> 828         Task(
    829             fn=__fn,
    830             name=name,
    831             description=description,
    832             tags=tags,
    833             version=version,
    834             cache_key_fn=cache_key_fn,
    835             cache_expiration=cache_expiration,
    836             retries=retries,
    837             retry_delay_seconds=retry_delay_seconds,
    838             persist_result=persist_result,
    839             result_storage=result_storage,
    840             result_serializer=result_serializer,
    841         ),
    842     )
    843 else:
    844     return cast(
    845         Callable[[Callable[P, R]], Task[P, R]],
    846         partial(
   (...)
    859         ),
    860     )

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/context.py:163, in PrefectObjectRegistry.register_instances.<locals>.__register_init__(__self__, *args, **kwargs)
    161 registry = cls.get()
    162 try:
--> 163     __init__(__self__, *args, **kwargs)
    164 except Exception as exc:
    165     if not registry or not registry.capture_failures:

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/tasks.py:153, in Task.__init__(self, fn, name, description, tags, version, cache_key_fn, cache_expiration, retries, retry_delay_seconds, persist_result, result_storage, result_serializer)
    150 self.name = name or self.fn.__name__
    151 self.version = version
--> 153 raise_for_reserved_arguments(self.fn, ["return_state", "wait_for"])
    155 self.tags = set(tags if tags else [])
    156 self.task_key = to_qualified_name(self.fn)

File ~/.conda/envs/prefect/lib/python3.8/site-packages/prefect/utilities/callables.py:177, in raise_for_reserved_arguments(fn, reserved_arguments)
    174 def raise_for_reserved_arguments(fn: Callable, reserved_arguments: Iterable[str]):
    175     """Raise a ReservedArgumentError if `fn` has any parameters that conflict
    176     with the names contained in `reserved_arguments`."""
--> 177     function_paremeters = inspect.signature(fn).parameters
    179     for argument in reserved_arguments:
    180         if argument in function_paremeters:

File ~/.conda/envs/prefect/lib/python3.8/inspect.py:3093, in signature(obj, follow_wrapped)
   3091 def signature(obj, *, follow_wrapped=True):
   3092     """Get a signature object for the passed callable."""
-> 3093     return Signature.from_callable(obj, follow_wrapped=follow_wrapped)

File ~/.conda/envs/prefect/lib/python3.8/inspect.py:2842, in Signature.from_callable(cls, obj, follow_wrapped)
   2839 @classmethod
   2840 def from_callable(cls, obj, *, follow_wrapped=True):
   2841     """Constructs Signature for the given callable object."""
-> 2842     return _signature_from_callable(obj, sigcls=cls,
   2843                                     follow_wrapper_chains=follow_wrapped)

File ~/.conda/envs/prefect/lib/python3.8/inspect.py:2228, in _signature_from_callable(obj, follow_wrapper_chains, skip_bound_arg, sigcls)
   2221 sig = _signature_from_callable(
   2222     obj.__func__,
   2223     follow_wrapper_chains=follow_wrapper_chains,
   2224     skip_bound_arg=skip_bound_arg,
   2225     sigcls=sigcls)
   2227 if skip_bound_arg:
-> 2228     return _signature_bound_method(sig)
   2229 else:
   2230     return sig

File ~/.conda/envs/prefect/lib/python3.8/inspect.py:1808, in _signature_bound_method(sig)
   1805 params = tuple(sig.parameters.values())
   1807 if not params or params[0].kind in (_VAR_KEYWORD, _KEYWORD_ONLY):
-> 1808     raise ValueError('invalid method signature')
   1810 kind = params[0].kind
   1811 if kind in (_POSITIONAL_OR_KEYWORD, _POSITIONAL_ONLY):
   1812     # Drop first parameter:
   1813     # '(p1, p2[, ...])' -> '(p2[, ...])'

ValueError: invalid method signature
canuters commented 5 months ago

Hello, I found a workaround for running a method of a class as a Prefect task. This might be useful for someone. I declare a task-decorated function outside the class scope and run it through a "pipe" method. With this, I was able to run the method as a Prefect task and use the class object without significant changes to the code. Here's an example:

from time import sleep
from prefect import flow, task
class MyClass:
    def __init__(self) -> None:
        pass

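    def pipe(self, func, *args, **kwargs):
        # Call an external (task-decorated) function, passing this instance
        # as its first argument. The return value replaces `self` below.
        try:
            self = func(self, *args, **kwargs)
        except Exception as e:
            # Python 3 exceptions have no `.message` attribute; format `e` directly.
            print(f'Piping {func.__name__} failed. Exception: {e}')
        finally:
            return self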
    def f1(self, time=2):
        print(f'My time is {time} seconds')

    def f2(self, time=2):
        print(f'Sleeping {time} seconds')
        sleep(time)

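    # Decorating the method directly: calling c.f3() fails with
    # "missing a required argument: 'self'" (see the output below).
    @task
    def f3(self, time=2):
        sleep(time)
        print('aaa')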

@task
def f1(self, *args, **kwargs):
    print('This is a piped function')
    return MyClass.f1(self, *args, **kwargs)

@flow(name='encapsulated tasks', log_prints=True)
def main(*args):
    c = MyClass()
    c.f1()
    c.f2()
    c.pipe(f1, 5)
    try:
        c.f3()
    except Exception as e:
        print(str(e))

main()

Output:

11:05:26.793 | INFO    | prefect.engine - Created flow run 'emerald-tody' for flow 'encapsulated tasks'
11:05:26.795 | INFO    | Flow run 'emerald-tody' - View at http://127.0.0.1:4200/flow-runs/flow-run/ea0d6fe1-e373-4aa0-8a1b-a621913644a8
11:05:26.948 | INFO    | Flow run 'emerald-tody' - My time is 2 seconds
11:05:26.950 | INFO    | Flow run 'emerald-tody' - Sleeping 2 seconds
11:05:29.903 | INFO    | Flow run 'emerald-tody' - Created task run 'f1-0' for task 'f1'
11:05:29.907 | INFO    | Flow run 'emerald-tody' - Executing 'f1-0' immediately...
11:05:30.093 | INFO    | Task run 'f1-0' - This is a piped function
11:05:30.094 | INFO    | Task run 'f1-0' - My time is 5 seconds
11:05:30.214 | INFO    | Task run 'f1-0' - Finished in state Completed()
11:05:30.214 | INFO    | Flow run 'emerald-tody' - Error binding parameters for function 'f3': missing a required argument: 'self'.
Function 'f3' has signature 'self, time=2' but received args: () and kwargs: [].
11:05:30.340 | INFO    | Flow run 'emerald-tody' - Finished in state Completed('All states completed.')
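
One plausible explanation for the "missing a required argument: 'self'" message is ordinary Python attribute lookup rather than anything Prefect-specific. A decorator that returns a callable object, instead of a function, does not bind the instance unless that object implements the descriptor protocol. The sketch below demonstrates this with the standard library only. PlainWrapper and BindingWrapper are hypothetical stand-ins, not Prefect classes, and whether a given Prefect version behaves like either of them is an assumption.

```python
import functools
import types


class PlainWrapper:
    """Hypothetical decorator that returns a callable object (no binding)."""

    def __init__(self, fn):
        self.fn = fn
        functools.update_wrapper(self, fn)

    def __call__(self, *args, **kwargs):
        return self.fn(*args, **kwargs)


class BindingWrapper(PlainWrapper):
    """Same wrapper plus __get__, so attribute access binds the instance."""

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class itself
        return types.MethodType(self, instance)


class Demo:
    def __init__(self, value):
        self.value = value

    @PlainWrapper
    def unbound(self):
        return self.value

    @BindingWrapper
    def bound(self):
        return self.value


demo = Demo(7)

# Without __get__, `demo.unbound` is the wrapper itself, so `self` is missing.
try:
    demo.unbound()
    unbound_error = None
except TypeError as exc:
    unbound_error = exc

# With __get__, the instance is passed as the first argument.
bound_result = demo.bound()

print(type(unbound_error).__name__, bound_result)
```

Under this reading, the "pipe" workaround succeeds because it passes the instance explicitly as the first argument instead of relying on binding.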
felix5572 commented 1 week ago

@canuters How should the f3 method be handled? Is there any way to decorate the f3 method as a task?