dask / distributed

A distributed task scheduler for Dask
https://distributed.dask.org
BSD 3-Clause "New" or "Revised" License
1.56k stars 715 forks source link

Dask-Yarn & Dask-Dataframe map_partitions apply Error #2577

Open erico-souza opened 5 years ago

erico-souza commented 5 years ago

Hi

I have a large python function that I wish to execute in parallel. The dataframe contains only the text data and an identifier. When I execute the following command to process the function in parallel it works fine.

results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'],nlpe, nlpf))).compute(scheduler='processes')

When I try to run the same command using the dask-yarn interface, the command does not work:

results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'],nlpe, nlpf))).compute(scheduler='distributed')

The error I get is the following:

Traceback (most recent call last):
  File "pandas/_libs/index.pyx", line 162, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 958, in pandas._libs.hashtable.Int64HashTable.get_item
TypeError: an integer is required

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "fullscript.py", line 677, in <lambda>
    results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed')
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/frame.py", line 6014, in apply
    return op.get_result()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 318, in get_result
    return super(FrameRowApply, self).get_result()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 142, in get_result
    return self.apply_standard()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 248, in apply_standard
    self.apply_series_generator()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 277, in apply_series_generator
    results[i] = self.f(v)
  File "fullscript.py", line 677, in <lambda>
    results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed')
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/series.py", line 767, in __getitem__
    result = self.index.get_value(self, key)
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/indexes/base.py", line 3118, in get_value
    tz=getattr(series.dtype, 'tz', None))
  File "pandas/_libs/index.pyx", line 106, in pandas._libs.index.IndexEngine.get_value
  File "pandas/_libs/index.pyx", line 114, in pandas._libs.index.IndexEngine.get_value
  File "pandas/_libs/index.pyx", line 164, in pandas._libs.index.IndexEngine.get_loc
KeyError: ('txt', 'occurred at index pm')

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "fullscript.py", line 748, in <module>
    partitiondata(tmpdata2)
  File "fullscript.py", line 160, in partitiondata
    data = procdata(tmp)
  File "fullscript.py", line 677, in procdata
    results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed')
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 543, in map_partitions
    return map_partitions(func, self, *args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3640, in map_partitions
    meta = _emulate(func, *args, udf=True, **kwargs2)
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "/usr/lib/python3.5/contextlib.py", line 77, in __exit__
    self.gen.throw(type, value, traceback)
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 154, in raise_on_meta_error
    raise ValueError(msg)
ValueError: Metadata inference failed in `lambda`.

You have supplied a custom function and Dask is unable to
determine the type of output that that function returns.

To resolve this please provide a meta= keyword.
The docstring of the Dask function you ran should have more information.

Original error is below:
------------------------
KeyError('txt', 'occurred at index pm')

Traceback:
---------
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error
    yield
  File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate
    return func(*_extract_meta(args, True), **_extract_meta(kwargs, True))
  File "fullscript.py", line 677, in <lambda>
    results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed')
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/frame.py", line 6014, in apply
    return op.get_result()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 318, in get_result
    return super(FrameRowApply, self).get_result()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 142, in get_result
    return self.apply_standard()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 248, in apply_standard
    self.apply_series_generator()
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 277, in apply_series_generator
    results[i] = self.f(v)
  File "fullscript.py", line 677, in <lambda>
    results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed')
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/series.py", line 767, in __getitem__
    result = self.index.get_value(self, key)
  File "/usr/local/lib/python3.5/dist-packages/pandas/core/indexes/base.py", line 3118, in get_value
    tz=getattr(series.dtype, 'tz', None))
  File "pandas/_libs/index.pyx", line 106, in pandas._libs.index.IndexEngine.get_value
  File "pandas/_libs/index.pyx", line 114, in pandas._libs.index.IndexEngine.get_value
  File "pandas/_libs/index.pyx", line 164, in pandas._libs.index.IndexEngine.get_loc

Thanks

Erico

mrocklin commented 5 years ago

As suggested in the error message

To resolve this please provide a meta= keyword. The docstring of the Dask function you ran should have more information.

On Mon, Mar 25, 2019 at 10:52 AM erico-souza notifications@github.com wrote:

Hi

I have a large python function that I wish to execute in parallel. The dataframe contains only the text data and an identifier. When I execute the following command to process the function in parallel it works fine.

