apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0

[Bug]: Cross-Language pipelines don't work with BigQuery IO and Go SDK on Dataflow Runner #22172

Open PaulVasilenko opened 1 year ago

PaulVasilenko commented 1 year ago

What happened?

I am using the Go SDK and the github.com/apache/beam/sdks/v2/go/pkg/beam/io/xlang/bigqueryio package.

When I run a pipeline that writes to BigQuery using the xlang package, it always fails with this error:

Error message from worker: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
...
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.gcp.bigquery.StorageApiWriteUnshardedRecords$WriteRecordsDoFn

The failure is also reproducible with this wordcount example: https://github.com/apache/beam/blob/master/sdks/go/examples/xlang/bigquery/wordcount.go

Error message from worker: java.lang.IllegalArgumentException: unable to deserialize Custom DoFn With Execution Info
...
Caused by: java.lang.ClassNotFoundException: org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$TypedRead$4

I might be doing something wrong, but to me this looks like the Java SDK is missing on the runner, or like some other runner issue.

Issue Priority

Priority: 2

Issue Component

Component: cross-language

jrmccluskey commented 1 year ago

Which runner are you using?

PaulVasilenko commented 1 year ago

> Which runner are you using?

@jrmccluskey Dataflow Runner

jrmccluskey commented 1 year ago

I haven't encountered the same issue with TypedRead on wordcount. Can you post the Job Info and Pipeline Options output from the example wordcount job?

PaulVasilenko commented 1 year ago

@jrmccluskey Here it is. Is there anything else you need?

"sdkPipelineOptions": {
      "display_data": [
        {
          "key": "name",
          "namespace": "options",
          "type": "STRING",
          "value": "bigquery-wordcount-20220714-171900"
        },
        {
          "key": "experiments",
          "namespace": "options",
          "type": "STRING",
          "value": "use_unified_worker,use_portable_job_submission"
        },
        {
          "key": "project",
          "namespace": "options",
          "type": "STRING",
          "value": "mobalytics-1242"
        },
        {
          "key": "region",
          "namespace": "options",
          "type": "STRING",
          "value": "us-central1"
        },
        {
          "key": "container_images",
          "namespace": "options",
          "type": "STRING",
          "value": "apache/beam_go_sdk:2.40.0,apache/beam_java8_sdk:2.40.0,apache/beam_java8_sdk:2.40.0"
        },
        {
          "key": "temp_location",
          "namespace": "options",
          "type": "STRING",
          "value": "gs://test-hl-percentiles/binaries/"
        },
        {
          "key": "hookOrder",
          "namespace": "go_options",
          "type": "STRING",
          "value": "[\"default_remote_logging\"]"
        },
        {
          "key": "dry_run",
          "namespace": "go_options",
          "type": "STRING",
          "value": "true"
        },
        {
          "key": "out_table",
          "namespace": "go_options",
          "type": "STRING",
          "value": "mobalytics-1242:lol_match_metrics.wordcount"
        },
        {
          "key": "hooks",
          "namespace": "go_options",
          "type": "STRING",
          "value": "{\"default_remote_logging\":null}"
        }
      ],
      "options": {
        "experiments": [
          "use_unified_worker",
          "use_portable_job_submission",
          "beam_fn_api"
        ],
        "pipelineUrl": "gs://test-hl-percentiles/binaries/go-1-1657811958617626000/model",
        "region": "us-central1",
        "tempLocation": "gs://test-hl-percentiles/binaries/"
      },
      "beam:option:go_options:v1": {
        "options": {
          "dry_run": "true",
          "hookOrder": "[\"default_remote_logging\"]",
          "hooks": "{\"default_remote_logging\":null}",
          "out_table": "mobalytics-1242:lol_match_metrics.wordcount"
        }
      }
    },
    "tempStoragePrefix": "gs://test-hl-percentiles/binaries/",
    "userAgent": {
      "name": "Apache Beam SDK for Go",
      "version": "2.40.0"
    },
    "version": {
      "job_type": "FNAPI_BATCH",
      "major": "6"
    },
    "workerPools": [
      {
        "autoscalingSettings": {},
        "ipConfiguration": "WORKER_IP_UNSPECIFIED",
        "kind": "harness",
        "numWorkers": 1,
        "packages": [
          {
            "location": "gs://test-hl-percentiles/binaries/go-1-1657811958617626000/worker",
            "name": "worker"
          }
        ],
        "sdkHarnessContainerImages": [
          {
            "containerImage": "apache/beam_go_sdk:2.40.0"
          },
          {
            "containerImage": "apache/beam_java8_sdk:2.40.0"
          },
          {
            "containerImage": "apache/beam_java8_sdk:2.40.0"
          }
        ],
        "workerHarnessContainerImage": "apache/beam_go_sdk:2.40.0"
      }
    ]
  },
  "name": "bigquery-wordcount-20220714-171900",
  "projectId": "mobalytics-1242",
  "type": "JOB_TYPE_BATCH"
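The `workerPools` section of this job info is where the requested SDK harness images show up, which bears on the "missing Java SDK" hypothesis. The minimal Go sketch below decodes the object that holds `workerPools` (the paste starts mid-object, so the enclosing key is not assumed) and lists the declared harness images. The type and function names are hypothetical, and matching on the `beam_java` substring is a heuristic based on the image names in this job. Note that declaring a Java harness image does not by itself show which jars end up on that harness's classpath, which is what a ClassNotFoundException is about.

```go
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"strings"
)

// jobEnvironment models only the fields needed here;
// encoding/json ignores every other field in the input.
type jobEnvironment struct {
	WorkerPools []struct {
		SDKHarnessContainerImages []struct {
			ContainerImage string `json:"containerImage"`
		} `json:"sdkHarnessContainerImages"`
	} `json:"workerPools"`
}

// harnessImages returns every SDK harness image declared in any worker pool.
func harnessImages(envJSON []byte) ([]string, error) {
	var env jobEnvironment
	if err := json.Unmarshal(envJSON, &env); err != nil {
		return nil, err
	}
	var images []string
	for _, pool := range env.WorkerPools {
		for _, img := range pool.SDKHarnessContainerImages {
			images = append(images, img.ContainerImage)
		}
	}
	return images, nil
}

// hasJavaHarness reports whether any image looks like a Beam Java SDK image
// (heuristic: the name contains "beam_java", as in apache/beam_java8_sdk).
func hasJavaHarness(images []string) bool {
	for _, img := range images {
		if strings.Contains(img, "beam_java") {
			return true
		}
	}
	return false
}

func main() {
	// Trimmed from the workerPools section of the job info above.
	env := []byte(`{
	  "workerPools": [{
	    "sdkHarnessContainerImages": [
	      {"containerImage": "apache/beam_go_sdk:2.40.0"},
	      {"containerImage": "apache/beam_java8_sdk:2.40.0"},
	      {"containerImage": "apache/beam_java8_sdk:2.40.0"}
	    ],
	    "workerHarnessContainerImage": "apache/beam_go_sdk:2.40.0"
	  }]
	}`)
	images, err := harnessImages(env)
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	for _, img := range images {
		fmt.Println(img)
	}
	fmt.Println("java harness declared:", hasJavaHarness(images))
}
```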
jrmccluskey commented 1 year ago

I'm sorry this has taken me a while to circle back to. I'm not seeing a clear culprit for why the job is failing in this manner, especially since I've been able to use it myself. Do you mind telling me what expansion service you're using when you build your pipeline?