coiled / feedback

A place to provide Coiled feedback

Network issues - What ports and protocol does Coiled need? #53

Closed matthiasdv closed 4 years ago

matthiasdv commented 4 years ago

Running the Coiled getting-started example works fine on my local development machine. But when I try to run it from a Kubernetes cluster on Google Compute Engine (GCE), I run into the following error when instantiating the client:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-7-bc46b4527c7f> in <module>
----> 1 client = Client(cluster)

/opt/conda/lib/python3.8/site-packages/distributed/client.py in __init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    742             ext(self)
    743 
--> 744         self.start(timeout=timeout)
    745         Client._instances.add(self)
    746 

/opt/conda/lib/python3.8/site-packages/distributed/client.py in start(self, **kwargs)
    946             self._started = asyncio.ensure_future(self._start(**kwargs))
    947         else:
--> 948             sync(self.loop, self._start, **kwargs)
    949 
    950     def __await__(self):

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    337     if error[0]:
    338         typ, exc, tb = error[0]
--> 339         raise exc.with_traceback(tb)
    340     else:
    341         return result[0]

/opt/conda/lib/python3.8/site-packages/distributed/utils.py in f()
    321             if callback_timeout is not None:
    322                 future = asyncio.wait_for(future, callback_timeout)
--> 323             result[0] = yield future
    324         except Exception as exc:
    325             error[0] = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/tornado/gen.py in run(self)
    733 
    734                     try:
--> 735                         value = future.result()
    736                     except Exception:
    737                         exc_info = sys.exc_info()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _start(self, timeout, **kwargs)
   1043 
   1044         try:
-> 1045             await self._ensure_connected(timeout=timeout)
   1046         except OSError:
   1047             await self._close()

/opt/conda/lib/python3.8/site-packages/distributed/client.py in _ensure_connected(self, timeout)
   1124         else:
   1125             msg = await comm.read()
-> 1126         assert len(msg) == 1
   1127         assert msg[0]["op"] == "stream-start"
   1128 

AssertionError: 

Inspecting the cluster logs in the dashboard I see the following stack trace from the scheduler:

distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy

distributed.scheduler - INFO - Clear task state

distributed.scheduler - INFO -   Scheduler at:    tls://10.1.12.248:8786

distributed.scheduler - INFO -   dashboard at:                     :8787

distributed.preloading - INFO - Run preload setup function: https://beta.coiled.io/preloads/insights.py

distributed.preloading - INFO - Run preload setup function: https://beta.coiled.io/preloads/aws-credentials.py

tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7fa9e5d05670>, <Task finished name='Task-22' coro=<BaseTCPListener._handle_stream() done, defined at /opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py:445> exception=KeyError('pickle-protocol')>)

Traceback (most recent call last):

  File "/opt/conda/lib/python3.8/site-packages/tornado/ioloop.py", line 743, in _run_callback

    ret = callback()

  File "/opt/conda/lib/python3.8/site-packages/tornado/tcpserver.py", line 327, in <lambda>

    gen.convert_yielded(future), lambda f: f.result()

  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/tcp.py", line 457, in _handle_stream

    await self.on_connection(comm)

  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py", line 222, in on_connection

    comm.handshake_options = comm.handshake_configuration(

  File "/opt/conda/lib/python3.8/site-packages/distributed/comm/core.py", line 138, in handshake_configuration

    "pickle-protocol": min(local["pickle-protocol"], remote["pickle-protocol"])

KeyError: 'pickle-protocol'

distributed.comm.tcp - WARNING - Closing dangling stream in <TLS  local=tls://10.1.12.248:8786 remote=tls://35.233.61.133:60166>

distributed.scheduler - INFO - Register worker <Worker 'tls://10.1.12.253:39729', name: matthiasdv-510-worker-3-90f3ea, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tls://10.1.12.253:39729

distributed.core - INFO - Starting established connection

distributed.scheduler - INFO - Register worker <Worker 'tls://10.1.13.16:35139', name: matthiasdv-510-worker-2-5f43e5, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tls://10.1.13.16:35139

distributed.core - INFO - Starting established connection

distributed.scheduler - INFO - Register worker <Worker 'tls://10.1.11.103:39877', name: matthiasdv-510-worker-1-404df1, memory: 0, processing: 0>

distributed.scheduler - INFO - Starting worker compute stream, tls://10.1.11.103:39877

distributed.core - INFO - Starting established connection

[The same KeyError: 'pickle-protocol' traceback then repeats for each subsequent connection attempt, each followed by a matching warning:]

distributed.comm.tcp - WARNING - Closing dangling stream in <TLS  local=tls://10.1.12.248:8786 remote=tls://35.233.61.133:60226>

[...and likewise for remote ports 60228, 60316, and 60318.]

Clearly, the client on the GCE machine is having trouble connecting to the Coiled-managed scheduler. What would be the first things to check? Which ports should be open, and for which protocols?

Any help is much appreciated.
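As a first connectivity check (a minimal sketch; the host below is a placeholder, and 8786/8787 are Dask's default scheduler and dashboard ports, matching the scheduler logs above), you can test whether the scheduler port is reachable from the client machine at all:

```python
import socket

def port_open(host: str, port: int, timeout: float = 5.0) -> bool:
    """Return True if a plain TCP connection to host:port succeeds.

    This only checks reachability; it does not perform the TLS
    handshake that Dask's tls:// comms run on top of TCP.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Usage (placeholder host, not a real Coiled scheduler address):
# port_open("scheduler.example.com", 8786)   # scheduler
# port_open("scheduler.example.com", 8787)   # dashboard
```

If this returns False from inside the Kubernetes pod but True from a local machine, the problem is a firewall or egress rule rather than a library mismatch.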

mrocklin commented 4 years ago

Thank you for raising an issue @matthiasdv .

This is a signal that your dask.distributed versions are out of sync. I've raised a small PR in Dask to improve the error message here: https://github.com/dask/distributed/pull/4076

I'm curious, how did you install coiled? I would have expected it to require a sufficiently recent version of dask/distributed.

Can I ask for the following:

import coiled, dask, distributed
print(coiled.__version__)
print(dask.__version__)
print(distributed.__version__)
jrbourbeau commented 4 years ago

Thanks for raising an issue @matthiasdv! The KeyError: 'pickle-protocol' error that's popping up looks like it's related to some recent changes in distributed. Can you check that you're using distributed >= 2.23.0?
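A quick sanity check for this (a simplified sketch using plain tuple comparison of release numbers; for real version strings with pre-release suffixes, `packaging.version.Version` is more robust):

```python
def parse_version(v: str) -> tuple:
    """Parse a plain 'X.Y.Z' release string into a comparable tuple."""
    return tuple(int(part) for part in v.split("."))

# distributed 2.23.0 changed the comm handshake protocol.
MIN_PROTOCOL_VERSION = parse_version("2.23.0")

def protocol_compatible(version_string: str) -> bool:
    """True if this distributed release speaks the post-2.23 handshake."""
    return parse_version(version_string) >= MIN_PROTOCOL_VERSION

# protocol_compatible(distributed.__version__) should be True on the client.
```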

mrocklin commented 4 years ago

I think that this is the problem: https://github.com/coiled/cloud/pull/735

matthiasdv commented 4 years ago

I tried to reproduce the issue, but unfortunately it resolved itself in the meantime. I re-installed the Coiled library using:

!pip install coiled --force-reinstall --user

And it seems to work now. This is what I believe must have happened:

I struggled to reproduce this on the actual environment. But the versions on the Docker image are as follows:

distributed               2.18.0           py38h32f6830_0    conda-forge
dask                      2.18.1                     py_0    conda-forge

After running `pip install coiled --force-reinstall`:

import coiled, dask, distributed
print(coiled.__version__)
print(dask.__version__)
print(distributed.__version__)

0.0.21
2.24.0
2.24.0

The missing piece of information is the set of versions installed during the failed state. I did notice that the --force-reinstall upgraded the distributed version:

Successfully installed Jinja2-2.11.2 MarkupSafe-1.1.1 aiobotocore-1.1.0 aiohttp-3.6.2 aioitertools-0.7.0 async-timeout-3.0.1 attrs-20.1.0 backcall-0.2.0 bokeh-2.2.0 botocore-1.17.44 chardet-3.0.4 click-7.1.2 cloudpickle-1.5.0 coiled-0.0.21 dask-2.24.0 decorator-4.4.2 **distributed-2.24.0** docutils-0.15.2 fsspec-0.8.0 heapdict-1.0.1 idna-2.10 ipython-7.17.0 ipython-genutils-0.2.0 jedi-0.17.2 jmespath-0.10.0 locket-0.2.0 msgpack-1.0.0 multidict-4.7.6 numpy-1.19.1 packaging-20.4 pandas-1.1.1 parso-0.7.1 partd-1.1.0 pexpect-4.8.0 pickleshare-0.7.5 pillow-7.2.0 prompt-toolkit-3.0.6 psutil-5.7.2 ptyprocess-0.6.0 pygments-2.6.1 pyparsing-2.4.7 python-dateutil-2.8.1 pytz-2020.1 pyyaml-5.3.1 s3fs-0.5.0 setuptools-49.6.0 six-1.15.0 sortedcontainers-2.2.2 tblib-1.7.0 toolz-0.10.0 tornado-6.0.4 traitlets-4.3.3 typing-extensions-3.7.4.3 urllib3-1.25.10 wcwidth-0.2.5 wrapt-1.12.1 yarl-1.5.1 zict-2.0.0

matthiasdv commented 4 years ago

@mrocklin Dask has a .get_versions(check=True) method that we use regularly to validate that both the scheduler and workers are running the desired versions of certain libraries. This is the first go-to in case of strange behaviour. Is there a way to use that in this particular situation?
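For context: `get_versions(check=True)` only works once a client has actually connected, which is exactly the step that fails here. A rough local sketch of the comparison it performs (an illustration, not distributed's actual implementation; `find_mismatches` is a hypothetical helper):

```python
def find_mismatches(client: dict, scheduler: dict, workers: dict) -> list:
    """Return the package names whose versions differ between the
    client, scheduler, and worker environments."""
    mismatched = []
    for pkg in client:
        versions = {client.get(pkg), scheduler.get(pkg), workers.get(pkg)}
        if len(versions) > 1:
            mismatched.append(pkg)
    return mismatched

# Example with mismatched client-side versions:
# find_mismatches(
#     {"dask": "2.24.0", "distributed": "2.24.0"},
#     {"dask": "2.23.0", "distributed": "2.23.0"},
#     {"dask": "2.23.0", "distributed": "2.23.0"},
# )  -> ["dask", "distributed"]
```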

matthiasdv commented 4 years ago

Not entirely there yet though:

/home/jovyan/.local/lib/python3.8/site-packages/distributed/client.py:1138: VersionMismatchWarning: Mismatched versions found

+-------------+---------------+---------------+---------------+
| Package     | client        | scheduler     | workers       |
+-------------+---------------+---------------+---------------+
| dask        | 2.24.0        | 2.23.0        | 2.23.0        |
| distributed | 2.24.0        | 2.23.0        | 2.23.0        |
| python      | 3.8.3.final.0 | 3.8.5.final.0 | 3.8.5.final.0 |
+-------------+---------------+---------------+---------------+
  warnings.warn(version_module.VersionMismatchWarning(msg[0]["warning"]))

It seems the install resolved to a higher version than the one running on the cluster.

mrocklin commented 4 years ago

I suspect that version combination should be fine, though.

mrocklin commented 4 years ago

2.23 introduced a significant break in the protocol. As long as you're above that, you should be OK.

matthiasdv commented 4 years ago

Ok, wonderful. I'm currently computing some aggregations on the New York Taxi dataset. All in all, Coiled was fairly easy to get up and running on first use.

jrbourbeau commented 4 years ago

Glad to hear it : )