fsspec / filesystem_spec

A specification that python filesystems should adhere to.
BSD 3-Clause "New" or "Revised" License
994 stars 350 forks source link

Pickle writing fails with `simplecache::` using xrootd paths (maybe with other remote paths as well). #1671

Open ikrommyd opened 1 week ago

ikrommyd commented 1 week ago

The following snippet reproduces the failure:

import pickle
import fsspec

with fsspec.open("simplecache::root://cmseos.fnal.gov//store/user/ikrommyd/dummy/dummy.pkl", "wb") as f:
    d = {"1": 1, "2": 2}
    pickle.dump(d, f)

fails with:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
----> 1 with fsspec.open("simplecache::root://cmseos.fnal.gov//store/user/ikrommyd/dummy/dummy.pkl", "wb") as f:
      2     d = {"1": 1, "2": 2}
      3     pickle.dump(d, f)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/core.py:134, in OpenFile.__exit__(self, *args)
    133 def __exit__(self, *args):
--> 134     self.close()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/core.py:154, in OpenFile.close(self)
    152     if "r" not in self.mode and not f.closed:
    153         f.flush()
--> 154     f.close()
    155 self.fobjects.clear()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/implementations/cached.py:911, in LocalTempFile.close(self)
    909 self.closed = True
    910 if self.autocommit:
--> 911     self.commit()

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/implementations/cached.py:918, in LocalTempFile.commit(self)
    917 def commit(self):
--> 918     self.fs.put(self.fn, self.path, **self.kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:118, in sync_wrapper.<locals>.wrapper(*args, **kwargs)
    115 @functools.wraps(func)
    116 def wrapper(*args, **kwargs):
    117     self = obj or args[0]
--> 118     return sync(self.loop, func, *args, **kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:103, in sync(loop, func, timeout, *args, **kwargs)
    101     raise FSTimeoutError from return_result
    102 elif isinstance(return_result, BaseException):
--> 103     raise return_result
    104 else:
    105     return return_result

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:56, in _runner(event, coro, result, timeout)
     54     coro = asyncio.wait_for(coro, timeout=timeout)
     55 try:
---> 56     result[0] = await coro
     57 except Exception as ex:
     58     result[0] = ex

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:589, in AsyncFileSystem._put(self, lpath, rpath, recursive, callback, batch_size, maxdepth, **kwargs)
    586     put_file = callback.branch_coro(self._put_file)
    587     coros.append(put_file(lfile, rfile, **kwargs))
--> 589 return await _run_coros_in_chunks(
    590     coros, batch_size=batch_size, callback=callback
    591 )

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:268, in _run_coros_in_chunks(coros, batch_size, callback, timeout, return_exceptions, nofiles)
    266     done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
    267     while done:
--> 268         result, k = await done.pop()
    269         results[k] = result
    271 return results

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:245, in _run_coros_in_chunks.<locals>._run_coro(coro, i)
    243 async def _run_coro(coro, i):
    244     try:
--> 245         return await asyncio.wait_for(coro, timeout=timeout), i
    246     except Exception as e:
    247         if not return_exceptions:

File ~/miniforge3/envs/egamma_dev/lib/python3.10/asyncio/tasks.py:408, in wait_for(fut, timeout)
    405 loop = events.get_running_loop()
    407 if timeout is None:
--> 408     return await fut
    410 if timeout <= 0:
    411     fut = ensure_future(fut, loop=loop)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/callbacks.py:81, in Callback.branch_coro.<locals>.func(path1, path2, **kwargs)
     78 @wraps(fn)
     79 async def func(path1, path2: str, **kwargs):
     80     with self.branched(path1, path2, **kwargs) as child:
---> 81         return await fn(path1, path2, callback=child, **kwargs)

File ~/miniforge3/envs/egamma_dev/lib/python3.10/site-packages/fsspec/asyn.py:517, in AsyncFileSystem._put_file(self, lpath, rpath, **kwargs)
    516 async def _put_file(self, lpath, rpath, **kwargs):
--> 517     raise NotImplementedError

NotImplementedError:

I haven't tried other types of remote paths, so the problem may be more general than just the xrootd + simplecache combination.

martindurant commented 5 days ago

I think I commented on this elsewhere, but let me copy here for the record.

It seems that xrootd doesn't implement `_put_file`. It might be reasonable to implement this upstream in a simplistic way for backends that don't have their own, but looking at `put_file` (in `AbstractFileSystem`) shows that there ought to be more going on, e.g., for callbacks. A minimal sketch of such a fallback:

--- a/fsspec/asyn.py
+++ b/fsspec/asyn.py
@@ -514,7 +514,8 @@ class AsyncFileSystem(AbstractFileSystem):
         )

     async def _put_file(self, lpath, rpath, **kwargs):
-        raise NotImplementedError
+        data = open(lpath, "rb").read()
+        await self._pipe_file(rpath, data, **kwargs)

     async def _put(
         self,
@@ -591,7 +592,10 @@ class AsyncFileSystem(AbstractFileSystem):
         )

     async def _get_file(self, rpath, lpath, **kwargs):
-        raise NotImplementedError
+        data = await self._cat_file(rpath, **kwargs)
+        with open(lpath, "wb") as f:
+            f.write(data)