apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: PubsubIO on Flink Runner not acknowledging old messages #32461

Open xzhang2sc opened 2 months ago

xzhang2sc commented 2 months ago

What happened?

I'm using "org.apache.beam:beam-runners-flink-1.18:2.57.0". When reading from Pub/Sub, I found that it is unable to acknowledge messages that were generated before the job starts. As a result, those messages are sent to Flink repeatedly, and the number of unacked messages stays flat. I also observed an issue similar to https://github.com/apache/beam/issues/31510: the ack message count can be higher than the rate at which messages are produced.

It can be reproduced with the following code, which simply reads from Pub/Sub and prints a string.

Args:

 - "--runner=FlinkRunner"
 - "--attachedMode=false"
 - "--checkpointingInterval=10000"
 - "--unalignedCheckpointEnabled=true"

Code:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import java.util.concurrent.ThreadLocalRandom;

public class Test {
    public static void main(String[] args) {
        FlinkPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation().withoutStrictParsing().as(FlinkPipelineOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<PubsubMessage> pubsubMessages = pipeline.apply(
                        PubsubIO.readMessages().fromSubscription(
                                "xxx"))
                .apply("print", ParDo.of(new DoFn<PubsubMessage, PubsubMessage>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        if (ThreadLocalRandom.current().nextDouble() < 0.01) {
                            System.out.println("##################");
                            c.output(c.element());
                        }
                    }
                }));
        pipeline.run();
    }
}

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

liferoad commented 2 months ago

@je-ik Is this something you could help with, or could you offer any guidance on fixing this issue?

xzhang2sc commented 2 months ago

I suspect that the job needs permission to access Pub/Sub metrics (oldest unacked message age) to work properly. I'm verifying that.

xzhang2sc commented 2 months ago

I found that it is able to acknowledge old messages after I got permission to access Pub/Sub metrics. However, the numbers don't add up. [update: I don't think accessing Pub/Sub metrics is helping]

In the past 30 minutes, the ack message count has stayed well above 150/s, so in total it should have acked at least 150 × 60 × 30 = 270k messages, but the number of unacked messages only dropped by 8k. The publish rate is about 10/s, which is negligible.
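For reference, the arithmetic behind that comparison can be written out as a small sketch. The class and method names are illustrative only, and the rates are the approximate figures quoted above, treated as constant over the 30-minute window.

```java
// Back-of-the-envelope check of the figures quoted in this comment.
// All names are hypothetical; rates are assumed constant over the window.
class AckBacklogCheck {
    /** Messages acked over a window, given an ack rate in messages per second. */
    static long expectedAcks(long ackRatePerSecond, long windowMinutes) {
        return ackRatePerSecond * 60L * windowMinutes;
    }

    /** Messages published over the same window. */
    static long expectedPublished(long publishRatePerSecond, long windowMinutes) {
        return publishRatePerSecond * 60L * windowMinutes;
    }

    public static void main(String[] args) {
        long acked = expectedAcks(150, 30);          // ack metric: about 150/s for 30 min
        long published = expectedPublished(10, 30);  // publish rate: about 10/s
        long expectedDrop = acked - published;       // how far the backlog should have fallen
        long observedDrop = 8_000;                   // what the unacked-message graph showed

        System.out.println("expected acks:         " + acked);
        System.out.println("published meanwhile:   " + published);
        System.out.println("expected backlog drop: " + expectedDrop);
        System.out.println("observed backlog drop: " + observedDrop);
    }
}
```

Even after subtracting newly published messages, the expected drop is far larger than the observed one, which is why the ack metric looks inconsistent with the backlog.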

[Image: Screenshot 2024-09-16 at 10 42 08 AM]

xzhang2sc commented 2 months ago

I found the following assumption quite problematic, and the consequences of a wrong watermark are dramatic:

"This assumes Pubsub delivers the oldest (in Pubsub processing time) available message at least once a minute"

If Pub/Sub did not deliver an old message during the past minute, the estimated watermark will be wrong. If the watermark has already advanced past those old messages, they don't get acked properly and will be delivered repeatedly.
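To make the concern concrete, here is a deliberately simplified model of a watermark estimated as the minimum message timestamp seen in the last minute. It is not Beam's actual PubsubIO code; the class name, the 60-second window constant, and the timestamps are assumptions chosen only to illustrate how the estimate can jump past a backlog message that was not redelivered within the window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a "minimum timestamp over the last minute"
// watermark estimate. This is NOT the PubsubIO implementation.
class SlidingMinWatermark {
    private static final long WINDOW_MS = 60_000L; // assumed one-minute window

    // Each entry is {arrivalTimeMs, messageTimestampMs}, appended in arrival order.
    private final Deque<long[]> recent = new ArrayDeque<>();

    void onMessage(long arrivalMs, long messageTimestampMs) {
        recent.addLast(new long[] {arrivalMs, messageTimestampMs});
    }

    /** Minimum message timestamp among samples that arrived within the window. */
    long estimate(long nowMs) {
        // Evict samples that arrived at or before the start of the window.
        while (!recent.isEmpty() && recent.peekFirst()[0] <= nowMs - WINDOW_MS) {
            recent.removeFirst();
        }
        long min = Long.MAX_VALUE; // returned unchanged if no samples remain
        for (long[] sample : recent) {
            min = Math.min(min, sample[1]);
        }
        return min;
    }

    public static void main(String[] args) {
        SlidingMinWatermark w = new SlidingMinWatermark();
        long oldTimestamp = 1_000L;

        w.onMessage(100_000L, oldTimestamp); // one delivery of an old backlog message
        w.onMessage(110_000L, 109_000L);     // a fresh message
        System.out.println("estimate at 120s: " + w.estimate(120_000L));

        // More than a minute passes with only fresh messages being delivered.
        w.onMessage(170_000L, 169_000L);
        w.onMessage(180_000L, 179_000L);
        long later = w.estimate(190_000L);
        System.out.println("estimate at 190s: " + later);
        System.out.println("old message behind watermark: " + (oldTimestamp < later));
    }
}
```

In this model the estimate is held back only while the old message sits inside the window. Once a minute passes without it being delivered again, the estimate moves to the fresh messages' timestamps and the old message ends up behind the watermark.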

In summary I think there are two problems:

  1. The inaccuracy of the estimated watermark results in old messages not being acked.
  2. The ack message count metric doesn't match the actual number of acked messages. The metric seems far higher than the actual count.

je-ik commented 2 months ago

What is your ack deadline in Pub/Sub? FlinkRunner can ack messages only after a checkpoint; the default ack deadline is 10 seconds, and your checkpoint interval is aligned with that (--checkpointingInterval=10000). This could cause the issues you observe. You might try either decreasing the checkpoint interval or increasing the ack deadline.
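The relationship between the two settings can be sketched roughly as follows. This is a simplified model rather than anything taken from the runner: it assumes a message is acked only when the next checkpoint completes, it ignores any ack-deadline extension the client may perform, and the 2-second checkpoint duration is a made-up figure for illustration.

```java
// Rough, hypothetical model of ack delay versus ack deadline.
// Assumes acks happen only when the next checkpoint completes and that
// the ack deadline is never extended in the meantime.
class AckDeadlineCheck {
    /**
     * Worst case: a message arrives just after a checkpoint starts, so it waits
     * a full interval for the next checkpoint plus the time that checkpoint takes.
     */
    static long worstCaseAckDelayMs(long checkpointIntervalMs, long checkpointDurationMs) {
        return checkpointIntervalMs + checkpointDurationMs;
    }

    /** True if the worst-case ack delay reaches or exceeds the ack deadline. */
    static boolean mayBeRedelivered(long ackDeadlineMs, long checkpointIntervalMs,
                                    long checkpointDurationMs) {
        return worstCaseAckDelayMs(checkpointIntervalMs, checkpointDurationMs) >= ackDeadlineMs;
    }

    public static void main(String[] args) {
        long intervalMs = 10_000L; // --checkpointingInterval=10000
        long durationMs = 2_000L;  // assumed checkpoint duration, for illustration only

        // Default 10 s ack deadline: the worst-case delay exceeds the deadline.
        System.out.println("10s deadline at risk:  " + mayBeRedelivered(10_000L, intervalMs, durationMs));
        // A 600 s ack deadline leaves ample headroom under the same assumptions.
        System.out.println("600s deadline at risk: " + mayBeRedelivered(600_000L, intervalMs, durationMs));
    }
}
```

Under these assumptions, a deadline equal to the checkpoint interval leaves no headroom, while a much longer deadline would rule this particular cause out.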

xzhang2sc commented 2 months ago

My ACK deadline is 600s, so that shouldn't be the issue

xzhang2sc commented 2 months ago

@liferoad @je-ik PubsubIO is basically unusable on the Flink runner, but maybe I'm missing some configuration. Is it possible to bump up the priority of this issue?

je-ik commented 2 months ago

Adding @Abacn @kennknowles who might have more context.