dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

CLI commands are broken for Python < 2.7.6 #68

Closed rchaudron closed 5 years ago

rchaudron commented 5 years ago

Hello,

By running the following code:

from dask_yarn import YarnCluster
from dask.distributed import Client

cluster = YarnCluster(environment='/home/rchaudro/pyspark2.tar.gz')

I have the following error:

---------------------------------------------------------------------------
OSError                                   Traceback (most recent call last)
<ipython-input-2-16f7e6a860db> in <module>()
      2 from dask.distributed import Client
      3 
----> 4 cluster = YarnCluster(environment='pyspark2.tar.gz')
      5 # Connect to the cluster
      6 client = Client(cluster)

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in __init__(self, environment, n_workers, worker_vcores, worker_memory, worker_restarts, worker_env, scheduler_vcores, scheduler_memory, deploy_mode, name, queue, tags, user, skein_client)
    293                                    user=user)
    294 
--> 295         self._start_cluster(spec, skein_client)
    296 
    297     @cached_property

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _start_cluster(self, spec, skein_client)
    337                              "'dask.worker' service")
    338 
--> 339         skein_client = _get_skein_client(skein_client)
    340 
    341         if 'dask.scheduler' not in spec.services:

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.pyc in _get_skein_client(skein_client, security)
     44         with warnings.catch_warnings():
     45             warnings.simplefilter('ignore')
---> 46             return skein.Client(security=security)
     47     return skein_client
     48 

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in __init__(self, address, security, keytab, principal, log, log_level, java_options)
    351                                           log=log,
    352                                           log_level=log_level,
--> 353                                           java_options=java_options)
    354         else:
    355             proc = None

/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/core.pyc in _start_driver(security, set_global, keytab, principal, log, log_level, java_options)
    250                                 stderr=outfil,
    251                                 env=env,
--> 252                                 **popen_kwargs)
    253 
    254         while proc.poll() is None:

/usr/lib64/python2.7/subprocess.pyc in __init__(self, args, bufsize, executable, stdin, stdout, stderr, preexec_fn, close_fds, shell, cwd, env, universal_newlines, startupinfo, creationflags)
    709                                 p2cread, p2cwrite,
    710                                 c2pread, c2pwrite,
--> 711                                 errread, errwrite)
    712         except Exception:
    713             # Preserve original exception in case os.close raises.

/usr/lib64/python2.7/subprocess.pyc in _execute_child(self, args, executable, preexec_fn, close_fds, cwd, env, universal_newlines, startupinfo, creationflags, shell, to_close, p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
   1325                         raise
   1326                 child_exception = pickle.loads(data)
-> 1327                 raise child_exception
   1328 
   1329 

OSError: [Errno 2] No such file or directory
jcrist commented 5 years ago

Hmmm, that's not a very good error message; the error in Python 3 is much better (it actually says which file wasn't found). In this case I suspect it's java that wasn't found - you can fix this by setting your JAVA_HOME environment variable to the appropriate directory (if the java executable is at /foo/bar/bin/java, JAVA_HOME should be /foo/bar).
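To make the relationship concrete, here's a small sketch (the /foo/bar path is just the placeholder from above) of how JAVA_HOME relates to the location of the java binary:

```python
import os.path

def java_home_from_executable(java_path):
    # JAVA_HOME is two directory levels above the java binary:
    # /foo/bar/bin/java -> /foo/bar
    return os.path.dirname(os.path.dirname(java_path))

print(java_home_from_executable("/foo/bar/bin/java"))  # prints /foo/bar
```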

rchaudron commented 5 years ago

Thank you for the quick reply. Why is Java needed here?

jcrist commented 5 years ago

YARN is in principle language-independent, but in practice it only has good integration with Java. To start a dask-yarn application, a local Java process is started to interact with the YARN resource manager. Once started, another Java process called an "application master" runs on the YARN cluster. Both of these are part of the Skein library: https://jcrist.github.io/skein/.

rchaudron commented 5 years ago

Ok, I understand better. I added the environment variable JAVA_HOME="/usr/jdk64/jdk1.8.0_112", but the problem is not fixed. So I looked at Skein, and when I try to run `skein driver start` from the CLI, I get the following error:

 File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/cli.py", line 441, in main
    func(**kwargs)
TypeError: <lambda>() takes no arguments (9 given)
jcrist commented 5 years ago

That's an odd error; I'm not sure what happened there. Seeing a direct copy of the terminal command and its output may help. To debug your actual issue, can you try the following:

import os
print(os.environ['JAVA_HOME'])
import skein
client = skein.Client(log_level='debug')
rchaudron commented 5 years ago

(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ python
Python 2.7.5 (default, Jul 13 2018, 13:06:57)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> print(os.environ['JAVA_HOME'])
/usr/jdk64/jdk1.8.0_112
>>> import skein
>>> client = skein.Client(log_level='debug')
19/06/03 16:48:44 DEBUG skein.Driver: Starting Skein version 0.7.3
19/06/03 16:48:44 DEBUG skein.Driver: Logging in using ticket cache
19/06/03 16:48:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/03 16:48:46 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/06/03 16:48:46 INFO client.AHSProxy: Connecting to Application History server at plb-hdpn3-p01.prod.lan/10.99.97.131:10200
19/06/03 16:48:46 INFO skein.Driver: Driver started, listening on 46658
19/06/03 16:48:46 DEBUG skein.Driver: Reporting gRPC server port back to the launching process
>>>
jcrist commented 5 years ago

Cool, so that's working fine. Given the same environment as above, you should be able to create a yarn cluster:

import dask_yarn
cluster = dask_yarn.YarnCluster(...)  # whatever options you want to set
# ...

If that still fails, I'd like to see the same kind of output as above to debug.

rchaudron commented 5 years ago

Thanks for your help.

(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ python
Python 2.7.5 (default, Jul 13 2018, 13:06:57)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import dask_yarn
>>> cluster = dask_yarn.YarnCluster(environment='pyspark2.tar.gz')
19/06/03 16:58:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/03 16:59:00 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/06/03 16:59:00 INFO client.AHSProxy: Connecting to Application History server at plb-hdpn3-p01.prod.lan/10.99.97.131:10200
19/06/03 16:59:00 INFO skein.Driver: Driver started, listening on 35819
19/06/03 16:59:01 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
19/06/03 16:59:01 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm1]
19/06/03 16:59:01 INFO skein.Driver: Uploading application resources to hdfs://mthdp01/user/rchaudro/.skein/application_1546518863471_42256
19/06/03 16:59:03 INFO skein.Driver: Submitting application...
19/06/03 16:59:03 INFO impl.YarnClientImpl: Submitted application application_1546518863471_42256
19/06/03 16:59:28 INFO impl.YarnClientImpl: Killed application application_1546518863471_42256
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.py", line 295, in __init__
    self._start_cluster(spec, skein_client)
  File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/core.py", line 375, in _start_cluster
    scheduler_address = app.kv.wait('dask.scheduler').decode()
  File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/kv.py", line 655, in wait
    event = event_queue.get()
  File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/skein/kv.py", line 281, in get
    raise out
skein.exceptions.ConnectionError: Unable to connect to application
jcrist commented 5 years ago

Your application was submitted successfully, but failed after starting (as shown by the logs above). To debug, you'll need to see the application logs:

$ yarn logs -applicationId application_1546518863471_42256
rchaudron commented 5 years ago
Container: container_e29_1546518863471_42256_01_000001 on plb-hdpd1-p02.prod.lan_45454
LogAggregationType: AGGREGATED
======================================================================================
LogType:application.master.log
LogLastModifiedTime:Mon Jun 03 16:59:27 +0200 2019
LogLength:2903
LogContents:
19/06/03 16:59:04 INFO skein.ApplicationMaster: Starting Skein version 0.7.3
19/06/03 16:59:05 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/06/03 16:59:05 INFO skein.ApplicationMaster: Running as user rchaudro
19/06/03 16:59:05 INFO skein.ApplicationMaster: Application specification successfully loaded
19/06/03 16:59:05 WARN shortcircuit.DomainSocketFactory: The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
19/06/03 16:59:05 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-cached-nodemanagers-proxies : 0
19/06/03 16:59:06 INFO skein.ApplicationMaster: gRPC server started at plb-hdpd1-p02.prod.lan:45575
19/06/03 16:59:06 INFO skein.ApplicationMaster: WebUI server started at plb-hdpd1-p02.prod.lan:43410
19/06/03 16:59:06 INFO skein.ApplicationMaster: Registering application with resource manager
19/06/03 16:59:06 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
19/06/03 16:59:06 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm1]
19/06/03 16:59:06 INFO client.AHSProxy: Connecting to Application History server at plb-hdpn3-p01.prod.lan/10.99.97.131:10200
19/06/03 16:59:06 INFO client.RequestHedgingRMFailoverProxyProvider: Looking for the active RM in [rm1, rm2]...
19/06/03 16:59:06 INFO client.RequestHedgingRMFailoverProxyProvider: Found active RM [rm1]
19/06/03 16:59:06 INFO skein.ApplicationMaster: Initializing service 'dask.worker'.
19/06/03 16:59:06 INFO skein.ApplicationMaster: Initializing service 'dask.scheduler'.
19/06/03 16:59:06 INFO skein.ApplicationMaster: REQUESTED: dask.scheduler_0
19/06/03 16:59:07 INFO impl.AMRMClientImpl: Received new token for : plb-hdpd3-p02.prod.lan:45454
19/06/03 16:59:07 INFO skein.ApplicationMaster: Starting container_e29_1546518863471_42256_01_000002...
19/06/03 16:59:07 INFO skein.ApplicationMaster: RUNNING: dask.scheduler_0 on container_e29_1546518863471_42256_01_000002
19/06/03 16:59:07 INFO impl.ContainerManagementProtocolProxy: Opening proxy : plb-hdpd3-p02.prod.lan:45454
19/06/03 16:59:27 WARN skein.ApplicationMaster: FAILED: dask.scheduler_0 - Container failed during execution, see logs for more information.
19/06/03 16:59:27 INFO skein.ApplicationMaster: Shutting down: Failure in service dask.scheduler, see logs for more information.
19/06/03 16:59:27 INFO skein.ApplicationMaster: Unregistering application with status FAILED
19/06/03 16:59:27 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
19/06/03 16:59:27 INFO skein.ApplicationMaster: Deleted application directory hdfs://mthdp01/user/rchaudro/.skein/application_1546518863471_42256
19/06/03 16:59:27 INFO skein.ApplicationMaster: WebUI server shut down
19/06/03 16:59:27 INFO skein.ApplicationMaster: gRPC server shut down

End of LogType:application.master.log
***************************************************************************************
jcrist commented 5 years ago

That appears not to be the full logs - the scheduler container failed during startup, but the logs aren't included in the above. There should be logs for container container_e29_1546518863471_42256_01_000002 somewhere.

rchaudron commented 5 years ago

I did not see anything interesting in the full log: container-log.txt

jcrist commented 5 years ago

It's at the bottom of that log:

Container: container_e29_1546518863471_42256_01_000002 on plb-hdpd3-p02.prod.lan_45454
LogAggregationType: AGGREGATED
======================================================================================
LogType:dask.scheduler.log
LogLastModifiedTime:Mon Jun 03 16:59:26 +0200 2019
LogLength:51
LogContents:
usage: dask-yarn [--help] [--version] command ...

End of LogType:dask.scheduler.log
***********************************************************************************

Looks like the startup command failed, which is odd. We test thoroughly on both Python 3 and Python 2 and have never seen this kind of error before. You also had issues running the CLI above - I wonder if there's something odd with your environment (perhaps the LANG environment variable?).

The following should run without error locally (as demonstrated below):

$ dask-yarn services scheduler --help
usage: dask-yarn services scheduler [--help]

Start a Dask scheduler process

optional arguments:
  --help, -h      Show this help message then exit

If you get an error, something is up with your environment (I'm not sure what).

rchaudron commented 5 years ago

Indeed, several command lines have problems.

(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ $JAVA_HOME
-bash: /usr/jdk64/jdk1.8.0_112/: Is a directory
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn services scheduler
usage: dask-yarn [--help] [--version] command ...

(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn submit --environment pyspark2.tar.gz jupyter/notebook/test.py
Traceback (most recent call last):
  File "/home/rchaudro/jupyter/virtenv/pyspark2/bin/dask-yarn", line 10, in <module>
    sys.exit(main())
  File "/home/rchaudro/jupyter/virtenv/pyspark2/lib/python2.7/site-packages/dask_yarn/cli.py", line 407, in main
    func(**kwargs)
TypeError: <lambda>() takes no arguments (19 given)

The LANG environment variable:

(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ echo $LANG
en_US.UTF-8
jcrist commented 5 years ago

Interesting, so that's not it. A few more things to check - please run the following and share the output:

$ dask-yarn
$ dask-yarn -h
$ dask-yarn services
$ dask-yarn services -h
$ python --version
# test.py
import argparse

parser = argparse.ArgumentParser(prog='PROG')
subparsers = parser.add_subparsers(help='sub-command help')

# create the parser for the "a" command
parser_a = subparsers.add_parser('a', help='a help')
parser_a.add_argument('bar', type=int, help='bar help')

# create the parser for the "b" command
parser_b = subparsers.add_parser('b', help='b help')
parser_b.add_argument('--baz', choices='XYZ', help='baz help')

print(parser.parse_args(['a', '12']))
print(parser.parse_args(['b', '--baz', 'Z']))
print(parser.parse_args())
$ python test.py b --baz Z
rchaudron commented 5 years ago
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn
usage: dask-yarn [--help] [--version] command ...
dask-yarn: error: too few arguments
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn -h
usage: dask-yarn [--help] [--version] command ...

Deploy Dask on Apache YARN

positional arguments:
  command
    submit        Submit a Dask application to a YARN cluster
    status        Check the status of a submitted Dask application
    kill          Kill a Dask application
    services      Manage Dask services

optional arguments:
  --help, -h      Show this help message then exit
  --version       Show version then exit
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn services
usage: dask-yarn services [--help] command ...
dask-yarn services: error: too few arguments
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ dask-yarn services -h
usage: dask-yarn services [--help] command ...

Manage Dask services

positional arguments:
  command
    scheduler     Start a Dask scheduler process
    worker        Start a Dask worker process
    client        Start a Dask client process

optional arguments:
  --help, -h      Show this help message then exit
(pyspark2) [rchaudro@vla-hdpjup-p02 ~]$ python --version
Python 2.7.5
jcrist commented 5 years ago

Ah, this is a bug in Python 2.7.5's argparse module; it works fine in Python 2.7.10. I'm looking to see if there's an easy workaround on our end, but if possible I highly recommend upgrading to a more recent Python release (3.8 is almost out).

rchaudron commented 5 years ago

Ok, thank you for your help. We cannot update Python at the moment because we would have to update HDP on the cluster, and we don't have time for that right now. Keep me informed if you find another solution. Thanks again.

jcrist commented 5 years ago

Yeah, there's no easy workaround for this. Subparsers are broken for Python < 2.7.6. A workaround is possible, but requires more effort than I'm willing to spend. If you'd like to make a PR fixing things, I'd accept it.

We can not update Python for the moment because we would have to update HDP on the cluster, and we do not have time for that right now.

You can have multiple versions of Python installed on a machine, as long as their major.minor version numbers differ. So you could install Python 3.7 on every node without worrying about breaking things (as long as `python` still points to `python2.7`). Alternatively, conda (http://conda.pydata.org/miniconda.html) environments live separately from the system Python and would let you use a non-legacy Python version without upgrading it on your cluster.
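For anyone stuck on an old Python and hitting the same `<lambda>() takes no arguments` error: a common pattern that sidesteps defaults-related argparse subparser bugs is to dispatch on the subcommand name (via `dest='command'`) with an explicit handler table, rather than relying on `set_defaults(func=...)`. This is only a hedged sketch - the subcommand names and handler functions below are hypothetical, not dask-yarn's actual CLI code:

```python
import argparse

def handle_scheduler(args):
    print("starting scheduler")

def handle_worker(args):
    print("starting worker with %d threads" % args.nthreads)

parser = argparse.ArgumentParser(prog='example')
# dispatch on the subcommand's name rather than set_defaults(func=...)
subparsers = parser.add_subparsers(dest='command')

subparsers.add_parser('scheduler')
worker = subparsers.add_parser('worker')
worker.add_argument('--nthreads', type=int, default=1)

# explicit dispatch table keyed by subcommand name
handlers = {'scheduler': handle_scheduler, 'worker': handle_worker}

args = parser.parse_args(['worker', '--nthreads', '4'])
handlers[args.command](args)
```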

jcrist commented 5 years ago

Dask just dropped Python 2 support, and we're likely to do the same in the next release. I recommend installing a non-legacy Python version on your cluster using one of the methods described above (if you use conda, you'd only need it on the edge node: http://yarn.dask.org/en/latest/environments.html).