haesleinhuepf / napari-time-slicer

A meta plugin for processing timelapse data timepoint by timepoint in napari
BSD 3-Clause "New" or "Revised" License

Have 4D dask arrays as result of time-sliced functions #6

Open haesleinhuepf opened 2 years ago

haesleinhuepf commented 2 years ago

This turns the result of time-slicer-annotated functions into 4D delayed dask arrays, as proposed by @Cryaaa in #5.

This PR doesn't fully work yet in the interactive napari user interface. After setting up a workflow and then going through time, it sometimes crashes with a KeyError while saving the duration of an operation. This happens when a computation finishes after its result has already been replaced: basically, multiple threads writing to the same result. It's this error: https://github.com/dask/dask/issues/896

Reproduce:

I'm not sure yet how to solve this.

Cryaaa commented 2 years ago

@haesleinhuepf, that is very weird. I encounter the same error even when using only one workflow step. This is extremely alarming since I wasn't encountering the error when doing my own tests. Definitely looking into this, though.

haesleinhuepf commented 2 years ago

This is extremely alarming since I wasn't encountering the error when doing my own tests.

I'd say that's good, because it means that I introduced that issue. That also means we can remove it again :-) A major change I made, which might be related, is that I put the "getting time point from array" step into the delayed function here.
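Roughly, the change looks like this (a minimal sketch with made-up names, not the actual plugin code):

    import numpy as np
    from dask import delayed

    def process_timepoint(data_4d, t, operation):
        # the "getting time point from array" step now happens inside
        # the delayed function, so it is evaluated lazily as well
        frame_3d = data_4d[t]
        return operation(frame_3d)

    data_4d = np.random.random((10, 5, 64, 64))      # dummy (t, z, y, x) timelapse
    lazy_frames = [delayed(process_timepoint)(data_4d, t, np.sqrt)
                   for t in range(data_4d.shape[0])]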

Cryaaa commented 2 years ago

@haesleinhuepf, actually it might have something to do with the assistant. I've done some further testing: the time-slicer dask function you created works with simple test functions I generated, and with the nsbatwm functions, when I build simple GUIs without the wrappers introduced in the assistant. My feeling is that somewhere in the many layers of wrappers things are getting messed up. I'll report back once I have something concrete though. As for the testing notebooks, shall I add them to this branch so you can see what I'm testing?

haesleinhuepf commented 2 years ago

Hi @GenevieveBuckley,

I'm tagging you here because you're the most dask-knowledgeable person I know. Maybe you have an idea how to fix an issue in this draft.

Assume napari is open with 4 layers, which are connected via dask. Layer 2 has a 4D dask array as data, which computes 3D images on the fly (delayed) from layer 1. Layer 3 works the same way and produces image data from layer 2, and so on.
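Stripped of the plugin machinery, the setup is roughly this (a minimal sketch with made-up names; the scipy filters are just stand-ins for the real operations):

    import numpy as np
    import dask.array as da
    from dask import delayed
    from scipy import ndimage as ndi

    raw = np.random.random((10, 5, 64, 64))   # layer 1: raw (t, z, y, x) timelapse

    def lazy_stack(source, func):
        # build a 4D dask array whose 3D timepoints are computed on the fly
        frames = [da.from_delayed(delayed(func)(source[t]),
                                  shape=source.shape[1:], dtype=float)
                  for t in range(source.shape[0])]
        return da.stack(frames)

    layer2 = lazy_stack(raw, lambda frame: ndi.gaussian_filter(frame, sigma=1))   # from layer 1
    layer3 = lazy_stack(layer2, lambda frame: ndi.median_filter(frame, size=3))   # from layer 2

    # viewer = napari.Viewer()
    # for data in (raw, layer2, layer3):
    #     viewer.add_image(data)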

In napari this works until the user moves the time slider too fast. The layers then access each other in a hard-to-reproduce way (race conditions), and it crashes at this line in dask:

https://github.com/dask/dask/blob/50ab8af982a31b186a8e47597f0ad7e5b59bcab5/dask/cache.py#L56

because this line was called just before and emptied the dictionary:

https://github.com/dask/dask/blob/50ab8af982a31b186a8e47597f0ad7e5b59bcab5/dask/cache.py#L65

To me it sounds like dask is not thread-safe, which is hard to believe. I conclude that we are using dask in the wrong way.

Did you ever see a dask-napari example where multiple dask-backed-delayed layers were computed from each other?

If we could make this work, it would be a game-changer!

Any hint is welcome!

Thanks!

Best, Robert

GenevieveBuckley commented 2 years ago

If you are running into race conditions, perhaps you might want to look at asynchronous operation of Dask? I'm not very familiar with this, but have a look at the docs here and see if you think it might help: https://distributed.dask.org/en/latest/asynchronous.html
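For what it's worth, the basic pattern from those docs looks roughly like this (untested in the napari context):

    import asyncio
    from dask.distributed import Client

    async def main():
        # with asynchronous=True, client operations return awaitables
        client = await Client(asynchronous=True)
        future = client.submit(lambda x: x + 1, 10)
        result = await future
        await client.close()
        return result

    print(asyncio.run(main()))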

My other best advice is to post a copy of this question on the Dask discourse forum, to try and get more eyes on it: https://dask.discourse.group/ @jacobtomlinson is quite knowledgeable about dask/distributed, so maybe he has some insight? I see @gjoseph92 has also been doing a fair bit of work recently over there too.

Did you ever see a dask-napari example where multiple dask-backed-delayed layers were computed from each other?

No, I think I've only seen examples like Talley's lattice lightsheet deskew/deconv, which have one dask array computed on the fly from the raw data. I'm not convinced it is a good idea to chain multiple dask arrays together like that - would that maybe cause Dask to recompute the earlier tasks/layers multiple times? My gut feeling is that it might end up being pretty slow.

GenevieveBuckley commented 2 years ago

To me it sounds like dask is not thread-safe, which is hard to believe. I conclude, that we are using dask in a wrong way.

I did find this issue at dask/distributed, so at the very least, parts of dask are known to NOT be thread safe: https://github.com/dask/distributed/issues/3827 What to do about that information, I'm not so sure (let's see if your question on https://dask.discourse.group/ generates any suggestions first)

gjoseph92 commented 2 years ago

it sounds like dask is not thread-safe, which is hard to believe

The dask API is absolutely not thread-safe. Dask can execute tasks in multiple threads safely, but virtually nothing in dask is guaranteed to be safe to interact with from multiple threads. Many things happen to be thread-safe, but it's not part of the API contract and shouldn't be relied on.

I haven't looked at your code at all, but if you're sharing the same Cache instance between multiple threads, which might be calling compute concurrently, that's definitely going to cause issues. (Also, be aware that the dask cache doesn't avoid recomputation like you'd expect: https://github.com/dask/dask/issues/6228.)
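To make that concrete: the risky pattern is one process-wide opportunistic cache plus several threads calling compute() at the same time. One blunt workaround is to serialize the compute() calls, e.g. with a lock (just an illustration, not a recommendation for the plugin):

    import threading
    import dask.array as da
    from dask.cache import Cache

    cache = Cache(1e9)
    cache.register()                    # one opportunistic cache shared by the whole process

    arr = da.random.random((1000, 1000), chunks=(100, 100))

    compute_lock = threading.Lock()     # never call compute() from two threads at once

    def safe_mean(offset):
        with compute_lock:
            return (arr + offset).mean().compute()

    threads = [threading.Thread(target=safe_mean, args=(i,)) for i in range(4)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()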

haesleinhuepf commented 2 years ago

Hi @gjoseph92,

thanks for the feedback!

I haven't looked at your code at all, but if you're sharing the same Cache instance between multiple threads, which might be calling compute concurrently, that's definitely going to cause issues. (Also, be aware that the dask cache doesn't avoid recomputation like you'd expect: https://github.com/dask/dask/issues/6228.)

How can I use different caches in different threads? My code is at the core something like:

    # (imports added for completeness)
    from functools import partial
    from dask import delayed
    import dask.array as da

    # note: the delayed-wrapped partial is called, so evaluation stays lazy
    lazy_arrays = [delayed(partial(some_function))() for some_function in list_of_functions]

    dask_arrays = [
        da.from_delayed(
            delayed_reader,
            shape=output_sample.shape,
            dtype=output_sample.dtype)
        for delayed_reader in lazy_arrays
    ]
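
The per-timepoint arrays then get stacked into the 4D array that backs a napari layer, roughly like this (continuing the snippet above, just to illustrate the idea):

    # one lazily computed 3D frame per timepoint -> 4D (t, z, y, x) array
    stack_4d = da.stack(dask_arrays)
    # viewer.add_image(stack_4d)   # shown as a time-sliceable image layer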

Can I add a custom cache somewhere in dask.delayed or dask.array.from_delayed?

Thanks! Robert