apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[prism] Support Stateful SDFs (state and timers) #32139

Open lostluck opened 2 months ago

lostluck commented 2 months ago

Prism does not currently support Stateful Splittable DoFns correctly, that is, SDFs that use State and Timers. This issue tracks whether that changes, and outlines the problem so an error message can link here.

The recommended workaround is to split the SDF into two transforms: an SDF and a Stateful DoFn. Ideally the KV key going into the SDF is different from the key going into the Stateful DoFn, so that the fusion break implied by the Stateful DoFn doesn't cost more efficiency than it has to. (E.g. if the SDF is keyed by a partition or a filename, it shouldn't emit values with that same key to the Stateful DoFn. That limits parallelism anyway.)
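
As a rough illustration of the shape of that workaround, here is a sketch in plain Go rather than the Beam SDK. Every name in it (`record`, `readRange`, `countPerKey`) is hypothetical. The first stage stands in for the SDF and works on ranges of one file; the second stands in for the Stateful DoFn and keeps state under a key taken from the data, not the filename.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// record is a hypothetical output element. Its key comes from the data,
// not from the filename that the SDF-like stage was keyed by.
type record struct {
	Key   string
	Value string
}

// readRange stands in for the SDF half: it processes one restriction
// (a half-open range of line indexes) independently of other ranges.
func readRange(lines []string, start, end int) []record {
	var out []record
	for _, line := range lines[start:end] {
		user, value, ok := strings.Cut(line, ",")
		if !ok {
			continue // skip malformed lines in this sketch
		}
		out = append(out, record{Key: user, Value: value})
	}
	return out
}

// countPerKey stands in for the stateful half: one counter per key.
func countPerKey(recs []record) map[string]int {
	state := map[string]int{}
	for _, r := range recs {
		state[r.Key]++
	}
	return state
}

func main() {
	file := []string{"alice,1", "bob,2", "alice,3", "carol,4"}

	// Two restrictions of the same file; a runner could process them in parallel.
	var recs []record
	recs = append(recs, readRange(file, 0, 2)...)
	recs = append(recs, readRange(file, 2, 4)...)

	counts := countPerKey(recs)
	keys := make([]string, 0, len(counts))
	for k := range counts {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Printf("%s=%d\n", k, counts[k])
	}
}
```

Because the stateful stage is keyed by the user field, a single large file doesn't funnel all of its records onto one key.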


The difficulty is that what we execute for an SDF is not based on the user's key, due to the rewriting an SDF goes through. The element executed for a ProcessSizedElementAndRestriction is a KV(KV(element, restriction), float64), where element may itself be a user KV carrying the user's key. (See https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/runners/prism/internal/handlepardo.go#L126)

I believe the user expectation would be to base state on their key, and not on the full Element+Restriction pair (or, more specifically, the Key+Value+Restriction triple).
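
To make the nesting concrete, here is a small sketch in plain Go. The struct names are hypothetical stand-ins and not prism's actual types. It contrasts the key a user would expect state to be scoped to with the key you get by treating the whole element+restriction pair as the key.

```go
package main

import "fmt"

// UserKV is a hypothetical user element that is itself a KV.
type UserKV struct {
	Key   string
	Value string
}

// Restriction is a hypothetical offset-range restriction.
type Restriction struct {
	Start, End int64
}

// ElementRestriction mirrors the inner KV(element, restriction).
type ElementRestriction struct {
	Element     UserKV
	Restriction Restriction
}

// SizedElement mirrors the outer KV(KV(element, restriction), float64).
type SizedElement struct {
	Pair ElementRestriction
	Size float64
}

// UserKey digs out the key the user wrote, which is what state would
// presumably be scoped to.
func UserKey(se SizedElement) string {
	return se.Pair.Element.Key
}

// PairKey treats the whole Key+Value+Restriction triple as the key.
func PairKey(se SizedElement) string {
	p := se.Pair
	return fmt.Sprintf("%s|%s|[%d,%d)", p.Element.Key, p.Element.Value,
		p.Restriction.Start, p.Restriction.End)
}

func main() {
	elem := UserKV{Key: "k1", Value: "v"}
	// Two splits of the same user element.
	a := SizedElement{Pair: ElementRestriction{Element: elem, Restriction: Restriction{Start: 0, End: 50}}, Size: 50}
	b := SizedElement{Pair: ElementRestriction{Element: elem, Restriction: Restriction{Start: 50, End: 100}}, Size: 50}

	fmt.Println(UserKey(a) == UserKey(b)) // same user key
	fmt.Println(PairKey(a) == PairKey(b)) // different pair keys
}
```

Keyed on the pair, the two splits would see separate state cells even though they come from the same user key.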

So the element manager would need to be aware of this when extracting the key for the element and when ensuring only one bundle is processed per key at a time, without "breaking" the key handling for state elsewhere.
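
The "one bundle per key at a time" constraint can be sketched as a simple gate. This is a minimal illustration with a hypothetical `keyGate` type, and it isn't how prism's element manager is actually structured.

```go
package main

import (
	"fmt"
	"sync"
)

// keyGate is a hypothetical helper that records which user keys currently
// have a bundle in flight, so a second bundle for that key is held back.
type keyGate struct {
	mu       sync.Mutex
	inFlight map[string]bool
}

func newKeyGate() *keyGate {
	return &keyGate{inFlight: map[string]bool{}}
}

// tryAcquire reports whether a bundle for key may start now.
func (g *keyGate) tryAcquire(key string) bool {
	g.mu.Lock()
	defer g.mu.Unlock()
	if g.inFlight[key] {
		return false
	}
	g.inFlight[key] = true
	return true
}

// release marks the bundle for key as finished.
func (g *keyGate) release(key string) {
	g.mu.Lock()
	defer g.mu.Unlock()
	delete(g.inFlight, key)
}

func main() {
	g := newKeyGate()
	fmt.Println(g.tryAcquire("k1")) // first bundle for k1 may start
	fmt.Println(g.tryAcquire("k1")) // second bundle for k1 is held back
	fmt.Println(g.tryAcquire("k2")) // other keys are unaffected
	g.release("k1")
	fmt.Println(g.tryAcquire("k1")) // k1 may proceed again
}
```

For a stateful SDF, the string passed to such a gate would have to be the user key extracted from the nested element, so that all splits of one key serialize. That serialization is exactly what works against the parallelism splitting is meant to provide.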

lostluck commented 2 months ago

In discussion with Kenn, we came to the conclusion that, from a user affordance perspective, the two features are orthogonal and thus compatible. However, Kenn noted: "But in some sense they are, in fact, diametrical opposites. State is all about non-parallelism, while splitting is all about parallelism." I can't help but agree.

I now have a task to document the contraindication in the programming guide and protocol buffers.