pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Persist manually versioned intermediate state / decoupling development on two parts of the pipeline #195

Open windiana42 opened 1 month ago

windiana42 commented 1 month ago

Background: Pydiverse pipedag is currently used mostly at the beginning of pipelines, before actual models are trained. In the process of increasing adoption further downstream, a demand was raised for decoupling model training experimentation from the earlier input preparation part of the pipeline. The model training part may also include a thin layer of feature engineering which is more about selecting, encoding, and combining base feature data; we call it "feature encoding" here. A general pipeline might be split as follows:

input preparation: raw ingestion -> cleaning for inspection -> best possible representation for economic reasoning -> feature engineering

model training: -> feature encoding (s5) -> model training (s6) -> model evaluation (s7)
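
For illustration, a minimal sketch of how such a split could be expressed with pipedag's Flow/Stage API, assuming a working pipedag configuration (pipedag.yaml). The stage and task names are made up for the example and condense the s1-s7 labels above into two stages:

```python
import pandas as pd
from pydiverse.pipedag import Flow, Stage, Table, materialize


# Hypothetical tasks; real tasks would contain the actual logic.
@materialize(version="1.0", input_type=pd.DataFrame)
def raw_ingestion():
    return Table(pd.DataFrame({"id": [1, 2], "x": [0.1, 0.2]}), name="raw")


@materialize(version="1.0", input_type=pd.DataFrame)
def feature_engineering(raw: pd.DataFrame):
    return Table(raw.assign(x_squared=raw["x"] ** 2), name="features")


@materialize(version="1.0", input_type=pd.DataFrame)
def feature_encoding(features: pd.DataFrame):
    return Table(features, name="encoded")


with Flow("pipeline") as f:
    # input preparation part (raw ingestion ... feature engineering)
    with Stage("input_preparation"):
        raw = raw_ingestion()
        t = feature_engineering(raw)
    # model training part (feature encoding, training, evaluation)
    with Stage("model_training"):
        encoded = feature_encoding(t)

f.run()
```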

Idea: Similar to the local table cache, the pipedag configuration could include another table store called "persistence_table_store". Given a flow f and a task output t, one could actively create a manually persisted version of t with version_id = f.persist([t]). When executing model training code, one could then run f.run([s5, s6, s7], persisted_inputs={t: version_id}). Assuming the persistence_table_store is configured as parquet storage in a certain directory or S3 bucket, the version_id would be a monotonically increasing string (e.g. date and time), which makes it reasonably easy to manually delete, for example, all persisted versions older than 3 months. Since this is done manually, keeping a few or all versions long term is no problem.
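
To make the idea concrete, a sketch of how the proposed API could be used. Note that persist() and the persisted_inputs argument do not exist yet, and the directory layout and version_id format are assumptions for illustration:

```python
# `f`, `t`, and the stages s5-s7 are taken from a flow definition like the
# sketch above, where `t` is produced at the end of input preparation.

# One-off: materialize the whole flow and persist `t` into the
# (hypothetical) persistence_table_store, e.g. as parquet files under
# /data/persistence/<version_id>/<table_name>.parquet
f.run()
version_id = f.persist([t])  # e.g. "2024-05-17T14-32-10" (proposed API)

# Later, during model training experimentation: run only the training
# stages and read `t` from the persisted version instead of recomputing
# the input preparation stages.
f.run([s5, s6, s7], persisted_inputs={t: version_id})  # proposed API
```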

Interaction:

Questions:

  1. It is unclear whether f.run([s5, s6, s7], persisted_inputs={t: version_id}) should check that version_id is cache valid with respect to the code in the currently checked-out repository, or whether this check should be left to the caller. The most common use case here is to apply this manually versioned persistence to a flatfile representation, and new code typically just adds columns to that representation. As a consequence, enforcing strict cache validity would be an annoying restriction.
  2. Enforcing an identical position_hash for the task that produced t would be another option, but it has the same downsides as checking full cache validity.
  3. In case the persistence_table_store uses parquet, should the default behavior be that dematerialization for SQL-based tasks throws an error, or should it be attempted via duckdb? In combination with a parquet-based local_table_cache, it would even be possible to execute queries spanning both persisted and non-persisted tables (see the sketch after this list).
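
For question 3, a minimal sketch of what duckdb-based dematerialization of parquet-persisted tables could look like. The file paths and table names are illustrative assumptions, not existing pipedag behavior:

```python
import duckdb

# Hypothetical locations: one table from the proposed persistence_table_store,
# one from a parquet-based local_table_cache.
persisted = "/data/persistence/2024-05-17T14-32-10/features.parquet"
cached = "/data/local_table_cache/model_training/encoded.parquet"

con = duckdb.connect()

# duckdb can query parquet files directly, so a SQL-based task could be fed
# persisted tables without first loading them into the main table store ...
con.execute(f"CREATE VIEW features AS SELECT * FROM read_parquet('{persisted}')")

# ... and queries could even span persisted and locally cached tables.
con.execute(f"CREATE VIEW encoded AS SELECT * FROM read_parquet('{cached}')")
df = con.execute("SELECT * FROM features JOIN encoded USING (id)").df()
```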