DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows that encode lineage and metadata. Runs and scales everywhere Python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License

GracefulErrorAdapter fails with Parallelizable #1009

Closed JamesArruda closed 1 week ago

JamesArruda commented 2 weeks ago

GracefulErrorAdapter does not know to return a list of the sentinel value for Parallelizable nodes (ones that are of type EXPAND).

Current behavior

Stack Traces

The error can be reproduced with the code in the steps to replicate.

Traceback (most recent call last):
  File "c:\git\github\hamilton\tests\test_parallel_error.py", line 42, in <module>
    ans = dr.execute(
          ^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 566, in execute
    raise e
  File "c:\git\github\hamilton\hamilton\driver.py", line 556, in execute
    outputs = self.raw_execute(_final_vars, overrides, display_graph, inputs=inputs)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 685, in raw_execute
    raise e
  File "c:\git\github\hamilton\hamilton\driver.py", line 674, in raw_execute
    results = self.graph_executor.execute(
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "c:\git\github\hamilton\hamilton\driver.py", line 232, in execute
    executors.run_graph_to_completion(execution_state, self.execution_manager)
  File "c:\git\github\hamilton\hamilton\execution\executors.py", line 399, in run_graph_to_completion
    execution_state.update_task_state(task_name, state, result)
  File "c:\git\github\hamilton\hamilton\execution\state.py", line 363, in update_task_state
    self.write_task_results(task_to_update, results)
  File "c:\git\github\hamilton\hamilton\execution\state.py", line 323, in write_task_results
    result_for_expansion = list(results_to_write.pop(result_name_for_expansion))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: 'NoneType' object is not iterable

Steps to replicate behavior

from hamilton.htypes import Collect, Parallelizable
from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.lifecycle import GracefulErrorAdapter

def distro(n: int, allow_fail: bool) -> Parallelizable[int]:
    # Parallelizable (EXPAND) node: the generator raises partway through when allow_fail is True.
    for x in range(n):
        if x > 4 and allow_fail:
            raise Exception("bad")
        yield x * 3

def some_math(distro: int) -> float:
    # Node inside the parallel block: fails for the larger expanded values.
    if distro > 15:
        raise Exception("No no no")
    return distro * 2.0

def distro_gather(some_math: Collect[float]) -> list[float]:
    # Collect node: gathers the results of the parallel block.
    ans = [x for x in some_math]
    return ans

if __name__ == "__main__":
    local_executor = SynchronousLocalTaskExecutor()
    import __main__
    dr = (
        driver.Builder()
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_modules(__main__)
        .with_remote_executor(local_executor)
        .with_adapters(
            GracefulErrorAdapter(
                error_to_catch=Exception,
                sentinel_value=None
            )
        )
        .build()
    )
    ans = dr.execute(
        ["distro_gather"],
        inputs={
            "n": 10,
            "allow_fail": True,
        }
    )
    print(ans)

Toggling allow_fail shows that the GracefulErrorAdapter works nicely within a parallel block, but not for the entry point.

Library & System Information

Windows 10 Pro
Python 3.12.3
Hamilton 1.69.0 (7fd5e169df48ab9c17292f927941b3e24bd791c6)

Expected behavior

The node should fail safely and execution should press on.

Possible Solution

The cause is straightforward: Hamilton expects an iterable back from an EXPAND node (really from an EXPAND_UNORDERED group, I think), and the GracefulErrorAdapter doesn't currently know to return one.
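Concretely, the TypeError at the bottom of the traceback is just list() being applied to the sentinel. A minimal illustration of the mismatch (this is not the actual write_task_results code, just the shape of the expectation):

sentinel = None  # the GracefulErrorAdapter sentinel_value from the repro above

# The framework materializes an EXPAND node's result with list(...), so a bare sentinel fails:
try:
    list(sentinel)
except TypeError as e:
    print(e)  # 'NoneType' object is not iterable

# ...whereas any iterable, e.g. a list containing the sentinel, is accepted:
print(list([sentinel]))  # [None]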

The solution I was attempting to pursue was to inject more node data into the adapter's run_to_execute_node method. However, the base class doesn't allow overriding do_node_execute in a way that would let me pass that information (such as node_) down into the adapter.

My first workaround was to modify the adapter to look for the "expand-" prefix at the start of the task_id value that's passed in, but that catches all nodes within the parallelizable block, not just the first one. Currently I am tagging the parallelizable nodes and looking for the tag in do_node_execute to return [self.sentinel_value], which avoids the error.
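For reference, a rough sketch of that first (over-broad) task_id workaround, written as a subclass for readability and assuming the task_id mentioned above reaches run_to_execute_node via its extra keyword arguments (sentinel cascading is omitted for brevity; the class name is illustrative only):

from typing import Any, Dict

from hamilton.lifecycle import GracefulErrorAdapter


class PrefixCheckingAdapter(GracefulErrorAdapter):  # illustrative name only
    def run_to_execute_node(
        self, *, node_callable: Any, node_kwargs: Dict[str, Any], **future_kwargs: Any
    ) -> Any:
        # Assumption: the task_id is passed through in the extra keyword arguments.
        task_id = future_kwargs.get("task_id")
        try:
            return node_callable(**node_kwargs)
        except self.error_to_catch:
            if task_id is not None and task_id.startswith("expand-"):
                # Over-broad: this matches every node inside the parallelizable
                # block, not just the Parallelizable (EXPAND) node itself.
                return [self.sentinel_value]
            return self.sentinel_value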

elijahbenizzy commented 2 weeks ago

Ahh, yes, good point. Thanks! That's a reasonable workaround -- would you mind sharing the code for it? Otherwise we can take a look to see how we might cascade it forwards with these expectations.

That said, it's a little iffy on requirements. What would you expect to happen (I think I know which one I'd like...):

  1. The first three get executed, the rest get dropped
  2. Nothing gets executed, the parallelizable failed
  3. It returns an empty list

(2) is not crazy, but (1) is a good UX IMO. And I think might be easy to implement...

JamesArruda commented 2 weeks ago

I modified the adapter to have:

    def run_to_execute_node(
        self, *, node_callable: Any, node_kwargs: Dict[str, Any], node_tags: dict[str, Any], **future_kwargs: Any
    ) -> Any:
        """Executes a node. If the node fails, returns the sentinel value."""
        for key, value in node_kwargs.items():
            if value == self.sentinel_value:  # == versus is
                return self.sentinel_value  # cascade it through
        try:
            return node_callable(**node_kwargs)
        except self.error_to_catch:
            if node_tags.get("for_graceful", "") == "parallel":
                return [self.sentinel_value]
            return self.sentinel_value

And had:

from hamilton.function_modifiers import tag

@tag(for_graceful="parallel")
def distro(n: int, allow_fail: bool) -> Parallelizable[int]:
    for x in range(n):
        if x > 4 and allow_fail:
            raise Exception("bad")
        yield x * 3
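For completeness, a sketch of how the two pieces above might fit together, assuming the modified run_to_execute_node lives on a subclass of GracefulErrorAdapter (the subclass name is illustrative and its body is elided here); otherwise the builder calls mirror the repro:

from hamilton import driver
from hamilton.execution.executors import SynchronousLocalTaskExecutor
from hamilton.lifecycle import GracefulErrorAdapter


class TagAwareGracefulErrorAdapter(GracefulErrorAdapter):
    ...  # the run_to_execute_node override from above goes here


import __main__  # module holding the tagged distro, some_math, and distro_gather

dr = (
    driver.Builder()
    .enable_dynamic_execution(allow_experimental_mode=True)
    .with_modules(__main__)
    .with_remote_executor(SynchronousLocalTaskExecutor())
    .with_adapters(TagAwareGracefulErrorAdapter(error_to_catch=Exception, sentinel_value=None))
    .build()
)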

I would prefer (1). If the function fails before the yield/return portion, then I'd expect (2), or for it to act as if the parallelizable block got one run through with the sentinel. Either way works, as long as the collect block gets some kind of list of sentinels/actual results.

I'm working on an expansion of the adapter that'll grab traceback and other metadata and pass that forward as a specific object class (as the sentinel) so I know everything will make it to the end no matter what (and I can introspect and write my logs, etc. based on that). Because of that, anything that fails safely forward and passes the sentinel down is preferred.
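A rough sketch of what such a metadata-carrying sentinel could look like -- the class and field names here are hypothetical, not part of Hamilton or of the work described above:

import traceback
from dataclasses import dataclass


@dataclass
class NodeFailure:
    """Hypothetical sentinel that records why a node failed, so it can flow downstream."""

    node_name: str
    error: Exception
    formatted_traceback: str

    @classmethod
    def from_exception(cls, node_name: str, exc: Exception) -> "NodeFailure":
        # Call this inside the except block so format_exc() still sees the active traceback.
        return cls(node_name=node_name, error=exc, formatted_traceback=traceback.format_exc())

Downstream nodes (or the final collect) could then check isinstance(value, NodeFailure) to log, skip, or report failed branches.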

elijahbenizzy commented 2 weeks ago

Ok, great, would love it if you contributed that back! Yes, agreed on (1), and your edge-cases.

Easy enough implementation (might have to do a little more surgery, but shouldn't be too hard):

  1. If the node is a Parallelizable type, then it outputs an item for every non-failed one (e.g. if indices 0, 1, 2, and 5 succeed and 3/4 fail, it would be [res_0, res_1, res_2, SENTINEL, SENTINEL, res_5]).
  2. Collect should filter out the sentinel values (or cascade them through, TBD...). I think filtering out makes sense, but the code has to be resilient to list shortening. Also, filtering wouldn't work with your "collector" approach, which I really like.
  3. If it fails before the generator is started, it should probably produce an empty list. Collect will then be able to use the fact that it's empty to bypass the rest. Could also work with the single-sentinel list you have.
  4. If it fails inside a Parallelizable, it should be the same as failing during the generator.

Note that with (1), it could also fail for every step after any error (in the generator), assuming that the first error cuts out control flow. IMO that's cleaner -- you don't want to be running code after an exception. So the example from (1) would be [res_0, res_1, res_2].
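To make the intended semantics of (1) concrete, here is a standalone sketch (not the adapter implementation) of a generator wrapper where the first caught error simply ends the expansion, so the items yielded before the failure survive:

def expand_gracefully(gen_fn, error_to_catch=Exception, **kwargs):
    """Illustration only: run a Parallelizable-style generator, stopping at the first error."""

    def _wrapped():
        try:
            yield from gen_fn(**kwargs)
        except error_to_catch:
            return  # first error cuts out control flow; earlier items are kept

    return list(_wrapped())


def distro(n: int, allow_fail: bool):  # same logic as the repro's distro
    for x in range(n):
        if x > 4 and allow_fail:
            raise Exception("bad")
        yield x * 3


print(expand_gracefully(distro, n=10, allow_fail=True))  # [0, 3, 6, 9, 12]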

The ideal solution would be to check for the sentinel type inside the framework but I think that's a bit challenging and doesn't change too much.

Re: implementation -- you seem to be building something exciting -- do you want to contribute these changes back? I've uploaded a PR that provides the deeper framework-level changes you'll need -- figure that will help you get started. If you're going to build the adapter for your own use case, we can probably find a way to generalize it together once you're happy with it.

https://github.com/DAGWorks-Inc/hamilton/pull/1010

JamesArruda commented 2 weeks ago

Thank you very much for the PR upload, can't wait to dig in.

I'll definitely be putting up a PR when I have it moderately functional!

elijahbenizzy commented 2 weeks ago

> Thank you very much for the PR upload, can't wait to dig in.
>
> I'll definitely be putting up a PR when I have it moderately functional!

You got it! I don't doubt you'll be able to figure this out but feel free to reach out if you want help navigating -- slack is probably the easiest to reach us.

zilto commented 1 week ago

Closing since the PR was merged.