uber / uReplicator

Improvement of Apache Kafka Mirrormaker
Apache License 2.0

InstanceTopicPartitionHolder.addTopicPartition bug #271

Closed Technoboy- closed 5 years ago

Technoboy- commented 5 years ago
  1. When a task is added from the controller or manager, it invokes com.uber.stream.kafka.mirrormaker.controller.core.HelixMirrorMakerManager.addTopicToMirrorMaker(String topicName, int numTopicPartitions).
  2. addTopicToMirrorMaker then calls IdealStateBuilder.buildCustomIdealStateFor(). Take a look at this line: liveInstance.addTopicPartition(new TopicPartition(topicName, i));

    public static IdealState buildCustomIdealStateFor(String topicName,
      int numTopicPartitions,
      PriorityQueue<InstanceTopicPartitionHolder> instanceToNumServingTopicPartitionMap) {
    
    final CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(topicName);
    
    customModeIdealStateBuilder
        .setStateModel(OnlineOfflineStateModel.name)
        .setNumPartitions(numTopicPartitions).setNumReplica(1)
        .setMaxPartitionsPerNode(numTopicPartitions);
    
    for (int i = 0; i < numTopicPartitions; ++i) {
      InstanceTopicPartitionHolder liveInstance = instanceToNumServingTopicPartitionMap.poll();
      if (liveInstance != null) {
        customModeIdealStateBuilder.assignInstanceAndState(Integer.toString(i),
            liveInstance.getInstanceName(), "ONLINE");
        // Here the instance's partition count ends up larger than the actual number of partitions.
        liveInstance.addTopicPartition(new TopicPartition(topicName, i));
        instanceToNumServingTopicPartitionMap.add(liveInstance);
      }
    }
  3. So I added a new method to InstanceTopicPartitionHolder that increases the count by exactly one per call:
    public void addOneTopicPartition(TopicPartition topicPartitionInfo) {
      _topicPartitionSet.add(topicPartitionInfo);
      _totalNumPartitions = _totalNumPartitions + 1;
    }
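
To make the difference concrete, here is a minimal, self-contained sketch of the two counting strategies. It is not the uReplicator code: `SketchTopicPartition`, `SketchHolder`, and `PartitionCountDemo` are hypothetical stand-ins, and the body of the existing `addTopicPartition` is an assumption (that it adds the partition field of the `TopicPartition` to the running total), since the issue does not show it.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Hypothetical, simplified stand-in for uReplicator's TopicPartition.
final class SketchTopicPartition {
  final String topic;
  final int partition;

  SketchTopicPartition(String topic, int partition) {
    this.topic = topic;
    this.partition = partition;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SketchTopicPartition)) {
      return false;
    }
    SketchTopicPartition other = (SketchTopicPartition) o;
    return topic.equals(other.topic) && partition == other.partition;
  }

  @Override
  public int hashCode() {
    return Objects.hash(topic, partition);
  }
}

// Hypothetical stand-in for InstanceTopicPartitionHolder.
final class SketchHolder {
  private final Set<SketchTopicPartition> topicPartitionSet = new HashSet<>();
  private int totalNumPartitions = 0;

  // ASSUMPTION: models the reported behaviour by adding the partition
  // field to the total, which is wrong when that field is an index.
  void addTopicPartition(SketchTopicPartition tp) {
    topicPartitionSet.add(tp);
    totalNumPartitions += tp.partition;
  }

  // Mirrors the method proposed in the issue: one call, one partition.
  void addOneTopicPartition(SketchTopicPartition tp) {
    topicPartitionSet.add(tp);
    totalNumPartitions = totalNumPartitions + 1;
  }

  int getTotalNumPartitions() {
    return totalNumPartitions;
  }

  int getNumServingTopicPartitions() {
    return topicPartitionSet.size();
  }
}

final class PartitionCountDemo {
  // Assigns partitions 0..n-1 to one instance, like the loop in
  // buildCustomIdealStateFor, using either counting strategy.
  static SketchHolder assign(int numTopicPartitions, boolean useFix) {
    SketchHolder holder = new SketchHolder();
    for (int i = 0; i < numTopicPartitions; ++i) {
      SketchTopicPartition tp = new SketchTopicPartition("demo-topic", i);
      if (useFix) {
        holder.addOneTopicPartition(tp);
      } else {
        holder.addTopicPartition(tp);
      }
    }
    return holder;
  }

  public static void main(String[] args) {
    System.out.println("assumed old count: " + assign(4, false).getTotalNumPartitions());
    System.out.println("one-per-call count: " + assign(4, true).getTotalNumPartitions());
  }
}
```

Under that assumption, assigning four partitions (indices 0 to 3) to one instance yields a total of 6 with the old method, while `addOneTopicPartition` yields 4, which matches the size of the partition set.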