fsspec / gcsfs

Pythonic file-system interface for Google Cloud Storage
http://gcsfs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

asyncio exception while writing to zarr store #604

Closed bweasels closed 5 months ago

bweasels commented 5 months ago

Hi all - feel free to let me know if you think this may actually be a zarr issue, but I'm hitting a very frustrating bug while uploading zarr files to a Google Cloud bucket with gcsfs.

Context: We have many terabytes of microscopy imaging data (on the order of 500 FOV x 14 cycles) on a storage server that we need to upload to a Google Cloud bucket for analysis on our VM. Lab members need to upload data from their own experiments, and our workstation is generally reserved for data acquisition, so ideally each member runs the upload from their own machine. The experiments are too large to load entirely into memory or to save to disk prior to upload, so I do the following:

  1. Create a fs.mapper of the destination zarr store on the google cloud bucket
  2. Declare a zarr store using the mapper with the finalized upload size and chunk sizes that correspond to each FOV
  3. Load batches of 10 FOVs from the data server, process and write all 10 as individual chunks to the zarr store via indexing.

Problem: After anywhere from 2 to 1000 successful chunk writes to the bucket, the Jupyter notebook becomes unresponsive, uses a ton of CPU, and can only be recovered by restarting the kernel. Occasionally it throws the following error message (repeated thousands of times); other times it fails silently.

Exception in callback _SelectorSocketTransport._write_send()
handle: <Handle _SelectorSocketTransport._write_send()>
Traceback (most recent call last):
  File "C:\Users\benku\anaconda3\envs\uploader\Lib\asyncio\events.py", line 84, in _run
    self._context.run(self._callback, *self._args)
  File "C:\Users\benku\anaconda3\envs\uploader\Lib\asyncio\selector_events.py", line 1133, in _write_send
    assert self._buffer, 'Data should not be empty'
AssertionError: Data should not be empty

Troubleshooting: I enabled DEBUG logging for gcsfs and captured the requests it makes.

On file open:

2024-01-29 23:18:55,491 - gcsfs.credentials - DEBUG - connect -- Connected with method google_default
2024-01-29 23:18:55,492 - gcsfs - DEBUG - _call -- GET: b/<bucketname>, (), None
2024-01-29 23:18:55,494 - gcsfs.credentials - DEBUG - maybe_refresh -- GCS refresh
2024-01-29 23:18:55,874 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:55,913 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:55,949 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:55,981 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/batch/storage/v1, (), {'Content-Type': 'multipart/mixed; boundary="===============7330845974216740156=="'}
2024-01-29 23:18:55,982 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/batch/storage/v1, (), {'Content-Type': 'multipart/mixed; boundary="===============7330845974216740156=="'}
2024-01-29 23:18:55,983 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/batch/storage/v1, (), {'Content-Type': 'multipart/mixed; boundary="===============7330845974216740156=="'}
2024-01-29 23:18:57,358 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:57,358 - gcsfs.credentials - DEBUG - maybe_refresh -- GCS refresh
2024-01-29 23:18:57,720 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:57,751 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:57,780 - gcsfs - DEBUG - _call -- GET: b/<bucketname>, (), None
2024-01-29 23:18:57,810 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:57,837 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:57,865 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:57,893 - gcsfs - DEBUG - _call -- GET: b/<bucketname>, (), None
2024-01-29 23:18:57,917 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/'), None
2024-01-29 23:18:57,945 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:57,973 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:18:58,007 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:18:58,042 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:18:58,071 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:18:58,099 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:18:58,233 - gcsfs - DEBUG - _call -- GET: https://storage.googleapis.com/download/storage/v1/b/<bucketname>/o/<filename>.zarr%2F.zarray?alt=media, (), {}

For every batch of 10 chunks written it repeats the following:

2024-01-29 23:19:17,459 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr'), None
2024-01-29 23:19:17,513 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucketname>',), None
2024-01-29 23:19:17,542 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:19:17,572 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:19:17,598 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:19:17,625 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucketname>', '<filename>.zarr/.zarray'), None
2024-01-29 23:19:17,654 - gcsfs - DEBUG - _call -- GET: https://storage.googleapis.com/download/storage/v1/b/<bucketname>/o/<filename>.zarr%2F.zarray?alt=media, (), {}
2024-01-29 23:19:17,710 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,712 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,713 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,714 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,714 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,715 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,717 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,717 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,718 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:19:17,719 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucketname>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}

And just prior to failure it writes out the following (which appears identical to all the other successful chunk writes):

2024-01-29 23:20:37,133 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucket name>', '<filename>.zarr'), None
2024-01-29 23:20:37,181 - gcsfs - DEBUG - _call -- GET: b/{}/o, ('<bucket name>',), None
2024-01-29 23:20:37,216 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucket name>', '<filename>.zarr/.zarray'), None
2024-01-29 23:20:37,240 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucket name>', '<filename>.zarr/.zarray'), None
2024-01-29 23:20:37,273 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucket name>', '<filename>.zarr/.zarray'), None
2024-01-29 23:20:37,300 - gcsfs - DEBUG - _call -- GET: b/{}/o/{}, ('<bucket name>', '<filename>.zarr/.zarray'), None
2024-01-29 23:20:37,327 - gcsfs - DEBUG - _call -- GET: https://storage.googleapis.com/download/storage/v1/b/<bucket name>/o/<filename>.zarr%2F.zarray?alt=media, (), {}
2024-01-29 23:20:37,383 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,384 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,385 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,386 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,387 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,388 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,389 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,390 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,390 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}
2024-01-29 23:20:37,391 - gcsfs - DEBUG - _call -- POST: https://storage.googleapis.com/upload/storage/v1/b/<bucket name>/o, (), {'Content-Type': 'multipart/related; boundary="==0=="'}

For reference, <bucket name> and <filename> stand in for the actual bucket and file names in the messages. Please let me know if you need any more information, if you feel this is not a gcsfs-related issue, or if you see anything glaringly wrong in how I'm handling the upload. Thanks in advance!

martindurant commented 5 months ago

There are an awful lot of GET calls in there that are totally unnecessary for a whole-array-write operation. Even if the array already exists, a single read of all the metadata pieces should suffice. I wonder how we can provide a better directory-listing caching experience around this. Separately, there is talk of using a transactional in-memory cache specifically for zarr metadata files (uploaded when finished), which would help a lot too. It is already possible to provide separate metadata and data storage backends in zarr.
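The transactional metadata cache idea could look something like the following `MutableMapping` wrapper. To be clear, this is a hypothetical sketch of the concept, not an existing gcsfs or zarr API: metadata keys are held in memory and only pushed to the backing store on `commit()`, so the repeated `.zarray` GETs are served locally.

```python
from collections.abc import MutableMapping


class MetadataCacheStore(MutableMapping):
    """Hypothetical store wrapper: hold small zarr metadata keys in memory
    and flush them to the backing store only on commit(); data keys pass
    straight through to the backing store."""

    META_SUFFIXES = (".zarray", ".zgroup", ".zattrs", ".zmetadata")

    def __init__(self, base):
        self.base = base   # the real store, e.g. a gcsfs mapper
        self.meta = {}     # in-memory staging area for metadata keys

    def _is_meta(self, key):
        return key.endswith(self.META_SUFFIXES)

    def __getitem__(self, key):
        if self._is_meta(key) and key in self.meta:
            return self.meta[key]  # served from memory: no GET request
        return self.base[key]

    def __setitem__(self, key, value):
        if self._is_meta(key):
            self.meta[key] = value  # staged, not yet uploaded
        else:
            self.base[key] = value

    def __delitem__(self, key):
        self.meta.pop(key, None)
        if key in self.base:
            del self.base[key]

    def __iter__(self):
        yield from set(self.base) | set(self.meta)

    def __len__(self):
        return len(set(self.base) | set(self.meta))

    def commit(self):
        self.base.update(self.meta)  # one burst of uploads at the end
        self.meta.clear()
```

A wrapper like this would cut the per-batch metadata round-trips at the cost of the metadata on the bucket being stale until `commit()` is called.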

I mention this because, while I don't know what the specific problem is, I can only assume that the total number of requests/coroutines is implicated in something deep within asyncio.

One lever you could pull is lowering the fsspec config setting conf["nofiles_gather_batch_size"] (default given by fsspec.asyn._NOFILES_DEFAULT_BATCH_SIZE = 1280) to a smaller value.
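If I read the suggestion right, that setting can be changed at runtime through fsspec's config dict; the value 128 below is just an illustrative choice, not a recommendation:

```python
import fsspec

# Lower the batch size used when gathering coroutines that don't hold open
# files (default is fsspec.asyn._NOFILES_DEFAULT_BATCH_SIZE = 1280), so
# fewer upload coroutines are in flight at once.
fsspec.config.conf["nofiles_gather_batch_size"] = 128
```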

If requests really are being made with zero data, we should be able to find out where that happens and carry on. Perhaps there is a race condition where all the data of a chunk is sent successfully, but the sending function subsequently errors. This would be in gcsfs.core.simple_upload.

martindurant commented 5 months ago

every batch of 10 chunks

Is this the number of zarr chunks in a dask partition, or where else does this number come from?

bweasels commented 5 months ago

Thanks for the fast reply! That number is how many images we hold in memory before writing them to the bucket, so it's really user-defined.

WRT the large number of GET calls for every batch upload: while trying to debug the race condition, I reconnected to the zarr store on the bucket every time I wrote a set of 10 images, to see whether the error was related to the connection going stale (idk - I'm a scientist, not a networking guy). Removing that reconnection call removes the stack of GET calls.

Thanks for the pointer on gcsfs.core.simple_upload - I'll see if I can explore it to do some debugging for my weird case. If it'll help, I'll try to make a minimal reproducible example this weekend.

bweasels commented 5 months ago

I was able to manually trace it back to _request on line 412 of gcsfs.core. It seems like the async with self.session.request( ... ) as r: call on line 416 may be where it fails before entering the race condition. The data object going into that call prior to failure is not empty (<gcsfs.core.UnclosableBytesIO object at 0x0000021B6ACB7BF0>, with a non-zero size from getvalue()), so I'm guessing it's something in self.session.request? That said, self.session.request seems to come from another package (aiohttp?), so I ran out of steam and stopped pursuing it. Given that this is the first you're seeing of it, it may be specific to my situation, but maybe this thread will help someone else who hits the same issue. I chatted with the lab and we'll pursue a different, slower upload scheme to work around this. Thanks again for your help!

martindurant commented 5 months ago

I hope you are right, but it is good to have this information recorded for others anyway.