redpanda-data / redpanda

Redpanda is a streaming data platform for developers. Kafka API compatible. 10x faster. No ZooKeeper. No JVM!
https://redpanda.com

Cancel an ongoing AlterPartitionReassignment when a new one is issued #8697

Open NyaliaLui opened 1 year ago

NyaliaLui commented 1 year ago

From KIP-455

If a new reassignment is issued during an on-going one, we cancel the current one by emptying out both AR and RR, send StopReplica requests to the replicas that were part of the old reassignment but not part of the new one, construct AR/RR from (the updated from the last-reassignment) R and TR, and start anew.
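
One way to read the KIP-455 text above is as a small set computation. The sketch below is an interpretation of that paragraph, not Kafka's or Redpanda's implementation: `Assignment` and `reissue` are hypothetical names, and R, AR, RR, and TR follow the KIP's notation.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Assignment:
    replicas: tuple       # R: full replica set, including in-flight additions/removals
    adding: tuple = ()    # AR: replicas being added by the ongoing reassignment
    removing: tuple = ()  # RR: replicas being removed by the ongoing reassignment


def reissue(current: Assignment, target: tuple):
    """Cancel the ongoing reassignment and start a new one toward `target` (TR)."""
    # Emptying AR leaves the replicas the partition had before the old reassignment.
    origin = [r for r in current.replicas if r not in current.adding]
    # New R: target replicas first, then any origin replicas still to be removed.
    full = list(dict.fromkeys(list(target) + origin))
    new = Assignment(
        replicas=tuple(full),
        adding=tuple(r for r in full if r not in origin),
        removing=tuple(r for r in full if r not in target),
    )
    # Replicas that were part of the old reassignment but not the new one
    # are the ones that would receive StopReplica.
    to_stop = [r for r in current.replicas if r not in new.replicas]
    return new, to_stop


# Brokers 0,1,2 are moving to 1,2,3 when a new request asks for 1,2,4 instead.
ongoing = Assignment(replicas=(1, 2, 3, 0), adding=(3,), removing=(0,))
new, to_stop = reissue(ongoing, (1, 2, 4))
print(new, to_stop)
```

In this example broker 3, which was only being added by the old reassignment, is the one that would be stopped, while broker 0 stays in RR until the new reassignment completes.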

It is unclear whether Kafka implements this. Attempts to test it with the kafka-reassign-partitions.sh script failed: when we submit requests with identical reassignments, the script throws instead of passing them to the Kafka API.

Redpanda currently returns UNKNOWN_SERVER_ERROR in this situation.

JIRA Link: CORE-1162

NyaliaLui commented 1 year ago

Putting this here for reference.

Kafka does cancel an ongoing reassignment when we submit an identical one.

After starting up Kafka with debug logging, creating a topic, and producing data to the topic...

  1. Set throttle rates on all brokers. For example, on broker 1:

    ./bin/kafka-configs.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094,localhost:9095 --add-config 'leader.replication.throttled.rate=10,follower.replication.throttled.rate=10' --alter --broker 1

  2. Execute identical reassignments back-to-back:

    kcl -X seed_brokers=localhost:9092,localhost:9093,localhost:9094,localhost:9095 --no-config-file admin partas alter 'foo:0->2,3,1;1->3,1,0;2->1,0,2'; echo $?; kcl -X seed_brokers=localhost:9092,localhost:9093,localhost:9094,localhost:9095 --no-config-file admin partas alter 'foo:0->2,3,1;1->3,1,0;2->1,0,2'; echo $?;
    foo   2     OK    
    foo   0     OK    
    foo   1     OK    
    0
    foo   2     OK    
    foo   0     OK    
    foo   1     OK    
    0

  3. Look at the Kafka logs:

    [2023-02-14 10:41:09,887] DEBUG [Controller id=2, targetBrokerId=2] Received UPDATE_METADATA response from node 2 for request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=2, correlationId=29): UpdateMetadataResponseData(errorCode=0) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,889] DEBUG [Controller id=2, targetBrokerId=3] Received STOP_REPLICA response from node 3 for request with header RequestHeader(apiKey=STOP_REPLICA, apiVersion=3, clientId=2, correlationId=28): StopReplicaResponseData(errorCode=0, partitionErrors=[StopReplicaPartitionError(topicName='foo', partitionIndex=2, errorCode=0)]) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,889] DEBUG [Controller id=2, targetBrokerId=3] Sending STOP_REPLICA request with header RequestHeader(apiKey=STOP_REPLICA, apiVersion=3, clientId=2, correlationId=29) and timeout 30000 to node 3: StopReplicaRequestData(controllerId=2, controllerEpoch=1, brokerEpoch=85, deletePartitions=false, ungroupedPartitions=[], topics=[], topicStates=[StopReplicaTopicState(topicName='foo', partitionStates=[StopReplicaPartitionState(partitionIndex=2, leaderEpoch=4, deletePartition=true)])]) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,889] INFO [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Shutting down (kafka.server.ReplicaFetcherThread)
    [2023-02-14 10:41:09,889] INFO [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Client requested connection close from node 1 (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,890] DEBUG [Controller id=2, targetBrokerId=1] Received LEADER_AND_ISR response from node 1 for request with header RequestHeader(apiKey=LEADER_AND_ISR, apiVersion=6, clientId=2, correlationId=28): LeaderAndIsrResponseData(errorCode=0, partitionErrors=[], topics=[LeaderAndIsrTopicError(topicId=47jFKy_4RniZs3rxLLrwew, partitionErrors=[LeaderAndIsrPartitionError(topicName='', partitionIndex=2, errorCode=0)])]) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,890] DEBUG [Controller id=2, targetBrokerId=2] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=2, correlationId=30) and timeout 30000 to node 2: UpdateMetadataRequestData(controllerId=2, controllerEpoch=1, brokerEpoch=79, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='foo', topicId=47jFKy_4RniZs3rxLLrwew, partitionStates=[UpdateMetadataPartitionState(topicName='foo', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=2, isr=[3, 0, 2, 1], zkVersion=3, replicas=[3, 1, 0, 2], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=3, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9095, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9092, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9093, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=2, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9094, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null)]) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,890] DEBUG [Controller id=2, targetBrokerId=1] Sending UPDATE_METADATA request with header RequestHeader(apiKey=UPDATE_METADATA, apiVersion=7, clientId=2, correlationId=29) and timeout 30000 to node 1: UpdateMetadataRequestData(controllerId=2, controllerEpoch=1, brokerEpoch=81, ungroupedPartitionStates=[], topicStates=[UpdateMetadataTopicState(topicName='foo', topicId=47jFKy_4RniZs3rxLLrwew, partitionStates=[UpdateMetadataPartitionState(topicName='foo', partitionIndex=2, controllerEpoch=1, leader=1, leaderEpoch=4, isr=[1, 2, 0], zkVersion=5, replicas=[1, 0, 2], offlineReplicas=[])])], liveBrokers=[UpdateMetadataBroker(id=3, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9095, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=0, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9092, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=1, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9093, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null), UpdateMetadataBroker(id=2, v0Host='', v0Port=0, endpoints=[UpdateMetadataEndpoint(port=9094, host='rp-comp', listener='PLAINTEXT', securityProtocol=0)], rack=null)]) (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,891] DEBUG [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=0] Cancelled in-flight FETCH request with correlation id 714 due to node 1 being disconnected (elapsed time since creation: 350ms, elapsed time since send: 350ms, request timeout: 30000ms): FetchRequestData(clusterId=null, replicaId=3, maxWaitMs=500, minBytes=1, maxBytes=10485760, isolationLevel=0, sessionId=55507672, sessionEpoch=714, topics=[], forgottenTopicsData=[], rackId='') (org.apache.kafka.clients.NetworkClient)
    [2023-02-14 10:41:09,891] DEBUG [Controller id=2, targetBrokerId=0] Received STOP_REPLICA response from node 0 for request with header RequestHeader(apiKey=STOP_REPLICA, apiVersion=3, clientId=2, correlationId=23): StopReplicaResponseData(errorCode=0, partitionErrors=[StopReplicaPartitionError(topicName='foo', partitionIndex=0, errorCode=0)]) (org.apache.kafka.clients.NetworkClient)
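
For comparison with the kafka-reassign-partitions.sh path, the kcl spec from step 2 can be turned into the JSON document that script reads via `--reassignment-json-file`. This is a rough sketch: `spec_to_json` is a hypothetical helper, and it assumes the single-topic `topic:partition->replicas;...` form shown above rather than kcl's full grammar.

```python
import json


def spec_to_json(spec: str) -> str:
    """Convert 'topic:p->b,b,b;p->b,b,b' into reassignment JSON (version 1)."""
    topic, _, moves = spec.partition(":")
    partitions = []
    for move in moves.split(";"):
        partition, _, replicas = move.partition("->")
        partitions.append({
            "topic": topic,
            "partition": int(partition),
            "replicas": [int(broker) for broker in replicas.split(",")],
        })
    return json.dumps({"version": 1, "partitions": partitions}, indent=2)


print(spec_to_json("foo:0->2,3,1;1->3,1,0;2->1,0,2"))
```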

From KIP-455: If a new reassignment is issued during an on-going one, we cancel the current one by emptying out both AR and RR, send StopReplica requests to the replicas that were part of the old reassignment but not part of the new one, construct AR/RR from (the updated from the last-reassignment) R and TR, and start anew.

Those StopReplica requests are what we are looking for in the logs, and we do see them. For example:

[2023-02-14 10:41:09,889] DEBUG [Controller id=2, targetBrokerId=3] Sending STOP_REPLICA request with header RequestHeader(apiKey=STOP_REPLICA, apiVersion=3, clientId=2, correlationId=29) and timeout 30000 to node 3: StopReplicaRequestData(controllerId=2, controllerEpoch=1, brokerEpoch=85, deletePartitions=false, ungroupedPartitions=[], topics=[], topicStates=[StopReplicaTopicState(topicName='foo', partitionStates=[StopReplicaPartitionState(partitionIndex=2, leaderEpoch=4, deletePartition=true)])]) (org.apache.kafka.clients.NetworkClient)
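
To pull these requests out of a larger debug log, something like the following could be used. It is only a sketch: the regular expressions are written against the log line format shown above, `stop_replica_requests` is a hypothetical helper, and it assumes one topic per request.

```python
import re

SEND_RE = re.compile(r"Sending STOP_REPLICA request .*? to node (\d+):")
TOPIC_RE = re.compile(r"StopReplicaTopicState\(topicName='([^']+)'")
PART_RE = re.compile(
    r"StopReplicaPartitionState\(partitionIndex=(\d+), leaderEpoch=(-?\d+), "
    r"deletePartition=(true|false)\)"
)


def stop_replica_requests(lines):
    """Yield (node, topic, partition, delete) for each STOP_REPLICA request sent."""
    for line in lines:
        sent = SEND_RE.search(line)
        if not sent:
            continue  # responses and unrelated lines are skipped
        topic = TOPIC_RE.search(line)  # assumption: a single topic per request
        for part in PART_RE.finditer(line):
            yield (
                int(sent.group(1)),
                topic.group(1) if topic else None,
                int(part.group(1)),
                part.group(3) == "true",
            )


# The log line quoted above, split only for readability.
sample = (
    "[2023-02-14 10:41:09,889] DEBUG [Controller id=2, targetBrokerId=3] "
    "Sending STOP_REPLICA request with header RequestHeader(apiKey=STOP_REPLICA, "
    "apiVersion=3, clientId=2, correlationId=29) and timeout 30000 to node 3: "
    "StopReplicaRequestData(controllerId=2, controllerEpoch=1, brokerEpoch=85, "
    "deletePartitions=false, ungroupedPartitions=[], topics=[], "
    "topicStates=[StopReplicaTopicState(topicName='foo', "
    "partitionStates=[StopReplicaPartitionState(partitionIndex=2, leaderEpoch=4, "
    "deletePartition=true)])]) (org.apache.kafka.clients.NetworkClient)"
)
print(list(stop_replica_requests([sample])))  # [(3, 'foo', 2, True)]
```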

NyaliaLui commented 1 year ago

Redpanda currently returns UNKNOWN_SERVER_ERROR in this situation.

This is because our topics frontend checks for in-progress updates and returns errc::update_in_progress, which we then map to unknown_server_error in kafka/server/errors.h::map_topic_error_code.