dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 715 forks source link

Dask requires consistent Python hashing #4141

Open itamarst opened 3 years ago

itamarst commented 3 years ago

As per https://github.com/dask/dask/issues/6640, Dask breaks if Python hashing is inconsistent across workers. This appears to be a bug in Distributed Dask backend as well:

hashing.py:

from dask.distributed import Client

from dask.bag import Bag
dsk = {("x", 0): (range, 5), ("x", 1): (range, 5), ("x", 2): (range, 5)}
b = Bag(dsk, "x", 3)

def iseven(x):
    return x % 2 == 0

def test_bag_groupby_pure_hash():
    # https://github.com/dask/dask/issues/6640
    result = b.groupby(iseven).compute()
    assert result == [(False, [1, 3] * 3), (True, [0, 2, 4] * 3)]

def test_bag_groupby_normal_hash():
    # https://github.com/dask/dask/issues/6640
    client = Client(n_workers=3)
    result = b.groupby(lambda x: "even" if iseven(x) else "odd").compute()
    assert len(result) == 2
    assert ("odd", [1, 3] * 3) in result
    assert ("even", [0, 2, 4] * 3) in result

def main():
    client = Client(n_workers=3)
    test_bag_groupby_normal_hash()
    test_bag_groupby_pure_hash()
    print("HORRAH")

if __name__ == '__main__':
    import hashing
    hashing.main()

When run:

$ python hashing.py
...
  File "../dask/hashing.py", line 36, in <module>
    hashing.main()
  File "/home/itamarst/Devel/dask/hashing.py", line 29, in main
    test_bag_groupby_normal_hash()
  File "/home/itamarst/Devel/dask/hashing.py", line 22, in test_bag_groupby_normal_hash
    assert len(result) == 2

Solving this

The solution for Dask (https://github.com/dask/dask/pull/6660) was to set PYTHONHASHSEED for worker processes. And you can do that similar solution for Distributed in some cases, e.g. Client().

However, I'm pretty sure the distributed-worker CLI just runs the worker inline, it's not a subprocess, so by the time you're running Python code the hash seed has already been set, and you can't change it. It could e.g. set it and the fork()+exec() Python again, I suppose.

mrocklin commented 3 years ago

Hrm, so dask-worker does run the worker in a sub process by default (we create a Nanny, which runs Worker in a spawned process). However, this isn't universal (some people run without the Nanny process).

Doing the same fix as we did in dask.multiprocessing would be a good first step here, and would probably solve 90% of the problem.

Going beyond that though, I'm not sure. We could consider alternative hashing functions. Are there ways to specify the hash see programmatically somehow?

itamarst commented 3 years ago

There's no public API for setting it after startup, which makes sense given what it does. Only thing I can think of is something like

import os, sys

if not os.environ.get("PYTHONHASHSEED") == "6640":
    os.environ["PYTHONHASHSEED"] = "6640"
   os.execv(sys.argv)

in the startup code (which won't work on Windows...).

itamarst commented 3 years ago

For the Distributed-specific issue, is non-nanny mode actually useful, or could it be dropped?

Stepping back to fundamental solution—

with set_hash_seed(6640):
    h = hash(obj)

is maybe possible... but there are no public APIs, so you have to be OK with munging private CPython internals. And it's global state, so it will impact other threads' hashing which will e.g. break objects. Might be better to get a PEP through so that it's possible with future Python.

Another approach is a reimplemented hash algorithm. For example https://deepdiff.readthedocs.io/en/latest/deephash.html. The question here is how good it is at hashing arbitrary objects, would need to do some digging/reading/testing. Basic sense of what it supports: https://github.com/seperman/deepdiff/blob/master/deepdiff/deephash.py#L429

itamarst commented 3 years ago

deephash seems promising:

from deepdiff import DeepHash
from pprint import pprint

class CustomHash:

    def __init__(self, x):
        self.x = x

    def __repr__(self):
        return f"CustomHash({self.x})"

objects = [
    125,
    "lalala",
    (12, 17),
    CustomHash(17),
    CustomHash(17),
    CustomHash(25),
]

for o in objects:
    print(repr(o), "has hash", DeepHash(o)[o])

Results in:

$ python example.py 
125 has hash 08823c686054787db6befb006d6846453fd5a3cbdaa8d1c4d2f0052a227ef204
'lalala' has hash 2307d4a08f50b743ec806101fe5fc4664e0b83c6c948647436f932ab38daadd3
(12, 17) has hash c7129efa5c7d19b0efabda722ae4cd8d022540b31e8160f9c181d163ce4f1bf6
CustomHash(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomHash(17) has hash 477631cfc0315644152933579c23ed0c9f27743d632613e86eb7e271f6fc86e4
CustomHash(25) has hash 8fa8427d493399d6aac3166331d78e9e2a2ba608c2b3c92a131ef0cf8882be14
itamarst commented 3 years ago

Moved discussion of Dask-side fix to https://github.com/dask/dask/issues/6723

towr commented 1 year ago

There really ought to be a warning about this in the documentation. It would be one thing if this raised an exception and just broke your application, but instead it unexpectedly gives completely wrong results.

If it's impossible to come up with a good hash function in two and a half years, maybe the solution is to just give groupby a hash_function argument. This could default to hash() for data where it's safe to use (like integers), and otherwise raise an exception if it's not supplied.

cisaacstern commented 7 months ago

For those following this, noting that I've just revived the discussion over in https://github.com/dask/dask/issues/6723#issuecomment-1848010871.

cisaacstern commented 7 months ago

Doing the same fix as we did in dask.multiprocessing would be a good first step here, and would probably solve 90% of the problem.

Gradually understanding the problem/solution space here, and realized that this basic fix has not yet been implemented, which I agree that would solve a large percentage of use cases. I will work up a PR for this for others to consider.