dask / knit

Deprecated, please use https://github.com/jcrist/skein or https://github.com/dask/dask-yarn instead
http://knit.readthedocs.io/en/latest/
BSD 3-Clause "New" or "Revised" License

Need an example of submitting a dask/python script to YARN #124

Closed j-bennet closed 6 years ago

j-bennet commented 6 years ago

I looked all over the Knit documentation, but I could not find an example of running a dask/python script on YARN.

Let's say I already have an application that runs with dask.distributed client:

t3.py

import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    client = Client()
    df = dd.read_parquet('s3://path/here')
    print(df.head(3))

How do I run this with the YARN scheduler?

P.S. I am aware that this is a dummy question, sorry.

martindurant commented 6 years ago

If you were willing to add to the documentation, that would be highly appreciated (although I am aware of other efforts to make a more seamless python/yarn experience).

In https://github.com/dask/knit/issues/123 you have a nice working example of how to use the client for a cluster started in the same process.

To reuse that cluster from other python processes, you can find the address to connect to from cluster.local_cluster.scheduler_address, and use this in your call to Client().

>>> cluster = DaskYARNCluster(env='/home/hadoop/reqs/dvss.zip')
>>> cluster.start(n_workers=3, memory=500, cpus=5, checks=True)
>>> cluster.local_cluster.scheduler_address
"tcp://10.56.73.28:8786"

Then, in other sessions:

>>> client = distributed.Client("tcp://10.56.73.28:8786")

j-bennet commented 6 years ago

@martindurant Well, my example is not quite working, so I'm sure I'm doing something wrong. I have this problem: after my application completes, it is being killed, with some scary error messages:

                        url              referrer session_id  \
0  http://a.com/articles/15    http://google.com/        yyy
1  http://a.com/articles/17    http://google.com/        xxx
2   http://a.com/articles/7  http://facebook.com/        yyy

                   ts hour customer
0 2017-09-17 00:09:00    0    a.com
1 2017-09-17 00:21:00    0    a.com
2 2017-09-17 00:00:00    0    a.com
18/04/14 22:46:44 INFO impl.YarnClientImpl: Killed application application_1523740467851_0005
2018-04-14 22:46:45,108 - asyncio - ERROR - Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 179, in read
    n_frames = yield stream.read_bytes(8)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
2018-04-14 22:46:45,109 - asyncio - ERROR - Future exception was never retrieved
future: <Future finished exception=CommClosedError('in <closed TCP>: Stream is closed',)>
Traceback (most recent call last):
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 179, in read
    n_frames = yield stream.read_bytes(8)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/tornado/gen.py", line 1107, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 200, in read
    convert_stream_closed_error(self, e)
  File "/home/hadoop/conda/envs/dvss/lib/python3.6/site-packages/distributed/comm/tcp.py", line 128, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed

You see it prints out those 3 records from my dataframe, and then 💣. Ouch. This is not what I want to see.

[Screenshot, 2018-04-14 15:54:29]

And yes, once I get things right, I'd be happy to PR a documentation example!

martindurant commented 6 years ago

"KILLED" sounds like an action on the part of YARN. I'm afraid you will need to look in the resourcemanager logs to see why that happened. Was there any message from the process that launched the dask_yarn cluster?

The comm-closed errors you see above are all in response to the worker disappearing while, apparently, communication was in progress.

j-bennet commented 6 years ago

Well, once my script ends, the cluster exits, right? And this is something that the cluster does when it exits. It kills the application:

https://github.com/dask/knit/blob/24a5e33ab5d6b4235cadf79f7e4244eb9935e9b4/dask_yarn/core.py#L73

and

https://github.com/dask/knit/blob/24a5e33ab5d6b4235cadf79f7e4244eb9935e9b4/dask_yarn/core.py#L173

It looks like I just don't know the proper way to finish this application.

martindurant commented 6 years ago

Ah, so you need your "master" script, i.e., the one that starts the cluster, to hang around. A while True: time.sleep(1) loop would do... We don't currently have a way to put the scheduler process into a persistent state detached from the process that launches it.
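
A minimal sketch of such a keep-alive loop, using only the standard library. The names keep_alive, shutdown and should_stop are hypothetical, and what to pass as shutdown depends on how the cluster object is cleaned up, which this thread does not show:

```python
import time


def keep_alive(shutdown, poll_seconds=1.0, should_stop=lambda: False):
    """Block until interrupted or should_stop() returns True, then call shutdown().

    shutdown is any zero-argument callable that releases the cluster
    (hypothetical; supply whatever cleanup your cluster object needs).
    """
    try:
        while not should_stop():
            time.sleep(poll_seconds)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the expected way to end the loop
    finally:
        # Runs on normal exit, on Ctrl-C, and on any other exception.
        shutdown()
```

The try/finally is there so that the cleanup callable runs once, whichever way the loop ends.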

j-bennet commented 6 years ago

This is why I need an example. It's not obvious how I should do things.

So, ok, now I have this script that starts a YARN cluster:

start_dask_yarn.py

from time import sleep
import dask_yarn

cluster = dask_yarn.DaskYARNCluster(env='/home/hadoop/reqs/dvss.zip', lang='en_US.UTF-8')
cluster.start(n_workers=3, memory=5*1024, cpus=5)
try:
    while True:
        print('Cluster scheduler: {}.'.format(cluster.scheduler_address))
        sleep(20)
except KeyboardInterrupt:
    print('Cluster is done.')

Now, how do I make my original script use this cluster?

martindurant commented 6 years ago

Presumably this prints out a string of the form "tcp://ip.address:port"? That's the thing that you want to pass to Client() in your other script.
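
If the address is handed over by hand (copied from the output of one script into another), it may be worth checking its shape before passing it on. A small sketch, assuming only the "tcp://ip.address:port" form mentioned above; parse_scheduler_address is a hypothetical helper, not part of dask or knit, and it deliberately rejects any other form:

```python
from urllib.parse import urlsplit


def parse_scheduler_address(address):
    """Split 'tcp://host:port' into (host, port); raise ValueError otherwise."""
    parts = urlsplit(address)
    if parts.scheme != "tcp":
        raise ValueError("expected a tcp:// address, got {!r}".format(address))
    # Accessing .port raises ValueError if the port is not a valid number.
    host, port = parts.hostname, parts.port
    if not host or port is None:
        raise ValueError("address must include host and port: {!r}".format(address))
    return host, port
```

A script could call this on the address it receives and exit early with a readable message, rather than waiting for a connection attempt to fail.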

j-bennet commented 6 years ago

Ok, so the other script becomes just this:

t2.py

import sys
import dask.dataframe as dd
from dask.distributed import Client

if __name__ == '__main__':
    address = sys.argv[1]
    print('Connecting to address: {}.'.format(address))
    client = Client(address=address)
    df = dd.read_parquet('s3://parsely-public/jbennet/daskvsspark/events/100-24/*/*/*/*/*/*.parquet')
    print('-' * 20)
    print(df.head(3))

and I run it providing my scheduler address: python t2.py tcp://10.21.0.219:35225.

This seems to work. But I still have a ton of questions!

  1. A YARN application is started when I start my DaskYARNCluster, and killed when I stop the cluster. So this is completely different from how things are done in Spark, where one script run would correspond to one YARN application. Is that correct?
  2. There seems to be no way to have an application end as FINISHED rather than KILLED?
  3. Does this mean I have to start and stop a cluster for each run of my script?

martindurant commented 6 years ago

Indeed, this method is not the same as the spark "submit" one - the scheduler is not even running within yarn, only the workers. However, it is still useful. Whether you keep the cluster going, or start a new cluster per user or per script, that is a design question - you could keep the cluster up indefinitely if you wanted.

The dask workers do not have a convenient way to exit. They can be stopped, but there is a controlling "nanny" process tasked with creating a new worker process when that happens, and there is no way to tell the "nanny" to stop doing that. Perhaps we could circumvent the nanny, but killing the container or the application seems to achieve the stated purpose. There is a method in the ApplicationMaster for it to cleanly shut itself down, but calling it proved problematic.
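
To illustrate the restart pattern described above, here is a toy supervisor loop. It is not dask's nanny code, only a sketch of the general idea; run_with_restarts and max_restarts are hypothetical names, and the "worker" is a child Python process that exits immediately:

```python
import subprocess
import sys


def run_with_restarts(max_restarts):
    """Launch a short-lived child and relaunch it each time it exits.

    Returns the total number of launches. A real supervisor would loop
    indefinitely; max_restarts exists only so this sketch terminates.
    """
    launches = 0
    while launches <= max_restarts:
        # Stand-in for a worker: a child interpreter that exits at once.
        child = subprocess.Popen([sys.executable, "-c", "pass"])
        child.wait()   # the "worker" has stopped
        launches += 1  # count it; the loop then starts a replacement
    return launches
```

From the outside, stopping the child changes nothing, because the loop immediately starts a replacement. Only ending the supervising process itself stops the cycle.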

mrocklin commented 6 years ago

A few small comments:

  1. It looks like @j-bennet is using the dask_yarn module above. Is this still recommended? Or is the code in knit better today?
  2. @j-bennet please excuse our dust here. YARN support is not the greatest in Dask (it has proven to be the least friendly system for Python development we've encountered), which is why it is not included in our setup documentation.
  3. cc @jcrist, who is developing the replacement for knit and may be interested in some of these questions.

martindurant commented 6 years ago

The knit repo includes the dask_yarn (python) package https://github.com/dask/knit/tree/master/dask_yarn

j-bennet commented 6 years ago

@mrocklin @jcrist Replacement to knit? What will it be?

martindurant commented 6 years ago

It is taking shape at https://github.com/jcrist/skein, a re-imagining of knit from the ground up. Hopefully all dask-on-yarn users will become happy.

j-bennet commented 6 years ago

Oh, I see! I think I visited that repo before in my stumbling around, and was turned away by the readme:

[Screenshot, 2018-04-20 14:49:49]

martindurant commented 6 years ago

There is no promised timeline yet, but there is definitely development there, unlike, for the most part, here on knit :)