dask-yarn job fails with dumps_msgpack ImportError #147

ikerforce commented 3 years ago

The following script fails when run on EMR and HDInsight.

import os
os.environ['ARROW_LIBHDFS_DIR'] = '/usr/hdp/'

from dask_yarn import YarnCluster
from dask.distributed import Client
import dask.dataframe as dd

env_path = 'hdfs:///conda_envs/dask_yarn.tar.gz'

# The original call was cut off here; any further YarnCluster arguments are unknown
cluster = YarnCluster(environment=env_path)

# if __name__ == '__main__':

client = Client(cluster)
path = 'hdfs:///samples/data_100K_dask_casted/data_100K_dask_casted/*'

# df = dd.read_csv('hdfs:///samples/test.csv')
df = dd.read_parquet(path, engine='pyarrow')
# Not shown: the traceback below is raised by a .compute() call at line 30 of dask_test.py


What happened:

The error is the following:

Traceback (most recent call last):
  File "dask_test.py", line 30, in <module>
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 284, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/base.py", line 566, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2646, in get
    futures = self._graph_to_futures(
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/client.py", line 2554, in _graph_to_futures
    dsk = dsk.__dask_distributed_pack__(self, keyset)
  File "/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/dask/highlevelgraph.py", line 946, in __dask_distributed_pack__
    from distributed.protocol.core import dumps_msgpack
ImportError: cannot import name 'dumps_msgpack' from 'distributed.protocol.core' (/home/hadoop/miniconda3/envs/dask_yarn/lib/python3.8/site-packages/distributed/protocol/core.py)
Exception ignored in: <function YarnCluster.__del__ at 0x7f6584a2ac10>

What you expected to happen:

The code should run correctly, as it does on my local computer (without dask-yarn).

Anything else we need to know?: I was able to work around this error after checking your changelogs and realising that dumps_msgpack was removed in the latest distributed version. However, I followed the exact steps from the latest official documentation, so I believe this should be corrected, or a note should be added advising users to install distributed 2021.4.0 instead of the default 2021.4.1.
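To check which side of this change an environment is on, one option is to probe whether distributed.protocol.core still exports dumps_msgpack, the name that dask's highlevelgraph.py tries to import in the traceback above. The sketch below is a hypothetical helper (module_has_attr is not part of dask or distributed); it uses only the standard library, so it also runs where distributed is not installed.

```python
import importlib
from typing import Optional


def module_has_attr(module_name: str, attr: str) -> Optional[bool]:
    """Return True/False if the module imports, or None if it cannot be imported."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module or a parent package is missing, or failed to import
        return None
    return hasattr(module, attr)


if __name__ == "__main__":
    # Hypothetical usage: probe the symbol that dask's highlevelgraph.py imports
    status = module_has_attr("distributed.protocol.core", "dumps_msgpack")
    if status is None:
        print("distributed could not be imported")
    elif status:
        print("dumps_msgpack is present (older distributed, per this thread)")
    else:
        print("dumps_msgpack is missing; dask needs to be a matching newer release")
```

If the probe reports the name as missing while the installed dask is still the older release, the ImportError in this issue is the expected outcome.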


jacobtomlinson commented 3 years ago

As you say, it looks like dumps_msgpack was removed in dask/distributed#4677 and dask/dask#7525.

It also looks like distributed 2021.4.1 should depend on dask 2021.4.1; that is being discussed here.

As a workaround, could you make sure you have the latest versions of both dask and distributed installed? It looks like you have an older version of dask in your environment.
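A quick pre-flight check in the spirit of this workaround is to compare the installed dask and distributed versions before creating the cluster. The sketch below assumes, as this thread suggests, that the two packages should be on the same release (for example 2021.4.1). installed_versions and versions_match are hypothetical helpers; they read package metadata through the standard library's importlib.metadata (Python 3.8+) instead of importing dask itself.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Dict, Optional, Tuple


def installed_versions(
    packages: Tuple[str, ...] = ("dask", "distributed"),
) -> Dict[str, Optional[str]]:
    """Map each distribution name to its installed version, or None if absent."""
    found: Dict[str, Optional[str]] = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


def versions_match(found: Dict[str, Optional[str]]) -> bool:
    """True only if every package is installed and all report the same version string."""
    values = list(found.values())
    return None not in values and len(set(values)) == 1


if __name__ == "__main__":
    found = installed_versions()
    print(found)
    if not versions_match(found):
        print("dask and distributed versions differ; consider installing matching releases")
```

Exact string equality is a deliberately strict heuristic: it fits the lockstep 2021.4.x releases discussed here, but it may flag combinations that would in fact work together.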

ikerforce commented 3 years ago

I followed your instructions and installed with:

conda install -c conda-forge dask=2021.4.1 dask-core=2021.4.1 distributed=2021.4.1 dask-yarn

The issue is gone.

I hope the default conda install works soon.


jacobtomlinson commented 3 years ago

That's great. I'm going to close this out as worked around; the underlying problem should be addressed in the conda recipe soon.