apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: PaneInfo not populated in Go SDK #31153

Open camphillips22 opened 4 months ago

camphillips22 commented 4 months ago

What happened?

I'm attempting to use early triggering and PaneInfo to limit bundle sizes and avoid hitting Dataflow's 80MB limit, and I've found that PaneInfo does not appear to be populated correctly.

Runner: Dataflow
Beam Version: 2.55.1

Here's a test that I believe demonstrates the problem:


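```go
package panes_test // hypothetical package name; the report did not include one

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/ptest"
)

func init() {
	register.Function2x0(produceFn)
	register.Function4x0(getPanes)
	register.Emitter1[int]()                 // emitter used by getPanes
	register.Emitter2[beam.EventTime, int]() // timestamped emitter used by produceFn
}

// produceFn emits ten elements, all stamped with the same event time:
// one minute after the wall-clock time at which the bundle runs.
func produceFn(_ []byte, emit func(beam.EventTime, int)) {
	baseT := mtime.Now()
	for i := 0; i < 10; i++ {
		emit(baseT.Add(time.Minute), i)
	}
}

func Produce(s beam.Scope) beam.PCollection {
	return beam.ParDo(s, produceFn, beam.Impulse(s))
}

// getPanes logs the PaneInfo observed for each element and emits its pane index.
func getPanes(ctx context.Context, pi typex.PaneInfo, _ int, emit func(int)) {
	log.Output(ctx, log.SevWarn, 0, fmt.Sprintf("got pane %+v", pi))
	emit(int(pi.Index))
}

// TestMain lets ptest parse its runner flags before the tests run.
func TestMain(m *testing.M) {
	ptest.Main(m)
}

func TestPanes(t *testing.T) {
	p, scp := beam.NewPipelineWithRoot()

	c := Produce(scp)
	windowed := beam.WindowInto(
		scp,
		window.NewFixedWindows(5*time.Minute),
		c,
		beam.Trigger(trigger.AfterEndOfWindow().
			EarlyFiring(
				trigger.Repeat(
					trigger.AfterCount(2),
				),
			),
		),
		beam.PanesDiscard(),
	)
	panes := beam.ParDo(scp, getPanes, windowed)
	paneIdxs := beam.WindowInto(scp, window.NewGlobalWindows(), panes)
	passert.Count(scp, paneIdxs, "pane idxs", 10)
	passert.EqualsList(scp, paneIdxs, []int{0, 0, 1, 1, 2, 0, 0, 1, 1, 2})
	ptest.RunAndValidate(t, p)
}
```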

All of the log lines are identical:

got pane {Timing:0 IsFirst:false IsLast:false Index:0 NonSpeculativeIndex:0}

Even if I don't have the indexes correct in the test (it is failing on the EqualsList), I would expect the panes to be internally consistent. That is, I would expect at least one pane with IsFirst:true and at least one with IsLast:true.
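That expectation can be stated as a check over the distinct panes seen for one window, in firing order. This is a minimal sketch: `PaneInfo` here is a local stand-in that copies the field names printed in the logs, not the Beam type, and `checkPanes` is a hypothetical helper.

```go
package main

import (
	"errors"
	"fmt"
)

// PaneInfo is a local stand-in mirroring the fields printed in the logs
// (typex.PaneInfo); it is not the Beam type itself.
type PaneInfo struct {
	Timing              int
	IsFirst             bool
	IsLast              bool
	Index               int64
	NonSpeculativeIndex int64
}

// checkPanes reports whether the distinct panes observed for a single window,
// in firing order, are internally consistent: indices count up from 0, only
// the first pane has IsFirst set, and only the final pane has IsLast set.
func checkPanes(panes []PaneInfo) error {
	if len(panes) == 0 {
		return errors.New("no panes observed")
	}
	for i, p := range panes {
		if p.Index != int64(i) {
			return fmt.Errorf("pane %d: Index = %d, want %d", i, p.Index, i)
		}
		if p.IsFirst != (i == 0) {
			return fmt.Errorf("pane %d: IsFirst = %v", i, p.IsFirst)
		}
		if p.IsLast != (i == len(panes)-1) {
			return fmt.Errorf("pane %d: IsLast = %v", i, p.IsLast)
		}
	}
	return nil
}

func main() {
	// A single zero-valued pane, as in the logs above, fails the check.
	fmt.Println(checkPanes([]PaneInfo{{}}))
}
```

A lone pane that is both first and last (the no-firing default) passes, as does a run of early panes followed by a final one.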

Issue Priority

Priority: 2 (default)


camphillips22 commented 4 months ago

I messed up the bug report (submitted it too early), and it got classified as P3, whereas I think it should be P2.

camphillips22 commented 4 months ago

I've tossed up a draft PR that essentially tries to pipe pane info through all the FullValue writes. I'm now getting the default NoFiringPane() in the log. I'm still working on figuring out why the triggering info is not being populated as expected.

got paneinfo: {Timing:3 IsFirst:true IsLast:true Index:0 NonSpeculativeIndex:0}

camphillips22 commented 4 months ago

Still working on figuring out why the triggering info is not being populated as expected.

I think the pane info was actually coming through correctly, but I was misapplying triggering. Things weren't behaving as expected because I was trying to use PaneInfo before bundles were committed to the backend.

lostluck commented 4 months ago

First, thanks for finding and reporting this!

Agreed that P2 is more appropriate for this issue in general; I've updated the labels.

But probably not higher than that. In principle, State and Timers should enable the same semantics, since they are lower-level primitives. BUT, that won't work very well for executions on batch Dataflow, since timers behave differently when all data is available a priori.
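As a rough illustration of the State and Timers alternative for capping batch sizes, the sketch below models in plain Go what a stateful DoFn might do for one key and window: buffer elements (the role of bag state), flush before a batch would exceed a byte budget, and flush the remainder when an end-of-window event-time timer fires. The `batcher` type and its methods are hypothetical; they do not use the Beam state or timers packages, and they ignore the batch Dataflow timer caveat.

```go
package main

import "fmt"

// batcher is a hypothetical in-memory model of a stateful DoFn for one key
// and window. buf plays the role of bag state, size the role of value state,
// and onWindowEnd the role of an event-time timer set to the window's end.
type batcher struct {
	maxBytes int      // byte budget per emitted batch
	buf      [][]byte // buffered elements
	size     int      // bytes currently buffered
	emit     func(batch [][]byte)
}

// process flushes the current batch if adding elem would exceed the budget,
// then buffers elem.
func (b *batcher) process(elem []byte) {
	if len(b.buf) > 0 && b.size+len(elem) > b.maxBytes {
		b.flush()
	}
	b.buf = append(b.buf, elem)
	b.size += len(elem)
}

// onWindowEnd models the end-of-window timer firing: emit whatever is left.
func (b *batcher) onWindowEnd() {
	if len(b.buf) > 0 {
		b.flush()
	}
}

func (b *batcher) flush() {
	b.emit(b.buf)
	b.buf = nil
	b.size = 0
}

func main() {
	b := &batcher{maxBytes: 10, emit: func(batch [][]byte) {
		fmt.Println("batch of", len(batch), "elements")
	}}
	for i := 0; i < 7; i++ {
		b.process(make([]byte, 3))
	}
	b.onWindowEnd()
}
```

With a 10-byte budget and seven 3-byte elements, the batches hold 3, 3 and 1 elements. An element larger than the budget still goes out alone, so a real implementation would need a policy for oversized elements.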


This would be a blocker for getting triggers working properly on the Go SDK's local runner, Prism, as it's not doing anything with Panes or Triggers at present, though that work is coming up (see #29650 for the Prism implementation list).

Proper pane propagation would also allow sophisticated, streaming-enabled file sinks to be implemented natively in the Go SDK. Such sinks rely on correct pane information to output and update files written in an unbounded pipeline.


The example code demonstrates that the default pane isn't being set to NoFiringPane. That is a bug, probably caused by a lack of propagation, and it should be fixed.

What it doesn't demonstrate is that the pane should differ because of a trigger.

IIRC, triggers only resolve at the downstream GBK/aggregation, so that is where there would be multiple firings and different panes.

Panes are only updated after a trigger is enacted ("fired") from a runner source, such as after a GBK. More precisely, the default "No Firing Pane" is expected until a trigger actually resolves. A "No Firing Pane" means the given pane was not produced by a trigger firing.
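To illustrate what "different firings" at a GBK would look like, the sketch below models in plain Go the panes a GBK might emit for one key and window under `AfterEndOfWindow().EarlyFiring(Repeat(AfterCount(n)))` with discarding panes. It is a hypothetical model, not runner code, and it assumes all elements arrive before the watermark passes the end of the window, no allowed lateness, and an on-time pane that fires even when empty.

```go
package main

import "fmt"

// PaneTiming is a local stand-in; early = 0 and on time = 1 are assumed to
// follow the numbering of the Go SDK's typex.PaneTiming.
type PaneTiming int

const (
	PaneEarly PaneTiming = iota
	PaneOnTime
)

// PaneInfo is a trimmed-down local stand-in for typex.PaneInfo.
type PaneInfo struct {
	Timing  PaneTiming
	IsFirst bool
	IsLast  bool
	Index   int64
}

// firing is one hypothetical GBK output for a single key and window.
type firing struct {
	Pane     PaneInfo
	Elements []int
}

// fireWindow models the panes produced for one key and window.
func fireWindow(elems []int, n int) []firing {
	var out []firing
	var buf []int
	fire := func(t PaneTiming, last bool) {
		idx := int64(len(out))
		out = append(out, firing{
			Pane:     PaneInfo{Timing: t, IsFirst: idx == 0, IsLast: last, Index: idx},
			Elements: buf,
		})
		buf = nil // discarding mode: the next pane starts empty
	}
	for _, e := range elems {
		buf = append(buf, e)
		if len(buf) == n {
			fire(PaneEarly, false) // Repeat(AfterCount(n)) early firing
		}
	}
	// The watermark passes the end of the window: the on-time pane fires and
	// is final because no late data is allowed.
	fire(PaneOnTime, true)
	return out
}

func main() {
	for _, f := range fireWindow([]int{0, 1, 2, 3, 4}, 2) {
		fmt.Printf("%+v %v\n", f.Pane, f.Elements)
	}
}
```

Under these assumptions, five elements with `n = 2` produce two early panes and one final on-time pane, giving per-element pane indices 0, 0, 1, 1, 2: the per-window pattern in the reporter's EqualsList.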

So the following pipeline should show different firings: