dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

Worker task deserialization and ThreadPoolExecutor interactions #2965

Open jrbourbeau opened 5 years ago

jrbourbeau commented 5 years ago

I've had users report issues where a task arrives at a worker and is deserialized on the worker's main thread (here: https://github.com/dask/distributed/blob/2bff61d9bee59e0bf655937922d9d4c37e49820a/distributed/worker.py#L1328), which causes code to be executed that interacts poorly with other tasks running concurrently on the worker's ThreadPoolExecutor. Is there an existing mechanism that could help prevent task deserialization and task execution from happening at the same time?

One potential development would be to have task deserialization happen on the ThreadPoolExecutor right before the task is executed. This wouldn't fix the problem in general, but it would allow the worker's ThreadPoolExecutor to be limited to a single thread so that task deserialization and task execution never happen concurrently. This approach also has the added benefit of not deserializing tasks that are stolen before being executed. That said, I'm sure it also has some downsides and may not be appropriate for the common case.
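A rough sketch of what I have in mind (illustrative only: cloudpickle stands in for the worker's actual deserialization path, and `executor` for the worker's ThreadPoolExecutor):

```python
import cloudpickle
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)  # one thread: deserialization and
                                              # execution can never overlap

def run_serialized(payload: bytes):
    # Deserialize on the executor thread, immediately before execution
    func, args, kwargs = cloudpickle.loads(payload)
    return func(*args, **kwargs)

def submit_task(payload: bytes):
    # The main thread only hands off raw bytes; nothing is deserialized here,
    # so a task stolen before it runs is never deserialized at all
    return executor.submit(run_serialized, payload)

# e.g. submit_task(cloudpickle.dumps((sum, ([1, 2, 3],), {}))).result() == 6
```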

Does this approach seem like a reasonable way forward (I'm not sure if it is)? Is improving non-threadsafe code execution in scope for distributed?

mrocklin commented 5 years ago

One straightforward way to resolve this would be with a SerializableLock, both in the deserialization code and in the execution code. However, that assumes that people have control over that code.
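For reference, a minimal sketch of what I mean, assuming the user controls both a custom deserialization hook and the task code (`guarded_loads` and `guarded_task` are hypothetical user functions, not distributed APIs):

```python
import pickle
from dask.utils import SerializableLock

# SerializableLock instances created with the same token share one underlying
# lock and survive pickling, so the same lock can guard code on both sides.
lock = SerializableLock("thread-unsafe-library")

def guarded_loads(payload: bytes):
    # user-controlled deserialization hook
    with lock:
        return pickle.loads(payload)

def guarded_task(x):
    # user task that touches the same non-threadsafe library (stand-in body)
    with lock:
        return x + 1
```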

It is not entirely unreasonable to put task deserialization in with execution. However, I wouldn't be surprised if this uncovers as many or more bugs than it fixes. I don't have particularly strong thoughts on this, except for a default preference for status quo.

mrocklin commented 5 years ago

Another complication. I thought that we cached deserialization of functions. This is particularly valuable (maybe?) for numba jitted functions, where we don't want to have to keep jitting code.

It looks like we don't currently cache deserialization of functions though (although we do cache serialization). Anyway, this is the sort of thing that would conflict with deserializing in different threads.
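For illustration only, a deserialization cache keyed on the serialized function bytes might look like the sketch below (hypothetical, not what the worker does today); shared state like this is exactly what would need care if deserialization moved into executor threads:

```python
import functools
import cloudpickle

@functools.lru_cache(maxsize=100)
def cached_loads(func_bytes: bytes):
    # bytes are hashable, so the serialized function can key the cache directly;
    # repeated tasks with the same function then skip re-deserialization
    # (and, for numba, re-jitting)
    return cloudpickle.loads(func_bytes)
```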

jrbourbeau commented 5 years ago

Thanks for the SerializableLock suggestion! I'll see if it's applicable to the particular use case I'm thinking of.

IIUC, using a lock to coordinate between _deserialize on the worker's main loop and tasks running in the ThreadPoolExecutor would cause the main loop to block and become unresponsive while longer-running tasks execute (e.g. the worker won't be heartbeating, transferring dependencies, etc.). Do we have a way to coordinate handlers like distributed/worker.py::add_task in a non-blocking way?
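One pattern that might keep a handler non-blocking (a sketch, not something distributed does today) is to acquire the thread lock on a helper thread and await it, so the event loop stays responsive while a long-running task holds the lock:

```python
import asyncio
import threading

lock = threading.Lock()  # shared with tasks running in the ThreadPoolExecutor

async def handler():
    loop = asyncio.get_running_loop()
    # Acquire the lock on a helper thread; the event loop keeps running
    # (heartbeats, dependency transfers, ...) while it waits
    await loop.run_in_executor(None, lock.acquire)
    try:
        ...  # protected work, e.g. deserializing the incoming task
    finally:
        lock.release()
```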

> I don't have particularly strong thoughts on this, except for a default preference for status quo.

I have a similar preference, but it might be worth submitting a preliminary PR which moves task deserialization to the worker ThreadPoolExecutor in order to test against CI. This could give us an idea of how many failures the move introduces (at least as measured by the use cases in the test suite).

mrocklin commented 5 years ago

As a note, I think that this also comes up with Keras. I think that TensorFlow wants to run computations in the same thread where they are deserialized.

Carreau commented 4 years ago

As far as I can tell (and tell me if I'm wrong), data/dependency deserialization (in gather, or get from a worker) appears to happen at the same time as message deserialization, in serialize.py:deserialize(). Would it be OK to introduce there a mode where the returned object does not contain the data, but instead an object that lazily deserializes when asked for it?
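Something like this, roughly (a hypothetical wrapper, not an existing distributed class):

```python
from distributed.protocol import deserialize

class LazyValue:
    """Hold the raw header/frames and only deserialize on first access."""

    def __init__(self, header, frames):
        self._header = header
        self._frames = frames
        self._value = None
        self._loaded = False

    def get(self):
        if not self._loaded:
            # actual deserialization happens here, e.g. from a worker thread
            self._value = deserialize(self._header, self._frames)
            self._loaded = True
        return self._value
```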

One drawback in the worker would be the inability to compute the per-type bandwidth, or maybe we could delay that computation until the data is run through the ThreadPoolExecutor.