apache / beam

Apache Beam is a unified programming model for Batch and Streaming data processing.
https://beam.apache.org/
Apache License 2.0
7.89k stars 4.27k forks source link

[Bug]: Issue with IllegalStateException in `STORAGE_WRITE_API` method in `BigQueryIO` #31422

Open clongnguyen6 opened 6 months ago

clongnguyen6 commented 6 months ago

What happened?

I am running a Beam pipeline in Google Dataflow (Beam SDK version 2.56.0) that reads messages from Pub/Sub and writes data to a BigQuery table using the STORAGE_WRITE_API method. This is the BigQueryIO configuration I'm using:

BigQueryIO.write<KV<String, TableRow>>().to(
                BigQueryDestination(...)
            )
                .withFormatFunction { it.value } // Get TableRow
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                .withTriggeringFrequency(Duration.standardMinutes(5))
                .withAutoSharding()
                .ignoreUnknownValues()

I observed an IllegalStateException error in the pipeline logs when running load test:

Error message from worker: java.lang.RuntimeException: java.lang.IllegalStateException
        org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:608)
        org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:869)
Caused by: java.lang.IllegalStateException
        org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:496)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$WriteStreamServiceImpl$1.pin(BigQueryServicesImpl.java:1459)
        org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.lambda$process$12(StorageApiWritesShardedRecords.java:600)
        org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.process(StorageApiWritesShardedRecords.java:869)
        org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWindowedValue(SimpleDoFnRunner.java:449)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.outputWithTimestamp(SimpleDoFnRunner.java:439)
        org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.outputWithTimestamp(DoFnOutputReceivers.java:98)
        org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.flushBatch(GroupIntoBatches.java:660)
        org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn.processElement(GroupIntoBatches.java:518)
        org.apache.beam.sdk.transforms.GroupIntoBatches$GroupIntoBatchesDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:186)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1238)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.lambda$scheduleWorkItem$11(StreamingDataflowWorker.java:974)
        org.apache.beam.runners.dataflow.worker.streaming.Work.run(Work.java:81)
        org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor.lambda$executeMonitorHeld$0(BoundedQueueExecutor.java:232)
        java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        java.base/java.lang.Thread.run(Thread.java:829)

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

liferoad commented 5 months ago

Is the error transient?

clongnguyen6 commented 5 months ago

Is the error transient? No, the error is not transient It occurs frequently in the pipeline logs aaaa

liferoad commented 5 months ago

Since this is a streaming job, is the job often stuck? I also suggest you open a GCP support ticket (https://cloud.google.com/dataflow/docs/support/getting-support). I have seen some issues similar to this but they are often caused by some Java package mismatches.

clongnguyen6 commented 5 months ago

Since this is a streaming job, is the job often stuck? I also suggest you open a GCP support ticket (https://cloud.google.com/dataflow/docs/support/getting-support). I have seen some issues similar to this but they are often caused by some Java package mismatches.

The job is not stuck and is running normally, but the error logs are quite annoying. @liferoad, Sorry because the above information is not correct, the Job will be stuck.

Abacn commented 5 months ago

The trackstace points to here: https://github.com/apache/beam/blob/8c6e1a4654bced7bf732e1a46092908580da561b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L1459

indicates the client is already closed when one trying to write records via "StorageApiWritesShardedRecords".

"closed" could only be true when StreamAppendClient.close is called.

Sounds like some racing condition happened there.

Note that "WritesShardedRecords" is used to write spilled records that not handled by "WritesUnShardedRecords" and is a less execised code.

In any case, open a support ticket so cloud support could inspect your job ID. I see a public bug opened for https://b.corp.google.com/issues/345352979 this is not a usual process

clongnguyen6 commented 5 months ago

The trackstace points to here:

https://github.com/apache/beam/blob/8c6e1a4654bced7bf732e1a46092908580da561b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java#L1459

indicates the client is already closed when one trying to write records via "StorageApiWritesShardedRecords".

"closed" could only be true when StreamAppendClient.close is called.

Sounds like some racing condition happened there.

Note that "WritesShardedRecords" is used to write spilled records that not handled by "WritesUnShardedRecords" and is a less execised code.

In any case, open a support ticket so cloud support could inspect your job ID. I see a public bug opened for https://b.corp.google.com/issues/345352979 this is not a usual process

I created a GCP support ticket.

Abacn commented 5 months ago

There are two places closing the client:

https://github.com/apache/beam/blob/2ddfcfb1c18020d0c01c7ebfe5d898a5b8a303b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java#L392

https://github.com/apache/beam/blob/2ddfcfb1c18020d0c01c7ebfe5d898a5b8a303b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L517

Apparent racing condition most likely client already marked closed here, then in

https://github.com/apache/beam/blob/2ddfcfb1c18020d0c01c7ebfe5d898a5b8a303b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L600

it tries to use the client and fails precondition.

It's hard to locate where the client get closed (async and lambda programming code style). Basically, there is a APPEND_CLIENTS cache:

https://github.com/apache/beam/blob/2ddfcfb1c18020d0c01c7ebfe5d898a5b8a303b6/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java#L181

removal of cache item will trigger appendClientInfo.close which calls getCloseAppendClient().accept(client) that is the code above.

Then what happens is the cache entry contains the client get invalidated before it's going to be used again.