results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['_id'],nlpe, nlpf))).compute(scheduler='processes')```

When I try to run the same command using the dask-yarn interface, the command does not work:python results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['_id'],nlpe, nlpf))).compute(scheduler='distributed') The error I get is the following:

Traceback (most recent call last): File "pandas/_libs/index.pyx", line 162, in pandas._libs.index.IndexEngine.get_loc File "pandas/_libs/hashtable_class_helper.pxi", line 958, in pandas._libs.hashtable.Int64HashTable.get_item TypeError: an integer is required

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error yield File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate return func(*_extract_meta(args, True), *_extract_meta(kwargs, True)) File "fullscript.py", line 677, in results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed') File "/usr/local/lib/python3.5/dist-packages/pandas/core/frame.py", line 6014, in apply return op.get_result() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 318, in get_result return super(FrameRowApply, self).get_result() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 142, in get_result return self.apply_standard() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 248, in apply_standard self.apply_series_generator() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 277, in apply_series_generator results[i] = self.f(v) File "fullscript.py", line 677, in results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed') File "/usr/local/lib/python3.5/dist-packages/pandas/core/series.py", line 767, in getitem* result = self.index.get_value(self, key) File "/usr/local/lib/python3.5/dist-packages/pandas/core/indexes/base.py", line 3118, in get_value tz=getattr(series.dtype, 'tz', None)) File "pandas/_libs/index.pyx", line 106, in pandas._libs.index.IndexEngine.get_value File "pandas/_libs/index.pyx", line 114, in pandas._libs.index.IndexEngine.get_value File "pandas/_libs/index.pyx", line 164, in pandas._libs.index.IndexEngine.get_loc KeyError: ('txt', 'occurred at index pm')

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "fullscript.py", line 748, in partitiondata(tmpdata2) File "fullscript.py", line 160, in partitiondata data = procdata(tmp) File "fullscript.py", line 677, in procdata results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed') File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 543, in map_partitions return map_partitions(func, self, *args, **kwargs) File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3640, in map_partitions meta = _emulate(func,

*args, udf=True, *kwargs2) File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate return func(_extract_meta(args, True), *_extract_meta(kwargs, True)) File "/usr/lib/python3.5/contextlib.py", line 77, in exit* self.gen.throw(type, value, traceback) File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 154, in raise_on_meta_error raise ValueError(msg) ValueError: Metadata inference failed in lambda.

You have supplied a custom function and Dask is unable to determine the type of output that that function returns.

To resolve this please provide a meta= keyword. The docstring of the Dask function you ran should have more information. Original error is below:

KeyError('txt', 'occurred at index pm') Traceback:

File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/utils.py", line 137, in raise_on_meta_error yield File "/usr/local/lib/python3.5/dist-packages/dask/dataframe/core.py", line 3601, in _emulate return func(*_extract_meta(args, True), *_extract_meta(kwargs, True)) File "fullscript.py", line 677, in results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed') File "/usr/local/lib/python3.5/dist-packages/pandas/core/frame.py", line 6014, in apply return op.get_result() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 318, in get_result return super(FrameRowApply, self).get_result() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 142, in get_result return self.apply_standard() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 248, in apply_standard self.apply_series_generator() File "/usr/local/lib/python3.5/dist-packages/pandas/core/apply.py", line 277, in apply_series_generator results[i] = self.f(v) File "fullscript.py", line 677, in results = dd.from_pandas(dt,npartitions=nCores).map_partitions(lambda df1 : df1.apply(lambda x : proctxt(x['txt'], x['pm'], nlpe, nlpf))).compute(scheduler='distributed') File "/usr/local/lib/python3.5/dist-packages/pandas/core/series.py", line 767, in getitem* result = self.index.get_value(self, key) File "/usr/local/lib/python3.5/dist-packages/pandas/core/indexes/base.py", line 3118, in get_value tz=getattr(series.dtype, 'tz', None)) File "pandas/_libs/index.pyx", line 106, in pandas._libs.index.IndexEngine.get_value File "pandas/_libs/index.pyx", line 114, in pandas._libs.index.IndexEngine.get_value File "pandas/_libs/index.pyx", line 164, in pandas._libs.index.IndexEngine.get_loc```

Thanks

Erico

— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2577, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszGL4QoigS5OluDIJu3bu05FVWS3cks5vaOLDgaJpZM4cHFNT .

erico-souza commented 5 years ago

But it still does not explain the issue with the parallel execution and distributed. Is there an explanation why the same function works in one case and not in the other?

Besides the error starts with

Traceback (most recent call last):
  File "pandas/_libs/index.pyx", line 162, in pandas._libs.index.IndexEngine.get_loc
  File "pandas/_libs/hashtable_class_helper.pxi", line 958, in pandas._libs.hashtable.Int64HashTable.get_item
TypeError: an integer is required

Would the error on the meta be generated by this?

