apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Messages are not ACK on Pubsub starting Beam 2.52.0 on Flink Runner in detached mode #29902

Closed gfalcone closed 4 months ago

gfalcone commented 8 months ago

What happened?

Hello!

I have a streaming job processing messages from Pub/Sub that no longer works with Beam 2.52.0 on the Flink Runner (in detached mode).

The pipeline works fine in Beam 2.51.0.

Here is the code of the pipeline:

package com.mycompany.reco_videocatalog;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URISyntaxException;
import java.security.GeneralSecurityException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class VideoCatalogWriteJob {

    private static final Logger logger = LoggerFactory.getLogger(VideoCatalogWriteJob.class);

    public static void main(String[] args)
            throws URISyntaxException, UnsupportedEncodingException, IOException,
                   GeneralSecurityException, InterruptedException {
        // Register our custom options interface and parse command line arguments.
        PipelineOptionsFactory.register(Options.class);

        final Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        // Create the pipeline with the specified options.
        final Pipeline p = Pipeline.create(options);

        // Read input data.
        p.apply("Read from Pub/Sub",
                PubsubIO.readStrings().fromSubscription(options.getOplogPubsubSubscription()));

        // Start pipeline execution.
        p.run();
    }
}

With the pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mycompany</groupId>
    <artifactId>data-octopus</artifactId>
    <version>1.5.2</version>
    <name>data-octopus</name>
    <description>Recommendation features computation batch and real-time jobs</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <jdk.version>1.8</jdk.version>

        <maven-compiler-plugin.version>3.5.1</maven-compiler-plugin.version>

        <org.slf4j.version>1.7.21</org.slf4j.version>
        <ch.qos.logback.version>1.4.14</ch.qos.logback.version>
        <commons-math3.version>3.6.1</commons-math3.version>
        <guava.version>33.0.0-jre</guava.version>
        <beam.version>2.52.0</beam.version>
        <jackson.version>2.16.1</jackson.version>
        <aerospike-client.version>5.3.0</aerospike-client.version>
        <flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>
    </properties>

    <repositories>
        <repository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts</url>
            <releases>
                <enabled>true</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>central</id>
            <name>Maven Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
        </repository>
    </repositories>
    <distributionManagement>
        <repository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
        </repository>
        <snapshotRepository>
            <id>artifact-registry</id>
            <url>https://us-maven.pkg.dev/dm-artifacts/java-artifacts/</url>
        </snapshotRepository>
    </distributionManagement>

    <dependencies>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${org.slf4j.version}</version>
        </dependency>

        <!-- Adds a dependency on the Beam SDK. -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-core</artifactId>
            <version>${beam.version}</version>
        </dependency>
        <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
            <version>${beam.version}</version>
        </dependency>
    </dependencies>

    <profiles>
        <profile>
            <id>direct-runner</id>
            <activation>
                <activeByDefault>true</activeByDefault>
            </activation>
            <!-- Makes the DirectRunner available when running a pipeline. -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <artifactId>beam-runners-direct-java</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
        <profile>
            <id>flink-runner</id>
            <!-- Makes the FlinkRunner available when running a pipeline. -->
            <dependencies>
                <dependency>
                    <groupId>org.apache.beam</groupId>
                    <!-- Please see the Flink Runner page for an up-to-date list
                        of supported Flink versions and their artifact names:
                    https://beam.apache.org/documentation/runners/flink/ -->
                    <artifactId>${flink.artifact.name}</artifactId>
                    <version>${beam.version}</version>
                    <scope>runtime</scope>
                </dependency>
            </dependencies>
        </profile>
    </profiles>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-resources-plugin</artifactId>
                <version>3.0.2</version>
                <executions>
                    <execution>
                        <id>template-videocatalog-dockerfile</id>
                        <phase>validate</phase>
                        <goals>
                            <goal>copy-resources</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>deploy/videocatalog</outputDirectory>
                            <resources>
                                <resource>
                                    <directory>deploy/videocatalog/template</directory>
                                    <filtering>true</filtering>
                                </resource>
                            </resources>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${jdk.version}</source>
                    <target>${jdk.version}</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/LICENSE</exclude>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>bundled</shadedClassifierName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <parallel>all</parallel>
                    <threadCount>4</threadCount>
                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
                </configuration>
                <dependencies>
                    <dependency>
                        <groupId>org.apache.maven.surefire</groupId>
                        <artifactId>surefire-junit47</artifactId>
                        <version>2.18.1</version>
                    </dependency>
                </dependencies>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8.1</version>
                <executions>
                    <execution>
                        <id>default-deploy</id>
                        <phase>deploy</phase>
                        <goals>
                            <goal>deploy</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

And the associated configuration:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  annotations:
    meta.helm.sh/release-name: data-octopus.flink-pipelines
    meta.helm.sh/release-namespace: flink-pipelines
    rollme: Uv1n7
  creationTimestamp: "2024-01-01T15:15:31Z"
  finalizers:
  - flinkdeployments.flink.apache.org/finalizer
  generation: 5
  labels:
    app.kubernetes.io/managed-by: Helm
    environment: staging
  name: vc-realtime-ix7-staging
  namespace: flink-pipelines
  resourceVersion: "2211827076"
  uid: 328c9b41-ce8a-4165-9c9e-80141c9eb16d
spec:
  flinkConfiguration:
    env.java.opts: -Dlog4j2.formatMsgNoLookups=true
    high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
    high-availability.storageDir: s3a://flink/ha/videocatalog-realtime
    kubernetes.jobmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
      -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
    kubernetes.taskmanager.entrypoint.args: -Ds3.access-key=${MINIO_ACCESS_KEY} -Ds3.secret-key=${MINIO_SECRET_KEY}
      -Dmetrics.reporter.dghttp.apikey=${DATADOG_API_KEY}
    metrics.reporter.dghttp.class: org.apache.flink.metrics.datadog.DatadogHttpReporter
    metrics.reporter.dghttp.tags: app:videocatalog-realtime,env:staging
    metrics.scope.jm: flink.jm
    metrics.scope.jm.job: flink.jm.job
    metrics.scope.operator: flink.operator
    metrics.scope.task: flink.task
    metrics.scope.tm: flink.tm
    metrics.scope.tm.job: flink.tm.job
    s3.endpoint: http://flink-minio-svc:9000
    s3.path-style-access: "true"
    state.backend.type: hashmap
    state.checkpoint-storage: filesystem
    state.checkpoints.dir: s3a://flink/recommender/videocatalog/externalized-checkpoints
    state.savepoints.dir: s3a://flink/recommender/videocatalog/savepoints
    taskmanager.numberOfTaskSlots: "4"
    web.timeout: "100000"
    web.upload.dir: /opt/flink
  flinkVersion: v1_16
  image: quay.io/mycompany/data-octopus-flink:a64370c
  imagePullPolicy: Always
  ingress:
    annotations:
      external-dns.alpha.kubernetes.io/hostname: vc-realtime-ix7-staging.mydomain.com
      external-dns.alpha.kubernetes.io/target: ****
      external-dns.alpha.kubernetes.io/ttl: "120"
      nginx.ingress.kubernetes.io/whitelist-source-range: *****
    className: nginx-priv
    template: vc-realtime-ix7-staging.mydomain.com
  job:
    args:
    - --oplogPubsubSubscription=projects/my-company/subscriptions/oplog-low-latency.aerospike
    - --runner=FlinkRunner
    - --streaming=true
    - --attachedMode=false
    - --checkpointingInterval=60000
    - --latencyTrackingInterval=60000
    entryClass: com.mycompany.reco_videocatalog.VideoCatalogWriteJob
    jarURI: local:///opt/flink/flink-web-upload/data-octopus-bundled.jar
    parallelism: 8
    state: running
    upgradeMode: savepoint
  jobManager:
    replicas: 1
    resource:
      cpu: 0.5
      memory: 2g
  podTemplate:
    apiVersion: v1
    kind: Pod
    spec:
      containers:
      - env:
        - name: DATADOG_API_KEY
          valueFrom:
            secretKeyRef:
              key: datadogApiKey
              name: data-octopus
        - name: GOOGLE_APPLICATION_CREDENTIALS
          value: /var/secrets/google/google-credentials
        - name: MINIO_ACCESS_KEY
          valueFrom:
            secretKeyRef:
              key: minioAccessKey
              name: data-octopus
        - name: MINIO_SECRET_KEY
          valueFrom:
            secretKeyRef:
              key: minioSecretKey
              name: data-octopus
        name: flink-main-container
        volumeMounts:
        - mountPath: /var/secrets/google
          name: gcp-serviceaccount
        - mountPath: /var/secrets/certs
          name: mycompany-ca-cert
      imagePullSecrets:
      - name: mycompany-puller-pull-secret
      volumes:
      - name: gcp-serviceaccount
        secret:
          items:
          - key: google-credentials
            path: google-credentials
          secretName: data-octopus
      - name: mycompany-ca-cert
        secret:
          items:
          - key: ca.crt
            path: ca.crt
          secretName: mycompany-ca-cert
  serviceAccount: flink
  taskManager:
    resource:
      cpu: 2
      memory: 4g

Here is a screenshot from the Google Cloud Console showing that messages are never ACKed:

[Screenshot from the Google Cloud Console, taken 2024-01-03 09:41]

Thank you for your help :)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)


gfalcone commented 8 months ago

Also, it might be good to document the --attached option on this page: https://beam.apache.org/documentation/runners/flink/

Abacn commented 8 months ago

There weren't many changes to the Flink runner between Beam 2.51.0 and 2.52.0 except for #28614, which is known to have caused a couple of flaky tests that were fixed later.

Does attached mode work fine?

gfalcone commented 8 months ago

I can't test that with my setup. When running through the FlinkDeployment Kubernetes object, the job is submitted in detached mode...

noster-dev commented 7 months ago

A similar thing is happening with AWS SqsIO after updating to 2.52.

It seems that #28614 replaced UnboundedSourceWrapper with FlinkUnboundedSource and WatermarkStrategy.noWatermarks(), effectively blocking watermark emission.
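For readers less familiar with Flink's watermark mechanics, the effect can be modeled in a few lines of plain Java (a simplified sketch with hypothetical names, not the actual Beam or Flink API): a multi-input operator's output watermark is the minimum of its input watermarks, so a single source wired with a no-watermarks strategy pins every downstream watermark at Long.MIN_VALUE.

```java
import java.util.Arrays;

// Simplified model of Flink watermark propagation. An operator's output
// watermark is the minimum of its input watermarks; a source created with
// WatermarkStrategy.noWatermarks() never advances past Long.MIN_VALUE,
// so downstream event time never progresses either.
public class WatermarkModel {

    // Combine input watermarks the way a multi-input operator does.
    static long combine(long... inputWatermarks) {
        return Arrays.stream(inputWatermarks).min().orElse(Long.MIN_VALUE);
    }

    public static void main(String[] args) {
        long emittingSource = 1_700_000_000_000L; // a source that emits watermarks
        long silentSource = Long.MIN_VALUE;       // a noWatermarks() source

        // With watermark emission, downstream event time advances.
        System.out.println(combine(emittingSource, 1_700_000_000_500L));

        // One silent source is enough to block progress everywhere downstream.
        System.out.println(combine(emittingSource, silentSource));
    }
}
```

Since Beam's Flink runner triggers checkpoint-dependent behavior off the job's progress, a stuck watermark is consistent with the unacked-messages symptom reported above.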

Abacn commented 7 months ago

CC: @jto, who has been quite responsive. I would appreciate it if you could share some ideas about fixing https://github.com/apache/beam/issues/29902#issuecomment-1883597256

Abacn commented 7 months ago

Moving to P1 as this essentially breaks the streaming FlinkRunner. If it's not fixed in the next release, I would suggest reverting #28614 and related changes.

@noster-dev @gfalcone are you able to test whether reverting that change resolves the issue?

noster-dev commented 7 months ago

I can confirm that reverting to 2.51 fixes the problem. Tomorrow I will try to test 2.52 with #28614 reverted.

anartemp commented 6 months ago

Hi, are there any updates on this issue? I'm facing a potentially similar issue with a Kafka source not committing offsets back even though commitOffsetsInFinalize is true and checkpointing is enabled. Beam v2.52.0, Flink v1.12.4.

Committing offsets works in Beam v2.51.0.

jto commented 5 months ago

Hey there! Apologies for not looking into this earlier. I was on parental leave and had a lot to catch up on when I got back. I think this issue and https://github.com/apache/beam/issues/30539 are duplicates. Taking a look now.

je-ik commented 4 months ago

This is very likely related to #25525: after the change to FLIP-27 sources, calls to finalizeCheckpoint seem to disappear. The source refactor likely caused #29816 as well, though the specific mechanics are still unknown.
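To illustrate the contract in question, here is a simplified plain-Java model (hypothetical types and names, not the real Beam UnboundedSource API): message IDs read since the last checkpoint are captured in a checkpoint mark, and they are ACKed back to the source only when the runner calls finalizeCheckpoint after the checkpoint completes. If that call never happens, messages stay unACKed and are eventually redelivered.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the checkpoint-finalization contract. Reading buffers
// message IDs; a checkpoint snapshots them into a mark; ACKs happen only
// inside finalizeCheckpoint(), which the runner is expected to invoke once
// the checkpoint is durable. Skipping that call models the regression.
public class CheckpointModel {
    final List<String> pendingIds = new ArrayList<>(); // read, not yet checkpointed
    final List<String> acked = new ArrayList<>();      // ACKed back to the source

    void read(String messageId) {
        pendingIds.add(messageId);
    }

    // Snapshot the IDs covered by this checkpoint and reset the buffer.
    List<String> snapshot() {
        List<String> mark = new ArrayList<>(pendingIds);
        pendingIds.clear();
        return mark;
    }

    // The runner should call this once the checkpoint completes.
    void finalizeCheckpoint(List<String> mark) {
        acked.addAll(mark);
    }

    // Simulate one checkpoint cycle; returns how many messages got ACKed.
    public static int ackedCount(boolean runnerFinalizes) {
        CheckpointModel source = new CheckpointModel();
        source.read("m1");
        source.read("m2");
        List<String> mark = source.snapshot();
        if (runnerFinalizes) {
            source.finalizeCheckpoint(mark); // healthy runner: messages ACKed
        }
        return source.acked.size();
    }

    public static void main(String[] args) {
        System.out.println(ackedCount(true));  // runner calls finalizeCheckpoint
        System.out.println(ackedCount(false)); // the call disappears: nothing ACKed
    }
}
```

In this model, dropping the finalizeCheckpoint call leaves every message unACKed, which matches both the Pub/Sub and the Kafka offset-commit symptoms reported in this thread.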

je-ik commented 4 months ago

> Hey there! Apologies for not looking into this earlier. I was on parental leave and had a lot to catch up on when I got back. I think this issue and #30539 are duplicates. Taking a look now.

Yes, this is also related to the missing finalizeCheckpoint call.