Yelp / kafka-utils

Apache License 2.0

Fix iteration bug in --max-movement-size extract actions #228

Closed Baisang closed 5 years ago

Baisang commented 5 years ago

The original code had a bug that was not possible to observe until the addition of the --max-movement-size parameter.

The bug is a simple one: the code removes an element from a list while iterating over it. Removing the current element shifts the remaining elements left, so the iterator skips the next one and the loop can terminate early, e.g.

things = [0, 1]
for thing in things:
    print(thing)
    things.remove(thing)

The above prints only 0 and never reaches 1: after 0 is removed the list has a single element, but the iterator has already advanced past index 0, so the loop ends.
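On longer lists the same pattern skips every other element rather than only the last one. The sketch below wraps the example in two small helpers (the function names are hypothetical, used only for illustration) and contrasts it with the common workaround of iterating over a copy of the list.

```python
def remove_while_iterating(items):
    """Buggy pattern: remove from the same list that is being iterated."""
    items = list(items)  # work on a private copy of the caller's input
    seen = []
    for item in items:
        seen.append(item)
        items.remove(item)  # shifts later elements left; the next one is skipped
    return seen


def remove_while_iterating_copy(items):
    """Workaround: iterate over a snapshot, remove from the original."""
    items = list(items)
    seen = []
    for item in list(items):  # the snapshot is not affected by the removals
        seen.append(item)
        items.remove(item)
    return seen


skipped = remove_while_iterating([0, 1, 2, 3])
complete = remove_while_iterating_copy([0, 1, 2, 3])
```

Here `skipped` only contains the elements at even positions, while `complete` contains every element.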

We had noticed the potential issue before, but since it never caused an actual problem, we did not change it. It never caused a problem because of the outer while loop in our code, which keeps looping through all actions (as long as each pass is guaranteed to find some action). Using the example above again,

things = [0, 1]
print_count = 0
while print_count < 2:
    for thing in things:
        print(thing)
        things.remove(thing)
        print_count += 1

The above loop prints both 0 and 1: even though the for loop terminates early, the next while iteration goes through the list again and prints the second element. Similar behavior occurred in our code. However, the addition of the --max-movement-size condition complicated things, as we are no longer guaranteed to find some action on each pass. This means the outer while loop may iterate through topic_actions only once, and we would not truly iterate through all actions. Using the original code to highlight this,

        topic_actions = defaultdict(list)
        for t_p, replica_change_cnt in movement_counts:
            topic_actions[t_p[0]].append((t_p, replica_change_cnt))

        extracted_actions = []
        curr_movements = 0
        curr_size = 0
        action_available = True
        while curr_movements < max_movements and curr_size <= max_movement_size and action_available:
            action_available = False
            for topic, actions in six.iteritems(topic_actions):
                for action in actions:  # If the first action in every topic's list is > max_movement_size...
                    action_size = cluster_topology.partitions[action[0]].size
                    # ... then this if branch always executes and removes the action mid-iteration,
                    # so the for loop exits early and we never get to see the entire list of actions
                    if curr_movements + action[1] > max_movements or curr_size + action_size > max_movement_size:
                        actions.remove(action)
                    else:
                        action_available = True
                        extracted_actions.append(action[0])
                        curr_movements += action[1]
                        curr_size += action_size
                        actions.remove(action)
                        break
            # By this time, we've gone through all topics, but haven't found an action, so the while loop won't repeat again
        return extracted_actions
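
To make the failure concrete, here is a minimal, self-contained sketch of the loop above together with one possible way to avoid the problem: iterating over a copy of each topic's list. It is an illustration under stated assumptions, not necessarily the change made in this PR. `partition_sizes` is a hypothetical dict standing in for `cluster_topology.partitions[...].size`, `copy_before_iterating` is a hypothetical flag added only to compare the two behaviors, and `dict.items()` replaces `six.iteritems` so the snippet runs on Python 3 without extra dependencies.

```python
from collections import defaultdict


def extract_actions(movement_counts, partition_sizes, max_movements,
                    max_movement_size, copy_before_iterating):
    """Simplified stand-in for the loop above (hypothetical signature)."""
    topic_actions = defaultdict(list)
    for t_p, replica_change_cnt in movement_counts:
        topic_actions[t_p[0]].append((t_p, replica_change_cnt))

    extracted_actions = []
    curr_movements = 0
    curr_size = 0
    action_available = True
    while (curr_movements < max_movements
           and curr_size <= max_movement_size
           and action_available):
        action_available = False
        for topic, actions in topic_actions.items():
            # Iterating over a snapshot keeps removals from skipping elements.
            candidates = list(actions) if copy_before_iterating else actions
            for action in candidates:
                action_size = partition_sizes[action[0]]
                if (curr_movements + action[1] > max_movements
                        or curr_size + action_size > max_movement_size):
                    actions.remove(action)
                else:
                    action_available = True
                    extracted_actions.append(action[0])
                    curr_movements += action[1]
                    curr_size += action_size
                    actions.remove(action)
                    break
    return extracted_actions


# One topic with two partitions: the first is too large, the second fits.
movement_counts = [(("t", 0), 1), (("t", 1), 1)]
partition_sizes = {("t", 0): 100, ("t", 1): 10}

buggy = extract_actions(movement_counts, partition_sizes, 10, 50, False)
fixed = extract_actions(movement_counts, partition_sizes, 10, 50, True)
```

With the original iteration, the oversized first action is removed, the second action is skipped, and `buggy` ends up empty. With the snapshot, the second action is still considered and `fixed` contains `("t", 1)`.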

88manpreet commented 5 years ago

+1

yzernik commented 5 years ago

+1