aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

I keep getting MessageSizeTooLargeError; the error message reports a size much bigger than the actual message given to the producer. #966

Open · ant0nk opened this issue 5 months ago

ant0nk commented 5 months ago

I'm using Avro serialization, and nevertheless I'm receiving errors like "The message is 1699136 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration", though the original message size was around 800 KB, and after serialization it should still be less than 1 MB (the default max_request_size). Does the producer try to combine several messages into a batch and exceed max_request_size?

vmaurin commented 5 months ago

How do you serialize with Avro? Avro is a format where a schema is required for the producer to serialize and for the consumer to deserialize; otherwise it is just gibberish bytes. The usual strategy with Kafka is to store schemas in some sort of central registry, then put a reference to the schema used to produce the message in a Kafka header. That is what Confluent does with their Schema Registry.

As far as I know, aiokafka doesn't provide anything Avro related, so the serializer/deserializer must come from your own implementation. Depending on what you are doing, it might be that your serialized message contains both the schema and the data.
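For illustration, a minimal sketch of plugging a hand-rolled serializer into aiokafka; `encode_value`, the topic name, and the broker address are made up here, and a real Avro setup would encode against a schema fetched from a registry instead:

```python
import asyncio
import json

from aiokafka import AIOKafkaProducer


def encode_value(value: dict) -> bytes:
    # Hypothetical stand-in for an Avro encoder backed by a schema
    # registry; JSON is used only to keep the sketch runnable.
    return json.dumps(value).encode("utf-8")


async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        value_serializer=encode_value,  # applied to every message before the size check
    )
    await producer.start()
    try:
        await producer.send_and_wait("some-topic", {"field": "value"})
    finally:
        await producer.stop()


asyncio.run(main())
```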

ant0nk commented 5 months ago

@vmaurin I'm using the kafkit library for serialization and for communication with the schema registry.

vmaurin commented 5 months ago

Maybe try to dump the message you serialized before passing it to aiokafka? Otherwise, as far as I can see, the size is checked per message: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/producer/producer.py#L411 (even if messages might then be batched). The formula seems to be: overhead + len(key) + len(value). Headers seem to be ignored.
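A quick way to check that, sketched under the assumption that the per-message overhead is a small constant (the real value in aiokafka depends on the record format version, so the 70 below is only indicative):

```python
def approx_message_size(key: bytes | None, value: bytes, overhead: int = 70) -> int:
    # Rough re-implementation of the per-message check linked above:
    # overhead + len(key) + len(value); headers are not counted.
    return overhead + (len(key) if key else 0) + len(value)


value = b"x" * 1_700_000  # stand-in for the serialized Avro payload
size = approx_message_size(None, value)
print(f"{size} bytes when serialized")
if size > 1048576:  # aiokafka's default max_request_size (1 MiB)
    print("would be rejected before batching even happens")
```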

ant0nk commented 5 months ago

@vmaurin Can I set max_request_size for the producer higher than the broker's corresponding limit if I have compression enabled?

vmaurin commented 5 months ago

You mean max.message.bytes on the broker/topic? It might work: that limit seems to be applied after compression, but it also applies to a whole batch of messages, while the check in aiokafka is for a single uncompressed message.

ant0nk commented 5 months ago

@vmaurin Yes, but I use send_and_wait() to send each message immediately, so I hope a batch will not exceed max.message.bytes either.
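For reference, a sketch of that sequential pattern; since each send_and_wait() call awaits the broker acknowledgement before the next send, each batch should normally carry a single record (topic and broker address are placeholders):

```python
import asyncio

from aiokafka import AIOKafkaProducer


async def produce_one_by_one(values: list[bytes]) -> None:
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    await producer.start()
    try:
        for value in values:
            # Awaiting delivery before the next send keeps batches small,
            # unlike fire-and-forget producer.send() calls issued in a burst.
            await producer.send_and_wait("some-topic", value)
    finally:
        await producer.stop()


asyncio.run(produce_one_by_one([b"a" * 1024, b"b" * 2048]))
```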

Symas1 commented 1 month ago

@ant0nk @vmaurin did you figure this out? I'm having a similar issue.

Symas1 commented 1 month ago

Got around the problem by disabling aiokafka message size validation.

aiokafka validates the message size before compression, rejecting otherwise valid messages. Setting max_request_size to a huge value disables aiokafka's validation. Validation is still performed by Kafka itself, so if the compressed message is too big, kafka.errors.MessageSizeTooLargeError is raised.
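A minimal sketch of the workaround, assuming the broker/topic max.message.bytes is large enough for the compressed batches; the 100 MiB value is arbitrary and only meant to push the client-side check out of the way:

```python
import asyncio

from aiokafka import AIOKafkaProducer


async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        compression_type="gzip",     # batches are compressed before being sent
        max_request_size=104857600,  # 100 MiB: effectively disables the
                                     # pre-compression size check in aiokafka
    )
    await producer.start()
    try:
        # Over the 1 MiB default this would raise MessageSizeTooLargeError
        # client-side; now only the broker enforces its post-compression limit.
        await producer.send_and_wait("some-topic", b"x" * 2_000_000)
    finally:
        await producer.stop()


asyncio.run(main())
```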

ods commented 1 month ago

@Symas1 Your approach may not work if you send messages quickly enough: aiokafka combines multiple messages into batches, and raising this setting may lead to huge requests being rejected by the broker.

> […] This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.