confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0

The simplest possible connector config cannot deliver data #135

Closed: ideasculptor closed this issue 2 years ago

ideasculptor commented 3 years ago

I set up a connector on a topic containing 1000 messages (which amount to very little actual data, since they are tiny):

{
  "name": "MyDomainEventBigQuerySinkConnector",
  "config": {
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "topics": "MyDomainEvent",
    "batchLoadIntervalSec": "120",
    "project": REDACTED,
    "defaultDataset": REDACTED,
    "keyfile": REDACTED,
    "keySource": "JSON",
    "sanitizeTopics": "true",
    "sanitizeFieldNames": "true",
    "autoCreateTables": "true",
    "allowNewBigQueryFields": "true",
    "allowBigQueryRequiredFieldRelaxation": "true",
    "allowSchemaUnionization": "true",
    "upsertEnabled": "false",
    "deleteEnabled": "false",
    "timePartitioningType": "HOUR",
    "includeKafkaData": "false",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "bigQueryPartitionDecorator": "true"
  }
}

The connector starts up without difficulty. After just over a minute, it finally tries to create the table. It is possible that this was a response to a new message arriving rather than a timeout, since I sent an additional 500 messages to try to wake it up.

connect            | [2021-09-03 04:17:45,155] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0 created for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 04:17:45,166] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0] Cluster ID: VSRwwYpGQz-nKD4NkwIH_g (org.apache.kafka.clients.Metadata)
connect            | [2021-09-03 04:18:56,996] INFO Attempting to create table `REDACTED`.`MyDomainEvent` with schema Schema{fields=[Field{name=metadata, type=RECORD, mode=NULLABLE, description=null, policyTags=null}, Field{name=customStringField, type=STRING, mode=NULLABLE, description=null, policyTags=null}, Field{name=customLongField, type=INTEGER, mode=NULLABLE, description=null, policyTags=null}]} (com.wepay.kafka.connect.bigquery.SchemaManager)

After waiting several more minutes, it finally spits out the following INFO message. No records have been written to the table.

connect            | [2021-09-03 04:22:45,182] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Member connector-consumer-MyDomainEventBigQuerySinkConnector-0-c80ea1aa-2da7-4164-8bab-280ab5cb35ce sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

If I use Control Center to look at the connector's consumer, it still has failed to consume any messages at all. See the screenshot below.

[Screenshot: Screen Shot 2021-09-02 at 9 30 22 PM]

I can confirm with a non-connector client that the topic has 1500 messages and is working just fine.

[Screenshot: Screen Shot 2021-09-02 at 9 31 01 PM]

Surely the connector can recover from being this far behind the current max offset? Falling 1500 messages behind, which amounts to a total of roughly 200 KB, should not be enough to permanently hang a connector with the default config, particularly when the documentation makes no mention at all of either max.poll.interval.ms or max.poll.records, or tells us what their default values are.
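
(For reference, Kafka Connect does support per-connector overrides of these consumer properties via the consumer.override. prefix, provided the worker is configured with connector.client.config.override.policy=All. A hedged sketch of what raising the poll timeout might look like; the values here are purely illustrative, not recommendations:

{
  "consumer.override.max.poll.interval.ms": "600000",
  "consumer.override.max.poll.records": "100"
}

These properties would be added to the connector config alongside the ones shown above.)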

[Screenshot: Screen Shot 2021-09-02 at 9 32 45 PM]
ideasculptor commented 3 years ago

After 12 minutes without any output from the connector at all, I finally got this, which tells me to look at the logs for details even though this IS the log data. And the consumer still shows NO messages consumed from any topic. It is still completely blank when I look at it in Control Center, just like the picture above; it doesn't even list the topic name.

connect            | [2021-09-03 04:33:31,637] ERROR Task failed with com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException error: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}} (com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor)
connect            | Exception in thread "pool-6-thread-1" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}
connect            |    at com.wepay.kafka.connect.bigquery.write.row.AdaptiveBigQueryWriter.performWriteRequest(AdaptiveBigQueryWriter.java:145)
connect            |    at com.wepay.kafka.connect.bigquery.write.row.BigQueryWriter.writeRows(BigQueryWriter.java:118)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.TableWriter.run(TableWriter.java:96)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,655] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Offset commit failed, rewinding to last committed offsets (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:216)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,661] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Seeking to offset 0 for partition MyDomainEvent-0 (org.apache.kafka.clients.consumer.KafkaConsumer)
connect            | [2021-09-03 04:33:31,662] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:216)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,666] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,666] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Lost previously assigned partitions MyDomainEvent-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,668] WARN WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | [2021-09-03 04:33:31,668] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:639)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1300(WorkerSinkTask.java:71)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsRevoked(WorkerSinkTask.java:719)
connect            |    at org.apache.kafka.clients.consumer.ConsumerRebalanceListener.onPartitionsLost(ConsumerRebalanceListener.java:198)
connect            |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.invokePartitionsLost(ConsumerCoordinator.java:331)
connect            |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:693)
connect            |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
connect            |    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
connect            |    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
connect            |    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1264)
connect            |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
connect            |    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1213)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:452)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:324)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,672] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,691] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Group coordinator broker:29092 (id: 2147483646 rack: null) is unavailable or invalid due to cause: null.isDisconnected: true. Rediscovery will be attempted. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,785] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Discovered group coordinator broker:29092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,786] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | org.apache.kafka.common.errors.DisconnectException
connect            | [2021-09-03 04:33:31,787] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,790] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,795] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Successfully joined group with generation Generation{generationId=1, memberId='connector-consumer-MyDomainEventBigQuerySinkConnector-0-2ce04c96-c402-47dd-8ec0-9d71addd6e1d', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,796] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Finished assignment for group at generation 1: {connector-consumer-MyDomainEventBigQuerySinkConnector-0-2ce04c96-c402-47dd-8ec0-9d71addd6e1d=Assignment(partitions=[MyDomainEvent-0])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,802] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Successfully synced group in generation Generation{generationId=1, memberId='connector-consumer-MyDomainEventBigQuerySinkConnector-0-2ce04c96-c402-47dd-8ec0-9d71addd6e1d', protocol='range'} (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,803] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Notifying assignor about the new Assignment(partitions=[MyDomainEvent-0]) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,803] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Adding newly assigned partitions: MyDomainEvent-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,804] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Found no committed offset for partition MyDomainEvent-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,888] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,890] WARN WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | [2021-09-03 04:33:31,890] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Commit of offsets threw an unexpected exception for sequence number 3: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:639)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 04:33:31,891] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect            | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:609)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | Caused by: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=REDACTED, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.put(BigQuerySinkTask.java:233)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
connect            |    ... 10 more
connect            | [2021-09-03 04:33:31,896] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Revoke previously assigned partitions MyDomainEvent-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 04:33:31,897] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Member connector-consumer-MyDomainEventBigQuerySinkConnector-0-2ce04c96-c402-47dd-8ec0-9d71addd6e1d sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-09-03 04:33:31,903] INFO Publish thread interrupted for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 04:33:31,904] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 04:33:31,904] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
connect            | [2021-09-03 04:33:31,914] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,915] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,915] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,916] INFO App info kafka.producer for confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-09-03 04:33:31,916] INFO Closed monitoring interceptor for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 04:33:31,917] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,917] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,917] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 04:33:31,920] INFO App info kafka.consumer for connector-consumer-MyDomainEventBigQuerySinkConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
ideasculptor commented 3 years ago

Just to take permissions off the table, I gave the service account full Owner privileges on the entire project. There is nothing it cannot do. It still just hangs there, never doing work. Also, when I switch the connector to batch loading, it has no trouble reading from the topic OR writing to the BigQuery table. It immediately sees all the messages, writes them to blobs, and then loads them 2 minutes later, deleting them 2 minutes after that. So whatever it is failing on in streaming mode, it isn't failing on permissions.
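
(For context, batch loading in this connector is configured per topic. A minimal sketch of the properties involved in switching modes, assuming enableBatchLoad and gcsBucketName are the relevant options, with the bucket name redacted like the other values above:

{
  "enableBatchLoad": "MyDomainEvent",
  "gcsBucketName": REDACTED,
  "batchLoadIntervalSec": "120"
}

The rest of the config stays the same as in the original report.)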

When I switch back to streaming mode by deleting the connector and recreating it with the config included above, it goes right back to not seeing new messages and producing no output in the logs or in the BQ table. The consumer once again shows an ever-increasing lag.

However, if I put it back in batch mode without deleting and re-creating it, it remains hung. The consumer fails to make progress, and it never generates any log output that gives the impression it is doing anything. There is no change in the number of rows in the table even though it is 1000 entries behind the topic. After 5 minutes, I once again get one of these:

connect            | [2021-09-03 10:08:14,327] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Member connector-consumer-MyDomainEventBigQuerySinkConnector-0-fb8da841-83a5-44ff-98ba-5d5f179cb1ac sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

So then I deleted the connector and recreated it, once again in batch mode. I got a bunch of messages about consumer rebalancing, which didn't make much sense since there was only one consumer, but I assumed the old one didn't shut down correctly. After several minutes of log messages about rebalancing, it finally consumed the 1000 messages it was behind by, and I sent another 500 just to validate. All 1500 got written to GCS (as 2 blobs of 500 and 500 blobs of 1, which is still FAR from optimal). Then I got a bunch of logs like those I received earlier:

connect            | [2021-09-03 10:17:41,146] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Commit of offsets threw an unexpected exception for sequence number 1: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}}, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:216)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 10:17:41,149] WARN WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Offset commit failed during close (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | [2021-09-03 10:17:41,149] ERROR WorkerSinkTask{id=MyDomainEventBigQuerySinkConnector-0} Commit of offsets threw an unexpected exception for sequence number 2: null (org.apache.kafka.connect.runtime.WorkerSinkTask)
connect            | com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Some write threads encountered unrecoverable errors: com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}}, com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: Failed to write rows after BQ table creation or schema update within 30 attempts for: GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=temporal, tableId=MyDomainEvent}}; See logs for more detail
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.maybeThrowEncounteredErrors(KCBQThreadPoolExecutor.java:108)
connect            |    at com.wepay.kafka.connect.bigquery.write.batch.KCBQThreadPoolExecutor.awaitCurrentTasks(KCBQThreadPoolExecutor.java:96)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.flush(BigQuerySinkTask.java:160)
connect            |    at com.wepay.kafka.connect.bigquery.BigQuerySinkTask.preCommit(BigQuerySinkTask.java:176)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:387)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:639)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
connect            |    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
connect            |    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
connect            |    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
connect            |    at java.base/java.lang.Thread.run(Thread.java:829)
connect            | [2021-09-03 10:17:41,156] INFO [Consumer clientId=connector-consumer-MyDomainEventBigQuerySinkConnector-0, groupId=connect-MyDomainEventBigQuerySinkConnector] Lost previously assigned partitions MyDomainEvent-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-09-03 10:17:41,160] INFO Publish thread interrupted for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 10:17:41,161] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 10:17:41,161] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
connect            | [2021-09-03 10:17:41,168] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,168] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,168] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,168] INFO App info kafka.producer for confluent.monitoring.interceptor.connector-consumer-MyDomainEventBigQuerySinkConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-09-03 10:17:41,168] INFO Closed monitoring interceptor for client_id=connector-consumer-MyDomainEventBigQuerySinkConnector-0 client_type=CONSUMER session= cluster=VSRwwYpGQz-nKD4NkwIH_g group=connect-MyDomainEventBigQuerySinkConnector (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-09-03 10:17:41,168] INFO Metrics scheduler closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,168] INFO Closing reporter org.apache.kafka.common.metrics.JmxReporter (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,168] INFO Metrics reporters closed (org.apache.kafka.common.metrics.Metrics)
connect            | [2021-09-03 10:17:41,170] INFO App info kafka.consumer for connector-consumer-MyDomainEventBigQuerySinkConnector-0 unregistered (org.apache.kafka.common.utils.AppInfoParser)

But those were immediately followed by logs showing the batched data being ingested correctly. Clearly, the failures came from the previous connector, which somehow did not go away when I deleted and recreated it, presumably because it was locked in some synchronous operation that takes many minutes to complete. The current connector appears to be running correctly in batch mode again. So it is definitely only streaming mode that doesn't work.

connect            | [2021-09-03 10:19:09,408] INFO GCS To BQ job tally: 1 successful jobs, 0 failed jobs. (com.wepay.kafka.connect.bigquery.GCSToBQLoadRunnable)
connect            | [2021-09-03 10:19:09,408] INFO Attempting to delete 502 blobs (com.wepay.kafka.connect.bigquery.GCSToBQLoadRunnable)
connect            | [2021-09-03 10:19:22,803] INFO Successfully deleted 502 blobs; failed to delete 0 blobs (com.wepay.kafka.connect.bigquery.GCSToBQLoadRunnable)
C0urante commented 3 years ago

I can't speak to the GCS batch logic, as I'm not very familiar with it. I can say that the configuration provided in the initial description suggests that at least one underlying issue may be https://github.com/confluentinc/kafka-connect-bigquery/issues/138, given these properties:

{
    "autoCreateTables": "true",
    "timePartitioningType": "HOUR",
    "bigQueryPartitionDecorator": "true"
}

Two things you might try: either change timePartitioningType to DAY (which will cause tables created by the connector to be partitioned by day instead of hour), or set bigQueryPartitionDecorator to false (which will cause the connector to write to tables without using decorator syntax; decorator syntax is at this point only necessary if you want to use message time partitioning, and AFAIU in all other cases it can be disabled with little to no change in behavior). Both variants are sketched below.
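
In terms of the config from the original report, each suggestion amounts to changing a single property, with everything else left unchanged. A sketch of the first variant (day partitioning):

{
    "timePartitioningType": "DAY",
    "bigQueryPartitionDecorator": "true"
}

and of the second (hour partitioning, no decorator syntax):

{
    "timePartitioningType": "HOUR",
    "bigQueryPartitionDecorator": "false"
}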

C0urante commented 2 years ago

Closing since this appears to be due to #138.