dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

AttributeError while running dask on Amazon EMR. #158

Closed RahulJangir2003 closed 1 year ago

RahulJangir2003 commented 2 years ago

What happened: I want to run a YARN cluster on EMR. I used the sample bootstrap script from the dask-yarn docs with a few small changes; here is the script.

Script that I have run:

```bash
#!/bin/bash
HELP="Usage: bootstrap-dask [OPTIONS]

Example AWS EMR Bootstrap Action to install and configure Dask and Jupyter

By default it does the following things:
- Installs miniconda
- Installs dask, distributed, dask-yarn, pyarrow, and s3fs. This list can be
  extended using the --conda-packages flag below.
- Packages this environment for distribution to the workers.
- Installs and starts a jupyter notebook server running on port 8888. This can
  be disabled with the --no-jupyter flag below.

Options:
    --jupyter / --no-jupyter    Whether to also install and start a Jupyter
                                Notebook Server. Default is True.
    --password, -pw             Set the password for the Jupyter Notebook
                                Server. Default is 'dask-user'.
    --conda-packages            Extra packages to install from conda.
"

# Parse Inputs. This is specific to this script, and can be ignored
# -----------------------------------------------------------------

# -----------------------------------------------------------------------------
# 1. Check if running on the master node. If not, there's nothing do.
# -----------------------------------------------------------------------------
grep -q '"isMaster": true' /mnt/var/lib/info/instance.json \
|| { echo "Not running on master node, nothing to do" && exit 0; }

# -----------------------------------------------------------------------------
# 2. Install Miniconda
# -----------------------------------------------------------------------------
echo "Installing Miniconda"
curl https://repo.anaconda.com/miniconda/Miniconda3-latest-Linux-x86_64.sh -o /tmp/miniconda.sh
bash /tmp/miniconda.sh -b -p $HOME/miniconda
rm /tmp/miniconda.sh
echo -e '\nexport PATH=$HOME/miniconda/bin:$PATH' >> $HOME/.bashrc
source $HOME/.bashrc
conda update conda -y

# configure conda environment
#source ~/miniconda/etc/profile.d/conda.sh
#conda activate base

# -----------------------------------------------------------------------------
# 3. Install packages to use in packaged environment
#
# We install a few packages by default, and allow users to extend this list
# with a CLI flag:
#
# - dask-yarn >= 0.7.0, for deploying Dask on YARN.
# - pyarrow for working with hdfs, parquet, ORC, etc...
# - s3fs for access to s3
# - conda-pack for packaging the environment for distribution
# - ensure tornado 5, since tornado 6 doesn't work with jupyter-server-proxy
# -----------------------------------------------------------------------------
echo "Installing base packages"
conda install \
-c conda-forge \
-y \
-q \
dask-yarn \
s3fs \
conda-pack \
tornado

pip3 install pyarrow

# -----------------------------------------------------------------------------
# 4. Package the environment to be distributed to worker nodes
# -----------------------------------------------------------------------------
echo "Packaging environment"
conda pack -q -o $HOME/environment.tar.gz

# -----------------------------------------------------------------------------
# 5. List all packages in the worker environment
# -----------------------------------------------------------------------------
echo "Packages installed in the worker environment:"
conda list

# -----------------------------------------------------------------------------
# 6. Configure Dask
#
# This isn't necessary, but for this particular bootstrap script it will make a
# few things easier:
#
# - Configure the cluster's dashboard link to show the proxied version through
#   jupyter-server-proxy. This allows access to the dashboard with only an ssh
#   tunnel to the notebook.
#
# - Specify the pre-packaged python environment, so users don't have to
#
# - Set the default deploy-mode to local, so the dashboard proxying works
#
# - Specify the location of the native libhdfs library so pyarrow can find it
#   on the workers and the client (if submitting applications).
# ------------------------------------------------------------------------------
echo "Configuring Dask"
mkdir -p $HOME/.config/dask
cat <<EOT >> $HOME/.config/dask/config.yaml
distributed:
  dashboard:
    link: "/proxy/{port}/status"

yarn:
  environment: /home/hadoop/environment.tar.gz
  deploy-mode: local

  worker:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/

  client:
    env:
      ARROW_LIBHDFS_DIR: /usr/lib/hadoop/lib/native/
EOT

# Also set ARROW_LIBHDFS_DIR in ~/.bashrc so it's set for the local user
echo -e '\nexport ARROW_LIBHDFS_DIR=/usr/lib/hadoop/lib/native' >> $HOME/.bashrc

# -----------------------------------------------------------------------------
# 8. Install jupyter notebook server and dependencies
#
# We do this after packaging the worker environments to keep the tar.gz as
# small as possible.
#
# We install the following packages:
#
# - notebook: the Jupyter Notebook Server
# - ipywidgets: used to provide an interactive UI for the YarnCluster objects
# - jupyter-server-proxy: used to proxy the dask dashboard through the notebook server
# -----------------------------------------------------------------------------
echo "Installing Jupyter"
conda install \
-c conda-forge \
-y \
-q \
notebook \
ipywidgets \
jupyter-server-proxy \
jupyter

# -----------------------------------------------------------------------------
# 9. List all packages in the client environment
# -----------------------------------------------------------------------------
echo "Packages installed in the client environment:"
conda list

# -----------------------------------------------------------------------------
# 10. Configure Jupyter Notebook
# -----------------------------------------------------------------------------
echo "Configuring Jupyter"
mkdir -p $HOME/.jupyter
JUPYTER_PASSWORD="dask-user"
HASHED_PASSWORD=`python -c "from notebook.auth import passwd; print(passwd('$JUPYTER_PASSWORD'))"`
cat <<EOF >> $HOME/.jupyter/jupyter_notebook_config.py
c.NotebookApp.password = u'$HASHED_PASSWORD'
c.NotebookApp.open_browser = False
c.NotebookApp.ip = '0.0.0.0'
c.NotebookApp.port = 8888
EOF

# -----------------------------------------------------------------------------
# 11. Define an upstart service for the Jupyter Notebook Server
#
# This sets the notebook server up to properly run as a background service.
# -----------------------------------------------------------------------------
echo "Configuring Jupyter Notebook Upstart Service"
cat <<EOF > /tmp/jupyter-notebook.service
[Unit]
Description=Jupyter Notebook

[Service]
ExecStart=$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
Type=simple
PIDFile=/run/jupyter.pid
WorkingDirectory=$HOME
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
EOF
sudo mv /tmp/jupyter-notebook.service /etc/systemd/system/
sudo systemctl enable jupyter-notebook.service

# -----------------------------------------------------------------------------
# 12. Start the Jupyter Notebook Server
# -----------------------------------------------------------------------------
echo "Starting Jupyter Notebook Server"
sudo systemctl daemon-reload
sudo systemctl restart jupyter-notebook.service

$HOME/miniconda/bin/jupyter-notebook --allow-root --config=$HOME/.jupyter/jupyter_notebook_config.py
```

I ran this script from the SSH command line after starting the cluster. The original bootstrap script was not working: it was not able to install pyarrow using conda, so I used pip instead, and there were some other errors too, like #122.

After running the script I was able to start the Jupyter notebook server successfully, but when I tried to run

```python
from dask_yarn import YarnCluster
from dask.distributed import Client

# Create a cluster
cluster = YarnCluster()

# Connect to the cluster
client = Client(cluster)
```

I got this error:

AttributeError                            Traceback (most recent call last)
Input In [3], in <cell line: 1>()
----> 1 client = Client(cluster)

File ~/miniconda/lib/python3.9/site-packages/distributed/client.py:835, in Client.__init__(self, address, loop, timeout, set_as_default, scheduler_file, security, asynchronous, name, heartbeat_interval, serializers, deserializers, extensions, direct_to_workers, connection_limit, **kwargs)
    832 elif isinstance(getattr(address, "scheduler_address", None), str):
    833     # It's a LocalCluster or LocalCluster-compatible object
    834     self.cluster = address
--> 835     status = getattr(self.cluster, "status")
    836     if status and status in [Status.closed, Status.closing]:
    837         raise RuntimeError(
    838             f"Trying to connect to an already closed or closing Cluster {self.cluster}."
    839         )

AttributeError: 'YarnCluster' object has no attribute 'status'

When I use a local cluster instead, it runs perfectly.
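
For comparison, this is a minimal sketch of the local-cluster version that works fine for me (just the standard `dask.distributed` `LocalCluster`, nothing EMR-specific):

```python
from dask.distributed import Client, LocalCluster

# Same pattern as above, but with a local cluster instead of YARN:
# passing the cluster object to Client raises no AttributeError here.
cluster = LocalCluster()
client = Client(cluster)
print(client)
```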

RahulJangir2003 commented 2 years ago

Here are the detailed container logs:

/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py:16: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.
  from distributed.utils import (
/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py:16: FutureWarning: parse_timedelta is deprecated and will be removed in a future release. Please use dask.utils.parse_timedelta instead.
  from distributed.utils import (
/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/cli.py:16: DeprecationWarning: the distributed.cli.utils module is deprecated
  from distributed.cli.utils import install_signal_handlers
/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py:16: FutureWarning: format_bytes is deprecated and will be removed in a future release. Please use dask.utils.format_bytes instead.
  from distributed.utils import (
/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py:16: FutureWarning: parse_timedelta is deprecated and will be removed in a future release. Please use dask.utils.parse_timedelta instead.
  from distributed.utils import (
Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/deploy.py", line 12, in <module>
    main()
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/deploy.py", line 8, in main
    client = Client(cluster)
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/distributed/client.py", line 835, in __init__
    status = getattr(self.cluster, "status")
AttributeError: 'YarnCluster' object has no attribute 'status'
Exception ignored in: <function YarnCluster.__del__ at 0x7faa8b7afb80>
Traceback (most recent call last):
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py", line 788, in __del__
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py", line 780, in close
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/dask_yarn/core.py", line 771, in shutdown
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/distributed/utils.py", line 504, in stop
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/distributed/utils.py", line 519, in _stop_unlocked
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/distributed/utils.py", line 528, in _real_stop
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 321, in close
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/site-packages/tornado/platform/asyncio.py", line 140, in close
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/asyncio/unix_events.py", line 58, in close
  File "/mnt/yarn/usercache/hadoop/appcache/application_1655961395149_0009/container_1655961395149_0009_01_000003/environment/lib/python3.9/asyncio/selector_events.py", line 84, in close
RuntimeError: Cannot close a running event loop
2022-06-23 08:46:51,228 - distributed.core - WARNING - rpc object <rpc to 'tcp://10.0.1.106:39285', 1 comms> deleted with 1 open comms
User submitted application deploy.py failed with returncode 1, shutting down.
RahulJangir2003 commented 2 years ago

I am able to make this work if I pass the scheduler address to Client directly: instead of `client = Client(cluster)` I use `client = Client('tcp://10.0.1.64:42047')`, which doesn't give any error and lets me submit the application. So for now I have to use `client = Client(cluster.scheduler_address)` in the code instead. Is that a bug, or am I doing something wrong?
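
In other words, the workaround I am using for now looks like this (the tcp address above is just what my cluster happened to report; in code I take it from `cluster.scheduler_address` rather than hard-coding it):

```python
from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster()

# Workaround: connect via the scheduler address instead of passing the
# YarnCluster object itself, which triggers the AttributeError above.
client = Client(cluster.scheduler_address)
```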

jacobtomlinson commented 1 year ago

Closing as duplicate of #155