dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

PubSub functionality kills stream-based connections due to race conditions #8685

Open hendrikmakait opened 2 weeks ago

hendrikmakait commented 2 weeks ago

The PubSub functionality is prone to race conditions and can bring down stream-based connections when one of its stream handlers raises an error.

Reproducer

from distributed import Event, Pub, Sub
from distributed.utils_test import freeze_batched_send, gen_cluster

@gen_cluster(client=True, nthreads=[("", 1)])
async def test_race(c, s, a):
    sub = Sub("a")  # the client subscribes to topic "a"
    in_event = Event()
    in_event_2 = Event()
    block_event = Event()
    block_event_2 = Event()
    def f(x, in_event, block_event, in_event_2, block_event_2):
        # Runs on the worker
        pub = Pub("a")
        in_event.set()  # signal that the publisher exists
        block_event.wait()  # wait until the client stream is frozen
        pub.put({"status": "OK"})
        in_event_2.set()  # signal that the message was published
        block_event_2.wait()
        del pub
        return x

    fut = c.submit(f, 1, in_event, block_event, in_event_2, block_event_2)
    await in_event.wait()
    # Hold back scheduler-to-client messages while the worker publishes
    # and the client drops its only subscriber
    with freeze_batched_send(s.client_comms[c.id]):
        await block_event.set()
        await in_event_2.wait()
        del sub
        await block_event_2.set()
    await fut
    # Follow-up task to check whether the client connection still works
    await c.submit(lambda x: x, 2)

This reproducer kills the client connection because it triggers a call to a non-existent stream handler (pubsub-remove-subscribers). If the appropriate handler is used instead, it still kills the client connection, this time with

Traceback (most recent call last):
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 970, in _handle_comm
    result = await result
             ^^^^^^^^^^^^
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/scheduler.py", line 5716, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/core.py", line 1053, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/hendrikmakait/projects/dask/distributed/distributed/pubsub.py", line 86, in remove_subscriber
    self.client_subscribers[name].remove(client)
KeyError: 'Client-89725a46-27eb-11ef-9134-be79fecea867'

I suspect that there are more possible race conditions hidden in this code.
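The KeyError at the bottom of that traceback is what set.remove raises when the element is absent, for example when a removal request for a client reaches the scheduler after that client's subscription is already gone. A minimal, stdlib-only sketch of that failure mode follows. ToyBroker and replay_duplicate_removal are hypothetical names; the class only mirrors the set.remove call shown in the traceback and is not the actual code in distributed/pubsub.py.

```python
from collections import defaultdict


class ToyBroker:
    """Hypothetical model of scheduler-side subscriber bookkeeping.

    Not distributed's implementation; it only mirrors the set.remove
    call that appears in the traceback.
    """

    def __init__(self, idempotent: bool = False) -> None:
        self.client_subscribers = defaultdict(set)
        self.idempotent = idempotent

    def add_subscriber(self, name: str, client: str) -> None:
        self.client_subscribers[name].add(client)

    def remove_subscriber(self, name: str, client: str) -> None:
        subscribers = self.client_subscribers[name]
        if self.idempotent:
            # discard() silently ignores clients that are already gone
            subscribers.discard(client)
        else:
            # remove() raises KeyError if the client is not in the set
            subscribers.remove(client)
        if not subscribers:
            # Drop the empty topic; defaultdict recreates it on next access
            del self.client_subscribers[name]


def replay_duplicate_removal(idempotent: bool) -> str:
    """Deliver the same removal request twice, as a race might."""
    broker = ToyBroker(idempotent=idempotent)
    broker.add_subscriber("a", "Client-1")
    broker.remove_subscriber("a", "Client-1")
    try:
        broker.remove_subscriber("a", "Client-1")  # late, duplicate request
    except KeyError as exc:
        # Inside a stream handler, an exception like this closes the comm
        return f"KeyError: {exc}"
    return "ok"


print(replay_duplicate_removal(idempotent=False))  # KeyError: 'Client-1'
print(replay_duplicate_removal(idempotent=True))   # ok
```

In the toy, switching to set.discard makes a late or duplicate removal harmless. That only masks this one symptom; given the suspicion that more races are hidden in this code, it should not be read as a complete fix.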

hendrikmakait commented 2 weeks ago

Usually, we leave existing features in distributed alone, even if they haven't been touched in ages, unless they cause issues. We have now had a Coiled customer report issues that trace back to pub/sub being used in https://github.com/saturncloud/dask-pytorch-ddp. In that light, I think we should deprecate pub/sub to avoid future issues for users who expect it to work. I am not aware of anyone other than https://github.com/saturncloud/dask-pytorch-ddp using pub/sub, so deprecating seems fine.

An alternative approach would be to rewrite it on top of the existing event-logging functionality. The major functional difference is that the scheduler acts as the central broker for event logs, whereas pub/sub was designed to use point-to-point communication between workers. (Between workers and clients, the scheduler still acts as the broker.) Once again, I am not aware of any project other than https://github.com/saturncloud/dask-pytorch-ddp using pub/sub (let alone in a performance-critical way that requires point-to-point communication), so I don't think this difference matters for now.
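To make the architectural difference concrete, here is a toy, single-process sketch of the centralized design: every event passes through one broker, which keeps a bounded per-topic log and forwards events to subscribers. ToyEventBroker is hypothetical and keeps only one handler per topic for brevity. Its method names are borrowed from distributed's public event-logging API (log_event, subscribe_topic, unsubscribe_topic, get_events), but the sketch neither calls that API nor claims to reproduce its behavior.

```python
import time
from collections import defaultdict, deque
from typing import Any, Callable, Deque, Dict, Tuple

LoggedEvent = Tuple[float, Any]  # (timestamp, message)


class ToyEventBroker:
    """Hypothetical central broker; all events pass through it."""

    def __init__(self, maxlen: int = 100) -> None:
        # One bounded history per topic, created on first use
        self.events: Dict[str, Deque[LoggedEvent]] = defaultdict(
            lambda: deque(maxlen=maxlen)
        )
        # One handler per topic (a simplification for this sketch)
        self.handlers: Dict[str, Callable[[LoggedEvent], None]] = {}

    def log_event(self, topic: str, msg: Any) -> None:
        event = (time.time(), msg)
        self.events[topic].append(event)
        handler = self.handlers.get(topic)
        if handler is not None:
            handler(event)  # forward to the current subscriber, if any

    def subscribe_topic(self, topic: str, handler: Callable[[LoggedEvent], None]) -> None:
        self.handlers[topic] = handler

    def unsubscribe_topic(self, topic: str) -> None:
        # pop() with a default makes repeated unsubscribes harmless
        self.handlers.pop(topic, None)

    def get_events(self, topic: str) -> Tuple[LoggedEvent, ...]:
        return tuple(self.events[topic])


broker = ToyEventBroker()
received = []
broker.subscribe_topic("a", lambda event: received.append(event[1]))

# A publisher (e.g. a task on a worker) only needs a handle to the broker
broker.log_event("a", {"status": "OK"})
broker.unsubscribe_topic("a")
broker.unsubscribe_topic("a")  # duplicate unsubscribe: no error
broker.log_event("a", {"status": "late"})  # logged but not forwarded

print(received)  # [{'status': 'OK'}]
print(len(broker.get_events("a")))  # 2
```

Because publishers only talk to the broker, they never need to track who is subscribed, and in this toy a duplicate unsubscribe is a no-op rather than an error. The cost is that every message takes an extra hop through the broker, which is the point-to-point trade-off discussed in the paragraph this example follows.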

fjetter commented 1 week ago

I'm -1 on rewriting the existing pub/sub. I think for the dask-pytorch-ddp use case, the Client.subscribe_topic feature could be a better drop-in replacement.

I'm OK with dropping support for Pub/Sub.

mrocklin commented 1 week ago

Probably Actors are better for this kind of thing?

(Reply sent by email, quoting Florian Jetter's comment of Thu, Jun 13, 2024, 6:06 AM.)

mrocklin commented 1 week ago

And they have slightly better support :)

(Reply sent by email, quoting Matthew Rocklin's message of Thu, Jun 13, 2024, 6:07 AM.)
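The Actor suggestion can be illustrated without a cluster. The sketch below emulates the idea with the standard library: a single stateful object owns the message list, and every method call is funneled through a one-thread executor, so calls from concurrent publishers are applied one at a time and return futures. Mailbox and ToyActor are hypothetical names; with distributed, the analogous step would be something like client.submit(Mailbox, actor=True), which this sketch deliberately does not use.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, List


class Mailbox:
    """Hypothetical stateful object that collects published messages."""

    def __init__(self) -> None:
        self.messages: List[Any] = []

    def put(self, msg: Any) -> int:
        self.messages.append(msg)
        return len(self.messages)

    def drain(self) -> List[Any]:
        # Hand back everything received so far and start a fresh list
        msgs, self.messages = self.messages, []
        return msgs


class ToyActor:
    """Funnel every method call on `obj` through a single worker thread."""

    def __init__(self, obj: Any) -> None:
        self._obj = obj
        self._executor = ThreadPoolExecutor(max_workers=1)

    def call(self, method: str, *args: Any) -> Future:
        # The one-thread pool runs calls one at a time, in submission order
        return self._executor.submit(getattr(self._obj, method), *args)

    def close(self) -> None:
        self._executor.shutdown(wait=True)


actor = ToyActor(Mailbox())


def publisher(i: int) -> None:
    # Each publisher waits for its own put() to be applied
    actor.call("put", {"publisher": i, "status": "OK"}).result()


threads = [threading.Thread(target=publisher, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

drained = actor.call("drain").result()
actor.close()
print(sorted(m["publisher"] for m in drained))  # [0, 1, 2, 3]
```

Because all state lives in one object and every call goes through one queue, there is no separate subscriber registry to keep consistent across processes, which is the kind of bookkeeping the KeyError traceback shows going wrong.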