PrefectHQ / prefect

Prefect is a workflow orchestration framework for building resilient data pipelines in Python.
https://prefect.io
Apache License 2.0

Implement __getattr__ as a prefect.Task magic method #3024

Closed julianhess closed 4 years ago

julianhess commented 4 years ago

Current behavior

Currently, prefect.Task implements a number of operators that implicitly add extra tasks to the current flow context. These are known internally as "Magic Methods".

__getitem__ is a good example of this:

import prefect

@prefect.task
def return_dict():
  return { "a" : "b" }

with prefect.Flow("getitem_test") as f:
  dict_task = return_dict()
  value_task = dict_task["a"]

value_task = dict_task["a"] will implicitly create a new task (value_task) that runs __getitem__ on the return value of dict_task, thus returning "b".
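The mechanism described above can be sketched without Prefect. This is a minimal illustration under stated assumptions, not Prefect's implementation: Node, its fields, and run() are hypothetical names invented for the sketch. It shows that indexing at build time only creates another node, and the actual lookup happens when the graph is run.

```python
import operator


class Node:
    """Hypothetical stand-in for a task: a function plus its upstream inputs."""

    def __init__(self, fn, *upstream):
        self.fn = fn
        self.upstream = upstream

    def __getitem__(self, key):
        # Build time: no data exists yet, so return a NEW node that will
        # index into this node's result once the graph is executed.
        return Node(lambda value: operator.getitem(value, key), self)

    def run(self):
        # Run time: evaluate upstream nodes first, then apply this node's function.
        return self.fn(*(node.run() for node in self.upstream))


dict_node = Node(lambda: {"a": "b"})
value_node = dict_node["a"]  # creates a node; nothing has run yet

assert isinstance(value_node, Node)
assert value_node.run() == "b"
```

The same pattern extends to any other operator: the magic method never sees data, it only adds a node that depends on the node it was called on.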

Proposed behavior

prefect.Task should also implement __getattr__, which calls a custom Task whose __call__ method is overridden. Here is a simple implementation:

class MyTask(prefect.Task):
  def __getattr__(self, attr):
    getattr_task = MyGetattr().bind(obj = self, attr = attr)
    return getattr_task

class MyGetattr(MyTask):
  def run(self, obj, attr):
    # getattr() also works for results that define no __getattr__ of their own
    return getattr(obj, attr)

  def __call__(self, *args, **kwargs):
    return prefect.task(lambda x : x(*args, **kwargs)).bind(self)

This would be extremely useful for implicitly accessing properties or calling methods of the return value of MyTask.run(). Since Python only calls __getattr__ when normal attribute lookup fails (attr is found neither on the instance nor on its class), it is guaranteed not to interfere with any properties/methods already implemented by prefect.Task.
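The lookup rule this argument relies on can be checked in plain Python. The Fallback class below is hypothetical and exists only for this check. It shows that __getattr__ is skipped for instance attributes and for class attributes, and runs only for names that cannot be found anywhere else.

```python
class Fallback:
    """Hypothetical class used only to show when __getattr__ is called."""

    kind = "class attribute"

    def __init__(self):
        self.name = "instance attribute"
        self.missed = []

    def __getattr__(self, attr):
        # Reached only after normal lookup (instance, class, base classes) fails.
        self.missed.append(attr)
        return f"fallback for {attr}"


obj = Fallback()
assert obj.name == "instance attribute"  # found in obj.__dict__
assert obj.kind == "class attribute"     # found on the class, not in obj.__dict__
assert obj.x == "fallback for x"         # not found anywhere, so __getattr__ runs
assert obj.missed == ["x"]
```

Note that the fallback also catches misspelled attribute names, which then return a value instead of raising AttributeError.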

Example

import pandas as pd

# monkey patch a couple preexisting "magic methods" 
# (obviously, if prefect.Task natively implemented __getattr__, this would not be necessary)
prefect.tasks.core.operators.GetItem.__getattr__ = MyTask.__getattr__
prefect.tasks.core.function.FunctionTask.__getattr__ = MyTask.__getattr__

class PandasTask(MyTask):
    def run(self):
        return pd.DataFrame({ "x" : range(0, 10), "y" : range(20, 30) })

with prefect.Flow(name = "dataframe_example") as f:
    # return a basic Pandas dataframe
    df = PandasTask()

    # implicitly call the drop method of the dataframe; df_dropped will contain
    # rows 5 through the end
    df_dropped = df.drop(range(0, 5))

    # implicitly access the column "x" from the dataframe
    col_x = df.x

    # implicitly access column "y" via the loc object of the dataframe. note that loc's custom
    # __getitem__ implementation automatically works
    col_y = df.loc[:, "y"]

    # of course, these tasks can be infinitely chained, implicitly adding new tasks to the DAG
    y_dropped = col_y.drop(range(0, 5)).transpose()
    x_dropped = col_x.drop(range(10, -1))

I am happy to submit a PR to add this functionality — it should be fairly trivial to implement.

jlowin commented 4 years ago

The issue with implementing __getattr__ on tasks is that it compromises the user experience while trying to work with true attributes, like .name, as well as creating confusing behavior around shadowed attributes (what if you wanted to access a result attribute called .name?)

However, I recognize the value of this in the same way we currently support __getitem__ (though that doesn't have the same usability issues because there's no expectation that Task objects are index-able).

Could we think through maybe having a single Task attribute that supported both . and [] magic, something like task.output or task.result or task.data? That would avoid the usability issues. Something like:

with Flow(...):
    a = my_task()
    b = do_stuff(a.output['a_col'])  # currently a['a_col']
    c = do_stuff(a.output.another_col)  # currently unsupported

julianhess commented 4 years ago

The issue with implementing __getattr__ on tasks is that it compromises the user experience while trying to work with true attributes, like .name

As I wrote above, __getattr__ only runs when normal attribute lookup fails. Since true attributes (like .name) are found by normal lookup (they live in task.__dict__ or on the Task class), the custom __getattr__ would not be invoked on them, so no true Task attributes would be clobbered.

I agree with you that the main issue is what you describe in the next clause:

confusing behavior around shadowed attributes (what if you wanted to access a result attribute called .name?)

Perhaps we could simply warn users that any attributes belonging to a returned object conflicting with native Task attributes (e.g. .name) will be clobbered? I really like your idea of adding a special attribute (e.g. .result/.data) to access shadowed attributes:

Could we think through maybe having a single Task attribute that supported both . and [] magic, something like task.output or task.result or task.data? That would avoid the usability issues. Something like:

Can we implement this alongside the proposed __getattr__ as a fallback mechanism to access clobbered attributes?

@task(name = "my_demo_dataframe")
def df_task():
  return pd.DataFrame({ "x" : range(0, 10), "name" : range(20, 30) })

with Flow("clobber_demo") as f:
  df = df_task()
  task_name = df.name # will access the Prefect Task name attribute ("my_demo_dataframe"), not the dataframe field "name"
  x = df.x # since x is not a native Prefect Task attribute, this will access the dataframe field "x"
  df_name = df.data.name # proposed way of accessing shadowed attributes

Edit: Not sure how you feel about this from a UX standpoint, but perhaps Prefect could raise warnings at runtime for each native Task attribute that shadows a return object attribute. Sadly we wouldn't be able to easily do this when the DAG is built, but warning the user at runtime is better than nothing.

In the above example, when we call df.name, it would be straightforward to check that .name is also an attribute of the returned dataframe, and raise a warning like:

WARNING: "name" is both a native attribute of prefect.Task and an attribute of the "df" task's return value. "df.name" will access the former ("my_demo_dataframe"). If you actually meant to access the latter, please use "df.data.name"

The wording of this warning could be much improved, but hopefully my example text gets the point across.
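A rough sketch of the runtime check described here, in plain Python. Everything in it is an assumption made for illustration: NATIVE_TASK_ATTRS is a made-up subset of task attribute names, and warn_if_shadowed and Result are hypothetical helpers, not part of Prefect.

```python
import warnings

# Hypothetical set of attribute names that belong to the task object itself.
NATIVE_TASK_ATTRS = {"name", "tags", "run"}


def warn_if_shadowed(task_label, attr, result):
    """Warn when `attr` exists on both the task and its runtime result."""
    if attr in NATIVE_TASK_ATTRS and hasattr(result, attr):
        warnings.warn(
            f'"{attr}" is both a native task attribute and an attribute of '
            f'the "{task_label}" return value; "{task_label}.{attr}" accesses '
            f'the former. Use "{task_label}.data.{attr}" for the latter.',
            UserWarning,
            stacklevel=2,
        )
        return True
    return False


class Result:
    """Stand-in for a returned object that happens to have a `name` field."""

    def __init__(self):
        self.name = [20, 21, 22]
        self.x = [0, 1, 2]


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert warn_if_shadowed("df", "name", Result()) is True   # shadowed
    assert warn_if_shadowed("df", "x", Result()) is False     # no conflict

assert len(caught) == 1
```

The check needs the actual return value, which is why it can only run at runtime and not while the DAG is being built.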

jlowin commented 4 years ago

This would add enormous complexity - warnings, dict checks, shadows, different behaviors across tasks with different "true" attributes. I already suspect that __getitem__ is too magically implemented already; I don't think we can introduce such a magic access on __getattr__ as well. I would strongly recommend df.data.xyz as the universal access (possibly for df.data['xyz'] as well).
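For illustration, a single accessor of this kind could look roughly like the following in plain Python. This is a sketch under assumptions, not a proposal for Prefect's internals: Deferred, ResultAccessor, and run() are hypothetical names. The point it shows is that the wrapper keeps its own attributes untouched while everything under .data is deferred.

```python
class Deferred:
    """Hypothetical deferred value: a function plus its upstream inputs."""

    def __init__(self, fn, *upstream, name="deferred"):
        self.fn = fn
        self.upstream = upstream
        self.name = name  # a "true" attribute of the wrapper itself

    @property
    def data(self):
        # The single entry point for reaching into the eventual result.
        return ResultAccessor(self)

    def run(self):
        return self.fn(*(node.run() for node in self.upstream))


class ResultAccessor:
    """Turns `.attr` and `[key]` into new Deferred nodes."""

    def __init__(self, source):
        self._source = source

    def __getattr__(self, attr):
        # Refuse private/dunder names so copy and pickle probes fail cleanly.
        if attr.startswith("_"):
            raise AttributeError(attr)
        return Deferred(lambda value: getattr(value, attr), self._source)

    def __getitem__(self, key):
        return Deferred(lambda value: value[key], self._source)


class Record:
    """Stand-in result whose `name` attribute collides with the wrapper's."""

    def __init__(self):
        self.name = "result name"


node = Deferred(Record, name="task name")
assert node.name == "task name"                # the wrapper's own attribute
assert node.data.name.run() == "result name"   # the result's attribute

table = Deferred(lambda: {"a_col": 1})
assert table.data["a_col"].run() == 1
```

Because the accessor object has no public attributes of its own, nothing on the result can be shadowed, and no warnings or dict checks are needed.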

cicdw commented 4 years ago

Jumping in with one additional consideration - Prefect doesn't add much value at that level of granularity (e.g., what does it mean to retry an attribute access or to have a stateful dependency on attribute access?). If you want to build deferred computational graphs like this, I highly recommend dask (which has a first-class delayed / distributed dataframe concept).

julianhess commented 4 years ago

This would add enormous complexity - warnings, dict checks, shadows, different behaviors across tasks with different "true" attributes. I already suspect that __getitem__ is too magically implemented already; I don't think we can introduce such a magic access on __getattr__ as well. I would strongly recommend df.data.xyz as the universal access (possibly for df.data['xyz'] as well).

I agree that we should first focus on your suggested .data attribute (since we'd need it as a fallback for shadowed attributes anyway). I suspect there are subtleties I'm not yet seeing with __getattr__, but thus far in my own implementation it seems to be working without any issues. Perhaps rigorously verifying that it has no ill effects could be a "moonshot" goal, and something for a separate issue thread.

possibly for df.data['xyz'] as well

I'm not sure I agree with this. I really like the fact that all magic methods behave as if the task object and return value are equivalent, e.g.

@task
def return_num(x):
  return x

with Flow("add_example") as f:
  a = return_num(1)
  b = return_num(2)
  c = a + b # by implicitly creating a new task, Task.__add__ acts on the return values of a and b, not the Task objects they actually are

If as you suggest the return value would now be accessed in task.data (so task.data["xyz"] would replace task["xyz"]), shouldn't the same logic apply to all the other magics? I think it would be confusing from a UX perspective for __getitem__ to behave differently from everything else. The above code would then become

with Flow("add_example") as f:
  a = return_num(1)
  b = return_num(2)
  c = a.data + b.data

which is a step backwards in simplicity, IMO. The only need for introducing .data to access attributes of a returned object is because the . operator is overloaded to access native Task attributes; AFAIK [] (and all the other operators) do not have similar overloading issues.

Jumping in with one additional consideration - Prefect doesn't add much value at that level of granularity (e.g., what does it mean to retry an attribute access or to have a stateful dependency on attribute access?). If you want to build deferred computational graphs like this, I highly recommend dask (which has a first-class delayed / distributed dataframe concept).

I only listed dataframes as an example of where this would be useful (and indeed, for that one specific use case, Dask is great!) However, I think this would be handy for any Prefect task returning an object whose attributes are then passed to downstream tasks. For example,

import prefect

class Example:
  def __init__(self, a, b):
    self.a = a
    self.b = b
    self.c = a + b

@prefect.task
def return_object(a, b):
  return Example(a, b)

@prefect.task
def add_three(a, b, c):
  return a + b + c

with prefect.Flow("attr_example") as f:
  foo = return_object(1, 1) # foo.a = 1, foo.b = 1, foo.c = 2
  sum_foo = add_three(foo.a, foo.b, foo.c) # would add 1 + 1 + 2 if __getattr__ were implemented
  sum_const = add_three(1, 2, 3) # adds 1 + 2 + 3

Having first-class support for accessing attributes of returned objects is much cleaner than the current alternative, which would require having to create a special handler class that unwraps objects, e.g.

@prefect.task
def obj_extractor(obj):
  return obj.__dict__

with prefect.Flow("attr_example") as f:
  foo = return_object(1, 1) # foo.a = 1, foo.b = 1, foo.c = 2

  # sum_foo = add_three(foo.a, foo.b, foo.c)
  # ^^^^ won't work; the only easy way to access the attributes of 
  # foo's return value is with an extra wrapper class that converts it to a dict:
  foo_ext = obj_extractor(foo)
  sum_foo = add_three(foo_ext["a"], foo_ext["b"], foo_ext["c"])

  sum_const = add_three(1, 2, 3) # adds 1 + 2 + 3

Having to create such a wrapper class that explicitly unpacks the desired attributes of an object into something accessible by prefect.Task.__getitem__ is cumbersome, and makes the workflow definition un-Pythonic. I'm a huge fan of Prefect because unlike any other workflow definition API I've come across, Prefect flow definitions (mostly) read like standard imperative Python, due to the many magic methods already implemented.
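As a side note on the workaround, the extraction step does not have to go through __dict__. The sketch below is plain Python and has not been tested inside a flow: get_attrs is a hypothetical helper that could be wrapped in a task, and Example is the class from the comment above. It uses operator.attrgetter from the standard library, which also works for properties and for classes that use __slots__.

```python
from operator import attrgetter


def get_attrs(obj, *names):
    """Return the named attributes of `obj` as a tuple, without touching __dict__."""
    values = attrgetter(*names)(obj)
    # attrgetter returns a bare value for one name and a tuple for several.
    return (values,) if len(names) == 1 else values


class Example:
    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.c = a + b


foo = Example(1, 1)
a, b, c = get_attrs(foo, "a", "b", "c")
assert a + b + c == 4
```

This is still an explicit extra step, so it does not remove the verbosity being criticized here; it only makes the helper reusable across result types.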

cicdw commented 4 years ago

While I understand where you're coming from, this is a slippery slope that will introduce a quite large maintenance burden:

In general, Prefect has to strike a very delicate balance between being Pythonic and staying true to one of its goals of providing a state management system for workflows. We routinely get the question "how small should a Prefect Task be?" and the answer is always "the smallest it can be where its state has meaning" and I think attribute access violates that rule of thumb (the state of attribute access will always be explicitly tied to whether the task result being accessed succeeded).

Now, all of that being said, I'm not willing to close this issue yet - if there is additional interest or any other use cases that arise we can reconsider.

basnijholt commented 4 years ago

I was also looking for this, and through this issue I learned about the Magic Methods, which I hadn't read anything about in the docs, although it would have been nice to know!

Not really the right place (maybe) but could there be a similar magic method to iterate over an Iterable?

cicdw commented 4 years ago

Hm, I think that introduces many of the problems from above - in particular, for iterating, we don't have anything to iterate over so we'd have to create a deferred "iteration" task, but I'm not sure what value that adds. I think it would have the result of further confusing people who try things like:

for data in my_task:
    ...

which currently raises an error (as it should, because there is no data to iterate over until runtime).
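One Python detail is worth spelling out here: any object that defines __getitem__ but not __iter__ is still iterable through the legacy sequence protocol, which calls obj[0], obj[1], and so on until IndexError is raised. A deferred __getitem__ never raises IndexError, so such a loop would never end unless iteration is rejected explicitly. The sketch below illustrates that with a hypothetical Lazy class; it is not Prefect's code.

```python
class Lazy:
    """Hypothetical deferred value that supports indexing but not iteration."""

    def __init__(self, label):
        self.label = label

    def __getitem__(self, key):
        # Deferred indexing never raises IndexError, because no data exists yet.
        return Lazy(f"{self.label}[{key!r}]")

    def __iter__(self):
        # Without this method, Python would fall back to calling
        # __getitem__(0), __getitem__(1), ... and the loop would never end.
        raise TypeError(f"{self.label} has no data to iterate over until runtime")


my_task = Lazy("my_task")
assert my_task[0].label == "my_task[0]"

try:
    for data in my_task:
        pass
except TypeError as exc:
    message = str(exc)

assert message == "my_task has no data to iterate over until runtime"
```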

basnijholt commented 4 years ago

What about using type annotations to indicate that, e.g., a tuple of a certain length is returned? Maybe I am asking for something that would bring more complications than I can see right now. In reality, I would like to be able to map flows inside flows, but I should probably discuss this in a separate issue.

cicdw commented 4 years ago

For that you could either:

Note that task magic methods are only useful at build time, not runtime, and the most they can do is generate more tasks.

jcrist commented 4 years ago

I'm going to close this. Creating tasks automatically based on __getattr__ is problematic for two reasons:

If you're looking for a way to automatically parallelize/distribute existing code, you may be interested in using dask.delayed which does support automatic tasks created via __getattr__, and has much lower overhead (also, __getattr__ tasks are usually optimized out).

julianhess commented 4 years ago

Fair enough. In my application (which wraps Prefect), I'll continue to add in this functionality via monkey patching. Thanks for the discussion!