dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

YarnCluster hangs #143

Closed. lucio-f closed this issue 3 years ago.

lucio-f commented 3 years ago

Hello, I'm trying to run a project on a Hadoop/YARN cluster. It works without any significant issues on my machine with LocalCluster, but every time I try to deploy it on the cluster it hangs. The application is created and reported as RUNNING, and in the logs I can see that the remote scheduler and (sometimes, but not always) the workers have been created, so the Conda environment should not be the problem, yet my script never gets past the YarnCluster constructor. I've stripped it down to the bare minimum and it still hangs:

from dask.distributed import Client
from dask_yarn import YarnCluster

print("Connecting to the cluster")
# Hangs here; test.tar.gz contains only dask_yarn and its dependencies
with YarnCluster(environment="test.tar.gz") as cluster:
    client = Client(cluster)
    print("Connection to the cluster established")

I have tried to debug it and it seems to be stuck here:

    with YarnCluster(environment="test.tar.gz") as cluster:
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 383, in __init__
    self._init_common(
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 533, in _init_common
    self._sync(self._start_internal())
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 701, in _sync
    return future.result()
  File "/home/luc/miniconda3/envs/test/lib/python3.8/concurrent/futures/_base.py", line 434, in result
    self._condition.wait(timeout)

This is all I see:

Connecting to the cluster
21/03/22 22:16:42 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/22 22:16:42 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
21/03/22 22:16:42 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/22 22:16:42 INFO client.AHSProxy: Connecting to Application History server at epod-master3.vgt.vito.be/192.168.207.58:10200
21/03/22 22:16:43 INFO skein.Driver: Driver started, listening on 43012
21/03/22 22:16:43 INFO conf.Configuration: resource-types.xml not found
21/03/22 22:16:43 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/03/22 22:16:43 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/22 22:16:44 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/22 22:16:44 INFO hdfs.DFSClient: Created token for luc: HDFS_DELEGATION_TOKEN owner=***, renewer=yarn, realUser=, issueDate=*************, maxDate=*************, sequenceNumber=*******, masterKeyId=**** on ha-hdfs:hacluster
21/03/22 22:16:44 INFO security.TokenCache: Got dt for hdfs://hacluster; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (token for luc: HDFS_DELEGATION_TOKEN owner=***, renewer=yarn, realUser=, issueDate=*************, maxDate=*************, sequenceNumber=*******, masterKeyId=****)
21/03/22 22:16:44 INFO skein.Driver: Uploading application resources to hdfs://hacluster/user/luc/.skein/application_1611572280718_138279
21/03/22 22:16:47 INFO skein.Driver: Submitting application...
21/03/22 22:16:47 INFO impl.TimelineClientImpl: Timeline service address: epod-master3.vgt.vito.be:8188
21/03/22 22:16:47 INFO impl.YarnClientImpl: Submitted application application_1611572280718_138279

Here are some other logs (captured after I manually killed the application):

21/03/22 22:16:53 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/03/22 22:16:56 INFO skein.ApplicationMaster: Running as user luc
21/03/22 22:16:56 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-315/0/resource-types.xml
21/03/22 22:16:56 INFO skein.ApplicationMaster: Application specification successfully loaded
21/03/22 22:16:58 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/22 22:16:58 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
21/03/22 22:16:59 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/22 22:17:00 INFO skein.ApplicationMaster: gRPC server started at epod120.vgt.vito.be:44750
21/03/22 22:17:01 INFO skein.ApplicationMaster: WebUI server started at epod120.vgt.vito.be:42192
21/03/22 22:17:01 INFO skein.ApplicationMaster: Registering application with resource manager
21/03/22 22:17:01 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/22 22:17:02 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/22 22:17:04 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/22 22:17:04 INFO client.AHSProxy: Connecting to Application History server at epod-master3.vgt.vito.be/192.168.207.58:10200
21/03/22 22:17:04 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/22 22:17:05 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/22 22:17:05 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/03/22 22:17:05 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
21/03/22 22:17:05 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
21/03/22 22:17:06 INFO skein.ApplicationMaster: Starting container_e4897_1611572280718_138279_01_000003...
21/03/22 22:17:06 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e4897_1611572280718_138279_01_000003
21/03/22 22:22:45 WARN skein.ApplicationMaster: Application master shutdown by external signal. This usually means that the application was killed by a user/administrator, or that the application master memory limit was exceeded. See the diagnostics for more information.
21/03/22 22:22:45 INFO skein.ApplicationMaster: Unregistering application with status FAILED
21/03/22 22:22:45 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
21/03/22 22:22:45 INFO skein.ApplicationMaster: Deleted application directory hdfs://hacluster/user/luc/.skein/application_1611572280718_138279
distributed.http.proxy - INFO - To route to workers diagnostics web server please install jupyter-server-proxy: python -m pip install jupyter-server-proxy
distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at: tcp://192.168.207.235:41802
distributed.scheduler - INFO -   dashboard at:                    :33843

If a worker is started, I see something like this, but it does not always appear:

distributed.nanny - INFO -         Start Nanny at: 'tcp://192.168.207.196:40402'
distributed.worker - INFO -       Start worker at: tcp://192.168.207.196:38274
distributed.worker - INFO -          Listening to: tcp://192.168.207.196:38274
distributed.worker - INFO -          dashboard at:      192.168.207.196:39918
distributed.worker - INFO - Waiting to connect to: tcp://192.168.207.225:42006
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                    4.29 GB
distributed.worker - INFO -       Local Directory: /data1/hadoop/yarn/local/usercache/luc/appcache/application_1611572280718_137944/container_e4897_1611572280718_137944_01_000004/dask-worker-space/worker-j1goommp
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -         Registered to: tcp://192.168.207.225:42006
distributed.worker - INFO - -------------------------------------------------
distributed.core - INFO - Starting established connection
distributed.process - INFO - reaping stray process <SpawnProcess name='Dask Worker process (from Nanny)' pid=14568 parent=14504 started daemon>
Exception in thread AsyncProcess Dask Worker process (from Nanny) watch process join:
Traceback (most recent call last):
  File "/data1/hadoop/yarn/local/usercache/luc/appcache/application_1611572280718_137944/container_e4897_1611572280718_137944_01_000004/environment/lib/python3.8/threading.py", line 932, in _bootstrap_inner

Environment:

quasiben commented 3 years ago

By default, YarnCluster starts a scheduler but does not create any workers. You will need to scale the cluster:

with YarnCluster(environment="test.tar.gz") as cluster: 
    cluster.scale(2)
    client = Client(cluster)
lucio-f commented 3 years ago

By default, YarnCluster starts a scheduler but does not create any workers. You will need to scale the cluster:

with YarnCluster(environment="test.tar.gz") as cluster: 
    cluster.scale(2)
    client = Client(cluster)

The problem is that it never even reaches that line (I also tried requesting workers directly in the constructor, roughly as sketched below, but to no avail).
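
For reference, this is roughly the variant with workers requested up front (a sketch; it assumes the standard n_workers keyword and hangs at the constructor just like the minimal version):

from dask.distributed import Client
from dask_yarn import YarnCluster

# Request two workers at construction time instead of calling cluster.scale()
with YarnCluster(environment="test.tar.gz", n_workers=2) as cluster:
    client = Client(cluster)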

quasiben commented 3 years ago

Can you see the dashboard in the browser?

lucio-f commented 3 years ago

No, I can only ping the IP of the container

quasiben commented 3 years ago

Hmm, maybe that suggests a networking issue between where you are launching the cluster and the YARN resources themselves. Can you try deploy_mode="remote"?
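
For reference, deploy_mode is just a keyword to the constructor (a minimal sketch; "remote" runs the scheduler inside a YARN container, while "local" runs it in the client process on the edge node):

from dask_yarn import YarnCluster

# Scheduler runs inside a YARN container rather than on the edge node
with YarnCluster(environment="test.tar.gz", deploy_mode="remote") as cluster:
    ...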

lucio-f commented 3 years ago

It is already deployed remotely. If I set deploy_mode="local" I can see the dashboard, but it still hangs in the same place, and after a few seconds the script terminates (taking the scheduler with it):

distributed.scheduler - INFO - Clear task state
distributed.scheduler - INFO -   Scheduler at:  tcp://10.11.11.181:46704
distributed.scheduler - INFO -   dashboard at:                    :46488
21/03/23 01:19:38 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/23 01:19:38 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
21/03/23 01:19:39 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/23 01:19:39 INFO client.AHSProxy: Connecting to Application History server at epod-master3.vgt.vito.be/192.168.207.58:10200
21/03/23 01:19:39 INFO skein.Driver: Driver started, listening on 35001
21/03/23 01:19:40 INFO conf.Configuration: resource-types.xml not found
21/03/23 01:19:40 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
21/03/23 01:19:40 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/23 01:19:40 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/23 01:19:41 INFO hdfs.DFSClient: Created token for luc: HDFS_DELEGATION_TOKEN owner=luc@VGT.VITO.BE, renewer=yarn, realUser=, issueDate=1616458781077, maxDate=1617063581077, sequenceNumber=4052285, masterKeyId=1956 on ha-hdfs:hacluster
21/03/23 01:19:41 INFO security.TokenCache: Got dt for hdfs://hacluster; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:hacluster, Ident: (token for luc: HDFS_DELEGATION_TOKEN owner=luc@VGT.VITO.BE, renewer=yarn, realUser=, issueDate=1616458781077, maxDate=1617063581077, sequenceNumber=4052285, masterKeyId=1956)
21/03/23 01:19:41 INFO skein.Driver: Uploading application resources to hdfs://hacluster/user/luc/.skein/application_1611572280718_138530
21/03/23 01:19:43 INFO skein.Driver: Submitting application...
21/03/23 01:19:43 INFO impl.TimelineClientImpl: Timeline service address: epod-master3.vgt.vito.be:8188
21/03/23 01:19:44 INFO impl.YarnClientImpl: Submitted application application_1611572280718_138530
21/03/23 01:20:18 INFO impl.YarnClientImpl: Killed application application_1611572280718_138530
distributed.scheduler - INFO - Scheduler closing...
distributed.scheduler - INFO - Scheduler closing all comms
Traceback (most recent call last):
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 81, in submit_and_handle_failures
    yield skein_client.connect(app_id, security=spec.master.security)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 556, in _start_cluster
    app.kv["dask.scheduler"] = scheduler_address.encode()
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/skein/kv.py", line 607, in __setitem__
    self.put(key, value=value)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/skein/kv.py", line 802, in method
    return self._apply_op(cls(*args, **kwargs))
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/skein/kv.py", line 597, in _apply_op
    resp = self._client._call(op._rpc, req, timeout=timeout)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/skein/core.py", line 279, in _call
    raise ConnectionError("Unable to connect to %s" % self._server_name)
skein.exceptions.ConnectionError: Unable to connect to application

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/luc/test/test.py", line 6, in <module>
    with YarnCluster(environment="test.tar.gz", deploy_mode="local") as cluster:
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 383, in __init__
    self._init_common(
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 533, in _init_common
    self._sync(self._start_internal())
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 701, in _sync
    return future.result()
  File "/home/luc/miniconda3/envs/test/lib/python3.8/concurrent/futures/_base.py", line 439, in result
    return self.__get_result()
  File "/home/luc/miniconda3/envs/test/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 592, in _start_internal
    await self._start_task
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 607, in _start_async
    await self.loop.run_in_executor(None, self._start_cluster)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/concurrent/futures/thread.py", line 57, in run
    result = self.fn(*self.args, **self.kwargs)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 558, in _start_cluster
    app.kv["dask.dashboard"] = dashboard_address.encode()
  File "/home/luc/miniconda3/envs/test/lib/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 88, in submit_and_handle_failures
    raise DaskYarnError(
dask_yarn.core.DaskYarnError: Failed to start dask-yarn application_1611572280718_138530
See the application logs for more information:

$ yarn logs -applicationId application_1611572280718_138530
Exception ignored in: <function YarnCluster.__del__ at 0x7f58214b7700>
Traceback (most recent call last):
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 750, in __del__
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 742, in close
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/dask_yarn/core.py", line 733, in shutdown
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/distributed/utils.py", line 463, in stop
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/distributed/utils.py", line 478, in _stop_unlocked
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/distributed/utils.py", line 487, in _real_stop
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 321, in close
  File "/home/luc/miniconda3/envs/test/lib/python3.8/site-packages/tornado/platform/asyncio.py", line 140, in close
  File "/home/luc/miniconda3/envs/test/lib/python3.8/asyncio/unix_events.py", line 58, in close
  File "/home/luc/miniconda3/envs/test/lib/python3.8/asyncio/selector_events.py", line 89, in close
RuntimeError: Cannot close a running event loop
(test) [luc@luc test]$ yarn logs -applicationId application_1611572280718_138530
2021-03-23 01:21:34,469 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
2021-03-23 01:21:34,471 INFO client.AHSProxy: Connecting to Application History server at epod-master3.vgt.vito.be/192.168.207.58:10200
2021-03-23 01:21:34,816 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
2021-03-23 01:21:35,187 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
2021-03-23 01:21:36,346 INFO zlib.ZlibFactory: Successfully loaded & initialized native-zlib library
2021-03-23 01:21:36,346 INFO compress.CodecPool: Got brand-new decompressor [.deflate]
Container: container_e4897_1611572280718_138530_01_000001 on epod123.vgt.vito.be_45454
LogAggregationType: AGGREGATED

The log is not very helpful either:

21/03/23 01:19:48 INFO skein.ApplicationMaster: Starting Skein version 0.8.1
21/03/23 01:19:50 INFO skein.ApplicationMaster: Running as user luc
21/03/23 01:19:51 INFO conf.Configuration: found resource resource-types.xml at file:/etc/hadoop/3.1.4.0-315/0/resource-types.xml
21/03/23 01:19:51 INFO skein.ApplicationMaster: Application specification successfully loaded
21/03/23 01:19:53 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/23 01:19:53 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
21/03/23 01:19:54 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/23 01:19:56 INFO skein.ApplicationMaster: gRPC server started at epod123.vgt.vito.be:39279
21/03/23 01:19:58 INFO skein.ApplicationMaster: WebUI server started at epod123.vgt.vito.be:35316
21/03/23 01:19:58 INFO skein.ApplicationMaster: Registering application with resource manager
21/03/23 01:19:58 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/23 01:19:58 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/23 01:20:00 INFO client.RequestHedgingRMFailoverProxyProvider: Created wrapped proxy for [rm1, rm2]
21/03/23 01:20:00 INFO client.AHSProxy: Connecting to Application History server at epod-master3.vgt.vito.be/192.168.207.58:10200
21/03/23 01:20:00 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
21/03/23 01:20:01 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm2]
21/03/23 01:20:01 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
21/03/23 01:20:18 WARN skein.ApplicationMaster: Application master shutdown by external signal. This usually means that the application was killed by a user/administrator, or that the application master memory limit was exceeded. See the diagnostics for more information.
21/03/23 01:20:18 INFO skein.ApplicationMaster: Unregistering application with status FAILED
21/03/23 01:20:18 WARN ipc.Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.StandbyException): Operation category READ is not supported in state standby. Visit https://s.apache.org/sbnn-error
21/03/23 01:20:18 INFO skein.ApplicationMaster: Deleted application directory hdfs://hacluster/user/luc/.skein/application_1611572280718_138530
quasiben commented 3 years ago

Can you describe a bit more of your setup? Are you on an edge node? Can you navigate to the YARN WebUI? It still seems like a networking issue. Perhaps a way to confirm this would be to submit a different YARN job?
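
For example, one quick check is whether the application master's gRPC endpoint is reachable at all from where you are launching (just a sketch; the host and port come from the AM log above, e.g. epod120.vgt.vito.be:44750):

import socket

# Replace host/port with the values printed by "gRPC server started at ..."
sock = socket.create_connection(("epod120.vgt.vito.be", 44750), timeout=5)
print("application master gRPC port is reachable")
sock.close()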

lucio-f commented 3 years ago

Sure, thank you for your replies. I am submitting my application from an edge node by simply running python test.py, and I can also see the YARN WebUI from there. I tried submitting the simple hello_world Skein application just to check whether there was any problem on that side, and it worked without any issues.

quasiben commented 3 years ago

Can you set the env var DASK_LOGGING__DISTRIBUTED=debug when starting YarnCluster and see if there is any useful debug output?
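
For example (a sketch; either export the variable in the shell before running test.py, or set it at the top of the script before dask is imported so the configuration picks it up):

import os

# Must be set before dask/distributed load their configuration
os.environ["DASK_LOGGING__DISTRIBUTED"] = "debug"

from dask_yarn import YarnCluster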

lucio-f commented 3 years ago

I set the env var DASK_LOGGING__DISTRIBUTED=debug on the edge node and then launched the script with deploy_mode="local" (if that is what you meant), but I didn't see anything new.

lucio-f commented 3 years ago

I tried to run the Skein echo-server example and I think I have the same issue, so I don't think it is a problem with Dask-Yarn. Thank you for your support @quasiben.
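
For anyone who finds this later: the client-to-application-master connection that dask-yarn depends on can be exercised with bare Skein alone (a rough sketch along the lines of the Skein quickstart; exact parameter names may differ by version). dask-yarn fails at the equivalent of the key-value write below ("Unable to connect to application"), so if this hangs too, the problem is in the Skein/YARN networking layer rather than in Dask-Yarn itself:

import skein

spec = skein.ApplicationSpec(
    name="connectivity-test",
    services={
        "sleeper": skein.Service(
            resources=skein.Resources(memory="512 MiB", vcores=1),
            script="sleep 3600",
        )
    },
)

client = skein.Client()
app = client.submit_and_connect(spec)  # connects to the application master's gRPC server
app.kv["ping"] = b"pong"               # same kind of call dask-yarn hangs/fails on
app.shutdown()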