apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Flink runner fails with "The job graph is cyclic." #21945

Open waj334 opened 2 years ago

waj334 commented 2 years ago

What happened?

I tried an extremely simple Beam pipeline:

package main

import (
    "context"
    "flag"
    "log"

    "github.com/apache/beam/sdks/v2/go/pkg/beam"
    "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
)

func main() {
    flag.Parse()
    beam.Init()

    ctx := context.Background()
    p, s := beam.NewPipelineWithRoot()

    beam.Impulse(s)

    if err := beamx.Run(ctx, p); err != nil {
        log.Fatal(ctx, "Failed to execute job: %v", err)
    }
}

It fails with the following error when using the Flink runner:

C:\Users\waj33\AppData\Local\Temp\GoLand\___2Telemetry_Beam_Pipeline.exe --runner=flink --endpoint=localhost:8099

2022/06/18 19:29:36 No environment config specified. Using default config: 'apache/beam_go_sdk:2.41.0.dev'
2022/06/18 19:29:36 components: <
  transforms: <
    key: "e1"
    value: <
      unique_name: "Impulse"
      spec: <
        urn: "beam:transform:impulse:v1"
      >
      outputs: <
        key: "i0"
        value: "n1"
      >
    >
  >
  pcollections: <
    key: "n1"
    value: <
      unique_name: "n1"
      coder_id: "c0"
      is_bounded: BOUNDED
      windowing_strategy_id: "w0"
    >
  >
  windowing_strategies: <
    key: "w0"
    value: <
      window_fn: <
        urn: "beam:window_fn:global_windows:v1"
      >
      merge_status: NON_MERGING
      window_coder_id: "c1"
      trigger: <
        default: <
        >
      >
      accumulation_mode: DISCARDING
      output_time: END_OF_WINDOW
      closing_behavior: EMIT_IF_NONEMPTY
      on_time_behavior: FIRE_IF_NONEMPTY
      environment_id: "go"
    >
  >
  coders: <
    key: "c0"
    value: <
      spec: <
        urn: "beam:coder:bytes:v1"
      >
    >
  >
  coders: <
    key: "c1"
    value: <
      spec: <
        urn: "beam:coder:global_window:v1"
      >
    >
  >
  environments: <
    key: "go"
    value: <
      urn: "beam:env:docker:v1"
      payload: "\n\035apache/beam_go_sdk:2.41.0.dev"
      capabilities: "beam:protocol:progress_reporting:v1"
      capabilities: "beam:protocol:multi_core_bundle_processing:v1"
      capabilities: "beam:transform:sdf_truncate_sized_restrictions:v1"
      capabilities: "beam:protocol:worker_status:v1"
      capabilities: "beam:protocol:monitoring_info_short_ids:v1"
      capabilities: "beam:version:sdk_base:go"
      capabilities: "beam:coder:bytes:v1"
      capabilities: "beam:coder:bool:v1"
      capabilities: "beam:coder:varint:v1"
      capabilities: "beam:coder:double:v1"
      capabilities: "beam:coder:string_utf8:v1"
      capabilities: "beam:coder:length_prefix:v1"
      capabilities: "beam:coder:kv:v1"
      capabilities: "beam:coder:iterable:v1"
      capabilities: "beam:coder:state_backed_iterable:v1"
      capabilities: "beam:coder:windowed_value:v1"
      capabilities: "beam:coder:global_window:v1"
      capabilities: "beam:coder:interval_window:v1"
      capabilities: "beam:coder:row:v1"
      capabilities: "beam:coder:nullable:v1"
      dependencies: <
        type_urn: "beam:artifact:type:file:v1"
        role_urn: "beam:artifact:role:go_worker_binary:v1"
      >
    >
  >
>
root_transform_ids: "e1"
2022/06/18 19:29:39 Prepared job with id: go-job-1-1655598576054300700_3356a1e1-23e9-4522-8839-f5547d5b18cb and staging token: go-job-1-1655598576054300700_3356a1e1-23e9-4522-8839-f5547d5b18cb
2022/06/18 19:29:39 Staged binary artifact with token: 
2022/06/18 19:29:39 Submitted job: go0job0101655598576054300700-root-0619002939-c5a32241_10511e7a-30ce-4f6e-872a-5023c28ba4d9
2022/06/18 19:29:39 Job state: STOPPED
2022/06/18 19:29:39 Job state: STARTING
2022/06/18 19:29:39 Job state: RUNNING
2022/06/18 19:29:45  (): java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1061)
        at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
        at org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator$BatchTranslationContext.execute(FlinkBatchPortablePipelineTranslator.java:195)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:118)
        at org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:85)
        at org.apache.beam.runners.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:86)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
        at org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
        at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1908)
        at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1056)
        ... 11 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedFunction$2(FunctionUtils.java:75)
        at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
        at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
        at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)
        ... 1 more
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
        at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
        at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown Source)
        at java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
        at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
        at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
        at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
        ... 4 more
Caused by: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
        at org.apache.flink.runtime.jobgraph.JobGraph.getVerticesSortedTopologicallyFromSources(JobGraph.java:442)
        at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:186)
        at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:149)
        at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363)
        at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:191)
        at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:139)
        at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:135)
        at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:115)
        at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)
        at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:322)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:106)
        at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:94)
        at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
        ... 4 more
2022/06/18 19:29:45  (): org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
2022/06/18 19:29:45 Job state: FAILED
2022/06/18 19:29:45 context.BackgroundFailed to execute job: %vjob go0job0101655598576054300700-root-0619002939-c5a32241_10511e7a-30ce-4f6e-872a-5023c28ba4d9 failed

Process finished with the exit code 1
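
A side note on the last log line ("context.BackgroundFailed to execute job: %vjob ... failed"): it looks garbled because the standard library's log.Fatal formats its arguments like fmt.Sprint, so the context is printed as a value and "%v" is left uninterpreted. This is unrelated to the cyclic-graph error itself. The sketch below reproduces the two formatting behaviours with fmt only; the helper names fatalMessage and fatalfMessage are hypothetical and exist just for this illustration.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

const format = "Failed to execute job: %v"

// fatalMessage builds the text that log.Fatal(ctx, format, err) would print.
// log.Fatal formats like fmt.Sprint, so "%v" stays literal and the context
// value is printed in front of the message.
func fatalMessage(errText string) string {
	return fmt.Sprint(context.Background(), format, errors.New(errText))
}

// fatalfMessage builds the text that log.Fatalf(format, err) would print.
// log.Fatalf formats like fmt.Sprintf, so "%v" is replaced by the error.
func fatalfMessage(errText string) string {
	return fmt.Sprintf(format, errors.New(errText))
}

func main() {
	// "job failed" is a placeholder error text, not the real job ID.
	fmt.Println(fatalMessage("job failed"))
	fmt.Println(fatalfMessage("job failed"))
}
```

If the intent was a formatted message, log.Fatalf("Failed to execute job: %v", err) without the context argument is probably what was meant.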

Issue Priority

Priority: 3

Issue Component

Component: sdk-go

waj334 commented 2 years ago

Turns out I was using Flink 1.15. I switched to 1.14 and it worked.

jrmccluskey commented 2 years ago

So the issue has been resolved?

lostluck commented 2 years ago

I'd be a bit more concerned if the pipeline were non-trivial. The SDK does do its own cycle detection. Currently appropriately triaged as a P3.
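
For context, the stack trace shows the exception being raised from Flink's JobGraph.getVerticesSortedTopologicallyFromSources, i.e. during a topological sort of the job graph. The following is a minimal, generic sketch of cycle detection by topological sorting (Kahn's algorithm). It is not the Beam Go SDK's or Flink's actual code; the function name hasCycle and the graph representation are assumptions made for illustration. It does show why a graph with a single vertex and no edges should never be reported as cyclic.

```go
package main

import "fmt"

// hasCycle reports whether a directed graph contains a cycle.
// edges maps each vertex to the vertices it feeds into.
// Assumption: every vertex named in edges also appears in vertices.
func hasCycle(vertices []string, edges map[string][]string) bool {
	// Count incoming edges for every vertex.
	inDegree := make(map[string]int, len(vertices))
	for _, v := range vertices {
		inDegree[v] = 0
	}
	for _, targets := range edges {
		for _, t := range targets {
			inDegree[t]++
		}
	}

	// Start from the sources: vertices with no incoming edges.
	var queue []string
	for _, v := range vertices {
		if inDegree[v] == 0 {
			queue = append(queue, v)
		}
	}

	// Repeatedly remove a source and release its successors.
	sorted := 0
	for len(queue) > 0 {
		v := queue[0]
		queue = queue[1:]
		sorted++
		for _, t := range edges[v] {
			inDegree[t]--
			if inDegree[t] == 0 {
				queue = append(queue, t)
			}
		}
	}

	// Vertices that were never released are part of (or behind) a cycle.
	return sorted != len(vertices)
}

func main() {
	// The pipeline from this report: one Impulse, no edges.
	fmt.Println(hasCycle([]string{"Impulse"}, nil))

	// A hypothetical two-vertex graph with a genuine cycle.
	fmt.Println(hasCycle([]string{"A", "B"}, map[string][]string{"A": {"B"}, "B": {"A"}}))
}
```

Under this kind of check, the single-Impulse pipeline is trivially acyclic, which is consistent with the error pointing at a runner or version problem rather than at the pipeline's shape.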

dartiga commented 1 year ago

This is now happening to us after we tried upgrading from Flink 1.14 to 1.15.

sdcys-github commented 1 month ago

Is this bug resolved? I just ran into this problem.

lostluck commented 1 month ago

> Is this bug resolved? I just ran into this problem.

Is the pipeline more than a trivial single Impulse?

sdcys-github commented 4 weeks ago

> > Is this bug resolved? I just ran into this problem.
>
> Is the pipeline more than a trivial single Impulse?

Sorry for my poor English, I do not understand what you mean. I just submitted the job to a Flink 1.13.1 cluster, and it succeeded:

[Screenshot: Weixin Image_20240913113937]

lostluck commented 4 weeks ago

@sdcys-github No worries!

In this instance, I'm curious under what circumstances you reproduced this bug.

If your pipeline is a single Impulse with no other transforms, then it's exactly the same case as the original report. In that case the bug remains on the Flink runner side, not the Go side.

But if your pipeline has additional transforms and still fails with a "job graph is cyclic" error, then I would like you to provide the shape of that pipeline. This doesn't mean revealing the code of the transforms, only how they are connected.

The pipeline screenshot in your reply shows two transforms, and according to your comment the pipeline succeeds.

If the pipeline is succeeding, it can't be experiencing this bug.

If it is experiencing this bug, the versions of Flink and Beam, along with the shape of the pipeline, would be valuable for reproducing and ultimately resolving the issue.
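
As a rough illustration of what "the shape of the pipeline" could look like without revealing any transform code, the sketch below derives producer-to-consumer edges from transform names and the PCollection IDs they read and write, similar to the `transforms` / `outputs` entries in the pipeline dump at the top of this issue. The Transform struct and shapeEdges function are hypothetical stand-ins, not Beam SDK types, and the "ParDo" transform and "n2" ID are made up for the example.

```go
package main

import (
	"fmt"
	"sort"
)

// Transform is a simplified, hypothetical stand-in for one entry under
// "transforms" in the pipeline dump. It is not a Beam SDK type.
type Transform struct {
	Name    string
	Inputs  []string // IDs of the PCollections this transform consumes
	Outputs []string // IDs of the PCollections this transform produces
}

// shapeEdges returns one "producer -> consumer (via pcollection)" line per
// connection between transforms, sorted for stable output.
func shapeEdges(transforms []Transform) []string {
	// Index each PCollection by the transform that produces it.
	producer := make(map[string]string)
	for _, t := range transforms {
		for _, pc := range t.Outputs {
			producer[pc] = t.Name
		}
	}

	// Emit an edge for every input whose producer is known.
	var lines []string
	for _, t := range transforms {
		for _, pc := range t.Inputs {
			if src, ok := producer[pc]; ok {
				lines = append(lines, fmt.Sprintf("%s -> %s (via %s)", src, t.Name, pc))
			}
		}
	}
	sort.Strings(lines)
	return lines
}

func main() {
	// Hypothetical two-transform pipeline. Only "Impulse" and "n1" appear
	// in the original dump; "ParDo" and "n2" are invented for illustration.
	pipeline := []Transform{
		{Name: "Impulse", Outputs: []string{"n1"}},
		{Name: "ParDo", Inputs: []string{"n1"}, Outputs: []string{"n2"}},
	}
	for _, line := range shapeEdges(pipeline) {
		fmt.Println(line)
	}
}
```

A list of such edges, together with the Flink and Beam versions, is the kind of information that would help in reproducing the problem.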