confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

Connector Not Restarting after BigQueryConnectException #362

Open hendoxc opened 11 months ago

hendoxc commented 11 months ago

Hey, I have a connector that sometimes fails with a BigQueryConnectException. The stack trace is:

Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write to table
Caused by: Not found: Dataset my_dataset.test_table; See logs for more detail
    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)

I have the sink connector settings:

errors.retry.timeout = -1
errors.retry.delay.max.ms = 1000

bigQueryRetryWait = 1000
bigQueryRetry = 100000

Any ideas on how I could get this to keep retrying? I know the problem is intermittent: when I restart the connector, things run fine again for some time until the issue comes back.
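Since restarting clears the failure, one stopgap is to automate the restart through the Kafka Connect REST API (`GET /connectors/{name}/status` and `POST /connectors/{name}/tasks/{id}/restart`). The snippet below is only a minimal sketch of that idea, not a fix for the connector itself. The worker URL `http://localhost:8083` and the helper names `failed_task_ids`, `restart_url`, and `restart_failed_tasks` are assumptions made for illustration.

```python
import json
import urllib.parse
import urllib.request

# Assumption: the Connect worker's REST endpoint; adjust for your deployment.
CONNECT_URL = "http://localhost:8083"


def failed_task_ids(status: dict) -> list:
    """Return the ids of tasks reported as FAILED in a connector status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def restart_url(base_url: str, connector: str, task_id: int) -> str:
    """Build the REST path that restarts a single task of a connector."""
    name = urllib.parse.quote(connector, safe="")
    return f"{base_url}/connectors/{name}/tasks/{task_id}/restart"


def restart_failed_tasks(connector: str, base_url: str = CONNECT_URL) -> list:
    """Fetch the connector status and restart every FAILED task (hypothetical helper)."""
    name = urllib.parse.quote(connector, safe="")
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        status = json.load(resp)
    failed = failed_task_ids(status)
    for task_id in failed:
        req = urllib.request.Request(
            restart_url(base_url, connector, task_id), method="POST"
        )
        urllib.request.urlopen(req).close()
    return failed
```

Calling `restart_failed_tasks("my-sink")` from a cron job or a small loop would restart only the tasks that died, which is gentler than restarting the whole connector. It papers over the intermittent error rather than explaining it.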

b-goyal commented 10 months ago

@hendoxc, could you share the full logs? From the logs you have shared, it is not clear where the issue originates.

upendrao commented 5 months ago

Hi, I have also experienced this issue several times. Error logs are attached below. It happens with both partitioned and non-partitioned tables, even though the destination project and dataset have existed for a long time. Once I restart the task, it resumes normally. All my datasets are in the 'EU' multi-region. Could this be an issue with the Google BigQuery API in use? Is it safe to retry in this scenario using the bigQueryRetry option?

[2024-04-12 23:02:32,513] WARN [tilbud-offers-sink|task-1] Could not write batch of size 1 to BigQuery. Error code: 404, underlying error (if present): BigQueryError{reason=notFound, location=null, message=Not found: Dataset some-project-id:some_dataset} (com.wepay.kafka.connect.bigquery.write.batch.TableWriter:97)
com.google.cloud.bigquery.BigQueryException: Not found: Dataset some-project-id:some_dataset
    at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
    at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:494)
    at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1068)
    at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1065)
    at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
    at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
    at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
    at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1064)
    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:96)
    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:116)
    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:93)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
POST https://www.googleapis.com/bigquery/v2/projects/some-project-id/datasets/some_dataset/tables/some_table$20240412/insertAll?prettyPrint=false
{
  "code" : 404,
  "errors" : [ {
    "domain" : "global",
    "message" : "Not found: Dataset some-project-id:some_dataset",
    "reason" : "notFound"
  } ],
  "message" : "Not found: Dataset some-project-id:some_dataset",
  "status" : "NOT_FOUND"
}
    at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
    at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
    at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
    at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
    at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:492)
    ... 12 more

androa commented 2 months ago

I'm also seeing this issue. The tables are stable, and it looks like a temporary issue on the GCP side. Six topics are being replicated in total, and only one failed with this error:

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:632)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:350)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
  at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: A write thread has failed with an unrecoverable error
Caused by: Exceeded configured 0 attempts for write request
  at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.lambda$maybeThrowEncounteredError$0(KCBQThreadPoolExecutor.java:101)
  at java.base/java.util.Optional.ifPresent(Optional.java:183)
  at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredError(KCBQThreadPoolExecutor.java:100)
  at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:240)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
  ... 11 more
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Exceeded configured 0 attempts for write request
Caused by: Service is unavailable. Please retry.
  at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:147)
  at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:93)
  ... 3 more
Caused by: com.google.cloud.bigquery.BigQueryException: Service is unavailable. Please retry.
  at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.translate(HttpBigQueryRpc.java:115)
  at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:494)
  at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1068)
  at com.google.cloud.bigquery.BigQueryImpl$28.call(BigQueryImpl.java:1065)
  at com.google.api.gax.retrying.DirectRetryingExecutor.submit(DirectRetryingExecutor.java:103)
  at com.google.cloud.RetryHelper.run(RetryHelper.java:76)
  at com.google.cloud.RetryHelper.runWithRetries(RetryHelper.java:50)
  at com.google.cloud.bigquery.BigQueryImpl.insertAll(BigQueryImpl.java:1064)
  at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:93)
  at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:112)
  ... 4 more
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 503 Service Unavailable
POST https://www.googleapis.com/bigquery/v2/projects/my-project-here/datasets/dataprodukt/tables/some_data$20240711/insertAll?prettyPrint=false
{
  "code" : 503,
  "errors" : [ {
    "domain" : "global",
    "message" : "Service is unavailable. Please retry.",
    "reason" : "backendError"
  } ],
  "message" : "Service is unavailable. Please retry.",
  "status" : "UNAVAILABLE"
}
  at com.google.api.client.googleapis.json.GoogleJsonResponseException.from(GoogleJsonResponseException.java:146)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:118)
  at com.google.api.client.googleapis.services.json.AbstractGoogleJsonClientRequest.newExceptionOnError(AbstractGoogleJsonClientRequest.java:37)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest$1.interceptResponse(AbstractGoogleClientRequest.java:428)
  at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1111)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:514)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.executeUnparsed(AbstractGoogleClientRequest.java:455)
  at com.google.api.client.googleapis.services.AbstractGoogleClientRequest.execute(AbstractGoogleClientRequest.java:565)
  at com.google.cloud.bigquery.spi.v2.HttpBigQueryRpc.insertAll(HttpBigQueryRpc.java:492)
  ... 12 more