pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Batched processing of dataframe-based tasks: reduce memory consumption and parallelize #205

Open windiana42 opened 3 months ago

windiana42 commented 3 months ago

For dataframe-based task code, it is sometimes possible to batch the computation based on some grouping columns. This can be used to reduce the memory footprint (the whole input and output dataset does not need to reside in memory at once), and it can be used for parallelization (e.g. Snowflake bulk upload speed increases greatly with >100 small COPY INTO requests in flight).

To first order, the solution is simple. Instead of:

  1. dematerialize inputs
  2. call task function
  3. materialize outputs

One would like to do the following for each chunk:

  1. dematerialize chunk inputs
  2. call task function
  3. materialize chunk output

and afterwards finalize the output table based on the chunks (see the sketch below).
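
A purely illustrative, self-contained sketch of this flow using plain polars (outside of pipedag; the column names and the task body are made up):

    # Illustrative only, not pipedag code: chunked processing with plain polars.
    # The task runs per chunk (rows selected via a hash of the grouping column)
    # and the chunk results are concatenated at the end.
    import polars as pl

    def task_fn(tbl: pl.DataFrame) -> pl.DataFrame:
        # stand-in for a user task
        return tbl.with_columns((pl.col("value") * 2).alias("value2"))

    def run_in_chunks(tbl: pl.DataFrame, n_chunks: int) -> pl.DataFrame:
        chunk_results = []
        for chunk_id in range(n_chunks):
            # 1. "dematerialize" only the rows belonging to this chunk
            chunk = tbl.filter(pl.col("id").hash() % n_chunks == chunk_id)
            # 2. call the task function on the chunk
            result = task_fn(chunk)
            # 3. "materialize" the chunk output (here simply kept in memory)
            chunk_results.append(result)
        # finalize the output table based on chunks
        return pl.concat(chunk_results)

    df = pl.DataFrame({"id": [1, 2, 3, 4], "value": [10, 20, 30, 40]})
    print(run_in_chunks(df, n_chunks=2))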

The main problem is that there are a million ways to configure chunking. It is also quite interwoven with other features because user input (grouping columns, number of batches, temporary storage directory, ...) needs to reach (de-)materialization hooks.

One idea would be to establish the concept of an invocation hook that integrates into pipedag's task calling mechanics and can be special-cased or hacked in user space:

@materialize(version=AUTO_VERSION, input_type=pl.LazyFrame, invocation_hook=Batched(n=50, columns=["id"]))
def task(tbl1: pl.LazyFrame, tbl2: pl.LazyFrame):
  ...
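
Neither the invocation_hook parameter nor Batched exist in pipedag today; purely as an assumption, the configuration object such a hook carries could look roughly like this:

    # Hypothetical sketch: neither invocation_hook nor Batched exist in pipedag.
    # This only illustrates the user-facing configuration such a hook would have
    # to carry into the (de-)materialization machinery.
    from dataclasses import dataclass

    @dataclass
    class Batched:
        n: int                       # number of batches/chunks
        columns: list[str]           # grouping columns used to split rows
        tmp_dir: str | None = None   # optional temporary chunk storage directory
        parallelism: int = 1         # how many chunks to process concurrently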

This feature needs to be aligned with:

Future features:

windiana42 commented 3 months ago

For parallelization, this might require explicit state handling for ContextVar-based contexts, especially when using multi-threading (see the sketch below).

Snowflake is quite slow when internalizing a huge table via COPY INTO from S3. Thus it would be nice to support n-way multi-processing for the dematerialize -> task function call -> write parquet -> upload to S3 chain, and m-way multi-threading for keeping >100 COPY INTO chunks in flight.
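
A small, generic Python sketch of the ContextVar concern (no pipedag APIs involved): worker threads do not inherit ContextVars from the submitting thread, so the context has to be captured with contextvars.copy_context() and entered explicitly in each worker:

    # Generic Python sketch, no pipedag APIs: threads do not automatically see
    # ContextVars set in the submitting thread, so the context is captured with
    # copy_context() and each worker runs inside its own copy.
    import contextvars
    from concurrent.futures import ThreadPoolExecutor

    current_chunk = contextvars.ContextVar("current_chunk")

    def process_chunk(chunk_id: int) -> str:
        current_chunk.set(chunk_id)
        # ... dematerialize chunk, call task function, write parquet, COPY INTO ...
        return f"chunk {current_chunk.get()} done"

    def submit_in_context(executor, fn, *args):
        # capture the caller's context (pipedag keeps its contexts in ContextVars)
        # and run the worker function inside a fresh copy of it
        ctx = contextvars.copy_context()
        return executor.submit(ctx.run, fn, *args)

    with ThreadPoolExecutor(max_workers=8) as pool:
        futures = [submit_in_context(pool, process_chunk, i) for i in range(100)]
        results = [f.result() for f in futures]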

windiana42 commented 3 months ago

This is the code region which requires refactoring for this feature: https://github.com/pydiverse/pydiverse.pipedag/blob/main/src/pydiverse/pipedag/materialize/core.py#L742

            def imperative_materialize(
                table: Table,
                config_context: ConfigContext | None,
                return_as_type: type | None = None,
                return_nothing: bool = False,
            ):
                my_store = config_context.store if config_context is not None else store
                state = task_cache_info.imperative_materialization_state
                if id(table) in state.table_ids:
                    raise RuntimeError(
                        "The table has already been imperatively materialized."
                    )
                table.assumed_dependencies = (
                    list(state.assumed_dependencies)
                    if len(state.assumed_dependencies) > 0
                    else []
                )
                _ = my_store.materialize_task(
                    task, task_cache_info, table, disable_task_finalization=True
                )
                if not return_nothing:

                    def get_return_obj(return_as_type):
                        if return_as_type is None:
                            return_as_type = task.input_type
                            if (
                                return_as_type is None
                                or not my_store.table_store.get_r_table_hook(
                                    return_as_type
                                ).retrieve_as_reference(return_as_type)
                            ):
                                # dematerialize as sa.Table if it would transfer all
                                # rows to python when dematerializing with input_type
                                return_as_type = sa.Table
                        obj = my_store.dematerialize_item(
                            table, return_as_type, run_context
                        )
                        state.add_table_lookup(obj, table)
                        return obj

                    if isinstance(return_as_type, Iterable):
                        return tuple(get_return_obj(t) for t in return_as_type)
                    else:
                        return get_return_obj(return_as_type)

            task_context.imperative_materialize_callback = imperative_materialize
            result = self.fn(*args, **kwargs)
            task_context.imperative_materialize_callback = None
            if task.debug_tainted:
                raise RuntimeError(
                    f"The task {task.name} has been tainted by interactive debugging."
                    f" Aborting."
                )

            def result_finalization_mutator(x):
                state = task_cache_info.imperative_materialization_state
                object_lookup = state.object_lookup
                if id(x) in object_lookup:
                    # substitute imperatively materialized object references with
                    # their respective table objects
                    x = object_lookup[id(x)]
                if isinstance(x, (Table, RawSql)):
                    # fill assumed_dependencies for Tables that were not yet
                    # materialized
                    if len(state.assumed_dependencies) > 0:
                        if x.assumed_dependencies is None:
                            x.assumed_dependencies = list(state.assumed_dependencies)
                return x

            result = deep_map(result, result_finalization_mutator)
            result = store.materialize_task(task, task_cache_info, result)

It would be nice to abstract this a bit more, such that custom invocation hooks only need to mess with the following (see the interface sketch after this list):

  1. chunking preparation
  2. looping over chunks (some support should be given to simplify multi-processing / multi-threading)
  3. feeding information to (de-)materialization hooks (query manipulation and temporary chunk storage management)
  4. finalizing materialization
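
A hypothetical interface sketch (none of these names exist in pipedag) that maps the four responsibilities above onto methods:

    # Hypothetical interface sketch; none of these names exist in pipedag.
    # It only maps the four responsibilities listed above onto methods.
    from abc import ABC, abstractmethod
    from typing import Any, Callable, Iterable

    class InvocationHook(ABC):
        @abstractmethod
        def prepare_chunks(self, input_tables: list[Any]) -> Iterable[Any]:
            """1. chunking preparation: derive chunk descriptors from the inputs."""

        @abstractmethod
        def run_chunks(
            self, chunks: Iterable[Any], call_task: Callable[[Any], Any]
        ) -> list[Any]:
            """2. loop over chunks (possibly via a thread/process pool):
            dematerialize chunk inputs, call the task, materialize the chunk output."""

        def hook_parameters(self, chunk: Any) -> dict[str, Any]:
            """3. information passed to (de-)materialization hooks, e.g. filter
            predicates for query manipulation or a temporary chunk directory."""
            return {}

        @abstractmethod
        def finalize(self, chunk_results: list[Any]) -> Any:
            """4. finalizing materialization: combine chunk outputs into the final table."""
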
windiana42 commented 3 months ago

In the case of Snowflake, fast bulk loading requires writing temporary parquet files. This overlaps with the activity of the local table cache. It could make sense to allow an invocation hook to bind tightly to the local table cache, such that

  1. parquet files end up in a directory configured in the local table cache
  2. the local table cache stores each chunk individually (changes in the chunking configuration will automatically invalidate the cache; unchunked access to the cached chunks can still be offered via polars scan functionality, as sketched below)
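
A short polars sketch of point 2 (the directory layout is an assumption, not pipedag's actual local table cache layout): the per-chunk parquet files can be exposed as a single lazy table via scan_parquet:

    # Sketch of unchunked access to a per-chunk parquet cache via polars' lazy
    # scan; the directory layout below is an assumption, not pipedag's actual
    # local table cache layout.
    import polars as pl

    # e.g. the local table cache wrote one parquet file per chunk:
    #   /tmp/table_cache/my_table/chunk_000.parquet, chunk_001.parquet, ...
    lazy_table = pl.scan_parquet("/tmp/table_cache/my_table/chunk_*.parquet")

    # downstream code sees a single lazy table regardless of chunking
    result = lazy_table.filter(pl.col("id") > 100).collect()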