confluentinc / kafka-connect-bigquery

A Kafka Connect BigQuery sink connector
Apache License 2.0
3 stars 2 forks source link

Potenial data consistency issues #381

Open abdelhakimbendjabeur opened 9 months ago

abdelhakimbendjabeur commented 9 months ago

Hello 👋

I have been experiencing some data consistency issues when sinking a single Kafka topic to a BigQuery table using the connector

Version

wepay/kafka-connect-bigquery 2.5.0

cp-kafka-connect-base:7.4.0

...
ARG KC_BQ_SINK_VERSION=2.5.0

RUN confluent-hub install --no-prompt wepay/kafka-connect-bigquery:${KC_BQ_SINK_VERSION}
...

Source Topic

The source topic contains CDC data from a PG table, it goes though a Flink pipeline that add a few extra columns and filterd rows based on some criteria. -> timestamp refers to the moment when the PG transaction occurred (insert, update, delete) -> event_id is UUID that is unique per payload, it's hash to identify unique events.

"number_of_partitions" = 12
"cleanup.policy"       = "compact"
"retention.ms"         = 25 * 30 * 24 * 3600 * 1000 # 25 months
"retention.bytes"      = "-1"
"segment.bytes"        = 100 * 1024 * 1024    # 100MB
"segment.ms"           = 7 * 24 * 3600 * 1000 # 7 days

Connector configuration

connect-distributed.properties


group.id=kc-analytics-cdc-filtered-bq-sink
bootstrap.servers=SASL_SSL://xxx.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='uuu'   password='xxx';
consumer.sasl.mechanism=PLAIN
consumer.security.protocol=SASL_SSL
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule   required username='uuu'   password='xxx';
consumer.auto.offset.reset=earliest
consumer.max.poll.records=500
consumer.max.partition.fetch.bytes=1048576
consumer.fetch.min.bytes=1
consumer.fetch.max.wait.ms=500

key.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false

value.converter=io.apicurio.registry.utils.converter.AvroConverter value.converter.schemas.enable=true

value.converter.apicurio.registry.url=http://apicurio-registry.apicurio.svc.cluster.local:8080/apis/registry/v2 value.converter.apicurio.auth.username=uu1 value.converter.apicurio.auth.password=xxx value.converter.apicurio.registry.as-confluent=false

offset.storage.topic=kc.internal.analytics-cdc-filtered.offsets offset.storage.replication.factor=3 config.storage.topic=kc.internal.analytics-cdc-filtered.configs config.storage.replication.factor=3 status.storage.topic=kc.internal.analytics-cdc-filtered.status status.storage.replication.factor=3 offset.flush.interval.ms=10000 plugin.path=/usr/share/java,/usr/share/confluent-hub-components,/usr/share/java/kafka-connect-plugins/


