pangeo-data / rechunker

Disk-to-disk chunk transformation for chunked arrays.
https://rechunker.readthedocs.io/
MIT License
163 stars 25 forks source link

403 forbidden at the very end of rechunking #86

Open jsadler2 opened 3 years ago

jsadler2 commented 3 years ago

I'm executing a rechunk plan with a dask kubernetes cluster. Everything goes swimmingly for the entire operation ... until it mysteriously stops.

I get 12,263 out of 12,279 (99.9%) of the _copy_chunk tasks done and then it just sits there: image

After ~30 minutes I get this error:

Exception ignored in: <finalize object at 0x7fbac8535da0; dead>
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/weakref.py", line 572, in __call__
    return info.func(*info.args, **(info.kwargs or {}))
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/dask_kubernetes/core.py", line 707, in _cleanup_resources
    namespace, label_selector=format_labels(labels)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 16097, in list_namespaced_service
    return self.list_namespaced_service_with_http_info(namespace, **kwargs)  # noqa: E501
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api/core_v1_api.py", line 16222, in list_namespaced_service_with_http_info
    collection_formats=collection_formats)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 353, in call_api
    _preload_content, _request_timeout, _host)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 184, in __call_api
    _request_timeout=_request_timeout)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/api_client.py", line 377, in request
    headers=headers)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/rest.py", line 243, in GET
    query_params=query_params)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/kubernetes/client/rest.py", line 233, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (403)
Reason: Forbidden
HTTP response headers: HTTPHeaderDict({'Audit-Id': 'fa6fe50f-4cd9-4ec8-adaf-8becf2533a6f', 'Content-Type': 'application/json', 'X-Content-Type-Options': 'nosniff', 'Date': 'Mon, 26 Apr 2021 21:21:50 GMT', 'Content-Length': '299'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"services is forbidden: User \"system:serviceaccount:pangeo:daskkubernetes\" cannot list resource \"services\" in API group \"\" in the namespace \"pangeo\"","reason":"Forbidden","details":{"kind":"services"},"code":403}

Unclosed client session
client_session: <aiohttp.client.ClientSession object at 0x7fbac5e3c3d0>
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x7fbac5e98c50>>, <Task finished coro=<SpecCluster._close() done, defined at /srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/deploy/spec.py:393> exception=RPCClosed("RPC Closed: while trying to call remote method 'close'")>)
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 805, in send_recv_from_rpc
    comm = await self.live_comm()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 752, in live_comm
    raise RPCClosed("RPC Closed")
distributed.core.RPCClosed: RPC Closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/ioloop.py", line 765, in _discard_future_result
    future.result()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/deploy/spec.py", line 407, in _close
    await self.scheduler_comm.close(close_workers=True)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 810, in send_recv_from_rpc
    "%s: while trying to call remote method %r" % (e, key)
distributed.core.RPCClosed: RPC Closed: while trying to call remote method 'close'

So close! When I look at the bucket, a lot of the data has been written:

zarr store size (GB)
source 682.3
intermediate 604.5
target 595.5

I can also load the target dataset and pull out data from it. Obviously not all of the data is there, though. It's also not obvious (to me anyway) which of the data doesn't make it.

rabernat commented 3 years ago

Frustrating!

Can you give more details? What sort of storage are you using? Just post your full workflow if you can.

jsadler2 commented 3 years ago

Sure can:

import xarray as xr
import fsspec
from rechunker import rechunk

fs = fsspec.filesystem('s3', profile='ds-drb-creds', anon=False)
nldas_path = 'ds-drb-data/nldas'
nldas_full = fs.get_mapper(nldas_path)
ds_full = xr.open_zarr(nldas_full)

intermediate = fs.get_mapper('ds-drb-data/nldas_intermediate')
target = fs.get_mapper('ds-drb-data/nldas_timeseries_chunks')

target_chunksizes = {'time': 61376, "lat": 28, "lon": 29}

target_chunks = {}

