pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

feat(): Task links #216

Closed DominikZuercherQC closed 2 weeks ago

DominikZuercherQC commented 3 weeks ago

Idea for a new feature: Add the option to pass task_links to flow.run(). task_links defines a mapping from tasks to (external) tables in the database. When the flow is executed, all tasks run as usual except for the tasks listed in the mapping. Those tasks are not executed; instead, an ExternalTableReference is established for them. If the option is not used, flow.run() works as usual.

Use case: Development on a task requires testing it on a large dataset. Developing directly in the original database is discouraged because it is used by many people, and duplicating / rerunning the entire database takes too long and too many computational resources. -> This feature allows using the tables in the original database without overhead, running only the tasks/stages relevant to the development at hand, and leaving the other users undisturbed.
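A minimal sketch of what the proposed call could look like (task and table names are made up for illustration; the task_links argument is the new API proposed here, not something pipedag offers today, and whether the mapping key is the task or its output is open for discussion):

    from pydiverse.pipedag import Flow, Stage, materialize
    from pydiverse.pipedag import ExternalTableReference  # exact import path may differ

    @materialize(version="1.0")
    def expensive_task():
        # Produces a large table; during development we want to skip this
        # and point to an already existing table in the database instead.
        ...

    @materialize(version="1.0")
    def consume(table):
        ...

    with Flow() as f:
        with Stage("dev_stage"):
            table = expensive_task()
            _ = consume(table)

    # expensive_task is not executed; consume() reads the linked external table instead.
    result = f.run(
        task_links={
            table: ExternalTableReference("external_table", schema="external_schema"),
        }
    )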

Note: Most of the changes actually originate from moving container.py to another submodule, which was needed to resolve a circular import (maybe there is a better way?).

@windiana42 / @nicolasmueller WDYT?


windiana42 commented 3 weeks ago

I would prefer if the syntax of this PR were similar to https://github.com/pydiverse/pydiverse.pipedag/issues/217.

This might look as follows:

    with Flow() as f:
        with Stage("sql_table_origin"):
            _ = make_external_table()

        with Stage("sql_table_linked"):
            table = duplicate_table_reference()
            _ = consume(table)
    external_link = table

    with StageLockContext():
        # Linked execution. Body of duplicate_table_reference should not be executed,
        # instead output is referenced from the linked table
        result = f.run(
            inputs={
                external_link: ExternalTableReference(
                    "external_table",
                    schema="external_schema",
                )
            }
        )
windiana42 commented 3 weeks ago

We could even support the use case of referencing the same table in another pipeline instance:

    cfg = PipedagConfig.default.get(instance)
    input_cfg = PipedagConfig.default.get("full")

    with Flow() as f:
        with Stage("sql_table_origin"):
            _ = make_external_table()

        with Stage("sql_table_linked"):
            table = duplicate_table_reference()
            _ = consume(table)
    external_link = table

    with StageLockContext():
        # Linked execution. Body of duplicate_table_reference should not be executed,
        # instead output is referenced from the linked table
        result = f.run(
            inputs={
                external_link: input_cfg
            },
            config=cfg  # still optional
        )

This syntax would even allow dematerializing dicts/lists/constants and multiple tables that appear in the output of duplicate_table_reference from the cache of the "full" instance.
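For illustration, the multi-output case might then look like this (purely a sketch of the proposed syntax; duplicate_table_reference and its dict return value are hypothetical):

    with Flow() as f:
        with Stage("sql_table_linked"):
            # Hypothetical task returning several objects, e.g. {"a": Table(...), "n_rows": 42}
            outputs = duplicate_table_reference()
            _ = consume(outputs)

    with StageLockContext():
        result = f.run(
            inputs={
                # All tables and constants in `outputs` would be dematerialized
                # from the cache of the "full" instance instead of being computed.
                outputs: PipedagConfig.default.get("full")
            }
        )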