DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows that encode lineage/tracing and metadata. It runs and scales everywhere Python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License

Parallel Execution: Collect node returns `list[list[pd.DataFrame]]` instead of `list[pd.DataFrame]` #1030

Closed: legout closed this issue 3 weeks ago

legout commented 1 month ago

The following toy example returns a `list[list[pd.DataFrame]]` instead of a `list[pd.DataFrame]`.

test_flow

from hamilton.htypes import Parallelizable, Collect
import random
import pandas as pd

def df_generator() -> Parallelizable[pd.DataFrame]:
    for i in range(10):
        yield pd.DataFrame(
            {"a": random.randint(0, 100), "b": [random.random() for _ in range(100)]}
        )

def collect_df(df_generator: Collect[pd.DataFrame]) -> list[pd.DataFrame]:
    # Expected: the list of 10 DataFrames yielded above.
    return df_generator

if __name__ == "__main__":
    import __main__ as test
    from hamilton import driver

    dr = (
        driver.Builder()
        .with_modules(test)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .build()
    )

    final_vars = ["collect_df"]
    r = dr.execute(final_vars=final_vars)
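
Running this, the collected value comes back doubly nested; a quick check of the reported behavior (r is the dict returned by dr.execute above):

# With 1.70.0 this passes: the first element of r["collect_df"] is a
# list rather than a single DataFrame.
assert isinstance(r["collect_df"][0], list)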

Hamilton Version: 1.70.0

skrawcz commented 1 month ago

@legout thanks for catching this. This is an unusual situation, since using `Parallelizable` doesn't make sense here: nothing is actually being parallelized. That is, we'd expect a node between the `Parallelizable` and the `Collect`.

Instead of `Parallelizable`, you'd do something like this:

def df_generator() -> list[pd.DataFrame]:
    dfs = []
    for i in range(10):
        dfs.append(pd.DataFrame(
            {"a": random.randint(0, 100), "b": [random.random() for _ in range(100)]}
        ))
    return dfs

def all_dfs(df_generator: list[pd.DataFrame]) -> pd.DataFrame:
    # df_generator is already a list, so pass it to pd.concat directly.
    return pd.concat(df_generator)

Do you have more context here? I think we should probably error out in this situation -- thoughts?

skrawcz commented 1 month ago

It's on our TODO to document / provide better handling as seen in https://github.com/DAGWorks-Inc/hamilton/issues/301

legout commented 1 month ago

Do you have more context here? I think we should probably error out in this situation -- thoughts?

@skrawcz Thanks for your fast response. For sure, this might be an unusual situation. But I wonder if your alternative is really the same when using a remote executor like MultiThreadingExecutor?

skrawcz commented 1 month ago

But I wonder if your alternative is really the same when using a remote executor like MultiThreadingExecutor?

So the way Parallelizable + Collect currently works is that only the code between them is submitted to an executor. So in the example above, everything would be done serially; nothing would be submitted to run in parallel.

To run that generation in parallel, you'd instead want to do:
from hamilton.htypes import Parallelizable, Collect
import random
import pandas as pd

def index() -> Parallelizable[int]:
    # Each yielded value spawns a parallel task.
    for i in range(10):
        yield i

def df_generator(index: int) -> pd.DataFrame:
    # Runs once per yielded index; this is the code between Parallelizable
    # and Collect that gets submitted to the executor.
    return pd.DataFrame(
        {"a": random.randint(0, 100), "b": [random.random() for _ in range(100)]}
    )

def collect_df(df_generator: Collect[pd.DataFrame]) -> list[pd.DataFrame]:
    return df_generator

if __name__ == "__main__":
    import __main__ as test
    from hamilton import driver

    dr = (
        driver.Builder()
        .with_modules(test)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .build()
    )

    final_vars = ["collect_df"]
    r = dr.execute(final_vars=final_vars)
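
For the parallel branches to actually run on a thread pool, you'd also attach an executor when building the driver. A minimal sketch, assuming Hamilton's hamilton.execution.executors module (max_tasks=5 is illustrative):

from hamilton import driver
from hamilton.execution import executors

dr = (
    driver.Builder()
    .with_modules(test)
    .enable_dynamic_execution(allow_experimental_mode=True)
    # The tasks between Parallelizable and Collect are submitted here.
    .with_remote_executor(executors.MultiThreadingExecutor(max_tasks=5))
    .build()
)
r = dr.execute(final_vars=["collect_df"])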

Now - the usual caveats apply when doing anything in parallel in Python. E.g., with multithreading, CPU-bound operations likely won't actually run in parallel (the GIL), while switching to multiprocessing gives true parallelism but incurs serialization/deserialization costs. So parallelizing only really makes sense when the computation takes long enough to make it worthwhile. Does that help?
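
For CPU-bound work, the swap is just the executor line in the builder above; a sketch assuming executors.MultiProcessingExecutor exposes the same interface:

    # Processes instead of threads: true parallelism, but arguments and
    # results must be (de)serialized across process boundaries.
    .with_remote_executor(executors.MultiProcessingExecutor(max_tasks=5))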

legout commented 1 month ago

Ok got it. Thanks for the detailed explanation!

skrawcz commented 3 weeks ago

Closing, since I think we've explained the expected behavior.