dask / dask-ec2

Start a cluster in EC2 for dask.distributed

Hanging in waiter.acquire(True, timeout) #96

Closed lionfish0 closed 7 years ago

lionfish0 commented 7 years ago

My Python script hangs (inside search.fit). When I stop it, I find it's blocked on line 299, gotit = waiter.acquire(True, timeout). I'm not 100% sure what's happened; the same code runs happily from the remote Jupyter notebook (over the workers). I wonder if it's a Python/pickle compatibility issue. I've just upgraded the local machine from 14.04 to 16.04, while the remote workers are all still on 14.04 (due to #38). The Python versions are slightly different, and I wonder if that's causing the problem.

On workers: Python 3.5.2 (ubuntu 14.04) On local: Python 3.6.1 (ubuntu 16.04)

I don't know if that's the problem... I'll investigate further (I'm also not sure how to get more information out of the workers).
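
For what it's worth, newer releases of distributed expose Client.get_worker_logs(), which pulls recent log lines from each worker over the network rather than requiring a login to each node. A minimal sketch, assuming a recent distributed and a placeholder scheduler address:

from distributed import Client

# Placeholder address: substitute the actual scheduler (e.g. the EC2 head node).
client = Client('tcp://scheduler-address:8786')

# Pull the most recent log lines from every worker (newer distributed releases;
# the return format may vary slightly between versions).
logs = client.get_worker_logs()
for worker, lines in logs.items():
    print(worker)
    for level, message in lines:
        print('   ', level, message)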

lionfish0 commented 7 years ago

I realised the workers produced logs in /var/log/dask-worker.log (I don't know if this is because I added distributed.worker: debug to .dask/config.yaml).

Anyway, it looks like this is the problem:

tornado.application - ERROR - Future <tornado.concurrent.Future object at 0x7f95d1e4f588> exception was never retrieved: Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/core.py", line 239, in handle_comm
    + str(msg))
TypeError: Bad message type.  Expected dict, got
  [{'key': 'cv-parameters-ffeaf7708bc5a807e04134ba8785cabf', 'op': 'release-task'}, {'key': 'array-ae6a72ba4372b6e6a66362a12d606e12', 'op': 'release-task'}]
distributed.utils - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/utils.py", line 381, in log_errors
    yield
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.worker - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/worker.py", line 1040, in compute_stream
    msgs = yield comm.read()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/tcp.py", line 169, in read
    msg = from_frames(frames, deserialize=self.deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/utils.py", line 30, in from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)
distributed.core - ERROR - too many values to unpack (expected 2)
Traceback (most recent call last):
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/core.py", line 260, in handle_comm
    result = yield result
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/worker.py", line 1040, in compute_stream
    msgs = yield comm.read()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/opt/anaconda/lib/python3.5/site-packages/tornado/gen.py", line 1024, in run
    yielded = self.gen.send(value)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/tcp.py", line 169, in read
    msg = from_frames(frames, deserialize=self.deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/comm/utils.py", line 30, in from_frames
    return protocol.loads(frames, deserialize=deserialize)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/core.py", line 121, in loads
    value = _deserialize(head, fs)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/serialize.py", line 161, in deserialize
    return f(header, frames)
  File "/opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py", line 98, in deserialize_numpy_ndarray
    is_custom, dt = header['dtype']
ValueError: too many values to unpack (expected 2)

So, looking at the local version, ~/anaconda3/lib/python3.6/site-packages/distributed/protocol/numpy.py:

def deserialize_numpy_ndarray(header, frames):
    with log_errors():
        if len(frames) > 1:
            frames = merge_frames(header, frames)

        if header.get('pickle'):
            return pickle.loads(frames[0])

        dt = header['dtype']
        if isinstance(dt, tuple):
            dt = list(dt)

        x = np.ndarray(header['shape'], dtype=dt, buffer=frames[0],
                strides=header['strides'])

        x = stride_tricks.as_strided(x, strides=header['strides'])

        return x

Compared to the version on the worker /opt/anaconda/lib/python3.5/site-packages/distributed/protocol/numpy.py:

def deserialize_numpy_ndarray(header, frames):
    with log_errors():
        if len(frames) > 1:
            frames = merge_frames(header, frames)

        if header.get('pickle'):
            return pickle.loads(frames[0])

        is_custom, dt = header['dtype']
        if is_custom:
            dt = pickle.loads(dt)
        else:
            dt = np.dtype(dt)

        x = np.ndarray(header['shape'], dtype=dt, buffer=frames[0],
                       strides=header['strides'])

        return x
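
The two implementations disagree about what header['dtype'] contains: the 1.16.x side puts the plain dtype string (or dtype description) into the header, while the 1.17.x side expects an (is_custom, dt) pair. A hypothetical reconstruction of the failure, outside the real protocol code:

# Hypothetical reconstruction, not the actual wire format.
old_style_header = {'dtype': '<f8'}            # roughly what a 1.16.x client sends

try:
    is_custom, dt = old_style_header['dtype']  # what the 1.17.x worker expects
except ValueError as err:
    print(err)                                 # too many values to unpack (expected 2)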

My local version of distributed is

Python 3.6.1 |Anaconda 4.4.0 (64-bit)| (default, May 11 2017, 13:09:58) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import distributed
>>> distributed.__version__
'1.16.3'

while on the worker it is:

Python 3.5.2 |Continuum Analytics, Inc.| (default, Jul  2 2016, 17:53:06) 
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import distributed
>>> distributed.__version__
'1.17.1'

I ran conda update distributed on my local machine, which updated distributed: 1.16.3-py36_0 --> 1.17.1-py36_0.

And it now works!

mrocklin commented 7 years ago

Glad to hear that this was resolved. In the future you might try client.get_versions(check=True).
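
A rough sketch of that check, assuming a connected client and a placeholder scheduler address; with check=True a version mismatch between client, scheduler and workers is reported instead of being silently ignored:

from distributed import Client

client = Client('tcp://scheduler-address:8786')   # placeholder address

# Gathers Python and package versions from the client, scheduler and workers;
# check=True flags a mismatch (raising or warning, depending on the release).
versions = client.get_versions(check=True)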


jmsking commented 5 years ago

Hi @lionfish0, I can't find the worker logs in ~/var/log. Could you share your .dask/config.yaml?
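
Not the original config, but as a pointer: newer dask versions can report where they look for YAML config files and what configuration is actually in effect, while the dask-ec2-era releases read ~/.dask/config.yaml. A sketch, assuming a recent dask:

import dask.config

# Directories searched for YAML config files (newer dask;
# older dask/distributed used ~/.dask/config.yaml instead).
print(dask.config.paths)

# The merged configuration currently in effect in this process.
print(dask.config.config)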

mrocklin commented 5 years ago

This project is ancient and probably not supported any longer. I recommend looking through https://docs.dask.org/en/latest/setup/cloud.html

In particular, many AWS users seem to like Dask on EMR.