pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Imperative Materialization #164

Closed windiana42 closed 5 months ago

windiana42 commented 6 months ago

In some cases, it might make sense to imperatively trigger materialization in the middle of a task:

import sqlalchemy as sa
import pydiverse.pipedag as dag
from pydiverse.pipedag import materialize

@materialize(lazy=True, input_type=sa.Table)
def task():
    sql = sa.select(sa.literal(1).label("a"))
    # write the table immediately and get back a reference to it
    tbl = dag.Table(sql).materialize()
    sql2 = sa.select(tbl).limit(1)
    return dag.Table(sql2)

At first sight, this looks like a crazy violation of the principles that make automatic cache invalidation work. On the other hand, it can be implemented if we tolerate a bit more magic. In fact, it would be nice if it were even possible to close the task with a materialize: return dag.Table(sql2).materialize(). It would make no difference, except that any errors during materialization would be reported while the task's stack trace is still open => much easier debugging.
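With the imports from the example above, the closing variant would look like this (a sketch of the proposal, not existing API):

@materialize(lazy=True, input_type=sa.Table)
def task():
    sql = sa.select(sa.literal(1).label("a"))
    tbl = dag.Table(sql).materialize()
    sql2 = sa.select(tbl).limit(1)
    # any error while writing this table surfaces here, inside task()
    return dag.Table(sql2).materialize()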

The magic that needs to happen: if multiple dag.Table().materialize() calls occur inside a task, the situation where the first is cache valid and the second is cache invalid will cause trouble. There are multiple ways out of this trouble (sketched below):

  1. We throw an exception on the second, cache-invalid materialize(). This triggers a rerun of the task, in which the first call is executed as well.
  2. The first materialize() returns a reference to the cached object instead of a fake reference.
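A minimal, self-contained sketch of the two escape routes; CacheEntry, RunState, RerunTask, and write_table are illustrative stand-ins, not pipedag API:

from dataclasses import dataclass

class RerunTask(Exception):
    """Signals the orchestrator to discard partial results and rerun the task."""

@dataclass
class CacheEntry:
    valid: bool
    obj: object = None

@dataclass
class RunState:
    saw_cache_valid: bool = False

def write_table(entry: CacheEntry):
    return entry.obj  # pretend to write the table and return a reference

def materialize_option1(entry: CacheEntry, state: RunState):
    # Option 1: throw on a cache-invalid call that follows a cache-valid
    # one; the triggered rerun then executes both calls for real.
    if entry.valid:
        state.saw_cache_valid = True
        return object()  # a fake reference suffices here
    if state.saw_cache_valid:
        raise RerunTask("earlier materialize() was served from cache")
    return write_table(entry)

def materialize_option2(entry: CacheEntry):
    # Option 2: a cache hit returns a reference to the cached object,
    # so mixed cache validity within one task is harmless.
    if entry.valid:
        return entry.obj
    return write_table(entry)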

Conceptual ideas for the magic:

windiana42 commented 6 months ago

As I am writing this, I am favoring the solution:

  2. The first materialize() returns a reference to the cached object instead of a fake reference.

I think there is not much that has to change for this to happen, not even in the cache invalidation logic. The knowledge of which table depends on which other one (through subsequent materialize() calls) is available at runtime from the reproduced sequence of materialize() calls and does not need to be represented in metadata tables.

windiana42 commented 6 months ago

For the implementation of option 2, a few detailed decisions are necessary:

  1. We support calling tasks without an open RunContext for dataframe tasks that simply take a dataframe and return one. In general, we would like return dag.Table(...) to be identical to return dag.Table(...).materialize(). Thus it is unclear what Table.materialize() should do when no RunContext is open. One idea would be to do nothing and simply return Table.obj. That is quite different from what happens with a RunContext open, but usability is probably maximized by this measure.
  2. With a RunContext open, it would be nice to offer three options for what Table.materialize() returns: a) a reference with the exact same type as the input_type of the task; b) an explicitly given return_as_type object; c) None, without dematerializing after writing the table. There are plenty of ways to design the call interface. One option is Table.materialize(config_context: ConfigContext | None = None, return_as_type=None, return_nothing: bool = False, drop_if_exists: bool = False). I suggest interpreting return_as_type=None as: take input_type from the task if the respective TableHook returns a reference (TableHooks need to declare themselves what they do). If the input_type TableHook does not return a reference, I suggest returning a sqlalchemy.Table reference by default. See the sketch after this list.
  3. With Table.materialize(drop_if_exists=True), we don't really need the debug.materialize_table function any more. I would still keep it for more control over options (debug_suffix, flag_task_debug_tainted); the implementation of both can be shared, though. I would also set table.debug_tainted=True in Table.materialize() if it is executed multiple times for the same table object.
  4. We assume that every Table.materialize() is a dependency of every subsequent Table.materialize() call and of any Table returned by the task. This can be reflected with a private Table property that is also JSON-serialized and thus affects cache invalidation: Table._assumed_dependencies = [Table(...), Table(...), ...]
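Pulling these decisions together, a rough sketch of how Table.materialize() could behave; _RunContext, _Hook, and the store accessors are simplified stand-ins rather than actual pipedag internals:

import sqlalchemy as sa

class _Hook:
    # Stand-in for a TableHook that declares whether it returns a reference.
    returns_reference = True

    def retrieve(self, table, as_type):
        return table.obj  # pretend dematerialization into as_type

class _RunContext:
    # Stand-in for pipedag's RunContext; only what the sketch needs.
    current = None

    def __init__(self, task_input_type=sa.Table):
        self.task_input_type = task_input_type
        self.tables_materialized_in_task = []

    def get_table_hook(self, as_type):
        return _Hook()

    def store_table(self, table, drop_if_exists=False):
        pass  # pretend to write the table to the schema

class Table:
    def __init__(self, obj, name=None):
        self.obj = obj
        self.name = name
        self.debug_tainted = False
        self._materialized = False
        self._assumed_dependencies = []  # JSON-serialized => affects cache invalidation

    def materialize(self, config_context=None, return_as_type=None,
                    return_nothing: bool = False, drop_if_exists: bool = False):
        # config_context follows the signature proposed in decision 2.
        ctx = _RunContext.current
        if ctx is None:
            # Decision 1: without an open RunContext, do nothing and
            # simply hand back the wrapped object.
            return self.obj
        if self._materialized:
            # Decision 3: materializing the same object twice is a
            # debugging maneuver; taint the table.
            self.debug_tainted = True
        # Decision 4: every earlier materialize() in this task is assumed
        # to be a dependency of this one.
        self._assumed_dependencies = list(ctx.tables_materialized_in_task)
        ctx.store_table(self, drop_if_exists=drop_if_exists)
        ctx.tables_materialized_in_task.append(self)
        self._materialized = True
        if return_nothing:
            return None  # Decision 2c: write only, skip dematerialization
        # Decisions 2a/2b: pick the reference type to hand back.
        as_type = return_as_type or ctx.task_input_type
        hook = ctx.get_table_hook(as_type)
        if return_as_type is None and not hook.returns_reference:
            # Fall back to a sqlalchemy.Table reference if the task's
            # input_type hook would copy data instead of referencing it.
            as_type = sa.Table
            hook = ctx.get_table_hook(as_type)
        return hook.retrieve(self, as_type)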