modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

to_parquet is not supported #626

Closed dazza-codes closed 3 years ago

dazza-codes commented 5 years ago

System information

$ cat /etc/lsb-release 
DISTRIB_ID=Ubuntu
DISTRIB_RELEASE=18.04
DISTRIB_CODENAME=bionic
DISTRIB_DESCRIPTION="Ubuntu 18.04.2 LTS"
$ conda --version
conda 4.6.14
$ python --version
Python 3.7.3
$ pip --version
pip 19.1 from /home/dlweber/miniconda3/envs/gis-dataprocessing/lib/python3.7/site-packages/pip (python 3.7)

$ pip freeze | grep modin
modin==0.5.0
$ pip freeze | grep pandas
pandas==0.24.2
$ pip freeze | grep numpy
numpy==1.16.3

Miniconda3 was used to install most of the scientific Python stack, with a pip clause to add modin, e.g.

# environment.yaml
channels:
  - conda-forge
  - defaults

dependencies:
  - python>=3.7
  - affine
  - configobj
  - dask
  - numpy
  - pandas
  - pyarrow
  - rasterio
  - s3fs
  - scikit-learn
  - scipy
  - shapely
  - xarray
  - pip
  - pip:
    - modin

Describe the problem

https://modin.readthedocs.io/en/latest/pandas_supported.html says to_parquet is supported, but maybe not:

import numpy as np
import modin.pandas as pd
size = (1, 10 * 10)
column_ij = ["%04d_%04d" % (i, j) for i in range(10) for j in range(10)]
data = np.random.randint(0, 10000, size=size, dtype="uint16")
df = pd.DataFrame(data, columns=column_ij)
df.to_parquet('/tmp/tmp.parquet')
UserWarning: `DataFrame.to_parquet` defaulting to pandas implementation.

More details:

2019-05-21 16:03:46,207 WARNING worker.py:1337 -- WARNING: Not updating worker name since `setproctitle` is not installed. Install this with `pip install setproctitle` (or ray[debug]) to enable monitoring of worker processes.
2019-05-21 16:03:46,207 INFO node.py:469 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-21_16-03-46_18437/logs.
2019-05-21 16:03:46,310 INFO services.py:407 -- Waiting for redis server at 127.0.0.1:55558 to respond...
2019-05-21 16:03:46,418 INFO services.py:407 -- Waiting for redis server at 127.0.0.1:41726 to respond...
2019-05-21 16:03:46,420 INFO services.py:804 -- Starting Redis shard with 2.1 GB max memory.
2019-05-21 16:03:46,426 INFO node.py:483 -- Process STDOUT and STDERR is being redirected to /tmp/ray/session_2019-05-21_16-03-46_18437/logs.
2019-05-21 16:03:46,427 WARNING services.py:1304 -- WARNING: The object store is using /tmp instead of /dev/shm because /dev/shm has only 5238738944 bytes available. This may slow down performance! You may be able to free up space by deleting files in /dev/shm or terminating any running plasma_store_server processes. If you are inside a Docker container, you may need to pass an argument with the flag '--shm-size' to 'docker run'.
2019-05-21 16:03:46,427 INFO services.py:1427 -- Starting the Plasma object store with 6.0 GB memory using /tmp.
UserWarning: Distributing <class 'list'> object. This may take some time.
UserWarning: `DataFrame.to_parquet` defaulting to pandas implementation.
To request implementation, send an email to feature_requests@modin.org.

Maybe modin could be added to conda-forge so that conda can help with resolving version dependencies?
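
For example, with a conda-forge package the pip clause above could be dropped and conda would resolve the pandas pin itself (hypothetical spec, assuming such a package gets published):

# environment.yaml (hypothetical conda-forge install of modin)
channels:
  - conda-forge
dependencies:
  - python>=3.7
  - pandas
  - pyarrow
  - modin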

devin-petersohn commented 5 years ago

Thanks @darrenleeweber, good point. The documentation is not very clear or consistent about the to_<something> methods. It could use an update once we get #613 merged in. I'll try to dedicate some time to that this week.

Maybe modin could be added to conda-forge so that conda can help with resolving version dependencies?

I'm definitely open to the idea. Does pip cause problems for you in your environment due to package collisions?

dazza-codes commented 5 years ago

Also, the from_parquet is not supported for a partitioned data set. Same versions as above.


  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/modin/backends/pandas/query_compiler.py", line 871, in _full_reduce
    mapped_parts = self.data.map_across_blocks(map_func)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py", line 209, in map_across_blocks
    preprocessed_map_func = self.preprocess_func(map_func)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/modin/engines/base/frame/partition_manager.py", line 100, in preprocess_func
    return self._partition_class.preprocess_func(map_func)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/modin/engines/ray/pandas_on_ray/frame/partition.py", line 108, in preprocess_func
    return ray.put(func)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/worker.py", line 2216, in put
    worker.put_object(object_id, value)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/worker.py", line 375, in put_object
    self.store_and_register(object_id, value)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/worker.py", line 309, in store_and_register
    self.task_driver_id))
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/utils.py", line 475, in _wrapper
    return orig_attr(*args, **kwargs)
  File "pyarrow/_plasma.pyx", line 496, in pyarrow._plasma.PlasmaClient.put
  File "pyarrow/serialization.pxi", line 355, in pyarrow.lib.serialize
  File "pyarrow/serialization.pxi", line 150, in pyarrow.lib.SerializationContext._serialize_callback
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle.py", line 952, in dumps
    cp.dump(obj)
  File "/home/joe/miniconda3/envs/project/lib/python3.7/site-packages/ray/cloudpickle/cloudpickle.py", line 271, in dump
    raise pickle.PicklingError(msg)
_pickle.PicklingError: Could not pickle object as excessively deep recursion required.
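
A rough sketch of the failing pattern (paths and column names here are illustrative; the relevant part is the directory-partitioned layout written with plain pandas/pyarrow):

import numpy as np
import pandas
import modin.pandas as pd

# Write a directory-partitioned dataset with plain pandas + pyarrow.
plain = pandas.DataFrame({"part": np.repeat([0, 1], 50), "value": np.arange(100)})
plain.to_parquet("/tmp/partitioned.parquet", partition_cols=["part"])

# Reading the partitioned directory back with modin is the kind of call that hits the error above.
df = pd.read_parquet("/tmp/partitioned.parquet")
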
dazza-codes commented 5 years ago

For relatively small data sets, e.g. a 10x10 numpy.ndarray, the overhead of serializing to/from plasma and tracking the objectIDs is killing performance. A single process using pyarrow directly with pandas is much faster for parquet IO. The primary interest is whether the pure IO side of modin can out-perform pandas/pyarrow, especially when selecting parquet columns, and whether modin can perform concurrent reads of parquet.
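
For context, the comparison in question is roughly the following (paths and column names refer to the example above; timings obviously depend on the machine):

import time
import pandas
import modin.pandas as mpd

cols = ["0000_0000", "0000_0001"]  # select a couple of columns

t0 = time.time()
pandas.read_parquet("/tmp/tmp.parquet", columns=cols)  # single-process pandas + pyarrow
print("pandas:", time.time() - t0)

t0 = time.time()
mpd.read_parquet("/tmp/tmp.parquet", columns=cols)     # modin, plus plasma/objectID overhead
print("modin:", time.time() - t0)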

dazza-codes commented 5 years ago

Does pip cause problems for you in your environment due to package collisions?

Somehow, it's working OK when the pip requirements are bundled into the conda environment.yaml specs. It picks up the correct release versions of modin and pandas (as pinned in modin's setup.py). BTW, it's early days for modin, so I get it, but try to relax the SemVer pin on pandas if possible.

devin-petersohn commented 5 years ago

Also, the from_parquet is not supported for a partitioned data set. Same versions as above.

Thanks, would you mind opening a new ticket for this one, since it's not related to the original issue? It's fine to just leave a link to your system info on this page, but I prefer to track it separately. If you are able, please also include a way I can reproduce the issue.

For relatively small data sets, e.g. a 10x10 numpy.ndarray, the overhead of serializing to/from plasma and tracking the objectIDs is killing performance.

You're right, the performance on a few hundred bytes of data will be worse, and that's not likely to change much. The difference here is a matter of ~10-20ms (depending on system specs), which is not typically noticeable to the user and should not meaningfully affect most workflows. There's a chance this could change as we transition to Pyarrow compute kernels, but that would be more of a side effect: many of the overheads are fixed, and small data is a solved problem for most of these tools.

BTW, it's early days for modin, so I get it, but try to relax the SemVer pin on pandas if possible.

We pin to a specific pandas version so that the behavior of Modin can be well-defined in the context of that pandas version. Because pandas still has API changes, and the behavior is not consistent between minor versions, we pin to the newest version to make sure we can keep the guarantee of a true drop-in replacement. We don't have the manpower to guarantee this for more than one version, which is why it is pinned.
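
Concretely, the two options under discussion look roughly like this in setup.py terms (version numbers are illustrative):

install_requires = [
    "pandas==0.24.2",        # exact pin: Modin's behavior is defined against this one release
    # "pandas>=0.24,<0.25",  # relaxed range: patch releases such as 0.24.x would float
]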

devin-petersohn commented 5 years ago

Let me clarify since it may not have been clear:

Users don't need Modin to process hundreds of bytes. Pandas works at the microsecond level in that data range, and it's optimized for that scale (keeping copies around, vectorization, etc.). There's no chance Modin can be faster than pandas on 400 bytes of data, because serialization alone will take longer than that. Do you have thoughts on this?

devin-petersohn commented 5 years ago

pin to sem-ver like 0.x and allow the patch/bug-fix releases like 0.x.y to increment automatically (hope that makes sense)

This makes sense, but there are also behaviors that change between versions and can break our tests. For example, pandas 0.24.0 shipped a regression that we caught in our tests because Modin's behavior was still correct. To stay compatible with 0.24.0, we intentionally introduced the same regression; we contributed a fix upstream, but it didn't get merged until 0.24.2.

(although it would be 💯 + if it could be lazy too)

Stay tuned 😄

RE documentation, the dask docs could be relevant to performance advice,

We should do something like this. A list of behaviors to avoid would be great, and it is likely that those same behaviors are also bad in pandas.

devin-petersohn commented 3 years ago

I created a POC for to_parquet that works:

import ray
from modin.distributed.dataframe.pandas import unwrap_partitions

@ray.remote
def write_partition(df, path):
    # Each Ray task writes one row partition to its own parquet file.
    df.to_parquet(path)
    return 0

def to_parquet_poc(df, output_path):  # wrapper name added here so the snippet is self-contained
    # unwrap_partitions hands back the row-axis partitions as Ray object refs;
    # Ray resolves them to pandas DataFrames inside write_partition.
    partitions = unwrap_partitions(df, axis=0)
    ret = [
        write_partition.remote(part, f"{output_path}/part-{i:04d}.snappy.parquet")
        for i, part in enumerate(partitions)
    ]
    ray.get(ret)
    return output_path
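
Used roughly like this (the output directory name is illustrative and has to exist before the tasks write into it):

import os
import modin.pandas as pd

df = pd.DataFrame({"a": range(1000), "b": range(1000)})
output_path = "/tmp/modin_parquet_out"
os.makedirs(output_path, exist_ok=True)
to_parquet_poc(df, output_path)

# The per-partition files read back as a single dataset with plain pandas/pyarrow.
import pandas
roundtrip = pandas.read_parquet(output_path)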