databricks / iceberg-kafka-connect


RecordTooLargeException with multiple partitions. #285

Open justas200 opened 3 months ago

justas200 commented 3 months ago

Hello,

I've recently started using Iceberg Kafka Connect. I am sending data from Kafka to S3. The topic I am reading from retains data for 2 days and is approximately 22 GB in size. It has 10 partitions.

Here is the kafka-connect config with sensitive information removed:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.catalog.catalog-impl": "org.apache.iceberg.nessie.NessieCatalog",
    "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "tasks.max": "5",
    "iceberg.control.commit.interval-ms": "60000",
    "iceberg.tables.auto-create-enabled": "true",
    "consumer.override.auto.offset.reset": "earliest",
    "iceberg.catalog.s3.staging-dir": "/home/kafka/iceberg-staging",
    "iceberg.catalog.ref": "main",
    "value.converter.schemas.enable": "true",
}

The problem I am having: with a single partition (approx. 500 distinct values) the connector works just fine. If I add another partition with approx. 10 distinct values, I get the following error:

    org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)
    ... 11 more
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

I have increased the following configs, however they do not have any effect:

  config:
    max.message.bytes: 20000000
    max.request.size: 200000000
    message.max.bytes: 200000000
    max.partition.fetch.bytes: 200000000

Does anyone have any tips on what I should look at to solve this problem? The logs don't show anything beyond the error itself. Node metrics are fine, with CPU and memory below their thresholds. I'm not sure what else to look at. How come the message size appears to grow exponentially?

yornstei commented 2 months ago

@justas200 We just had the same issue. We had to update these configs at the broker, Connect worker, and connector levels, and then the issue was resolved.
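
Roughly, the settings involved are the standard Kafka size limits at each level. A minimal sketch (values are illustrative, and everything outside the iceberg.* namespace is a standard Kafka / Kafka Connect property rather than anything specific to this connector):

    # Broker level (server.properties): raise the cluster-wide cap on message size
    message.max.bytes=20000000

    # Connect worker level (connect-distributed.properties): allow connectors to
    # override their producer/consumer client configs
    connector.client.config.override.policy=All

    # Connector level (in the connector JSON): raise the request size for the
    # producer this connector uses, so it can send the large control messages
    "producer.override.max.request.size": "20000000"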

ArkaSarkar19 commented 2 months ago

We are getting the same error and have tried setting max.request.size, buffer.size, and partition.fetch.bytes at the consumer/producer level. Is there any way this can be resolved without increasing the Kafka broker settings? Can we set a hard limit at the connector level only?

yornstei commented 2 months ago

@ArkaSarkar19 I don't think so. The failure is caused by the very large control messages the connector produces to its control topic; if the broker's message size limit is smaller than those messages, the produce request fails.
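
If raising the broker-wide limit is not acceptable, the change can at least be scoped to the control topic alone (still a broker-side change, just per topic). A minimal sketch, assuming the connector uses the default control topic name control-iceberg (configurable via iceberg.control.topic):

    # Raise the message size limit for the control topic only
    kafka-configs.sh --bootstrap-server <broker:9092> --alter \
      --entity-type topics --entity-name control-iceberg \
      --add-config max.message.bytes=20000000

The connector's producer also has to be allowed to send a request that large, so a matching producer.override.max.request.size on the connector config is still needed.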