apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Remove expensive shuffle of read data in KafkaIO when using sdf and commit offsets #31682

Open scwhittle opened 6 days ago

scwhittle commented 6 days ago

The reshuffle adds a barrier so that the read records are committed before any offsets are committed. However, this is unnecessary: the DoFn is annotated with RequiresStableInput, and the fusion barrier can be introduced by shuffling just the offsets.

In both cases it is possible for an offset to be committed back to Kafka before the data has been entirely processed by the pipeline. However, with support for draining data from the pipeline (such as in Cloud Dataflow), this allows exactly-once semantics for KafkaIO via the offsets committed to the topics across pipeline drain and restart.


scwhittle commented 6 days ago

@kennknowles I'd appreciate it if you could take a look and let me know whether this makes sense with the Beam model in general. I don't think the Reshuffle adds any more guarantees about barriers for other runners than the combine itself would, but I could use some confirmation.

github-actions[bot] commented 6 days ago

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment `assign set of reviewers`.

kennknowles commented 5 days ago

I think the problem is that no runner actually implements RequiresStableInput. If Dataflow implemented it, the implementation would be a shuffle.

scwhittle commented 5 days ago

There is still a shuffle of the offsets due to the combine-per-key before the offset commit fn. If that shuffle were not there, having RequiresStableInput insert one would be OK, because it would shuffle only the commit offsets, not the read data, and would thus be cheap.

We need to guarantee that offsets are committed to Kafka only once the records have been processed by the system, so that no records are lost if the pipeline is drained. For Dataflow streaming, all side effects and outputs of a fused stage are committed atomically, so it is sufficient for the offset commit to happen in a subsequent fused stage (in this case enforced by combining per key to get the max offset). The processing of the records themselves either completes in the fused stage that reads the records, or the effects of that processing are part of the same atomic commit.
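To make the "combine per key to get the max offset" step concrete, here is a minimal plain-Java sketch of the reduction only. It does not use the Beam or Kafka APIs; `MaxOffsetPerPartition` and `PartitionOffset` are hypothetical names invented for illustration, and keying by partition number is an assumption about how the offsets are grouped.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MaxOffsetPerPartition {

    /** Hypothetical stand-in for the small (partition, offset) pair emitted per read record. */
    public static final class PartitionOffset {
        final int partition;
        final long offset;

        public PartitionOffset(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Reduces many offsets to the largest offset seen for each partition. */
    public static Map<Integer, Long> maxOffsets(List<PartitionOffset> offsets) {
        Map<Integer, Long> max = new HashMap<>();
        for (PartitionOffset po : offsets) {
            // Keep whichever is larger: the stored offset or the new one.
            max.merge(po.partition, po.offset, Long::max);
        }
        return max;
    }

    public static void main(String[] args) {
        List<PartitionOffset> seen = Arrays.asList(
                new PartitionOffset(0, 41L),
                new PartitionOffset(1, 7L),
                new PartitionOffset(0, 42L));
        // Only one small value per partition reaches the commit step,
        // regardless of how large the records themselves were.
        System.out.println(maxOffsets(seen));
    }
}
```

The point of the sketch is the size of what crosses the stage boundary: only one number per key, rather than the record payloads.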

For other runners such as Flink, is Reshuffle more powerful than any other GroupByKey, i.e. does it insert some sort of checkpoint barrier? If not, the existing solution of reshuffling the data + offsets doesn't seem to guarantee any better that the commit fn runs after the effects of record processing have been persisted in a checkpoint. The fused graph with the reshuffle looks something like:

Read -> Reshuffled records+ offsets + fused processing  -> fused stage with more user processing
                                                       \-> per key combined fused stage committing offsets

With Flink, if data is just flowing through ahead of a checkpoint barrier, there is nothing to prevent offsets from being committed to Kafka before the checkpoint completes, so it seems this reshuffle doesn't provide any further guarantees.

kennknowles commented 3 days ago

> With Flink, if data is just flowing through ahead of a checkpoint barrier, there is nothing to prevent offsets from being committed to Kafka before the checkpoint completes, so it seems this reshuffle doesn't provide any further guarantees.

Yes, for Flink this really needs to be something that happens after the checkpoint is known to be durable. To do this right, we could probably use a semantic construct for that, which would also solve it for Dataflow.
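As a rough illustration of "commit only after the checkpoint is known to be durable", the following plain-Java sketch stages offsets per checkpoint and releases them only when a completion callback fires. This is not the PR's implementation and not an existing Beam construct: `CheckpointGatedCommitter`, `stage`, and `onCheckpointComplete` are hypothetical names, a single partition is assumed, and the "commit" is just an in-memory list standing in for a real Kafka offset commit. The callback is loosely analogous to `notifyCheckpointComplete` in Flink's `CheckpointListener`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class CheckpointGatedCommitter {

    // Highest processed offset staged under each checkpoint id (single partition assumed).
    private final TreeMap<Long, Long> stagedByCheckpoint = new TreeMap<>();

    // Stand-in for offsets actually committed to Kafka, in commit order.
    private final List<Long> committed = new ArrayList<>();

    /** Records an offset as processed under a checkpoint, without committing it. */
    public void stage(long checkpointId, long offset) {
        stagedByCheckpoint.merge(checkpointId, offset, Long::max);
    }

    /** Invoked once the given checkpoint is durable; commits everything staged up to it. */
    public void onCheckpointComplete(long checkpointId) {
        NavigableMap<Long, Long> ready = stagedByCheckpoint.headMap(checkpointId, true);
        if (ready.isEmpty()) {
            return;
        }
        committed.add(Collections.max(ready.values()));
        // headMap is a view, so clearing it removes the entries from the staging map.
        ready.clear();
    }

    public List<Long> committed() {
        return committed;
    }

    public static void main(String[] args) {
        CheckpointGatedCommitter committer = new CheckpointGatedCommitter();
        committer.stage(1L, 10L);
        committer.stage(1L, 12L);
        committer.stage(2L, 20L);
        System.out.println(committer.committed()); // nothing committed before any checkpoint completes
        committer.onCheckpointComplete(1L);
        System.out.println(committer.committed()); // only offsets covered by checkpoint 1
    }
}
```

Unlike a shuffle-based barrier, the gating here depends on an explicit durability signal rather than on where the runner happens to place stage boundaries.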

Given that everything about this implementation depends on Dataflow's idiosyncrasies, I'm OK with modifying it to depend on them further in exchange for better performance.