wepay / kafka-connect-bigquery

DEPRECATED. PLEASE USE https://github.com/confluentinc/kafka-connect-bigquery. A Kafka Connect BigQuery sink connector
Apache License 2.0

Failed to update table schema #289

Open thienmcff opened 4 years ago

thienmcff commented 4 years ago

I have a Postgres source connector, a BigQuery sink connector, and Schema Registry. I added a new field to a table in Postgres and inserted a row into the table. The Postgres connector detects the change, but the BigQuery connector fails to update the schema (autoUpdateSchemas is true):

[2020-07-03 02:35:58,704] ERROR WorkerSinkTask{id=bq-core-field-connector-1} Commit of offsets threw an unexpected exception for sequence number 96048: null (org.apache.kafka.connect.runtime.WorkerSinkTask:259)
com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; See logs for more detail
    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:97)
    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:119)
    at org.apache.kafka.connect.sink.SinkTask.preCommit(SinkTask.java:125)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:379)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:208)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2020-07-03 02:35:58,713] INFO Putting 19 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:170)
[2020-07-03 02:35:58,874] INFO Putting 29 records in the sink. (com.wepay.kafka.connect.bigquery.BigQuerySinkTask:170)
[2020-07-03 02:35:59,229] INFO Attempting to update table GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=BQ_CORE_FIELD_ALL_SCHEMAS, tableId=core_yga_ed_core_data_device}} with schema Schema{fields=[Field{name=before, type=RECORD, mode=NULLABLE, description=null}, Field{name=after, type=RECORD, mode=NULLABLE, description=null}, Field{name=source, type=RECORD, mode=REQUIRED, description=null}, Field{name=op, type=STRING, mode=REQUIRED, description=null}, Field{name=ts_ms, type=INTEGER, mode=NULLABLE, description=null}, Field{name=transaction, type=RECORD, mode=NULLABLE, description=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager:60)
[2020-07-03 02:35:59,915] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to update table schema for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=BQ_CORE_FIELD_ALL_SCHEMAS, tableId=core_yga_ed_core_data_device}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor:68)

C0urante commented 4 years ago

@mtthien-tma what's the name of the column you added to your Postgres table?

thienmcff commented 4 years ago

@C0urante I added 'priority' column to the table.

C0urante commented 4 years ago

@mtthien-tma that doesn't appear to be the name of a column in the BigQuery schema in the log you provided. Are you sure that's the only column? The ones listed are before, after, source, op, ts_ms, and transaction.

thienmcff commented 4 years ago

I am sure that is the field. It should be nested inside the before and after RECORD fields. It seems that the log does not show the nested fields of RECORD types.

C0urante commented 4 years ago

Ah, gotcha. You mentioned you're using Schema Registry; can you provide the Avro schema for the data that's breaking on the connector?

I should let you know--right now I suspect that a non-backwards-compatible change took place on the schema of the data (probably the addition of a non-optional field) and that's what's causing the failures with the connector.
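To make that suspicion concrete, here is a minimal sketch of the check I have in mind. It assumes the Avro record schemas are available as plain dicts (for example, parsed from the JSON that Schema Registry returns); the function name and the sample schemas are hypothetical, and this is not code from the connector. It only flags top-level fields that were added without a default, which is the kind of addition that breaks backward compatibility.

```python
from typing import Any, Dict, List


def added_fields_without_default(old_schema: Dict[str, Any],
                                 new_schema: Dict[str, Any]) -> List[str]:
    """Return names of fields present only in new_schema that declare no default.

    Hypothetical helper: both arguments are Avro record schemas as dicts.
    A reader using new_schema cannot fill such a field when decoding data
    written with old_schema, so the change is not backward compatible.
    """
    old_names = {field["name"] for field in old_schema["fields"]}
    return [
        field["name"]
        for field in new_schema["fields"]
        if field["name"] not in old_names and "default" not in field
    ]


# Illustrative schemas only; the real ones come from Schema Registry.
old_value = {"type": "record", "name": "Value",
             "fields": [{"name": "id", "type": "int"}]}

# 'priority' added as a non-optional field (no default).
new_required = {"type": "record", "name": "Value",
                "fields": [{"name": "id", "type": "int"},
                           {"name": "priority", "type": "int"}]}

# 'priority' added as an optional field (union with null, default null).
new_optional = {"type": "record", "name": "Value",
                "fields": [{"name": "id", "type": "int"},
                           {"name": "priority", "type": ["null", "int"],
                            "default": None}]}

print(added_fields_without_default(old_value, new_required))  # ['priority']
print(added_fields_without_default(old_value, new_optional))  # []
```

If the first call reports the new column for your before/after value schema, that would be consistent with what I suspect.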

thienmcff commented 4 years ago

Hi @C0urante, sorry for the late reply. You are right: adding a non-optional field seems to be the root cause. When I added a nullable field in Postgres, the BQ connector created the column in BQ. When I added a required field in Postgres, the schema update failed.
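This matches BigQuery's rule that columns added to an existing table must be NULLABLE or REPEATED, not REQUIRED. Below is a rough sketch of how I checked which additions would be rejected, including ones nested inside the before and after records. The dict layout and the function name are my own assumptions for illustration; it is not the connector's code and does not call the BigQuery API.

```python
from typing import Any, Dict, List

Field = Dict[str, Any]


def rejected_additions(old_fields: List[Field],
                       new_fields: List[Field],
                       prefix: str = "") -> List[str]:
    """Return dotted paths of newly added fields whose mode is REQUIRED.

    Hypothetical helper: each field is a dict with "name", "type", "mode"
    and, for RECORD types, a nested "fields" list. Existing RECORD fields
    are walked recursively so nested additions are found too.
    """
    old_by_name = {field["name"]: field for field in old_fields}
    rejected: List[str] = []
    for field in new_fields:
        path = prefix + field["name"]
        old = old_by_name.get(field["name"])
        if old is None:
            # Brand-new column: only NULLABLE or REPEATED can be added.
            if field.get("mode", "NULLABLE") == "REQUIRED":
                rejected.append(path)
        elif field.get("type") == "RECORD":
            rejected.extend(rejected_additions(old.get("fields", []),
                                               field.get("fields", []),
                                               path + "."))
    return rejected


def record(name: str, columns: List[Field]) -> Field:
    """Build a NULLABLE RECORD field (illustrative shape only)."""
    return {"name": name, "type": "RECORD", "mode": "NULLABLE", "fields": columns}


id_col = {"name": "id", "type": "INTEGER", "mode": "REQUIRED"}
required_priority = {"name": "priority", "type": "INTEGER", "mode": "REQUIRED"}
nullable_priority = {"name": "priority", "type": "INTEGER", "mode": "NULLABLE"}

old_table = [record("before", [id_col]), record("after", [id_col])]
with_required = [record("before", [id_col, required_priority]),
                 record("after", [id_col, required_priority])]
with_nullable = [record("before", [id_col, nullable_priority]),
                 record("after", [id_col, nullable_priority])]

print(rejected_additions(old_table, with_required))  # ['before.priority', 'after.priority']
print(rejected_additions(old_table, with_nullable))  # []
```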

I am using BQ connector 1.3. Is there any workaround? Do I have to update the BQ schema manually?