# make the chunk sizes for all my variables
for v in ds_full.var():
    target_chunks[v] = target_chunksizes

rechunk_plan = rechunk(ds_full, target_chunks, max_mem='500MB', target_store=target, temp_store=intermediate)

result = rechunk_plan.execute()

Here is a gist of my notebook: https://gist.github.com/jsadler2/1a8faf1171f3164be2fafbe044d91c57

rabernat commented 3 years ago

Thanks for the very cool and clean example! We should put this on the rechunker website 🤩

To me, these errors are dask mumbo jumbo, not really telling us much about the underlying error. We want to get deeper into the root cause for the process hanging. To me this has the faint whiff of an fsspec asyc-io issue. For those things I often ping @martindurant and he responds 🦸‍♂️ from across the internet and immediately gets to the bottom of it.

Getting logs from the workers would be the next step, if you were willing to dig deeper.

If you were getting actual errors, rather than just a zombie process, I would recommend rechunk_plan.execute(retries=5). When working with lots of binary data in S3, you have to be resilient to a bit corrupted in transit here and there.

rabernat commented 3 years ago

It seems like a dask task timeout would help here. That idea was proposed a long time ago - https://github.com/dask/distributed/issues/391 - but hasn't been implemented.

You could try executing your flow with prefect (with a prefect Dask Executor) and using a prefect task timeout to interrupt / retry long-running tasks. But what qualifies as "long running"? How do we know a process has really hung as opposed to just taking an unusually long time?

martindurant commented 3 years ago

I'm not getting much from that traceback either, except that dask seems to think that it's done and errors during shutdown of the cluster.

If it is s3fs, you could set S3FS_LOGGING_LEVEL=DEBUG to get a lot more output on what's going on.

I also have experimental https://github.com/intake/filesystem_spec/pull/617 , which could be used to allow getting debug info from stalled coroutines or cancelling them (which would trigger a retry or a real exception traceback).

