fsspec / gcsfs

Pythonic file-system interface for Google Cloud Storage
http://gcsfs.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Max retries exceeded with url: /o/oauth2/token #91

Open rabernat opened 6 years ago

rabernat commented 6 years ago

I am trying to push a very large dataset to gcs via the xarray / zarr / gcsfs / dask stack. I have encountered a new error at the gcsfs level.

Here's a summary of what I am doing:

# manually created an xarray dataset called `ds` by concat-ing together many individual ones
token = 'cache'
fs = gcsfs.GCSFileSystem(project='pangeo-181919', token=token)
gcsmap = gcsfs.mapping.GCSMap(bucket + path, gcs=fs, check=True, create=False)
delayed_store = ds.to_zarr(store=gcsmap, encoding=encoding, compute=False)
persist_store = delayed_store.persist(retries=10)

I'm doing this via a distributed client connected to a local multithreaded cluster.

There are almost a million tasks in the graph. It will generally get about 5% in and then hit some sort of intermittent, non-reproducible error.

This is the error I have now.

distributed.utils - ERROR - None: Max retries exceeded with url: /o/oauth2/token (Caused by None)
Traceback (most recent call last):
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py", line 237, in f
    result[0] = yield make_coro()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py", line 1055, in run
    value = future.result()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 4, in raise_exc_info
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py", line 1356, in _gather
    traceback)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py", line 692, in reraise
    raise value.with_traceback(tb)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/protocol/pickle.py", line 59, in loads
    return pickle.loads(x)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py", line 1930, in __setstate__
    self.__init__(*state)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py", line 123, in __init__
    self._load_metadata()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py", line 140, in _load_metadata
    self._load_metadata_nosync()
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py", line 149, in _load_metadata_nosync
    meta_bytes = self._store[mkey]
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/mapping.py", line 67, in __getitem__
    with self.gcs.open(key, 'rb') as f:
  File "<decorator-gen-28>", line 2, in open
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 944, in open
    metadata=metadata)
  File "<decorator-gen-31>", line 2, in __init__
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 1068, in __init__
    self.details = gcsfs.info(path)
  File "<decorator-gen-17>", line 2, in info
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 802, in info
    return self._get_object(path)
  File "<decorator-gen-3>", line 2, in _get_object
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 492, in _get_object
    bucket, key))
  File "<decorator-gen-2>", line 2, in _call
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 51, in _tracemethod
    return f(self, *args, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 441, in _call
    raise e
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py", line 431, in _call
    r = meth(self.base + path, params=kwargs, json=json)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/requests/sessions.py", line 521, in get
    return self.request('GET', url, **kwargs)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py", line 198, in request
    self._auth_request, method, url, request_headers)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/credentials.py", line 121, in before_request
    self.refresh(request)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/credentials.py", line 126, in refresh
    self._client_secret))
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/_client.py", line 237, in refresh_grant
    response_data = _token_endpoint_request(request, token_uri, body)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/_client.py", line 106, in _token_endpoint_request
    method='POST', url=token_uri, headers=headers, body=body)
  File "/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py", line 124, in __call__
    six.raise_from(new_exc, caught_exc)
  File "<string>", line 3, in raise_from
google.auth.exceptions.TransportError: None: Max retries exceeded with url: /o/oauth2/token (Caused by None)
---------------------------------------------------------------------------
TransportError                            Traceback (most recent call last)
<ipython-input-25-5fcf41d2c0b1> in <module>()
----> 1 persist_store.compute()

/home/rpa/dask/dask/base.py in compute(self, **kwargs)
    153         dask.base.compute
    154         """
--> 155         (result,) = compute(self, traverse=False, **kwargs)
    156         return result
    157 

/home/rpa/dask/dask/base.py in compute(*args, **kwargs)
    402     postcomputes = [a.__dask_postcompute__() if is_dask_collection(a)
    403                     else (None, a) for a in args]
--> 404     results = get(dsk, keys, **kwargs)
    405     results_iter = iter(results)
    406     return tuple(a if f is None else f(next(results_iter), *a)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, **kwargs)
   2039                 secede()
   2040             try:
-> 2041                 results = self.gather(packed, asynchronous=asynchronous)
   2042             finally:
   2043                 for f in futures.values():

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in gather(self, futures, errors, maxsize, direct, asynchronous)
   1476             return self.sync(self._gather, futures, errors=errors,
   1477                              direct=direct, local_worker=local_worker,
-> 1478                              asynchronous=asynchronous)
   1479 
   1480     @gen.coroutine

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in sync(self, func, *args, **kwargs)
    601             return future
    602         else:
--> 603             return sync(self.loop, func, *args, **kwargs)
    604 
    605     def __repr__(self):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in sync(loop, func, *args, **kwargs)
    251             e.wait(10)
    252     if error[0]:
--> 253         six.reraise(*error[0])
    254     else:
    255         return result[0]

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    691             if value.__traceback__ is not tb:
    692                 raise value.with_traceback(tb)
--> 693             raise value
    694         finally:
    695             value = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/utils.py in f()
    235             yield gen.moment
    236             thread_state.asynchronous = True
--> 237             result[0] = yield make_coro()
    238         except Exception as exc:
    239             logger.exception(exc)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1053 
   1054                     try:
-> 1055                         value = future.result()
   1056                     except Exception:
   1057                         self.had_exception = True

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/concurrent.py in result(self, timeout)
    236         if self._exc_info is not None:
    237             try:
--> 238                 raise_exc_info(self._exc_info)
    239             finally:
    240                 self = None

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/util.py in raise_exc_info(exc_info)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/tornado/gen.py in run(self)
   1061                     if exc_info is not None:
   1062                         try:
-> 1063                             yielded = self.gen.throw(*exc_info)
   1064                         finally:
   1065                             # Break up a reference to itself

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1354                             six.reraise(type(exception),
   1355                                         exception,
-> 1356                                         traceback)
   1357                     if errors == 'skip':
   1358                         bad_keys.add(key)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in reraise(tp, value, tb)
    690                 value = tp()
    691             if value.__traceback__ is not tb:
--> 692                 raise value.with_traceback(tb)
    693             raise value
    694         finally:

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/distributed/protocol/pickle.py in loads()
     57 def loads(x):
     58     try:
---> 59         return pickle.loads(x)
     60     except Exception:
     61         logger.info("Failed to deserialize %s", x[:10000], exc_info=True)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py in __setstate__()
   1928 
   1929     def __setstate__(self, state):
-> 1930         self.__init__(*state)
   1931 
   1932     def _synchronized_op(self, f, *args, **kwargs):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py in __init__()
    121 
    122         # initialize metadata
--> 123         self._load_metadata()
    124 
    125         # initialize attributes

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py in _load_metadata()
    138         """(Re)load metadata from store."""
    139         if self._synchronizer is None:
--> 140             self._load_metadata_nosync()
    141         else:
    142             mkey = self._key_prefix + array_meta_key

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/zarr/core.py in _load_metadata_nosync()
    147         try:
    148             mkey = self._key_prefix + array_meta_key
--> 149             meta_bytes = self._store[mkey]
    150         except KeyError:
    151             err_array_not_found(self._path)

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/mapping.py in __getitem__()
     65         key = self._key_to_str(key)
     66         try:
---> 67             with self.gcs.open(key, 'rb') as f:
     68                 result = f.read()
     69         except (IOError, OSError):

<decorator-gen-28> in open()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in open()
    942         if 'b' in mode:
    943             return GCSFile(self, path, mode, block_size, consistency=const,
--> 944                            metadata=metadata)
    945         else:
    946             mode = mode.replace('t', '') + 'b'

<decorator-gen-31> in __init__()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in __init__()
   1066             raise NotImplementedError('File mode not supported')
   1067         if mode == 'rb':
-> 1068             self.details = gcsfs.info(path)
   1069             self.size = self.details['size']
   1070         else:

<decorator-gen-17> in info()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in info()
    800 
    801         try:
--> 802             return self._get_object(path)
    803         except FileNotFoundError:
    804             logger.debug("info FileNotFound at path: %s", path)

<decorator-gen-3> in _get_object()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _get_object()
    490 
    491         result = self._process_object(bucket, self._call('get', 'b/{}/o/{}',
--> 492                                                          bucket, key))
    493 
    494         return result

<decorator-gen-2> in _call()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _tracemethod()
     49         logger.log(logging.DEBUG - 1, tb_io.getvalue())
     50 
---> 51     return f(self, *args, **kwargs)
     52 
     53 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _call()
    439                     # retry
    440                     continue
--> 441                 raise e
    442         try:
    443             out = r.json()

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/gcsfs/core.py in _call()
    429             try:
    430                 time.sleep(2**retry - 1)
--> 431                 r = meth(self.base + path, params=kwargs, json=json)
    432                 validate_response(r, path)
    433                 break

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/requests/sessions.py in get()
    519 
    520         kwargs.setdefault('allow_redirects', True)
--> 521         return self.request('GET', url, **kwargs)
    522 
    523     def options(self, url, **kwargs):

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py in request()
    196 
    197         self.credentials.before_request(
--> 198             self._auth_request, method, url, request_headers)
    199 
    200         response = super(AuthorizedSession, self).request(

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/credentials.py in before_request()
    119         # the http request.)
    120         if not self.valid:
--> 121             self.refresh(request)
    122         self.apply(headers)
    123 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/credentials.py in refresh()
    124             _client.refresh_grant(
    125                 request, self._token_uri, self._refresh_token, self._client_id,
--> 126                 self._client_secret))
    127 
    128         self.token = access_token

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/_client.py in refresh_grant()
    235     }
    236 
--> 237     response_data = _token_endpoint_request(request, token_uri, body)
    238 
    239     try:

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/oauth2/_client.py in _token_endpoint_request()
    104 
    105     response = request(
--> 106         method='POST', url=token_uri, headers=headers, body=body)
    107 
    108     response_body = response.data.decode('utf-8')

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/google/auth/transport/requests.py in __call__()
    122         except requests.exceptions.RequestException as caught_exc:
    123             new_exc = exceptions.TransportError(caught_exc)
--> 124             six.raise_from(new_exc, caught_exc)
    125 
    126 

/home/rpa/.conda/envs/dask_distributed/lib/python3.5/site-packages/six.py in raise_from()

TransportError: None: Max retries exceeded with url: /o/oauth2/token (Caused by None)

martindurant commented 6 years ago

Hm, not too much to go on there, except that it's clearly trying to re-authenticate. I wonder if we could be doing a better job of caching the GCSFileSystem instances in a given worker, or if this is just a too-many-concurrent-requests kind of thing. In any case, I would first suggest trying to throttle the number of workers that are writing, to see if that helps.

rabernat commented 6 years ago

> I would first suggest trying to throttle the number of workers that are writing, to see if that helps.

Can I accomplish that using a write lock?

martindurant commented 6 years ago

Certainly, but then you would lose parallelism. Perhaps Variable would allow you to limit the number of workers/threads (@mrocklin, suggestions?)

rabernat commented 6 years ago

I tried this with zarr's thread synchronizer to prevent simultaneous writes to GCS. No luck, same errors. I am still stuck with this issue and unable to move forward.

I am also seeing these errors in my worker logs:

distributed.worker - WARNING - Compute Failed Function: getter args: (ImplicitToExplicitIndexingAdapter(array=CopyOnWriteArray(array=LazilyOuterIndexedArray(array=_ElementwiseFunctionArray(LazilyOuterIndexedArray(array=<xarray.backends.netCDF4_.NetCDF4ArrayWrapper object at 0x7f7fba5ddf28>, key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))), func=functools.partial(<function _apply_mask at 0x7f7fba631488>, dtype=dtype('float32'), decoded_fill_value=nan, encoded_fill_values=[-1e+20]), dtype=dtype('float32')), key=BasicIndexer((slice(None, None, None), slice(None, None, None), slice(None, None, None), slice(None, None, None)))))), (slice(66, 67, None), slice(43, 44, None), slice(0, 2700, None), slice(0, 3600, None)), True, <SerializableLock: 376ac45c-c1c4-4946-8f5a-3719b667d218>) kwargs: {} Exception: OSError('Too many open files',)

I guess it could be related.
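To check whether the "Too many open files" error is simply the per-process file-descriptor limit, the limit can be inspected and raised from Python on Unix-like systems. This is a sketch under assumptions: the helper name `raise_open_file_limit` and the target of 4096 are illustrative, and the soft limit can only be raised as far as the hard limit allows without extra privileges.

```python
import resource


def raise_open_file_limit(target=4096):
    """Raise the soft open-file limit toward `target`, capped by the hard limit.

    Returns the (soft, hard) limits in effect afterwards.
    """
    soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return soft, hard  # already unlimited, nothing to do
    if hard == resource.RLIM_INFINITY:
        new_soft = max(soft, target)
    else:
        new_soft = max(soft, min(target, hard))
    if new_soft != soft:
        try:
            resource.setrlimit(resource.RLIMIT_NOFILE, (new_soft, hard))
        except (ValueError, OSError):
            pass  # the platform refused the change; keep the current limit
    return resource.getrlimit(resource.RLIMIT_NOFILE)


before = resource.getrlimit(resource.RLIMIT_NOFILE)
after = raise_open_file_limit()
print("open-file limit before:", before, "after:", after)
```

The change applies only to the current process and its children, so it would have to run in each worker process rather than in the client.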

martindurant commented 6 years ago

I wonder whether it would be useful to provide an insecure token mode, i.e., one where the actual access token is passed to all the instances, rather than using local renew tokens, which cause the calls to the /token/ endpoint. I call this insecure since the tokens would be passed over open channels, but this is not an issue within the isolated network of Kubernetes.

I think the following should do it: you should set up a gcsfs instance, perform any operation on it (the first operation will cause the token refresh), and then pass

token=gcs.session.credentials

in storage parameters (be sure to also explicitly give the project when you do this).

kaipak commented 6 years ago

Was this issue completely resolved? I've been running into this exact problem when moving very large datasets (~1 TB). Reducing the Dask cluster size seems to help. I am using

token=gcs.session.credentials

as mentioned by @martindurant above.

martindurant commented 6 years ago

No, I don't think we have a concrete solution. The problem comes from some sort of rate limit on accessing the Google metadata service.
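If the failures are indeed rate limiting, a common mitigation is to retry the failing call with exponential backoff and random jitter, so that many workers do not all retry at the same instant. The sketch below is generic and uses only the standard library: `TransientError`, `call_with_backoff`, and `flaky_refresh` are hypothetical names, with `TransientError` standing in for a retryable failure such as the `TransportError` in the traceback above. It is not how gcsfs or google-auth handle retries.

```python
import random
import time


class TransientError(Exception):
    """Hypothetical stand-in for a retryable failure (e.g. a rate-limit error)."""


def call_with_backoff(func, retries=5, base_delay=0.01, max_delay=1.0,
                      sleep=time.sleep):
    """Call func(), retrying on TransientError with jittered exponential backoff."""
    for attempt in range(retries):
        try:
            return func()
        except TransientError:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            # Delay ceiling doubles each attempt; jitter spreads out the retries.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            sleep(random.uniform(0, ceiling))


attempts = []


def flaky_refresh():
    """Simulated token refresh that fails twice before succeeding."""
    attempts.append(1)
    if len(attempts) < 3:
        raise TransientError("rate limited")
    return "token"


result = call_with_backoff(flaky_refresh)
```

The small default delays keep the demonstration fast; real values would be on the order of seconds.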

william-silversmith commented 3 years ago

I ran into the same problem (in multi-process CloudFiles). I found this Stack Overflow question, which says it could also be too many open file descriptors (i.e. network connections), but I think you are probably right that it's a Google rate limit.

https://stackoverflow.com/questions/15286288/what-does-this-python-requests-error-mean

I wonder if it would be possible to let these connections share the DNS / auth information.
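The idea of sharing auth information can be illustrated with a small cache that lets many threads reuse one token instead of each requesting its own. This is only a sketch of the idea, assuming a single process: `SharedTokenCache`, `fetch_token`, and the one-hour lifetime are hypothetical, and sharing across processes would need the token to be passed explicitly or stored somewhere all processes can read.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class SharedTokenCache:
    """Hand out one cached token to all callers, refreshing it only when it expires."""

    def __init__(self, fetch_token, lifetime_seconds=3600.0, clock=time.monotonic):
        self._fetch_token = fetch_token  # callable that contacts the auth endpoint
        self._lifetime = lifetime_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token = None
        self._expires_at = 0.0

    def get(self):
        # The lock ensures only one thread refreshes; the others wait and reuse it.
        with self._lock:
            now = self._clock()
            if self._token is None or now >= self._expires_at:
                self._token = self._fetch_token()
                self._expires_at = now + self._lifetime
            return self._token


fetch_calls = []


def fake_fetch_token():
    """Simulated auth request; records each call so refreshes can be counted."""
    fetch_calls.append(1)
    return "token-%d" % len(fetch_calls)


cache = SharedTokenCache(fake_fetch_token)
with ThreadPoolExecutor(max_workers=8) as pool:
    tokens = list(pool.map(lambda _: cache.get(), range(400)))
```

Here 400 requests from 8 threads result in a single call to the simulated auth endpoint.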