dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

[ENH] Support a "coalesce" repartitioning type hint #280

Open randerzander opened 2 years ago

randerzander commented 2 years ago

In SQL, it's common to work with large data and aggregate or filter it down to few enough rows that the result could be merged into a single in-memory partition.

Today you can achieve this with something like:

import pandas as pd
import dask.dataframe as dd
from dask_sql import Context

c = Context()

pdf = pd.DataFrame({'val': [0, 1, 2, 3, 4, 5]})
ddf = dd.from_pandas(pdf, npartitions=2)

c.create_table('test', ddf)
print(c.sql("select * from test").npartitions)
# 2
# Repartition the query result down to one partition and register it as a new table
coalesced_ddf = c.sql("select * from test").repartition(npartitions=1)
c.create_table("test_coalesced", coalesced_ddf)
print(c.sql("select * from test_coalesced").npartitions)
# 1

It would be nice to support something like:

c.sql("""
SELECT /*+ COALESCE(1) */ * FROM test
""").npartitions
# 1
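
To make the proposed syntax concrete, here is a minimal sketch of how a Spark-style hint comment could be pulled out of a query string before the query is planned. This is not how dask-sql parses SQL; the function name `extract_coalesce_hint` and the regular expression are hypothetical and only illustrate the idea.

```python
import re
from typing import Optional, Tuple

# Hypothetical pattern for a hint comment such as /*+ COALESCE(1) */
_COALESCE_HINT = re.compile(
    r"/\*\+\s*COALESCE\s*\(\s*(\d+)\s*\)\s*\*/", re.IGNORECASE
)


def extract_coalesce_hint(query: str) -> Tuple[str, Optional[int]]:
    """Return (query with the hint comment removed, requested partitions or None)."""
    match = _COALESCE_HINT.search(query)
    if match is None:
        # No hint present: leave the query untouched
        return query, None
    npartitions = int(match.group(1))
    if npartitions < 1:
        raise ValueError("COALESCE hint needs a positive partition count")
    # Cut the hint comment out of the query text
    cleaned = query[: match.start()] + query[match.end():]
    return cleaned, npartitions


cleaned, n = extract_coalesce_hint("SELECT /*+ COALESCE(1) */ * FROM test")
# With a dask-sql Context `c`, the hint could then be applied as in the
# workaround above:
#     result = c.sql(cleaned)
#     if n is not None:
#         result = result.repartition(npartitions=n)
```

A real implementation would more likely handle hints inside the SQL parser, so that they can be attached to individual subqueries rather than to the whole statement.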

As a motivator, Dask DataFrames can use a broadcast or "map-side" join if one of the DataFrames consists of a single partition. Allowing users to specify partition coalescing hints would give finer control over Dask-SQL join performance.

randerzander commented 2 years ago

https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-hints.html for reference

ayushdg commented 2 years ago

I believe it would also be useful to add join-based hints that explicitly route a join through the broadcast_join path in dask-sql. The syntax would be similar to the one specified here: https://spark.apache.org/docs/3.0.0/sql-ref-syntax-qry-select-hints.html#join-hints. On the Dask side, the API used would be the broadcast param of the Dask merge API: https://docs.dask.org/en/latest/generated/dask.dataframe.DataFrame.merge.html?highlight=merge#dask-dataframe-dataframe-merge

jdye64 commented 2 years ago

I can handle this one