fsspec / gcsfs

Pythonic file-system interface for Google Cloud Storage
http://gcsfs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Invalid multipart request on retry #621

Closed Metamess closed 1 month ago

Metamess commented 1 month ago

NOTE: This issue is eerily similar to #290 , which was fixed at the time by @nbren12 in #380 . I have the creeping suspicion that the same issue might have arisen again, though I have been unable to suss it out just yet.

Problem description

Every now and then, a request stemming from simple_upload() will fail on its first attempt, for some retriable reason, but then the retry will receive a non-retriable HTTP 400 error: "Invalid multipart request with 0 mime parts." As the error message suggests, it appears that the Content-Type header (as required by the docs) is missing (or empty?) on the retry, though it was evidently present on the initial attempt. EDIT: It seems I have misinterpreted the error message, and the issue is instead that the data stream is empty on the retry. I will keep the remainder of the original post as-is for historical accuracy.

Investigation

In my use case, the call originates from the xarray.Dataset.to_zarr() function (which, interestingly, seems to be the use case for more people reporting similar issues; I suspect because the hard failure on the retry causes the upload to fail halfway through, leading to a corrupted Zarr store, but I digress). I have included the Traceback below, starting from the call to to_zarr for the sake of completeness. The more relevant bit starts when we enter fsspec code with a call to fsspec.mapping.setitems(), which takes a dict of {path_as_str: data_as_bytes} as its only argument. This means it's a fairly basic call without potentially weird kwargs bleeding through and impacting the behaviour of fsspec/gcsfs, so we can pretty much disregard anything related to xarray/zarr as a root cause.

The most relevant code starts when we enter gcsfs with a call to _pipe_file() with only a path and data parameter, meaning the rest of the arguments are the defaults (most notably: content_type="application/octet-stream"). This leads to a call to simple_upload(), where the core of the request is created. The content_type parameter is used to generate a part of the body of the POST request, while the actual header of the call is set to headers={"Content-Type": 'multipart/related; boundary="==0=="'}.
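To make the header/body split concrete, here is a minimal sketch of how a two-part multipart/related body with the boundary ==0== could be assembled. The helper name build_multipart_body is hypothetical, the line endings are simplified, and this is an illustration under those assumptions rather than the actual gcsfs code.

```python
import json

BOUNDARY = "==0=="  # boundary named in the request's Content-Type header


def build_multipart_body(name: str, payload: bytes,
                         content_type: str = "application/octet-stream") -> bytes:
    """Hypothetical helper: JSON metadata part, then the data part."""
    metadata = json.dumps({"name": name})
    head = (
        f"--{BOUNDARY}\n"
        "Content-Type: application/json; charset=UTF-8\n\n"
        f"{metadata}\n"
        f"--{BOUNDARY}\n"
        f"Content-Type: {content_type}\n\n"  # content_type lives in the body
    )
    return head.encode() + payload + f"\n--{BOUNDARY}--".encode()


body = build_multipart_body("chunk/0.0", b"\x00\x01")
# The request header only declares the multipart type and the boundary.
headers = {"Content-Type": f'multipart/related; boundary="{BOUNDARY}"'}
```

If the body arrives empty, the server finds no parts between boundaries at all, regardless of what the header says.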

This leads to a call of _request() (via _call), where we see the first attempt failed (for some unknown but retriable reason), and the retry_request decorator from gcsfs/retry.py supposedly performs the exact same _request call again, however this time it fails with the non-retriable gcsfs.retry.HttpError: Invalid multipart request with 0 mime parts., 400, suggesting that the retried _request did not have the same headers. However, I have failed to spot where this change might occur.

Traceback:

Traceback (most recent call last):
(...)
  File "/usr/local/lib/python3.10/site-packages/xarray/backends/api.py", line 1832, in to_zarr
    dump_to_store(dataset, zstore, writer, encoding=encoding)
  File "/usr/local/lib/python3.10/site-packages/xarray/backends/api.py", line 1362, in dump_to_store
    store.store(variables, attrs, check_encoding, writer, unlimited_dims=unlimited_dims)
  File "/usr/local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 657, in store
    self.set_variables(
  File "/usr/local/lib/python3.10/site-packages/xarray/backends/zarr.py", line 779, in set_variables
    writer.add(v.data, zarr_array, region)
  File "/usr/local/lib/python3.10/site-packages/xarray/backends/common.py", line 241, in add
    target[region] = source
  File "/usr/local/lib/python3.10/site-packages/zarr/core.py", line 1495, in __setitem__
    self.set_orthogonal_selection(pure_selection, value, fields=fields)
  File "/usr/local/lib/python3.10/site-packages/zarr/core.py", line 1684, in set_orthogonal_selection
    self._set_selection(indexer, value, fields=fields)
  File "/usr/local/lib/python3.10/site-packages/zarr/core.py", line 2058, in _set_selection
    self._chunk_setitems(lchunk_coords, lchunk_selection, chunk_values, fields=fields)
  File "/usr/local/lib/python3.10/site-packages/zarr/core.py", line 2261, in _chunk_setitems
    self.chunk_store.setitems(to_store)
  File "/usr/local/lib/python3.10/site-packages/zarr/storage.py", line 1441, in setitems
    self.map.setitems(values)
  File "/usr/local/lib/python3.10/site-packages/fsspec/mapping.py", line 124, in setitems
    self.fs.pipe(values)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 118, in wrapper
    return sync(self.loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 103, in sync
    raise return_result
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 56, in _runner
    result[0] = await coro
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 399, in _pipe
    return await _run_coros_in_chunks(
  File "/usr/local/lib/python3.10/site-packages/fsspec/asyn.py", line 254, in _run_coros_in_chunks
    await asyncio.gather(*chunk, return_exceptions=return_exceptions),
  File "/usr/local/lib/python3.10/asyncio/tasks.py", line 408, in wait_for
    return await fut
  File "/usr/local/lib/python3.10/site-packages/gcsfs/core.py", line 1268, in _pipe_file
    location = await simple_upload(
  File "/usr/local/lib/python3.10/site-packages/gcsfs/core.py", line 1954, in simple_upload
    j = await fs._call(
  File "/usr/local/lib/python3.10/site-packages/gcsfs/core.py", line 437, in _call
    status, headers, info, contents = await self._request(
  File "/usr/local/lib/python3.10/site-packages/decorator.py", line 221, in fun
    return await caller(func, *(extras + args), **kw)
  File "/usr/local/lib/python3.10/site-packages/gcsfs/retry.py", line 158, in retry_request
    raise e
  File "/usr/local/lib/python3.10/site-packages/gcsfs/retry.py", line 123, in retry_request
    return await func(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/gcsfs/core.py", line 430, in _request
    validate_response(status, contents, path, args)
  File "/usr/local/lib/python3.10/site-packages/gcsfs/retry.py", line 112, in validate_response
    raise HttpError({"code": status, "message": msg})  # text-like
gcsfs.retry.HttpError: Invalid multipart request with 0 mime parts., 400

Environment

martindurant commented 1 month ago

Actually, the exception sounds to me as if the header went in fine, but the content was missing. Maybe this means that the data (which is an in-memory file-like object for simple_upload) needs a seek(0) before retrying. I can see this happening if in the first call, all the data was sent, but the request failed to complete after this (as opposed to an error setting up the connection, which is probably more common).
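A small standalone illustration of that behaviour, using only io.BytesIO from the standard library (no gcsfs involved): once a file-like object has been read to the end, a second read returns nothing until the position is reset.

```python
import io

data = io.BytesIO(b"payload")

first_attempt = data.read()   # the whole body is consumed here
second_attempt = data.read()  # position is at EOF, so nothing is left to send

data.seek(0)                  # rewind to the start of the buffer
after_rewind = data.read()    # the full body is available again
```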

martindurant commented 1 month ago

--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -421,6 +421,8 @@ class GCSFileSystem(asyn.AsyncFileSystem):
         self, method, path, *args, headers=None, json=None, data=None, **kwargs
     ):
         await self._set_session()
+        if hasattr(data, "seek"):
+            data.seek(0)
         async with self.session.request(
             method=method,

Metamess commented 1 month ago

Welp, now I feel stupid for misinterpreting the error message 🤦 Thanks for the reply @martindurant , and the suggested code change!

I'm a bit at the edge of my knowledge and understanding here, so I apologize if this doesn't make sense, but I am left wondering: What would cause a request to fail if the data was all sent, and thus the request has effectively finished? Perhaps there is some other root cause that should be fixed to prevent this situation from occurring in the first place? (Sadly I don't have any debug logs from the gcsfs library itself when I encountered these errors, and I have been unable to reproduce them on demand ☹️) Furthermore, could we be introducing a problem if we resend the data, if it was already fully received (and stored)?

Lastly, I also noticed that simple_upload wraps the data into an UnclosableBytesIO instance:

class UnclosableBytesIO(io.BytesIO):
    """Prevent closing BytesIO to avoid errors during retries."""

    def close(self):
        """Reset stream position for next retry."""
        self.seek(0)

This seems to suggest that seek(0) should already be called on the data when a retry occurs. So either close() is not called in a case where it is assumed to be, or seek(0) is not the solution for this issue (or, of course, I am missing something else here).
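A quick standalone check of the first possibility, using the class as quoted above: if nothing calls close() after the body has been read, the position stays at the end of the stream.

```python
import io


class UnclosableBytesIO(io.BytesIO):
    """Prevent closing BytesIO to avoid errors during retries."""

    def close(self):
        """Reset stream position for next retry."""
        self.seek(0)


stream = UnclosableBytesIO(b"payload")
stream.read()                       # simulate the body being sent in full

pos_without_close = stream.tell()   # still at the end: nobody called close()
stream.close()                      # only now is the position reset
pos_after_close = stream.tell()
still_open = not stream.closed      # close() never really closes the buffer
```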

martindurant commented 1 month ago

I cannot say why the situation occurs, but it doesn't surprise me that something can happen even after the data is sent, but before a success response comes back. Without the response, we can assume that the data isn't stored.

The unclosable thing was created for the case where the initial connection fails. The asyncio request function closes the input file-like anyway in this case, but we still want to read from it in the retry. seek(0) seems like a reasonable thing to do in any case. Having to pass a file-like in the first place is strange (to me): apparently it makes for a more responsive event loop as asyncio can send the data in chunks.
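As a rough sketch of why an unconditional seek(0) helps, the following simulates a request whose first attempt fails after the body has already been read. fake_request and upload_with_retry are hypothetical stand-ins for illustration only, not gcsfs or aiohttp functions.

```python
import asyncio
import io


async def fake_request(data, fail: bool) -> bytes:
    """Hypothetical stand-in for an HTTP request that reads the whole body."""
    if hasattr(data, "seek"):
        data.seek(0)  # always start from the beginning of the stream
    body = data.read()
    if fail:
        # Simulate losing the response after the body was fully sent.
        raise ConnectionError("response lost after the body was sent")
    return body


async def upload_with_retry(data, attempts: int = 2) -> bytes:
    """Hypothetical retry loop: the first attempt fails, later ones succeed."""
    for attempt in range(attempts):
        try:
            return await fake_request(data, fail=(attempt == 0))
        except ConnectionError:
            if attempt == attempts - 1:
                raise
    raise RuntimeError("no attempts were made")


result = asyncio.run(upload_with_retry(io.BytesIO(b"payload")))
```

Without the seek(0) line, the second attempt in this simulation would read an empty body, which matches the "0 mime parts" symptom.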

Metamess commented 1 month ago

--- a/gcsfs/core.py
+++ b/gcsfs/core.py
@@ -421,6 +421,8 @@ class GCSFileSystem(asyn.AsyncFileSystem):
         self, method, path, *args, headers=None, json=None, data=None, **kwargs
     ):
         await self._set_session()
+        if hasattr(data, "seek"):
+            data.seek(0)
         async with self.session.request(
             method=method,

If this is indeed all that's required to fix this issue, do you want to make a PR for this @martindurant , or would you prefer if I tried to do so?

martindurant commented 1 month ago

Please do make a PR