databricks / iceberg-kafka-connect

Overriding iceberg.connect.group-id prevents sink from working #179

Open drewtech opened 10 months ago

drewtech commented 10 months ago

Background

Our company's Kafka cluster requires specific consumer group prefixes, so the default group IDs need to be configurable.

The issue

When iceberg.connect.group-id is overridden, the sink stops working and can no longer consume data. If the setting is removed, the sink works. This can be reproduced in Docker with the following configuration:

{
    "tasks.max": "1",
    "topics": "postgres.public.students",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.catalog.s3.endpoint": "http://minio:9000",
    "iceberg.catalog.s3.secret-access-key": "password",
    "iceberg.catalog.s3.access-key-id": "admin",
    "iceberg.catalog.uri": "http://rest:8181",
    "iceberg.catalog.warehouse": "s3://warehouse/",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.type": "rest",
    "iceberg.control.commit.interval-ms": "1000",
    "iceberg.tables": "public.students",
    "iceberg.tables.auto-create-enabled": true,
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schemas.enable": false,
    "iceberg.connect.group-id": "iceberg-connect-group-1"
}

If I remove iceberg.connect.group-id and issue an update request, the commit loop starts working.
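For anyone trying to reproduce this, the update step can be scripted against the Kafka Connect REST API (`PUT /connectors/{name}/config`). The following is only a sketch: the Connect URL `http://localhost:8083` and the connector name `iceberg-sink` are assumptions, not values from my setup, and the config is trimmed to a few keys from the one above. The script builds the request but does not send it unless the last line is uncommented.

```python
import json
import urllib.request

GROUP_ID_KEY = "iceberg.connect.group-id"


def without_group_id_override(config: dict) -> dict:
    """Return a copy of the connector config with the group-id override removed."""
    return {key: value for key, value in config.items() if key != GROUP_ID_KEY}


def build_update_request(connect_url: str, connector_name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors/{connector_name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Trimmed copy of the config above; the remaining keys are omitted for brevity.
    config = {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "topics": "postgres.public.students",
        "iceberg.tables": "public.students",
        "iceberg.connect.group-id": "iceberg-connect-group-1",
    }
    # Assumed URL and connector name; adjust to your environment.
    request = build_update_request(
        "http://localhost:8083", "iceberg-sink", without_group_id_override(config)
    )
    print(request.get_method(), request.full_url)
    # urllib.request.urlopen(request)  # uncomment to actually send the update
```

Sending the same request with the original config (override included) switches back to the failing case, which makes it easy to flip between the two and compare logs.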

The code appears to implement an override for this setting, so I am unsure why it doesn't work. See https://github.com/tabular-io/iceberg-kafka-connect/blob/4c4b8f8408846926545ae453cfe4ffbdae7801cd/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L404

drewtech commented 10 months ago

Logs:

LogsWithGroupIdOverride.txt LogsWithoutGroupIdOverrideWorking.txt

Note that neither log contains any ERROR-level entries, so I'm not sure why the sink doesn't work with the override.