tabular-io / iceberg-kafka-connect

Apache License 2.0
169 stars 31 forks source link

Not commiting to Iceberg when Kafka has small amount of records #247

Open farbodahm opened 1 month ago

farbodahm commented 1 month ago

Hey, We are running connector on AWS ECS using Glue as catalog and S3 for storage.

This is the task configuration we are using:

{
    "name": "test",
      "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "table_v1",
        "iceberg.tables": "src_erp_na_sink.table_v1",
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3://bucket-v1/warehouse",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.client.region": "us-east-1",
        "iceberg.tables.default-id-columns": "pk",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.evolve-schema-enabled": "true",
        "iceberg.tables.schema-case-insensitive": "true",
        "iceberg.tables.cdc-field": "cdc_field"
      }
    }

When we are running the connector on a topic which has lots of records (+1000) everything works correctly; But when we have a few number of records in the topic (~50), connector creats the parquet files in S3, but it doesn't create any snaphot for it (to me it looks like that it's not commiting).

Here you can find the screen shots of the data and metadata directories.

Screenshot 2024-05-08 at 12 38 39 Screenshot 2024-05-08 at 12 38 34

And this is the only metadata created:

{
  "format-version" : 2,
  "table-uuid" : "c15addaa-8b31-4960-8810-5dd871f03040",
  "location" : "s3://bucket-v1/test_v1",
  "last-sequence-number" : 0,
  "last-updated-ms" : 1715086111752,
  "last-column-id" : 3,
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "name",
      "required" : true,
      "type" : "string"
    }, {
      "id" : 2,
      "name" : "last_name",
      "required" : true,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "cdc_field",
      "required" : true,
      "type" : "string"
    } ]
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ ]
  } ],
  "last-partition-id" : 999,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "write.parquet.compression-codec" : "zstd"
  },
  "current-snapshot-id" : -1,
  "refs" : { },
  "snapshots" : [ ],
  "statistics" : [ ],
  "snapshot-log" : [ ],
  "metadata-log" : [ ]
}

I think it should be something related to some kind of buffer related configuration, but I can't find that. Do you have any idea what can it be related to?

farbodahm commented 1 month ago

After creating a new control topic and changing the commit interval to 30 seconds, looks like the problem is resolved now and it's sinking small amount of records as expected.

{
    "name": "test",
      "config": {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "tasks.max": "1",
        "topics": "table_v1",
        "iceberg.tables": "src_erp_na_sink.table_v1",
        "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        "iceberg.catalog.warehouse": "s3://bucket-v1/warehouse",
        "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        "iceberg.catalog.client.region": "us-east-1",
        "iceberg.tables.default-id-columns": "pk",
        "iceberg.tables.auto-create-enabled": "true",
        "iceberg.tables.evolve-schema-enabled": "true",
        "iceberg.tables.schema-case-insensitive": "true",
        "iceberg.tables.cdc-field": "cdc_field",
        "iceberg.control.topic": "iceberg-control-2",
        "iceberg.control.commit.interval-ms": "30000"
      }
 }

The previous control topic had around +7000 records in it, can it be the reason behind of this weird behavior? Data files were written as soon as we were producing test data, however it was not committing the changes; So I assume workers were working correctly and problem was in coordinator. Also in coordinator logs, we were seeing that it was logging something like this:

Commit b6741a33-5a53-4c01-ab22-c3d5d6498e8b complete, committed to 0 table(s), vtts null

So it couldn't find anything to commit, however workers were writing data files.