dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License

`dask.distributed.Lock` always async #6402

Open jamesstidard opened 2 years ago

jamesstidard commented 2 years ago

What happened: With an asynchronous dask.distributed.Client, both the asynchronous and the synchronous context managers of Lock behave asynchronously. Using the synchronous one produces the warning "coroutine 'PooledRPCCall.__getattr__.<locals>.send_recv_from_rpc' was never awaited" at runtime.

What you expected to happen: I would intuitively always expect async functions to be asynchronous and regular functions synchronous.

Minimal Complete Verifiable Example:

import asyncio

from dask.distributed import Client
from dask.distributed import Lock

async def lets_go(client):
    print("requesting a")
    async with Lock(name="a", client=client):
        print("acquired a")
    print("released a")

    print("requesting b")
    with Lock(name="b", client=client):
        print("acquired b")
    print("released b")

async def main():
    async with Client(asynchronous=True) as client:
        await lets_go(client)

if __name__ == "__main__":
    asyncio.run(main())

fjetter commented 2 years ago

Thank you for your report @jamesstidard

Please be aware that distributed.Lock is only a sensible lock if used within a task's execution. The example below should work as expected:

import asyncio

from dask.distributed import Client
from dask.distributed import Lock

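def protected_task(i, lock):
    # Runs in a worker thread, so the synchronous context manager blocks as expected.
    with lock:
        return "foo"

async def main():
    async with Client(asynchronous=True) as client:
        # Create the lock once a client exists so it can find the scheduler.
        lock = Lock()
        # map() takes iterables positionally; the lock is passed to every call as a keyword.
        futures = client.map(protected_task, range(100), lock=lock)
        await client.gather(futures)

if __name__ == "__main__":
    asyncio.run(main())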

If you want to lock in your own async code, I suggest using the stdlib asyncio.Lock.
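For coordination that stays inside a single event loop, a minimal sketch with the standard library's asyncio.Lock might look like the following. The names append_entry and run_demo are made up for illustration, and a lock like this is only visible within one process, so it does not coordinate with Dask workers.

```python
import asyncio

async def append_entry(lock, log, entry):
    # Only one coroutine at a time may run the body of this block.
    async with lock:
        log.append(f"start {entry}")
        await asyncio.sleep(0)  # yield to the event loop while holding the lock
        log.append(f"end {entry}")

async def run_demo():
    lock = asyncio.Lock()
    log = []
    # Three coroutines compete for the same lock.
    await asyncio.gather(*(append_entry(lock, log, i) for i in range(3)))
    return log

if __name__ == "__main__":
    print(asyncio.run(run_demo()))
```

Because each coroutine holds the lock across its await, every "start" entry in the log is followed directly by the matching "end" entry.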

jamesstidard commented 2 years ago

Hi @fjetter,

I've been using distributed.Lock both inside and outside of task execution, to synchronise access to resources from code that executes outside a worker, as well as from within a worker. Is that not safe?

I had used the asyncio lock in the past, but I need synchronisation between the client and the remote workers.

I have seen what you describe, though: synchronous access in a task works as expected. It's just the synchronous context manager outside of a task that still behaves asynchronously.

fjetter commented 2 years ago

> Is that not safe?

It is safe but maybe uncommon. This is not necessarily what it is designed for. However, as long as there is a client around it should still work.

Regarding the asynchronicity of the object: it is coupled to the state the Client is in. If you initialize the Client with asynchronous=True, the lock behaves asynchronously as well.

In your example, the synchronous with Lock(...) should not work because the coroutines it creates are never awaited.
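The coupling described here can be illustrated without a cluster. The sketch below uses ToyLock, a hypothetical stand-in that is not distributed.Lock and does not reproduce its internals; it only assumes that an asynchronous-mode acquire returns a coroutine, which a plain with statement then drops without awaiting.

```python
import asyncio

class ToyLock:
    """Hypothetical stand-in for illustration; this is not distributed.Lock."""

    def __init__(self, asynchronous):
        # In the real library this mode would come from the client.
        self.asynchronous = asynchronous
        self.held = False

    async def _acquire(self):
        self.held = True

    def acquire(self):
        coro = self._acquire()
        if self.asynchronous:
            return coro  # the caller is expected to await this
        return asyncio.run(coro)  # blocking mode: run to completion

    def __enter__(self):
        # In asynchronous mode the returned coroutine is dropped here,
        # so the lock is never actually acquired.
        self.acquire()
        return self

    def __exit__(self, *exc_info):
        self.held = False

    async def __aenter__(self):
        await self._acquire()
        return self

    async def __aexit__(self, *exc_info):
        self.held = False

async def demo():
    lock = ToyLock(asynchronous=True)
    with lock:
        held_in_sync_block = lock.held
    async with lock:
        held_in_async_block = lock.held
    return held_in_sync_block, held_in_async_block

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In this toy version the body of the plain with block runs without the lock being held, and Python reports the dropped coroutine with a "was never awaited" RuntimeWarning.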

Maybe a sensible behavior would be to raise in this case?
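One way such a check could look is sketched below. GuardedLock is a hypothetical stand-in, not distributed.Lock, and the exception type and message are assumptions chosen for illustration rather than anything the library is known to do.

```python
import asyncio

class GuardedLock:
    """Hypothetical stand-in for illustration; this is not distributed.Lock."""

    def __init__(self, asynchronous):
        self.asynchronous = asynchronous
        self.held = False

    def __enter__(self):
        if self.asynchronous:
            # Fail loudly instead of creating a coroutine that nobody awaits.
            raise RuntimeError("asynchronous lock: use 'async with' instead of 'with'")
        self.held = True  # stands in for a blocking acquire
        return self

    def __exit__(self, *exc_info):
        self.held = False

    async def __aenter__(self):
        self.held = True  # stands in for an awaited acquire
        return self

    async def __aexit__(self, *exc_info):
        self.held = False

async def use_async(lock):
    async with lock:
        return lock.held

if __name__ == "__main__":
    lock = GuardedLock(asynchronous=True)
    try:
        with lock:
            pass
    except RuntimeError as err:
        print("refused:", err)
    print("held inside async with:", asyncio.run(use_async(lock)))
```

Raising in __enter__ turns a silent no-op into an immediate error at the call site, at the cost of breaking any code that currently relies on the plain with statement not failing.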

jamesstidard commented 2 years ago

The context: the application is a web server with a Dask backend for heavier computation. For the most part, requests are handled by the web server, which creates database entries and files; Dask then performs the heavy computation on those files. I'm using the Lock to synchronise access to those files.

You might have spotted that I'm the person who opened the other Lock-related issues. I've not been able to set up an environment to test that code locally yet, but will soon.

The code in question can currently be executed by either the web server or a task, so I would have expected a regular with to block on the asynchronous side. My code probably needs some refactoring anyway, as I wouldn't want that to block. I just opened the ticket because this felt like unexpected behaviour (at least from my perspective).

Feel free to close this if it's working as expected.