tabular-io / iceberg-kafka-connect

How can I configure the max.request.size option on the producer (to the control topic) for RecordTooLargeException #149

Closed: okayhooni closed this issue 8 months ago

okayhooni commented 8 months ago

I got the error below on the Iceberg sink connector.

It's a producer-side error related to the max.request.size option, so I guess it comes from this sink connector producing metadata records to the control topic.

How can I configure the max.request.size option for the producer used by this sink connector? (It seems a bit unusual.)

2023-11-06 09:50:02,726 ERROR [json-to-iceberg.weblog-common.stage.v19.recursive-snake-flatten-str-convert-tz-arr-impressions-ingest-ts-hourly-8bucket.json|task-21] WorkerSinkTask{id=json-to-iceberg.weblog-common.stage.v19.recursive-snake-flatten-str-convert-tz-arr-impressions-ingest-ts-hourly-8bucket.json-21} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot execute transactional method because we are in an error state (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-json-to-iceberg.weblog-common.stage.v19.recursive-snake-flatten-str-convert-tz-arr-impressions-ingest-ts-hourly-8bucket.json-21]
org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:1010)
    at org.apache.kafka.clients.producer.internals.TransactionManager.maybeAddPartition(TransactionManager.java:328)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:1061)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:962)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:847)
    at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
    at io.tabular.iceberg.connect.channel.Channel.send(Channel.java:96)
    at io.tabular.iceberg.connect.channel.Worker.receive(Worker.java:158)
    at io.tabular.iceberg.connect.channel.Channel.lambda$consumeAvailable$2(Channel.java:129)
    at java.base/java.lang.Iterable.forEach(Iterable.java:75)
    at io.tabular.iceberg.connect.channel.Channel.consumeAvailable(Channel.java:119)
    at io.tabular.iceberg.connect.channel.Worker.process(Worker.java:101)
    at io.tabular.iceberg.connect.IcebergSinkTask.processControlEvents(IcebergSinkTask.java:165)
    at io.tabular.iceberg.connect.IcebergSinkTask.put(IcebergSinkTask.java:152)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1446709 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

Could it be overridden with the producer.override.max.request.size option, as with a regular producer?

okayhooni commented 8 months ago

I tested the producer.override.max.request.size option on this Iceberg sink connector, but it is NOT working!

producer.override.max.request.size: 2097152 # default: 1048576

@bryanck

bryanck commented 8 months ago

Try iceberg.kafka.max.request.size. The size of the message is somewhat concerning; are you creating many files per table per commit?
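For example, in the connector config, following the style of the snippet above (the 2 MiB value here is only an illustration; iceberg.kafka.* properties are passed to the Kafka clients the connector creates for the control topic):

iceberg.kafka.max.request.size: 2097152 # example value; default: 1048576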

okayhooni commented 8 months ago

Try iceberg.kafka.max.request.size. The size of the message is somewhat concerning; are you creating many files per table per commit?

Thanks, I will try that option!

That connector was stopped for nearly 5 hours due to a type mismatch issue. I renamed the auto-created field, recreated a new field with the proper type, and then it started consuming again from the record offset of five hours ago.

The target table is an hourly partitioned table, and each hourly partition has about 30 sub-partitions, so it might try to ingest almost 150x files (= 5 hours * 30 sub-partitions * x) per table per commit, where x is the number of files uploaded to S3 per sub-partition path within the commit interval (= 5 minutes).

bryanck commented 8 months ago

How many columns does your table have, including those nested in structs? You may want to limit the columns storing stats in the metadata to decrease the size, if you have a lot of columns.

okayhooni commented 8 months ago

How many columns does your table have, including those nested in structs? You may want to limit the columns storing stats in the metadata to decrease the size, if you have a lot of columns.

Sure... That table currently has 544 flattened columns (and it grows by about 3~5 new columns/day), and I set 'write.metadata.metrics.max-inferred-column-defaults' = 700.

If I decrease 'write.metadata.metrics.max-inferred-column-defaults' to a small value like 100, can this issue be avoided?

bryanck commented 8 months ago

You could decrease write.metadata.metrics.max-inferred-column-defaults from the default of 100. Another option is to set write.metadata.metrics.default to none and enable metrics only on the columns you will be filtering on, using write.metadata.metrics.column.*.
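As a sketch of the second option (the column names here are only placeholders), the table properties would look something like this, set on the table through your query engine:

'write.metadata.metrics.default' = 'none'
'write.metadata.metrics.column.event_ts' = 'full'
'write.metadata.metrics.column.request_url' = 'truncate(16)'

With the default set to none, stats are collected only for the explicitly listed columns, so only those end up in the manifests and in the control message.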

okayhooni commented 8 months ago

You could decrease write.metadata.metrics.max-inferred-column-defaults from the default of 100. Another option is to set write.metadata.metrics.default to none and enable metrics only on the columns you will be filtering on, using write.metadata.metrics.column.*.

Thanks! I will do that.

But I don't understand why this big metadata size is related to the size of the record produced to the control topic by this sink connector.

Does the control-topic record also have fields related to this Iceberg column metadata?

bryanck commented 8 months ago

Yes, the control message includes the file path as well as metadata related to the file, such as column stats. These stats are captured during the write process.

okayhooni commented 8 months ago

Yes, the control message includes the file path as well as metadata related to the file, such as column metrics. These metrics are captured during the write process.

Aha... Could you explain why that column metadata also has to be sent to the control topic?

Isn't the control topic just needed to ensure exactly-once semantics?

bryanck commented 8 months ago

The column stats are written to manifest files as part of the Iceberg commit. Iceberg uses these stats to prune the file scan list during query planning. Because the worker captures the stats during the write process, these need to be passed to the coordinator in the control message.

okayhooni commented 8 months ago

Thanks for the kind explanation!

Because the worker captures the stats during the write process, these need to be passed to the coordinator in the control message.

In the procedure above, is there no way to filter out unnecessary fields before they are sent to the control topic?

bryanck commented 8 months ago

You set those write properties to filter out the columns whose stats you don't care about. Then those will not be captured and not included in the control message.

okayhooni commented 8 months ago

Sure!

I guess the metadata related to column statistics doesn't need to be in the control topic record... or am I wrong?

bryanck commented 8 months ago

Only the metadata configured to be captured is put in the control message; this is why I am recommending you set those write properties, as that will reduce the size of the control message.

okayhooni commented 8 months ago

Sure! (But I think the control topic could be written by the connector's own logic with only the fields it needs, even if there is a lot of large, unnecessary metadata on the original message to sink.)

The full metadata needs to go to the control topic because the coordinator needs it to commit (i.e., it consumes all the records on the control topic and loads them into the Iceberg table with a single commit), is that right?

Then I finally understand! Thanks!

bryanck commented 8 months ago

Only the column stats that are configured to be saved are captured and put in the control message. Stats for other columns are not captured and are not in the control message. Thus if you reduce the number of columns with stats, you will reduce the size of the control message.

bryanck commented 8 months ago

By the way, this will also improve query planning performance, as the Iceberg metadata will be smaller as well, so it is a good idea to do this for that reason also.

okayhooni commented 8 months ago

By the way, this will also improve query planning performance, as the Iceberg metadata will be smaller as well, so it is a good idea to do this for that reason also.

Thanks for the advice!

okayhooni commented 8 months ago

@bryanck

Could you give me some additional advice on selecting the proper statistics option among counts, truncate, and full for each column? (What does full mean?)

In the official Iceberg configuration docs, I didn't find any detailed explanation of each option.

bryanck commented 8 months ago

counts will only store count stats for a column, such as null counts. full will also store the lower/upper bounds for the column, with each bound being the full column value. truncate likewise will store the lower/upper bounds, but only the first n bytes of the value for each bound.
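For example (column names here are just placeholders), the modes map to table property values like:

'write.metadata.metrics.column.status_code' = 'counts'
'write.metadata.metrics.column.request_url' = 'truncate(16)'
'write.metadata.metrics.column.event_ts' = 'full'

truncate takes the number of bytes to keep in parentheses; Iceberg's default metrics mode is truncate(16).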

bryanck commented 8 months ago

You may want to open a ticket in the Iceberg repo so that can be better documented.

bryanck commented 8 months ago

In terms of advice, if a column in a data file will contain a wide range of values, or if the column is not used in filters, then having the bounds probably isn't going to be useful, as it won't help prune the file scan list. For truncate, try to pick a number of bytes that is as small as possible but still distinct enough to be useful for filtering.

okayhooni commented 8 months ago

Thanks for the detailed answer! :)