Hi @devin-petersohn. I started working on a simple multiprocessing engine. My approach is quite naive: in the partition implementation I simply send the work to a different process, similar to what Ray does with a remote task. I had some serialization issues, so for now I am using dill instead of pickle. I have run it on the following example, and it indeed works and opens new Python processes for each partition:
import modin.pandas as pd
import numpy as np
data = np.random.randint(0, 7, size=(10000, 100))
df = pd.DataFrame(data)
print(df)
print(df.count())
My code is far from complete, hence I am not opening a PR just yet (unless we want to put it in as WIP). The following is my partition class. Does this approach make sense? Feedback would be appreciated.
import pandas
from multiprocessing import Pool
from modin.data_management.utils import length_fn_pandas, width_fn_pandas
from modin.engines.base.frame.partition import BaseFramePartition
def func_wrapper(call_queue, data):
    # Runs in the worker process: deserialize each queued function and
    # its kwargs with dill, then apply them to the data in order.
    import dill

    def deserialize(obj):
        if isinstance(obj, bytes):
            return dill.loads(obj)
        return obj

    for func, kwargs in call_queue:
        func = deserialize(func)
        kwargs = deserialize(kwargs)
        data = func(data, **kwargs)
    return data

def deploy_mp(call_queue, data):
    # Returns an AsyncResult; calling `get()` on it blocks until the
    # worker has flushed the whole call queue.
    pid = pool.starmap_async(func_wrapper, [(call_queue, data)])
    return pid

def put(obj):
    return obj

def put_mp(obj):
    # Hand the object to a worker so the partition holds an async
    # handle, mirroring a `put` into an object store on other engines.
    pid = pool.map_async(put, (obj,))
    return pid

pool = Pool()
class PandasOnMultiprocessFramePartition(BaseFramePartition):
"""This abstract class holds the data and metadata for a single partition.
The methods required for implementing this abstract class are listed in
the section immediately following this.
The API exposed by the children of this object is used in
`BaseFrameManager`.
Note: These objects are treated as immutable by `BaseFrameManager`
subclasses. There is no logic for updating inplace.
"""
    def __init__(self, pid, length=None, width=None, call_queue=None):
        self.pid = pid
        # A mutable default argument would be shared across instances.
        self.call_queue = call_queue if call_queue is not None else []
        self._length_cache = length
        self._width_cache = width
def get(self):
"""Flushes the call_queue and returns the data.
Note: Since this object is a simple wrapper, just return the data.
Returns:
The object that was `put`.
"""
self.drain_call_queue()
# blocking operation
df = self.pid.get()[0]
return df
def apply(self, func, **kwargs):
"""Apply some callable function to the data in this partition.
Note: It is up to the implementation how kwargs are handled. They are
an important part of many implementations. As of right now, they
are not serialized.
Args:
func: The lambda to apply (may already be correctly formatted)
Returns:
A new `BaseFramePartition` containing the object that has had `func`
applied to it.
"""
call_queue = self.call_queue + [(func, kwargs)]
self.call_queue = []
pid = deploy_mp(call_queue, self.get().copy())
return PandasOnMultiprocessFramePartition(pid)
    def add_to_apply_calls(self, func, **kwargs):
        # Record the call lazily; it runs when the queue is drained.
        return PandasOnMultiprocessFramePartition(
            self.pid, call_queue=self.call_queue + [(func, kwargs)]
        )
    def drain_call_queue(self):
        if len(self.call_queue) == 0:
            return
        # Keep the flushed result (discarding `apply`'s return value would
        # drop the queued work), and dill-serialize the identity lambda so
        # the pool's standard pickle never sees a raw lambda.
        self.pid = self.apply(type(self).preprocess_func(lambda x: x)).pid
    def mask(self, row_indices=None, col_indices=None):
        # Default missing indices to full slices, and dill-serialize the
        # lambda so the pool's pickle can ship it to the worker.
        rows = slice(None) if row_indices is None else row_indices
        cols = slice(None) if col_indices is None else col_indices
        new_obj = self.add_to_apply_calls(
            type(self).preprocess_func(lambda df: pandas.DataFrame(df.iloc[rows, cols]))
        )
        new_obj._length_cache = None if row_indices is None else len(row_indices)
        new_obj._width_cache = None if col_indices is None else len(col_indices)
        return new_obj
def __copy__(self):
return PandasOnMultiprocessFramePartition(
self.pid, self._length_cache, self._width_cache
)
def to_pandas(self):
"""Convert the object stored in this partition to a Pandas DataFrame.
Note: If the underlying object is a Pandas DataFrame, this will likely
only need to call `get`
Returns:
A Pandas DataFrame.
"""
dataframe = self.get()
assert type(dataframe) is pandas.DataFrame or type(dataframe) is pandas.Series
return dataframe
@classmethod
def put(cls, obj):
"""A factory classmethod to format a given object.
Args:
obj: An object.
Returns:
A `RemotePartitions` object.
"""
pid = put_mp(obj)
return cls(pid)
@classmethod
def preprocess_func(cls, func):
"""Preprocess a function before an `apply` call.
Note: This is a classmethod because the definition of how to preprocess
should be class-wide. Also, we may want to use this before we
deploy a preprocessed function to multiple `BaseFramePartition`
objects.
Args:
func: The function to preprocess.
Returns:
An object that can be accepted by `apply`.
"""
import dill
return dill.dumps(func)
@classmethod
def length_extraction_fn(cls):
"""The function to compute the length of the object in this partition.
Returns:
A callable function.
"""
return length_fn_pandas
@classmethod
def width_extraction_fn(cls):
"""The function to compute the width of the object in this partition.
Returns:
A callable function.
"""
return width_fn_pandas
_length_cache = None
_width_cache = None
def length(self):
if self._length_cache is None:
self._length_cache = type(self).length_extraction_fn()(self.get())
return self._length_cache
def width(self):
if self._width_cache is None:
self._width_cache = type(self).width_extraction_fn()(self.get())
return self._width_cache
    @classmethod
    def empty(cls):
        # Route through `put` so the empty frame gets a real pool handle.
        return cls.put(pandas.DataFrame())
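In case it helps anyone trying this out, here is a minimal, hypothetical round trip with the class above. The data and the doubling lambda are illustrative, not part of the snippet, and since the pool is created at import time, on spawn-based platforms this needs to run under an if __name__ == "__main__": guard:

import pandas

# Illustrative data; `put` ships it to a worker in the pool.
frame = pandas.DataFrame({"a": [1, 2, 3]})
part = PandasOnMultiprocessFramePartition.put(frame)

# Serialize the lambda with dill first; the pool's own pickle
# cannot ship raw lambdas to the workers.
func = PandasOnMultiprocessFramePartition.preprocess_func(lambda df: df * 2)
print(part.apply(func).to_pandas())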
That looks great @eavidan, would cloudpickle work instead of dill? The serialization issues are likely because we use lambda functions internally, and if I remember correctly, pickle doesn't support lambdas.
Feel free to open a WIP PR, that way I can comment and pull the code to test/play around.
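For context, a minimal sketch (not from this thread) of the lambda problem: plain pickle refuses lambdas, while cloudpickle (or dill) round-trips them:

import pickle
import cloudpickle

func = lambda df: df.count()
try:
    pickle.dumps(func)  # plain pickle cannot serialize lambdas
except (pickle.PicklingError, AttributeError) as exc:
    print("pickle failed:", exc)

payload = cloudpickle.dumps(func)   # cloudpickle handles lambdas fine
restored = cloudpickle.loads(payload)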
We will have preliminary windows support starting with release 0.6.0.
Has this been implemented?
@christina-zhou-96 yes, the README has the details on how to get started.
Thanks. I used pip install modin[dask] as I was having problems installing Ray.

I found that if I didn't include the line os.environ["MODIN_ENGINE"] = "dask", I received this error:

Traceback (most recent call last):
  File "D:\PycharmProjects\Regents Closeout\venv-py3\lib\site-packages\modin\__init__.py", line 28, in get_execution_engine
    import ray
ModuleNotFoundError: No module named 'ray'

I don't know whether Modin runs successfully, as I'm still receiving runtime errors, but at least now it imports fine.
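For reference, the full workaround, as a minimal sketch assuming the Dask engine is installed:

import os
os.environ["MODIN_ENGINE"] = "dask"  # must be set before the first modin import

import modin.pandas as pd

df = pd.DataFrame({"a": [1, 2, 3]})
print(df.sum())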
Can you open a new issue on that? It might be a Python version issue with the standard Error types.
Yes, thank you. Just noting that, based on this trial, the docs inaccurately state that Modin will automatically detect which engine I have installed.
Yes, it should be detecting it, but that is what the new issue will be about 😄
Ray does not support Windows. It would be good to publish wheels that don't require Ray for Windows users. This is also related to publishing to conda-forge.
We can add an engine that runs strictly on Python multiprocessing for starters. It should be relatively straightforward to add.
Marking as Help Wanted, feel free to reach out about required implementation details.