databricks / iceberg-kafka-connect

Apache License 2.0

Errors in routing data to Iceberg with Amazon MSK Connect #103

Closed anuja-kelkar closed 1 year ago

anuja-kelkar commented 1 year ago

We are using iceberg-kafka-connect as a custom plugin with Amazon MSK Connect. We are publishing messages to a single Kafka topic in our Amazon MSK Cluster and using the Multi-table fan-out, dynamic routing to write to dynamic tables in Iceberg.

We are trying to deploy a connector with the following configuration:

{
   "name": "iceberg-sink-connector",
   "config": {
      "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.route-field": "iceberg_table",
      "iceberg.tables.cdc-field": "cdc_op",
      "iceberg.tables.id-columns": "row_id",
      "iceberg.control.group.id": "cg-control-sync-sandbox-iceberg-connector",
      "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
      "tasks.max": 5,
      "topics": "datasets",
      "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "key.converter.region": "us-east-1",
      "iceberg.tables.upsert-mode-enabled": false,
      "iceberg.catalog.client.region": "us-east-1",
      "heartbeat.interval.ms": 2000,
      "iceberg.tables.dynamic-enabled": true,
      "iceberg.catalog": "AwsDataCatalog",
      "value.converter.region": "us-east-1",
      "iceberg.tables.auto-create-enabled": true,
      "value.converter.schemas.enable": false,
      "iceberg.catalog.warehouse": "s3://data-os-sandbox",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "session.timeout.ms": 20000,
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
}

Here is a sample message published to the datasets topic.

{
  "field1": "value1",
  "field2": "value2",
  "field3": "value3",
  "field4": "value4",
  "field5": "value5",
  "field6": "value5",
  "cdc_op": "I",
  "iceberg_table": "db_name.table_name"
}

However, the connector is failing because of the following errors that we observe in the connector logs:

  1. org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.
  2. org.apache.kafka.common.errors.NotCoordinatorException: This is not the correct coordinator.
  3. Equality field ids shouldn't be null or empty when creating equality-delete writer (org.apache.kafka.connect.runtime.WorkerSinkTask:612)

Also, we do not see tables getting auto-created in our Iceberg database, even though iceberg.tables.auto-create-enabled is set to true.

We do see log messages in the connector logs indicating that the Iceberg table metadata was refreshed for specific table names mentioned in the JSON messages published to the Kafka topic. So we understand that the messages are being deserialized successfully.

Can you please advise how we could resolve these errors and route data successfully to Iceberg? Thanks.

bryanck commented 1 year ago

I’ll take a look at this today.

bryanck commented 1 year ago

One issue looks like the sample message you posted has no row_id field. That's required if you have CDC deltas enabled. Also, does the db_name database exist? The sink will not create databases.
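For illustration, a CDC record for this config would need to carry the configured id column as well; a minimal sketch (the field values are placeholders, not from the original message):

```json
{
  "row_id": "placeholder-id-123",
  "cdc_op": "I",
  "iceberg_table": "db_name.table_name"
}
```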

anuja-kelkar commented 1 year ago

@bryanck Thanks for looking into this issue. Yes, we are sending in a row_id, that was missed in the sample message added in the issue description. Also, we are creating the database in advance of the messages being published to the topic.

Can you shed some light on the coordinator-related errors too?

bryanck commented 1 year ago

The coordinator errors are from Kafka (the "coordinator" in this case is Kafka's group coordinator, not the sink coordinator). If you could attach the Kafka Connect logs, I can try to figure out what is going on. Also, there is an integration test for CDC if you want to see how that is set up.

jwalinrami commented 1 year ago

Hi @bryanck, below are the errors/info messages we are seeing now. We downloaded the latest build (0.5.2) and rebuilt the plugin and connector.

[Worker-06702737a5217a0c4] [2023-10-09 21:00:12,942] ERROR [iceberg-kafka-connect-sandbox-10-09-2023|task-1] [Consumer clientId=2793956a-be57-4a37-8bbc-9b5eae1210e6, groupId=cg-control-257ba208-482d-4e55-a115-8c5dc8111220] LeaveGroup request with Generation{generationId=-1, memberId='2793956a-be57-4a37-8bbc-9b5eae1210e6-39615fde-fa4c-4730-a917-fe6843ae31b7', protocol='null'} failed with error: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:1076)

[Worker-06702737a5217a0c4] org.apache.kafka.common.errors.UnknownMemberIdException: The coordinator is not aware of this member.

INFO [iceberg-kafka-connect-sandbox-10-09-2023|task-0] [Consumer clientId=89a6e34a-a487-4fb5-864e-f2bfa9889bf0, groupId=cg-control-9eb72703-79d7-4bc6-85a4-305fcf6ee808] Rebalance failed. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:470)

However, we do see data landing in Iceberg: the 'metadata' and 'data' objects appear in the S3 location, and we can see the data when we run SQL in Athena.

If you can help us understand and resolve these, or have any suggestions, that would be great.

anuja-kelkar commented 1 year ago

@bryanck We are looking into resolving the coordinator-related errors. Which properties do we set in the connector config to override the heartbeat.interval.ms and session.timeout.ms Kafka Connect consumer properties? So far, setting these properties directly and redeploying hasn't changed the default values. Please advise, thanks.
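One thing that may be relevant here: Kafka Connect applies per-connector consumer settings only through the consumer.override. prefix (KIP-458), and the worker must allow this via connector.client.config.override.policy; plain session.timeout.ms keys in a sink connector config are ignored. A hedged sketch of the connector-config fragment, assuming the MSK Connect worker permits the override:

```json
{
  "consumer.override.session.timeout.ms": "20000",
  "consumer.override.heartbeat.interval.ms": "2000"
}
```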

anuja-kelkar commented 1 year ago

@bryanck We also notice this message in the logs: INFO [iceberg-kafka-connect-sandbox|task-0] Commit 456e3e80-2949-4c00-a6b9-6e20049f11b6 complete, committed to 0 table(s), vtts null (io.tabular.iceberg.connect.channel.Coordinator:170). Can you help us understand why it would not commit to the Iceberg table mentioned in our route-field? Also, what is the significance of vtts null?

bryanck commented 1 year ago

Thanks for the additional info, I'll try your config tonight on MSK and see if I run into the same issues.

isterin commented 1 year ago

@bryanck Thank you 🙏

bryanck commented 1 year ago

Still setting things up, hopefully will have something to report later today.

anuja-kelkar commented 1 year ago

@bryanck Does the iceberg-kafka-connect plugin support ingestion into EXTERNAL TABLE type in Iceberg?

bryanck commented 1 year ago

The sink uses the Iceberg API for writes and commits, so I don't think there should be an issue, if it works in other cases.

anuja-kelkar commented 1 year ago

@bryanck Here is our latest connector config:

{
   "name": "iceberg-sink-connector",
   "config": {
      "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
      "iceberg.tables.route-field": "iceberg_table",
      "iceberg.control.commit.threads": 2,
      "iceberg.control.commit.interval-ms": 120000,
      "iceberg.control.commit.timeout-ms": 60000,
      "iceberg.tables.cdc-field": "cdc_op",
      "iceberg.tables.id-columns": "row_id",
      "iceberg.control.group.id": "cg-control-iceberg-sync-kafka-connect-sandbox",
      "iceberg.catalog.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
      "tasks.max": 1,
      "topics": "datasets",
      "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
      "key.converter.region": "us-east-1",
      "iceberg.catalog.client.region": "us-east-1",
      "iceberg.tables.dynamic-enabled": true,
      "iceberg.catalog": "AwsDataCatalog",
      "value.converter.region": "us-east-1",
      "value.converter.schemas.enable": false,
      "iceberg.catalog.warehouse": "s3://data-os-sandbox",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "key.converter": "org.apache.kafka.connect.storage.StringConverter"
   }
}

We are seeing some strange behaviour in the files generated in Iceberg. We see the metadata.json and data parquet files being created for the messages published to the Kafka topic. However, we also see a position delete file created in the data folder amongst the parquet files, and the metadata.json file indicates position deletes for all the messages except the last one in the batch. As an example, when we published and consumed 5 messages and then queried the table via Athena, we only saw the last record; the first 4 were deleted.

Also, interestingly, when we published only 1 new message, the position delete file was not created and the message was written to the Iceberg table successfully.

Any connector config property we are missing that would help resolve this?

bryanck commented 1 year ago

I finally got an MSK environment set up, and the connector is running fine for me: the table was auto-created and data was committed. My setup is MSK (Kafka 3.5.1) -> MSK Connect w/ sink 0.5.5 -> S3 with Iceberg REST catalog. The network and security settings can be a little tricky to get right, so you may want to double-check those.

The only other variable is the Glue catalog; someone else reported that it is working for them. I also used a simple example w/out CDC, so you may want to start there. Here is the config I used:

connector.class=io.tabular.iceberg.connect.IcebergSinkConnector
tasks.max=1
topics=bck-msk-test
header.converter=org.apache.kafka.connect.storage.StringConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
iceberg.tables=bck.msk_test
iceberg.control.commit.interval-ms=120000
iceberg.tables.auto-create-enabled=true
iceberg.tables.evolve-schema-enabled=true
iceberg.catalog.type=rest
iceberg.catalog.uri=https://api.tabular.io/ws
iceberg.catalog.credential=xxxx
iceberg.catalog.warehouse=xxxx

bryanck commented 1 year ago

Another thing: make sure the control topic is created. I noticed that MSK may not auto-create it even with auto-create enabled. The default topic name is control-iceberg.
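If it helps, the control topic can be created by hand; a hedged sketch (the bootstrap endpoint below is a placeholder, and an MSK cluster will typically also need a --command-config file with TLS/IAM client properties):

```shell
# Create the sink's control topic manually, since MSK may not auto-create it.
# Bootstrap endpoint is a placeholder; adjust partitions/replication as needed.
bin/kafka-topics.sh --create \
  --topic control-iceberg \
  --partitions 1 \
  --replication-factor 3 \
  --bootstrap-server b-1.example.kafka.us-east-1.amazonaws.com:9092
```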

anuja-kelkar commented 1 year ago

@bryanck Thanks for looking into this. Yes, we had to create the control topic ourselves, and we did. We are using dynamic routing, so we aren't setting the iceberg.tables property. Instead, we are setting iceberg.tables.dynamic-enabled and iceberg.tables.route-field.

anuja-kelkar commented 1 year ago

@bryanck Our latest connector config has resolved the coordinator-related errors that we saw before.

bryanck commented 1 year ago

For dynamic routing, if you don’t have auto create turned on, then if the table isn’t found the record will be skipped, in case you want to double check that.

bryanck commented 1 year ago

Also, try a simple example like the one I provided to ensure basic functionality is working for you.

anuja-kelkar commented 1 year ago

@bryanck We are creating the Iceberg database and table dynamically via the Glue SDK, so before the messages are published to the topic, the table already exists in Iceberg. Is it OK if we use dynamic routing without the auto-create property? Also, when you say the record will be skipped, are you referring to position deletes?

bryanck commented 1 year ago

Yes, you can create the table yourself, disable auto-create, and use dynamic routing. "Skipped" means the record is ignored if it cannot be routed to a table.
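Putting this together, a minimal sketch of the dynamic-routing portion of the config with auto-create off (values taken from the thread; this is illustrative, not a complete connector config):

```json
{
  "iceberg.tables.dynamic-enabled": true,
  "iceberg.tables.route-field": "iceberg_table",
  "iceberg.tables.auto-create-enabled": false
}
```

With this setup, any record whose iceberg_table value does not name an existing table is silently skipped.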

anuja-kelkar commented 1 year ago

@bryanck Anything obvious to you about the position deletes issue I mentioned above?

anuja-kelkar commented 1 year ago

@bryanck We are wondering if tuning the iceberg.control.commit.* properties might help resolve the issue of not all records being written into Iceberg tables. Is there a recommended value for these, or a guideline we should follow while tuning them?

bryanck commented 1 year ago

We've set defaults that are sensible for most cases. You should absolutely never lose data. Records will be skipped over only if the entire record value is null or if the sink is unable to route dynamically to a table because the table specified in the route field does not exist and auto create is off.

bryanck commented 1 year ago

We may want to add an option to fail with dynamic routing if the record can't be routed.

anuja-kelkar commented 1 year ago

@bryanck Do you have any guidance on the position deletes we see occurring in the data written to Iceberg?

bryanck commented 1 year ago

In the config you posted, you have CDC enabled. This will apply deltas to the table, which involves creating delete files. If you just want to append data then you should remove the CDC config.
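Concretely, that would mean dropping these two keys from the connector config posted earlier (shown here only to identify what to remove):

```json
{
  "iceberg.tables.cdc-field": "cdc_op",
  "iceberg.tables.id-columns": "row_id"
}
```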

anuja-kelkar commented 1 year ago

@bryanck Ok, that makes sense. We actually used cdc_op to indicate that we do not want to support upserts/deletes. To confirm: we should be able to do inserts only, without using the CDC config or setting cdc_op to "I"?

bryanck commented 1 year ago

Yes, the CDC and upsert features are specifically for applying deltas (updates/deletes). In the default case, rows are appended/inserted. You don't need to set the CDC op field at all.

anuja-kelkar commented 1 year ago

Thanks @bryanck. We will test with the updated config (after removing the cdc_op field).

anuja-kelkar commented 1 year ago

@bryanck That resolved our issue of position deletes! Thanks!

anuja-kelkar commented 1 year ago

We have the connector deployed and running, sinking records into Iceberg tables. I think we can close this issue.

sidharrth2002 commented 9 months ago

Hi @anuja-kelkar, we are also trying to get the Iceberg sink working on MSK Connect; it would be amazing if you could share your latest working connector config! I also wanted to ask about S3 authentication: are you adding access keys in the worker config, or using IAM roles? We are currently facing issues with the Iceberg S3 writer, where it lacks the proper auth.

Thanks in advance!

cc: @weichunnn

k0bayash1maru commented 8 months ago

@anuja-kelkar , can you please share how you got past the vtts null error:

Commit 2d5189f7-1cc9-4891-a6d4-334d31534936 complete, committed to 0 table(s), vtts null [io.tabular.iceberg.connect.channel.Coordinator]

What changes did you have to make ?

sidharrth2002 commented 5 months ago

@anuja-kelkar I have the same questions as @k0bayash1maru about the vtts null issue