mrocklin commented 5 years ago

Good point. I don't know why that would be the case.

On Mon, Mar 25, 2019 at 11:48 AM erico-souza notifications@github.com wrote:

But it still does not explain the issue with the parallel execution and distributed. Is there an explanation why the same function works in one case and not in the other?

— You are receiving this because you commented. Reply to this email directly, view it on GitHub https://github.com/dask/distributed/issues/2577#issuecomment-476257006, or mute the thread https://github.com/notifications/unsubscribe-auth/AASszFurLc2bNqUL2WfKs-yrbgwKjGT-ks5vaO-9gaJpZM4cHFNT .

quasiben commented 5 years ago

Can you post the versions of dask/distributed/pandas you are using in both cases ? Are you using conda-pack to bundle an env for dask-yarn ?

erico-souza commented 5 years ago

All five members of the cluster have the same set of libraries. I do not have conda anywhere. Everything was installed with pip3 (I'm using Python 3.5.2).

As a brief description of my task: I want to run a NLP pipeline using Spacy in a cluster environment. I can run the function in a single machine with parallelism without problems. My issue is when I try to pass it to the hadoop server.

Thanks for your help

Erico

My dask-yarn version is

---
Metadata-Version: 2.1
Name: dask-yarn
Version: 0.5.1
Summary: Deploy dask clusters on Apache YARN
Home-page: UNKNOWN
Author: UNKNOWN
Author-email: UNKNOWN
Installer: pip
License: BSD
Location: /usr/local/lib/python3.5/dist-packages
Requires: dask, skein, distributed, grpcio
Classifiers:
Entry-points:
  [console_scripts]
  dask-yarn=dask_yarn.cli:main

My dask version is the following:

Metadata-Version: 2.0
Name: dask
Version: 1.0.0
Summary: Parallel PyData with Task Scheduling
Home-page: http://github.com/dask/dask/
Author: None
Author-email: None
Installer: pip
License: BSD
Location: /usr/local/lib/python3.5/dist-packages
Requires:
Classifiers:
  Programming Language :: Python :: 2
  Programming Language :: Python :: 2.7
  Programming Language :: Python :: 3
  Programming Language :: Python :: 3.5
  Programming Language :: Python :: 3.6
  Programming Language :: Python :: 3.7

I have the dask-ml (which may be important):

Metadata-Version: 2.1
Name: dask-ml
Version: 0.11.0
Summary: A library for distributed and parallel machine learning
Home-page: https://github.com/dask/dask-ml
Author: Tom Augspurger
Author-email: taugspurger@anaconda.com
Installer: pip
License: BSD
Location: /usr/local/lib/python3.5/dist-packages
Requires: dask, multipledispatch, scipy, numba, pandas, numpy, dask-glm, six, packaging, scikit-learn
Classifiers:
  Development Status :: 5 - Production/Stable
  Intended Audience :: Developers
  Topic :: Database
  Topic :: Scientific/Engineering
  License :: OSI Approved :: BSD License
  Programming Language :: Python :: 2
  Programming Language :: Python :: 2.7
  Programming Language :: Python :: 3
  Programming Language :: Python :: 3.4
  Programming Language :: Python :: 3.5
  Programming Language :: Python :: 3.6

Spacy Version:

Metadata-Version: 2.1
Name: spacy
Version: 2.1.2
Summary: Industrial-strength Natural Language Processing (NLP) with Python and Cython
Home-page: https://spacy.io
Author: Explosion AI
Author-email: contact@explosion.ai
Installer: pip
License: MIT
Location: /usr/local/lib/python3.5/dist-packages
Requires: requests, srsly, blis, preshed, cymem, jsonschema, thinc, wasabi, plac, murmurhash, numpy
Classifiers:
  Development Status :: 5 - Production/Stable
  Environment :: Console
  Intended Audience :: Developers
  Intended Audience :: Science/Research
  License :: OSI Approved :: MIT License
  Operating System :: POSIX :: Linux
  Operating System :: MacOS :: MacOS X
  Operating System :: Microsoft :: Windows
  Programming Language :: Cython
  Programming Language :: Python :: 2
  Programming Language :: Python :: 2.7
  Programming Language :: Python :: 3
  Programming Language :: Python :: 3.4
  Programming Language :: Python :: 3.5
  Programming Language :: Python :: 3.6
  Programming Language :: Python :: 3.7
  Topic :: Scientific/Engineering
mrocklin commented 5 years ago

@erico-souza I recommend two things:

  1. Call client.get_versions(check=True) to verify that your versions are consistent
  2. Produce a minimal example that maintainers can use to reproduce your error