fangzhouxie opened this issue 3 months ago
+1
Thank you for the error report. Are you able to produce a minimal reproducer? That would help a lot in identifying the issue.
I can't (yet) come up with an example that reproduces the error, but I was able to solve it on our side.

In our case, we were using the delayed API. The inputs and outputs of our delayed functions are list[pd.DataFrame] objects. The problem was that our functions mutated their inputs and then returned them. That is a bad idea: the Dask best practices explicitly advise against mutating inputs.

Somewhere in our computation graph we had a delayed function that took two list[pd.DataFrame] arguments, and both of these inputs had the same root delayed function. We think this caused Dask to run into deserialization issues with that function. We solved it by making sure that our delayed functions copy their inputs (using the .copy() method) before mutating them. The issue vanished after that.
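For illustration, here is a minimal sketch of the fix (function and variable names are made up, not taken from our actual codebase): a delayed step that used to mutate its list[pd.DataFrame] input now copies each frame before modifying it, even when two branches share the same root delayed task.

```python
import pandas as pd
from dask import delayed


@delayed
def load_frames() -> list[pd.DataFrame]:
    # Root delayed task whose output is shared by two downstream branches.
    return [pd.DataFrame({"x": range(3)}), pd.DataFrame({"x": range(3, 6)})]


@delayed
def add_flag(frames: list[pd.DataFrame]) -> list[pd.DataFrame]:
    # Copy before mutating; mutating the shared input in place is what
    # appeared to trigger the deserialization problem for us.
    frames = [df.copy() for df in frames]
    for df in frames:
        df["flag"] = True
    return frames


@delayed
def combine(a: list[pd.DataFrame], b: list[pd.DataFrame]) -> pd.DataFrame:
    # Downstream task consuming two branches that share the same root.
    return pd.concat(a + b, ignore_index=True)


frames = load_frames()
result = combine(add_flag(frames), add_flag(frames)).compute()
```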
I don't know if it helps your case, but I thought I should write it down here in case someone else runs into this issue; maybe it will help them.
Describe the issue:
Hi team, in our use case task B depends on the output of task A, which is a pandas DataFrame. Occasionally, dask/distributed fails to deserialize the output of task A (i.e. the pandas DataFrame) while gathering dependencies for task B, causing task B to hang forever. Currently we fix the hanging task by killing its worker so that it is retried on another worker. A rough sketch of the dependency shape is included below.
Minimal Complete Verifiable Example: Unfortunately, I was not able to reproduce the issue in a dev environment, even though it has happened multiple times in our production environment.
Anything else we need to know?: It would be great if tasks could be retried on deserialization errors, but that does not seem to be supported right now according to https://github.com/dask/distributed/issues/6705. Our current manual workaround is sketched below.
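For reference, a rough sketch of the manual workaround mentioned above (the addresses are placeholders; retire_workers evicts the worker gracefully, and if the worker is truly wedged an outright kill of the worker process may be needed instead):

```python
from distributed import Client

client = Client("tcp://scheduler-address:8786")  # placeholder address

# Inspect which tasks each worker is currently processing to locate the
# worker holding the hung task.
for worker_addr, tasks in client.processing().items():
    print(worker_addr, tasks)

# Remove the stuck worker from the cluster; the scheduler then reschedules
# its tasks on the remaining workers.
client.retire_workers(workers=["tcp://stuck-worker-address:12345"])
```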
Environment:
- Dask / distributed version: 2024.1.0
- Pandas version: 1.5.3
- Python version: 3.10.8
- Operating System: Ubuntu 20.04.6 LTS
- Install method (conda, pip, source): pip
Stacktrace