scipp / scippnexus

h5py-like utility for NeXus files with seamless scipp integration
https://scipp.github.io/scippnexus/

Issue with Dask scheduler? #188

Closed nvaytet closed 5 months ago

nvaytet commented 6 months ago

Sometimes, when computing a pipeline result, the whole thing hangs. When I kill it, I get the following traceback:

---------------------------------------------------------------------------
KeyboardInterrupt                         Traceback (most recent call last)
Cell In[8], line 1
----> 1 results = direct_beam(pipelines=[pipeline_full, pipeline_bands], I0=I0, niter=6)
      2 # Unpack the final result
      3 iofq_full = results[-1]['iofq_full']

File ~/code/sans/jupyter/esssans/direct_beam.py:104, in direct_beam(pipelines, I0, niter)
    101 for it in range(niter):
    102     print("Iteration", it)
--> 104     iofq_full = pipeline_full.compute(BackgroundSubtractedIofQ)
    105     iofq_slices = pipeline_bands.compute(BackgroundSubtractedIofQ)
    107     if per_layer:

File ~/code/sans/jupyter/sciline/pipeline.py:686, in Pipeline.compute(self, tp)
    674 def compute(self, tp: type | Iterable[type] | Item[T]) -> Any:
    675     """
    676     Compute result for the given keys.
    677 
   (...)
    684         Can be a single type or an iterable of types.
    685     """
--> 686     return self.get(tp).compute()

File ~/code/sans/jupyter/sciline/task_graph.py:66, in TaskGraph.compute(self, keys)
     64     return dict(zip(keys, results))
     65 else:
---> 66     return self._scheduler.get(self._graph, [keys])[0]

File ~/code/sans/jupyter/sciline/scheduler.py:78, in DaskScheduler.get(self, graph, keys)
     76 dsk = {tp: (provider, *args) for tp, (provider, args) in graph.items()}
     77 try:
---> 78     return self._dask_get(dsk, keys)
     79 except RuntimeError as e:
     80     if str(e).startswith("Cycle detected"):

File ~/software/mambaforge/lib/python3.10/site-packages/dask/threaded.py:90, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     87     elif isinstance(pool, multiprocessing.pool.Pool):
     88         pool = MultiprocessingPoolExecutor(pool)
---> 90 results = get_async(
     91     pool.submit,
     92     pool._max_workers,
     93     dsk,
     94     keys,
     95     cache=cache,
     96     get_id=_thread_get_id,
     97     pack_exception=pack_exception,
     98     **kwargs,
     99 )
    101 # Cleanup pools associated to dead threads
    102 with pools_lock:

File ~/software/mambaforge/lib/python3.10/site-packages/dask/local.py:501, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    499 while state["waiting"] or state["ready"] or state["running"]:
    500     fire_tasks(chunksize)
--> 501     for key, res_info, failed in queue_get(queue).result():
    502         if failed:
    503             exc, tb = loads(res_info)

File ~/software/mambaforge/lib/python3.10/site-packages/dask/local.py:138, in queue_get(q)
    137 def queue_get(q):
--> 138     return q.get()

File ~/software/mambaforge/lib/python3.10/queue.py:171, in Queue.get(self, block, timeout)
    169 elif timeout is None:
    170     while not self._qsize():
--> 171         self.not_empty.wait()
    172 elif timeout < 0:
    173     raise ValueError("'timeout' must be a non-negative number")

File ~/software/mambaforge/lib/python3.10/threading.py:320, in Condition.wait(self, timeout)
    318 try:    # restore state no matter what (e.g., KeyboardInterrupt)
    319     if timeout is None:
--> 320         waiter.acquire()
    321         gotit = True
    322     else:

KeyboardInterrupt: 

I have a feeling this happens when some steps download a file with pooch (because the file was not in the local cache). I've seen that pooch tries to start downloads in parallel when multiple files are requested. Maybe there is a clash with Dask?

As far as I can tell, it does not happen with sciline.NaiveScheduler.

nvaytet commented 6 months ago

I am no longer sure this has to do with pooch downloads; it seems to happen in other cases as well.

nvaytet commented 6 months ago

This is apparently related to an interaction between the locks in h5py and Dask's threading locks. See, for example, https://github.com/dask/dask/issues/7547 or https://github.com/h5py/h5py/issues/2019 (thanks @SimonHeybrock for sending these links).

The small example below reproduces the hang:

from typing import NewType, TypeVar

import sciline as sl
import scippnexus as snx
import scipp as sc

Run01 = NewType("Run01", str)
Run02 = NewType("Run02", str)
Run03 = NewType("Run03", str)
Run04 = NewType("Run04", str)
Run05 = NewType("Run05", str)

Run = TypeVar(
    "Run",
    Run01,
    Run02,
    Run03,
    Run04,
    Run05,
)

class Filename(sl.Scope[Run, str], str):
    ...

class DataGroup(sl.Scope[Run, sc.DataGroup], sc.DataGroup):
    ...

DataDict = NewType("DataDict", dict)

def load_file(filename: Filename[Run]) -> DataGroup[Run]:
    with snx.File(filename) as f:
        dg = f["entry"][()]
    return DataGroup[Run](dg)

def merge_files(
    run01: DataGroup[Run01],
    run02: DataGroup[Run02],
    run03: DataGroup[Run03],
    run04: DataGroup[Run04],
    run05: DataGroup[Run05],
) -> DataDict:
    return {
        "run01": run01,
        "run02": run02,
        "run03": run03,
        "run04": run04,
        "run05": run05,
    }

params = {
    Filename[Run01]: "60250-2022-02-28_2215.nxs",
    Filename[Run02]: "60308-2022-02-28_2215.nxs",
    Filename[Run03]: "60336-2022-02-28_2215.nxs",
    Filename[Run04]: "60353-2022-02-28_2215.nxs",
    Filename[Run05]: "60392-2022-02-28_2215.nxs",
}

providers = (load_file, merge_files)

pipeline = sl.Pipeline(providers=providers, params=params)

res = pipeline.compute(DataDict)

print(res)

Running this script hangs and, when killed, prints the same traceback as above.

This happens when trying to load multiple files in parallel. The pipeline graph for this example was attached as an image ("Screenshot at 2024-01-04 15-35-51"), which is not reproduced here.

A different example where files are loaded one after the other inside a loop does not hang:

from typing import NewType

import scippnexus as snx

import sciline as sl

FileList = NewType("FileList", list)
DataDict = NewType("DataDict", dict)

def load_files(filelist: FileList) -> DataDict:
    out = {}
    for filename in filelist:
        with snx.File(filename) as f:
            dg = f["entry"][()]
        out[filename] = dg
    return DataDict(out)

params = {
    FileList: [
        "60250-2022-02-28_2215.nxs",
        "60308-2022-02-28_2215.nxs",
        "60336-2022-02-28_2215.nxs",
        "60353-2022-02-28_2215.nxs",
        "60392-2022-02-28_2215.nxs",
    ]
}

providers = (load_files,)

pipeline = sl.Pipeline(providers=providers, params=params)

res = pipeline.compute(DataDict)

print(res)

SimonHeybrock commented 6 months ago

Can you reproduce this without ScippNexus and Sciline?

jokasimr commented 5 months ago

I'm able to reproduce this with scippnexus and sciline (the code @nvaytet posted above). With the naive scheduler there is no issue; with the Dask scheduler there is a deadlock.

I've not yet been able to reproduce it with only dask and h5py. I experimented a bit with the following code, which runs without a deadlock:

files = [
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
]

import h5py

def readh5(data):
    if isinstance(data, h5py.Dataset):
        return dict(data.attrs)
    return {key: readh5(val) for key, val in data.items()}

def read_file(fname):
    with h5py.File(fname,'r') as f:
        return readh5(f['entry'])

from multiprocessing import Pool
with Pool(24) as p:
    t = p.map(read_file, files)

from dask.distributed import Client, progress
client = Client(n_workers=24, threads_per_worker=1)
client.gather(client.map(read_file, files))

The following also runs without deadlock:

import time

import scippnexus as snx

files = [
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
]

def read_file_nx(fname):
    start = time.time()
    with snx.File(fname) as f:
        dg = f["entry"][()]
    end = time.time()
    return fname, start, end

from dask.distributed import Client, progress

client = Client(n_workers=24, threads_per_worker=1)
client.gather(client.map(read_file_nx, files))

and outputs

[('60248-2022-02-28_2215.nxs', 1705325478.320537, 1705325478.4924474),
 ('60250-2022-02-28_2215.nxs', 1705325478.3321545, 1705325478.5453203),
 ('60339-2022-02-28_2215.nxs', 1705325478.2532856, 1705325478.490789),
 ('60392-2022-02-28_2215.nxs', 1705325478.2536583, 1705325478.4460962),
 ('60393-2022-02-28_2215.nxs', 1705325478.2981782, 1705325478.4704068),
 ('60394-2022-02-28_2215.nxs', 1705325478.254613, 1705325478.4515636),
 ('60248-2022-02-28_2215.nxs', 1705325478.320537, 1705325478.4924474),
 ('60250-2022-02-28_2215.nxs', 1705325478.3321545, 1705325478.5453203),
 ('60339-2022-02-28_2215.nxs', 1705325478.2532856, 1705325478.490789),
 ('60392-2022-02-28_2215.nxs', 1705325478.2536583, 1705325478.4460962),
 ('60393-2022-02-28_2215.nxs', 1705325478.2981782, 1705325478.4704068),
 ('60394-2022-02-28_2215.nxs', 1705325478.254613, 1705325478.4515636),
 ('60248-2022-02-28_2215.nxs', 1705325478.320537, 1705325478.4924474),
 ('60250-2022-02-28_2215.nxs', 1705325478.3321545, 1705325478.5453203),
 ('60339-2022-02-28_2215.nxs', 1705325478.2532856, 1705325478.490789),
 ('60392-2022-02-28_2215.nxs', 1705325478.2536583, 1705325478.4460962),
 ('60393-2022-02-28_2215.nxs', 1705325478.2981782, 1705325478.4704068),
 ('60394-2022-02-28_2215.nxs', 1705325478.254613, 1705325478.4515636)]

illustrating that the files were read simultaneously.
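The claim that the reads overlapped can be checked directly from the timestamps: if the latest start is earlier than the earliest end, there is an instant at which every read was in progress. The sketch below does that for the six distinct rows of the output above; the helper name `share_common_instant` is made up for illustration.

```python
def share_common_instant(intervals):
    """Return True if every (start, end) interval contains one common instant."""
    latest_start = max(start for start, _ in intervals)
    earliest_end = min(end for _, end in intervals)
    return latest_start < earliest_end


# (start, end) pairs copied from the six distinct rows of the output above
timings = [
    (1705325478.320537, 1705325478.4924474),
    (1705325478.3321545, 1705325478.5453203),
    (1705325478.2532856, 1705325478.490789),
    (1705325478.2536583, 1705325478.4460962),
    (1705325478.2981782, 1705325478.4704068),
    (1705325478.254613, 1705325478.4515636),
]

all_concurrent = share_common_instant(timings)
print(all_concurrent)
```

For these values the latest start (…478.3321545) precedes the earliest end (…478.4460962), so the check passes.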

jokasimr commented 5 months ago

Okay, here is a reproduction without sciline:

import h5py
import dask
import scippnexus as snx

files = [
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
]

def read_file_nx(fname):
    with snx.File(fname) as f:
        dg = f["entry"][()]
    return fname

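# Bind fname as a default argument: a plain closure created in the comprehension
# would see only the last file name by the time the task runs.
dask.threaded.get({fname: (lambda fname=fname: read_file_nx(fname),) for fname in files}, files)

Interestingly, dask.multiprocessing.get does not deadlock.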

jokasimr commented 5 months ago

This also deadlocks, so the issue seems to be unrelated to dask.

import scippnexus as snx
from multiprocessing.pool import ThreadPool

files = [
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
]

def read_file_nx(fname):
    with snx.File(fname) as f:
        dg = f["entry"][()]
    return fname

# Fewer threads also hang; the smallest pool size I have seen hang is 5
with ThreadPool(24) as p:
    p.map(read_file_nx, files)
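When a script like this hangs, it can help to see where every thread is stuck without waiting for a manual interrupt. One option, sketched below with only the standard library, is `faulthandler.dump_traceback_later`, which writes the stack of all threads to a file if a timeout expires. The names `run_with_watchdog` and `slow_task` are hypothetical, and `slow_task` only sleeps as a stand-in for the hanging `p.map(...)` call.

```python
import faulthandler
import tempfile
import time


def run_with_watchdog(func, timeout_s, log):
    """Run func(); if it is still running after timeout_s, dump all thread stacks to log."""
    faulthandler.dump_traceback_later(timeout_s, exit=False, file=log)
    try:
        return func()
    finally:
        # Disarm the watchdog once the call has returned (or raised)
        faulthandler.cancel_dump_traceback_later()


def slow_task():
    # Stand-in for the call that hangs; here it simply outlasts the timeout
    time.sleep(0.5)
    return "done"


with tempfile.TemporaryFile(mode="w+") as log:
    result = run_with_watchdog(slow_task, timeout_s=0.1, log=log)
    log.seek(0)
    report = log.read()

print(report)
```

In a real hang the call never returns, so the dump would more usefully go to `sys.stderr` (the default) rather than to a temporary file that is read afterwards.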

SimonHeybrock commented 5 months ago

Transferring to ScippNexus...

jokasimr commented 5 months ago

This might be an issue with combining h5py file reading and releasing the GIL. Here is a comment mentioning this as a potential problem: https://github.com/h5py/h5py/issues/2019#issuecomment-998943691

import h5py
import numpy as np
from multiprocessing.pool import ThreadPool

files = [
    "60248-2022-02-28_2215.nxs",
    "60250-2022-02-28_2215.nxs",
    "60339-2022-02-28_2215.nxs",
    "60392-2022-02-28_2215.nxs",
    "60393-2022-02-28_2215.nxs",
    "60394-2022-02-28_2215.nxs",
]

def readh5(data):
    if isinstance(data, h5py.Dataset):
        arr = data[()]
        if hasattr(arr, 'reshape'):
            # Do an operation releasing the gil
            np.exp(arr + arr.reshape(-1, 1)) # <- this line makes it deadlock
            # pass # <- this does not deadlock
        return dict(data.attrs, data=arr)
    return {key: readh5(val) for key, val in data.items()}

def read_file(fname):
    with h5py.File(fname,'r') as f:
        return readh5(f['entry'])

with ThreadPool(24) as p:
    t = p.map(read_file, files)
t

SimonHeybrock commented 5 months ago

Can you clean this up into an example that does not rely on our files (maybe there is some standard hdf5 test data)? Then we could consider reporting it to h5py.

SimonHeybrock commented 5 months ago

Different question: Can you reproduce this using visititems() instead of recursion?

jokasimr commented 5 months ago

Minimal example with simple hdf5 file and no recursion:

# Create test files
import h5py
import numpy as np

files = [
    f"test{i}.h5"
    for i in range(10)
]

for fname in files:
    with h5py.File(fname, 'w') as f:
        N = 100000
        dset = f.create_dataset('data', (N,), dtype='i')
        # Draw N random values; without size=N a single scalar is broadcast to the whole dataset
        dset[...] = np.random.randint(0, N, size=N)

# Read files concurrently, this deadlocks.
from multiprocessing.pool import ThreadPool

def read_file(fname):
    with h5py.File(fname,'r') as f:
        arr = f['data'][()]
        # Commenting out the line below removes the deadlock
        np.exp(arr + arr.reshape(-1, 1))
        return arr

with ThreadPool(24) as p:
    result = p.map(read_file, files)

result

SimonHeybrock commented 5 months ago

np.exp(arr + arr.reshape(-1, 1))

This uses a massive amount of memory. I don't think a deadlock is causing the problem in this case (try with 1 thread). Can you find another, working example?
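To put a number on the memory use: in the minimal example the dataset has N = 100000 elements, so `arr + arr.reshape(-1, 1)` broadcasts a shape `(N,)` array against a shape `(N, 1)` array into an `(N, N)` result. The arithmetic below is a back-of-the-envelope sketch; it assumes dtype `'i'` is a 4-byte integer and that `np.exp` on an integer array returns float64.

```python
# Sizes taken from the minimal example: N elements of dtype 'i'
N = 100_000
INT32_BYTES = 4    # assumed itemsize of dtype 'i'
FLOAT64_BYTES = 8  # np.exp on a 32-bit integer array yields float64

# (N,) broadcast against (N, 1) gives an (N, N) array
elements = N * N

sum_bytes = elements * INT32_BYTES    # temporary for arr + arr.reshape(-1, 1)
exp_bytes = elements * FLOAT64_BYTES  # result of np.exp(...)

print(f"elements per task: {elements:.1e}")
print(f"sum temporary:     {sum_bytes / 1e9:.0f} GB")
print(f"exp result:        {exp_bytes / 1e9:.0f} GB")
```

That is about 40 GB for the sum and 80 GB for the exponential, per task. A single thread would already stall on this, which is consistent with the suggestion to try with 1 thread.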

jokasimr commented 5 months ago

You're right. That was sloppy.

When I remove the reshape, the previous example (h5py + ThreadPool) runs fine without any "deadlock" (the apparent hang was actually caused by doing a massive amount of work). This is still the case if I increase the size of the array.

~This example still deadlocks also with the reshape removed~ Scratch that, I made a mistake.

import ...
# some code

jokasimr commented 5 months ago

After some more experimentation, my conclusion is that I'm not able to reproduce this without scippnexus.

SimonHeybrock commented 5 months ago

It appears the deadlock is related to our use of functools.cached_property, which apparently holds one lock per property that is shared across all instances. See https://discuss.python.org/t/finding-a-path-forward-for-functools-cached-property/23757 and https://github.com/python/cpython/issues/87634, which has been fixed in Python 3.12.

So (for now at least!) I cannot reproduce the deadlock in 3.12 (and the threaded file load completes 3x faster).
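The shared lock can be observed with the standard library alone. In the sketch below, the getter for one instance starts a second thread that evaluates the same property on a different instance, then checks whether that thread is still waiting after a short timeout. `Node` and the helper function are hypothetical names. The sketch shows only that the lock is shared; it does not reproduce the interaction with h5py that leads to the actual deadlock.

```python
import sys
import threading
from functools import cached_property


class Node:
    """Hypothetical stand-in for an object with a cached property."""

    def __init__(self, name, other=None):
        self.name = name
        self.other = other
        self.other_was_blocked = None

    @cached_property
    def value(self):
        if self.other is not None:
            # While this thread is inside the getter for `self`, ask a second
            # thread to evaluate the same property on a *different* instance.
            probe = threading.Thread(target=lambda: self.other.value)
            probe.start()
            probe.join(timeout=0.5)
            # Still alive after the timeout means the probe is waiting on a lock.
            self.other_was_blocked = probe.is_alive()
        return self.name.upper()


def cached_property_lock_is_shared():
    b = Node("b")
    a = Node("a", other=b)
    a.value  # triggers the probe from inside the getter
    return a.other_was_blocked


shared = cached_property_lock_is_shared()
print(sys.version_info[:2], "lock shared across instances:", shared)
```

On Python 3.8 to 3.11 the probe thread blocks until the first getter returns; on 3.12 and later it finishes immediately, which matches the behaviour change described in the linked CPython issue.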