Hi!
I am coming from the S3 sink connector, where I could use Kafka Lag Exporter to read the lag of the automatically created consumer group and export it to Prometheus. I am not sure I fully understand how this works with the Iceberg sink connector, but as far as I can tell it uses a separate topic to store offsets, and each task stores its offsets in a different consumer group and then sends them to a coordinator.
Why is my control-iceberg topic empty? How can I keep using Kafka Lag Exporter to get my lag if I use the Iceberg sink? If I cannot, how do you monitor the delay of your data? Can you help me out?
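To be concrete about what I mean by "lag": for each partition, the log end offset minus the offset committed by the consumer group. Below is a minimal Python sketch of that arithmetic only. The function names and the sample numbers are hypothetical, it does not talk to Kafka or to the Iceberg connector, and treating a partition with no committed offset as "committed at 0" is a simplifying assumption on my side.

```python
from typing import Dict, Tuple

# (topic, partition) pair used as a dictionary key
TopicPartition = Tuple[str, int]


def partition_lag(
    end_offsets: Dict[TopicPartition, int],
    committed_offsets: Dict[TopicPartition, int],
) -> Dict[TopicPartition, int]:
    """Return per-partition lag: log end offset minus committed offset.

    Assumption: a partition without a committed offset is treated as
    committed at 0, so its lag equals its end offset.
    """
    lag = {}
    for tp, end in end_offsets.items():
        committed = committed_offsets.get(tp, 0)
        # Clamp at 0 in case the two snapshots were taken at different times.
        lag[tp] = max(end - committed, 0)
    return lag


def total_lag(
    end_offsets: Dict[TopicPartition, int],
    committed_offsets: Dict[TopicPartition, int],
) -> int:
    """Sum the per-partition lag over all partitions."""
    return sum(partition_lag(end_offsets, committed_offsets).values())


if __name__ == "__main__":
    # Hypothetical offsets for two partitions of MY_TOPIC
    ends = {("MY_TOPIC", 0): 100, ("MY_TOPIC", 1): 50}
    committed = {("MY_TOPIC", 0): 90, ("MY_TOPIC", 1): 50}
    print(partition_lag(ends, committed))
    print(total_lag(ends, committed))
```

With the S3 sink I got these numbers from Kafka Lag Exporter for the connector's consumer group; what I don't know is which consumer group (if any) holds the equivalent committed offsets for the Iceberg sink.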
Sorry if it is a basic question... I also couldn't find any related existing thread on this topic.
Here's my connector configuration:
{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "tasks.max": "32",
  "timezone": "UTC",
  "topics": "MY_TOPIC",
  "iceberg.control.topic": "control-iceberg",
  "iceberg.control.group-id": "ingestion-iceberg-cg",
  "iceberg.tables": "iceberg.my_log",
  "iceberg.catalog.s3.endpoint": "http://my-endpoint.net",
  "iceberg.catalog.s3.path-style-access": "true",
  "iceberg.catalog.s3.access-key-id": "XXX",
  "iceberg.catalog.s3.secret-access-key": "YYY",
  "iceberg.catalog.s3.multipart.part-size-bytes": "25165824",
  "iceberg.catalog.client.region": "eu-west-1",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.catalog.warehouse": "s3a://my-storage-log-kfc-iceberg/MY_TOPIC/",
  "iceberg.catalog.catalog-impl": "org.apache.iceberg.jdbc.JdbcCatalog",
  "iceberg.catalog.uri": "jdbc:postgresql://my-kafka-connect-pg:5432/iceberg",
  "iceberg.catalog.jdbc.useSSL": "false",
  "iceberg.catalog.jdbc.user": "iceberg-pguser",
  "iceberg.catalog.jdbc.password": "ZZZZ",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.control.commit.interval-ms": 180000,
  "value.converter": "com.converter.ThriftAvroLogConverter",
  "key.converter": "org.apache.kafka.connect.converters.LongConverter",
  "value.converter.schemas.enable": "false",
  "iceberg.kafka.request.timeout.ms": "480000",
  "iceberg.kafka.max.poll.records": "2000"
}
Thanks for your help!