tabular-io / iceberg-kafka-connect

Apache License 2.0

Overriding iceberg.connect.group-id prevents sink from working #179

Open drewtech opened 6 months ago

drewtech commented 6 months ago

Background

Our company's Kafka cluster requires consumer group IDs to use specific prefixes, so the default group IDs need to be configurable.

The issue

When iceberg.connect.group-id is overridden, the sink stops working and no longer consumes data. Removing this setting makes the sink work again. The problem can be reproduced in Docker with the following configuration:

{
    "tasks.max": "1",
    "topics": "postgres.public.students",
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.catalog.s3.endpoint": "http://minio:9000",
    "iceberg.catalog.s3.secret-access-key": "password",
    "iceberg.catalog.s3.access-key-id": "admin",
    "iceberg.catalog.uri": "http://rest:8181",
    "iceberg.catalog.warehouse": "s3://warehouse/",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.type": "rest",
    "iceberg.control.commit.interval-ms": "1000",
    "iceberg.tables": "public.students",
    "iceberg.tables.auto-create-enabled": true,
    "value.converter.schemas.enable": "false",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schemas.enable": false,
    "iceberg.connect.group-id": "iceberg-connect-group-1"
}

If I remove iceberg.connect.group-id and submit a config update request for the connector, the commit loop starts working.
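For reference, the update step can be sketched against the Kafka Connect REST API's `PUT /connectors/{name}/config` endpoint, which replaces a connector's configuration. This is a minimal, standard-library-only sketch; the connector name `iceberg-sink` and the worker URL `http://localhost:8083` are hypothetical placeholders, not values from this report.

```python
import json
import urllib.request

# Hypothetical values: the report does not state the connector name or worker URL.
CONNECT_URL = "http://localhost:8083"
CONNECTOR_NAME = "iceberg-sink"


def build_update_request(config, drop_keys=("iceberg.connect.group-id",)):
    """Build a PUT request that replaces the connector config, omitting drop_keys."""
    new_config = {k: v for k, v in config.items() if k not in drop_keys}
    body = json.dumps(new_config).encode("utf-8")
    return urllib.request.Request(
        url=f"{CONNECT_URL}/connectors/{CONNECTOR_NAME}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    config = {
        "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
        "topics": "postgres.public.students",
        "iceberg.tables": "public.students",
        "iceberg.connect.group-id": "iceberg-connect-group-1",
        # ... the remaining settings from the configuration above
    }
    req = build_update_request(config)
    # Sending is commented out so the sketch runs without a Connect worker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, resp.read().decode())
    print(req.get_method(), req.full_url)
```

Because `PUT` replaces the whole configuration rather than patching it, the sketch starts from the full config dict and only filters out the keys being removed.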

The code appears to implement an override for this setting, so I am unsure why it doesn't work; see https://github.com/tabular-io/iceberg-kafka-connect/blob/4c4b8f8408846926545ae453cfe4ffbdae7801cd/kafka-connect/src/main/java/io/tabular/iceberg/connect/IcebergSinkConfig.java#L404

drewtech commented 6 months ago

Logs:

LogsWithGroupIdOverride.txt
LogsWithoutGroupIdOverrideWorking.txt

Note that there are no ERROR-level log entries, so I'm not sure why it doesn't work.
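Since the absence of ERROR entries is the main clue, one way to compare the two attachments is to tally log levels per file and print any WARN lines, which are easy to overlook. This is a minimal sketch that assumes each line carries its level as a standalone word, as in the default Kafka Connect log4j pattern (`[timestamp] LEVEL message`). The attachments must be downloaded locally first.

```python
import re
import sys
from collections import Counter
from pathlib import Path

# Assumption: the level is a standalone word, e.g. "[2023-10-01 12:00:00,123] WARN ...".
LEVEL_RE = re.compile(r"\b(TRACE|DEBUG|INFO|WARN|ERROR|FATAL)\b")
NOTABLE = {"WARN", "ERROR", "FATAL"}


def first_level(line):
    """Return the first log-level token on a line, or None (e.g. stack-trace lines)."""
    match = LEVEL_RE.search(line)
    return match.group(1) if match else None


def count_levels(lines):
    """Count lines by their first log-level token."""
    return Counter(lvl for lvl in map(first_level, lines) if lvl is not None)


def main(paths):
    for path in paths:
        p = Path(path)
        if not p.exists():
            print(f"{path}: not found")
            continue
        lines = p.read_text(errors="replace").splitlines()
        print(path, dict(count_levels(lines)))
        # Print WARN/ERROR/FATAL lines so they can be compared across the two runs.
        for line in lines:
            if first_level(line) in NOTABLE:
                print("   ", line)


if __name__ == "__main__":
    main(sys.argv[1:] or ["LogsWithGroupIdOverride.txt",
                          "LogsWithoutGroupIdOverrideWorking.txt"])
```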