pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Common case fast: If all tasks in a stage are cache valid, then no time should be spent writing data to a new schema #49

Closed windiana42 closed 1 year ago

windiana42 commented 1 year ago

The common case is that a developer works on only one task or stage. In this scenario, most stages have 100% cache-valid tasks. With millions of rows, however, even a `CREATE TABLE out_tbl AS SELECT * FROM in_tbl` may take minutes. Thus, even if it makes pipedag much more complex, we should not transfer any data into the transaction schema of a stage whose tasks are 100% cache valid. The problem is that we don't know this upfront. I see two approaches:
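To make the cost asymmetry concrete, here is a minimal sketch using an in-memory SQLite database (table names and row counts are made up for illustration): a `CREATE TABLE ... AS SELECT` copy pays for every row, while a view-style alias only writes metadata, regardless of table size.

```python
import sqlite3
import time

# Hypothetical stand-in for a stage's active schema: one sizable input table.
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE in_tbl (id INTEGER PRIMARY KEY, val REAL)")
con.executemany(
    "INSERT INTO in_tbl (val) VALUES (?)",
    [(float(i),) for i in range(100_000)],
)

# Full copy: materializes every row, so cost grows with table size.
t0 = time.perf_counter()
con.execute("CREATE TABLE out_tbl AS SELECT * FROM in_tbl")
copy_s = time.perf_counter() - t0

# View "alias": only catalog metadata is written, independent of row count.
t0 = time.perf_counter()
con.execute("CREATE VIEW out_view AS SELECT * FROM in_tbl")
view_s = time.perf_counter() - t0

n = con.execute("SELECT COUNT(*) FROM out_tbl").fetchone()[0]
print(f"copied {n} rows in {copy_s:.4f}s, created view in {view_s:.4f}s")
```

On a real warehouse with millions of rows the gap is far larger than in this toy run, which is exactly why a 100%-cache-valid stage should avoid the copy entirely.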

1) When a cache-valid task runs, it creates an ALIAS in the transaction schema and tells the RunContextServer that it wants a deferred copy of the data behind this ALIAS. Every cache-invalid task reports its invalidity to the RunContextServer. Once the RunContextServer sees the first cache-invalid task, it spawns processes that copy every aliased table to a temporary name in the transaction schema. On stage commit, the RunContextServer knows whether all tasks were cache valid. If so, it simply abandons the transaction schema and leaves the active schema in place. If not, it joins all table-copy processes and then replaces the ALIASes with the renamed temporary tables.
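The bookkeeping in approach 1) can be sketched as a small state machine. This is a hypothetical simplification, not pipedag's actual RunContextServer API: class and method names are invented, and real table copies are reduced to list appends.

```python
from dataclasses import dataclass, field

@dataclass
class DeferredCopyTracker:
    """Sketch of the RunContextServer bookkeeping for approach 1).

    Cache-valid tasks register an alias; the first cache-invalid task flips
    the stage to "dirty", at which point all pending (and any future) aliases
    are scheduled for a background copy into the transaction schema.
    """
    pending_aliases: list = field(default_factory=list)
    scheduled_copies: list = field(default_factory=list)
    dirty: bool = False

    def register_alias(self, table: str):
        if self.dirty:
            self.scheduled_copies.append(table)  # stage already dirty: copy now
        else:
            self.pending_aliases.append(table)   # defer, stage may stay clean

    def report_cache_invalid(self):
        if not self.dirty:
            self.dirty = True
            # First cache-invalid task: start copying everything deferred so far.
            self.scheduled_copies.extend(self.pending_aliases)
            self.pending_aliases.clear()

    def commit(self) -> str:
        if not self.dirty:
            # 100% cache valid: abandon transaction schema, keep active schema.
            return "keep_active_schema"
        # Otherwise: join copy workers, then swap aliases for copied tables.
        return "swap_schemas"
```

The key property is that a fully cache-valid stage never triggers a single copy: `commit()` returns `"keep_active_schema"` and `scheduled_copies` stays empty.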

2) We defer stage initialization until the first cache-invalid task is found. We still need to keep track of all cache-valid tasks seen before that point, since their data must then be copied to the transaction schema.

I find 1) much easier to implement than 2). However, I am not sure that all database targets support an efficient ALIAS mechanism. Views might be a second option, but they require a lot of custom code for dealing with the resulting tables (for example, we might have to implement redirection on our end to reflect primary keys).
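The primary-key concern with views can be demonstrated directly. The snippet below uses SQLite as a stand-in backend (table names are made up): schema introspection through a view loses the underlying table's primary-key metadata, which is the kind of information a framework would have to redirect to the base table itself.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE base (id INTEGER PRIMARY KEY, val TEXT)")
# A view acting as a zero-copy "alias" of the base table.
con.execute("CREATE VIEW base_alias AS SELECT * FROM base")

def pk_columns(name: str) -> list:
    # PRAGMA table_info rows: (cid, name, type, notnull, dflt_value, pk);
    # pk is the 1-based position in the primary key, 0 if not part of it.
    return [row[1] for row in con.execute(f"PRAGMA table_info({name})") if row[5]]

print(pk_columns("base"))        # ['id']
print(pk_columns("base_alias"))  # [] - PK information is lost behind the view
```

So any code that reflects primary keys (or similar constraints) from the stage output would need to be pointed at the aliased base table rather than the view.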