dask / dask-yarn

Deploy dask on YARN clusters
http://yarn.dask.org
BSD 3-Clause "New" or "Revised" License

"Module not found" error using groupby-apply in dask-yarn on EMR #86

Closed jkirkby closed 5 years ago

jkirkby commented 5 years ago

I'm having problems getting dask groupby-apply working on EMR. The worker logs show error messages that indicate a problem deserializing the function being applied. I have a reproducible example, simplified as far as possible, which I am running on a small cluster (master: m4.large, workers: 2x m4.large).

1) I create the cluster using a modified version of the dask-yarn example bootstrap script. The script installs miniconda and other dependencies on the master node and all worker nodes. It also creates a Python source file (dask_test.py) containing a simple function on all nodes, and adds the file's location to the PYTHONPATH. This is my bootstrap script: bootstrap-dask-test.txt (I have changed the extension so I can upload it)

2) Running the following code using a LocalCluster in a notebook on the master works as expected:

import dask
from distributed import Client
from dask_test import test_func 

client = Client(n_workers=2, threads_per_worker=1, memory_limit='3 GiB')

df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()

print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')

3) The following code in a notebook using a YarnCluster fails:

import dask
from distributed import Client
from dask_yarn import YarnCluster
from dask_test import test_func 

env='python:///home/hadoop/miniconda/bin/python3.6'
cluster = YarnCluster(environment=env, deploy_mode='local', n_workers=4, worker_vcores=1, worker_memory='3 GiB')
client = Client(cluster)

df = dask.datasets.timeseries().groupby(['name']).apply(test_func, meta={'elements': 'int'}).compute()

print(f'Result has {df.shape[0]} rows and {df.shape[1]} columns')

This is the worker log containing the serialization error message: dask.worker.log

I have also tried using a packaged conda environment, and using Client.upload_file, with no luck. Any advice would be greatly appreciated.

jcrist commented 5 years ago

Thanks for filing an excellent issue report.

> It also creates a python source file (dask_test.py) containing a simple function on all nodes, and adds the file to the PYTHONPATH

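What's happening here is that YARN manages the worker environments, so a PYTHONPATH you set manually beforehand isn't seen by the workers (YARN doesn't run users' .bashrc files). Mucking with PYTHONPATH is not the recommended way to install code in Python, as it relies on the environment variables being set appropriately in all environments. I recommend one of the following:

  • Make your code a true package, and install it properly. Note that this doesn't require you to put it on PyPI; pip can install fine from source or from a git URL (e.g. pip install git+https://github.com/dask/dask-yarn.git).
  • If your code is small enough you could define everything in the notebook/script/whatever and cloudpickle will handle serializing the code to all nodes. Anything in __main__ will be serialized automatically.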

Using Client.upload_file will work if your cluster is fixed in size, but won't work if you add/remove nodes during operation (it only uploads things once, so new workers won't get the files added). I don't recommend using it; it's around mostly for legacy reasons.

If you really want to set PYTHONPATH manually, you can pass worker_env={'PYTHONPATH': ...} to YarnCluster, and it will be set in your worker environments.
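
To illustrate why the variable has to be passed explicitly, here is a minimal sketch using only the standard library. It is an analogy, not what YARN actually does: it starts a fresh interpreter without the parent's PYTHONPATH, then again with the variable supplied. MODULE_DIR and child_pythonpath are hypothetical names made up for this example.

```python
import os
import subprocess
import sys

# Hypothetical directory holding dask_test.py; adjust to match your bootstrap script.
MODULE_DIR = "/home/hadoop/dask_test_src"


def child_pythonpath(extra_env=None):
    """Start a fresh interpreter that does not inherit PYTHONPATH and
    return the value it sees ("" when the variable is unset)."""
    # Copy the current environment, dropping PYTHONPATH to mimic a process
    # that never ran the user's shell setup.
    env = {k: v for k, v in os.environ.items() if k != "PYTHONPATH"}
    env.update(extra_env or {})
    result = subprocess.run(
        [sys.executable, "-c", "import os; print(os.environ.get('PYTHONPATH', ''))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Without the variable the child sees nothing; with it, the child sees the path.
without_env = child_pythonpath()
with_env = child_pythonpath({"PYTHONPATH": MODULE_DIR})
print(repr(without_env), repr(with_env))
```

The dict passed in the second call is the kind of value you would hand to YarnCluster as worker_env, e.g. worker_env={'PYTHONPATH': MODULE_DIR}, with the directory replaced by wherever your module actually lives.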

jkirkby commented 5 years ago

Great. Both of these options worked for me:

  • Make your code a true package, and install it properly. Note that this doesn't require you to put it on PyPI; pip can install fine from source or from a git URL (e.g. pip install git+https://github.com/dask/dask-yarn.git).
  • If your code is small enough you could define everything in the notebook/script/whatever and cloudpickle will handle serializing the code to all nodes. Anything in __main__ will be serialized automatically.
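
For anyone hitting the same "module not found" error, the following sketch may help show where it comes from. It uses the standard-library pickle as a stand-in (not dask or cloudpickle themselves): a function from an importable module is pickled by reference, meaning only the module and function names are stored, so loading it fails wherever that module cannot be imported. The module name dask_test_demo is hypothetical and is written to a temporary directory just for this demonstration.

```python
import importlib
import os
import pickle
import sys
import tempfile

# Write a throwaway module to a temp dir (hypothetical stand-in for dask_test.py).
tmp_dir = tempfile.mkdtemp()
with open(os.path.join(tmp_dir, "dask_test_demo.py"), "w") as f:
    f.write("def test_func(x):\n    return x + 1\n")

# "Client" side: the module is importable, so pickling succeeds.
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()
demo_mod = importlib.import_module("dask_test_demo")
payload = pickle.dumps(demo_mod.test_func)

# The payload carries the module and function names, not the function body.
stored_by_reference = b"dask_test_demo" in payload and b"test_func" in payload

# "Worker" side: simulate a process where the module is not on the path.
sys.path.remove(tmp_dir)
del sys.modules["dask_test_demo"]
importlib.invalidate_caches()

try:
    pickle.loads(payload)
    error_name = None
except ModuleNotFoundError as exc:
    error_name = type(exc).__name__

print(stored_by_reference, error_name)
```

Installing the package on every node makes the import on the worker side succeed. Defining the function in the notebook avoids the import altogether, since cloudpickle serializes functions defined in __main__ by value.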

Thanks for the quick response.