apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

Python CombineGlobally().with_fanout() causes duplicate combine results for sliding windows #20528

Open damccorm opened 2 years ago

damccorm commented 2 years ago

Not only is there more than one result per window, but the results within each window are duplicated as well.

here is some code I made to reproduce the issue, just run it with and without *.with_fanout*

if running with Dataflow runner, add appropriate *gs://path/* in *WriteToText*

 


import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.utils.timestamp import Timestamp


class ListFn(beam.CombineFn):
  """Collects every input element into a list, so duplicates are visible."""
  def create_accumulator(self):
    return []

  def add_input(self, mutable_accumulator, element):
    return mutable_accumulator + [element]

  def merge_accumulators(self, accumulators):
    res = []
    for accu in accumulators:
      res = res + accu
    return res

  def extract_output(self, accumulator):
    return accumulator


p = beam.Pipeline()

(
    p
    | beam.Create([
        window.TimestampedValue(1, Timestamp(seconds=1596216396)),
        window.TimestampedValue(2, Timestamp(seconds=1596216397)),
        window.TimestampedValue(3, Timestamp(seconds=1596216398)),
        window.TimestampedValue(4, Timestamp(seconds=1596216399)),
        window.TimestampedValue(5, Timestamp(seconds=1596216400)),
        window.TimestampedValue(6, Timestamp(seconds=1596216402)),
        window.TimestampedValue(7, Timestamp(seconds=1596216403)),
        window.TimestampedValue(8, Timestamp(seconds=1596216405))])
    # 10-second windows starting every 5 seconds: each element is in two windows.
    | beam.WindowInto(window.SlidingWindows(10, 5))
    # Drop .with_fanout(5) to compare against the expected output.
    | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5)
    | beam.Map(repr)
    | beam.io.WriteToText("py-test-result", file_name_suffix='.json', num_shards=1))

# Block until the pipeline finishes so the output file is complete.
p.run().wait_until_finish()

 

Imported from Jira BEAM-10617. Original Jira may contain additional context. Reported by: leiyiz.

kennknowles commented 2 years ago

If this is still true, then it should be very easy to reproduce. @robertwb or @tvalentyn or @pabloem do you know?

tvalentyn commented 1 year ago

@BjornPrime can you try to reproduce this?

BjornPrime commented 1 year ago

I've successfully reproduced the issue.

Compare with fanout:

[1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 5, 5, 6, 6, 7, 7, 8, 8]
[1, 1, 2, 2, 3, 3, 4, 4, 5, 5, 6, 6, 7, 7, 1, 1, 2, 2, 3, 3, 4, 4]
[1, 1, 2, 2, 3, 3, 4, 4]
[5, 5, 6, 6, 7, 7, 8, 8, 8, 8]
[8, 8]

and without:

[1, 2, 3, 4, 5, 6, 7]
[1, 2, 3, 4]
[5, 6, 7, 8]
[8]

tvalentyn commented 1 year ago

Thanks, I'll add it to our interrupts tracker.

TheNeuralBit commented 1 year ago

I feel like the labels P1 and good first issue should be mutually exclusive?

tvalentyn commented 1 year ago

it may be neither actually.

kennknowles commented 1 year ago

It sounds like data loss, which should certainly be P1

tvalentyn commented 1 year ago

Sure, we can investigate as P1. P2 was based on the suspicion that this behavior has been present since the initial implementation. As a next step, I think we should identify whether with_fanout, by nature of its implementation, only works for particular classes of combiners/accumulators (for example, Top or Mean, where repeating elements twice doesn't affect the end result), or whether this is a bug that can and should be fixed.
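
A rough way to see which kinds of combiners would hide the problem is the plain-Python sketch below. It does not use Beam, and it assumes the simplest failure mode seen in the reproduction, where every element of a window arrives exactly twice (as in the `[1, 1, 2, 2, 3, 3, 4, 4]` line). The `summarize` helper and the variable names are illustrative only.

```python
from statistics import mean

# Values of one window as they should reach the combiner.
window_values = [1, 2, 3, 4]
# Assumption: the bug delivers every element exactly twice.
duplicated = sorted(window_values * 2)


def summarize(values):
    """Apply a few simple combiners to the same input."""
    return {
        "mean": mean(values),
        "max": max(values),
        "sum": sum(values),
        "count": len(values),
    }


clean = summarize(window_values)
doubled = summarize(duplicated)

# Combiners whose result is unchanged by uniform duplication.
unaffected = [name for name in clean if clean[name] == doubled[name]]
# Combiners whose result silently changes.
affected = [name for name in clean if clean[name] != doubled[name]]

print("unaffected:", unaffected)
print("affected:", affected)
```

Under that assumption the mean and the maximum survive, while the sum and the count double. The reproduced output also contains lines that mix elements from more than one window, and those would distort a mean as well, so this only illustrates the uniform case.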

kennknowles commented 1 year ago

If we cannot fix this quickly, we should remove with_fanout until we can, or at least warn somehow, and add a note to the release notes for released SDKs.

kennknowles commented 1 year ago

I mean, disable with_fanout when sliding windows are used.

BTW is this on all runners?

robertwb commented 1 year ago

OK, the problem is that we re-window to avoid stacked accumulating mode:

https://github.com/apache/beam/blob/release-2.40.0/sdks/python/apache_beam/transforms/core.py#L2742

This re-duplicates the windows for sliding windows WindowFns. (It will do the wrong thing for sessions as well.) What we need to do is set the new windowing to be the same thing without accumulating mode, but without applying the windowfn.
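
The effect described above can be imitated without Beam. The sketch below is a simplified model, not Beam's implementation: `assign_sliding`, `window_into` and `group_by_window` are hypothetical helpers, windows are identified by their start time only, and the window offset is assumed to be zero. It uses the timestamps and the `SlidingWindows(10, 5)` setting from the reproduction and applies the window assignment a second time to elements that already sit in two windows each.

```python
from collections import defaultdict


def assign_sliding(timestamp, size, period):
    """Start times of all sliding windows that contain the timestamp."""
    last_start = timestamp - timestamp % period
    return list(range(last_start, last_start - size, -period))


def window_into(elements, size, period):
    """Assign each (value, timestamp, window) element to windows by timestamp.

    The element's current window is ignored, which is what happens when a
    WindowFn is applied again to already-windowed elements.
    """
    return [
        (value, ts, start)
        for value, ts, _old_window in elements
        for start in assign_sliding(ts, size, period)
    ]


def group_by_window(elements):
    """Collect the values of each window, ordered by window start."""
    grouped = defaultdict(list)
    for value, _ts, start in elements:
        grouped[start].append(value)
    return {start: sorted(values) for start, values in sorted(grouped.items())}


BASE = 1596216396
OFFSETS = [0, 1, 2, 3, 4, 6, 7, 9]  # timestamps from the reproduction
inputs = [(i + 1, BASE + off, None) for i, off in enumerate(OFFSETS)]

once = window_into(inputs, size=10, period=5)   # the user's WindowInto
twice = window_into(once, size=10, period=5)    # the extra re-windowing

for start, values in group_by_window(twice).items():
    print(start, values)
```

In this model a single assignment yields the four expected lists, and the second assignment gives every window two copies of each element, which matches the `[1, 1, 2, 2, 3, 3, 4, 4]` and `[8, 8]` lines in the reproduction. It does not explain the lines that mix elements from different windows.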

robertwb commented 1 year ago

If we can't get someone to fix this we should at least throw an error ASAP, before the next release, e.g. rejecting all but known good windows (globals and fixed windows).
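
A minimal sketch of such a guard follows, assuming an allow-list approach. To stay runnable without Beam installed, the window classes here are empty stand-ins named after Beam's `GlobalWindows`, `FixedWindows`, `SlidingWindows` and `Sessions`. The function `check_fanout_supported` is hypothetical and is not the check that was eventually added to the SDK.

```python
class WindowFn:
    """Stand-in base class; not Beam's WindowFn."""


class GlobalWindows(WindowFn):
    pass


class FixedWindows(WindowFn):
    pass


class SlidingWindows(WindowFn):
    pass


class Sessions(WindowFn):
    pass


# Windowing strategies assumed to be safe with fanout, per the comment above.
KNOWN_GOOD = (GlobalWindows, FixedWindows)


def check_fanout_supported(windowfn, fanout):
    """Raise early if fanout is requested for a windowing not known to be safe."""
    if fanout is None:
        return  # no fanout requested, nothing to check
    if not isinstance(windowfn, KNOWN_GOOD):
        raise ValueError(
            "with_fanout() is not supported with %s; see issue #20528"
            % type(windowfn).__name__)


# Accepted: fixed windows are on the allow-list.
check_fanout_supported(FixedWindows(), fanout=5)

# Rejected: sliding windows are not.
try:
    check_fanout_supported(SlidingWindows(), fanout=5)
except ValueError as err:
    print(err)
```

An allow-list fails closed: any windowing not explicitly listed, including custom WindowFns, is rejected until someone confirms it is safe.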

tvalentyn commented 1 year ago

ack, thanks.

chamikaramj commented 1 year ago

@tvalentyn any updates here?

tvalentyn commented 1 year ago

started working on this today, no PR yet.

tvalentyn commented 1 year ago

> This re-duplicates the windows for sliding windows WindowFns. (It will do the wrong thing for sessions as well.)

Does not seem to repro with sessions. Passing test:

  # Assumes the test module's usual imports: apache_beam as beam, window,
  # Timestamp, PipelineOptions, StandardOptions, TestPipeline and assert_that.
  def test_combining_with_session_windows_and_fanout(self):
    class ListFn(beam.CombineFn):
      def create_accumulator(self):
        return []

      def add_input(self, mutable_accumulator, element):
        return mutable_accumulator + [element]

      def merge_accumulators(self, accumulators):
        res = []
        for accu in accumulators:
          res = res + accu
        return res

      def extract_output(self, accumulator):
        return accumulator

    options = PipelineOptions()
    options.view_as(StandardOptions).streaming = True
    with TestPipeline(options=options) as p:

      def has_expected_values(actual):
        from hamcrest.core import assert_that as hamcrest_assert
        from hamcrest.library.collection import contains_exactly

        # `ordered` was undefined in the snippet as posted; sorting makes the
        # comparison independent of output order.
        ordered = sorted(sorted(values) for values in actual)
        hamcrest_assert(ordered, contains_exactly([0, 1, 2, 3], [5, 6, 7, 8]))

      result = (
          p
          | beam.Create([
              window.TimestampedValue(0, Timestamp(seconds=0)),
              window.TimestampedValue(1, Timestamp(seconds=1)),
              window.TimestampedValue(2, Timestamp(seconds=2)),
              window.TimestampedValue(3, Timestamp(seconds=3)),
              window.TimestampedValue(5, Timestamp(seconds=5)),
              window.TimestampedValue(6, Timestamp(seconds=6)),
              window.TimestampedValue(7, Timestamp(seconds=7)),
              window.TimestampedValue(8, Timestamp(seconds=8))])
          | beam.WindowInto(window.Sessions(2))
          | beam.CombineGlobally(ListFn()).without_defaults().with_fanout(5))

      # Assumed: the snippet as posted stops before the matcher is attached.
      assert_that(result, has_expected_values)

kennknowles commented 1 year ago

Yea, it probably has to do with any windowfn that duplicates an element into multiple windows

tvalentyn commented 1 year ago

> OK, the problem is that we re-window to avoid stacked accumulating mode:

Can we re-window only if we detect accumulating mode? See: https://github.com/apache/beam/pull/23828

Alternatively, we could skip the re-windowing if we detect sliding windows. Disabling with_fanout for sliding windows is straightforward too, if we want to go that route for now.

chamikaramj commented 1 year ago

This is the only issue currently blocking the 2.43.0 release without an existing cherry-pick request.

Can we push this to the next release?

tvalentyn commented 1 year ago

I discussed the fix yesterday with Robert, and we will pursue a slightly different fix, which should be ready shortly. However, this bug has been there since the very first commit implementing with_fanout, so I wouldn't delay this specific release for it.

chamikaramj commented 1 year ago

Ack. Removing this from the milestone for now.

tvalentyn commented 1 year ago

The current fix caused an OOM in one customer's pipeline; planning to revert and investigate further.

kennknowles commented 1 year ago

Is it possible that the OOM is unavoidable because of the fix? This is a pretty serious data corruption problem, no? I suppose it is not a regression, but I would very much like 2.44.0 to have correct results in this situation.

tvalentyn commented 1 year ago

> Is it possible that the OOM is unavoidable because of the fix?

it's unlikely

> This is a pretty serious data corruption problem, no? I suppose it is not a regression

That's correct. It has been there since with_fanout was added.

> but I would very much like 2.44.0 to have correct results in this situation.

I am planning to come back to it next week when I am on a rotational duty again.

kennknowles commented 1 year ago

OK given this context I am going to remove the release milestone. We can document the range of releases for which it does not work.

kennknowles commented 1 year ago

Actually simply disabling it would be a reasonable way to protect user data and we should do that in the immediate term.

kennknowles commented 1 year ago

The mitigation has been cherrypicked, so I am removing it from the 2.44.0 milestone. The bug remains open.

kennknowles commented 1 year ago

If this is now disabled, is it perhaps P2?

kennknowles commented 1 year ago

If it isn't actively being worked on, I suggest downgrading and unassigning.

ee07dazn commented 1 year ago

Any progress on this?

tvalentyn commented 1 year ago

We had a tentative fix, but it caused a performance regression and was rolled back; since then the issue has remained in the backlog.

ee07dazn commented 1 year ago

I need the combiner because group-by is ineffective, and I need fanout on top of it because the input is a Kafka stream arriving at quite a high rate.

Are there any possible workarounds?

tvalentyn commented 1 year ago

Ack. This feedback helps with prioritization.

I'm not sure I can answer based on the limited information about the use case, but for experimentation purposes you could apply https://github.com/apache/beam/pull/23828 locally. These changes only matter at job submission.