Closed: zonca closed this issue 3 years ago
@zonca dask workers would potentially be nice. I don't have any examples of a distributed computation, though - we haven't explored this yet. If it would be straightforward to set up dask, then I could have a student start testing some of our analysis tasks using its distributed features. If it's a pain then I'd suggest we consider waiting.
Data is currently all stored at SLAC. There is a "data catalog" python library that allows users to query for data paths. If they don't exist locally they're downloaded to disk.
Is it possible to have a storage disk that is mounted to everyone's container? For initial testing 50 GB would be more than enough. If we want to try to support full CDMS analysis efforts that's more like 10 TB.
Ok, let's add dask as well. Moved discussion about data to #8
I'll take care of adding dask to the image in #3 and setup Kubernetes accordingly. Mostly following https://zonca.github.io/2018/06/private-dask-kubernetes-jetstream.html
Next I'll work on deploying dask clusters and then use it to launch a bunch of tasks in parallel, using the technique used in https://stackoverflow.com/questions/49551759/running-shell-commands-in-parallel-using-dask-distributed to launch jobs as explained in #13
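The Stack Overflow technique referenced above boils down to submitting `subprocess` calls through a dask client. A minimal sketch of that pattern (the `echo` commands and the local in-process client are placeholders for real jobs on a real cluster):

```python
import subprocess

from dask.distributed import Client


def run_command(cmd):
    """Run one shell command on a worker; return its exit code."""
    completed = subprocess.run(cmd, shell=True,
                               stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    return completed.returncode


client = Client(processes=False)  # local stand-in for a real dask cluster
futures = client.map(run_command, ["echo job-%d" % i for i in range(4)])
exit_codes = client.gather(futures)
print(exit_codes)  # [0, 0, 0, 0]
client.close()
```

On a gateway-backed cluster only the `Client` construction changes; the `map`/`gather` calls stay the same.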
initially I'll create a new conda environment using the pangeo/base-notebook
environment from https://github.com/pangeo-data/pangeo-stacks/blob/master/base-notebook/binder/environment.yml
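The pangeo base-notebook environment file is a conda spec; a minimal sketch along those lines (the package list and pins below are illustrative, not the actual pangeo ones):

```yaml
name: cdms-dask
channels:
  - conda-forge
dependencies:
  - python=3.6
  - numpy
  - dask
  - distributed
  - dask-gateway
  - pip
```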
Try first from scratch following https://zonca.dev/2018/06/private-dask-kubernetes-jetstream.html
I started to work on dask, in particular using dask-gateway (https://gateway.dask.org/), which provides infrastructure to handle multiple dask clusters inside Kubernetes.
I haven't managed to get it working yet; it has a complex architecture and I am finding it difficult to even understand why it is not working.
Anyway, I'll keep testing and debugging.
@pibion good news, I got dask-gateway working (on Jetstream, but outside of CDMS for now). I prepared a tutorial here:
https://zonca.dev/2020/08/dask-gateway-jupyterhub.html
See also an example from the user's point of view: https://gist.github.com/zonca/355a7ec6b5bd3f84b1413a8c29fbc877
Next I need to customize this for CDMS and install it on the production deployment.
@ziqinghong for the analysis you want to try to execute with dask, do you need the whole Python environment via CVMFS?
If you do, then I need to make sure the CVMFS Python environment is updated to include all the requirements to connect to dask-gateway.
If you only need a subset of the functionality, it would be easier to build a custom image with just those packages.
@pibion @ziqinghong for now I'll assume we want to use the full environment from CVMFS, that seems the most robust solution.
Sorry this fell through the cracks. The whole Python environment via CVMFS seems ideal. Thank you!
ok, I deployed dask-gateway on the SuperCDMS JupyterHub, for now using a standard image, just to test the infrastructure, and it works fine.
Next I need to have the workers use CVMFS; first, though, I need some packages installed. @bloer could you please install:
dask_gateway==0.8.0 dask==2.25.0 distributed==2.25.0
in the next CDMS software release on CVMFS? When that is available, I can work on integrating it.
@ziqinghong if you want to test drive it, you can run the latest CDMS kernel, install the extra packages locally with:
pip install --user dask_gateway==0.8.0 dask==2.25.0 distributed==2.25.0
restart the kernel, and then run this notebook (replace js-xxx-yyy with supercdms):
https://gist.github.com/zonca/355a7ec6b5bd3f84b1413a8c29fbc877
for now the workers do not have the CDMS packages, so you can only test pure numpy stuff.
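A pure-numpy smoke test of the flow described above, assuming the setup from the gist; here a local in-process client stands in for the dask-gateway cluster:

```python
# On the hub the client would come from dask-gateway, roughly:
#   from dask_gateway import Gateway
#   cluster = Gateway().new_cluster()
#   cluster.scale(2)
#   client = cluster.get_client()
# Locally, a threaded client exercises the same submit/result API.
import numpy as np
from dask.distributed import Client

client = Client(processes=False)


def array_sum(n):
    """A pure-numpy task: build an array of ones and sum it."""
    return float(np.ones(n).sum())


result = client.submit(array_sum, 1000).result()
print(result)  # 1000.0
client.close()
```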
@bloer @pibion, do you think we can get these packages installed in the CDMS python environment on CVMFS?
dask_gateway==0.8.0 dask==2.25.0 distributed==2.25.0
@zonca do we want the strict == requirement, or >= ?
Also note that because CVMFS does its magic by setting PYTHONPATH, pip install --user will behave weirdly. If the package doesn't exist at all in CVMFS it should work fine, but otherwise packages on PYTHONPATH will preempt the user's local directory. So it's best to explicitly add the user's home to PYTHONPATH. I have a ticket open to figure out how to do this more automatically, but I haven't worked it out yet.
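A sketch of the workaround described above (not tested against the CVMFS setup itself): prepend the user site-packages directory, so copies from `pip install --user` take precedence over same-named packages that the CVMFS setup put on `PYTHONPATH`:

```shell
# Prepend the per-user site-packages (where `pip install --user` puts things)
# so it wins over entries that the CVMFS setup scripts added to PYTHONPATH.
export PYTHONPATH="$(python3 -m site --user-site):${PYTHONPATH:-}"
echo "$PYTHONPATH"
```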
==; pip install --user is just for testing purposes right now, and it works fine. Once we have the packages in CVMFS, there will be no need for it.
@bloer would you have any update on this? thank you
@zonca sorry, too many deadlines lately. I should have some time at the end of this week.
This should now be addressed in the V03-05 release on cvmfs
I get an error in the V03-05 kernel, but I don't understand why yet; still debugging. It happens when I call new_cluster:
```
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.6.5-f74f0/x86_64-centos7-gcc8-opt/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError
```
It looks like it still creates the cluster, but then fails when I try to get the client:
```
Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.6.5-f74f0/x86_64-centos7-gcc8-opt/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError

Task exception was never retrieved
future: <Task finished coro=<connect.<locals>._() done, defined at /home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py:288> exception=CommClosedError()>
Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 297, in _
    handshake = await asyncio.wait_for(comm.read(), 1)
  File "/cvmfs/sft.cern.ch/lcg/releases/Python/3.6.5-f74f0/x86_64-centos7-gcc8-opt/lib/python3.6/asyncio/tasks.py", line 351, in wait_for
    yield from waiter
concurrent.futures._base.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/jovyan/.local/lib/python3.6/site-packages/distributed/comm/core.py", line 304, in _
    raise CommClosedError() from e
distributed.comm.core.CommClosedError

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
~/.local/lib/python3.6/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    321             if not comm:
--> 322                 _raise(error)
    323     except FatalCommClosedError:

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in _raise(error)
    274     )
--> 275     raise IOError(msg)
    276

OSError: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.3e2819943639456c9f0432253d6c0879' after 10 s: connect() didn't finish in time

During handling of the above exception, another exception occurred:

OSError                                   Traceback (most recent call last)
<ipython-input-20-affca45186d3> in <module>
----> 1 client = cluster.get_client()

~/.local/lib/python3.6/site-packages/dask_gateway/client.py in get_client(self, set_as_default)
   1066             set_as_default=set_as_default,
   1067             asynchronous=self.asynchronous,
-> 1068             loop=self.loop,
   1069         )
   1070         if not self.asynchronous:

~/.local/lib/python3.6/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    743             ext(self)
    744
--> 745         self.start(timeout=timeout)
    746         Client._instances.add(self)
    747

~/.local/lib/python3.6/site-packages/distributed/client.py in start(self, **kwargs)
    948             self._started = asyncio.ensure_future(self._start(**kwargs))
    949         else:
--> 950             sync(self.loop, self._start, **kwargs)
    951
    952     def __await__(self):

~/.local/lib/python3.6/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

~/.local/lib/python3.6/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/cvmfs/sft.cern.ch/lcg/views/LCG_96python3/x86_64-centos7-gcc8-opt/lib/python3.6/site-packages/tornado/gen.py in run(self)
   1131
   1132                     try:
-> 1133                         value = future.result()
   1134                     except Exception:
   1135                         self.had_exception = True

~/.local/lib/python3.6/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1045
   1046         try:
-> 1047             await self._ensure_connected(timeout=timeout)
   1048         except (OSError, ImportError):
   1049             await self._close()

~/.local/lib/python3.6/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1103         try:
   1104             comm = await connect(
-> 1105                 self.scheduler.address, timeout=timeout, **self.connection_args
   1106             )
   1107             comm.name = "Client->Scheduler"

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in connect(addr, timeout, deserialize, handshake_overrides, **connection_args)
    332                 backoff = min(backoff, 1)  # wait at most one second
    333             else:
--> 334                 _raise(error)
    335         else:
    336             break

~/.local/lib/python3.6/site-packages/distributed/comm/core.py in _raise(error)
    273         error,
    274     )
--> 275     raise IOError(msg)
    276
    277     backoff = 0.01

OSError: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.3e2819943639456c9f0432253d6c0879' after 10 s: Timed out trying to connect to 'gateway://traefik-dask-gateway:80/jhub.3e2819943639456c9f0432253d6c0879' after 10 s: connect() didn't finish in time
```
I even tried overriding the packages installed in CVMFS with packages in .local
(I was worried there was a timeout due to NFS being slow), but I get the same error in both cases.
However, this works fine with V03-04.
I'll keep debugging to try understand what is going on.
Also double-checked the packages installed in the two kernel environments (using pip freeze), but they are basically the same:
```diff
5c5
< arviz==0.9.0
---
> arviz==0.10.0
7c7
< asteval==0.9.18
---
> asteval==0.9.19
21a22
> bokeh==2.2.1
58c59,60
< fastprogress==0.2.6
---
> fastprogress==1.0.0
> fsspec==0.8.3
104c106
< jupyterlab==2.2.4
---
> jupyterlab==2.2.8
113c115
< livereload==2.6.2
---
> livereload==2.6.3
133,135c135,137
< mkdocs-material==5.5.6
< mkdocs-material-extensions==1.0
< mkdocs-pymdownx-material-extras==1.0.7
---
> mkdocs-material==6.0.2
> mkdocs-material-extensions==1.0.1
> mkdocs-pymdownx-material-extras==1.1.1
154c156
< openpyxl==3.0.4
---
> openpyxl==3.0.5
166c168
< pdoc3==0.8.4
---
> pdoc3==0.9.1
170c172
< Pint==0.14
---
> Pint==0.16.1
201c203
< pymdown-extensions==8.0
---
> pymdown-extensions==8.0.1
223c225
< QETpy==1.1.0
---
> QETpy==1.3.0
227c229
< rawio==1.2.2
---
> rawio==1.2.3
229c231
< regex==2020.7.14
---
> regex==2020.9.27
239c241
< RQpy==0.1.0
---
> RQpy==0.2.0
268c270
< tifffile==2020.8.13
---
> tifffile==2020.9.3
271c273
< tqdm==4.48.2
---
> tqdm==4.50.0
274c276
< typing-extensions==3.7.4.2
---
> typing-extensions==3.7.4.3
286c288
< xarray==0.16.0
---
> xarray==0.16.1
```
In your first traceback, the error says it's coming from your .local directory; is that possibly part of the problem?
No, I tried both ways and the error is the same
found some errors in the Traefik logs:
```
time="2020-10-10T03:51:20Z" level=error msg="subset not found for jhub/dask-6194dda20abf411a837e38c466a2ae33" providerName=kubernetescrd namespace=jhub ingress=dask-6194dda20abf411a837e38c466a2ae33
time="2020-10-10T03:51:20Z" level=error msg="Cannot create service: subset not found" servicePort=8786 providerName=kubernetescrd ingress=dask-6194dda20abf411a837e38c466a2ae33 namespace=jhub serviceName=dask-6194dda20abf411a837e38c466a2ae33
```
V03-05 after trying to update distributed:
```
aiohttp/                      distributed-2.25.0.dist-info/
aiohttp-3.6.2.dist-info/      idna_ssl-1.1.0.dist-info/
async_timeout/                idna_ssl.py
async_timeout-3.0.1.dist-info/ multidict/
cloudpickle/                  multidict-4.7.6.dist-info/
cloudpickle-1.6.0.dist-info/  __pycache__/
dask/                         tblib/
dask-2.25.0.dist-info/        tblib-1.7.0.dist-info/
dask_gateway/                 yarl/
dask_gateway-0.8.0.dist-info/ yarl-1.6.0.dist-info/
distributed/
```
V03-04:
```
aiohttp/                      distributed-2.25.0.dist-info/
aiohttp-3.6.2.dist-info/      idna_ssl-1.1.0.dist-info/
async_timeout/                idna_ssl.py
async_timeout-3.0.1.dist-info/ multidict/
cloudpickle/                  multidict-4.7.6.dist-info/
cloudpickle-1.6.0.dist-info/  __pycache__/
dask/                         tblib/
dask-2.25.0.dist-info/        tblib-1.7.0.dist-info/
dask_gateway/                 yarl/
dask_gateway-0.8.0.dist-info/ yarl-1.6.0.dist-info/
distributed/
```
worked on this a bit more; I cannot find what is wrong, so I asked on Stack Overflow: https://stackoverflow.com/questions/64347182/
too bad, no answer on Stack Overflow. Next I want to create a test Kubernetes cluster from scratch on another account and do some testing there, to see if I can understand what is going on in a simpler setup.
Dask Gateway is probably going to release a new version soon; I'll wait 1 or 2 weeks and test directly with the new release, you never know, this could fix the issue magically. https://github.com/dask/dask-gateway/releases
good news, I think I have identified the problem thanks to an answer from the dask-gateway devs.
It was just a version incompatibility: I was mistaken in thinking that the worker and scheduler images had distributed 2.25.0; instead they had an older version.
So my plan now is to build custom Docker images for CDMS which don't ship a Python environment but load it from CVMFS, and use them as both scheduler and worker images, so that we are sure we have consistent versions across the cluster.
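Such an image could be little more than a base OS plus environment variables pointing into the CVMFS mount; a hypothetical sketch (the CVMFS paths below are placeholders, not the real CDMS ones):

```dockerfile
# Hypothetical: no Python environment baked in; the CDMS environment is
# expected to arrive via a CVMFS mount provided by Kubernetes at runtime.
FROM ubuntu:20.04

# Placeholder paths: point PATH/PYTHONPATH at the mounted CVMFS environment
# so the scheduler, the workers, and the hub all see identical versions.
ENV PYTHONPATH=/cvmfs/cdms.example.org/python/site-packages
ENV PATH=/cvmfs/cdms.example.org/python/bin:$PATH

# dask-worker / dask-scheduler come from the CVMFS environment itself
ENTRYPOINT ["dask-worker"]
```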
I'll update this ticket with the results from this test.
@pibion do you also want a capability like Pangeo's, where a user can request a cluster of dask workers so that they can run computations in parallel?
In this case, do you have an example of such a distributed computation?
Also, the data: how are Jupyter Notebook users going to access data, and where do we store it?