DAGWorks-Inc / hamilton

Hamilton helps data scientists and engineers define testable, modular, self-documenting dataflows that encode lineage/tracing and metadata. It runs and scales everywhere Python does.
https://hamilton.dagworks.io/en/latest/
BSD 3-Clause Clear License
1.88k stars 126 forks source link

Add async support for pipe_family #1223

Closed jernejfrank closed 1 week ago

jernejfrank commented 1 week ago

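Enables running `pipe_input`, `pipe_output`, and `mutate` with asyncio, i.e. their step/mutation functions may be `async def` coroutines executed by the async driver. Addresses #1193.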

Changes

How I tested this

import asyncio

import pandas as pd

from hamilton import async_driver
from hamilton.function_modifiers import pipe_output, pipe_input, mutate, apply_to, step, hamilton_exclude

async def data_input() -> pd.DataFrame:
    """Produce the small demo frame that every pipeline in this example consumes.

    A tiny ``asyncio.sleep`` is awaited so that this node is a real coroutine
    for the async driver to schedule.
    """
    columns = {"a": [1, 2, 3], "b": [4, 5, 6]}
    await asyncio.sleep(0.0001)
    return pd.DataFrame(columns)

async def _groupby_a(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("a").sum().reset_index()

async def _groupby_b(d: pd.DataFrame) -> pd.DataFrame:
    await asyncio.sleep(0.0001)
    return d.groupby("b").sum().reset_index()

@pipe_input(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_input(data_input: pd.DataFrame) -> pd.DataFrame:
    """Return the (already piped) ``data_input`` unchanged.

    The ``pipe_input`` steps rewrite the ``data_input`` argument before this
    body runs: ``_groupby_a`` when config ``groupby`` is ``"a"``, otherwise
    ``_groupby_b``.
    """
    passthrough = data_input
    return passthrough

@pipe_output(
    step(_groupby_a).when(groupby="a"),
    step(_groupby_b).when_not(groupby="a"),
)
def data_pipe_output(data_input: pd.DataFrame) -> pd.DataFrame:
    """Return ``data_input`` unchanged; the grouping happens afterwards.

    The ``pipe_output`` steps post-process the value returned here:
    ``_groupby_a`` when config ``groupby`` is ``"a"``, otherwise ``_groupby_b``.
    """
    passthrough = data_input
    return passthrough

def data_mutate(data_input: pd.DataFrame) -> pd.DataFrame:
    """Identity node that the ``@mutate`` transforms in this script target."""
    passthrough = data_input
    return passthrough

@mutate(data_mutate)
async def _groupby_a_mutate(d: pd.DataFrame) -> pd.DataFrame:
    """Async mutation of ``data_mutate``: sum the other columns per value of ``a``."""
    await asyncio.sleep(0.0001)
    summed = d.groupby(by="a").sum()
    return summed.reset_index()

@mutate(apply_to(data_mutate))
async def _groupby_b_mutate(d: pd.DataFrame) -> pd.DataFrame:
    """Async mutation of ``data_mutate`` (explicit ``apply_to`` form).

    Sums the other columns per value of ``b``.
    """
    await asyncio.sleep(0.0001)
    summed = d.groupby(by="b").sum()
    return summed.reset_index()

@hamilton_exclude
async def main():
    """Run the three demo nodes through an async Hamilton driver and print them.

    The driver is built over this script's own module with config
    ``groupby="b"``. ``@hamilton_exclude`` keeps this coroutine itself out of
    the DAG.
    """
    import __main__

    builder = (
        async_driver.Builder()
        .with_modules(__main__)
        .with_config({"groupby": "b"})
    )
    dr = await builder.build()
    requested = ["data_pipe_input", "data_pipe_output", "data_mutate"]
    results = await dr.execute(requested)
    print(results)

if __name__ == "__main__":
    asyncio.run(main())

Notes

Follows the pattern used in `subdag` by creating an `async def` placeholder callable.

Checklist