confluentinc / librdkafka

The Apache Kafka C/C++ library

Quadratic behavior in timer scheduling #3827

Open travisdowns opened 2 years ago

travisdowns commented 2 years ago

Description

Timer scheduling is O(n^2) in the number of timers (under a reasonable assumption about the interval distribution).

How to reproduce

Consume from any topic with 20k partitions with a non-zero statistics interval configured. A large amount of time is spent in rd_kafka_timer_schedule_next (most of it actually inside rd_kafka_timer_cmp, which is inlined), both during initial creation of the topic and even when idle (with poll being called periodically); a typical stack for the idle case is shown below:

rdk:main 905626 339264.203807:   32365636 cycles: 
            55648d5c00fc rd_kafka_timer_cmp+0x4c (inlined)
            55648d5c00fc rd_kafka_timer_schedule_next+0x4c (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
            55648d5c0908 rd_kafka_timer_schedule+0x1e8 (inlined)
            55648d5c0908 rd_kafka_timer_schedule+0x1e8 (inlined)
            55648d5c0908 rd_kafka_timers_run+0x1e8 (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
            55648d5947e8 rd_kafka_thread_main+0x228 (/home/tdowns/dev/librdkafka-travis/examples/many_consumers)
            7f8f785c2809 start_thread+0x139 (/usr/lib/x86_64-linux-gnu/libc.so.6)
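
For reference, a minimal consumer sketch along these lines (the broker address, topic, and group names are placeholders; it assumes a pre-created topic with ~20k partitions, and error handling is omitted):

```c
/* Minimal reproduction sketch (not a complete program): subscribe to a
 * high-partition-count topic with statistics enabled, then poll idly
 * while profiling the rdk:main thread. */
#include <librdkafka/rdkafka.h>

int main(void) {
    char errstr[512];
    rd_kafka_conf_t *conf = rd_kafka_conf_new();

    rd_kafka_conf_set(conf, "bootstrap.servers", "localhost:9092",
                      errstr, sizeof(errstr));
    rd_kafka_conf_set(conf, "group.id", "repro-group",
                      errstr, sizeof(errstr));
    /* Non-zero statistics interval: one consumer-lag timer per partition. */
    rd_kafka_conf_set(conf, "statistics.interval.ms", "5000",
                      errstr, sizeof(errstr));

    rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf,
                                  errstr, sizeof(errstr));
    rd_kafka_poll_set_consumer(rk);

    rd_kafka_topic_partition_list_t *topics =
            rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics, "topic-with-20k-partitions",
                                      RD_KAFKA_PARTITION_UA);
    rd_kafka_subscribe(rk, topics);
    rd_kafka_topic_partition_list_destroy(topics);

    for (;;) { /* idle poll loop */
        rd_kafka_message_t *msg = rd_kafka_consumer_poll(rk, 100);
        if (msg)
            rd_kafka_message_destroy(msg);
    }
}
```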

The problem is that timer scheduling is quadratic: each time a timer is popped from the front of the list, a linear search is done through the list to find its new position, which in this case is always the end (the timers all have the same interval, so a just-popped timer reschedules with a fire time later than all the others). So in one interval 20k timers are rescheduled, each of which scans through 20k entries.
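
Schematically, the pattern looks like the following sketch of sorted-linked-list insertion (illustrative names, not librdkafka's actual code):

```c
/* Simplified sketch of the problematic pattern: keeping timers in a
 * time-sorted singly-linked list means each reschedule is a linear scan. */
struct timer {
    long long next_fire;   /* absolute time of next expiry */
    struct timer *next;
};

static void schedule(struct timer **head, struct timer *t) {
    struct timer **pp = head;
    /* Linear scan for the insertion point. A just-fired periodic timer
     * has the latest next_fire, so this walks the entire list: with n
     * timers per interval, one interval costs O(n^2) comparisons. */
    while (*pp && (*pp)->next_fire <= t->next_fire)
        pp = &(*pp)->next;
    t->next = *pp;
    *pp = t;
}
```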

The overwhelming majority of the time is spent in this hot assembly loop:

[image: hot loop]

This is just the linked-list traversal, looking for the end of the list or the position at which to insert the element. At best, this loop can execute one iteration every 4 cycles, so 20k reschedules cost 20,000^2 iterations * 4 cycles = 1.6 billion cycles, or half a second of CPU time on a 3.2 GHz machine. With a higher partition count or more clients, the CPU can be saturated just running timers.

In this case, the timers were the statistics callbacks rktp_consumer_lag_tmr, but the same applies to any timer. As a workaround for this specific high-partition-count + stats case, we can disable statistics.

This occurs on current master f7f527d8f2ff7f5bd86856ddc43115eb4dfbba97.

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

Topic config

request.required.acks = -1
request.timeout.ms = 30000
message.timeout.ms = 300000
queuing.strategy = fifo
produce.offset.report = false
partitioner = consistent_random
compression.codec = inherit
compression.level = -1
auto.commit.enable = true
auto.commit.interval.ms = 60000
auto.offset.reset = smallest
offset.store.path = .
offset.store.sync.interval.ms = -1
offset.store.method = broker
consume.callback.max.messages = 0



 - [x] Operating system: Ubuntu 21.10
edenhill commented 2 years ago

This is a great bug report, @travisdowns!

Having that many partitions for a small number of topics is a bit of a special use-case which librdkafka is not optimized for (obviously), but this should be fixed nonetheless. Another problem with consumer lag monitoring is that it creates one OffsetsRequest per partition, even though the same broker may be leader for multiple partitions; in your case that is also a very large number of protocol requests, which in turn may cause head-of-line blocking for more necessary requests.

So, two action points:

  1. reimplement timers, perhaps using a timer wheel like https://25thandclement.com/~william/projects/timeout.c.html
  2. try to consolidate consumer_lag requests
travisdowns commented 2 years ago

> Having that many partitions for a small number of topics is a bit of a special use-case which librdkafka is not optimized for (obviously), but this should be fixed nonetheless.

Yes, it is a bit special, but unfortunately we cannot always control the number of partitions someone chooses to use, and there may be valid reasons for such a high number (real-world case: needing to distribute work among tens of thousands of clients, with at least one partition per client when using a dynamic consumer group).

Your suggestions sound great. I also think something O(log n), like a priority heap, would be fine; the timer wheel looks even more specialized to this problem (I'm not sure about its overhead at very small counts, whereas a priority heap has overhead similar to a linked list at small counts, which is nice).
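
For illustration, a minimal binary min-heap of timers keyed on next-fire time (hypothetical names, a sketch rather than a proposed patch) would make both scheduling and popping O(log n):

```c
/* Sketch: binary min-heap of timers keyed on next_fire. Push and pop are
 * O(log n), so n reschedules per interval cost O(n log n) rather than
 * O(n^2). Illustrative only: no cancellation, no shrinking. */
#include <stdlib.h>

struct heap_timer { long long next_fire; void (*cb)(void *); void *arg; };

struct timer_heap { struct heap_timer *a; size_t len, cap; };

static void swap(struct heap_timer *x, struct heap_timer *y) {
    struct heap_timer tmp = *x; *x = *y; *y = tmp;
}

static void sift_up(struct timer_heap *h, size_t i) {
    while (i > 0) {
        size_t p = (i - 1) / 2;                 /* parent index */
        if (h->a[p].next_fire <= h->a[i].next_fire)
            break;
        swap(&h->a[p], &h->a[i]);
        i = p;
    }
}

static void sift_down(struct timer_heap *h, size_t i) {
    for (;;) {
        size_t l = 2 * i + 1, r = l + 1, m = i; /* children, min-of-three */
        if (l < h->len && h->a[l].next_fire < h->a[m].next_fire) m = l;
        if (r < h->len && h->a[r].next_fire < h->a[m].next_fire) m = r;
        if (m == i)
            break;
        swap(&h->a[m], &h->a[i]);
        i = m;
    }
}

/* Schedule a timer: O(log n). Returns -1 on allocation failure. */
static int heap_push(struct timer_heap *h, struct heap_timer t) {
    if (h->len == h->cap) {
        size_t ncap = h->cap ? h->cap * 2 : 16;
        void *na = realloc(h->a, ncap * sizeof(*h->a));
        if (!na)
            return -1;
        h->a = na;
        h->cap = ncap;
    }
    h->a[h->len] = t;
    sift_up(h, h->len++);
    return 0;
}

/* Pop the soonest-firing timer: O(log n). Returns -1 if empty. */
static int heap_pop(struct timer_heap *h, struct heap_timer *out) {
    if (h->len == 0)
        return -1;
    *out = h->a[0];
    h->a[0] = h->a[--h->len];
    sift_down(h, 0);
    return 0;
}
```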