GoogleCloudPlatform / DataflowJavaSDK

Google Cloud Dataflow provides a simple, powerful model for building both batch and streaming parallel data processing pipelines.
http://cloud.google.com/dataflow

Since porting to 2.1.0, pipeline now fails when it encounters an empty `PCollection` for BigQuery sink/output #610

Open polleyg opened 6 years ago

polleyg commented 6 years ago

We are porting some pipelines from 1.8.0 to 2.1.0 in our applications.

These pipelines are basic: Read from GCS -> ParDo -> Write to BigQuery. There are multiple side outputs being written to BigQuery (think different months/years, which correspond to sharded tables in BigQuery).
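For context, here is a minimal sketch of that shape (illustrative only: the bucket, project, dataset, table and field names, and the `parse`/`monthOf` helpers are placeholders, not our real code):

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class ShardedBigQueryWritePipeline {

  // One tag per monthly shard; only two shown here for brevity.
  private static final TupleTag<TableRow> NOV_2017 = new TupleTag<TableRow>() {};
  private static final TupleTag<TableRow> DEC_2017 = new TupleTag<TableRow>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read from GCS -> ParDo with tagged outputs -> one BigQuery write per sharded table.
    PCollectionTuple byMonth = p
        .apply("ReadFromGCS", TextIO.read().from("gs://my-bucket/clicks/*.csv"))
        .apply("RouteByMonth", ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            TableRow row = parse(c.element());
            if ("2017_11".equals(monthOf(row))) {
              c.output(row);            // main output -> November table
            } else {
              c.output(DEC_2017, row);  // side output -> December table
            }
          }
        }).withOutputTags(NOV_2017, TupleTagList.of(DEC_2017)));

    // Either of these PCollections may be empty for a given batch of input files.
    byMonth.get(NOV_2017).apply("WriteNov2017", BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.Clicks_transformed_2017_11")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    byMonth.get(DEC_2017).apply("WriteDec2017", BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.Clicks_transformed_2017_12")
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }

  // Toy CSV parser; field names are made up for the example.
  private static TableRow parse(String csvLine) {
    String[] f = csvLine.split(",");
    return new TableRow().set("click_time", f[0]).set("user_id", f[1]);
  }

  // Derives a YYYY_MM shard key from an ISO-style timestamp such as "2017-11-05T...".
  private static String monthOf(TableRow row) {
    return ((String) row.get("click_time")).substring(0, 7).replace('-', '_');
  }
}
```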

In some cases, we may not have any elements for one or more of the BigQuery sinks. On 1.8.0 this never caused an issue, but on 2.1.0 the pipeline now fails whenever it encounters an empty `PCollection<TableRow>`:

```
ID: 2017-10-08_18_49_08-4262554366118214565
2017-10-09 (13:13:50)

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: ...
(db49af7a45b5ecaa): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job: { "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:182)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:104)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:54)
    at com.google.cloud.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:37)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:117)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:74)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:113)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:187)
    at com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:148)
    at com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:68)
    at com.google.cloud.dataflow.worker.DataflowWorker.executeWork(DataflowWorker.java:336)
    at com.google.cloud.dataflow.worker.DataflowWorker.doWork(DataflowWorker.java:294)
    at com.google.cloud.dataflow.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:135)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:115)
    at com.google.cloud.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:102)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job: { "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:138)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:183)
    at org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:211)
    at org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:66)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:436)
    at org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:424)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:84)
    at org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:177)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:141)
    at com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:233)
    at com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:48)
    at com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
    at com.google.cloud.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:180)
    ... 21 more
Caused by: java.lang.RuntimeException: Failed to create load job with id prefix c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001, reached max retries: 3, last failed load job: { "configuration" : { "load" : { "createDisposition" : "CREATE_IF_NEEDED", "destinationTable" : { "datasetId" : "<redacted>", "projectId" : "<redacted>", "tableId" : "Clicks_transformed_2017_11" }, "sourceFormat" : "NEWLINE_DELIMITED_JSON", "sourceUris" : [ "gs://<redacted>/BigQueryWriteTemp/c4b4181de93d47f3b2b6ba5d2cdd2507/43f3689a-6b4d-4410-b566-3d0bcbf7cbb7" ], "writeDisposition" : "WRITE_APPEND" } }, "etag" : "\"Z8nD8CIuj_TqmPrS_MNV-O9x2rU/CT0v4ZW2CSH0SWrB04bn5-C_ckc\"", "id" : "<redacted>:c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "jobReference" : { "jobId" : "c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "projectId" : "<redacted>" }, "kind" : "bigquery#job", "selfLink" : "https://www.googleapis.com/bigquery/v2/projects/<redacted>/jobs/c4b4181de93d47f3b2b6ba5d2cdd2507_9fb1b09312020bde79d0135a8180901d_00001-2", "statistics" : { "creationTime" : "1507515172555", "endTime" : "1507515172669", "startTime" : "1507515172669" }, "status" : { "errorResult" : { "message" : "No schema specified on job or table.", "reason" : "invalid" }, "errors" : [ { "message" : "No schema specified on job or table.", "reason" : "invalid" } ], "state" : "DONE" }, "user_email" : "<redacted>-compute@developer.gserviceaccount.com" }.
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.load(WriteTables.java:179)
    at org.apache.beam.sdk.io.gcp.bigquery.WriteTables.processElement(WriteTables.java:114)
```

As a test, I hacked one of the GCS (CSV) input files to ensure at least one element ends up in each of the BigQuery sinks. Et voilà, it worked (ID: 2017-10-08_20_25_01-3317103171891646372).

We are skipping 1.9.0 and 2.0.0, so I'm not sure which version this bug was introduced in.
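Reading the failed load job above, it looks like the empty `PCollection` still produces a BigQuery load job, and because the destination table doesn't exist yet and no schema is attached to the write, BigQuery rejects it with "No schema specified on job or table." If that's the cause, a possible (untested) workaround might be to attach an explicit schema to each write, continuing the illustrative sketch above (the `TableSchema`/`TableFieldSchema` types come from `com.google.api.services.bigquery.model`; the field definitions below are placeholders):

```java
// Untested idea: give every write an explicit schema so that a load job
// created for an empty PCollection (against a not-yet-existing table)
// still has a schema to work with.
TableSchema clickSchema = new TableSchema().setFields(java.util.Arrays.asList(
    new TableFieldSchema().setName("click_time").setType("TIMESTAMP"),  // placeholder fields
    new TableFieldSchema().setName("user_id").setType("STRING")));

byMonth.get(NOV_2017).apply("WriteNov2017", BigQueryIO.writeTableRows()
    .to("my-project:my_dataset.Clicks_transformed_2017_11")
    .withSchema(clickSchema)  // might avoid "No schema specified on job or table"?
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
```

That said, 1.8.0 handled empty outputs without any schema hints, so this feels like a regression rather than something we should have to work around.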

koichirokamoto commented 6 years ago

Is there any update?