tabular-io / iceberg-kafka-connect

Errors in routing data to Iceberg with Amazon MSK Connect #103

Closed. anuja-kelkar closed this issue 10 months ago

anuja-kelkar commented 11 months ago

We are using iceberg-kafka-connect as a custom plugin with Amazon MSK Connect. We are publishing messages to a single Kafka topic in our Amazon MSK cluster and using multi-table fan-out with dynamic routing to write to dynamically determined Iceberg tables.

We are trying to deploy a connector with the following configuration:

{
   "name": "iceberg-sink-connector",
   "config": {
      "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.route-field": "iceberg_table",
      "iceberg.tables.cdc-field": "cdc_op",
      "iceberg.tables.id-columns": "row_id",
      "iceberg.control.group.id": "cg-control-sync-sandbox-iceberg-connector",
      "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
      "tasks.max": 5,
      "topics": "datasets",
      "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "key.converter.region": "us-east-1",
      "iceberg.tables.upsert-mode-enabled": false,
      "iceberg.catalog.client.region": "us-east-1",
      "heartbeat.interval.ms": 2000,
      "iceberg.tables.dynamic-enabled": true,
      "iceberg.catalog": "AwsDataCatalog",
      "value.converter.region": "us-east-1",
      "iceberg.tables.auto-create-enabled": true,
      "value.converter.schemas.enable": false,
      "iceberg.catalog.warehouse": "s3://data-os-sandbox",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "session.timeout.ms": 20000,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
}

Here is a sample message published to the datasets topic.

{
  "field1": "value1",
  "field1": "value2",
  "field3": "value3",
  "field4": "value4",
  "field5": "value5",
  "field6": "value5",
  "cdc_op": "I",
  "iceberg_table": "db_name.table_name"
}

However, the connector is failing because of the following errors that we observe in the connector logs:

  1. org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
  2. org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
  3. Equality field ids shouldn't be null or empty when creating equality-delete writer (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

Also, we do not see tables being auto-created in our Iceberg database even though iceberg.tables.auto-create-enabled is set to true.

We do see log messages in the connector logs indicating that the Iceberg table metadata was refreshed for specific table names mentioned in the JSON messages published to the Kafka topic. So we understand that the messages are being deserialized successfully.

Can you please advise how we could resolve these errors and route data successfully to Iceberg? Thanks.

bryanck commented 11 months ago

I’ll take a look at this today.

bryanck commented 11 months ago

One issue: the sample message you posted has no row_id field, which is required when CDC deltas are enabled. Also, does the db_name database exist? The sink will not create databases.
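
For reference, given the config above, a routable CDC record would need to carry the row_id identity column, along these lines (field values are illustrative):

{
  "row_id": "42",
  "field1": "value1",
  "cdc_op": "I",
  "iceberg_table": "db_name.table_name"
}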

anuja-kelkar commented 11 months ago

@bryanck Thanks for looking into this issue. Yes, we are sending a row_id; it was just missed in the sample message in the issue description. Also, we are creating the database before the messages are published to the topic.

Can you shed some light on the coordinator-related errors too?

bryanck commented 11 months ago

The coordinator errors are from Kafka (the "coordinator" in this case isn't the sink's coordinator). If you could attach the Kafka Connect logs, I can see if I can figure out what is going on. Also, there is an integration test for CDC if you want to see how that is set up.

jwalinrami commented 11 months ago

hi @bryanck, below are the errors/info messages we are seeing now. We downloaded the latest build (0.5.2) and rebuilt the plugin and connector.

[Worker-06702737a5217a0c4] [2023-10-09 21:00:12,942] ERROR [iceberg-kafka-connect-sandbox-10-09-2023|task-1] [Consumer clientId=2793956a-be57-4a37-8bbc-9b5eae1210e6, groupId=cg-control-257ba208-482d-4e55-a115-8c5dc8111220] LeaveGroup request with Generation{generationId=-1, memberId='2793956a-be57-4a37-8bbc-9b5eae1210e6-39615fde-fa4c-4730-a917-fe6843ae31b7', protocol='null'} failed with error: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1076)

[Worker-06702737a5217a0c4] org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.

INFO [iceberg-kafka-connect-sandbox-10-09-2023|task-0] [Consumer clientId=89a6e34a-a487-4fb5-864e-f2bfa9889bf0, groupId=cg-control-9eb72703-79d7-4bc6-85a4-305fcf6ee808] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:470)

However, we do see data landing in Iceberg: the 'metadata' and 'data' objects appear in the S3 location, and we can see the data when we run SQL queries in Athena.

If you could help us understand or resolve these errors, or have any suggestions, that would be great.

anuja-kelkar commented 11 months ago

@bryanck We are looking into resolving the coordinator-related errors. Which properties do we set in the connector config to control the heartbeat.interval.ms and session.timeout.ms Kafka consumer properties? So far, setting these properties directly and redeploying hasn't changed the default values. Please advise, thanks.
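
In stock Kafka Connect, per-connector consumer settings are set with the consumer.override. prefix rather than the bare session.timeout.ms / heartbeat.interval.ms keys we had in our config above, and they only take effect if the worker sets connector.client.config.override.policy=All; we are not certain whether MSK Connect permits these overrides. A sketch of what the keys would look like:

   "consumer.override.session.timeout.ms": "20000",
   "consumer.override.heartbeat.interval.ms": "2000"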

anuja-kelkar commented 11 months ago

@bryanck We also notice the log line INFO [iceberg-kafka-connect-sandbox|task-0] Commit 456e3e80-2949-4c00-a6b9-6e20049f11b6 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:170). Can you help us understand why it would not commit to the Iceberg table specified in our route-field? Also, what is the significance of vtts null?

bryanck commented 11 months ago

Thanks for the additional info, I'll try your config tonight on MSK and see if I run into the same issues.

isterin commented 11 months ago

@bryanck Thank you 🙏

bryanck commented 11 months ago

Still setting things up, hopefully will have something to report later today.

anuja-kelkar commented 11 months ago

@bryanck Does the iceberg-kafka-connect plugin support ingestion into EXTERNAL TABLE type in Iceberg?

bryanck commented 11 months ago

The sink uses the Iceberg API for writes and commits, so I don't think there should be an issue, if it works in other cases.

anuja-kelkar commented 11 months ago

@bryanck Here is our latest connector config:

{
   "name": "iceberg-sink-connector",
   "config": {
      "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.route-field": "iceberg_table",
      "iceberg.control.commit.threads": 2,
      "iceberg.control.commit.interval-ms": 120000,
      "iceberg.control.commit.timeout-ms": 60000,
      "iceberg.tables.cdc-field": "cdc_op",
      "iceberg.tables.id-columns": "row_id",
      "iceberg.control.group.id": "cg-control-iceberg-sync-kafka-connect-sandbox",
      "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
      "tasks.max": 1,
      "topics": "datasets",
      "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "key.converter.region": "us-east-1",
      "iceberg.catalog.client.region": "us-east-1",
      "iceberg.tables.dynamic-enabled": true,
      "iceberg.catalog": "AwsDataCatalog",
      "value.converter.region": "us-east-1",
      "value.converter.schemas.enable": false,
      "iceberg.catalog.warehouse": "s3://data-os-sandbox",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
}

We are seeing some strange behaviour in the files generated in Iceberg. The metadata.json and data parquet files are created for the messages published to the Kafka topic. However, a position delete file is also created in the data folder amongst the parquet files, and the metadata.json indicates position deletes for all of the messages except the last one in the batch. For example, when we published and consumed 5 messages, querying the table via Athena returned only the last record; the first 4 were deleted.

Also, interestingly, when we published only 1 new message, the position delete file was not created and the message was written to the Iceberg table successfully.

Is there a connector config property we are missing that would help resolve this?

bryanck commented 11 months ago

I finally got an MSK environment set up, and the connector is running fine for me: the table was auto-created and data was committed. My setup is MSK (Kafka 3.5.1) -> MSK Connect w/ sink 0.5.5 -> S3 with the Iceberg REST catalog. The network and security settings can be a little tricky to get right, so you may want to double-check those.

The only other variable is using a Glue catalog; someone else reported that it is working for them. I also used a simple example w/out CDC, so you may want to start there. Here is the config I used:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=bck-msk-test
header.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
iceberg.tables=bck.msk_test
iceberg.control.commit.interval-ms=120000
iceberg.tables.auto-create-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.catalog.type=rest
iceberg.catalog.uri=https://api.tabular.io/ws
iceberg.catalog.credential=xxxx
iceberg.catalog.warehouse=xxxx

bryanck commented 11 months ago

Another thing: make sure the control topic is created. I noticed that MSK may not auto-create it even with auto create enabled. The default topic name is control-iceberg.
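
If it needs to be created manually, the standard topic tooling works, something like this (partition and replication counts here are illustrative, and MSK may require extra auth flags):

kafka-topics.sh --create \
  --bootstrap-server <your-msk-bootstrap-servers> \
  --topic control-iceberg \
  --partitions 1 \
  --replication-factor 3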

anuja-kelkar commented 11 months ago

@bryanck Thanks for looking into this. Yes, we did have to create the control topic ourselves. We are using dynamic routing, so we aren't setting the iceberg.tables property; instead, we are setting iceberg.tables.dynamic-enabled and iceberg.tables.route-field.

anuja-kelkar commented 11 months ago

@bryanck Our latest connector config has resolved the coordinator-related errors that we saw before.

bryanck commented 11 months ago

For dynamic routing, if you don't have auto create turned on and the table isn't found, the record will be skipped, in case you want to double-check that.

bryanck commented 11 months ago

Also, try a simple example like the one I provided to ensure basic functionality is working for you.

anuja-kelkar commented 11 months ago

@bryanck We are creating the Iceberg database and table dynamically via the Glue SDK, so the table is already created in Iceberg before messages are published to the topic. Is it OK to use dynamic routing without the auto-create property? Also, when you say the record will be skipped, are you referring to position deletes?

bryanck commented 11 months ago

Yes, you can create the table yourself, disable auto create, and use dynamic routing. Skipped means the record is ignored if it cannot be routed to a table.
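
For example, a table could be pre-created in the Glue catalog via Athena DDL along these lines (the schema and location are illustrative, not the poster's actual schema):

CREATE TABLE db_name.table_name (
  row_id string,
  field1 string,
  cdc_op string,
  iceberg_table string
)
LOCATION 's3://data-os-sandbox/db_name/table_name'
TBLPROPERTIES ('table_type' = 'ICEBERG');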

anuja-kelkar commented 11 months ago

@bryanck Anything obvious to you about the position deletes issue I mentioned above?

anuja-kelkar commented 11 months ago

@bryanck We are wondering if tuning the iceberg.control.commit.* properties might help us resolve the issue we are seeing with not all records being written to the Iceberg tables. Is there a recommended value for these, or a guideline we should follow when tuning them?

bryanck commented 11 months ago

We've set defaults that are sensible for most cases. You should absolutely never lose data. Records will be skipped over only if the entire record value is null or if the sink is unable to route dynamically to a table because the table specified in the route field does not exist and auto create is off.

bryanck commented 11 months ago

We may want to add an option to fail with dynamic routing if the record can't be routed.

anuja-kelkar commented 10 months ago

@bryanck Do you have any guidance on the position deletes we see occurring in the data written to Iceberg?

bryanck commented 10 months ago

In the config you posted, you have CDC enabled. This will apply deltas to the table, which involves creating delete files. If you just want to append data then you should remove the CDC config.

anuja-kelkar commented 10 months ago

@bryanck OK, that makes sense. We actually used cdc_op to indicate that we do not want to support upserts/deletes. To confirm: we should be able to do inserts only, without using the CDC config or setting the CDC op to I?

bryanck commented 10 months ago

Yes, the CDC and upsert features are specifically for applying deltas (updates/deletes). In the default case, rows are appended/inserted. You don't need to set the CDC op field at all.
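
Concretely, for append-only behavior this means dropping these two keys from the connector config posted earlier:

   "iceberg.tables.cdc-field": "cdc_op",
   "iceberg.tables.id-columns": "row_id",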

anuja-kelkar commented 10 months ago

Thanks @bryanck. We will test with the updated config (after removing the cdc_op field).

anuja-kelkar commented 10 months ago

@bryanck That resolved our issue of position deletes! Thanks!

anuja-kelkar commented 10 months ago

We have the connector deployed and running, sinking records into Iceberg tables. I think we can close this issue.

sidharrth2002 commented 6 months ago

Hi @anuja-kelkar, we are also trying to get the Iceberg sink working on MSK Connect; it would be amazing if you could share your latest working connector config! I also wanted to ask about S3 authentication: are you adding access keys in the worker config or using IAM roles? We are currently facing issues where the Iceberg S3 writer lacks the proper auth.

Thanks in advance!

cc: @weichunnn

k0bayash1maru commented 5 months ago

@anuja-kelkar , can you please share how you got past the vtts null error:

Commit 2d5189f7-1cc9-4891-a6d4-334d31534936 complete, committed to 0 table(s), vtts null [io.tabular.iceberg.connect.channel.Coordinator]

What changes did you have to make?

sidharrth2002 commented 3 months ago

@anuja-kelkar I have the same questions as @k0bayash1maru about the vtts null issue