confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0
3 stars 2 forks source link

bigQueryRetry and bigQueryRetryWait don't seem to take effect #94

Open kdanielss opened 3 years ago

kdanielss commented 3 years ago

In version 2.1.0, if there is a temporary connection issue with BigQuery (connection reset) the task fails immediately and doesn't retry.

This is the exception reported in the failing task:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write to table
Caused by: Connection reset; See logs for more detail
    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:563)
    ... 10 more

We have configured our connector with the following parameters:

bigQueryRetry
bigQueryRetryWait
errors.retry.timeout
errors.retry.delay.max.ms

and still the tasks fail on first connection reset.

Is this a known issue or are we missing something?

C0urante commented 3 years ago

The bigQueryRetry property is documented as "The number of retry attempts that will be made per BigQuery request that fails with a backend error or a quota exceeded error". This requires the request to actually go through and be met with a response from BigQuery; it looks like the connector isn't even able to healthily connect to BigQuery in the trace you've posted here.

I don't believe errors.retry.timeout and errors.retry.delay.max.ms are recognized properties for this connector.

I think it might be alright to expand bigQueryRetry to also cover retry when connectivity issues are encountered. The only tricky part here is how we would identify those connectivity issues without false positives.

kdanielss commented 3 years ago

Thanks for your answer. Can you clarify what is meant by "backend error"?

Just to explain our situation, we have Kafka connect running in GCP and Confluent Cloud cluster that also runs in GCP, still from time to time (1-2 times an hour) we experience the error above. Most of the time the tasks that go to failed state are dealing with topics that have higher traffic (~10K messages / sec). Our expectation was that Kafka Connect / BQ Connector would be able to deal with temporary network issues.

For now, we implemented a job that restarts failed tasks at regular intervals, however this feels more like a work-around than a solution. Can you advice on a better solution?

IOANNIS1234 commented 3 years ago

We are running the connector with version 1.6.6, and we also believe the bigQueryRetry bigQueryRetryWait are not taken into effect. We can see that, for example, in case of a deserialisation error ( which should also be a "backend error"), the malformed record is written once in the deadletter queue. Also in case a malformed JSON record is published in the topic( for example a field is missing), before the connector finally fails it seems it will not retry even though the conf bigQueryRetry is specified.

C0urante commented 3 years ago

Hi all--apologies for the delay.

@kdanielss There are a number of errors that can happen BigQuery-side that will result in the HTTP request completing, but the response to it will include information that the insertion attempt failed. These are the "backend errors" that we cover right now, but I think it's fine to expand the scope of errors that are retried on as long as those errors are actually retriable. I think network hiccups can fall under that umbrella so I'd be fine with altering the connector to retry on them; if you'd like to open a PR to address this, I'd be happy to review. In case you choose to go that route, I'd just like to reiterate one important detail: we'd have to find a way to implement that broader retry support without adding the chance for false positives (i.e., retrying on non-retriable errors). Or at least, we'd have to make a reasonable effort to.

@IOANNIS1234 Deserialization errors don't fall into that category and likely never will as they are handled by code outside the connector's control; specifically, the key and value Converter instances that the Connect framework sets up and uses for data that will eventually be passed to the connector. As far as retrying on records with missing required fields goes, I'm not sure what benefit that would provide, considering the cause of failure in that case is completely deterministic and, without an update to the BigQuery table schema, there's no reason to expect a subsequent retry attempt to yield a different result.