> **sink-connector.json**
```json
{
    "allowNewBigQueryFields": "true",
    "autoCreateTables": "false",
    "bigQueryPartitionDecorator": "false",
    "bigQueryRetry": "3",
    "bigQueryRetryWait": "1000",
    "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
    "defaultDataset": "analytics",
    "keyfile": "/opt/kafka/config/gcp_credentials.json",
    "project": "xxx-production",
    "tasks.max": 4,
    "topics": "analytics.cdc.ticket-message",
    "transforms": "tombstoneHandler,copyFieldsFromKey,renameFields,routeTicketMessage,fixEpochTs,addMetadata,timestampConverterEventTs",
    "transforms.addMetadata.offset.field": "__kafka_offset",
    "transforms.addMetadata.partition.field": "__kafka_partition",
    "transforms.addMetadata.static.field": "__cluster",
    "transforms.addMetadata.static.value": "aus-xxx",
    "transforms.addMetadata.type": "org.apache.kafka.connect.transforms.InsertField$Value",

    "transforms.copyFieldsFromKey.fields": "event_id:STRING:__event_id",
    "transforms.copyFieldsFromKey.type": "com.xxx.kafka.connect.transforms.CopyFieldFromKeyToValue",

    "transforms.fixEpochTs.fields": "filtered_ticket_message:__event_timestamp",
    "transforms.fixEpochTs.type": "com.xxx.kafka.connect.transforms.SetToEpoch",

    "transforms.renameFields.renames": "timestamp:__event_timestamp",
    "transforms.renameFields.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",

    "transforms.routeTicketMessage.regex": "analytics.cdc.ticket-message",
    "transforms.routeTicketMessage.replacement": "filtered_ticket_message",
    "transforms.routeTicketMessage.type": "org.apache.kafka.connect.transforms.RegexRouter",

    "transforms.timestampConverterEventTs.field": "__event_timestamp",
    "transforms.timestampConverterEventTs.target.type": "Timestamp",
    "transforms.timestampConverterEventTs.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",

    "transforms.tombstoneHandler.behavior": "ignore",
    "transforms.tombstoneHandler.type": "io.confluent.connect.transforms.TombstoneHandler"
}

I use 2 custom SMT that do not ignore records

CopyFieldFromKeyToValue -> add a new field to the payload from the record key. SetToEpoch -> if the value is negative, replace by 0.

Deployment on Kubernetes

replica_count     = 4 
resource_requests = { cpu = "2", memory = "4Gi" }
resource_limits   = { cpu = "2", memory = "4Gi" }
number_of_tasks   = 4

PS. I deliberately put high resources because I already had the data consistency issues and I thought it was related multiple restarted caused by throttling on CPU/Memory when the resources were not enough.

Bug description

After deploying the connector, I waited for it to reach the tail of the topic before running some checks. I noticed some records missing. How? -> I have another pipeline that sinks records to ClickHouse for analytics purposes and the records are there.

How I proceeded?

GROUP TOPIC PARTITION NEW-OFFSET connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 9 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 11 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 3 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 2 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 5 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 8 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 6 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 1 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 7 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 10 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 0 0 connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 4 0

- Recreate the connector
```bash
curl -i -X PUT -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/analytics-cdc-json-bq-sink/config -d @sink-connector.json

After noticing that the consumer-group is at the tail, run the same query on both the table with duplicates and the backup.

$ ./bin/kafka-consumer-groups.sh \
  --command-config=... \
  --bootstrap-server=... \
  --group="connect-analytics-cdc-filtered-bq-sink" \
  --describe --timeout=100000

GROUP                                  TOPIC                                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 3          20576030        20576030        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 4          20617505        20617506        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 5          20633814        20633814        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 6          20606746        20606747        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 7          20570934        20570935        1
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 8          20627953        20627953        0
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 9          20641628        20641633        5
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 10         20610793        20610795        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 11         20586025        20586027        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 0          20625081        20625083        2
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 1          20642950        20642958        8
connect-analytics-cdc-filtered-bq-sink analytics.cdc.ticket-message 2          20593419        20593429        10

Screenshot 2024-02-07 at 17 26 24

What I discovered is the second run had brought new records, meaning the first run skipped them somehow, which is very concerning... Note the rows that are circled in red, they show more unique tickets/messages/events on the run_after_offset_reset data.

The fact that it has more unique tickets/messages/events means that these records are indeed in the topic and there were somehow missed during the first sink.

I am having trouble understanding where the problem comes from. No weird error logs have been noticed.

Has anybody experienced something similar or if there is something wrong with the config, I'd love to hear about it.

Thank you 🙏

pedromazala commented 9 months ago

I think this may be related to https://github.com/confluentinc/kafka-connect-bigquery/pull/333

But I see 2.6 as RC since September. Do you folks have a due date to release it?

abdelhakimbendjabeur commented 9 months ago

333 Seems to have been reverted in https://github.com/confluentinc/kafka-connect-bigquery/pull/357

abdelhakimbendjabeur commented 9 months ago

@sp-gupta @b-goyal Sorry to ping you, do you have any insight regarding this issue?

andrelu commented 4 months ago

Hello @abdelhakimbendjabeur. We are facing a similar issue with our BigQuery Sink connector deployment. I'm interested in this topic. Have you find any way of mitigating this?

abdelhakimbendjabeur commented 4 months ago

Hi @andrelu No progress on our side on this one. We had to rerun the connector to cover the missing data. This is not ideal as it's more expensive.