fsspec / s3fs

S3 Filesystem
http://s3fs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Runtime Error: Loop is not running #768

Open vedantroy opened 1 year ago

vedantroy commented 1 year ago

Versions:

fsspec                        2023.6.0
s3fs                          2023.6.0

I'm trying to use the basic .get method inside of a FastAPI route, like this:

@app.post("/run_job")
async def run_job(payload: JobPayload):
    metadata = payload.metadata
    job_id, job_type, creds = metadata.job_id, metadata.job_type, metadata.s3_creds

    job_dir = get_job_dir(job_id)
    job_dir.mkdir()

    import asyncio

    s3 = S3FileSystem(
        key=creds.access_key,
        secret=creds.secret_key,
        client_kwargs={"endpoint_url": creds.endpoint, "region_name": creds.region},
        asynchronous=True,
        loop=asyncio.get_event_loop(),
    )
    session = await s3.set_session()
    await s3._get(some_value.rstrip('/') + "/", local_path, recursive=True)

    await session.close()

This does not work, but using aiobotocore directly does. What am I doing wrong?

martindurant commented 1 year ago

If you are creating the filesystem instance within a coroutine and only calling async methods on it, there is no longer any need to explicitly pass asynchronous=, loop=, or even call set_session(). Everything should just work.

loop=asyncio.get_event_loop(),

Here you create a new event loop, not the same one running the coroutine, hence the error. If you really wanted another event loop (which is allowed), you would need to run it in another thread. You probably wanted asyncio.get_running_loop().

martindurant commented 1 year ago

(edit, maybe get_event_loop should have worked, actually - but you shouldn't need this line at all)
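
For reference, a minimal sketch of the simplified pattern described above, reusing the placeholder names from the original snippet (an illustration only, not a verified drop-in):

from s3fs import S3FileSystem

@app.post("/run_job")
async def run_job(payload: JobPayload):
    creds = payload.metadata.s3_creds

    # Constructed inside the running coroutine: per the comment above,
    # no asynchronous=, loop= or set_session() should be needed
    s3 = S3FileSystem(
        key=creds.access_key,
        secret=creds.secret_key,
        client_kwargs={"endpoint_url": creds.endpoint, "region_name": creds.region},
    )

    # The underscore-prefixed methods are the native coroutines
    await s3._get(some_value.rstrip("/") + "/", local_path, recursive=True)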

abdallahashraf22 commented 5 days ago

Hello @martindurant, I came across this error today when trying to use this code

@app.get("/file")
async def download_file():
    file_path = "some_file_path"
    fs = S3FileSystem(endpoint_url=DO_SPACES_ENDPOINT, key=DO_SPACES_KEY, secret=DO_SPACES_SECRET, asynchronous=True)
    session = await fs.set_session()
    try:
        return StreamingResponse(fs.open(file_path))
    finally:
        session.close()

I tried adding the loop like this:

@app.get("/file")
async def download_file():
    file_path = "some_file_path"
    fs = S3FileSystem(endpoint_url=DO_SPACES_ENDPOINT, key=DO_SPACES_KEY, secret=DO_SPACES_SECRET, asynchronous=True, loop=asyncio.get_running_loop())
    session = await fs.set_session()
    try:
        return StreamingResponse(fs.open(file_path))
    finally:
        session.close()

Still facing the same issue, I came across this question, where you suggested that removing those arguments would still work as long as I call it from inside a coroutine,

and the final code is

@app.get("/file")
async def download_file():
    file_path = "some_file_path"
    fs = S3FileSystem(endpoint_url=DO_SPACES_ENDPOINT, key=DO_SPACES_KEY, secret=DO_SPACES_SECRET)
    return StreamingResponse(fs.open(file_path))

My question here is: how do I know whether it was actually async? I tried printing the type of fs.open(file_path) and got an s3fs.core.S3File, where I would normally expect a coroutine.

I also looked at FastAPI's implementation of StreamingResponse, the important part being (I added the print statements myself):

        if isinstance(content, typing.AsyncIterable):
            print("AsyncIterable")
            self.body_iterator = content
        else:
            print("not AsyncIterable")
            self.body_iterator = iterate_in_threadpool(content)

and it prints "not AsyncIterable", so the content fetching is run in a threadpool, meaning it is not async.

Am I doing something wrong, or is this a problem FastAPI doesn't anticipate, given how the s3fs async functions are exposed through sync wrappers?

How do I actually know whether fs.open(file_path) is running as an async call?
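
A quick way to see this directly is to apply the same isinstance test that StreamingResponse uses (an illustrative check, not part of the original comment):

import typing

f = fs.open(file_path)
print(type(f))                              # s3fs.core.S3File: a synchronous file object
print(isinstance(f, typing.AsyncIterable))  # False, so StreamingResponse falls back to the threadpool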

martindurant commented 5 days ago

The File abstraction is fundamentally synchronous, and you probably don't want to use it at all inside asynchronous code. There is an open_async method you might want to use, but also _cat_file to just get the contents in one go (with optional range specified).
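
A rough sketch of what that could look like in the route above, streaming via open_async instead of a File object (assumes the async streamed-file API of recent fsspec/s3fs releases; variable names reused from the earlier snippets):

@app.get("/file")
async def download_file():
    file_path = "some_file_path"
    fs = S3FileSystem(endpoint_url=DO_SPACES_ENDPOINT, key=DO_SPACES_KEY, secret=DO_SPACES_SECRET)

    async def stream():
        # open_async returns a file object whose read() is awaitable
        f = await fs.open_async(file_path)
        try:
            while True:
                chunk = await f.read(1024 * 1024)  # read 1 MiB at a time; adjust as needed
                if not chunk:
                    break
                yield chunk
        finally:
            await f.close()

    # An async generator is an AsyncIterable, so StreamingResponse iterates it
    # on the event loop rather than in a threadpool
    return StreamingResponse(stream())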

abdallahashraf22 commented 4 days ago

My use case is mostly going to be allowing downloads of files that could go up to 2 GB in size, so I'm looking for a way to asynchronously "do it" / stream it. My current approach is getting chunk after chunk using an AsyncIterator, but it's one 512 KB chunk at a time, so the download looks buggy and slow (not to mention the several calls to S3).

Would there be any way to do it better using s3fs? (Sorry if this question is now somewhat off the main topic; if there's an answer I'll open a new question, answer it for future reference, or add it to the docs.)
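
One possible direction, sketched here only as an illustration (the chunk size is made up and this is untested): use the ranged _cat_file reads mentioned above to pull much larger chunks per request, so a 2 GB download is not dominated by 512 KB round trips.

CHUNK = 16 * 1024 * 1024  # hypothetical 16 MiB ranges; tune for your network

@app.get("/file")
async def download_file():
    file_path = "some_file_path"
    fs = S3FileSystem(endpoint_url=DO_SPACES_ENDPOINT, key=DO_SPACES_KEY, secret=DO_SPACES_SECRET)
    size = (await fs._info(file_path))["size"]

    async def stream():
        # One ranged GET per iteration via the async _cat_file
        for start in range(0, size, CHUNK):
            yield await fs._cat_file(file_path, start=start, end=min(start + CHUNK, size))

    return StreamingResponse(stream())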