databricks / iceberg-kafka-connect

Apache License 2.0
Equality delete files and data files being committed in the same snapshot after commit timeout #303

Open waiyan1612 opened 1 week ago

Problem Description

Our source database can generate create and delete events for the same row back to back. For example:

INSERT INTO my_database.my_table_1 (id) VALUES (1);
DELETE FROM my_database.my_table_1 WHERE id = 1;

Connector version

v0.6.15

Expected Behaviour

When we query from Iceberg, the record id=1 should not show up.

Actual Behaviour

When we query from Iceberg, the record id=1 shows up.

Analysis

We would like to check this part with contributors who are more familiar with the code base and with Iceberg in general. Please point out anything in our understanding that is incorrect.

From the Iceberg specification:

An equality delete file must be applied to a data file when all of the following are true:

  1. The data file's data sequence number is strictly less than the delete's data sequence number
  2. The data file's partition (both spec id and partition values) is equal to the delete file's partition or the delete file's partition spec is unpartitioned
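To make the two conditions concrete, here is a minimal Python sketch of the check as we read it from the spec text quoted above. `FileEntry` and `equality_delete_applies` are hypothetical names for illustration only, not Iceberg classes or APIs, and the partition handling is simplified.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class FileEntry:
    # Hypothetical, simplified stand-in for a manifest entry (not an Iceberg class).
    data_sequence_number: int
    spec_id: int
    partition: Tuple = ()             # partition values
    unpartitioned_spec: bool = False  # True if the file's partition spec is unpartitioned


def equality_delete_applies(data_file: FileEntry, delete_file: FileEntry) -> bool:
    # Condition 1: the data file must be strictly older than the delete file.
    older = data_file.data_sequence_number < delete_file.data_sequence_number
    # Condition 2: same partition (spec id and values), or the delete is unpartitioned.
    same_partition = (
        data_file.spec_id == delete_file.spec_id
        and data_file.partition == delete_file.partition
    )
    return older and (same_partition or delete_file.unpartitioned_spec)


# Both files carry the same data sequence number: the delete is skipped.
same_snapshot = equality_delete_applies(FileEntry(2472, 0), FileEntry(2472, 0))
# The data file comes from an earlier sequence number: the delete is applied.
earlier_snapshot = equality_delete_applies(FileEntry(2471, 0), FileEntry(2472, 0))
print(same_snapshot, earlier_snapshot)  # False True
```

If this reading is right, a data file and an equality delete file with equal data sequence numbers never interact, regardless of partitioning.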

First we checked the metadata of our snapshot.

Snapshot Details

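We see id=1 in both data file #2 (09:40:09) and equality delete file #1 (09:41:09). Since both files were committed in the same snapshot, they would share the same data sequence number, so condition 1 above (strictly less than) would not hold. Could this be the reason why the equality delete is not applied during the table scan on read?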

Connector details

We also checked the connector flow to understand why these files end up in the same snapshot. The connector design docs state:

If a worker sends a data files event after the commit timeout then the files are included in the next commit.
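The effect of that rule on sequence numbers can be sketched with a toy model. This is not the connector's implementation: `assign_sequence_numbers` is a hypothetical helper, file names are made up, and it assumes every non-empty commit produces one snapshot with one new sequence number while empty commits produce none. The timestamps are taken from the snapshot details and the log in this issue.

```python
from typing import Dict, List, Tuple


def assign_sequence_numbers(
    arrivals: List[Tuple[str, str]],  # (HH:MM:SS arrival time, file name)
    commit_times: List[str],          # HH:MM:SS times at which the coordinator commits
) -> Dict[str, int]:
    # Toy model: each commit picks up every file that has arrived and is not yet committed.
    seq = 0
    assigned: Dict[str, int] = {}
    for commit_time in sorted(commit_times):
        batch = [
            name
            for arrived, name in sorted(arrivals)
            if arrived <= commit_time and name not in assigned
        ]
        if not batch:
            continue  # nothing to commit, so no new snapshot
        seq += 1
        for name in batch:
            assigned[name] = seq
    return assigned


files = [("09:40:09", "data-file-2"), ("09:41:09", "eq-delete-1")]

# As in the log: the first commit times out empty at 09:39:25, the next one is at 09:41:26.
late = assign_sequence_numbers(files, ["09:39:25", "09:41:26"])
# Hypothetical alternative: a commit lands between the two files.
on_time = assign_sequence_numbers(files, ["09:40:25", "09:41:26"])
print(late)     # {'data-file-2': 1, 'eq-delete-1': 1}
print(on_time)  # {'data-file-2': 1, 'eq-delete-1': 2}
```

In the first case both files end up with the same sequence number, which matches what we see in the snapshot.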

This is the log for the commits around this window.

[Worker-0e31a105f75e76cba] [2024-10-04 09:38:25,444] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:38:25,485] INFO [my-service-sink-iceberg|task-0] Started new commit with commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 (io.tabular.iceberg.connect.channel.Coordinator:103)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:25,506] INFO [my-service-sink-iceberg|task-0] Commit timeout reached (io.tabular.iceberg.connect.channel.CommitState:103)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:25,515] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:25,556] INFO [my-service-sink-iceberg|task-0] Commit 7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:164)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:48,324] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:48,359] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:48,361] WARN [my-service-sink-iceberg|task-0] Received commit ready for commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:60)
[Worker-0e31a105f75e76cba] [2024-10-04 09:39:48,361] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,252] INFO [my-service-sink-iceberg|task-1] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,297] INFO [my-service-sink-iceberg|task-2] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,350] INFO [my-service-sink-iceberg|task-0] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,371] INFO [my-service-sink-iceberg|task-1] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,384] INFO [my-service-sink-iceberg|task-2] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,449] INFO [my-service-sink-iceberg|task-0] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,547] INFO [my-service-sink-iceberg|task-2] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,553] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,558] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,560] INFO [my-service-sink-iceberg|task-1] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,572] INFO [my-service-sink-iceberg|task-1] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,576] INFO [my-service-sink-iceberg|task-1] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,595] INFO [my-service-sink-iceberg|task-0] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,597] INFO [my-service-sink-iceberg|task-0] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,600] INFO [my-service-sink-iceberg|task-0] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,637] INFO [my-service-sink-iceberg|task-1] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,637] INFO [my-service-sink-iceberg|task-1] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,744] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,745] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,746] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,772] INFO [my-service-sink-iceberg|task-1] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,778] WARN [my-service-sink-iceberg|task-0] Received commit response with commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:51)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,778] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,780] WARN [my-service-sink-iceberg|task-0] Received commit ready for commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:60)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,780] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,783] INFO [my-service-sink-iceberg|task-1] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,819] INFO [my-service-sink-iceberg|task-2] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,825] WARN [my-service-sink-iceberg|task-0] Received commit response with commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:51)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,825] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,829] WARN [my-service-sink-iceberg|task-0] Received commit response with commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:51)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,829] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,830] WARN [my-service-sink-iceberg|task-0] Received commit ready for commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:60)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,830] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,831] INFO [my-service-sink-iceberg|task-2] WorkerSinkTask{id=my-service-sink-iceberg-2} Committing offsets asynchronously using sequence number 3087: {...} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,847] INFO [my-service-sink-iceberg|task-2] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,899] INFO [my-service-sink-iceberg|task-1] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:08,948] INFO [my-service-sink-iceberg|task-2] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,044] INFO [my-service-sink-iceberg|task-1] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,051] INFO [my-service-sink-iceberg|task-1] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,055] INFO [my-service-sink-iceberg|task-1] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,257] INFO [my-service-sink-iceberg|task-2] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,262] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:09,267] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:25,831] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:25,839] INFO [my-service-sink-iceberg|task-0] Started new commit with commit-id=4df1175b-fc65-4501-ac04-2d8acc9c521f (io.tabular.iceberg.connect.channel.Coordinator:103)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,466] INFO [my-service-sink-iceberg|task-0] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,491] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,492] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,517] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,523] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,523] INFO [my-service-sink-iceberg|task-0] Commit 4df1175b-fc65-4501-ac04-2d8acc9c521f not ready, received responses for 19 of 57 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:128)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,524] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:40:48,527] INFO [my-service-sink-iceberg|task-0] WorkerSinkTask{id=my-service-sink-iceberg-0} Committing offsets asynchronously using sequence number 3073: {...} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:07,980] INFO [my-service-sink-iceberg|task-2] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,060] INFO [my-service-sink-iceberg|task-2] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_2/metadata/00644-7df38353-17dc-4fcd-83cf-1805844380b4.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,139] INFO [my-service-sink-iceberg|task-2] Table loaded by catalog: iceberg.my_database.my_table_2 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,143] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,145] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,774] INFO [my-service-sink-iceberg|task-1] WorkerSinkTask{id=my-service-sink-iceberg-1} Committing offsets asynchronously using sequence number 3064: {...} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:08,932] INFO [my-service-sink-iceberg|task-2] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,046] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,046] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,047] INFO [my-service-sink-iceberg|task-2] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,059] INFO [my-service-sink-iceberg|task-2] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,064] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,066] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,066] INFO [my-service-sink-iceberg|task-0] Commit 4df1175b-fc65-4501-ac04-2d8acc9c521f not ready, received responses for 38 of 57 partitions, waiting for more (io.tabular.iceberg.connect.channel.CommitState:128)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:09,066] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,070] INFO [my-service-sink-iceberg|task-0] Commit timeout reached (io.tabular.iceberg.connect.channel.CommitState:103)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,071] INFO [my-service-sink-iceberg|task-0] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,071] INFO [my-service-sink-iceberg|task-0] Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil:302)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,145] INFO [my-service-sink-iceberg|task-0] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02471-1bb95145-e40b-4416-9c50-3cb4326e9135.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,166] INFO [my-service-sink-iceberg|task-0] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_2/metadata/00644-7df38353-17dc-4fcd-83cf-1805844380b4.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,235] INFO [my-service-sink-iceberg|task-0] Table loaded by catalog: iceberg.my_database.my_table_2 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,264] INFO [my-service-sink-iceberg|task-0] Table loaded by catalog: iceberg.my_database.my_table_1 (org.apache.iceberg.BaseMetastoreCatalog:67)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,760] INFO [my-service-sink-iceberg|task-0] Successfully committed to table iceberg.my_database.my_table_2 in 281 ms (org.apache.iceberg.BaseMetastoreTableOperations:139)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,760] INFO [my-service-sink-iceberg|task-0] Committed snapshot 1701265366095856834 (BaseRowDelta) (org.apache.iceberg.SnapshotProducer:414)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,856] INFO [my-service-sink-iceberg|task-0] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_2/metadata/00645-17febf38-1d69-4848-a31a-8259493dbbbb.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,870] INFO [my-service-sink-iceberg|task-0] Successfully committed to table iceberg.my_database.my_table_1 in 345 ms (org.apache.iceberg.BaseMetastoreTableOperations:139)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,870] INFO [my-service-sink-iceberg|task-0] Committed snapshot 987432018576847863 (BaseRowDelta) (org.apache.iceberg.SnapshotProducer:414)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:26,960] INFO [my-service-sink-iceberg|task-0] Refreshing table metadata from new version: s3://bucket/my_database.db/my_table_1/metadata/02472-8870b1be-9482-44a1-a994-f3c9e8ac8657.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations:199)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,013] INFO [my-service-sink-iceberg|task-0] Received metrics report: CommitReport{tableName=iceberg.my_database.my_table_2, snapshotId=1701265366095856834, sequenceNumber=645, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.679890548S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=2}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=716}, addedDeleteFiles=CounterResult{unit=COUNT, value=2}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=2}, addedPositionalDeleteFiles=null, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=882}, addedRecords=CounterResult{unit=COUNT, value=2}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=5020}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=21072}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=8646242}, addedPositionalDeletes=null, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=354}, addedEqualityDeletes=CounterResult{unit=COUNT, value=2}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=1931}}, metadata={iceberg-version=Apache Iceberg 1.4.2 (commit f6bb9173b13424d77e7ad8439b5ef9627e530cb2)}} (org.apache.iceberg.metrics.LoggingMetricsReporter:38)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,013] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_TABLE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,022] INFO [my-service-sink-iceberg|task-0] Commit complete to table my_database.my_table_2, snapshot 1701265366095856834, commit ID 4df1175b-fc65-4501-ac04-2d8acc9c521f, vtts null (io.tabular.iceberg.connect.channel.Coordinator:266)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,141] INFO [my-service-sink-iceberg|task-0] Received metrics report: CommitReport{tableName=iceberg.my_database.my_table_1, snapshotId=987432018576847863, sequenceNumber=2472, operation=overwrite, commitMetrics=CommitMetricsResult{totalDuration=TimerResult{timeUnit=NANOSECONDS, totalDuration=PT0.787312128S, count=1}, attempts=CounterResult{unit=COUNT, value=1}, addedDataFiles=CounterResult{unit=COUNT, value=4}, removedDataFiles=null, totalDataFiles=CounterResult{unit=COUNT, value=11215}, addedDeleteFiles=CounterResult{unit=COUNT, value=3}, addedEqualityDeleteFiles=CounterResult{unit=COUNT, value=1}, addedPositionalDeleteFiles=CounterResult{unit=COUNT, value=2}, removedDeleteFiles=null, removedEqualityDeleteFiles=null, removedPositionalDeleteFiles=null, totalDeleteFiles=CounterResult{unit=COUNT, value=789}, addedRecords=CounterResult{unit=COUNT, value=298}, removedRecords=null, totalRecords=CounterResult{unit=COUNT, value=821718217}, addedFilesSizeInBytes=CounterResult{unit=BYTES, value=113550}, removedFilesSizeInBytes=null, totalFilesSizeInBytes=CounterResult{unit=BYTES, value=39792265832}, addedPositionalDeletes=CounterResult{unit=COUNT, value=246}, removedPositionalDeletes=null, totalPositionalDeletes=CounterResult{unit=COUNT, value=57038}, addedEqualityDeletes=CounterResult{unit=COUNT, value=51}, removedEqualityDeletes=null, totalEqualityDeletes=CounterResult{unit=COUNT, value=21349}}, metadata={iceberg-version=Apache Iceberg 1.4.2 (commit f6bb9173b13424d77e7ad8439b5ef9627e530cb2)}} (org.apache.iceberg.metrics.LoggingMetricsReporter:38)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,141] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_TABLE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,150] INFO [my-service-sink-iceberg|task-0] Commit complete to table my_database.my_table_1, snapshot 987432018576847863, commit ID 4df1175b-fc65-4501-ac04-2d8acc9c521f, vtts null (io.tabular.iceberg.connect.channel.Coordinator:266)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,157] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:41:27,185] INFO [my-service-sink-iceberg|task-0] Commit 4df1175b-fc65-4501-ac04-2d8acc9c521f complete, committed to 2 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:164)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,898] INFO [my-service-sink-iceberg|task-1] Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool:153)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,928] INFO [my-service-sink-iceberg|task-1] Sending event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,929] INFO [my-service-sink-iceberg|task-1] Sending event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,942] INFO [my-service-sink-iceberg|task-1] Handled event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,948] WARN [my-service-sink-iceberg|task-0] Received commit response with commit-id=4df1175b-fc65-4501-ac04-2d8acc9c521f when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:51)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,949] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_RESPONSE (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,949] WARN [my-service-sink-iceberg|task-0] Received commit ready for commit-id=4df1175b-fc65-4501-ac04-2d8acc9c521f when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:60)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,949] INFO [my-service-sink-iceberg|task-0] Handled event of type: COMMIT_READY (io.tabular.iceberg.connect.channel.Channel:130)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:08,952] INFO [my-service-sink-iceberg|task-1] WorkerSinkTask{id=my-service-sink-iceberg-1} Committing offsets asynchronously using sequence number 3065: {...} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:09,069] INFO [my-service-sink-iceberg|task-2] WorkerSinkTask{id=my-service-sink-iceberg-2} Committing offsets asynchronously using sequence number 3089: {...} (org.apache.kafka.connect.runtime.WorkerSinkTask:353)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:25,952] INFO [my-service-sink-iceberg|task-0] Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel:86)
[Worker-0e31a105f75e76cba] [2024-10-04 09:42:25,960] INFO [my-service-sink-iceberg|task-0] Started new commit with commit-id=9500724d-120f-499b-b0f6-87a535550ce1 (io.tabular.iceberg.connect.channel.Coordinator:103)
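
To make the log above easier to follow, here is a small Python sketch that summarizes each commit ID: when it started, when it completed, how many tables it committed to, and how many worker events arrived after it was no longer in progress. `summarize_commits` is a hypothetical helper; the regular expressions only assume the message formats visible in the lines above.

```python
import re
from collections import defaultdict

TS = r"\[(?P<ts>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}),\d{3}\]"
STARTED = re.compile(TS + r".*Started new commit with commit-id=(?P<id>[0-9a-f-]+)")
COMPLETE = re.compile(TS + r".*Commit (?P<id>[0-9a-f-]+) complete, committed to (?P<tables>\d+) table")
LATE = re.compile(TS + r".*commit-id=(?P<id>[0-9a-f-]+) when no commit in progress")


def summarize_commits(lines):
    # Map commit ID -> start time, completion time, table count, late event count.
    summary = defaultdict(
        lambda: {"started": None, "completed": None, "tables": None, "late_events": 0}
    )
    for line in lines:
        if m := STARTED.search(line):
            summary[m["id"]]["started"] = m["ts"]
        elif m := COMPLETE.search(line):
            summary[m["id"]]["completed"] = m["ts"]
            summary[m["id"]]["tables"] = int(m["tables"])
        elif m := LATE.search(line):
            summary[m["id"]]["late_events"] += 1
    return dict(summary)


sample = [
    "[Worker-0e31a105f75e76cba] [2024-10-04 09:38:25,485] INFO [my-service-sink-iceberg|task-0] Started new commit with commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 (io.tabular.iceberg.connect.channel.Coordinator:103)",
    "[Worker-0e31a105f75e76cba] [2024-10-04 09:39:25,556] INFO [my-service-sink-iceberg|task-0] Commit 7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:164)",
    "[Worker-0e31a105f75e76cba] [2024-10-04 09:39:48,361] WARN [my-service-sink-iceberg|task-0] Received commit ready for commit-id=7848680c-1e13-47a1-a2f8-cf49dbdf4aa8 when no commit in progress, this can happen during recovery (io.tabular.iceberg.connect.channel.CommitState:60)",
]
result = summarize_commits(sample)
print(result["7848680c-1e13-47a1-a2f8-cf49dbdf4aa8"])
```

A commit with `tables` equal to 0 and a non-zero `late_events` count is one whose worker responses arrived only after the commit timeout.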