apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

DataFrame components cannot be pruned by Interactive Beam #21430

Open damccorm opened 2 years ago

damccorm commented 2 years ago

Unfortunately I haven't been able to diagnose the exact issue here or come up with a minimal repro. I just have some code to reproduce in https://github.com/apache/beam/pull/16445.

That PR adds support for value_count(bins) in the DataFrame API, which for some reason is interacting poorly with pipeline pruning in Interactive Beam (rehydrating the pipeline raises an error about a PCollection's producer missing). The PR also adds a test to transform_test.py that replicates the issue, as well as a temporary mitigation in pipeline_fragment.py. I think the mitigation effectively disables pipeline pruning, so it likely shouldn't be merged.

Imported from Jira BEAM-13624. Original Jira may contain additional context. Reported by: bhulette.

nika-qubit commented 2 years ago

Thanks for filing this. The mitigation of disabling the pipeline pruning is fine. For example, we don't prune TestStream from the pipeline: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/pipeline_fragment.py#L219
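To make the "don't prune certain transforms" idea concrete, here is a minimal sketch of how such an exemption check could look. This is illustrative only, not Beam's actual internals; the `NEVER_PRUNE` list and `should_prune` helper are hypothetical names.

```python
# Hypothetical sketch: before dropping an unmarked transform, check whether
# it (or an enclosing composite) is on a do-not-prune list, the way
# TestStream is exempted in pipeline_fragment.py. Names are illustrative.

NEVER_PRUNE = ('TestStream', 'DataframeTransform')

def should_prune(transform_label, marked):
    # Labels of nested transforms are '/'-joined, e.g. 'Df/Compute/Map'.
    if any(part in NEVER_PRUNE for part in transform_label.split('/')):
        return False  # exempted transforms are always kept
    return transform_label not in marked

print(should_prune('Df/DataframeTransform/Compute', set()))  # exempt: False
print(should_prune('Unused/Map', set()))                     # prunable: True
```

The trade-off is exactly the one discussed below: a label-based exemption keeps the whole composite alive even when only part of it feeds the requested output.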

TheNeuralBit commented 2 years ago

But this would disable pruning within pipelines that use the DataFrame API. I'd consider that just a temporary workaround. Shouldn't we investigate what's causing these transforms to be incorrectly pruned?

nika-qubit commented 2 years ago

It just skips the pruning for a specific transform, not the whole pruning feature.

TL;DR: the pruning logic is pretty naive: it marks all transforms and input/output PCollections contributing to an output PCollection, walking bottom-up until it hits the root transform or cached PCollections.

The problem with the pruning logic is that it can preserve only some of the transforms/PCollections inside a composite transform, which can break the composite (some context may be shared within a composite but not represented as a transform/PCollection).
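A minimal sketch of that bottom-up marking, and of how it can partially preserve a composite (this is a toy model, not Beam's implementation; the graph, labels, and `prune` helper are all hypothetical):

```python
# Toy model of naive bottom-up pruning: walk from a target output
# PCollection back through producers, marking everything visited;
# unmarked transforms would be dropped from the fragment.

def prune(producers, inputs, target, cached=frozenset()):
    """producers: pcoll -> producing transform; inputs: transform -> [pcolls]."""
    keep = set()
    stack = [target]
    while stack:
        pcoll = stack.pop()
        if pcoll in cached or pcoll not in producers:
            continue  # stop at cached PCollections or pipeline roots
        transform = producers[pcoll]
        if transform in keep:
            continue
        keep.add(transform)
        stack.extend(inputs.get(transform, []))
    return keep

# A composite 'Df' with two internal transforms; only one feeds 'out'.
producers = {'a': 'Df/Read', 'b': 'Df/Side', 'out': 'Df/Compute'}
inputs = {'Df/Compute': ['a'], 'Df/Side': ['a'], 'Df/Read': []}
print(prune(producers, inputs, 'out'))
# 'Df/Side' is dropped even though it belongs to the same composite:
# the partial-preservation problem described above.
```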

The mitigation is to skip the pruning of such composite transforms.

The solution might require rewriting the composite transform or adding special composite-pruning logic for it.

TheNeuralBit commented 2 years ago

It just skips the pruning for a specific transform, not the whole pruning feature.

Right, but all usages of the DataFrame API build a single composite DataFrameTransform. These usages could even be full pipelines (read_csv, some analytics, to_csv).

TheNeuralBit commented 2 years ago

Re-opening this to track the follow-up work discussed in https://github.com/apache/beam/pull/23069#issuecomment-1245899612

With #23069 we just disabled pruning for DataFrames; we should find a way to correctly prune them.