dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

Cluster and Client creation error when using EMR #124

Open nmerket opened 4 years ago

nmerket commented 4 years ago

What happened:

Running dask-yarn on EMR causes a repeating error in tornado on client creation.

What you expected to happen:

No error; the client should be created and be usable.

Minimal Complete Verifiable Example:

I am using dask-yarn on EMR. I followed the directions outlined here and used the unaltered bootstrap-dask script from this repo. I used the emr-5.29.0 release to avoid the other bootstrap issue #122. Connect to the master node, open Jupyter Notebook, and start a new notebook.
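For reference, the cluster setup described above can be sketched with the AWS CLI. This is a hypothetical reconstruction, not the reporter's actual command: the cluster name, instance types, key name, and S3 path for the bootstrap script are placeholders.

```shell
# Hypothetical sketch of the EMR launch described above; the bucket path,
# instance type/count, and key name are placeholders, not from this report.
aws emr create-cluster \
  --name "dask-test" \
  --release-label emr-5.29.0 \
  --applications Name=Hadoop \
  --instance-type m5.xlarge \
  --instance-count 3 \
  --bootstrap-actions Path=s3://my-bucket/bootstrap-dask \
  --ec2-attributes KeyName=my-key \
  --use-default-roles
```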

from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)
client

I then get this error reported back to the notebook every few seconds:

distributed.scheduler - INFO - Receive client connection: Client-d994493a-d748-11ea-afbf-025317f23bc7
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
    future.result()
asyncio.exceptions.CancelledError

Anything else we need to know?:

Environment:

aimran-adroll commented 4 years ago

I had to downgrade to Python 3.7, after which it started working again.

Here is the relevant bit from bootstrap.sh:

curl https://repo.anaconda.com/miniconda/Miniconda3-py37_4.8.3-Linux-x86_64.sh -o /tmp/miniconda.sh

Note that I pinned it to 3.7.x.

But it'd be good to get to the bottom of it.
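For what it's worth, one plausible explanation for why 3.7 works and 3.8 does not (an assumption on my part, not confirmed in this thread) is the Python 3.8 change that made asyncio.CancelledError derive from BaseException instead of Exception, so broad `except Exception` handlers in older tornado/distributed code no longer swallow a cancelled future:

```python
import asyncio
import sys

# Since Python 3.8, asyncio.CancelledError derives from BaseException,
# so an `except Exception` clause no longer catches a cancellation.
if sys.version_info >= (3, 8):
    assert not issubclass(asyncio.CancelledError, Exception)
else:  # Python 3.7 and earlier
    assert issubclass(asyncio.CancelledError, Exception)

# On all versions it is still catchable at the BaseException level.
assert issubclass(asyncio.CancelledError, BaseException)
```

If that's the cause, it would explain why the `<Future cancelled>` callback error surfaces repeatedly on 3.8 but stays silent on 3.7.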

timothymugayi commented 4 years ago

I seem to be having the same issue running on Python 3.8:

Python 3.8.3 (default, May 19 2020, 18:47:26)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from dask_yarn import YarnCluster
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.0.10.191:46623
20/10/21 02:46:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 02:46:41 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 02:46:41 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 02:46:42 INFO skein.Driver: Driver started, listening on 40913
20/10/21 02:46:42 INFO conf.Configuration: resource-types.xml not found
20/10/21 02:46:42 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 02:46:42 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 02:46:42 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0002
20/10/21 02:46:43 INFO skein.Driver: Submitting application...
20/10/21 02:46:43 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0002
>>>
>>>
>>> cluster.scale(1)
>>>
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:38381', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:38381
distributed.core - INFO - Starting established connection
Exception in callback with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968
handle: <Handle with_timeout.<locals>.error_callback(<Future cancelled>) at /home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py:968>
Traceback (most recent call last):
  File "/home/hadoop/miniconda/lib/python3.8/asyncio/events.py", line 81, in _run
    self._context.run(self._callback, *self._args)
  File "/home/hadoop/miniconda/lib/python3.8/site-packages/tornado/gen.py", line 970, in error_callback
    future.result()
asyncio.exceptions.CancelledError
timothymugayi commented 4 years ago

Downgraded to Python 3.7.9 and all seems well.

Steps taken

Python 3.7.9 (default, Aug 31 2020, 12:42:55)
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>
>>>
>>> from dask_yarn import YarnCluster
>>>
>>> cluster = YarnCluster(environment="/home/hadoop/environment.tar.gz")
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:   tcp://10.0.10.191:35369
distributed.scheduler - INFO -   dashboard at:                    :41787
20/10/21 05:09:26 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
20/10/21 05:09:26 INFO client.RMProxy: Connecting to ResourceManager at ip-10-0-10-191.ec2.internal/10.0.10.191:8032
20/10/21 05:09:26 INFO client.AHSProxy: Connecting to Application History server at ip-10-0-10-191.ec2.internal/10.0.10.191:10200
20/10/21 05:09:27 INFO skein.Driver: Driver started, listening on 41837
20/10/21 05:09:27 INFO conf.Configuration: resource-types.xml not found
20/10/21 05:09:27 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE
20/10/21 05:09:27 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE
20/10/21 05:09:27 INFO skein.Driver: Uploading application resources to hdfs://ip-10-0-10-191.ec2.internal:8020/user/hadoop/.skein/application_1603177429164_0003
20/10/21 05:09:28 INFO skein.Driver: Submitting application...
20/10/21 05:09:28 INFO impl.YarnClientImpl: Submitted application application_1603177429164_0003
>>>
>>>
>>> cluster.scale(1)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:40657', name: dask.worker_0, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:40657
distributed.core - INFO - Starting established connection

>>>
>>>
>>>
>>> cluster.scale(3)
>>> distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:44253', name: dask.worker_1, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:44253
distributed.core - INFO - Starting established connection
distributed.scheduler - INFO - Register worker <Worker 'tcp://10.0.10.161:37565', name: dask.worker_2, memory: 0, processing: 0>
distributed.scheduler - INFO - Starting worker compute stream, tcp://10.0.10.161:37565
distributed.core - INFO - Starting established connection

>>>
>>>
>>> from dask.distributed import Client
>>>
>>> import dask.array as da
>>> client = Client(cluster)
distributed.scheduler - INFO - Receive client connection: Client-a0c24102-135d-11eb-a1a6-0a89473033af
distributed.core - INFO - Starting established connection
>>>
>>>
>>> array = da.ones((10000, 10000, 10000))
>>> print(array.mean().compute())
distributed.core - INFO - Event loop was unresponsive in Scheduler for 8.97s.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.

1.0
>>>
>>> len(client.scheduler_info()['workers'])
3
>>>
quasiben commented 3 years ago

Can you try again with the latest dask/distributed? It should be 2.30.0. I don't think we want to force users onto Python 3.7.
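In case it helps anyone trying that suggestion, one way to test a newer dask/distributed with the packed-environment workflow used above is a sketch like the following (the versions and output path are illustrative, not prescribed by this thread):

```shell
# Sketch: rebuild the packed environment with newer dask/distributed
# before pointing YarnCluster(environment=...) at it.
pip install "dask==2.30.0" "distributed==2.30.0" dask-yarn conda-pack
conda pack -o environment.tar.gz
```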