confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

Cannot specify the Google Cloud region #419

Open: leonardotorresaltez opened this issue 4 weeks ago


Hello,

We are getting the following error in production: BigQuery dataset not found.

This error occurs because we write records to the Europe region, but sometimes requests exceed the deadline and are redirected to the US region. We need to avoid this behavior.

```
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: A write thread has failed with an unrecoverable error
Caused by: com.google.api.client.googleapis.json.GoogleJsonResponseException: 404 Not Found
Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write to table { "code" : 404, "errors" : [ { "domain" : "global", "message" : "Not found: Dataset xxxxx", "reason" : "notFound" } ], "message" : "Not found: Dataset xxxx", "status" : "NOT_FOUND"
Caused by: Failed to write to table
	at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.lambda$maybeThrowEncounteredError$0(KCBQThreadPoolExecutor.java:101)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredError(KCBQThreadPoolExecutor.java:100)
	at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:90)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:171)
	at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:187)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:415)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:385)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:222)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:207)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

After opening tickets with Google Cloud support and Confluent support, I can see that a `location` parameter exists, but the BigQuery connector does not use it.

The relevant code is here: https://github.com/confluentinc/kafka-connect-bigquery/blob/master/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/GcpClientBuilder.java#L142

We propose changing the builder to read a `location` parameter from the connector configuration and pass it to `BigQueryOptions.newBuilder().setLocation( .... )`.
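A minimal sketch of what the proposal could look like, assuming a connector property named `location` (the property name and the helper class `LocationConfigSketch` are hypothetical, not existing connector code). To keep the example runnable without the google-cloud-bigquery dependency, the call to `BigQueryOptions.Builder.setLocation` is passed in as a `Consumer<String>`; inside `GcpClientBuilder` the caller would pass `builder::setLocation` instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical helper illustrating the proposed change; not part of the connector.
class LocationConfigSketch {

    // Hypothetical connector property name (an assumption for this sketch).
    static final String LOCATION_CONFIG = "location";

    // Returns the trimmed location if the property is set and non-blank.
    static Optional<String> configuredLocation(Map<String, String> props) {
        String raw = props.get(LOCATION_CONFIG);
        if (raw == null || raw.trim().isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(raw.trim());
    }

    // Invokes setLocation only when a location is configured; returns whether it did.
    // In GcpClientBuilder this could be called as (sketch):
    //   BigQueryOptions.Builder builder = BigQueryOptions.newBuilder();
    //   applyLocation(props, builder::setLocation);
    static boolean applyLocation(Map<String, String> props, Consumer<String> setLocation) {
        Optional<String> location = configuredLocation(props);
        location.ifPresent(setLocation);
        return location.isPresent();
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(LOCATION_CONFIG, " EU ");

        // StringBuilder stands in for the real BigQueryOptions.Builder.
        StringBuilder applied = new StringBuilder();
        boolean set = applyLocation(props, applied::append);
        System.out.println(set + " " + applied); // prints "true EU"
    }
}
```

When the property is left unset, `setLocation` is never called, so existing deployments would keep the client's current default behavior.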

Regards, Leonardo