DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows that encode lineage/tracing and metadata. Runs and scales everywhere Python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License

pipe_input target #1173

Closed: jernejfrank closed this 1 month ago.

jernejfrank commented 1 month ago

Addresses #1161.

The default behaviour falls back to chaining on the first parameter, for backwards compatibility.

Notes

There are two differences compared to `on_output`:

  1. Since we do not use NodeTransformer, we can have a mixture of global and local on_input settings. I made the local setting (on the level of Applicable) add to the global one (on the level of pipe_input); the other option would be to let the local setting override the global one. I also added a safeguard: if on_input is used, every transform needs a target, so that the mapping is unambiguous.
  2. The namespace convention needs a slight adjustment when pipe_input is applied to multiple parameters. Since each node is chained through the parameter name, if the same transform is applied to two parameters we now create two nodes with the same namespace and append "_param_name" (the name of the targeted parameter) to each, so that node names in the DAG don't collide.
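The two rules above can be modelled in plain Python. This is a minimal sketch under stated assumptions, not Hamilton's implementation: `Step`, `resolve_targets` and `node_names` are hypothetical names, and the `namespace.step` node-name format is an assumption. It covers the default of chaining onto the first parameter, local targets adding to the global ones, the "every transform needs a target" safeguard, and the `_param_name` suffix when one transform feeds two parameters.

```python
from typing import Dict, List, Optional, Sequence


class Step:
    """Hypothetical stand-in for a pipe step: a transform name plus optional local targets."""

    def __init__(self, name: str, on_input: Optional[Sequence[str]] = None) -> None:
        self.name = name
        self.local_targets = list(on_input) if on_input else []


def resolve_targets(
    params: Sequence[str],
    steps: Sequence[Step],
    global_targets: Optional[Sequence[str]] = None,
) -> Dict[str, List[str]]:
    """Map each targeted parameter to the ordered list of step names chained onto it."""
    global_list = list(global_targets) if global_targets else []
    any_local = any(s.local_targets for s in steps)

    # Backwards compatibility: no target anywhere, so chain on the first parameter.
    if not global_list and not any_local:
        return {params[0]: [s.name for s in steps]}

    # Safeguard: once on_input is used, every step must end up with a target.
    if not global_list and not all(s.local_targets for s in steps):
        raise ValueError("when on_input is used, every step needs a target")

    chains: Dict[str, List[str]] = {}
    for s in steps:
        # Local targets are added to the global ones, not used as an override.
        targets = global_list + [t for t in s.local_targets if t not in global_list]
        for t in targets:
            if t not in params:
                raise ValueError(f"unknown parameter: {t}")
            chains.setdefault(t, []).append(s.name)
    return chains


def node_names(namespace: str, chains: Dict[str, List[str]]) -> List[str]:
    """Name the chain nodes, suffixing "_<param>" when a step feeds several parameters."""
    uses: Dict[str, int] = {}
    for step_names in chains.values():
        for name in step_names:
            uses[name] = uses.get(name, 0) + 1

    names: List[str] = []
    for param, step_names in chains.items():
        for name in step_names:
            base = f"{namespace}.{name}"
            names.append(f"{base}_{param}" if uses[name] > 1 else base)
    return names


params = ["foo", "bar", "interesting_node"]
chains = resolve_targets(
    params,
    [Step("a"), Step("b", on_input=["foo"])],
    global_targets=["interesting_node"],
)
print(chains)  # {'interesting_node': ['a', 'b'], 'foo': ['b']}
print(node_names("downstream", chains))
```

Here step `b` lands on both `interesting_node` (global) and `foo` (local), so in this model its two nodes get the `_interesting_node` and `_foo` suffixes while `a` keeps the plain name.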

jernejfrank commented 1 month ago

> Nice! Left some thoughts. I think this is valuable (e.g. the first parameter is a bit of an awkward assumption), but we might want to restrict the API a bit. Also some naming/consistency stuff.

The API expanded substantially to leave as much flexibility as possible. It is also a complete dual to pipe_output: you can target multiple parameters (as we do with end nodes) and allocate transforms individually among them. So the two can be used interchangeably, and it becomes a matter of personal preference whether you chain on output nodes or on input parameters.

But I'm happy to constrain it more. As @elijahbenizzy suggested, allowing only a single parameter, passed in as a string, seems reasonable; if you want more flexibility, use pipe_output. There is one case where I don't see this working out, but it is probably more indicative of a code smell than of a lack of flexibility:


```python
from hamilton.function_modifiers import pipe_input, pipe_output, step


def _transform(x: int) -> int:
    ...


def _other_transform(x: int) -> int:
    ...


# This won't work: pipe_output would transform `interesting_node`
# for every consumer, including `some_other_dependency` below.
@pipe_output(step(_transform))
def interesting_node() -> int:
    ...


@pipe_input(
    step(_transform).on_input("interesting_node"),  # this should work
    step(_other_transform).on_input("foo"),  # leaves flexibility to also transform other params
)
def downstream(foo: int, bar: int, baz: int, interesting_node: int) -> int:
    ...


def some_other_dependency(interesting_node: int) -> int:
    # Here you don't want the piped transform, just the original `interesting_node` input.
    ...
```

Maybe it makes sense to keep it this way, because the subtle difference between the pipe_input and pipe_output implementations lets the two work hand in hand.

Specifically, pipe_output touches the original node by creating the chain node_raw -> chain_nodes -> node, whereas pipe_input builds param -> chain_node -> renamed_param_in_upstream_node. So pipe_input does not touch the original node; it only renames the argument in the next node.
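A toy model of the two rewrites, with the DAG as a dict mapping each node to its dependencies, makes the difference concrete. The helpers `apply_pipe_output` and `apply_pipe_input` and the chain-node names are hypothetical, not Hamilton internals; only the topology matters. pipe_output repoints every consumer of `interesting_node` at the transformed value, while pipe_input rewires a single argument of `downstream` and leaves `some_other_dependency` reading the raw node.

```python
from typing import Dict, List

# Toy DAG: node name -> names of the nodes it depends on.
Graph = Dict[str, List[str]]


def apply_pipe_output(graph: Graph, node: str, steps: List[str]) -> Graph:
    """node_raw -> chain_nodes -> node: the original node itself is rewritten."""
    out = {k: list(v) for k, v in graph.items()}
    out[f"{node}_raw"] = out.pop(node)  # the user's function moves to node_raw
    prev = f"{node}_raw"
    for s in steps:
        name = f"{node}.{s}"
        out[name] = [prev]
        prev = name
    out[node] = [prev]  # the public name now points at the end of the chain
    return out


def apply_pipe_input(graph: Graph, consumer: str, param: str, steps: List[str]) -> Graph:
    """param -> chain_node -> renamed argument: only `consumer` is rewired."""
    out = {k: list(v) for k, v in graph.items()}
    prev = param
    for s in steps:
        name = f"{consumer}.{s}_{param}"
        out[name] = [prev]
        prev = name
    # Rename the argument inside the consumer; the upstream node is untouched.
    out[consumer] = [prev if dep == param else dep for dep in out[consumer]]
    return out


base: Graph = {
    "interesting_node": [],
    "downstream": ["foo", "interesting_node"],
    "some_other_dependency": ["interesting_node"],
}

po = apply_pipe_output(base, "interesting_node", ["transform"])
pi = apply_pipe_input(base, "downstream", "interesting_node", ["transform"])
# With pipe_output, some_other_dependency now reads the transformed interesting_node.
print(po["some_other_dependency"], po["interesting_node"])
# With pipe_input, only downstream reads the transformed value.
print(pi["downstream"], pi["some_other_dependency"])
```

In this model, the `some_other_dependency` case from the example above is exactly what pipe_input handles: only the decorated node's argument moves, and every other consumer still sees the original node.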