
[Bug]: [Go SDK] periodic.impulse increases Dataflow Backlog time #27707

Open kemkemG0 opened 1 year ago

kemkemG0 commented 1 year ago

What happened?

We have a slowly updating object that we'd like to use as a SideInput.

We decided to use periodic.Impulse to update the SideInput periodically, setting the startTime to "now" and the endTime to "10 years later" with the intention of producing output indefinitely.

However, we observed an extreme increase in the Backlog time, ballooning to around 600 weeks. We suspect this is because we set the endTime 10 years out.

In the image below, the backlog reaches roughly 500 weeks, matching the 10 years (≈ 521 weeks) we set as the endTime.

[Screenshot 2023-07-27 19:17:31: Dataflow backlog graph]

It appears that Dataflow estimates the remaining work for the job as "10 years ≈ 500 weeks", which triggers horizontal autoscaling and results in 100 workers being used.

Below is our simplified sample code, which closely follows this reference.

[Reference] https://github.com/apache/beam/blob/dc1cfe54bfa0d3a22034f3fea463f0284cb2ba83/sdks/go/examples/slowly_updating_side_input/slowly_updating_side_input.go

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window/trigger"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/io/pubsubio"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/log"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/options/gcpopts"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/periodic"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

// Flag definitions (topicId, subscription, wProject, wProjectId), checkFlags,
// and the DoFns (SideInputGeneratorDoFn, DoFn1, DoFn2) are elided here.
func main() {
    flag.Parse()
    beam.Init()
    ctx := context.Background()

    checkFlags(ctx)

    project := gcpopts.GetProject(ctx)
    p, s := beam.NewPipelineWithRoot()
    pubsubCol := pubsubio.Read(s, project, *topicId, &pubsubio.ReadOptions{
        Subscription: *subscription,
    })

    timeInterval := 1 * time.Minute

    windowedPubsubCol := beam.WindowInto(s, window.NewFixedWindows(timeInterval), pubsubCol)

    // Emit an impulse every minute, from now until 10 years from now;
    // the final argument disables the transform's own windowing.
    periodicImp := periodic.Impulse(s, time.Now(), time.Now().AddDate(10, 0, 0), timeInterval, false)
    updatedSideInput := beam.ParDo(s, SideInputGeneratorDoFn, periodicImp)
    windowedSideInput := beam.WindowInto(s, window.NewFixedWindows(timeInterval),
        updatedSideInput,
        beam.Trigger(trigger.Repeat(trigger.Always())),
        beam.PanesDiscard(),
    )

    col2 := beam.ParDo(s, &DoFn1{}, windowedPubsubCol, beam.SideInput{Input: windowedSideInput})
    col3 := beam.ParDo(s, &DoFn2{}, col2)
    pubsubio.Write(s, *wProject, *wProjectId, col3)

    if err := beamx.Run(ctx, p); err != nil {
        log.Exitf(ctx, "Failed to execute job: %v", err)
    }
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


Abacn commented 1 year ago

Just a note: the Java PeriodicImpulse has the same issue. It's more likely a Dataflow metrics issue.

kemkemG0 commented 1 year ago

@Abacn Thank you for your reply.

Then, is there a way to implement the slowly updating side input pattern with the Go SDK?

We have come up with some workaround ideas:

  1. Use another Pub/Sub topic to simulate the impulse.
  2. Use windowing with beam.Trigger(trigger.Repeat(trigger.AfterProcessingTime().PlusDelay(timeInterval))) and beam.Combine to mimic the impulse.
  3. Store the data in a local file so that each DoFn can access it.

However, we would like to avoid complex implementations.

Abacn commented 1 year ago

If I understood correctly, this is just an artifact of the Dataflow UI. Does it affect the functionality of PeriodicImpulse?

victorrgez commented 4 months ago

Same issue here with Python, but without having explicitly specified an end time. Once Dataflow scales up, it will not scale down. As you can see, the backlog shows many, many weeks even though data freshness is already 0, because everything is being processed at a good pace after catching up with the real backlog.

[Screenshots: Dataflow backlog, data freshness, and autoscaling graphs]

victorrgez commented 4 months ago

I believe this has to do with the fact that PeriodicImpulse uses ImpulseSeqGenDoFn, which has a restriction_tracker behind the scenes. As with a SplittableDoFn, it is indeed telling Dataflow that there is a huge backlog yet to be processed.

EDITED (Answer in next comment): What would be the suitable alternative then for slowly-changing side inputs?
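
For intuition: the restriction tracked by ImpulseSeqGenDoFn appears to span the event-time range from start to end, and its reported size is the unprocessed remainder of that range. A back-of-the-envelope check (plain Python, not Beam code) shows why a 10-year range surfaces as roughly 500 weeks of backlog:

import datetime

# A restriction covering [now, now + 10 years] leaves ~10 years of
# "unprocessed" range, which Dataflow renders as backlog time.
ten_years = datetime.timedelta(days=10 * 365.25)
weeks = ten_years / datetime.timedelta(weeks=1)
print(f"{weeks:.0f} weeks")  # -> 522 weeks, matching the 500-600 weeks observed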

victorrgez commented 4 months ago

@kemkemG0 In case you did not manage to solve the issue: the whole problem lies in the class ImpulseSeqGenRestrictionProvider, in its method restriction_size, which tells Dataflow how much backlog there is left to process, and hence tells Dataflow not to downscale because there appears to be too much left.

It can be solved "easily" by overriding this method in a subclass:

def restriction_size(self, unused_element, restriction):
    """Avoid the backlog issue by reporting the remaining size as 0."""
    return 0

Then subclass ImpulseSeqGenDoFn so that it uses this RestrictionProvider, and finally subclass PeriodicImpulse so that it uses your ImpulseSeqGenDoFn.
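
A minimal sketch of that subclassing chain is below. The ZeroBacklog* names are placeholders; ImpulseSeqGenDoFn, ImpulseSeqGenRestrictionProvider, and PeriodicImpulse are Beam internals, so the process() signature and the fields used in expand() mirror apache_beam/transforms/periodicsequence.py at the time of writing and may differ in other SDK versions:

import apache_beam as beam
from apache_beam.io.watermark_estimators import ManualWatermarkEstimator
from apache_beam.transforms.periodicsequence import (
    ImpulseSeqGenDoFn, ImpulseSeqGenRestrictionProvider, PeriodicImpulse)
from apache_beam.transforms.window import FixedWindows


class ZeroBacklogRestrictionProvider(ImpulseSeqGenRestrictionProvider):
    def restriction_size(self, unused_element, restriction):
        # Report zero remaining work so Dataflow's backlog estimate stays
        # at 0 and autoscaling is not driven by the impulse's huge range.
        return 0


class ZeroBacklogImpulseSeqGenDoFn(ImpulseSeqGenDoFn):
    # process() is re-declared only to swap the default RestrictionParam;
    # the actual work is delegated to the parent implementation.
    @beam.DoFn.unbounded_per_element()
    def process(
        self,
        element,
        restriction_tracker=beam.DoFn.RestrictionParam(
            ZeroBacklogRestrictionProvider()),
        watermark_estimator=beam.DoFn.WatermarkEstimatorParam(
            ManualWatermarkEstimator.default_provider())):
        return super().process(element, restriction_tracker, watermark_estimator)


class ZeroBacklogPeriodicImpulse(PeriodicImpulse):
    # expand() mirrors PeriodicImpulse.expand, swapping in the patched DoFn.
    def expand(self, pbegin):
        result = (
            pbegin
            | 'ImpulseElement' >> beam.Create(
                [(self.start_ts, self.stop_ts, self.interval)])
            | 'GenSequence' >> beam.ParDo(ZeroBacklogImpulseSeqGenDoFn()))
        if self.apply_windowing:
            result = result | 'ApplyWindowing' >> beam.WindowInto(
                FixedWindows(self.interval))
        return result

You would then use ZeroBacklogPeriodicImpulse in place of PeriodicImpulse in the pipeline. One trade-off to note: with a zero-size restriction, Dataflow no longer sees this source as backlogged, so the impulse by itself will presumably not trigger upscaling either.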