Finally, I have the following change (see discussion in https://github.com/intake/filesystem_spec/pull/560) which might help.... or not. Bottom line, I still don't know what conditions lead to a deadlock. Bu it's obviously very important to get this right!

--- a/s3fs/core.py
+++ b/s3fs/core.py
@@ -381,9 +381,17 @@ class S3FileSystem(AsyncFileSystem):
     @staticmethod
     def close_session(loop, s3):
         if loop is not None and loop.is_running():
-            sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
-        else:
-            s3._endpoint.http_session._connector._close
+            try:
+                sync(loop, s3.__aexit__, None, None, None, timeout=0.1)
+                return
+            except TimeoutError:
+                pass
+        try:
+            # close the actual socket
+            s3._client._endpoint.http_session._connector._close()
+        except AttributeError:
+            # but during shutdown, it may have gone
+            pass

     async def _get_delegated_s3pars(self, exp=3600):
jsadler2 commented 3 years ago

Okay. So I tried again. I got a slightly different error this time:

tornado.application - ERROR - Uncaught exception in write_error
Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/web.py", line 1681, in _execute
    result = self.prepare()
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 502, in prepare
    raise web.HTTPError(403)
tornado.web.HTTPError: HTTP 403: Forbidden

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/tornado/web.py", line 1217, in send_error
    self.write_error(status_code, **kwargs)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 585, in write_error
    html = self.render_template('%s.html' % status_code, **ns)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 515, in render_template
    template = self.get_template(name)
  File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/notebook/base/handlers.py", line 511, in get_template
    return self.settings['jinja2_env'].get_template(name)
KeyError: 'jinja2_env'

I also looked at the worker logs. I had 60 workers. 49 of the 60 had blank logs.

5 workers had logs showing a timeout error like this:

distributed.worker - ERROR - Worker stream died during communication: tcp://10.12.77.128:41871 Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 319, in connect handshake = await asyncio.wait_for(comm.read(), time_left()) File "/srv/conda/envs/pangeo/lib/python3.7/asyncio/tasks.py", line 449, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2033, in gather_dep self.rpc, deps, worker, who=self.address File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3246, in _get_data comm = await rpc.connect(worker) File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 1030, in connect **self.connection_args, File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 326, in connect ) from exc OSError: Timed out during handshake while connecting to tcp://10.12.77.128:41871 after 10 s

distributed.worker - ERROR - Worker stream died during communication: tcp://10.12.77.128:41871 Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 320, in connect await asyncio.wait_for(comm.write(local_info), time_left()) File "/srv/conda/envs/pangeo/lib/python3.7/asyncio/tasks.py", line 423, in wait_for raise futures.TimeoutError() concurrent.futures._base.TimeoutError The above exception was the direct cause of the following exception: Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2033, in gather_dep self.rpc, deps, worker, who=self.address File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3269, in get_data_from_worker return await retry_operation(_get_data, operation="get_data_from_worker") File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 389, in retry_operation operation=operation, File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/utils_comm.py", line 369, in retry return await coro() File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 3246, in _get_data comm = await rpc.connect(worker) File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/core.py", line 1030, in connect **self.connection_args, File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/comm/core.py", line 326, in connect ) from exc OSError: Timed out during handshake while connecting to tcp://10.12.77.128:41871 after 10 s

6 workers had logs showing a timeout error like this:

distributed.worker - ERROR - 'stage-f685cd4829bf2182a4dccc5128af3c17' Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2545, in execute data[k] = self.data[k] File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/zict/buffer.py", line 80, in __getitem__ raise KeyError(key) KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17' During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2549, in execute data[k] = Actor(type(self.actors[k]), self.address, k, self) KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17'

distributed.worker - ERROR - 'stage-f685cd4829bf2182a4dccc5128af3c17' Traceback (most recent call last): File "/srv/conda/envs/pangeo/lib/python3.7/site-packages/distributed/worker.py", line 2227, in update_who_has self.tasks[dep].who_has.update(workers) KeyError: 'stage-f685cd4829bf2182a4dccc5128af3c17'

I did this !export S3FS_LOGGING_LEVEL=DEBUG as you suggested, @martindurant. I'm not sure it made a difference though.

martindurant commented 3 years ago

No, nothing about s3fs there :|

I notice the reference to an Actor - are you using dask actors?

jsadler2 commented 3 years ago

Hm. I don't know what dask actors are

jsadler2 commented 3 years ago

I should also say that I ran this on a subset of the full dataset that was 1/5th of the full time dimension, and that worked without error. So it seems like a scaling issue.

rabernat commented 3 years ago

If we are stuck in terms of debugging, another option is to ping @jrbourbeau. Figuring out dask / rechunker issues is definitely in-scope for the Pangeo / Coiled collaboration.

jrbourbeau commented 3 years ago

@jsadler2 what versions of dask / distributed / dask-kubernetes are you using? From the notebook you posted it looks like you're using dask=2020.12.0. Could you try upgrading those packages to their latest release version and trying again?

jsadler2 commented 3 years ago

this was with

I'll try again with the latest release version.

jsadler2 commented 3 years ago

@jrbourbeau - I've updated to the following via conda update:

But now when I try to start up my cluster, it just hangs here: image

Do I need to change anything else along with those dask libraries or do I have to do something different to start up the cluster with the upgrade? Maybe there is something traceback/progress I can see to get an idea of why it's not working?

jsadler2 commented 3 years ago

ping @jrbourbeau

jrbourbeau commented 3 years ago

Hmm it's not immediately clear what the issue is. Could you look at the kubernetes pod logs to see if there are any informative errors / tracebacks there?

jrbourbeau commented 3 years ago

Just checking in here @jsadler2. Were there any k8s logs that had useful information?

jsadler2 commented 3 years ago

Hi, @jrbourbeau - thanks for checking back in on this. I actually don't know how to check the k8s logs. Is that something I can do from the Jupyter interface?

I realized that I didn't need the rechunked dataset after all, so this hasn't been a top priority for me (hence the slow response 😬). I can keep trying this some though for the sake of understanding what went wrong and if there is a solution.