larry-cdn77 opened 2 years ago
As far as I have been able to tell, the excessive CPU usage is of two kinds.
1. Fast leader queries. Something happens with the metadata for one broker, or with the broker itself, and produce requests continuously fail with NOT_LEADER_FOR_PARTITION. With 320 partitions quite a few failed produce requests accumulate; I have seen hundreds a second. Each failed request checks whether a fast leader query should be made and, presumably due to #3690, always makes one, which turns into a metadata request. Partition leader queries override the mechanism for skipping repeated metadata requests (so they can respond to cluster events quicker), so a flood of metadata requests ends up being sent, with a sizeable chunk of CPU time spent processing the replies (each reply applies to all partitions).
As a workaround, I lower the rate of fast leader queries, maybe as much as:
topic.metadata.refresh.fast.interval.ms 15000
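In configuration terms that is a one-line override; a minimal sketch of setting it programmatically with the standard rd_kafka_conf_set() call (error handling abbreviated):

#include <stdio.h>
#include <librdkafka/rdkafka.h>

static rd_kafka_conf_t *make_producer_conf (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();
        /* Slow down fast leader queries so a NOT_LEADER_FOR_PARTITION storm
         * triggers far fewer metadata requests (the stock default is much
         * shorter). */
        if (rd_kafka_conf_set(conf, "topic.metadata.refresh.fast.interval.ms",
                              "15000", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
                fprintf(stderr, "conf error: %s\n", errstr);
        return conf;
}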
I do not yet understand how the broker error sustains or repeats itself, or what exactly happens on that one broker. It is not a transient event, however: in the one case I have detailed logs for, it went on for half an hour until I restarted the producer.
2. Broker wake-ups. Somehow, the same broker thread that creates the metadata storm in the previous point also repeatedly enters timeout scanning. The message rate is non-trivial and each scan finds a timed-out message. With idempotence enabled, the broker serve loop is then supposed to drain in-flight produce requests and issue a wake-up. There end up being so many wake-ups that as much as 30% of CPU is consumed sort-inserting them into the ops queue. I am seeing close to 5 x 320 per serve period (is that 1 second?), where 5 is my Kafka cluster size. Later, as if caused by the wake-ups themselves, other broker threads enter cycles of timeout scanning too, so multiply the wake-up rate by 5 once more.
As a naive mitigation, I issue the broker wake-up op out of order, directly at the head of the queue, as if it had a higher priority than FLASH (and in reverse order):
 void rd_kafka_broker_wakeup (rd_kafka_broker_t *rkb) {
         rd_kafka_op_t *rko = rd_kafka_op_new(RD_KAFKA_OP_WAKEUP);
-        rd_kafka_op_set_prio(rko, RD_KAFKA_PRIO_FLASH);
-        rd_kafka_q_enq(rkb->rkb_ops, rko);
+        /* enqueue at the head of the ops queue, bypassing the sorted insert */
+        rd_kafka_q_enq1(rkb->rkb_ops, rko, rkb->rkb_ops, 1, 1);
         rd_rkb_dbg(rkb, QUEUE, "WAKEUP", "Wake-up");
 }
I have not tested this change thoroughly, particularly not with the consumer. Would separate queues or a binary heap be a good alternative implementation of #1088, removing the need for a sorted insert? A toy illustration of the heap idea is below.
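Purely as an illustration, and not librdkafka code (the struct and names here are made up), a self-contained sketch of an O(log n) sift-up insert that would replace the O(n) sorted-list insert:

#include <stddef.h>
#include <stdint.h>

/* Toy binary min-heap keyed on an op's sort key (e.g. priority); this is
 * not rd_kafka_q_t, just the O(log n) insert the question is about. */
struct op_heap {
        int64_t *key;    /* heap array, smallest key served first */
        size_t cnt;      /* current number of elements */
        size_t cap;      /* allocated capacity, maintained by the caller */
};

static void op_heap_insert (struct op_heap *h, int64_t key) {
        size_t i = h->cnt++;   /* assumes cnt < cap */
        h->key[i] = key;
        while (i > 0) {        /* sift up: O(log n) comparisons */
                size_t parent = (i - 1) / 2;
                if (h->key[parent] <= h->key[i])
                        break;
                int64_t tmp = h->key[parent];
                h->key[parent] = h->key[i];
                h->key[i] = tmp;
                i = parent;
        }
}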
Despite raising more questions than answers, I chose to submit this update as the two workarounds mean I can re-deploy my producer in production. I no longer see dramatic CPU spikes.
On further investigation and code reading, the broker wake-ups from point 2 above only seem to spike CPU for a few minutes following a cluster shutdown, until metadata expires and the broker-assigned toppars stop seeing the message timeouts that make them generate drain-bump-wakeup events. Let me elaborate on the sequence of events. We are not restarting the cluster yet, just stopping it and waiting.
The cluster shutdown is clean, one broker at a time. Partitions are gradually reassigned until the last broker to shut down is left with 256, and the remaining 64 (which this broker has no replicas of) are given to the internal broker thread. The other broker threads have no active toppars.
The Kafka TCP/IP port is closed via firewall, so it takes 2-3 minutes for ETIMEDOUT to arrive in response to a non-blocking socket connect (itself an EINPROGRESS). Broker threads wait in CONNECT state, with one top-level update a second. The interval is spent waiting for a transport I/O event (at first it seemed unfair to leave no waiting time for the subsequent operations serve in rd_kafka_ops_io_serve, but I may have missed a design intent). So every 2-3 minutes broker threads run through DOWN-INIT-TRY and back into CONNECT. The last broker goes from INIT to TRY because it has a connection need. The other brokers go from INIT to TRY because the main thread's 1-second timer spots them in INIT and sends them a connect operation as part of 'refresh unavailable topics' (it is trying to connect to 'any' broker, and I wonder if it should respect sparse connections and pick a broker with toppars).
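For context on the 2-3 minute wait, that is just the kernel's TCP connect timeout surfacing through the usual non-blocking connect pattern; a generic POSIX sketch (not librdkafka's transport code) of what the broker thread is effectively waiting on in CONNECT state:

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>

/* Generic pattern: a non-blocking connect() returns EINPROGRESS immediately
 * and the final ETIMEDOUT only surfaces minutes later, once the kernel gives
 * up retrying the SYN. */
static int connect_nonblock (int fd, const struct sockaddr *sa, socklen_t len,
                             int poll_timeout_ms) {
        int err = 0;
        socklen_t errlen = sizeof(err);
        struct pollfd pfd = { .fd = fd, .events = POLLOUT };

        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);

        if (connect(fd, sa, len) == 0)
                return 0;
        if (errno != EINPROGRESS)
                return -1;

        /* The broker thread's roughly 1-second serve interval is spent
         * waiting here for writability (a transport I/O event). */
        if (poll(&pfd, 1, poll_timeout_ms) <= 0)
                return -1;               /* still connecting, or poll error */

        getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen);
        if (err) {
                errno = err;             /* e.g. ETIMEDOUT, ECONNREFUSED */
                return -1;
        }
        return 0;
}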
Messages start to time out (default 5 minutes). Only the last broker, the one with active toppars, responds to dying messages with a drain-bump and wake-up. Up to 256 drain-bumps occur in each 1-second interval. Brokers without toppars get a chance to eat through all those wake-ups in INIT state every time a connection timeout takes them into it. Due to sparse connections they have no connection need and are thus able to circle around in INIT state (with no transport I/O to wait for, unlike in CONNECT state, so it goes quickly). The last broker has a connection need that always takes it from INIT to TRY without a chance to eat additional wake-ups.
The attached graph shows brokers 1-4 eating through their wake-ups on each connection timeout while broker 5 just accumulates them. The cluster stop is at 15:00.
Broker threads get an additional opportunity to eat through a backlog of operations as they spend about 100ms in the TRY state doing a reconnect backoff. I have seen no more than 2,000 wake-ups consumed this way, which is not enough on distant machines that take a long time to arrive at ETIMEDOUT, meaning more time spent in CONNECT scanning for timeouts and generating wake-up operations.
Metadata expires (default 15 minutes). New messages are assigned to the UA partition. The remaining messages time out in the last broker's toppars and its connection need ceases. Only now does the last broker get to do its rounds in INIT state and finally eat that backlog of wake-ups. (I have seen 100,000 accumulate and take several seconds to process.)
The producer is now stable, with a UA toppar message queue of a few thousand. The main thread looks after aging those because, without metadata, they do not belong to any broker thread (numbered or internal). Most importantly, after the cluster restart I see no excess CPU usage due to wake-up operations.
The trouble with 100,000 wake-ups, as I alluded to earlier, is that their sorted insert is effectively quadratic complexity. I appreciate that inserting wake-ups out of order is hacky, and wonder whether, apart from using separate priority queues or a priority heap, it would work to only issue a wake-up after the entire topic scan in rd_kafka_broker_produce_toppars, not for each partition in rd_kafka_toppar_producer_serve, roughly as sketched below. It would certainly mitigate the extra CPU usage. I suppose one dramatic scenario is a flappy network and sporadically refreshed metadata, each time spiking CPU before expiring.
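Here is the shape of the change I mean, using toy stand-in types rather than real librdkafka structures (the real functions take more arguments and iterate the broker's active toppar list):

#include <stdbool.h>
#include <stddef.h>

/* Toy stand-ins, not librdkafka types; only meant to show the idea of
 * one wake-up per broker scan instead of one per timed-out partition. */
struct toppar { bool has_timed_out_msgs; };
struct broker { struct toppar *toppars; size_t toppar_cnt; };

static void broker_wakeup (struct broker *rkb) {
        (void)rkb;   /* would enqueue a single WAKEUP op here */
}

/* Per-partition serve: returns true if it drained/bumped and would,
 * in the current code, wake the broker up immediately. */
static bool toppar_producer_serve (struct broker *rkb, struct toppar *rktp) {
        (void)rkb;
        return rktp->has_timed_out_msgs;
}

static void broker_produce_toppars (struct broker *rkb) {
        bool need_wakeup = false;
        size_t i;

        for (i = 0; i < rkb->toppar_cnt; i++)
                if (toppar_producer_serve(rkb, &rkb->toppars[i]))
                        need_wakeup = true;

        if (need_wakeup)
                broker_wakeup(rkb);   /* at most one wake-up per scan */
}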
Furthermore, I must be missing something, because I would imagine the all-broker wake-up introduced in 54711c3 to be unnecessary given that the broker thread state machine is serviced at least every 1000ms. A word of education would be wholeheartedly welcome 😄
Further on broker wake-ups, I attach a snippet from my production machine where 64 producers run simultaneously. The graph points are second-by-second aggregated counts of the 'timed out' debug messages (only broker number 5, the last one to stop, has those) and the 'Wake-up' debug messages (all 5 brokers). The graphs illustrate how message timeouts lead to excessive broker wake-ups. The cluster stop is at 18:45; the cluster start came later and is not visible.
Don't know how I missed this issue, but it is pure gold! Great troubleshooting and analysis @larry-cdn77 !
I just merged a bunch of producer latency fixes to master. Would it be possible for you to try to reproduce this on master?
I noticed the recent changes and would like to try them, although I cannot be sure when the next opportunity will be.
Will certainly post any updates here, thanks!
I think the issue is resolved with Magnus's changes, but I will wait for you to confirm.
A cluster that is unavailable (e.g. due to a network partition) for 15 minutes or so can put an idempotent producer into a state of excessive CPU usage. It can take several minutes after the cluster restart for CPU usage to climb, and it tends to come and go in bursts of a few minutes at a time, as the example graph demonstrates.
What it needs:
A log is attached, up to debug level 6. I have level 7 available for searching (200GB). Graphs of frequency by facility are attached and might be useful. I have also made a gprof profile, which I attach too.
Further, attached is a minimal C producer snippet that I use to demonstrate the problem, albeit at a smaller scale (the production setup has 64 producers). It also contains the specific configuration lines. The snippet runs at 1-2% CPU usage on a Silver Xeon core but 30-40% after a cluster outage.
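For readers who do not want to open the attachment, the snippet is roughly of this shape (a fresh sketch rather than the attached file; the broker list, topic name and pacing are placeholders, the idempotence and fast-interval settings are the relevant part):

#include <stdio.h>
#include <unistd.h>
#include <librdkafka/rdkafka.h>

int main (void) {
        char errstr[512];
        rd_kafka_conf_t *conf = rd_kafka_conf_new();

        /* Placeholder values; the attached snippet carries the real ones. */
        rd_kafka_conf_set(conf, "bootstrap.servers", "broker1:9092",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "enable.idempotence", "true",
                          errstr, sizeof(errstr));
        rd_kafka_conf_set(conf, "topic.metadata.refresh.fast.interval.ms",
                          "15000", errstr, sizeof(errstr));

        rd_kafka_t *rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf,
                                      errstr, sizeof(errstr));
        if (!rk) {
                fprintf(stderr, "%s\n", errstr);
                return 1;
        }

        const char payload[] = "test message";
        for (;;) {   /* runs until killed */
                rd_kafka_resp_err_t err = rd_kafka_producev(
                        rk,
                        RD_KAFKA_V_TOPIC("test-topic"),
                        RD_KAFKA_V_VALUE((void *)payload, sizeof(payload) - 1),
                        RD_KAFKA_V_END);
                if (err)
                        fprintf(stderr, "produce failed: %s\n",
                                rd_kafka_err2str(err));

                rd_kafka_poll(rk, 0);   /* serve delivery reports */
                usleep(1000);           /* placeholder pacing */
        }
}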
Any thoughts would be greatly appreciated
Attachments: worker.c.txt, debug.log.gz, controller.log.gz, gmon.out.gz