pydiverse / pydiverse.pipedag

A data pipeline orchestration library for rapid iterative development with automatic cache invalidation, allowing users to focus on writing their tasks in pandas, polars, sqlalchemy, ibis, and the like.
https://pydiversepipedag.readthedocs.io/
BSD 3-Clause "New" or "Revised" License

Automatic extraction of pipeline subgraph for PROD execution #217

Open windiana42 opened 3 weeks ago

windiana42 commented 3 weeks ago

Flow.run() should be able to extract a subgraph of the overall pipeline when given a reference to a task output, together with an additional specification of where in the graph the actual input data should be injected, and which tables flowing through the training pipeline should be saved so they can simply be loaded as constant input data when running the PROD pipeline subgraph.

usage syntax hypothesis

This could be the top-level wiring code for a close-to-reality data pipeline:

class CalibrationState:
    def __init__(self):
        self.feature_parameters = dag.LazyDict()
        self.parameter_tbls = dag.LazyDict()

def get_pipeline(attrs):
    with Flow("typical_pipeline") as flow:
        with Stage("01_raw_input"):
            raw_tbls = read_input_data()
        with Stage("02_early_cleaning"):
            clean_tbls = clean(raw_tbls)
        with Stage("03_economic_representation"):
            train_test_set = mark_train_test_set(clean_tbls, **attrs["train_test_set"])
            # tbls = dict with keys: insuree, contract, provider, case, position
            tbls = economic_representation(clean_tbls, train_test_set)
        with Stage("04_features"):
            feature_tbls = features(tbls, train_test_set)
            # calibration_state holds dictionaries of parameters and parameter tables
            # which are computed during training based on the train set but already
            # applied to rows of the test set. For deployment, the calibration_state
            # will be loaded and injected as constant input into the pipeline subgraph
            # that runs in production (a sketch of such a task follows after this code).
            calibration_state = CalibrationState()
            feature_tbls2 = calibrated_features(tbls, feature_tbls, train_test_set, calibration_state)
            feature_tbls.update(feature_tbls2)  # will be executed lazily in consumer tasks
        with Stage("05_model_training"):
            run_id = get_run_id()
            input_data_train, target_train, encoding_parameters = model_encoding(
                tbls, feature_tbls, train_test_set, run_id, train=True
            )
            model, model_run_id, output_train = model_train(input_data_train, target_train, run_id)
        with Stage("06_model_evaluation"):
            run_id = get_run_id(run_id)  # get a run id if only evaluation is running
            document_link(run_id, model_run_id)
            input_data_test, target_test, _ = model_encoding(
                tbls, feature_tbls, train_test_set, run_id, encoding_parameters=encoding_parameters
            )
            output_test = model_predict(model, input_data_test, run_id)
            evaluation_result = evaluate_model(output_test, target_test, run_id)
            document_evaluation(evaluation_result, run_id)
    prod_in_out_spec = InOutSpecification(
        input=clean_tbls,
        ignore=[train_test_set, run_id],
        const=[model, model_run_id, calibration_state.feature_parameters,
               calibration_state.parameter_tbls, encoding_parameters],
        output=output_test,
    )
    return flow, prod_in_out_spec
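
The comment on calibration_state above describes a dual use: feature parameters are fitted on the train set during training, applied to both train and test rows, and injected as constant input in production. As a purely hypothetical illustration of what one such task body could look like (pandas-based, made-up table and column names, returning a single table instead of a dictionary for brevity; the LazyDict semantics are part of this proposal, not existing pipedag API):

    import pandas as pd
    from pydiverse.pipedag import materialize

    @materialize(input_type=pd.DataFrame, version="1.0")
    def calibrated_features(tbls, feature_tbls, train_test_set, calibration_state):
        df = tbls["position"].merge(train_test_set, on="position_id")
        if "position_scale" not in calibration_state.feature_parameters:
            # training run: fit the parameter on train rows only
            calibration_state.feature_parameters["position_scale"] = df.loc[
                df["is_train"], "amount"
            ].mean()
        # apply the fitted (or injected constant) parameter to train and test rows alike
        scale = calibration_state.feature_parameters["position_scale"]
        df["amount_scaled"] = df["amount"] / scale
        return df[["position_id", "amount_scaled"]]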

The InOutSpecification object constructed at the end of get_pipeline() would be nothing more than a dictionary holding references from within the graph. Those references may be tables, dictionaries of tables, or anything else a pipedag task may produce or consume.
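
As a rough first cut (this class does not exist in pipedag yet; the field names simply mirror the usage above), it could be little more than:

    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class InOutSpecification:
        # references to task outputs inside the wired flow; each may be a table,
        # a dictionary of tables, or anything else a pipedag task produces
        input: Any = None                                # where actual input data is injected in PROD
        ignore: list[Any] = field(default_factory=list)  # references replaced by None in PROD
        const: list[Any] = field(default_factory=list)   # saved after training, loaded as constants in PROD
        output: Any = None                               # the task output defining the PROD subgraph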

If you simply want to run the training pipeline, you run this code as:

    # pure training
    flow, _ = get_pipeline(cfg.attrs)

    result = flow.run(config=cfg)
    assert result.successful

If you would like to prepare a production release, you might do the following:

    # prepare release
    with StageLockContext():  # keep cache locked until outputs are read back
        flow, prod_in_out_spec = get_pipeline(cfg.attrs)

        result = flow.run(config=cfg)
        assert result.successful
        const_inputs = result.get(prod_in_out_spec.const, input_type=pl.DataFrame)
    write_const_inputs(const_inputs, const_input_directory)
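
write_const_inputs() and read_const_inputs() are user-side helpers in this sketch. Assuming const_inputs comes back as a dictionary keyed by name, a minimal version could write tables as parquet and pickle everything else (e.g. the trained model):

    import pickle
    from pathlib import Path

    import polars as pl

    def write_const_inputs(const_inputs: dict, directory: Path) -> None:
        directory.mkdir(parents=True, exist_ok=True)
        for name, obj in const_inputs.items():
            if isinstance(obj, pl.DataFrame):
                obj.write_parquet(directory / f"{name}.parquet")
            else:
                # non-tabular constants such as the trained model or run ids
                (directory / f"{name}.pkl").write_bytes(pickle.dumps(obj))

    def read_const_inputs(directory: Path) -> dict:
        const_inputs = {}
        for path in directory.iterdir():
            if path.suffix == ".parquet":
                const_inputs[path.stem] = pl.read_parquet(path)
            elif path.suffix == ".pkl":
                const_inputs[path.stem] = pickle.loads(path.read_bytes())
        return const_inputs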

It would be nice if this were enough to run the pipeline subgraph in production:

    # in PROD:
    prod_cfg = PipedagConfig.default.get_instance("arrow_store_with_duckdb")
    flow, prod_in_out_spec = get_pipeline(prod_cfg.attrs)

    clean_tbls = prod_input_adapter(in_directory)
    const_inputs = read_const_inputs(const_input_directory)
    result = flow.run(
        prod_in_out_spec.output,
        config=prod_cfg,
        inputs={
            prod_in_out_spec.input: clean_tbls,
            prod_in_out_spec.const: const_inputs,
            prod_in_out_spec.ignore: None,
        },
    )
    output = result.get(prod_in_out_spec.output)
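
prod_input_adapter() is likewise a user-side helper. Assuming the production input arrives as one parquet file per clean table, it could be as simple as:

    from pathlib import Path

    import polars as pl

    def prod_input_adapter(in_directory: Path) -> dict[str, pl.DataFrame]:
        # load the cleaned input tables that replace clean_tbls in the PROD subgraph
        return {path.stem: pl.read_parquet(path) for path in in_directory.glob("*.parquet")}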

details

  1. Please note the two different approaches to handle the somewhat recursive behavior that some functions calibrate parameters on training data (or even produce constant tables) and then apply this calibrated information to both the training and the test set: a) model_encoding() is called twice, and b) calibrated_features() is called only once, with a calibration_state parameter that is empty unless it is saved as constant input for production.
  2. Flow.run() needs to traverse the task graph from the desired output back to the source nodes, stopping wherever a task input is mentioned in the inputs dictionary. If a reference should be ignored in production, we feed None as input into the tasks reading this reference; a rough sketch of this traversal follows below.
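
A very rough sketch of how such a backward traversal with input substitution could look (pure illustration on a generic task graph with hypothetical task.inputs / ref.producing_task attributes, not pipedag internals):

    def extract_subgraph(output_ref, inputs):
        # walk from the task producing the desired output back towards the sources,
        # stopping wherever a task input is mentioned in the `inputs` dictionary
        # (injected input data, constant inputs, or None for ignored references)
        needed_tasks = set()
        stack = [output_ref.producing_task]
        while stack:
            task = stack.pop()
            if task in needed_tasks:
                continue
            needed_tasks.add(task)
            for ref in task.inputs:
                if ref in inputs:
                    continue  # substituted, no need to run its producer
                stack.append(ref.producing_task)
        return needed_tasks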