tabular-io / iceberg-kafka-connect

Apache License 2.0

Debezium CDC data committed to iceberg table but not readable in AWS Athena/Glue #172

Open dertodestod opened 7 months ago

dertodestod commented 7 months ago

Hello,

We are having some issues running this sink connector in combination with AWS and Debezium CDC data: we see no data when querying the table in AWS Glue.

After reading the documentation, I was able to create a test table with one string column (using the Debezium transform), and in S3 I can see that one data file and one metadata file get generated/committed. However, for some reason the snapshot does not seem to be processed properly, so there is no snapshot in the Iceberg table and the data cannot be queried.

This is the config I'm using:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
topics=ingestion.schema.iceberg
tasks.max=1
iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://xxxxx-iceberg-warehouse
iceberg.catalog.client.region=eu-central-1
iceberg.table.default.iceberg.id-columns=col1
iceberg.tables.auto-create-enabled=true
iceberg.tables=default.iceberg
iceberg.tables.cdc-field=_cdc.op
transforms=debezium
transforms.debezium.type=io.tabular.iceberg.connect.transforms.DebeziumTransform

These are the logs when I restart the sink connector and the table gets (auto-)created:

[2023-12-19 16:50:11,423] INFO Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil)
[2023-12-19 16:50:11,561] INFO Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil)
[2023-12-19 16:50:11,657] INFO Table properties set at catalog level through catalog properties: {} (org.apache.iceberg.BaseMetastoreCatalog)
[2023-12-19 16:50:11,657] INFO Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil)
[2023-12-19 16:50:11,871] INFO Table properties enforced at catalog level through catalog properties: {} (org.apache.iceberg.BaseMetastoreCatalog)
[2023-12-19 16:50:12,268] INFO Successfully committed to table iceberg.default.iceberg in 397 ms (org.apache.iceberg.BaseMetastoreTableOperations)
[2023-12-19 16:50:12,385] INFO Refreshing table metadata from new version: s3://xxxxx-iceberg-warehouse/default.db/iceberg/metadata/00000-c16738b8-a4a5-4031-8ffb-a120c3f809f0.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations)
[2023-12-19 16:50:12,415] INFO Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool)
[2023-12-19 16:50:12,419] INFO Got brand-new compressor [.zstd] (org.apache.hadoop.io.compress.CodecPool)
[2023-12-19 16:50:13,290] INFO [Consumer clientId=b4316402-ee78-4023-a34e-90bb229dd9d4, groupId=cg-control-IcebergSinkConnector-coord] Finished assignment for group at generation 37: {b4316402-ee78-4023-a34e-90bb229dd9d4-1fcbd58f-7d82-4d6e-9b39-014969b18bc8=Assignment(partitions=[control-iceberg-0, control-iceberg-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-12-19 16:50:13,294] INFO [Consumer clientId=b4316402-ee78-4023-a34e-90bb229dd9d4, groupId=cg-control-IcebergSinkConnector-coord] Successfully joined group with generation 37 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2023-12-19 16:50:13,294] INFO [Consumer clientId=b4316402-ee78-4023-a34e-90bb229dd9d4, groupId=cg-control-IcebergSinkConnector-coord] Adding newly assigned partitions: control-iceberg-0, control-iceberg-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-12-19 16:50:13,297] INFO [Consumer clientId=b4316402-ee78-4023-a34e-90bb229dd9d4, groupId=cg-control-IcebergSinkConnector-coord] Setting offset for partition control-iceberg-1 to the committed offset FetchPosition{offset=43, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.xxxxx.xxxxx.xxxx.kafka.eu-central-1.amazonaws.com:9092 (id: 1 rack: euc1-az1), epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2023-12-19 16:50:13,297] INFO [Consumer clientId=b4316402-ee78-4023-a34e-90bb229dd9d4, groupId=cg-control-IcebergSinkConnector-coord] Setting offset for partition control-iceberg-0 to the committed offset FetchPosition{offset=497, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-3.xxxxx.xxxxx.xxxxx.kafka.eu-central-1.amazonaws.com:9092 (id: 3 rack: euc1-az3), epoch=0}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

Afterwards I see one data file plus the metadata, and if I check that data file it actually contains my 'col1' column with its proper values.

These are the logs after the table has been created (even if I insert records in between):

[2023-12-19 16:55:11,351] INFO Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel)
[2023-12-19 16:55:41,388] INFO Commit timeout reached (io.tabular.iceberg.connect.channel.CommitState)
[2023-12-19 16:55:41,392] INFO Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)
[2023-12-19 16:55:41,412] INFO Commit 65671f0c-4cac-4067-9cf3-23112fb36fed complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator)

If I recreate the table after inserting a record, I see the new record in the created .parquet data file, but as said, there is no snapshot in the table in Glue/Athena, so I cannot query any data.

Here is the metadata after the table has been auto-created by the connector:

{
  "format-version": 2,
  "table-uuid": "39a0f2c1-9a31-4bdd-a443-17c8b2bccf93",
  "location": "s3://xxxxx-iceberg-warehouse/default.db/iceberg",
  "last-sequence-number": 0,
  "last-updated-ms": 1703004611871,
  "last-column-id": 7,
  "current-schema-id": 0,
  "schemas": [
    {
      "type": "struct",
      "schema-id": 0,
      "fields": [
        {
          "id": 1,
          "name": "col1",
          "required": false,
          "type": "string"
        },
        {
          "id": 2,
          "name": "_cdc",
          "required": true,
          "type": {
            "type": "struct",
            "fields": [
              {
                "id": 3,
                "name": "op",
                "required": true,
                "type": "string"
              },
              {
                "id": 4,
                "name": "ts",
                "required": true,
                "type": "timestamptz"
              },
              {
                "id": 5,
                "name": "offset",
                "required": false,
                "type": "long"
              },
              {
                "id": 6,
                "name": "source",
                "required": true,
                "type": "string"
              },
              {
                "id": 7,
                "name": "target",
                "required": true,
                "type": "string"
              }
            ]
          }
        }
      ]
    }
  ],
  "default-spec-id": 0,
  "partition-specs": [
    {
      "spec-id": 0,
      "fields": []
    }
  ],
  "last-partition-id": 999,
  "default-sort-order-id": 0,
  "sort-orders": [
    {
      "order-id": 0,
      "fields": []
    }
  ],
  "properties": {
    "write.parquet.compression-codec": "zstd"
  },
  "current-snapshot-id": -1,
  "refs": {},
  "snapshots": [],
  "statistics": [],
  "snapshot-log": [],
  "metadata-log": []
}

We are using an older Confluent Platform (5.4.2) with Kafka 2.4.1 and the Avro Schema Registry, plus Debezium 1.9.7 and the latest sink (0.6.5).

Does anyone have an idea how to fix that? Please let me know if you need any further information from my side. Thanks a lot!

Best regards, Thomas

junsik-gsitm commented 7 months ago

Are your files being stored in S3? I used your JSON data exactly, but I keep getting the same error... Is any special configuration required for AWS Glue?

dertodestod commented 7 months ago

Yes, we are using S3 and the data file can be found in the proper folder: s3://xxxxx-iceberg-warehouse/default.db/iceberg/data/

I'm quite sure Glue needs special configuration, which is why I have set the following:

iceberg.catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog
iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
iceberg.catalog.warehouse=s3://xxxxx-iceberg-warehouse
iceberg.catalog.client.region=eu-central-1

But of course I could be missing something else. For example, I'm not specifying anything related to Avro, but since I can see the values of my column just fine in the Parquet file, I assume the default converters are handling them properly.

Regarding the workflow, I can see that the data file in S3 is created a few seconds before the table is created (when using auto-create-enabled=true) or before the table is 'loaded by catalog' (when using auto-create-enabled=false). I cannot find matching log entries for this S3 file write, because the timestamps in the logs only match the table create/load event. That leads me to believe that this is some extra load, and that the actual CDC load is either ignoring the rows or failing without a warning or error. It would also explain why the commit every 5 minutes just times out, whether or not I create a record. If I restart the connector in Kafka Connect, it creates another data file with all records from the table in S3. Is that expected when the connector restarts, or is it a sign of the offsets not working as expected?

By the way, is there an easy way to get more logging when running the sink connector in Kafka Connect? It would be great to see more information about what the sink and/or the Iceberg library is doing underneath.

junsik-gsitm commented 7 months ago

"Whatever I do, I encounter this error and nothing happens.. The data is not even being stored in S3.. 😢 java.base/java.lang.Thread.run(Thread.java:829) Caused by: java.lang.IllegalArgumentException: Invalid table identifier: tripNotification

If you want to inspect the Connect errors in detail, you can change the connector's log4j level to DEBUG, and it will output detailed logs. However, be aware that this will generate a huge amount of logs, so it's best to use this setting only briefly.

Note that you need to restart the Connect server after changing the log4j settings.

dertodestod commented 7 months ago

Hi @junsik-gsitm, thanks for the reply. Is there a way to increase the logging directly in Kafka Connect without changing the code and building my own sink package? I'm not sure how your error is related to mine, but have you tried "iceberg.tables": "default.tripNotification", or some other database name in front of the table name?

Regarding my issue, I was now also able to get a delete file in my S3 data folder by adding the Avro converter parameters:

  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://xxxxx-streaming-cp-schema-registry:8081",
  "key.converter.schemas.enable": "true",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://xxxxx-streaming-cp-schema-registry:8081",
  "value.converter.schemas.enable": "true",
  "schemas.enable": "true"

However, I still don't see any snapshots in my table, and I also cannot see any logs that would indicate a problem. Does somebody have any ideas on how to troubleshoot this further?

junsik-gsitm commented 7 months ago

@dertodestod

1. Locate the connect-log4j.properties file in your Kafka Connect installation directory.

2. Change the logging levels: open the file and modify the logging level for the components you're interested in. For example, you can set them to DEBUG or TRACE to get more detailed logs (see the sketch below).

3. Restart Kafka Connect after making the changes to apply the new logging settings.
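
As an illustration only, these are the kinds of lines one could add to connect-log4j.properties to raise the level just for the sink and the Iceberg library (the logger/package names are assumptions based on the log output above; adjust them to your setup):

# more detail from the Tabular Iceberg sink connector
log4j.logger.io.tabular.iceberg.connect=DEBUG
# more detail from the Iceberg library itself
log4j.logger.org.apache.iceberg=DEBUG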


I also solved the problem. In my case, I first created a database in AWS Glue and connected it to the S3 bucket, which made it work properly.

But then a different problem occurred. Problem 1: I tried to process multiple tables in "iceberg.tables", so I also set the option "iceberg.tables.route-field": "_cdc.source". But the data of table_b was going into table_a... ㅠㅠ

So I used the option "iceberg.tables.dynamic-enabled": "true" and did not use the "iceberg.tables" option... Then it only worked when I used the exact name of the source-side database...
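
As a side note: when routing to multiple static tables with a route field, the connector's documentation also describes a per-table route regex. A rough sketch of what that could look like (the table names and regex values here are made up, and the iceberg.table.<table name>.route-regex property is assumed from the README):

iceberg.tables=default.table_a,default.table_b
iceberg.tables.route-field=_cdc.source
iceberg.table.default.table_a.route-regex=.*table_a
iceberg.table.default.table_b.route-regex=.*table_b

If the per-table regexes are missing or overlap, records may not be filtered the way you expect, so this is worth double-checking when data from one source table ends up in another.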

mattssll commented 6 months ago

@junsik-gsitm have you tried getting data from multiple topics and routing each topic to a different table in Iceberg? That would be my ideal scenario. I'm not sure it's a good idea to send ALL tables to the same topic (at least I don't like the idea).

I passed the config below, and the tables that get created all have the same schema, which seems to be a combination of multiple topics, so it didn't work as I expected.

topics: "airflow_from_dms.airflow.log,airflow_from_dms.airflow.dag_run,airflow_from_dms.airflow.job, airflow_from_dms.celery_taskmeta"
iceberg.tables: "data_lake.log,data_lake.dag_run,data_lake.job,data_lake.celery_taskmeta"

I was expecting the connector to understand that the first topic maps to the first entry in iceberg.tables, but that's not the case. It mixes everything up and creates all tables with the same schema, combining data from multiple topics.

I could do as you said and use "iceberg.tables.dynamic-enabled" and put the table name into a field, but then my table would be called, for example, "log" and would be missing the Glue database name in front of it. The table then cannot be created, and it fails with the following log:

2024-01-10 08:42:36,751 WARN [multi-iceberg-sink-to-s3|task-0] Retrying task after failure: Invalid table identifier: job (org.apache.iceberg.util.Tasks) [task-thread-multi-iceberg-sink-to-s3-0]
java.lang.IllegalArgumentException: Invalid table identifier: job
  at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkArgument(Preconditions.java:218)
  at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.<init>(BaseMetastoreCatalog.java:146)
  at org.apache.iceberg.BaseMetastoreCatalog.buildTable(BaseMetastoreCatalog.java:94)
  at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:71)
  at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:93)
  at io.tabular.iceberg.connect.data.IcebergWriterFactory.lambda$autoCreateTable$1(IcebergWriterFactory.java:106)
  at org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413)
  at org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219)
  at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203)
  at org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196)
  at io.tabular.iceberg.connect.data.IcebergWriterFactory.autoCreateTable(IcebergWriterFactory.java:100)
  at io.tabular.iceberg.connect.data.IcebergWriterFactory.createWriter(IcebergWriterFactory.java:57)
  at io.tabular.iceberg.connect.channel.Worker.lambda$writerForTable$8(Worker.java:242)
  at java.base/java.util.HashMap.computeIfAbsent(HashMap.java:1220)
  at io.tabular.iceberg.connect.channel.Worker.writerForTable(Worker.java:241)
  at io.tabular.iceberg.connect.channel.Worker.routeRecordDynamically(Worker.java:227)
  at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:182)
  at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
  at io.tabular.iceberg.connect.channel.Worker.save(Worker.java:171)
  at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:150)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
  at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
  at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
  at java.base/java.lang.Thread.run(Thread.java:840)

To solve the above, I used a custom Debezium transform that let me create a new field from the table-name field, to which I could prepend my Glue database name. It's unfortunate that this connector takes the table name as schema.table_name; these are two different things and they are coupled at the moment. With that in place, everything works with dynamic fan-out while reading data from multiple topics.
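
Not the actual transform used above, but a minimal sketch of what such an SMT could look like (the class name, config keys, and the assumption that the table name lives in a top-level string field are all illustrative):

import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;

// Prefixes the value of a configurable string field with a database name,
// so a bare table name like "log" becomes "data_lake.log".
public class PrefixRouteField<R extends ConnectRecord<R>> implements Transformation<R> {

  private String database;   // e.g. "data_lake"
  private String fieldName;  // e.g. "table_name"; must already exist in the value schema

  @Override
  public void configure(Map<String, ?> configs) {
    this.database = (String) configs.get("database");
    this.fieldName = (String) configs.get("field");
  }

  @Override
  public R apply(R record) {
    Object value = record.value();
    if (value instanceof Struct) {
      Struct struct = (Struct) value;
      Object current = struct.get(fieldName);
      if (current instanceof String && !((String) current).contains(".")) {
        // overwrite the field in place; the schema is unchanged
        struct.put(fieldName, database + "." + current);
      }
    }
    return record;
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef()
        .define("database", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
            "Database name to prepend")
        .define("field", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
            "Name of the string field holding the table name");
  }

  @Override
  public void close() {}
}

It would then be wired into the connector config via the transforms chain (e.g. transforms=prefix, transforms.prefix.type=<fully-qualified class name>, transforms.prefix.database=data_lake, transforms.prefix.field=table_name), with the field name pointed at whatever route field the sink is configured to use.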

Below is the config that worked for me for a single table. I can read data in Athena, but funnily enough it takes around 5-15 minutes for the data to become accessible in Athena, even though the data is already in S3; I'd like to understand why that is...

class: io.tabular.iceberg.connect.IcebergSinkConnector
  tasksMax: 1
  config:
    iceberg.control.topic: "last-control-iceberg-topicc"
    iceberg.control.group-id: "iceberg-sink-to-s3-lastt"
    # GlueCatalog Configuration
    iceberg.catalog: "data_lake"
    iceberg.catalog.catalog-impl: "org.apache.iceberg.aws.glue.GlueCatalog"
    iceberg.catalog.warehouse: "s3a://xxxx/icewwwwwberg"
    iceberg.catalog.io-impl: "org.apache.iceberg.aws.s3.S3FileIO"
    iceberg.catalog.client.region: "eu-west-1"
    # Schema Registry Configuration
    key.converter: "org.apache.kafka.connect.json.JsonConverter"
    # key.converter: "io.confluent.connect.avro.AvroConverter"
    # key.converter.schema.registry.url: "http://schema-registry-confluent.kafka.svc.cluster.local:8081"
    # key.converter.schemas.enable: "true"
    value.converter: "io.confluent.connect.avro.AvroConverter"
    value.converter.schema.registry.url: "http://schema-registry-confluent.kafka.svc.cluster.local:8081"
    value.converter.schemas.enable: "true"
    iceberg.control.commit.interval-ms: "5000"
    # CDC Configuration
    iceberg.tables.cdc-field: "__op"
    # Iceberg Configuration
    iceberg.tables.upsert-mode-enabled: "true"
    iceberg.tables.auto-create-enabled: "true"
    iceberg.tables.evolve-schema-enabled: "true"
    iceberg.tables.schema-force-optional: "true"
    # Tables and Topics Configuration
    topics: "airflow_from_dms.airflow.log"
    iceberg.tables: "data_lake.log"
    iceberg.tables.default-id-columns: "id"

Looking forward to hearing from you guys, thanks! @dertodestod

dertodestod commented 5 months ago

Hi @bryanck, sorry for tagging you like this, but I'm a bit at a loss here.

Do you have an idea what the issue might be in my case, or what I could try to get more information? To summarize: I can see data files in S3, but there are no committed snapshots in the Iceberg tables, no matter what configuration I've tried. I'm not sure why the other users have commented on this issue; I don't think their problems are related to mine.

Thanks a lot.

bryanck commented 5 months ago

Are you seeing any "Received event of type" log statements? I didn't see any above. That could indicate a permissions issue with the control topic.

dertodestod commented 5 months ago

@bryanck I see one "Received event of type" at the end when I start a connector from scratch that finds 2 records in the Kafka topic. It auto-creates the table and puts a data file in S3, but it does not create a snapshot:

[2024-02-01 17:22:24,959] INFO Kafka version: 5.4.2-ccs (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-01 17:22:24,959] INFO Kafka commitId: 2626d8cfb686c23e (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-01 17:22:24,959] INFO Kafka startTimeMs: 1706808144959 (org.apache.kafka.common.utils.AppInfoParser)
[2024-02-01 17:22:24,973] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] Subscribed to topic(s): control-iceberg (org.apache.kafka.clients.consumer.KafkaConsumer)
[2024-02-01 17:22:24,978] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] Cluster ID: -4YfE3SqRMGUHTFz8AnL5A (org.apache.kafka.clients.Metadata)
[2024-02-01 17:22:24,979] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] Discovered group coordinator b-3.xxx.amazonaws.com:9092 (id: 2147483644 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2024-02-01 17:22:24,979] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2024-02-01 17:22:24,981] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2024-02-01 17:22:26,005] INFO ###### go into applyWithSchema (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,006] INFO ###### after payloadschema fields = [Field{name=col1, index=0, schema=Schema{STRING}}] (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,006] INFO ###### record = SinkRecord{kafkaOffset=3, timestampType=CreateTime} ConnectRecord{topic='ingestion.forecast.iceberg_key', kafkaPartition=1, key=Struct{col1=eighth}, keySchema=Schema{ingestion.forecast.iceberg_key.Key:STRUCT}, value=Struct{after=Struct{col1=eighth},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706807798286,snapshot=false,db=logistics,sequence=["3828229543152","3828230423576"],schema=forecast,table=iceberg_key,txId=230628588,lsn=3828230423576},op=c,ts_ms=1706807798631}, valueSchema=Schema{ingestion.forecast.iceberg_key.Envelope:STRUCT}, timestamp=1706807799038, headers=ConnectHeaders(headers=)} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,006] INFO ###### record key = Struct{col1=eighth} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,006] INFO ###### record value = Struct{after=Struct{col1=eighth},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706807798286,snapshot=false,db=logistics,sequence=["3828229543152","3828230423576"],schema=forecast,table=iceberg_key,txId=230628588,lsn=3828230423576},op=c,ts_ms=1706807798631} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,007] INFO Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO (org.apache.iceberg.CatalogUtil)
[2024-02-01 17:22:26,136] INFO Refreshing table metadata from new version: s3a://xxx-iceberg-warehouse/iceberg.db/iceberg_key/metadata/00000-5cf1b5be-9b0d-4cdc-9868-f6ad52bbd19b.metadata.json (org.apache.iceberg.BaseMetastoreTableOperations)
[2024-02-01 17:22:26,220] INFO Table loaded by catalog: iceberg.iceberg.iceberg_key (org.apache.iceberg.BaseMetastoreCatalog)
[2024-02-01 17:22:26,226] INFO Got brand-new compressor [.gz] (org.apache.hadoop.io.compress.CodecPool)
[2024-02-01 17:22:26,232] INFO Got brand-new compressor [.gz] (org.apache.hadoop.io.compress.CodecPool)
[2024-02-01 17:22:26,234] INFO ###### go into applyWithSchema (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,234] INFO ###### after payloadschema fields = [Field{name=col1, index=0, schema=Schema{STRING}}] (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,234] INFO ###### record = SinkRecord{kafkaOffset=8, timestampType=CreateTime} ConnectRecord{topic='ingestion.forecast.iceberg_key', kafkaPartition=0, key=Struct{col1=seventh}, keySchema=Schema{ingestion.forecast.iceberg_key.Key:STRUCT}, value=Struct{after=Struct{col1=seventh},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706804194057,snapshot=false,db=logistics,sequence=["3828224159976","3828229543040"],schema=forecast,table=iceberg_key,txId=230623033,lsn=3828229543040},op=c,ts_ms=1706804194363}, valueSchema=Schema{ingestion.forecast.iceberg_key.Envelope:STRUCT}, timestamp=1706804194489, headers=ConnectHeaders(headers=)} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,234] INFO ###### record key = Struct{col1=seventh} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:26,234] INFO ###### record value = Struct{after=Struct{col1=seventh},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706804194057,snapshot=false,db=logistics,sequence=["3828224159976","3828229543040"],schema=forecast,table=iceberg_key,txId=230623033,lsn=3828229543040},op=c,ts_ms=1706804194363} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:22:27,847] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Finished assignment for group at generation 5: {eab64b86-43e6-4480-8ae8-9c1da3604c68-75d2bb97-8743-42c5-9f21-ca697ad6d448=Assignment(partitions=[control-iceberg-0, control-iceberg-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-01 17:22:27,852] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2024-02-01 17:22:27,852] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Adding newly assigned partitions: control-iceberg-0, control-iceberg-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-01 17:22:27,854] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Found no committed offset for partition control-iceberg-0 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-01 17:22:27,854] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Setting offset for partition control-iceberg-1 to the committed offset FetchPosition{offset=683, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=b-1.xxx.amazonaws.com:9092 (id: 1 rack: euc1-az1), epoch=3}} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-02-01 17:22:27,855] INFO [Consumer clientId=eab64b86-43e6-4480-8ae8-9c1da3604c68, groupId=cg-control-IcebergSinkConnectorKey-coord] Resetting offset for partition control-iceberg-0 to offset 906. (org.apache.kafka.clients.consumer.internals.SubscriptionState)
[2024-02-01 17:22:27,858] INFO Received event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)

I see this every 5 minutes but nothing happens:

[2024-02-01 17:27:25,945] INFO Sending event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel)
[2024-02-01 17:27:26,042] INFO Received event of type: COMMIT_REQUEST (io.tabular.iceberg.connect.channel.Channel)
[2024-02-01 17:27:56,053] INFO Commit timeout reached (io.tabular.iceberg.connect.channel.CommitState)
[2024-02-01 17:27:56,058] INFO Sending event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)
[2024-02-01 17:27:56,072] INFO Commit 428c18fb-4d10-45a5-ba7b-821b8ee66654 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator)
[2024-02-01 17:27:56,076] INFO Received event of type: COMMIT_COMPLETE (io.tabular.iceberg.connect.channel.Channel)

When I add a record to Kafka (via Debezium), I only get this:

[2024-02-01 17:40:18,389] INFO 1 records sent during previous 00:23:39.395, last recorded offset: {transaction_id=null, lsn_proc=3828230760984, lsn_commit=3828230423688, lsn=3828230760984, txId=230630776, ts_usec=1706809217578964} (io.debezium.connector.common.BaseSourceTask)
[2024-02-01 17:40:18,399] INFO ###### go into applyWithSchema (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:40:18,399] INFO ###### after payloadschema fields = [Field{name=col1, index=0, schema=Schema{STRING}}] (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:40:18,399] INFO ###### record = SinkRecord{kafkaOffset=9, timestampType=CreateTime} ConnectRecord{topic='ingestion.forecast.iceberg_key', kafkaPartition=0, key=Struct{col1=ninth}, keySchema=Schema{ingestion.iceberg_key.Key:STRUCT}, value=Struct{after=Struct{col1=ninth},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706809217578,snapshot=false,db=logistics,sequence=["3828230423688","3828230760984"],schema=forecast,table=iceberg_key,txId=230630776,lsn=3828230760984},op=c,ts_ms=1706809217945}, valueSchema=Schema{ingestion.forecast.iceberg_key.Envelope:STRUCT}, timestamp=1706809218391, headers=ConnectHeaders(headers=)} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:40:18,399] INFO ###### record key = Struct{col1=ninth} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:40:18,399] INFO ###### record value = Struct{after=Struct{col1=ninth},source=Struct{version=1.9.7.Final,connector=postgresql,name=ingestion,ts_ms=1706809217578,snapshot=false,db=logistics,sequence=["3828230423688","3828230760984"],schema=forecast,table=iceberg_key,txId=230630776,lsn=3828230760984},op=c,ts_ms=1706809217945} (io.tabular.iceberg.connect.transforms.DebeziumTransform)
[2024-02-01 17:40:18,399] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2024-02-01 17:40:21,729] INFO [Consumer clientId=4ed32aba-2366-45e6-8879-93f7a2aed34e, groupId=cg-control-49e10334-2f6f-44c5-b1e2-425795cb3df7] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)

I haven't done any special configuration for the control topic so it should use default settings.

How can I check your suspicion regarding the control topic? This is what I see in the control-iceberg topic in Kafka: 2 messages every 5 minutes, but none from the time when I added a new record via Debezium (the timestamp in the screenshot was in UTC+1).
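
For anyone wanting to check the same thing, a rough sketch of how the control topic and its consumer groups could be inspected with the standard Kafka CLI tools (broker address and group name are placeholders):

# peek at the control topic (payloads are binary, but you can see whether and when events arrive)
kafka-console-consumer.sh --bootstrap-server <broker>:9092 --topic control-iceberg --from-beginning

# check the state and offsets of the connector's control consumer groups
kafka-consumer-groups.sh --bootstrap-server <broker>:9092 --list
kafka-consumer-groups.sh --bootstrap-server <broker>:9092 --describe --group <cg-control-...>

# if ACLs are enabled, verify the connector principal can read and write the control topic
kafka-acls.sh --bootstrap-server <broker>:9092 --list --topic control-iceberg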

k0bayash1maru commented 4 months ago

@dertodestod, were you able to resolve this issue? If yes, can you please give some pointers on how you fixed it?

dertodestod commented 4 months ago

Hi @k0bayash1maru, I actually have not been able to fix this, but I also haven't invested much time into it in the last few weeks. I will probably start from scratch with the latest version of the sink and a new environment on my side if I find some time to work on this again.

Are you seeing the same issue? If you make any progress or have any ideas, please let me know, because I'm still interested in using this.

dertodestod commented 3 months ago

So I had another go at this with an updated environment (MSK 3.5.1, Debezium 2.5.1, and Confluent Platform 7.5.1), but I can still see the same problems. The sink was installed like this in my Confluent Platform Docker image:

RUN confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.6.13

The issue is as before: the connector runs without failing, but there are no data files in S3, only a metadata file. Once the connector is stopped, the data file is created.

This is what happens while the issue is occurring:

  1. The transient control consumer group logs a lot of messages related to UNKNOWN_MEMBER_ID and JOIN_GROUP:
[2024-03-26 14:44:44,941] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Subscribed to topic(s): control-iceberg (org.apache.kafka.clients.consumer.KafkaConsumer)
[2024-03-26 14:44:44,945] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Cluster ID: -4YfE3SqRMGUHTFz8AnL5A (org.apache.kafka.clients.Metadata)
[2024-03-26 14:44:44,946] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Discovered group coordinator b-2.xxx-kafka-dev.xxx.c4.kafka.eu-central-1.amazonaws.com:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:44:44,946] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:44:44,949] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: need to re-join with the given member-id: 86fc9ccf-2ab6-4458-a692-4c86da4939df-5abb0756-32d4-4497-b690-96a95693e97b (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:44:44,949] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:44:44,949] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:45:40,895] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Successfully joined group with generation Generation{generationId=1, memberId='86fc9ccf-2ab6-4458-a692-4c86da4939df-5abb0756-32d4-4497-b690-96a95693e97b', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:45:40,895] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Finished assignment for group at generation 1: {86fc9ccf-2ab6-4458-a692-4c86da4939df-5abb0756-32d4-4497-b690-96a95693e97b=Assignment(partitions=[control-iceberg-0, control-iceberg-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:45:40,997] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] SyncGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=1, memberId='86fc9ccf-2ab6-4458-a692-4c86da4939df-5abb0756-32d4-4497-b690-96a95693e97b', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:45:40,997] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from SYNC_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:45:40,997] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: encountered UNKNOWN_MEMBER_ID from SYNC_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:46:40,900] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:46:40,901] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:47:40,905] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: need to re-join with the given member-id: 86fc9ccf-2ab6-4458-a692-4c86da4939df-5c58e311-68cc-47b8-a21d-2f2c699ec5c7 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:47:40,905] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:47:40,905] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:48:40,909] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] JoinGroup failed: The coordinator is not aware of this member. Need to re-join the group. Sent generation was Generation{generationId=-1, memberId='86fc9ccf-2ab6-4458-a692-4c86da4939df-5c58e311-68cc-47b8-a21d-2f2c699ec5c7', protocol='null'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:48:40,909] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Resetting generation and member id due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:48:40,909] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: encountered UNKNOWN_MEMBER_ID from JOIN_GROUP response (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:48:40,909] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: rebalance failed due to 'The coordinator is not aware of this member.' (UnknownMemberIdException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:48:40,909] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:49:01,436] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: need to re-join with the given member-id: 86fc9ccf-2ab6-4458-a692-4c86da4939df-1d81f694-6e2a-4d03-b112-86f6a493637e (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:49:01,436] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Request joining group due to: rebalance failed due to 'The group member needs to have a valid member id before actually entering a consumer group.' (MemberIdRequiredException) (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:49:01,436] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] (Re-)joining group (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
  2. The transient consumer group is not listed as 'STABLE' and is not consuming from the control topic (screenshots were taken across different attempts).

  3. The control topic only has events at even offsets in partition 0; there are no events/records in partition 1.

Seeing this, I suspected an issue with one of the consumer groups. Then suddenly this member was able to join the group, the messages in the logs stopped, and everything started to work:

[2024-03-26 14:49:40,910] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Successfully joined group with generation Generation{generationId=3, memberId='86fc9ccf-2ab6-4458-a692-4c86da4939df-1d81f694-6e2a-4d03-b112-86f6a493637e', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:49:40,910] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Finished assignment for group at generation 3: {86fc9ccf-2ab6-4458-a692-4c86da4939df-1d81f694-6e2a-4d03-b112-86f6a493637e=Assignment(partitions=[control-iceberg-0, control-iceberg-1])} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2024-03-26 14:49:41,010] INFO [Consumer clientId=86fc9ccf-2ab6-4458-a692-4c86da4939df, groupId=cg-control-13eae41d-dce1-4255-84f6-a7edc44c86a3] Successfully synced group in generation Generation{generationId=3, memberId='86fc9ccf-2ab6-4458-a692-4c86da4939df-1d81f694-6e2a-4d03-b112-86f6a493637e', protocol='range'} (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)

I got data files in S3, and snapshots were also committed. In this state, the consumer group shows as 'STABLE' and is consuming.

Also, I could now see records/events at odd offsets in partition 1 of the control topic.

So for some reason it started working, but I don't know why, and I'm not able to make it work again. From what I can see, I'm quite sure it's somehow related to the transient consumer group and the coordinator and group-membership errors.

@bryanck Do you have an idea what could cause this? We are running mostly default settings in Kafka, so I'd be really happy about any pointers. Thanks again.