GoogleCloudPlatform / DataflowTemplates

Cloud Dataflow Google-provided templates for solving in-Cloud data tasks
https://cloud.google.com/dataflow/docs/guides/templates/provided-templates

Pubsub to BigQuery - Accumulated errors make jobs crash #7

Closed joseluisroblesurquiza closed 3 months ago

joseluisroblesurquiza commented 6 years ago

Hi

I've been using Google Dataflow Templates to send messages from pub/sub to BigQuery based on this: https://cloud.google.com/dataflow/docs/templates/provided-templates#cloudpubsubtobigquery

Since I launched the Dataflow job in streaming mode, it has been generating errors and eventually crashes because of the way Dataflow handles Java exceptions: https://cloud.google.com/dataflow/faq#how-are-java-exceptions-handled-in-cloud-dataflow

Here is the error:

java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"","message":"Repeated record added outside of an array.","reason":"invalid"}],"index":0}, {"errors":[{"debugInfo":"","location":"","message":"","reason":"stopped"}],"index":1}, {"errors":[{"debugInfo":"","location":"","message":"","reason":"stopped"}],"index":2}, {"errors":[{"debugInfo":"","location":"","message":"","reason":"stopped"}],"index":3} .......]
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:125)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:94)

[screenshots of the Dataflow job errors, 2018-09-13]

As I understand it, this behaviour makes the template unusable for streaming data.

Is there any way to configure the template so that these exceptions don't crash the job but are still sent to Stackdriver?

Thanks

ryanmcdowell commented 6 years ago

The Pub/Sub to BigQuery template should handle parse, format, & UDF exceptions automatically by routing them to a dead-letter table. However, in your case it looks like the failure is occurring on the output to BigQuery, which is not being re-routed to the dead-letter table today.
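
Roughly, that routing uses the standard Beam multi-output pattern. Here is a simplified sketch of the idea, not the template's exact code; messages stands for the PCollection of Pub/Sub payload strings, and convertJsonToTableRow is an illustrative helper:

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Main output: successfully parsed rows. Side output: raw payloads that failed to
// parse (or failed in the UDF), which get written to the dead-letter table instead
// of failing the bundle.
final TupleTag<TableRow> parsedTag = new TupleTag<TableRow>() {};
final TupleTag<String> parseFailedTag = new TupleTag<String>() {};

PCollectionTuple outputs = messages.apply("ParseJson",
    ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            try {
              c.output(convertJsonToTableRow(c.element())); // illustrative helper
            } catch (Exception e) {
              c.output(parseFailedTag, c.element());        // route to dead letter
            }
          }
        })
        .withOutputTags(parsedTag, TupleTagList.of(parseFailedTag)));

PCollection<TableRow> goodRows = outputs.get(parsedTag);               // -> WriteSuccessfulRecords
PCollection<String> deadLetterPayloads = outputs.get(parseFailedTag);  // -> WriteFailedRecords
```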

A couple of follow-up questions:

  1. Which transform is throwing the errors (WriteSuccessfulRecords or WriteFailedRecords)?

  2. Can you give a sample of the input? Sanitized as necessary.

  3. Does the input schema match the BigQuery schema?

  4. When you say the job crashed... are you just referring to the errors or did the job actually halt processing of records which should have been successful?

joseluisroblesurquiza commented 6 years ago

Hi Ryan

  1. I'm not really sure, but I think it could be in WriteFailedRecords because of the format of the JSON that Dataflow is trying to convert from JSON to TableRow. Here is the complete message shown in the Dataflow log (there isn't anything in the GCP Console logging): DF_ERROR.txt

  2. This file (pub_sub_message.txt) is an example of the message sent to Pub/Sub.

  3. The JSON format matches the BigQuery schema. For the first two hours after the job starts, the data lands correctly in BQ, but after approximately one or two hours the information stops flowing.

  4. The job is still running, but the information doesn't appear in BigQuery, maybe because of the system lag from processing these exceptions: [screenshot of the job's system lag]

I look forward to your comments

ryanmcdowell commented 6 years ago

I tried to re-create the scenario with the provided payload but was unable to receive the same error. I used a pretty vanilla configuration in the "Create Job from Template" UI with no UDF, which consumed from a topic and output to a table (schema).

When publishing the message, I noticed that your payload is invalid JSON due to the trailing comma within the first record in the recommendations array.

[screenshot: support_invalid_json]

Considering the invalid JSON, the pipeline automatically routed the record to the dead-letter table which was successfully created and written to. I did not receive any errors on output.

[screenshot: support_deadletter]

Is there anything you can think of that does not match between my test configuration and yours (e.g. UDF, custom code, etc)?

joseluisroblesurquiza commented 6 years ago

Sorry Ryan, my mistake. I tried to give you a reduced sample, but I'm sure that the JSON format is not the problem because of the code I'm using to generate the Pub/Sub messages (Python 3): [screenshots of the publisher code]

If I'm right, this code ensures that the JSON will always be well formatted (json.dumps(msg).encode()), otherwise I would receive an execution error before publishing the message. Anyway, in my case, I'm not receiving anything in the dead-letter table.

As for UDFs, I'm not using any.

If you need a complete PUBSUB message example or the BQ Schema, I would prefer to send them to you privately. If you need anything else from me, just ask for it.

Thanks!

joseluisroblesurquiza commented 6 years ago

I've just reproduced the error with some ad-hoc code.

The error seems to be related to BigQueryServicesImpl (BigQueryServicesImpl.java:799, BigQueryServicesImpl.java:813). Maybe it is an exception not currently handled in your code... [screenshot of the stack trace]

The problem arises in cases where a Pub/Sub message contains a BigQuery array (repeated) record with no elements. The BigQuery field in which I'm trying to save this information is: [screenshot of the field definition]

These few lines solve my problem: [screenshot of the fix]
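
Roughly, the guard amounts to something like the following (sketched here in Java, although my publisher code is Python): skip repeated-record keys whose value is null or an empty list, so the field is omitted from the message instead of being sent empty.

```java
import java.util.Collection;
import java.util.Map;

// Illustrative guard only: drop keys whose value is null or an empty collection, so a
// repeated RECORD field is either a non-empty array or absent from the payload entirely.
static void dropEmptyRepeatedFields(Map<String, Object> payload) {
  payload.entrySet().removeIf(entry ->
      entry.getValue() == null
          || (entry.getValue() instanceof Collection
              && ((Collection<?>) entry.getValue()).isEmpty()));
}
```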

I relaunched the template and it is working correctly with no errors: [screenshot of the relaunched job]

In my opinion, this template should handle this kind of exception.

I look forward to your comments

ryanmcdowell commented 6 years ago

I tried recreating the error, using NULL arrays, empty arrays, and populated arrays and was unable to do so. All were successfully output to the table. If you could send me an anonymized version of your payload / schema via email (in my profile), I can take a deeper look.

The error being thrown comes from the BigQuery service invocation not liking the format of the JSON, but I haven't been able to reproduce it, so it's difficult to pinpoint. A long-term solution would be to re-route records which fail on output to BigQuery into the dead-letter table. However, BigQueryIO does not give a description of the error which caused the failure, so while that would get rid of the errors, it wouldn't give the user a good indication of why those records failed.
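
In more recent Beam SDKs, BigQueryIO can surface the insert error via withExtendedErrorInfo() / getFailedInsertsWithErr(), so the re-routing could look roughly like the sketch below. Table names and the dead-letter schema are placeholders, and goodRows stands for the parsed-rows PCollection:

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;

// Write the parsed rows; only retry inserts that failed for transient reasons, so
// permanently invalid rows are emitted as failed inserts instead of retried forever.
WriteResult writeResult = goodRows.apply("WriteSuccessfulRecords",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")                  // placeholder table
        .withExtendedErrorInfo()                               // enables getFailedInsertsWithErr()
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

// Dead-letter schema: the rejected payload plus BigQuery's error message.
TableSchema deadLetterSchema = new TableSchema().setFields(Arrays.asList(
    new TableFieldSchema().setName("payloadString").setType("STRING"),
    new TableFieldSchema().setName("errorMessage").setType("STRING")));

// Re-route rows BigQuery rejected (e.g. "no such field", oversized rows) to the
// dead-letter table, together with the reason for the failure.
writeResult.getFailedInsertsWithErr()
    .apply("WrapInsertError", MapElements.via(
        new SimpleFunction<BigQueryInsertError, TableRow>() {
          @Override
          public TableRow apply(BigQueryInsertError insertError) {
            return new TableRow()
                .set("payloadString", insertError.getRow().toString())
                .set("errorMessage", insertError.getError().toString());
          }
        }))
    .apply("WriteFailedRecords",
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.my_table_error_records") // placeholder table
            .withSchema(deadLetterSchema)
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND));
```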

mnebes commented 5 years ago

Hi @ryanmcdowell, I am dealing with a similar problem here. If the message payload doesn't 100% match our BigQuery schema, it just keeps bouncing in the "WriteSuccessfulRecords" step. While we generally make sure we do not have such messages in the topic, when it does happen it is pretty annoying.

We thought about a custom UDF to verify the json message schema, but it makes the template higher maintenance. We would definitely welcome a feature where the unsuccessful BigQuery writes would be re-routed to the dead-letter table if BigQuery returns a response indicating the payload is invalid.

ryanmcdowell commented 5 years ago

@mnebes - Thanks for the feedback. There should be a change for this in the next week or so.

mnebes commented 5 years ago

@ryanmcdowell That's awesome! Thank you very much!

joseluisroblesurquiza commented 5 years ago

Good news! Thank you!

mnebes commented 5 years ago

Hi @ryanmcdowell, just wanted to ask if there is any news on this front?

RyanGordon commented 5 years ago

Also running into this bug under similar circumstances: java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"","message":"Maximum allowed row size exceeded. Allowed: 1048576 Row size: 1383816","reason":"invalid"}],"index":0}]

The Dataflow job retries indefinitely, causing the pipeline to stall and not process anything else. In this case, the error should probably be logged & dropped.
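
For anyone who can run the pipeline from source rather than the stock template, one way to get that log-and-drop behaviour is BigQueryIO's insert retry policy. A small sketch, with rows standing for the PCollection of TableRows and a placeholder table name:

```java
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;

// retryTransientErrors() stops BigQueryIO from endlessly retrying rows that BigQuery
// rejects with a permanent error such as "Maximum allowed row size exceeded", so the
// pipeline keeps processing other records instead of stalling.
WriteResult writeResult = rows.apply("WriteToBigQuery",
    BigQueryIO.writeTableRows()
        .to("my-project:my_dataset.my_table")   // placeholder table
        .withExtendedErrorInfo()
        .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
// writeResult.getFailedInsertsWithErr() then carries the rejected rows and their errors,
// so they can be logged or written to a dead-letter table instead of being lost silently.
```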

mayansalama commented 5 years ago

Hi there,

I'm encountering this issue when the schema of the incoming JSON message does not match the target table. @ryanmcdowell, do you know if a fix for this is on the way? I've included a stack trace below.

java.lang.RuntimeException: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"col3","message":"no such field.","reason":"invalid"}],"index":0}]
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:141)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:102)
Caused by: java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"col3","message":"no such field.","reason":"invalid"}],"index":0}]
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:812)
        org.apache.beam.sdk.io.gcp.bigquery.BigQueryServicesImpl$DatasetServiceImpl.insertAll(BigQueryServicesImpl.java:829)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.flushRows(StreamingWriteFn.java:130)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn.finishBundle(StreamingWriteFn.java:102)
        org.apache.beam.sdk.io.gcp.bigquery.StreamingWriteFn$DoFnInvoker.invokeFinishBundle(Unknown Source)
        org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.finishBundle(SimpleDoFnRunner.java:285)
        org.apache.beam.runners.dataflow.worker.SimpleParDoFn.finishBundle(SimpleParDoFn.java:407)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.finish(ParDoOperation.java:56)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:84)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1233)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:144)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:972)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

EDIT: Found this very useful thread to solve this issue - https://stackoverflow.com/questions/52044349/apache-beam-google-dataflow-pubsub-to-bigquery-pipeline-handling-insert-errors

ee07dazn commented 2 years ago

@ryanmcdowell: any progress on this? I am seeing the same issue with pubsub-cdc-to-bigquery.

github-actions[bot] commented 3 months ago

This issue has been marked as stale due to 180 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the issue at any time. Thank you for your contributions.

github-actions[bot] commented 3 months ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.