intake / intake-parquet

Parquet plugin for Intake
https://intake-parquet.readthedocs.io/en/latest/?badge=latest
BSD 2-Clause "Simplified" License

Support for new fsspec-style caching #18

Open ian-r-rose opened 4 years ago

ian-r-rose commented 4 years ago

As far as I can tell, this driver doesn't support fsspec-style caching at the moment:

import intake_parquet

path = "https://github.com/apache/parquet-testing/raw/master/data/alltypes_plain.parquet"
intake_parquet.ParquetSource(urlpath=path, engine="pyarrow").read()  # works
intake_parquet.ParquetSource(urlpath="simplecache::" + path, engine="pyarrow").read()  # fails

I'm unsure whether this is better fixed at the dask layer or here. Is there a roadmap for rolling out fsspec caching to the various drivers?

martindurant commented 4 years ago

Huff, OK. I'm not entirely sure why, but the latter is doing an extra isdir call somewhere, and the link regex can't find any links in the response (which is reasonable). The following patch to fsspec works, but I wonder what would happen if there were, by chance, a link in the binary data.

--- a/fsspec/implementations/http.py
+++ b/fsspec/implementations/http.py
@@ -12,8 +12,8 @@ from fsspec.asyn import sync_wrapper, sync, AsyncFileSystem, maybe_sync
 from ..caching import AllBytes

 # https://stackoverflow.com/a/15926317/3821154
-ex = re.compile(r"""<a\s+(?:[^>]*?\s+)?href=(["'])(.*?)\1""")
-ex2 = re.compile(r"""(http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")
+ex = re.compile(rb"""<a\s+(?:[^>]*?\s+)?href=(["'])(.*?)\1""")
+ex2 = re.compile(b"""(http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")

 async def get_client():
@@ -98,7 +98,7 @@ class HTTPFileSystem(AsyncFileSystem):
         kw.update(kwargs)
         async with self.session.get(url, **self.kwargs) as r:
             r.raise_for_status()
-            text = await r.text()
+            text = await r.read()
         if self.simple_links:
             links = ex2.findall(text) + ex.findall(text)
         else:
@@ -108,6 +108,7 @@ class HTTPFileSystem(AsyncFileSystem):
         for l in links:
             if isinstance(l, tuple):
                 l = l[1]
+            l = l.decode()
             if l.startswith("/") and len(l) > 1:
                 # absolute URL on this server
                 l = parts.scheme + "://" + parts.netloc + l
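
As a rough illustration of the bytes-based link matching in the patch above, the sketch below runs the same two patterns against made-up inputs (the HTML page and the binary blob are hypothetical, not taken from fsspec). It also shows two things to watch for. A bytes pattern needs the raw prefix, because in a non-raw bytes literal Python reads `\1` as the byte 0x01 rather than as a backreference. And the simple-link pattern will pick up a URL that happens to sit inside binary data.

```python
import re

# Hypothetical directory-listing page, as raw bytes (the type that r.read() yields).
page = b'<html><a href="/data/a.parquet">a</a> <a class="x" href=\'b.parquet\'>b</a></html>'

# Raw bytes pattern: \s and the backreference \1 reach the regex engine intact.
link_re = re.compile(rb"""<a\s+(?:[^>]*?\s+)?href=(["'])(.*?)\1""")

# Non-raw bytes literal: Python converts "\1" to the single byte 0x01 before
# re ever sees it, so this pattern needs a 0x01 byte after the link target.
broken_re = re.compile(b"""<a\\s+(?:[^>]*?\\s+)?href=(["'])(.*?)\1""")

# Same character class as ex2 in the patch.
simple_re = re.compile(rb"""(http[s]?://[-a-zA-Z0-9@:%_+.~#?&/=]+)""")

# findall returns (quote, target) tuples; keep the target and decode it to str.
links = [m[1].decode() for m in link_re.findall(page)]
broken_links = broken_re.findall(page)

# Hypothetical binary payload that happens to contain a URL.
blob = b"PAR1\x00\x15http://example.com/x\x00PAR1"
stray = simple_re.findall(blob)

print(links)         # link targets found in the HTML page
print(broken_links)  # nothing matches without the raw prefix
print(stray)         # a "link" found inside binary data
```

The last result is the case raised above: with simple_links enabled, a URL embedded in a binary file would be reported as if it were a directory entry.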

martindurant commented 4 years ago

(PS: the parquet driver should really be calling isfile rather than isdir; over HTTP, and sometimes on s3/gcs, the same path can be both a file and a directory.)

zaneselvans commented 2 years ago

Lest anybody else be confused by this issue: fsspec-based simplecache caching does seem to work now, and can be enabled by passing the right storage_options to dask.dataframe.read_parquet(). See @martindurant's comment on an issue of ours over here: https://github.com/catalyst-cooperative/pudl/issues/1496#issuecomment-1085033950
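
For anyone landing here, a minimal sketch of what that might look like follows. It assumes fsspec's usual convention for chained URLs, where options for each layer are keyed by protocol name. The helper names and the cache directory are hypothetical, and the exact options recommended in the linked comment are not reproduced here.

```python
def cached_parquet_args(url, cache_dir):
    """Build the chained URL and per-protocol options for simplecache.

    Hypothetical helper: `cache_dir` is whatever local directory you want
    the downloaded copies stored in.
    """
    urlpath = "simplecache::" + url
    # Options for a layer of a chained URL are keyed by that layer's protocol.
    storage_options = {"simplecache": {"cache_storage": cache_dir}}
    return urlpath, storage_options


def read_cached(url, cache_dir):
    """Read a remote parquet file through a local simplecache copy."""
    # Imported lazily: needs dask plus a parquet engine such as pyarrow.
    import dask.dataframe as dd

    urlpath, storage_options = cached_parquet_args(url, cache_dir)
    return dd.read_parquet(urlpath, storage_options=storage_options)


if __name__ == "__main__":
    # URL from the original report; the cache path is an arbitrary example.
    path = "https://github.com/apache/parquet-testing/raw/master/data/alltypes_plain.parquet"
    print(cached_parquet_args(path, "/tmp/parquet-cache"))
```

Whether this works for a particular backend depends on the installed fsspec and dask versions, so treat it as a starting point rather than a confirmed recipe.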