apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.78k stars 4.22k forks source link

[Bug]: beam word count example is failing on the flink runner deployed at dedicated flink cluster using beam java SDK #32154

Closed siddhanta-rath closed 1 month ago

siddhanta-rath commented 1 month ago

What happened?

I have used the below official documentation to run the beam word count example : https://beam.apache.org/get-started/quickstart-java/

I have installed flink 1.16.2 version and running it on my local at 8081 port. I am using the latest beam SDK 2.58.0 version and added beam-runners-flink-1.16 dependency in the pom.xml

I have used the below command to deploy the beam job on flink runner: mvn package exec:java -Dexec.mainClass=org.example.WordCount \ -Dexec.args="--runner=FlinkRunner --flinkMaster=localhost:8081 --filesToStage=target/word-count-beam-in-new-bundled-1.0-SNAPSHOT.jar \ --inputFile=/Users/siddhantarath/sample-beam-wordcount/word-count-beam-in-new/sample.txt --output=/tmp/counts" -Pflink-runner

the task manager is crashing and getting below errors:

Screenshot 2024-08-12 at 12 36 36 PM Screenshot 2024-08-12 at 12 36 10 PM

pom.xml:


<?xml version="1.0" encoding="UTF-8"?>
<!--
    Licensed to the Apache Software Foundation (ASF) under one or more
    contributor license agreements.  See the NOTICE file distributed with
    this work for additional information regarding copyright ownership.
    The ASF licenses this file to You under the Apache License, Version 2.0
    (the "License"); you may not use this file except in compliance with
    the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.example</groupId>
  <artifactId>word-count-beam-in-new</artifactId>
  <version>1.0-SNAPSHOT</version>

  <packaging>jar</packaging>

  <properties>
    <beam.version>2.58.0</beam.version>
    <flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>

    <bigquery.version>v2-rev20240323-2.0.0</bigquery.version>
    <google-api-client.version>2.0.0</google-api-client.version>
    <guava.version>33.1.0-jre</guava.version>
    <hamcrest.version>2.1</hamcrest.version>
    <jackson.version>2.15.4</jackson.version>
    <joda.version>2.10.14</joda.version>
    <junit.version>4.13.1</junit.version>
    <kafka.version>2.4.1</kafka.version>
    <libraries-bom.version>26.39.0</libraries-bom.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
    <maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
    <mockito.version>3.7.7</mockito.version>
    <pubsub.version>v1-rev20220904-2.0.0</pubsub.version>
    <slf4j.version>1.7.30</slf4j.version>
    <spark.version>3.2.2</spark.version>
    <hadoop.version>2.10.2</hadoop.version>
    <maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version>
    <nemo.version>0.1</nemo.version>
    <flink.artifact.name>beam-runners-flink-1.18</flink.artifact.name>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>${maven-surefire-plugin.version}</version>
        <configuration>
          <parallel>all</parallel>
          <threadCount>4</threadCount>
          <redirectTestOutputToFile>true</redirectTestOutputToFile>
        </configuration>
        <dependencies>
          <dependency>
            <groupId>org.apache.maven.surefire</groupId>
            <artifactId>surefire-junit47</artifactId>
            <version>${maven-surefire-plugin.version}</version>
          </dependency>
        </dependencies>
      </plugin>

      <!-- Ensure that the Maven jar plugin runs before the Maven
        shade plugin by listing the plugin higher within the file. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <version>${maven-jar-plugin.version}</version>
      </plugin>

      <!--
        Configures `mvn package` to produce a bundled jar ("fat jar") for runners
        that require this for job submission to a cluster.
      -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>${maven-shade-plugin.version}</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <finalName>${project.artifactId}-bundled-${project.version}</finalName>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/LICENSE</exclude>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
              <transformers>
                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                  <resource>reference.conf</resource>
                </transformer>
              </transformers>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>

  <profiles>
    <profile>
      <id>direct-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the DirectRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-direct-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

   <profile>
      <id>portable-runner</id>
      <activation>
        <activeByDefault>true</activeByDefault>
      </activation>
      <!-- Makes the PortableRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-portability-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>dataflow-runner</id>
      <!-- Makes the DataflowRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>flink-runner</id>
      <!-- Makes the FlinkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <!-- Please see the Flink Runner page for an up-to-date list
               of supported Flink versions and their artifact names:
               https://beam.apache.org/documentation/runners/flink/ -->
          <artifactId>${flink.artifact.name}</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>spark-runner</id>
      <!-- Makes the SparkRunner available when running a pipeline. -->
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-spark-3</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-streaming_2.12</artifactId>
          <version>${spark.version}</version>
          <scope>runtime</scope>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>jul-to-slf4j</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
        <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-scala_2.12</artifactId>
          <version>${jackson.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>samza-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-samza</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>twister2-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-twister2</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>
    <profile>
      <id>nemo-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.nemo</groupId>
          <artifactId>nemo-compiler-frontend-beam</artifactId>
          <version>${nemo.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-common</artifactId>
          <version>${hadoop.version}</version>
          <exclusions>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-api</artifactId>
            </exclusion>
            <exclusion>
              <groupId>org.slf4j</groupId>
              <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
          </exclusions>
        </dependency>
      </dependencies>
    </profile>

    <profile>
      <id>jet-runner</id>
      <dependencies>
        <dependency>
          <groupId>org.apache.beam</groupId>
          <artifactId>beam-runners-jet</artifactId>
          <version>${beam.version}</version>
          <scope>runtime</scope>
        </dependency>
      </dependencies>
    </profile>

  </profiles>

  <dependencies>
    <!-- Adds a dependency on the Beam SDK. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Python Multi-language pipelines API module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-extensions-python</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Adds a dependency on the Beam Kafka IO module. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-kafka</artifactId>
      <version>${beam.version}</version>
    </dependency>

    <!-- Dependencies below this line are specific dependencies needed by the examples code. -->
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>${google-api-client.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-bigquery</artifactId>
      <version>${bigquery.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.http-client</groupId>
      <artifactId>google-http-client</artifactId>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-pubsub</artifactId>
      <version>${pubsub.version}</version>
      <exclusions>
        <!-- Exclude an old version of guava that is being pulled
             in by a transitive dependency of google-api-client -->
        <exclusion>
          <groupId>com.google.guava</groupId>
          <artifactId>guava-jdk5</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>${joda.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka.version}</version>
    </dependency>

    <!-- Add slf4j API frontend binding with JUL backend -->
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>

    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-jdk14</artifactId>
      <version>${slf4j.version}</version>
      <!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
      <scope>runtime</scope>
    </dependency>

    <!-- Hamcrest and JUnit are required dependencies of PAssert,
         which is used in the main code of DebuggingWordCount example. -->
    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-core</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>org.hamcrest</groupId>
      <artifactId>hamcrest-library</artifactId>
      <version>${hamcrest.version}</version>
    </dependency>

    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>${junit.version}</version>
    </dependency>

    <!-- The DirectRunner is needed for unit tests. -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
      <scope>test</scope>
    </dependency>

    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <version>${mockito.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>${guava.version}</version>  <!-- "-jre" for Java 8 or higher -->
      </dependency>
      <!-- GCP libraries BOM sets the version for google http client -->
      <dependency>
        <groupId>com.google.cloud</groupId>
        <artifactId>libraries-bom</artifactId>
        <version>${libraries-bom.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>
</project>

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

Abacn commented 1 month ago

The pom file has both

<flink.artifact.name>beam-runners-flink-1.16</flink.artifact.name>
    <flink.artifact.name>beam-runners-flink-1.18</flink.artifact.name>

likely 1.18 beam-flink-runner is used on 1.16 backend.

Flink runner is running fine on tests and this is likely not a beam issue. I'm closing this for now. A few things to check

if issue persists, feel free to reopen