pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Implement Stage.tbls #103

Open windiana42 opened 1 year ago

windiana42 commented 1 year ago

An initial raw data loading stage might produce an unknown list of tables. However, we want to get back to a visible dependency graph, with explicit tables passed into tasks, as quickly as possible. One way to do this would be to get a dictionary of table references from a Stage, containing all tables produced by that stage. However, this dictionary cannot be produced at graph wiring time. Thus, the idea is that Stage.tbls returns a placeholder object which can be passed into a task; when the task runs, all tables produced by the stage are dematerialized into a dictionary available to the task in the form specified by its input_type argument.
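
A rough sketch of what this could look like when wiring a flow. Everything here is hypothetical: Stage.tbls is the proposed placeholder, and load_raw_data / summarize are made-up example tasks:

import pandas as pd

from pydiverse.pipedag import Flow, RawSql, Stage, Table, materialize

@materialize
def load_raw_data():
    # Raw loading task producing an unknown set of tables
    return RawSql(...)

@materialize(input_type=pd.DataFrame)
def summarize(tbls: dict[str, pd.DataFrame]):
    # tbls would hold every table produced by the referenced stage,
    # dematerialized as pandas DataFrames according to input_type
    return Table(
        pd.DataFrame({"table": list(tbls), "rows": [len(df) for df in tbls.values()]}),
        name="summary",
    )

with Flow("flow"):
    with Stage("raw") as raw:
        load_raw_data()
    with Stage("reporting"):
        # raw.tbls is only a placeholder at wiring time; the actual dictionary
        # is resolved when the consuming task runs
        summary = summarize(raw.tbls)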

Cache invalidation for this tbls argument might also be tricky. One option is to write the whole dictionary of table references to output_json in the metadata. The problem is that output_json may be limited in length for some backends. An alternative would be that a stage commit updates a hash in the metadata tables, which can then serve as the trigger for the Stage.tbls object to invalidate tasks consuming it.
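
For illustration only (not pipedag code), the second alternative could amount to a stage-level cache key that consumers of Stage.tbls compare against their stored metadata:

import hashlib

def stage_tbls_cache_key(stage_commit_hash: str, table_names: list[str]) -> str:
    # Illustration only: combine the hash updated at stage commit time with the
    # sorted table names, so any change to the set of produced tables
    # invalidates tasks consuming Stage.tbls
    h = hashlib.sha256(stage_commit_hash.encode())
    for name in sorted(table_names):
        h.update(name.encode())
    return h.hexdigest()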

An error should be raised if Stage.tbls is used inside its own stage.
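
Building on the sketch above (same hypothetical names), this is the usage that should be rejected:

with Stage("raw") as raw:
    load_raw_data()
    # Should raise: the tables of "raw" are not committed yet while the
    # stage itself is still being wired
    summarize(raw.tbls)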

NMAC427 commented 1 year ago

How about this: a RawSql object in the input will be converted to a dictionary of the tables produced by it.

import pandas as pd
import sqlalchemy as sa

from pydiverse.pipedag import Flow, RawSql, Stage, materialize

@materialize
def foo():
    # Task that produces tables called "x" and "y"
    return RawSql(...)

@materialize(input_type=pd.DataFrame)
def bar(x, y):
    ...

@materialize(input_type=sa.Table)
def buzz(tables: dict[str, sa.Table]):
    ...

with Flow("flow"):
    with Stage("s"):
        foo_out = foo()

        # During wiring of tasks, we can use __getitem__ to get a specific table
        # from the RawSql object
        bar_out = bar(foo_out["x"], foo_out["y"])

        # Alternatively we can pass it into a task and it will be converted
        # into a dictionary
        buzz_out = buzz(foo_out)
windiana42 commented 7 months ago

This might be related to #164. Via pipedag's materialization functions we know, for every task, which tables it produces. So we could even ensure a proper dependency tree for tasks reading from the Stage.tbls dictionary in case the table was produced this way.
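
As a rough illustration (not pipedag internals): if the producing task is remembered per table name, consumers of Stage.tbls could be wired downstream of exactly those producers, falling back to the stage-level trigger only for tables without a known producer (e.g. created via RawSql):

producers: dict[str, str] = {}  # table name -> name of the task that materialized it

def record_materialization(task_name: str, table_name: str) -> None:
    producers[table_name] = task_name

def upstream_tasks(table_names: list[str]) -> set[str]:
    # Tables created via RawSql have no recorded producer and would fall back
    # to the stage-level invalidation trigger discussed above
    return {producers[n] for n in table_names if n in producers}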