aregm opened this issue 4 years ago
@amyskov Is it possible with just a `MapFunction`?

```python
MapFunction.register(lambda x, value, side, sorter: x.squeeze(axis=1).searchsorted(value, side, sorter))
```
That is a start. You will need to convert the result of `searchsorted` back to a DataFrame, then in the API layer (`dataframe.py`) convert it to pandas, then to a list. Does that make sense to you? Do you think it would work?
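For illustration, a hedged sketch of the full flow this suggestion implies. The `MapFunction.register` pattern matches how Modin query compilers of that era registered per-partition functions, but the exact wiring below (the method names, the `to_pandas` plumbing, the import path) is an assumption, not code from this thread:

```python
import pandas
from modin.data_management.functions import MapFunction  # import path assumed for 2020-era Modin

class QueryCompilerSketch:
    # Query-compiler layer: run searchsorted on every partition and wrap the
    # resulting positions back into a DataFrame so Modin keeps them distributed.
    searchsorted = MapFunction.register(
        lambda df, value, side, sorter: pandas.DataFrame(
            df.squeeze(axis=1).searchsorted(value, side, sorter)
        )
    )

# API layer (dataframe.py, hypothetical): materialize the distributed result
# to pandas, then flatten it into the list the pandas API returns.
def searchsorted(self, value, side="left", sorter=None):
    result_qc = self._query_compiler.searchsorted(value, side, sorter)
    return result_qc.to_pandas().squeeze(axis=1).tolist()
```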
@devin-petersohn I don't think that can solve this problem, because after calling the map function on each partition we obtain results from each function call, which have to be processed (reduced) anyway.
I have measured the execution time of the `Series.searchsorted` function for pure Pandas with different Series sizes (see the attached graph; 1e8 elements in a Series is equivalent to a ~500 MB .csv file). The curve shape is very close to a k*log(N) curve, as mentioned in the numpy docs, and even for 1e9 elements the execution time is ~1e-4 seconds, which is less than the potential Modin overheads. Based on this, I think that applying the function on each partition separately is not a good approach in this case, and we have to apply the function on the full frame at once. One possible solution is implemented in the PR linked to this issue, in commit https://github.com/modin-project/modin/pull/1668/commits/97bcd1e6b9c112ce4e9e7aea5179e309cffa6436, using the `query_compiler._modin_frame._apply_full_axis` function. What do you think about it?
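For reference, a minimal sketch of the full-axis idea (simplified, not the committed code; `self.__constructor__` here stands for whatever wraps a new frame back into a query compiler). The point is that the callable receives the whole column at once, so numpy's binary search runs a single time over N rows instead of once per partition plus a reduce:

```python
import pandas

# Sketch of a query-compiler method using the full-axis apply.
def searchsorted(self, value, side="left", sorter=None):
    def map_func(df):
        # The whole (sorted) column arrives here in one piece.
        result = df.squeeze(axis=1).searchsorted(value, side, sorter)
        return pandas.DataFrame(result)

    new_modin_frame = self._modin_frame._apply_full_axis(axis=0, func=map_func)
    return self.__constructor__(new_modin_frame)
```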
What is the execution time of the MapReduce version you wrote in comparison with that chart?
The execution time of the MapReduce version is greater than that of pure Pandas; see the attached graph.
Additional measurements of Modin `searchsorted` implementations (one using the `default_to_pandas` function and one using `query_compiler._modin_frame._apply_full_axis`, introduced in commit 97bcd1e6b9c112ce4e9e7aea5179e309cffa6436) were made with the Python and Ray engines.
As can be seen from the graph, the curve shapes of all implementations are close, so I think the curve shape is determined by Modin-specific partition handling.
Script for execution time measurement:

```python
import modin.pandas as pd
from timeit import default_timer as timer
import csv
import random
import os

rows = [10, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "../local_benches_data/local_bench_series.csv"

times = {}
for rows_number in rows:
    try:
        # creation of the file with test data
        with open(csv_name_series, "w", newline='') as f1:
            w1 = csv.writer(f1, delimiter=',')
            w1.writerow(['data_column1'])
            for i in range(int(rows_number)):
                w1.writerow([round(random.uniform(value_low, value_high), 1)])
    except Exception as exc:
        print(exc)
        os.remove(csv_name_series)

    t_ser_modin_array = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series).squeeze().sort_values().reset_index(drop=True)
        t0 = timer()
        ans_ser_modin = ser_modin.searchsorted(values)
        t_ser_modin_array[run] = timer() - t0
    times[rows_number] = min(t_ser_modin_array.values())

print("times \n", times)
```
In order to find the reason for such performance degradation in comparison with pandas, the execution time of pure partitioning and pure function application was measured in the `modin.engines.base.frame.partition_manager.py::map_axis_partition` function (the function called from `query_compiler._modin_frame._apply_full_axis`) for commit 97bcd1e6b9c112ce4e9e7aea5179e309cffa6436.
Partitioning part:

```python
partitions = (
    cls.column_partitions(partitions)
    if not axis
    else cls.row_partitions(partitions)
)
```
Function applying part:

```python
result_blocks = np.array(
    [
        part.apply(preprocessed_map_func, num_splits=num_splits)
        for part in partitions
    ]
)
```
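The instrumentation itself is straightforward; a sketch of how the two spans above can be timed inside `map_axis_partition` (the timer placement is my assumption, not the committed code; `cls`, `partitions`, `preprocessed_map_func`, `num_splits`, and `np` come from the surrounding function scope):

```python
from timeit import default_timer as timer

# Time the repartitioning step ...
t0 = timer()
partitions = (
    cls.column_partitions(partitions) if not axis else cls.row_partitions(partitions)
)
partitioning_time = timer() - t0

# ... and the per-axis-partition function application, separately.
t0 = timer()
result_blocks = np.array(
    [part.apply(preprocessed_map_func, num_splits=num_splits) for part in partitions]
)
apply_time = timer() - t0
```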
The results are as follows.
As can be seen, the full_time curve (execution time of the whole `searchsorted` call) is mostly determined by the partitioning time, most of which is spent in `modin.engines.python.pandas_on_python.frame.axis_partition.py::PandasOnPythonFrameAxisPartition::__init__`, in the code block:

```python
for obj in list_of_blocks:
    obj.drain_call_queue()
```
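For context, `drain_call_queue` executes every operation that earlier steps deferred onto the partition. A simplified model (not the actual Modin classes) of why draining can dominate:

```python
# Simplified model of a lazy partition: operations are queued, not executed,
# so read_csv/sort_values/reset_index look "free" until something forces the
# queue to drain -- and searchsorted is that something here.
class LazyPartition:
    def __init__(self, data, call_queue=None):
        self.data = data
        self.call_queue = call_queue or []

    def add_to_apply_calls(self, func, *args, **kwargs):
        # Deferred: O(1) now, paid for later.
        return LazyPartition(self.data, self.call_queue + [(func, args, kwargs)])

    def drain_call_queue(self):
        # All deferred work runs here, which is why this loop shows up as
        # "partitioning time" in the measurements above.
        for func, args, kwargs in self.call_queue:
            self.data = func(self.data, *args, **kwargs)
        self.call_queue = []
```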
Using the workaround in ba7594bdcdb697e01936ee6c58c5b2bff775f3c5 (clearing the partitions' call_queue before applying functions), the partitioning time can be decreased.
For the Python engine the full execution time is decreased, but it is now determined by the function apply time (which is large enough; this can be caused by `data.copy` in the `apply` function of the Python engine).
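A rough illustration (assumed shape, not the exact Modin source) of the Python-engine partition `apply` with the defensive copy in place:

```python
# The defensive copy is linear in partition size, so for a 1e8-row frame it
# dwarfs the logarithmic searchsorted call it wraps.
class PandasOnPythonFramePartition:
    def __init__(self, data):
        self.data = data

    def apply(self, func, **kwargs):
        data = self.data.copy()  # O(n) copy on every apply; removed in commit 10a107d
        return PandasOnPythonFramePartition(func(data, **kwargs))
```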
For the Ray engine the apply time is almost constant, but most of the time is spent on creating the new `_modin_frame` in the `query_compiler._modin_frame._apply_full_axis` function (not shown on the chart).
It was found that the `searchsorted` performance degradation shown in the comments above is determined by several factors:
1. The time to drain the call queue is large in comparison with the function execution time. Since call queue draining is not directly related to the function implementation, this time should be measured and analyzed separately.
2. During manipulations with partitions for the Python engine in `pandas_on_python.frame.partition`, partition data is copied, which causes performance degradation.
The reworked measurement script is shown below (call queue draining is measured separately):
```python
import modin.pandas as pd
from timeit import default_timer as timer
import csv
import random
import os

rows = [10, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7, 1e8]
value_low = 0
value_high = 10
runs_number = 5
values = 5
csv_name_series = "../local_benches_data/local_bench_series.csv"

def drain_call_queue_of_partitions(partitions):
    for i in range(len(partitions)):
        for j in range(len(partitions[i])):
            partitions[i][j].drain_call_queue()

times_searchsorted = {}
times_drain_call_queue = {}
for rows_number in rows:
    try:
        # creation of the file with test data
        with open(csv_name_series, "w", newline='') as f1:
            w1 = csv.writer(f1, delimiter=',')
            w1.writerow(['data_column1'])
            for i in range(int(rows_number)):
                w1.writerow([round(random.uniform(value_low, value_high), 1)])
    except Exception as exc:
        print(exc)
        os.remove(csv_name_series)

    t_searchsorted = {}
    t_drain_call_queue = {}
    for run in range(runs_number):
        ser_modin = pd.read_csv(csv_name_series).squeeze().sort_values().reset_index(drop=True)
        t0 = timer()
        drain_call_queue_of_partitions(ser_modin._query_compiler._modin_frame._partitions)
        t_drain_call_queue[run] = timer() - t0
        t0 = timer()
        ans = ser_modin.searchsorted(values)
        repr(ans)
        t_searchsorted[run] = timer() - t0
    times_searchsorted[rows_number] = min(t_searchsorted.values())
    times_drain_call_queue[rows_number] = min(t_drain_call_queue.values())

print("times_searchsorted \n", times_searchsorted)
print("times_drain_call_queue \n", times_drain_call_queue)
```
Measurement results after removing `data.copy()` from `pandas_on_python.frame.partition::apply` (commit 10a107dc3c22b44ffedbd054e983b60adbc75a11):
As can be seen, the pure execution time of the `searchsorted` function for the Python and Ray engines is almost constant (the shift of the mean value is caused by the minimal overhead of manipulations with partitions). The time of call queue draining is significant and depends on the dataset size (especially for the Python engine).
Analysis of the performance degradation with `data.copy()` in `pandas_on_python.frame.partition::apply` (comparison of commits 10a107dc3c22b44ffedbd054e983b60adbc75a11 and 5dc39e150b4d4c859bc65106604e7b41b50e9115):
As can be seen, with `data.copy` in the `apply` function, `searchsorted` performance degrades as the dataset size increases.
The implementation of this function was reverted and now defaults to pandas (#2655); a new implementation is needed.
The first version of the `searchsorted` function was implemented via a MapReduce approach, but it was found that its performance degraded in comparison with pure Pandas. The reason is that the complexity of `numpy.searchsorted` (which is called in Pandas) is very low, so Modin's conversions and reduce-stage overheads become significant. We are looking for a way to solve this issue.
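A quick way to see why, as a sanity check with plain numpy (not Modin; absolute timings will vary by machine):

```python
import numpy as np
from timeit import default_timer as timer

# Binary search over a sorted array costs ~log(N), so even 1e8 elements
# answer in the microsecond range -- far below Modin's fixed per-partition
# and reduce-stage overheads.
for n in (10**6, 10**7, 10**8):
    arr = np.sort(np.random.uniform(0, 10, n))
    t0 = timer()
    np.searchsorted(arr, 5.0)
    print(n, timer() - t0)
```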