dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

Dask on Yarn ( HDP 2.6 ) #23

Closed hamroune closed 5 years ago

hamroune commented 6 years ago
18/09/25 15:24:57 INFO skein.Daemon: Removing callbacks for application_1537170416259_7296
18/09/25 15:25:36 INFO impl.YarnClientImpl: Killed application application_1537170416259_7296
Traceback (most recent call last):
  File "main.py", line 10, in <module>
    cluster = YarnCluster()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 179, in __init__
    self._start_cluster(spec, skein_client)
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/dask_yarn/core.py", line 204, in _start_cluster
    scheduler_address = app.kv.wait('dask.scheduler').decode()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 655, in wait
    event = event_queue.get()
  File "/analytics/anaconda3/envs/myenv/lib/python3.6/site-packages/skein/kv.py", line 281, in get
    raise out
skein.exceptions.ConnectionError: Unable to connect to application
18/09/25 15:25:36 INFO skein.Daemon: Starting process disconnected, shutting down
18/09/25 15:25:36 INFO skein.Daemon: Shutting down gRPC server
18/09/25 15:25:36 INFO skein.Daemon: gRPC server shut down

my main.py

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd
import dask

print('dask.config.config =>', dask.config.config)
cluster = YarnCluster()
client = Client(cluster)
version = client.get_versions(check=True)
print('version', version)
df = dd.read_csv('hdfs:///data/my.csv')
print(df)


the conda.yml

---
name: myenv
channels:
  - defaults
  - conda-forge
dependencies:
  - conda-pack
  - python=3.6
  - dask
  - dask-yarn
  - gcsfs
  - pyarrow
  - hdfs3
  - libhdfs3
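
For reference, the environment.tar.gz referenced in yarn.yaml below can be produced from this environment with conda-pack. A minimal sketch, assuming the environment above has been created and is named myenv:

import conda_pack

# Package the "myenv" conda environment into an archive that YARN can
# distribute to the containers (same file referenced in yarn.yaml below).
conda_pack.pack(name='myenv', output='environment.tar.gz')

The equivalent command line is conda pack -n myenv -o environment.tar.gz.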

the config/dask/yarn.yaml

yarn:
#   specification: null        # A full Skein specification or path to a
#                              # specification yaml file. Overrides the following
#                              # configuration if given

   name: dask                 # Application name
   queue: defult             # Yarn queue to deploy to
   environment: environment.tar.gz          # Path to conda packed environment
#   tags: []                   # List of strings to tag applications

   scheduler:                 # Specifications of scheduler container
     vcores: 1
     memory: 2GiB

   worker:                   # Specifications of worker containers
     vcores: 1
     memory: 2GiB
     count: 2                # Number of workers to start on initialization
     restarts: -1            # Allowed number of restarts, -1 for unlimited
     env: {}                 # A map of environment variables to set on the worker
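
As an aside, the same settings can also be passed directly to YarnCluster as keyword arguments instead of via this file. A minimal sketch, assuming the keyword names from the dask-yarn documentation (values mirror the yaml above):

from dask_yarn import YarnCluster

# Sketch: construct the cluster without relying on yarn.yaml.
# Adjust the keyword names if your dask-yarn version differs.
cluster = YarnCluster(
    environment='environment.tar.gz',  # conda-packed environment
    queue='default',                   # YARN queue to submit to
    n_workers=2,                       # workers started on initialization
    worker_vcores=1,
    worker_memory='2GiB',
    scheduler_vcores=1,
    scheduler_memory='2GiB',
)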

The only logs I have are not very informative:

18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.worker:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_0:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_1:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.scheduler:
18/09/25 15:25:13 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0:
18/09/25 15:25:14 INFO skein.ApplicationMaster: watch called:
18/09/25 15:25:14 INFO skein.ApplicationMaster: Created Watcher. id=0, start=dask.scheduler, end=dask.scheduler, type=PUT:
18/09/25 15:25:15 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:15 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1:
18/09/25 15:25:15 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:18 INFO skein.ApplicationMaster: FAILED: dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: RESTARTING: adding new container to replace dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2:
18/09/25 15:25:20 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO skein.ApplicationMaster: RUNNING: dask.worker_2 on container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:52 INFO skein.ApplicationMaster: Shutting down gRPC server:
18/09/25 15:25:52 INFO skein.ApplicationMaster: gRPC server shut down:

Could you give me some insight into how to launch Dask on YARN and how to enable a debug mode?

Thank you very much in advance for your support.

jcrist commented 6 years ago

Hmmm. Is that the full output of the application logs? It'd be good to see the logs from the worker or scheduler containers as well (if any); specifically, I'm looking for the full output of:

yarn logs -applicationId application_1537170416259_7296
hamroune commented 6 years ago

Hi jcrist, thank you for the reply. Here is the whole log:

LogType:appmaster.log
Log Upload Time:Tue Sep 25 15:25:22 +0200 2018
LogLength:3252
Log Contents:
18/09/25 15:25:12 INFO skein.ApplicationMaster: Application specification successfully loaded:
18/09/25 15:25:12 INFO skein.ApplicationMaster: user: me:
18/09/25 15:25:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable:
18/09/25 15:25:12 INFO client.RMProxy: Connecting to ResourceManager at server1/10.10.10.125:8030:
18/09/25 15:25:12 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0:
18/09/25 15:25:12 INFO skein.ApplicationMaster: Server started, listening on 46765:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.worker:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_0:
18/09/25 15:25:13 INFO skein.ApplicationMaster: WAITING: dask.worker_1:
18/09/25 15:25:13 INFO skein.ApplicationMaster: INTIALIZING: dask.scheduler:
18/09/25 15:25:13 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0:
18/09/25 15:25:14 INFO skein.ApplicationMaster: watch called:
18/09/25 15:25:14 INFO skein.ApplicationMaster: Created Watcher. id=0, start=dask.scheduler, end=dask.scheduler, type=PUT:
18/09/25 15:25:15 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/25 15:25:15 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e50_1537170416259_7296_01_000002:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_0:
18/09/25 15:25:15 INFO skein.ApplicationMaster: REQUESTED: dask.worker_1:
18/09/25 15:25:15 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO impl.AMRMClientImpl: Received new token for : server2:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_0 on container_e50_1537170416259_7296_01_000003:
18/09/25 15:25:17 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:17 INFO skein.ApplicationMaster: RUNNING: dask.worker_1 on container_e50_1537170416259_7296_01_000004:
18/09/25 15:25:17 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server2:45454:
18/09/25 15:25:18 INFO skein.ApplicationMaster: FAILED: dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: RESTARTING: adding new container to replace dask.worker_1:
18/09/25 15:25:18 INFO skein.ApplicationMaster: REQUESTED: dask.worker_2:
18/09/25 15:25:20 INFO skein.ApplicationMaster: Starting container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO skein.ApplicationMaster: RUNNING: dask.worker_2 on container_e50_1537170416259_7296_01_000005:
18/09/25 15:25:20 INFO impl.ContainerManagementProtocolProxy: Opening proxy : server1:45454:
18/09/25 15:25:52 INFO skein.ApplicationMaster: Shutting down gRPC server:
18/09/25 15:25:52 INFO skein.ApplicationMaster: gRPC server shut down:

End of LogType:appmaster.log

LogType:directory.info
Log Upload Time:Tue Sep 25 15:25:22 +0200 2018
LogLength:1087
Log Contents:
ls -l:
total 20
-rw------- 1 me hadoop  328 Sep 25 15:25 container_tokens
-rwx------ 1 me hadoop 4578 Sep 25 15:25 launch_container.sh
lrwxrwxrwx 1 me hadoop   99 Sep 25 15:25 skein.jar -> /hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/filecache/11/skein.jar
drwxr-s--- 2 me hadoop 4096 Sep 25 15:25 tmp
find -L . -maxdepth 5 -ls:
60823044    4 drwxr-s---   3 me  hadoop       4096 Sep 25 15:25 .
60823060    4 -rw-------   1 me  hadoop        328 Sep 25 15:25 ./container_tokens
60823059    8 -rwx------   1 me  hadoop       4578 Sep 25 15:25 ./launch_container.sh
60823047    4 -r-x------   1 me  root         1704 Sep 25 15:24 ./.skein.pem
60823050 8748 -r-x------   1 me  root      8955523 Sep 25 15:25 ./skein.jar
60823058    4 drwxr-s---   2 me  hadoop       4096 Sep 25 15:25 ./tmp
60823056    4 -r-x------   1 me  root         1406 Sep 25 15:25 ./.skein.proto
60823053    4 -r-x------   1 me  root         1013 Sep 25 15:25 ./.skein.crt
broken symlinks(find -L . -maxdepth 5 -type l -ls):

End of LogType:directory.info

LogType:launch_container.sh
Log Upload Time:Tue Sep 25 15:25:22 +0200 2018
LogLength:4578
Log Contents:
#!/bin/bash

export PATH="/usr/sbin:/sbin:/usr/lib/ambari-server/*:/usr/sbin:/sbin:/usr/lib/ambari-server/*:/usr/lib64/qt-3.3/bin:/usr/java-1.8.0-openjdk-1.8.0.77-0.b03.el7_2.x86_64//bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/apps/toolboxes/exploit/bin:/var/lib/ambari-agent:/var/lib/ambari-agent"
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/usr/hdp/current/hadoop-client/conf"}
export MAX_APP_ATTEMPTS="1"
export JAVA_HOME=${JAVA_HOME:-"/usr/java-1.8.0-openjdk-1.8.0.77-0.b03.el7_2.x86_64/"}
export LANG="en_GB.UTF-8"
export APP_SUBMIT_TIME_ENV="1537881856045"
export NM_HOST="server1"
export LOGNAME="me"
export JVM_PID="$$"
export PWD="/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001"
export LOCAL_DIRS="/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296"
export APPLICATION_WEB_PROXY_BASE="/proxy/application_1537170416259_7296"
export NM_HTTP_PORT="8042"
export LOG_DIRS="/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001"
export NM_AUX_SERVICE_mapreduce_shuffle="AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=
"
export NM_PORT="45454"
export USER="me"
export HADOOP_YARN_HOME=${HADOOP_YARN_HOME:-"/usr/hdp/current/hadoop-yarn-nodemanager"}
export CLASSPATH="$CLASSPATH:./*:/etc/hadoop/conf:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*"
export HADOOP_TOKEN_FILE_LOCATION="/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/container_tokens"
export NM_AUX_SERVICE_spark_shuffle=""
export LOCAL_USER_DIRS="/hadoop/yarn/local/usercache/me/"
export HADOOP_HOME="/usr/hdp/2.6.1.0-129/hadoop"
export HOME="/home/"
export NM_AUX_SERVICE_spark2_shuffle=""
export CONTAINER_ID="container_e50_1537170416259_7296_01_000001"
export MALLOC_ARENA_MAX="4"
ln -sf "/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/filecache/10/skein.pem" ".skein.pem"
hadoop_shell_errorcode=$?
if [ $hadoop_shell_errorcode -ne 0 ]
then
  exit $hadoop_shell_errorcode
fi
ln -sf "/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/filecache/12/skein.crt" ".skein.crt"
hadoop_shell_errorcode=$?
if [ $hadoop_shell_errorcode -ne 0 ]
then
  exit $hadoop_shell_errorcode
fi
ln -sf "/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/filecache/11/skein.jar" "skein.jar"
hadoop_shell_errorcode=$?
if [ $hadoop_shell_errorcode -ne 0 ]
then
  exit $hadoop_shell_errorcode
fi
ln -sf "/hadoop/yarn/local/usercache/me/appcache/application_1537170416259_7296/filecache/13/.skein.proto" ".skein.proto"
hadoop_shell_errorcode=$?
if [ $hadoop_shell_errorcode -ne 0 ]
then
  exit $hadoop_shell_errorcode
fi
# Creating copy of launch script
cp "launch_container.sh" "/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/launch_container.sh"
chmod 640 "/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/launch_container.sh"
# Determining directory contents
echo "ls -l:" 1>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
ls -l 1>>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
echo "find -L . -maxdepth 5 -ls:" 1>>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
find -L . -maxdepth 5 -ls 1>>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
echo "broken symlinks(find -L . -maxdepth 5 -type l -ls):" 1>>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
find -L . -maxdepth 5 -type l -ls 1>>"/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/directory.info"
exec /bin/bash -c "$JAVA_HOME/bin/java -Xmx128M com.anaconda.skein.ApplicationMaster hdfs://server1:8020/user/me/.skein/application_1537170416259_7296 application_1537170416259_7296 >/hadoop/yarn/log/application_1537170416259_7296/container_e50_1537170416259_7296_01_000001/appmaster.log 2>&1"
hadoop_shell_errorcode=$?
if [ $hadoop_shell_errorcode -ne 0 ]
then
  exit $hadoop_shell_errorcode
fi

End of LogType:launch_container.sh

How do I activate debug mode in Dask?

jcrist commented 6 years ago

There unfortunately is no debug mode currently. Improving logging (and making a custom log4j.properties file configurable) is on the to-do list.

The issue here looks to be with skein/yarn and has nothing to do with dask in particular. Yarn has numerous configuration options, we test with many of them, but some configurations may still have bugs.

For now, debugging is a bit manual. Can you save the following as test.py, then run python test.py and post the output of the script as well as the logs from the application? You may want to glance through both beforehand to anonymize (addresses, etc.) as needed.

import time
import skein

spec = skein.ApplicationSpec.from_yaml("""
name: skein-test

services:
    sleeper:
        resources:
            memory: 128
            vcores: 1
        files:
            # Specify any file to include, to test resource localization
            test.py: test.py
        commands:
            - ls
            - env
            - sleep infinity
""")

client = skein.Client()

app = client.submit_and_connect(spec)

try:
    print("Application started: %s" % app.id)
    print("")
    print("Please get the logs using `yarn logs -applicationId %s`" % app.id)
    print("")

    print("Testing the key-value store")
    app.kv.put('foo', b'bar')
    assert app.kv.get('foo') == b'bar'

    print("Testing scaling")
    app.scale("sleeper", 2)

    timeout = 5
    while len(app.get_containers(states=['RUNNING'])) < 2:
        time.sleep(0.5)
        timeout -= 0.5
        assert timeout > 0
    print("Scaled successfully")

    print("Letting processes run for a few seconds to generate logs")
    time.sleep(10)

    print("Shutting down application")
    app.shutdown()
    print("Shutdown successfully")
except Exception as exc:
    print("Shutdown errored: %s" % exc)
    client.kill_application(app.id)
hamroune commented 6 years ago

I tried your example and it throws an error, so I tried a very minimal version that only submits the spec with app_id = client.submit(spec). Then:

18/09/26 10:55:57 INFO skein.ApplicationMaster: Application specification successfully loaded:
18/09/26 10:55:57 INFO skein.ApplicationMaster: user: me:
18/09/26 10:55:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable:
18/09/26 10:55:57 INFO client.RMProxy: Connecting to ResourceManager at server1/10.10.10.125:8030:
18/09/26 10:55:57 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0:
18/09/26 10:55:58 INFO skein.ApplicationMaster: Server started, listening on 33871:
18/09/26 10:55:58 INFO skein.ApplicationMaster: INTIALIZING: sleeper:
18/09/26 10:55:58 INFO skein.ApplicationMaster: REQUESTED: sleeper_0:
18/09/26 10:56:00 INFO impl.AMRMClientImpl: Received new token for : server1:45454:
18/09/26 10:56:00 WARN skein.ApplicationMaster: No matching service found for resource: <memory:2048, vCores:1>, priority: 1, releasing container_e50_1537170416259_7577_01_000002:

The application stays in RUNNING status without doing any processing.

hamroune commented 6 years ago

Hmm, interesting. If I print the version, as in the following code:

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd

cluster = YarnCluster()
client = Client(cluster)

version = client.get_versions(check=True)
print('version', version)  # <==== this works
df = dd.read_csv('hdfs:///path/to/my.csv')
print(df)

on my edge node I can see:

version {'scheduler': {'host': (('python', '3.6.6.final.0'), ('python-bits', 64), ('OS', 'Linux'), ('OS-release', '3.10.0-693.21.1.el7.x86_64'), ('machine', 'x86_64'), ('processor', 'x86_64'), ('byteorder', 'little'), ('LC_ALL', 'None'), ('LANG', 'en_GB.UTF-8'), ('LOCALE', 'en_GB.UTF-8')), 'packages': {'required': (('dask', '0.19.1'), ('distributed', '1.23.1'), ('msgpack', '0.5.6'), ('cloudpickle', '0.5.5'), ('tornado', '5.1'), ('toolz', '0.9.0')), 'optional': (('numpy', '1.15.1'), ('pandas', '0.23.4'), ('bokeh', '0.13.0'), ('lz4', None), ('dask_ml', None), ('blosc', None))}}, 'workers': {'tcp://10.10.10.125:36999': {'host': (('python', '3.6.6.final.0'), ('python-bits', 64), ('OS', 'Linux'), ('OS-release', '3.10.0-693.21.1.el7.x86_64'), ('machine', 'x86_64'), ('processor', 'x86_64'), ('byteorder', 'little'), ('LC_ALL', 'None'), ('LANG', 'en_GB.UTF-8'), ('LOCALE', 'en_GB.UTF-8')), 'packages': {'required': (('dask', '0.19.1'), ('distributed', '1.23.1'), ('msgpack', '0.5.6'), ('cloudpickle', '0.5.5'), ('tornado', '5.1'), ('toolz', '0.9.0')), 'optional': (('numpy', '1.15.1'), ('pandas', '0.23.4'), ('bokeh', '0.13.0'), ('lz4', None), ('dask_ml', None), ('blosc', None))}}}, 'client': {'host': [('python', '3.6.6.final.0'), ('python-bits', 64), ('OS', 'Linux'), ('OS-release', '3.10.0-693.21.1.el7.x86_64'), ('machine', 'x86_64'), ('processor', 'x86_64'), ('byteorder', 'little'), ('LC_ALL', 'None'), ('LANG', 'en_GB.UTF-8'), ('LOCALE', 'en_GB.UTF-8')], 'packages': {'required': [('dask', '0.19.1'), ('distributed', '1.23.1'), ('msgpack', '0.5.6'), ('cloudpickle', '0.5.5'), ('tornado', '5.1'), ('toolz', '0.9.0')], 'optional': [('numpy', '1.15.1'), ('pandas', '0.23.4'), ('bokeh', '0.13.0'), ('lz4', None), ('dask_ml', None), ('blosc', None)]}}}
Segmentation fault

So I think the segmentation fault at the end is related to dd.read_csv.

martindurant commented 6 years ago

Perhaps the read from hdfs is failing, or even the import of the hdfs library. I would first make sure that you are using the pyarrow driver (pyarrow must be installed). Execute the following after waiting for workers to come alive:

client.run(dask.config.set, hdfs_driver='pyarrow')

(this is better done with a dask config file, but this should work)

hamroune commented 6 years ago

What do you mean by "after workers come alive"? I cannot see that in my code; is there any piece of code to add?

martindurant commented 6 years ago

I don't know if client = Client(cluster) waits for the containers to start up, but you can always poll client.ncores() to see whether enough workers have appeared. The reason is that the run() command above will only run on the workers it is sent to, i.e. the current set of workers known to the scheduler. That is why the configuration would be better achieved with a config file, but I thought this might get you going faster.
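
A minimal sketch of that polling approach (the worker count and timeout are assumptions; adjust them to your cluster):

import time

import dask
from dask.distributed import Client
from dask_yarn import YarnCluster

cluster = YarnCluster()
client = Client(cluster)

expected_workers = 2            # matches worker.count in yarn.yaml
deadline = time.time() + 120    # give containers up to two minutes to start
# client.ncores() maps worker address -> core count for workers the scheduler knows about
while len(client.ncores()) < expected_workers and time.time() < deadline:
    time.sleep(1)

# Now run() reaches every worker that has registered so far
client.run(dask.config.set, hdfs_driver='pyarrow')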

jcrist commented 6 years ago

I tried your example and it throws an error,

Yes, but how did it error? The script should have run successfully, failure would indicate a bug (probably due to a YARN configuration that we're not testing against).

The application stays in RUNNING status without doing any processing.

If you're using the spec that I provided above, that service just runs sleep infinity, which sleeps forever. You'd need to kill that application manually. This is intended and does not indicate a bug.

So I think the segmentation fault at the end is related to dd.read_csv.

Interesting. Previously your cluster died before connecting, but now you're able to connect? Did you change anything, or was the error intermittent?

Regarding the segfault, this is almost certainly due to the hdfs driver. Martin's suggestion is one way to handle this. Another would be to remove hdfs3 and libhdfs3 from your environment. If those packages are not available, the pyarrow driver will be used. This is a more robust solution, and is what I'd do. The next release of dask will default to using pyarrow instead of hdfs3, removing this problem completely.
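
As a quick sanity check that pyarrow itself can reach HDFS independently of Dask, something like the following can be run on a node (a sketch; it assumes pyarrow is installed and that JAVA_HOME and the Hadoop configuration are visible so libhdfs can be loaded, and the path is only an example):

import pyarrow as pa

# Connects using the default NameNode taken from the Hadoop configuration
fs = pa.hdfs.connect()
print(fs.ls('/data'))  # replace with a path that exists on your cluster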

jcrist commented 6 years ago

Any news on this @hamroune?

There unfortunately is no debug mode currently. Improving logging (and making a custom log4j.properties file configurable) is on the to-do list.

This has now been fixed in the new release (version 0.2.0, available now on PyPI, and shortly on conda-forge). The new debug options aren't exposed yet in dask-yarn, but Skein itself now has improved logging, as well as support for using a custom log4j.properties file. This might prove useful in determining the cause of the issue above. See the documentation here.


# Add either of these in the specification to increase logging
master:
  # Increase skein's log level 
  log_level: debug
  # OR use a custom log4j.properties file. This allows for increased logging in skein's dependencies as well.
  log_config: path/to/custom/log4j.properties
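
For example, a sketch of submitting a spec with the increased log level through skein directly (it reuses the sleeper service from the earlier test script; everything outside the master section is an assumption for illustration):

import skein

spec = skein.ApplicationSpec.from_yaml("""
name: skein-debug-test

master:
  log_level: debug   # or: log_config: path/to/custom/log4j.properties

services:
  sleeper:
    resources:
      memory: 128
      vcores: 1
    commands:
      - sleep infinity
""")

client = skein.Client()
app_id = client.submit(spec)
print("Submitted %s; fetch logs with `yarn logs -applicationId %s`" % (app_id, app_id))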
jcrist commented 5 years ago

New versions of dask-yarn (0.3.2) and skein have been released, including several new features and bug fixes. Please try them out and see if they fix your issue. Planning on closing in 7 days if no comment.

rzuidhof commented 5 years ago

With dask-yarn version 0.3.1 I got an error when trying to scale down in HDP 2.6.

>>> cluster.scale(4)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/a-rzuidhof/.conda/envs/demo/lib/python3.6/site-packages/dask_yarn/core.py", line 317, in scale
    to_close = self._select_workers_to_close(n_to_delete)
  File "/home/a-rzuidhof/.conda/envs/demo/lib/python3.6/site-packages/dask_yarn/core.py", line 287, in _select_workers_to_close
    workers = sorted((v['memory'], k) for k, v in worker_info.items())
  File "/home/a-rzuidhof/.conda/envs/demo/lib/python3.6/site-packages/dask_yarn/core.py", line 287, in <genexpr>
    workers = sorted((v['memory'], k) for k, v in worker_info.items())
KeyError: 'memory'

This has been fixed in version 0.4.0. Thanks for the good work.

jcrist commented 5 years ago

With dask-yarn version 0.3.1 I got an error when trying to scale down in HDP 2.6.

That was due to an incompatibility brought on by a new release of dask.distributed. This was fixed in #29.

Planning on closing in 7 days if no comment.

Closing due to inactivity. Feel free to reopen if you can provide a reproducible test case or more information for debugging.