databricks / iceberg-kafka-connect


RecordTooLargeException: When producing message to Kafka control topic #297

Open ArkaSarkar19 opened 1 month ago

ArkaSarkar19 commented 1 month ago

Hi Team,

We are getting a RecordTooLargeException when the connector tries to produce a message to the control topic.

Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 7222162 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

Can you suggest some ways to reduce the size of the messages the connector produces to the Kafka control topic? We have tried the following configuration to reduce the message size, but it doesn't seem to help:

{
  "iceberg.kafka.producer.override.max.request.size": 1048576,
  "iceberg.kafka.write.metadata.metrics.default": "none",
  "iceberg.kafka.write.metadata.metrics.max-inferred-column-defaults": "1",
  "iceberg.kafka.write.metadata.compression-codec": "gzip",
  "iceberg.catalog.table-override.write.metadata.previous-versions-max": 5,
  "iceberg.catalog.table-override.write.parquet.row-group-size-bytes": 1048576,
  "iceberg.kafka.producer.override.buffer.memory": 524288,
  "iceberg.kafka.producer.override.compression.type": "snappy"
}
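
As additional context, our (unverified) understanding is that the control-topic message carries metadata for every data file accumulated during a commit cycle, so one lever we are considering is shortening iceberg.control.commit.interval-ms (currently 900000 in the config below) so that each commit carries fewer data-file entries. A minimal sketch of that change:

{
  "iceberg.control.commit.interval-ms": "60000"
}

The 60000 above is only an illustrative value we would tune based on throughput. Also note that the max.request.size override above is 1048576, which is already the producer default, so it does not raise the limit.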

This has been a major blocker for us, as we cannot increase these limits on the Kafka broker side. Could you please suggest a fix?

QQ: Is there an enterprise version of this connector?

Here is the connector config:

{
  "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
  "iceberg.catalog.table-override.write.data.path": "<REDACTED>",
  "iceberg.catalog.table-override.write.parquet.compression-codec": "snappy",
  "errors.log.include.messages": "true",
  "iceberg.tables.cdc-field": "operationType",
  "iceberg.catalog.s3.region": "us-east-1",
  "iceberg.catalog.client.region": "us-east-1",
  "iceberg.table.dummy_table_name.partition-by": "checkIn, checkOut",
  "iceberg.catalog.table-override.write.metadata.path": "<REDACTED>",
  "errors.log.enable": "true",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "consumer.override.bootstrap.servers": "<REDACTED>",
  "value.converter.schema.registry.url": "<REDACTED>",
  "iceberg.partition": "part1, part2",
  "name": "dummy_connector_name",
  "iceberg.tables.evolve-schema-enabled": "true",
  "iceberg.catalog.table-override.write.metadata.previous-versions-max": "5",
  "tasks.max": "9",
  "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "iceberg.catalog.s3.sse.key": "AES256",
  "iceberg.tables.upsert-mode-enabled": "true",
  "iceberg.tables.auto-create-enabled": "true",
  "iceberg.tables": "demo.dummy_table_name",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "iceberg.catalog.s3.sse.type": "s3",
  "iceberg.catalog.table-override.write.metadata.delete-after-commit.enabled": "true",
  "iceberg.table.dummy_table_name.id-columns": "id_column",
  "topics": "dummy_topic_name",
  "iceberg.catalog.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
  "iceberg.kafka.write.metadata.metrics.default": "none",
  "iceberg.control.commit.interval-ms": "900000",
  "iceberg.catalog.uri": "<REDACTED>",
  "key.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy",
  "iceberg.catalog": "spark_catalog",
  "iceberg.kafka.write.metadata.compression-codec": "gzip",
  "consumer.override.auto.offset.reset": "latest",
  "iceberg.catalog.warehouse": "<REDACTED>",
  "iceberg.control.topic": "control_dummy_topic_name",
  "iceberg.kafka.write.metadata.metrics.max-inferred-column-defaults": "1",
  "key.converter.schema.registry.url": "<REDACTED>",
  "iceberg.catalog.type": "hive",
  "iceberg.catalog.s3.path-style-access": "true"
}