Sam-sad-Sajid opened 1 month ago
I found this issue in librdkafka: https://github.com/confluentinc/librdkafka/issues/1032
It seems that if I have different producer configurations for different topics, using one producer client for all of the topics does not apply the different per-topic values.
Example: sample producer configurations for three topics:
"topic_a": {
"acks": "all",
"linger.ms": "10",
"request.timeout.ms": "5000",
"delivery.timeout.ms": "10000"
},
"topic_b": {
"acks": "1",
"linger.ms": "10",
"request.timeout.ms": "6000",
"delivery.timeout.ms": "11000"
},
"topic_c": {
"acks": "all",
"linger.ms": "10",
"request.timeout.ms": "7000",
"delivery.timeout.ms": "12000"
}
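(For reference, the three per-topic configurations above could be held in Go roughly as follows; this is just an illustrative sketch using plain string maps, not the client's actual config type:)

```go
package main

import "fmt"

// topicConfigs holds the per-topic producer settings from the example,
// keyed by topic name (plain string maps for illustration).
var topicConfigs = map[string]map[string]string{
	"topic_a": {
		"acks":                "all",
		"linger.ms":           "10",
		"request.timeout.ms":  "5000",
		"delivery.timeout.ms": "10000",
	},
	"topic_b": {
		"acks":                "1",
		"linger.ms":           "10",
		"request.timeout.ms":  "6000",
		"delivery.timeout.ms": "11000",
	},
	"topic_c": {
		"acks":                "all",
		"linger.ms":           "10",
		"request.timeout.ms":  "7000",
		"delivery.timeout.ms": "12000",
	},
}

func main() {
	// Each topic has its own acks and timeout settings.
	for _, t := range []string{"topic_a", "topic_b", "topic_c"} {
		cfg := topicConfigs[t]
		fmt.Printf("%s: acks=%s request.timeout.ms=%s\n",
			t, cfg["acks"], cfg["request.timeout.ms"])
	}
}
```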
I set debug: all in the producer configuration.
Output:
%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_a
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_a [-1] 0x12d00aa00 refcnt 0x12d00aa90 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_a" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: partitioner = murmur2_random
%7|1727911922.230|CONNECT|test-app#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 46ms: leader query
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_b
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_b [-1] 0x12c010800 refcnt 0x12c010890 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_b" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: partitioner = murmur2_random
%7|1727911922.230|CONNECT|test-app#producer-1| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 46ms: leader query
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911922.230|METADATA|test-app#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911922.230|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_c
%7|1727911922.230|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_c [-1] 0x12c012200 refcnt 0x12c012290 (at rd_kafka_topic_new0:488)
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: Topic "topic_c" configuration (default_topic_conf):
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.required.acks = -1
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: request.timeout.ms = 5000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: message.timeout.ms = 10000
%7|1727911922.230|CONF|test-app#producer-1| [thrd:app]: partitioner = murmur2_random
However, if I use one producer per topic, the per-topic configurations are respected. Output for the same configurations as above:
%7|1727911512.054|TOPIC|test-app#producer-1| [thrd:app]: New local topic: topic_b
%7|1727911512.054|TOPIC|test-app#producer-3| [thrd:app]: New local topic: topic_a
%7|1727911512.054|TOPIC|test-app#producer-2| [thrd:app]: New local topic: topic_c
%7|1727911512.054|TOPPARNEW|test-app#producer-2| [thrd:app]: NEW topic_c [-1] 0x14e81f800 refcnt 0x14e81f890 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: Topic "topic_c" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: request.required.acks = -1
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: request.timeout.ms = 7000
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: message.timeout.ms = 12000
%7|1727911512.054|CONF|test-app#producer-2| [thrd:app]: partitioner = murmur2_random
%7|1727911512.054|CONNECT|test-app#producer-2| [thrd:app]: Not selecting any broker for cluster connection: still suppressed for 32ms: leader query
%7|1727911512.054|METADATA|test-app#producer-2| [thrd:app]: Hinted cache of 1/1 topic(s) being queried
%7|1727911512.054|METADATA|test-app#producer-2| [thrd:app]: Skipping metadata refresh of 1 topic(s): leader query: no usable brokers
%7|1727911512.054|TOPPARNEW|test-app#producer-1| [thrd:app]: NEW topic_b [-1] 0x152808200 refcnt 0x152808290 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: Topic "topic_b" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: request.required.acks = 1
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: request.timeout.ms = 6000
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: message.timeout.ms = 11000
%7|1727911512.054|CONF|test-app#producer-1| [thrd:app]: partitioner = murmur2_random
%7|1727911512.054|TOPPARNEW|test-app#producer-3| [thrd:app]: NEW topic_a [-1] 0x15200ce00 refcnt 0x15200ce90 (at rd_kafka_topic_new0:488)
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: Topic "topic_a" configuration (default_topic_conf):
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: request.required.acks = -1
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: request.timeout.ms = 5000
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: message.timeout.ms = 10000
%7|1727911512.054|CONF|test-app#producer-3| [thrd:app]: partitioner = murmur2_random
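The multi-producer output above corresponds to building one client per topic configuration. A minimal sketch of that pattern follows; the `Producer` interface and `stubProducer` here are hypothetical stand-ins for the real confluent-kafka-go `*kafka.Producer`, whose constructor would receive each topic's settings:

```go
package main

import "fmt"

// Producer is a hypothetical stand-in for the real client
// (confluent-kafka-go's *kafka.Producer), used here for illustration.
type Producer interface {
	Produce(topic string, value []byte) error
}

// stubProducer just remembers its config; in real code this would wrap
// kafka.NewProducer(&kafka.ConfigMap{...}) built from the topic's settings.
type stubProducer struct {
	config map[string]string
}

func (p *stubProducer) Produce(topic string, value []byte) error {
	fmt.Printf("produce to %s with request.timeout.ms=%s\n",
		topic, p.config["request.timeout.ms"])
	return nil
}

// newRegistry creates one producer per topic, so each topic keeps its
// own acks/timeout settings instead of sharing one default_topic_conf.
func newRegistry(configs map[string]map[string]string) map[string]Producer {
	reg := make(map[string]Producer, len(configs))
	for topic, cfg := range configs {
		reg[topic] = &stubProducer{config: cfg}
	}
	return reg
}

func main() {
	reg := newRegistry(map[string]map[string]string{
		"topic_a": {"request.timeout.ms": "5000"},
		"topic_b": {"request.timeout.ms": "6000"},
	})
	reg["topic_a"].Produce("topic_a", []byte("hello"))
	reg["topic_b"].Produce("topic_b", []byte("hello"))
}
```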
Description
Hello,
I am seeking guidance on the best practices to set up producer clients for multiple topics.
My current setup is that when my server boots up, I create a producer client for each topic. So if my server publishes to N topics, I create N clients. While N can theoretically be any positive number, in practice (in my case) it is between 1 and 10.
I create the producers when the app boots up (during dependency injection). The producers are alive throughout the app lifecycle. During the application shutdown, I close the producers after flushing them.
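The shutdown sequence described here (flush each producer, then close it) could be sketched as below. The Flush-with-timeout and Close calls mirror the two methods confluent-kafka-go's `*kafka.Producer` exposes for this, but the `FlushCloser` interface and stub are hypothetical, for illustration only:

```go
package main

import "fmt"

// FlushCloser is a hypothetical interface mirroring the two calls used at
// shutdown on confluent-kafka-go's *kafka.Producer: Flush(timeoutMs) and Close.
type FlushCloser interface {
	Flush(timeoutMs int) int // returns the number of still-undelivered messages
	Close()
}

// shutdown flushes every producer with a deadline, then closes it.
func shutdown(producers map[string]FlushCloser, timeoutMs int) {
	for topic, p := range producers {
		if remaining := p.Flush(timeoutMs); remaining > 0 {
			fmt.Printf("%s: %d message(s) still undelivered at shutdown\n",
				topic, remaining)
		}
		p.Close()
	}
}

// stub simulates a producer with a fixed number of unflushed messages.
type stub struct {
	remaining int
	closed    bool
}

func (s *stub) Flush(timeoutMs int) int { return s.remaining }
func (s *stub) Close()                  { s.closed = true }

func main() {
	producers := map[string]FlushCloser{
		"topic_a": &stub{},             // fully flushed
		"topic_b": &stub{remaining: 2}, // simulated flush timeout
	}
	shutdown(producers, 5000)
}
```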
The reason I don't create a producer per message is the high latency of creating a producer client before sending each message.
The motivation for creating one producer client per topic is isolation between the clients: if one client is impacted for any reason (for example, messages larger than 1 MB are published to one topic and Kafka rejects them), the other clients can continue to publish messages to their topics.
Some downsides of having multiple producer clients for multiple topics that I can think of:
- redundant TCP connections to Kafka: since every producer client connects to all broker nodes, the connections are duplicated once per client. I assume this redundancy also takes up some space in the application heap memory.
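As a rough estimate of that redundancy: if every producer client connects to every broker, N per-topic producers against a B-broker cluster maintain on the order of N×B connections, versus B for a single shared producer. A back-of-the-envelope sketch (it ignores broker discovery details and idle-connection reaping, which reduce the real count):

```go
package main

import "fmt"

// approxConnections estimates TCP connections to the cluster when every
// producer client connects to every broker (ignoring idle-connection reaping).
func approxConnections(producers, brokers int) int {
	return producers * brokers
}

func main() {
	brokers := 3
	fmt.Println(approxConnections(10, brokers)) // one producer per topic, 10 topics: 30
	fmt.Println(approxConnections(1, brokers))  // one shared producer: 3
}
```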
Could you please advise whether the above approach is OK? If not, could you give me a pointer to the recommended approach?
How to reproduce
Checklist
Please provide the following information:
- confluent-kafka-go and librdkafka version (LibraryVersion()): 2.4.0
- Client configuration: ConfigMap{...}
- Provide client logs (with "debug": ".." as necessary)