aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

performance degradation of producer when having many topics #943

Closed: uchiiii closed this issue 7 months ago

uchiiii commented 7 months ago

Describe the bug

When a Kafka cluster has many topics (5k or more), the performance of the producer degrades drastically. I compared it with the performance of kafka-python; here is the summary.

Running Tests for Kafka-Python
Starting benchmark for Kafka-Python producer.
Kafka-Python producer Results:
Number of Runs: 5, Number of messages: 100000, Message Size: 100 bytes.
Average Time for 100000 messages: 7.028003358840943 seconds.
Messages / sec: 14228.792289093612
MB / sec : 1.3569633759587871
===================
Running Tests for AIOKafka
/root/workspace/news/kafka-client-benchmarks/aiokafka-benchmark.py:40: DeprecationWarning: The loop argument is deprecated since 0.7.1, and scheduled for removal in 0.9.0
  producer = AIOKafkaProducer(loop=loop, **producer_config)
Starting benchmark for AIOKafka Producer.
AIOKafka Producer Results:                   
Number of Runs: 5, Number of messages: 100000, Message Size: 100 bytes.
Average Time for 100000 messages: 54.26867785453796 seconds.
Messages / sec: 1842.6835506853604
MB / sec : 0.17573199755529026
===================

The aiokafka producer's throughput is roughly 1/8 of kafka-python's (about 1,843 vs. 14,229 messages/sec).
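The figures in both result blocks follow from messages/sec = 100000 / average time and MB/sec = messages/sec × 100 bytes / 2^20. A quick re-derivation, assuming "MB" in the benchmark output means 2^20 bytes (which is what the printed numbers match):

```python
from __future__ import annotations

# Benchmark parameters taken from the results above.
N_MESSAGES = 100_000
MESSAGE_SIZE = 100  # bytes per message
MIB = 2**20  # assumption: the benchmark's "MB" is 1024 * 1024 bytes


def throughput(avg_seconds: float) -> tuple[float, float]:
    """Return (messages/sec, MB/sec) for one averaged benchmark run."""
    msgs_per_sec = N_MESSAGES / avg_seconds
    return msgs_per_sec, msgs_per_sec * MESSAGE_SIZE / MIB


kafka_python_tp = throughput(7.028003358840943)
aiokafka_tp = throughput(54.26867785453796)
ratio = aiokafka_tp[0] / kafka_python_tp[0]

print(f"kafka-python: {kafka_python_tp[0]:.0f} msg/s, {kafka_python_tp[1]:.3f} MB/s")
print(f"aiokafka:     {aiokafka_tp[0]:.0f} msg/s, {aiokafka_tp[1]:.3f} MB/s")
print(f"throughput ratio (aiokafka / kafka-python): {ratio:.2f}")
```

The ratio comes out at about 0.13, i.e. the aiokafka run sustained roughly one eighth of kafka-python's message rate in this setup.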

In my environment, the Kafka cluster has more than 5,000 topics. I noticed that the number of topics affects every call to producer.send. So I will send a PR to fix this.

Expected behaviour

Even when the number of topics becomes large, the aiokafka producer should be faster than the kafka-python producer.


Reproducible example

To measure performance, I used the code here: https://github.com/abhishekray07/kafka-client-benchmarks/tree/master
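For anyone reproducing this without the linked repository, here is a minimal sketch of a comparable measurement. It is not the linked benchmark's code; the broker address localhost:9092, the topic name bench, and the helper names measure_sends and run_against_kafka are assumptions for illustration. It also creates AIOKafkaProducer without the deprecated loop= argument that triggers the DeprecationWarning in the output above.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional

# An async "send(topic, value)" callable, e.g. AIOKafkaProducer.send.
SendFn = Callable[[str, bytes], Awaitable[object]]
FlushFn = Callable[[], Awaitable[object]]


async def measure_sends(
    send: SendFn,
    topic: str,
    n_messages: int,
    size: int,
    flush: Optional[FlushFn] = None,
) -> float:
    """Await send() n_messages times (plus an optional flush); return seconds."""
    payload = b"x" * size
    start = time.perf_counter()
    for _ in range(n_messages):
        # With aiokafka, awaiting send() enqueues the record into a batch and
        # returns a future for the broker ack; we do not wait on that future here.
        await send(topic, payload)
    if flush is not None:
        await flush()  # count delivery of batches still buffered in the producer
    return time.perf_counter() - start


async def run_against_kafka(
    bootstrap: str = "localhost:9092",  # hypothetical broker address
    topic: str = "bench",  # hypothetical topic name
    n_messages: int = 100_000,
    size: int = 100,
) -> None:
    # Imported lazily so measure_sends can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer

    producer = AIOKafkaProducer(bootstrap_servers=bootstrap)  # no loop= argument
    await producer.start()
    try:
        elapsed = await measure_sends(
            producer.send, topic, n_messages, size, flush=producer.flush
        )
        print(f"{n_messages / elapsed:.0f} messages/sec")
    finally:
        await producer.stop()
```

Run it with asyncio.run(run_against_kafka()). Because measure_sends only needs an awaitable send callable, it can also be exercised with a fake sender when no broker is available.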

uchiiii commented 7 months ago

Here is the output of the py-spy profiler while running the aiokafka producer.

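We can see that _wait_on_metadata (aiokafka/client.py) and topics (kafka/cluster.py) take most of the time: _wait_on_metadata accounts for 96% of all samples in total, and topics alone for 71%. Since _wait_on_metadata's total is roughly its own 25% plus the 71% spent in topics, the topics calls appear to happen inside _wait_on_metadata, i.e. on every send. I think this should be avoided.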

(python v3.9.16)
Total Samples 2200
GIL: 100.00%, Active: 100.00%, Threads: 2

  %Own   %Total  OwnTime  TotalTime  Function (filename)                                                                                                                                 
 71.00%  71.00%   12.40s    12.40s   topics (kafka/cluster.py)
 25.00%  96.00%    6.23s    18.69s   _wait_on_metadata (aiokafka/client.py)
  0.00%   0.00%   0.300s    0.730s   decode (kafka/protocol/types.py)
  0.00%   0.00%   0.290s    0.730s   <listcomp> (kafka/protocol/types.py)
  1.00%  98.00%   0.210s    19.29s   send (aiokafka/producer/producer.py)
  0.00%   0.00%   0.140s    0.140s   _unpack (kafka/protocol/types.py)
  0.00%   0.00%   0.120s    0.150s   append (aiokafka/producer/message_accumulator.py)
  0.00%   1.00%   0.090s    0.160s   _partition (aiokafka/producer/producer.py)
  0.00% 100.00%   0.080s    20.31s   benchmark (aiokafka-benchmark.py)
  0.00%   0.00%   0.070s    0.070s   partitions_for_topic (kafka/cluster.py)
  1.00%   1.00%   0.060s    0.060s   <lambda> (<string>)
  0.00%   0.00%   0.040s    0.040s   create_salted_password (aiokafka/conn.py)
  0.00%   0.00%   0.030s    0.030s   _serialize (aiokafka/producer/producer.py)
  0.00%   0.00%   0.030s    0.030s   debug (logging/__init__.py)
  0.00%   0.00%   0.030s    0.030s   create_future (aiokafka/util.py)
  0.00%   0.00%   0.020s    0.020s   _build (aiokafka/producer/message_accumulator.py)
  0.00%   0.00%   0.020s    0.020s   _wait (asyncio/tasks.py)
  0.00%   0.00%   0.020s    0.060s   _call_with_frames_removed (<frozen importlib._bootstrap>)
  1.00%   1.00%   0.020s    0.040s   __call__ (kafka/partitioner/default.py)
  1.00%   1.00%   0.020s    0.750s   _handle_frame (aiokafka/conn.py)
  0.00%  98.00%   0.020s    19.31s   _produce (aiokafka-benchmark.py)
  0.00%   0.00%   0.010s    0.010s   <module> (click/types.py)
  0.00%   0.00%   0.010s    0.020s   choice (random.py)
  0.00%   0.00%   0.010s    0.010s   <listcomp> (kafka/cluster.py)
  0.00%   0.00%   0.010s    0.020s   available_partitions_for_topic (kafka/cluster.py)
  0.00%   0.00%   0.010s    0.010s   isfuture (asyncio/base_futures.py)
  0.00%   0.00%   0.010s    0.010s   encode (kafka/protocol/types.py)
  0.00%   0.00%   0.010s    0.010s   get_running_loop (aiokafka/util.py)
  0.00%   0.00%   0.010s    0.030s   connect (aiokafka/conn.py)
  0.00%   0.00%   0.010s    0.010s   _verbose_message (<frozen importlib._bootstrap>)
  0.00%   0.00%   0.010s    0.010s   __init__ (asyncio/streams.py)
  0.00%   1.00%   0.010s    0.030s   handle_response (aiokafka/producer/sender.py)
  0.00%   1.00%   0.010s    0.020s   done (aiokafka/producer/message_accumulator.py)
  0.00%   0.00%   0.010s    0.060s   update_metadata (aiokafka/cluster.py)
  0.00%   0.00%   0.010s    0.010s   drain_by_nodes (aiokafka/producer/message_accumulator.py)
  0.00%   0.00%   0.010s    0.010s   _compile_bytecode (<frozen importlib._bootstrap_external>)
  0.00%   0.00%   0.010s    0.010s   <listcomp> (sre_compile.py)
  0.00%   0.00%   0.010s    0.010s   _randbelow_with_getrandbits (random.py)
  0.00%   0.00%   0.000s    0.010s   <module> (kafka/cluster.py)
  0.00%   0.00%   0.000s    0.010s   <module> (kafka/record/memory_records.py)
  0.00%   0.00%   0.000s    0.010s   <module> (zstandard/__init__.py)
  0.00%   0.00%   0.000s    0.010s   _find_spec (<frozen importlib._bootstrap>)
  0.00% 100.00%   0.000s    20.37s   <module> (aiokafka-benchmark.py)
  0.00%   0.00%   0.000s    0.010s   create_module (<frozen importlib._bootstrap_external>)
  0.00%   0.00%   0.000s    0.010s   <module> (click/termui.py)
  0.00%   0.00%   0.000s    0.040s   _step (aiokafka/conn.py)
  0.00%   0.00%   0.000s    0.010s   create_task (aiokafka/util.py)
  0.00%   0.00%   0.000s    0.020s   <module> (kafka/admin/client.py)
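The profile suggests that every send() pays a cost proportional to the number of topics. Below is a simplified model of that hot path, assuming that _wait_on_metadata checks whether the topic is in the set returned by cluster.topics(), and that topics() builds a fresh set of all topic names on each call. FakeClusterMetadata, has_topic, compare and the two wait_on_metadata_* functions are hypothetical stand-ins, not aiokafka or kafka-python code, and the constant-time variant is only one possible way to avoid the per-call copy.

```python
import timeit
from typing import Dict, Optional, Set


class FakeClusterMetadata:
    """Hypothetical stand-in: cluster metadata keyed by topic name."""

    def __init__(self, topic_count: int, internal_topics: Optional[Set[str]] = None):
        self.internal_topics = internal_topics or {"__consumer_offsets"}
        self._partitions: Dict[str, Dict[int, None]] = {
            f"topic-{i}": {0: None} for i in range(topic_count)
        }
        for name in self.internal_topics:
            self._partitions[name] = {0: None}
        self.set_builds = 0  # how many times a full topic set was materialized

    def topics(self, exclude_internal_topics: bool = True) -> Set[str]:
        # Copies every topic name into a new set: O(number of topics) per call.
        self.set_builds += 1
        names = set(self._partitions)
        return names - self.internal_topics if exclude_internal_topics else names

    def has_topic(self, topic: str) -> bool:
        # Average O(1): dict/set membership, nothing is copied.
        return topic in self._partitions and topic not in self.internal_topics


def wait_on_metadata_slow(cluster: FakeClusterMetadata, topic: str) -> bool:
    # Pattern assumed from the profile: rebuild the topic set on every send().
    return topic in cluster.topics()


def wait_on_metadata_fast(cluster: FakeClusterMetadata, topic: str) -> bool:
    # Same answer without materializing the whole topic set.
    return cluster.has_topic(topic)


def compare(topic_count: int = 5000, calls: int = 1000) -> None:
    cluster = FakeClusterMetadata(topic_count)
    slow = timeit.timeit(lambda: wait_on_metadata_slow(cluster, "topic-42"), number=calls)
    fast = timeit.timeit(lambda: wait_on_metadata_fast(cluster, "topic-42"), number=calls)
    print(f"{topic_count} topics, {calls} calls: slow={slow:.4f}s fast={fast:.4f}s")
```

Calling compare() with different topic_count values should show the slow variant's time growing roughly linearly with the number of topics while the fast one stays flat; exact timings depend on the machine.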