Tinche / aiofiles

File support for asyncio

async versions of shutil #61

graingert opened this issue 5 years ago

graingert commented 5 years ago

shutil.copyfile and shutil.copyfileobj

thedrow commented 4 years ago

I came here to find out if this is already implemented. What does it require?

pwwang commented 3 years ago

Does this work?

import asyncio
from functools import partial, wraps
import shutil

def wrap(func):
    """Wrap a blocking function so it runs in an executor as a coroutine."""
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        # Bind the arguments up front; run_in_executor only takes positionals.
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)

    return run

copyfile = wrap(shutil.copyfile)
copyfileobj = wrap(shutil.copyfileobj)

async def main():
    await copyfile('a', 'b')

asyncio.run(main())

graingert commented 3 years ago

https://docs.python.org/3.9/library/asyncio-task.html#asyncio.to_thread

Of course there's

await asyncio.to_thread(shutil.copyfile, "a", "b")

MatthewScholefield commented 3 years ago

@graingert Right, but this would spawn a new thread for each call, right? (So if you were copying a lot of small files it would be inefficient.)

@pwwang From what I understand, if you ran this hundreds of times, it would only create some fixed number of threads in the thread pool the executor creates, right? If so, this sounds like the correct solution.

graingert commented 3 years ago

asyncio.to_thread uses the loop's default executor, which is a bounded pool of worker threads, so it does not spawn a new thread per call.
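
For illustration, a minimal sketch (file names are placeholders) showing that repeated to_thread calls share one pool, and how to cap its size explicitly if you want a hard bound:

import asyncio
import shutil
from concurrent.futures import ThreadPoolExecutor

async def main():
    loop = asyncio.get_running_loop()
    # Optional: cap the pool. Without this, the default executor is a
    # ThreadPoolExecutor sized to min(32, os.cpu_count() + 4) threads.
    loop.set_default_executor(ThreadPoolExecutor(max_workers=4))
    # Both calls below reuse worker threads from the same pool.
    await asyncio.to_thread(shutil.copyfile, 'a', 'b')
    await asyncio.to_thread(shutil.copyfile, 'b', 'c')

asyncio.run(main())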

xyloguy commented 3 years ago

If you are using a version of Python earlier than 3.9 (which I was), you can use aiofiles.os.wrap; the implementation is identical to what @pwwang mentioned in their comment. Otherwise I would agree with using asyncio.to_thread as @graingert suggested.

import shutil

from aiofiles.os import wrap

copyfile = wrap(shutil.copyfile)
copyfileobj = wrap(shutil.copyfileobj)

Then they can be used as coroutines:

await copyfile(src, dst)

SyntaxColoring commented 1 year ago

I don't think the implementations above (based on loop.run_in_executor() and asyncio.to_thread()) will promptly handle ^C interruptions.

For example, suppose you accidentally shutil.copyfile or shutil.rmtree the wrong path. You'd expect to be able to interrupt it midway through with ^C. But the shutil function is running in its own worker thread, which your main thread has no way of cancelling. If you spam ^C multiple times, you can probably get the process to exit faster, but the stack trace will show an inelegant interruption of asyncio internals, and I don't think resource cleanup will be orderly.

This is a problem for any function that you run in loop.run_in_executor()/asyncio.to_thread(), but it might be especially surprising here because we usually expect async I/O to be cancellable.
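
If I understand the semantics right, here's a minimal sketch of the problem on Python 3.9+ (time.sleep stands in for a long shutil call; names are illustrative):

import asyncio
import time

def slow_copy():
    # Stand-in for a long-running shutil.copyfile()/rmtree() call.
    time.sleep(10)

async def main():
    task = asyncio.create_task(asyncio.to_thread(slow_copy))
    await asyncio.sleep(0.1)
    task.cancel()
    try:
        await task  # raises CancelledError promptly...
    except asyncio.CancelledError:
        print('task cancelled, but the worker thread is still running')
    # ...yet asyncio.run() still blocks at shutdown until the worker
    # thread finishes, because the thread itself cannot be interrupted.

asyncio.run(main())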

Tinche commented 1 year ago

I think you're correct, but that's an inherent limitation of the approach we're using. Any suggestions?

graingert commented 1 year ago

It seems like you'd need a rewrite of the shutil functions designed to support explicit cancellation, using a cancel token or a similar flag that's checked every time it copies a chunk or iterates to a new file.
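
A rough sketch of that idea, using a threading.Event as the cancel token (all names here are hypothetical, not an existing API):

import asyncio
import threading

def _copyfile_blocking(src, dst, cancel, chunksize=64 * 1024):
    # Chunked copy that polls the cancel token between chunks.
    with open(src, 'rb') as fsrc, open(dst, 'wb') as fdst:
        while chunk := fsrc.read(chunksize):
            if cancel.is_set():
                return  # stop promptly; dst is left partially written
            fdst.write(chunk)

async def copyfile(src, dst):
    cancel = threading.Event()
    try:
        await asyncio.to_thread(_copyfile_blocking, src, dst, cancel)
    except asyncio.CancelledError:
        # Tell the worker thread to bail out at the next chunk boundary.
        cancel.set()
        raise

The worker still finishes its current chunk, but cancellation now takes effect within one chunk rather than after the whole file.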

fgoudreault commented 1 year ago

I believe this already exists here: https://pypi.org/project/aioshutil/

davidfstr commented 1 year ago

https://pypi.org/project/aioshutil/

aioshutil v1.3 (the latest version at the time of writing) implements most functions, including copyfileobj and copyfile, using loop.run_in_executor(), meaning it still just runs the original shutil functions inside a thread pool rather than providing a true async implementation.

davidfstr commented 1 year ago

Here's my own async implementation of shutil.copyfileobj():

_DEFAULT_CHUNK_SIZE = 32768  # bytes; arbitrary

async def aioshutil_copyfileobj(async_fsrc, async_fdst, *, chunksize: int = _DEFAULT_CHUNK_SIZE) -> None:
    # Copy in chunks, awaiting between each one so the surrounding task
    # can be cancelled at any chunk boundary.
    while (chunk := await async_fsrc.read(chunksize)) != b'':
        await async_fdst.write(chunk)
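
For example, used with aiofiles file objects (a sketch; copy() is just an illustrative wrapper):

import aiofiles

async def copy(src: str, dst: str) -> None:
    async with aiofiles.open(src, 'rb') as fsrc, aiofiles.open(dst, 'wb') as fdst:
        await aioshutil_copyfileobj(fsrc, fdst)

Each read and write still runs in a worker thread under the hood, but because the loop awaits between chunks, cancelling the surrounding task takes effect at the next chunk boundary, which addresses the ^C concern above.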