dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Sets cause non-determinism in the WorkerState #6620

Open crusaderky opened 2 years ago

crusaderky commented 2 years ago

The recent worker state machine refactor made it possible to achieve truly deterministic and repeatable behaviour in worker unit tests (at least for as long as integration with the scheduler is not needed).

One big element of randomness remains, however, which was highlighted by #6587: anything that goes into a set and is then extracted from it comes out in arbitrary order.

For example, it is now possible to put many tasks in the fetch queue and get the exact same Instructions in response every time - unless any of those tasks has more than one worker in its TaskState.who_has set. In that case, every time you restart the interpreter, WorkerState.handle_stimulus will give you a different result.
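A minimal standalone demonstration of the restart effect (my own sketch, not code from distributed; the worker addresses are made up): string hashes are salted by PYTHONHASHSEED, which changes on every interpreter start unless pinned, so a set of worker addresses iterates in a different order per "restart", but always the same order for the same seed.

```python
# Sketch: simulate interpreter restarts by launching subprocesses with
# explicit PYTHONHASHSEED values. Set iteration order for str depends on
# the hash seed, so each seed mimics one interpreter restart.
import os
import subprocess
import sys

# Hypothetical worker addresses, standing in for TaskState.who_has contents
workers = [f"tcp://127.0.0.1:{port}" for port in range(40000, 40010)]
code = f"print(list(set({workers!r})))"

def run(seed):
    env = dict(os.environ, PYTHONHASHSEED=str(seed))
    out = subprocess.run(
        [sys.executable, "-c", code], env=env, capture_output=True, text=True
    )
    return out.stdout.strip()

# The same seed always reproduces the same iteration order...
assert run(0) == run(0)
# ...while different seeds (i.e. different restarts) generally do not.
orders = {run(seed) for seed in range(5)}
print(f"{len(orders)} distinct iteration orders across 5 hash seeds")
```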

If you transition a task to memory and that task has 2 or more dependents, they will transition from waiting to ready in random order, which in turn causes WorkerState.log to be in random order. Their place in WorkerState.ready will nonetheless be deterministic, thanks to the fact that generation is appended to task priority, so the Execute instructions returned by handle_stimulus will be deterministic. However, if the dependents have resource constraints, they will be executed in random order, because WorkerState.constrained is a simple deque that ignores priority (a separate problem, out of scope for this ticket).
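The "generation appended to priority" trick above can be sketched as follows (a hypothetical illustration, not distributed's actual WorkerState code): a heap keyed by (priority, generation) pops strictly by priority and breaks ties by insertion order, so the pop sequence is fully deterministic.

```python
# Hypothetical sketch: a priority-respecting queue, in contrast to the plain
# deque currently used for WorkerState.constrained. heapq pops the smallest
# (priority, generation) tuple, so equal priorities fall back to insertion
# (generation) order, which makes the execution order deterministic.
import heapq
import itertools

class PriorityQueue:
    def __init__(self):
        self._heap = []
        self._generation = itertools.count()

    def add(self, priority, key):
        heapq.heappush(self._heap, (priority, next(self._generation), key))

    def pop(self):
        priority, generation, key = heapq.heappop(self._heap)
        return key

q = PriorityQueue()
q.add(1, "y")
q.add(0, "x")
q.add(1, "z")  # same priority as "y", but inserted later
assert [q.pop(), q.pop(), q.pop()] == ["x", "y", "z"]
```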

There are likely other use cases.

Proposed design 1

Proposed design 2

CC @fjetter @gjoseph92 @graingert

fjetter commented 2 years ago

Fun fact: set ordering is deterministic for objects with deterministic hashes. It's not insertion-ordered, so we should not rely on what the ordering actually is. However, as long as we are just looking for determinism, it would be sufficient to change the hash implementation of TaskState (e.g. to a counter). What makes sets non-deterministic is that the hash function seed changes on interpreter restart (that's a security feature; see https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED).

See the below script to test this (just run it twice). Note that I'm not even setting a random seed, but the result is still reproducible, since it depends only on the hash values.

This is a property we can't necessarily rely on not to change across Python versions, but at the same time it's very easy to test.

import pickle
import random

long_list = list(range(1000))
random.shuffle(long_list)

class FakeTaskState:
    def __init__(self, counter):
        self.counter = counter

    def __hash__(self):
        # Deterministic hash, unaffected by PYTHONHASHSEED
        return self.counter

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return False
        return other.counter == self.counter

types = [int, str, FakeTaskState]

for _typ in types:
    aset = set()
    for el in long_list:
        aset.add(_typ(el))

    # Drain the set to observe its iteration order
    reconstructed_order = []
    while aset:
        reconstructed_order.append(aset.pop())

    # Compare against the order recorded by the previous run, if any
    filename = f"pickled_order_{_typ.__name__}.pkl"
    try:
        with open(filename, "rb") as fd:
            pickled_reconstructed_order = pickle.load(fd)
    except OSError:
        print("File not found, yet. Skipping assert")
    else:
        if reconstructed_order == pickled_reconstructed_order:
            print(f"OK: Order for type {_typ.__name__} deterministic.")
        else:
            print(f"ERROR: Order for type {_typ.__name__} NOT deterministic.")

    with open(filename, "wb") as fd:
        pickle.dump(reconstructed_order, fd)
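A complementary single-run check of the same idea (my addition, reusing the FakeTaskState pattern above): with a counter-based __hash__, the set's layout is a function of the hash values alone, so two sets built from the same elements in different shuffled orders pop in the same order. (This leans on a CPython implementation detail, as noted above, but it is stable in practice and cheap to test.)

```python
# Sketch: with counter-based hashes, set iteration/pop order depends only
# on the hash values, not on insertion order or PYTHONHASHSEED.
import random

class FakeTaskState:
    def __init__(self, counter):
        self.counter = counter

    def __hash__(self):
        return self.counter

    def __eq__(self, other):
        return isinstance(other, type(self)) and other.counter == self.counter

elements = [FakeTaskState(i) for i in range(1000)]
order_a = elements[:]
order_b = elements[:]
random.shuffle(order_a)
random.shuffle(order_b)

set_a, set_b = set(order_a), set(order_b)

# Drain both sets and compare their pop orders
pops_a = []
while set_a:
    pops_a.append(set_a.pop().counter)
pops_b = []
while set_b:
    pops_b.append(set_b.pop().counter)

assert pops_a == pops_b
print("Pop order is independent of insertion order")
```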