fsspec / filesystem_spec

A specification that python filesystems should adhere to.
BSD 3-Clause "New" or "Revised" License

Async http fs not working #709

Open dbernaciak opened 3 years ago

dbernaciak commented 3 years ago

With the latest changes, the asynchronous HTTP filesystem throws "Loop not running" when the caller provides its own loop.

This is due to the loop being set to None, which is then caught on line 42 of asyn.py (2021.7.0).

async def run(loop):
   fs = fsspec.filesystem("http", asynchronous=True, loop=loop)
   fs.cat(["https://httpbin.org/ip"])

loop=...
loop.run_until_complete(run(loop))
loop.close()
martindurant commented 3 years ago

If you are explicitly running asynchronously within a coroutine, you do not need to specify the loop, and (it seems) probably should not. Your function would be

async def run(loop):
   fs = fsspec.filesystem("http", asynchronous=True, loop=loop)
   await fs._cat(["https://httpbin.org/ip"])
dbernaciak commented 3 years ago

Thanks. Even without loop=loop but with explicit asynchronous=True, the error is still there. It seems the event loop is None by default and is then not fetched from the coroutine.

martindurant commented 3 years ago

Both of the following work for me with 2021.07.0. Perhaps you can be more specific about what you have done and the exception you see?

import fsspec
import asyncio

loop = asyncio.get_event_loop()
async def run():
   fs = fsspec.filesystem("http", asynchronous=True)
   await fs._cat(["https://httpbin.org/ip"])

loop.run_until_complete(run()) # OK

async def run(loop):
   fs = fsspec.filesystem("http", asynchronous=True, loop=loop)
   await fs._cat(["https://httpbin.org/ip"])

loop.run_until_complete(run(loop)) # OK
dbernaciak commented 3 years ago

You're right, typo on my side. This fixes the cat case.

In my real use case I'm trying to put the http filesystem into pyarrow.ParquetDataset to fetch a file via http get request.

martindurant commented 3 years ago

I'm pretty sure pyarrow will not support an asynchronous file system

dbernaciak commented 3 years ago

I got it working in fsspec 0.8.7. It was performing very well. Now it seems that their fs wrapper is calling abstract asynchronous filesystem on fsspec side without setting the loop which is causing the Loop not running error I mentioned in the first message.

Note that even in the examples you got running, you need to call the private method; if you call the public method .cat (as pyarrow does), the error appears.

martindurant commented 3 years ago

It is not a private method, it is the asynchronous coroutine: https://filesystem-spec.readthedocs.io/en/latest/async.html#using-from-async. We tried using the same methods for both sync and async calling and found it too much trouble to be worth it.
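To illustrate the pairing of a coroutine method with a blocking method of the same name, here is a minimal sketch using only the standard library. `ToyFS` and its methods are hypothetical, and using `asyncio.run` for the blocking wrapper is an assumption made for the example; it is not how fsspec implements its synchronous methods. It only shows why the awaitable `_cat` works inside a coroutine while a blocking `cat` of this kind cannot be called there.

```python
import asyncio


class ToyFS:
    """Hypothetical filesystem showing the coroutine/blocking method pairing."""

    async def _cat(self, paths):
        # Coroutine version: meant to be awaited from async code.
        await asyncio.sleep(0)
        return {p: b"payload" for p in paths}

    def cat(self, paths):
        # Blocking version (assumption: implemented with asyncio.run).
        # It drives its own event loop, so it cannot be called from a
        # thread whose event loop is already running.
        coro = self._cat(paths)
        try:
            return asyncio.run(coro)
        finally:
            coro.close()  # no-op if finished; avoids a "never awaited" warning


async def main():
    fs = ToyFS()
    awaited = await fs._cat(["a"])  # fine inside a coroutine
    try:
        fs.cat(["a"])  # blocking call inside a running loop
        blocking_error = None
    except RuntimeError as exc:
        blocking_error = str(exc)
    return awaited, blocking_error


awaited, blocking_error = asyncio.run(main())
outside = ToyFS().cat(["a"])  # fine when no loop is running in this thread
```

Outside a coroutine the blocking call succeeds; inside one, only the awaited form does.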

Now it seems that their fs wrapper is calling abstract asynchronous filesystem on fsspec side

I don't follow what you mean. It would be best to show the code you are calling and how you would like it to work. We can, of course, raise with them if there's something they could be doing better.

dbernaciak commented 3 years ago

I don't have a publicly hosted parquet file but the code is something like:

import fsspec
import pyarrow.parquet as pq

async def run(loop):
   fs = fsspec.filesystem("http", asynchronous=True, loop=loop)
   df = pq.ParquetDataset(url, use_legacy_dataset=False, filesystem=fs).read_pandas(columns=[col1, col2]).to_pandas()
   return df

loop=...
df = loop.run_until_complete(run(loop))
loop.close()

I use pyarrow 4.0.1. The code goes into fs.py get_file_info() (line 229) to check whether the file is a parquet file. self.fs.info() is called and errors out due to the absence of a loop.

Please let me know if there is anything else I could provide in terms of details.

martindurant commented 3 years ago

I'm pretty sure pyarrow will not support an asynchronous file system

I'll reiterate this. It will be true for any library that directly calls the synchronous API: you cannot run it with an async filesystem, sorry. As you know, async code can call blocking code, but not vice versa. Either do your IO in pure async (cat -> bytes, or copy to a local/memory fs) and call arrow later, or use a sync filesystem on a different loop and thread.
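The second option, a synchronous interface backed by an event loop in a different thread, can be sketched with the standard library alone. Everything named here (`fetch`, `BackgroundLoop`, `blocking_cat`) is hypothetical and stands in for real HTTP calls and a real filesystem class; it is a sketch of the general pattern, not fsspec's code.

```python
import asyncio
import threading


async def fetch(path):
    """Hypothetical stand-in for an HTTP GET; returns fake bytes."""
    await asyncio.sleep(0.01)
    return f"data:{path}".encode()


class BackgroundLoop:
    """Runs an event loop in a daemon thread so blocking callers can use it."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self.thread.start()

    def run_sync(self, coro, timeout=None):
        # Schedule the coroutine on the background loop, then block the
        # calling thread until its result is available.
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)

    def close(self):
        # Stop the loop from its own thread, then clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self.thread.join()
        self.loop.close()


def blocking_cat(bg, paths):
    """Synchronous API, callable by a library that knows nothing of asyncio."""

    async def _gather():
        # The requests still run concurrently on the background loop.
        return await asyncio.gather(*(fetch(p) for p in paths))

    return bg.run_sync(_gather())


bg = BackgroundLoop()
result = blocking_cat(bg, ["a", "b"])
bg.close()
```

The caller sees an ordinary blocking function, while the coroutines run concurrently on the other thread's loop.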

dbernaciak commented 3 years ago

It might be that it is not actively supported. However, what I wanted to point out was that when using fsspec version 0.8.7 I was able to perform async http calls (one call per column, fully managed by pyarrow ParquetDataset) which provided a significant speed up.

I see in the code that significant changes have been made in asyn.py since then, and it looks like my async calls per column are not working any more.

martindurant commented 3 years ago

We did, for a while, implement nested async-in-async calling, but it caused a lot of problems for systems that used event loops internally, particularly Dask.

cc @jorisvandenbossche, who might have some thoughts about supporting async IO calls within (py)arrow.

dbernaciak commented 3 years ago

I'm not aware of the Dask issues, but my feedback is that it was a great feature. When doing column-by-column (range) requests, the data actually being fetched is small, and the main bottleneck is waiting for each GET request to come back and unblock. Async-in-async removed that wait.
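A small sketch of why latency-bound per-column requests benefit from being issued concurrently. `fetch_range` is a hypothetical stand-in for one HTTP range request, and the latency and payload are made up for illustration; no real parquet file or server is involved.

```python
import asyncio

in_flight = 0       # requests currently waiting on the "server"
peak_in_flight = 0  # highest number of simultaneous requests observed


async def fetch_range(column, latency=0.05):
    """Hypothetical stand-in for one HTTP range request for a column chunk."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(latency)  # time spent waiting for the response
    in_flight -= 1
    return column, b"\x00" * 8  # small payload, so latency dominates


async def fetch_columns(columns):
    # Issue every request up front; the waits overlap instead of adding up.
    pairs = await asyncio.gather(*(fetch_range(c) for c in columns))
    return dict(pairs)


columns = ["col1", "col2", "col3"]
data = asyncio.run(fetch_columns(columns))
```

All three requests are in flight at once, so the total wait is roughly one round trip rather than three. Once the bytes are in memory, they can be handed to a synchronous parser.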

jorisvandenbossche commented 2 years ago

might have some thoughts about supporting async IO calls within (py)arrow.

Our C++ filesystems actually support some async operations as well (which are used in the datasets code where possible). However, the Python interface doesn't expose this at the moment, and the Python bridge (which allows a C++ filesystem to wrap a Python filesystem, e.g. fsspec) doesn't support async either. I suppose both could be done, if someone is interested in contributing that.