confluentinc / librdkafka

The Apache Kafka C/C++ library

Not switching to a leader without leader epochs while doing a cluster roll to upgrade the cluster #4796

Open lvhuat opened 2 months ago

lvhuat commented 2 months ago

Description:

During a rolling upgrade of Kafka from version 2.2.1 to 3.5.1, we observed issues with leader switching. The specifics and reproduction steps are as follows:

Process:

  1. During the rolling upgrade, leaders switched frequently.
  2. In Kafka 2.2.1, the metadata carried no leader epoch; it was reported as -1.
  3. Once a node is upgraded and the leader is on that node (e.g., node 2), the leader epoch becomes effective; assume it is 20.
  4. When the leader then switches back to node 1, producing returns a NotLeader error, which triggers a MetadataRequest.
  5. Due to the broker weighting algorithm, all metadata requests are handled by node 1, which (not yet upgraded) reports every leader epoch as -1. Since -1 < 20, the cached metadata is never updated.
  6. All requests remain incorrectly routed to node 2, forming a loop that lasts until node 1 itself is upgraded (see the sketch after this list).
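
The stuck state in steps 4-6 can be reproduced with a minimal, self-contained sketch (hypothetical names, not librdkafka code): once an epoch of 20 is cached, any metadata response carrying epoch -1 loses the comparison and is dropped, so the cached leader never moves back to node 1.

#include <stdio.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical illustration of the stuck state described above: the client
 * caches leader epoch 20 (learned from the upgraded node 2), but the
 * not-yet-upgraded node 1 keeps reporting epoch -1 in metadata responses.
 * Because -1 < 20, every update is discarded and the cached leader never
 * changes back to node 1. */
typedef struct {
        int32_t leader_id;
        int32_t leader_epoch;
} partition_state_t;

static bool apply_metadata(partition_state_t *cached,
                           int32_t new_leader_id, int32_t new_epoch) {
        if (new_epoch < cached->leader_epoch)
                return false; /* treated as outdated metadata: ignored */
        cached->leader_id    = new_leader_id;
        cached->leader_epoch = new_epoch;
        return true;
}

int main(void) {
        partition_state_t cached = {.leader_id = 2, .leader_epoch = 20};

        /* Node 1 (still on 2.2.1) answers every metadata request with
         * leader = 1, epoch = -1: the update is rejected every time. */
        for (int i = 0; i < 3; i++) {
                bool applied = apply_metadata(&cached, 1, -1);
                printf("update applied=%d, cached leader=%d epoch=%d\n",
                       applied, cached.leader_id, cached.leader_epoch);
        }
        return 0;
}

Every iteration prints applied=0 with the cached leader still 2 and epoch still 20, which matches the repeating "Leader 2" lines in the log below.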

Expected Behavior:

In such cases, the metadata should update correctly, allowing proper leader switching and load balancing.

Actual Behavior:

The metadata fails to update correctly, causing all requests to be incorrectly routed to a specific node, forming an error loop.

Additional Information:

Relevant Kafka configuration and librdkafka version are as follows:

Partial log:

[2024-07-26 02:30:16.729] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 1 Epoch -1|0||0|false
[2024-07-26 02:31:00.839] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 1 Epoch -1|0||0|false
[2024-07-26 02:31:01.334] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:01.642] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:01.835] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 1 Epoch 75|0||0|false
[2024-07-26 02:31:02.338] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 1 Epoch 75|0||0|false
[2024-07-26 02:31:02.846] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:03.352] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:03.854] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:04.436] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:04.880] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:05.388] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:05.894] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:06.400] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:06.903] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:07.406] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:07.907] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:08.411] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:08.920] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:09.436] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:31:09.938] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false

... many similar lines omitted ...

[2024-07-26 02:46:03.228] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:03.733] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:04.236] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:04.748] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:05.241] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:05.746] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch -1|0||0|false
[2024-07-26 02:46:06.640] [info] ConsumerEvent|my-topic-name|LOG|Success|7|METADATA|[thrd:main]:   Topic my-topic-name partition 0 Leader 2 Epoch 77|0||0|false

Relevant code (rd_kafka_toppar_leader_update):

static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
                                         int32_t partition,
                                         int32_t leader_id,
                                         rd_kafka_broker_t *leader,
                                         int32_t leader_epoch) {
        rd_kafka_toppar_t *rktp;
        rd_bool_t need_epoch_validation = rd_false;
        int r                           = 0;

        rktp = rd_kafka_toppar_get(rkt, partition, 0);
        if (unlikely(!rktp)) {
                /* Have only seen this in issue #132.
                 * Probably caused by corrupt broker state. */
                rd_kafka_log(rkt->rkt_rk, LOG_WARNING, "BROKER",
                             "%s [%" PRId32
                             "] is unknown "
                             "(partition_cnt %i): "
                             "ignoring leader (%" PRId32 ") update",
                             rkt->rkt_topic->str, partition,
                             rkt->rkt_partition_cnt, leader_id);
                return -1;
        }

        rd_kafka_toppar_lock(rktp);

        if (leader_epoch < rktp->rktp_leader_epoch) {
                rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
                             "%s [%" PRId32
                             "]: ignoring outdated metadata update with "
                             "leader epoch %" PRId32
                             " which is older than "
                             "our cached epoch %" PRId32,
                             rktp->rktp_rkt->rkt_topic->str,
                             rktp->rktp_partition, leader_epoch,
                             rktp->rktp_leader_epoch);
                if (rktp->rktp_fetch_state !=
                    RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) {
                        rd_kafka_toppar_unlock(rktp);
                        rd_kafka_toppar_destroy(rktp); /* from get() */
                        return 0; /* <-- returns here, so the leader change is never applied */
                }
        }
emasab commented 2 months ago

To prevent this and improve availability when doing a cluster roll to upgrade to a version with KIP-320 support, we can make a change so that validations are skipped when the new leader doesn't support them, even if a previous leader did.
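
One possible shape for that change, sketched as a standalone helper rather than the actual patch (names are hypothetical): treat a reported epoch of -1 as "this leader does not report leader epochs" and accept the update instead of comparing it against the cached epoch.

#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper sketching the proposed behaviour: decide whether a
 * metadata-reported leader epoch should be accepted against the cached one.
 * An epoch of -1 means the reporting broker does not supply leader epochs
 * (e.g. a not-yet-upgraded broker during a cluster roll), so the comparison
 * is skipped and the update is applied rather than discarded as outdated. */
static bool leader_update_allowed(int32_t cached_epoch, int32_t new_epoch) {
        if (new_epoch == -1)
                return true;  /* leader without epoch support: accept update */
        return new_epoch >= cached_epoch;  /* normal epoch comparison */
}

With this rule, the metadata responses from node 1 in the scenario above would be applied, letting the client follow the leader back instead of looping on node 2.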