Open MikkelHJuul opened 4 years ago
I'd agree with this, message.max.bytes is listed in the Kafka documentation only as a broker property, not producer config. The producer property relating to limiting the request is max.request.size. (It's bad enough that the Kafka docs conflate the meanings of message/record with message batch and request sizes...)
Added now as #3246
Also, it seems to me that the producer checks the uncompressed message size before sending, while the broker checks the compressed message size.
You are right. Compression is applied to the whole batch, so the producer's check is done on the uncompressed message. When using compression, it's hard to work out which producer setting matches what is set on the broker.
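To make the mismatch concrete, here is a minimal sketch that compares the uncompressed and compressed size of one payload. It uses gzip from the Python standard library purely as a stand-in; it does not reproduce how librdkafka frames or compresses a batch, and the 1,000,000-byte limit is an assumed value for illustration.

```python
import gzip
from typing import Tuple


def sizes(payload: bytes) -> Tuple[int, int]:
    """Return (uncompressed size, gzip-compressed size) in bytes."""
    return len(payload), len(gzip.compress(payload))


# Assumed limit, applied to both sides only for the sake of the comparison.
LIMIT = 1_000_000

# A highly compressible 2 MB payload.
payload = b"a" * 2_000_000
raw, packed = sizes(payload)

# A check on the uncompressed size rejects the payload ...
print("uncompressed over limit:", raw > LIMIT)
# ... while a check on the compressed size would accept it.
print("compressed under limit:", packed < LIMIT)
```

With a payload like this, the two checks disagree, which is why a single number is hard to carry over from the broker setting to the producer setting.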
The message should be less confusing.
Also confusing: the on_delivery callback configured in the Producer is never called, even though from the error it appears that the message was rejected by the broker.
It can be observed that on_delivery is called when message.max.bytes in the Producer config is set to some value greater than what the broker allows (since the message is now sent by the producer and rejected by the broker).
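The two cases described above can be sketched roughly as follows, assuming the confluent_kafka Python client. The helper names (build_producer_config, produce_one), the broker address, and the 10-second flush timeout are hypothetical choices for illustration, not taken from the issue.

```python
def build_producer_config(bootstrap_servers, max_message_bytes):
    """Return a client config dict with an explicit producer-side size limit."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "message.max.bytes": max_message_bytes,
    }


def on_delivery(err, msg):
    """Delivery report; only runs for messages accepted into the local queue."""
    if err is not None:
        print(f"delivery failed: {err}")
    else:
        print(f"delivered to {msg.topic()} [{msg.partition()}] at offset {msg.offset()}")


def produce_one(config, topic, payload):
    """Try to produce one message; return False if it was rejected locally."""
    # Imported here so the helpers above work without the client installed.
    from confluent_kafka import KafkaException, Producer

    producer = Producer(config)
    try:
        producer.produce(topic, payload, on_delivery=on_delivery)
    except KafkaException as exc:
        # Local rejection (for example MSG_SIZE_TOO_LARGE): the message never
        # left the client, so no delivery report will follow.
        print(f"rejected before sending: {exc}")
        return False
    # Wait for outstanding delivery reports (timeout in seconds is an assumption).
    producer.flush(10)
    return True
```

Calling produce_one(build_producer_config("localhost:9092", 5_000_000), "some-topic", payload) against a broker with a lower limit should take the second path: the message is sent, and the rejection arrives through on_delivery rather than as an exception.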
Description
Poor error text when producing a message fails with MSG_SIZE_TOO_LARGE. The description points toward the broker, when it is a client/producer error. The Apache Kafka documentation for the Kafka producer does not even include the configuration that fixes it: message.max.bytes. (The fix in the code for this issue is to use configuration 'message.max.bytes': <int greater than bytes length of message to send>)
How to reproduce
In Python, using confluent_kafka==1.5.0.
LOG: