modin-project / modin

Modin: Scale your Pandas workflows by changing a single line of code
http://modin.readthedocs.io
Apache License 2.0

Pleasingly Parallel Applications with Modin #3069

Open vibhatha opened 3 years ago

vibhatha commented 3 years ago

I am looking into the parallelism offered by Modin, and I am especially interested in the automatic parallelism offered by the Ray backend.

Is it possible to design pleasingly parallel applications with Modin?

That is, simply split the data across the available CPUs and execute operators such that no communication takes place among the processes.

devin-petersohn commented 3 years ago

Parallelism for dataframes comes in many forms. Modin will try to keep the data in its existing layout, but sometimes that is not possible. See https://arxiv.org/pdf/2001.00888 for details on this.

Trivially parallelizable dataframe programs are fairly straightforward to construct. applymap is the simplest way to achieve this.
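
For illustration, a minimal sketch (with a placeholder function) of an embarrassingly parallel workload expressed through Modin's applymap:

```python
import numpy as np

import modin.pandas as pd

# Build a dataframe; Modin partitions it automatically.
df = pd.DataFrame(np.random.rand(1_000_000, 10))

# applymap applies the function cell-wise, so each partition can be
# processed independently, with no communication between workers.
result = df.applymap(lambda x: x ** 2)
```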

vibhatha commented 3 years ago

@devin-petersohn I checked out the paper. But assume a case where someone needs to run all of their data engineering operators in a pleasingly parallel manner; in that case, it is hard to express them all with the applymap operator. Is this a feature that will be supported in the future?

devin-petersohn commented 3 years ago

@vibhatha I am not sure I understand what you mean. Modin is able to parallelize and distribute work across all axes and even across cells. Most operators are rewritten into a low-level set of operators described in the paper. I do not understand the term "pleasingly parallel"; what does it mean? I assumed it meant embarrassingly parallel, which is why I suggested applymap.

vibhatha commented 3 years ago

Pleasingly parallel means embarrassingly parallel. But can applymap be used to write other operators like join, groupby, loc, etc.?


devin-petersohn commented 3 years ago

If you are asking about the parallelism required for each operator, we have not released the paper proving the decomposition rules yet (it is under submission). From your question: join and groupby decompose into group-wise parallelism, and loc is a filter. Dataframe operations require different levels of parallelism, so whether or not something is embarrassingly parallel is not enough; you must also know whether it is embarrassingly parallel across rows, columns, or cells. Modin supports all of these modes of parallelism; most systems cannot.
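
To make the distinction concrete, here is a rough sketch (column names and functions are made up for illustration) of how these modes of parallelism surface through Modin's pandas API:

```python
import numpy as np

import modin.pandas as pd

df = pd.DataFrame(
    np.random.rand(1_000_000, 8),
    columns=[f"c{i}" for i in range(8)],
)

cell_wise = df.applymap(abs)             # embarrassingly parallel across cells
row_wise = df.apply(np.sum, axis=1)      # parallel across rows (each row independent)
column_wise = df.apply(np.sum, axis=0)   # parallel across columns (each column independent)
filtered = df.loc[df["c0"] > 0.5]        # loc used as a row-wise filter
```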

vibhatha commented 3 years ago

Simply put, think of an application that has a join followed by a filter and a groupby.

Here the join is a distributed operation, but the filter needs to run in a pleasingly parallel manner, and the groupby is again a distributed operation. This is a classic bulk synchronous parallel mode of execution, where each operator can be run in either mode depending on the specific use case. Can this be handled in Modin? Assume this is a row-based, data-parallel mode of execution.


devin-petersohn commented 3 years ago

Yes, this can be done in Modin.
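
For illustration, a minimal sketch of the join, filter, and groupby pipeline described above, written against the ordinary Modin pandas API (the data and column names are hypothetical); the user writes plain dataframe code and Modin handles how each step is partitioned and executed:

```python
import numpy as np

import modin.pandas as pd

# Hypothetical data: orders joined against customer regions.
orders = pd.DataFrame({
    "customer_id": np.random.randint(0, 1_000, size=100_000),
    "amount": np.random.rand(100_000) * 100,
})
customers = pd.DataFrame({
    "customer_id": np.arange(1_000),
    "region": np.random.choice(["north", "south", "east", "west"], size=1_000),
})

joined = orders.merge(customers, on="customer_id")    # distributed join
filtered = joined[joined["amount"] > 50]              # row-wise (embarrassingly parallel) filter
summary = filtered.groupby("region")["amount"].sum()  # distributed groupby/aggregation
```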

vibhatha commented 3 years ago

Can a user handle this at the dataframe level? Can you give me an example of how to do it?


vibhatha commented 3 years ago

@devin-petersohn To clarify, here I am asking for a code snippet.

Note: It would also be highly appreciated if you could showcase how to run a join or a filter in an embarrassingly parallel versus distributed manner on partitioned data.

RehanSD commented 1 year ago

Hi @vibhatha ! For more information about how our operators decompose, you can check out our latest paper: https://www.vldb.org/pvldb/vol15/p739-petersohn.pdf.

As for a code snippet, we are currently working on implementing operators like sort, groupby, and join in a parallel fashion - #4601 is a PR that implements sort, while groupby and join are soon to follow! Hope this helps - please let me know if you have any other questions!

vibhatha commented 1 year ago

@RehanSD thank you for following up with this. I will take a look at the resources.

vibhatha commented 1 year ago

@RehanSD is there an open-source script to run the join benchmarks? It would be very interesting to see the new changes.

RehanSD commented 1 year ago

Hi @vibhatha! We still don't have benchmarks for the new (non-broadcast) join - I'll ping this thread with benchmarks and numbers once we have a prototype built out!

vibhatha commented 1 year ago

Awesome, @RehanSD, thank you. Looking forward to it.

RehanSD commented 1 year ago

Hi @vibhatha! The sort PR just got merged - here are some numbers for 5 million rows x 100 columns on an m5.2xlarge:

|                    | pandas            | Modin (Ray)       |
| ------------------ | ----------------- | ----------------- |
| sort on 1 column   | 2.890 ± 0.047 s   | 8.462 ± 0.0225 s  |
| sort on 3 columns  | 16.07 ± 0.0747 s  | 9.71 ± 0.0799 s   |
vibhatha commented 1 year ago

@RehanSD thanks a lot for sharing the updates with me; it is very interesting to see the improvements. Is there a benchmark script that I can try out?

RehanSD commented 1 year ago

Yup - the new sort is implemented fully, so calling sort_values will hit it. Here is the script I used to get the numbers for the table above:

```python
from time import perf_counter
import ray
import numpy as np

from modin.config import IsRayCluster, BenchmarkMode

IsRayCluster.put(True)
BenchmarkMode.put(True)  # wait for each operation to complete so the timing below is accurate
ray.init(object_store_memory=2.79586e10)  # make sure Ray has enough object store memory

import modin.pandas as pd
# import pandas as pd  # swap this import in to time pandas instead of Modin

df = pd.DataFrame(
    np.random.standard_cauchy(size=(5_000_000, 100)),
    columns=[f"col {i}" for i in range(100)],
)
# df = pd.DataFrame(np.random.uniform(size=(5_000_000, 100), high=5_000_000), columns=[f"col {i}" for i in range(100)])

start = perf_counter()
df.sort_values(by=["col 0", "col 1", "col 2"])
end = perf_counter()
print(f"{end - start}")
```

Hope this helps!

vibhatha commented 1 year ago


Perfect! This helps.