pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Implement inline feature in pipedag.Table() to prevent materialization to database if not needed #163

Open windiana42 opened 3 months ago

windiana42 commented 3 months ago

The idea is that inline=True would only be a hint: it opens the possibility of skipping materialization to the database if the two tasks can exchange data some other way. For example, pandas 2.x and polars tasks can pass Apache Arrow tables to each other.
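A minimal sketch of how the hint could be evaluated, assuming a hypothetical `TableSpec` stand-in for `pipedag.Table` and a hypothetical set of backends that can hand data over as Arrow tables. None of these names are pipedag API; the point is only that `inline=True` is a request that is honored when an in-memory hand-off exists, and ignored otherwise.

```python
from dataclasses import dataclass
from typing import Optional

# Hypothetical: backends assumed able to exchange data as Apache Arrow
# tables in memory (e.g. pandas 2.x via pyarrow, polars natively).
ARROW_CAPABLE = frozenset({"pandas", "polars"})


@dataclass(frozen=True)
class TableSpec:
    """Hypothetical stand-in for pipedag.Table carrying only the inline hint."""
    name: str
    inline: Optional[bool] = None  # None means "no preference"


def can_skip_materialization(
    table: TableSpec, producer_backend: str, consumer_backend: str
) -> bool:
    """Return True if the hint may be honored: inline was requested and both
    sides can exchange Arrow data without going through the database."""
    if table.inline is not True:
        return False
    return producer_backend in ARROW_CAPABLE and consumer_backend in ARROW_CAPABLE
```

With this rule, a pandas producer feeding a polars consumer could skip the database, while a sqlalchemy consumer would still force materialization.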

This issue interacts with #76: it is similar, except that SQL writeback is skipped altogether and parallelization plays no role here. Multi-node orchestration needs to be considered; a simple starting solution would be to ignore the inline flag in that case.
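The "ignore the inline flag for multi-node orchestration" starting rule could look like the sketch below. `Transport` and `choose_transport` are hypothetical names, and "same node" is assumed to be identifiable by a simple node identifier string.

```python
from enum import Enum
from typing import Optional


class Transport(Enum):
    MEMORY = "memory"      # hand the dataframe over in-process
    DATABASE = "database"  # materialize to the database as today


def choose_transport(
    inline: Optional[bool], producer_node: str, consumer_node: str
) -> Transport:
    """Simple starting rule: honor the inline hint only when producer and
    consumer run on the same node; across nodes, ignore it."""
    if inline and producer_node == consumer_node:
        return Transport.MEMORY
    return Transport.DATABASE
```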

Furthermore, the interpretation of the inline attribute should be configurable per instance. For example, it should be possible to set inline=True globally for a complete instance (this might make sense to reduce storage consumption for a large pipeline instance where we only care about the output). Conversely, it should be possible to force inline=False for a complete pipeline instance when it works with data so small that materializing intermediate results takes no noticeable time. So a three-valued logic with True, False, and None could make sense, together with override options in the instance configuration: None -> True, None -> False, Force-True, and Force-False.
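The four override options combined with the per-table three-valued hint can be sketched as a small resolution function. `InlineOverride` and `resolve_inline` are hypothetical names chosen for illustration, not existing pipedag configuration keys.

```python
from enum import Enum
from typing import Optional


class InlineOverride(Enum):
    """Hypothetical instance-level setting; names are illustrative only."""
    DEFAULT_TRUE = "none_to_true"    # tables with inline=None behave as inline=True
    DEFAULT_FALSE = "none_to_false"  # tables with inline=None behave as inline=False
    FORCE_TRUE = "force_true"        # every table is treated as inline=True
    FORCE_FALSE = "force_false"      # every table is treated as inline=False


def resolve_inline(task_hint: Optional[bool], override: InlineOverride) -> bool:
    """Combine a per-table hint (True/False/None) with the instance setting."""
    if override is InlineOverride.FORCE_TRUE:
        return True
    if override is InlineOverride.FORCE_FALSE:
        return False
    if task_hint is None:
        return override is InlineOverride.DEFAULT_TRUE
    return task_hint
```

The Force-* options win over explicit hints, while the None -> * options only fill in tables that expressed no preference.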

This feature interacts heavily with caching. Essentially, an inlined preceding task becomes part of the cache invalidation of subsequent tasks, because its output is not available in the cache. In other situations, it has also turned out that a concept change might be beneficial for lazy=True tasks: they could throw a cache-invalid exception, which may trigger a rerun of the task. In that case, we could also implement version=AUTO_VERSION as lazy=True, and the rerun would trigger the preceding inlined tasks as well. Assuming we don't make this concept change, lazy=True will probably never be used together with inline=True. Thus the retry concept of version=AUTO_VERSION may be sufficient to trigger the computation of preceding inlined tasks.
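One way to picture "a preceding task becomes part of the cache invalidation of subsequent tasks" is a cache key that folds in the materialized output hash of each input when one exists, and otherwise the input task's own (recursively computed) key. The `Task` class and `cache_key` function below are hypothetical and do not reflect pipedag's actual cache metadata.

```python
import hashlib
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Task:
    """Hypothetical task node; not pipedag's real task class."""
    name: str
    version: str
    inline: bool = False
    output_hash: Optional[str] = None  # known only if the output was materialized
    inputs: List["Task"] = field(default_factory=list)


def cache_key(task: Task) -> str:
    """Hash the task's name and version plus, for each input, either its
    materialized output hash or (if inlined) its own cache key."""
    h = hashlib.sha256()
    h.update(task.name.encode() + b"\0")
    h.update(task.version.encode() + b"\0")
    for dep in task.inputs:
        if dep.inline or dep.output_hash is None:
            # No cached output exists, so the producer's version and inputs
            # become part of this task's cache invalidation.
            h.update(cache_key(dep).encode() + b"\0")
        else:
            h.update(dep.output_hash.encode() + b"\0")
    return h.hexdigest()
```

Bumping the version of an inlined producer changes the consumer's key, whereas for a materialized producer only a change of its output hash does.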

windiana42 commented 3 months ago

If #164 leans towards not implementing any retry mechanism for lazy=True tasks, I might favor treating lazy=True on a consumer as a trigger to reject the inline=True hint. As a consequence, inlined tasks just need to be called before determining whether an eager task is cache invalid. For version=AUTO_VERSION, the chain of inlined tasks must also be version=AUTO_VERSION, and they will be executed together. In this case, the subgraph to execute will be constructed backwards, starting from the first task that is effectively not inlined, or from the stage finalization task.
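The two rules in this comment, "a lazy consumer rejects the inline hint" and "build the subgraph backwards from the first non-inlined task", can be sketched on a plain dependency dictionary. `effective_inline` and `inlined_chain` are hypothetical helpers, and the graph is assumed to be acyclic.

```python
from typing import Dict, List, Set


def effective_inline(hint: bool, consumer_is_lazy: List[bool]) -> bool:
    """A lazy=True consumer rejects the inline hint of its producer."""
    return hint and not any(consumer_is_lazy)


def inlined_chain(
    end: str, inputs: Dict[str, List[str]], inline: Dict[str, bool]
) -> List[str]:
    """Walk backwards from a task that is not inlined (or the stage
    finalization task) and collect the inlined predecessors that must run
    together with it, in execution order (producers first)."""
    order: List[str] = []
    seen: Set[str] = set()

    def visit(task: str) -> None:
        for dep in inputs.get(task, []):
            if inline.get(dep, False) and dep not in seen:
                seen.add(dep)
                visit(dep)         # schedule dep's own inlined inputs first
                order.append(dep)  # post-order: producers before consumers

    visit(end)
    return order
```

The walk stops at any non-inlined input, since that task's output is assumed to be materialized and can be read from the database instead of being recomputed.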

windiana42 commented 3 months ago

It would be nice if inlined Tables could still be written to the local_table_cache. Since the local_table_cache does not implement full-blown cache metadata, it would be fine to write a random version hash if the real one cannot be computed correctly.
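A sketch of the random-version fallback, using an in-memory dictionary as a hypothetical stand-in for the local_table_cache (the real cache's interface is not shown in this thread). `secrets.token_hex` from the Python standard library supplies the random hash.

```python
import secrets
from typing import Any, Dict, Optional, Tuple


class LocalTableCache:
    """Hypothetical in-memory stand-in keyed by (table name, version hash);
    not pipedag's actual local_table_cache."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], Any] = {}

    def store(self, name: str, data: Any, version_hash: Optional[str] = None) -> str:
        # Inlined tables may lack a correctly computed version hash; fall back
        # to a random one so that the write still succeeds.
        if version_hash is None:
            version_hash = secrets.token_hex(16)
        self._store[(name, version_hash)] = data
        return version_hash

    def load(self, name: str, version_hash: str) -> Any:
        return self._store[(name, version_hash)]
```

A side effect of the random hash is that such entries will never match a later lookup with a computed hash, so they serve inspection rather than cache hits.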