dask-contrib / dask-sql

Distributed SQL Engine in Python using Dask
https://dask-sql.readthedocs.io/
MIT License

[ENH] Support predicate pushdown in dask-sql #323

Open ayushdg opened 3 years ago

ayushdg commented 3 years ago

Is your feature request related to a problem? Please describe. It would be great if filter operations in a SQL clause could be pushed down to the IO layer for formats like Parquet that support filtering based on statistics/metadata.

Describe the solution you'd like There are a couple of ways this could be tackled:

  1. Implement this as a part of the graph optimization step within dask HLGs, where dask would automatically push the filter steps down to the IO. There have been mentions of it in a few dask issues/discussions: https://github.com/dask/dask/issues/7933 and https://github.com/dask/dask/issues/7090#issuecomment-766272168, so there might be value in solving it at that level, since the feature might be of value to others in the dask community as well.
  2. Implement this by adding custom rule-based optimizations during the planning stage to push the filters as close to the table scan step as possible, and have logic that converts the filter predicates down to the IO stage for formats that support this. There has been some initial mention of this approach in https://github.com/dask-contrib/dask-sql/issues/183.

Describe alternatives you've considered While true predicate pushdown is not possible within dask-sql today, an alternative is to pass filters while reading parquet/orc datasets with the Dask DataFrame API, and users could then create tables from those dataframes. For example (the path and column names here are hypothetical):
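```python
# Workaround available today: apply the filter at read time via the Dask
# DataFrame API, then register the resulting dataframe as a dask-sql table.
import dask.dataframe as dd
from dask_sql import Context

# Row groups whose parquet statistics rule out ``year > 2020`` are skipped
df = dd.read_parquet("/data/events.parquet", filters=[("year", ">", 2020)])

c = Context()
c.create_table("events", df)
result = c.sql("SELECT COUNT(*) FROM events").compute()
```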

Curious to know what others' thoughts are, and whether they have a preference on which approach might be more beneficial. I'm personally leaning towards 1., since it has the potential to benefit other dask users as well, but I'm not sure how complex the implementation would be.

cc: @rjzamora who is probably more familiar than I am with parquet/orc in dask and the hlg work there.

rjzamora commented 2 years ago

Thanks for the ping @ayushdg !

I was about to start exploring basic predicate-pushdown optimizations in dask-dataframe, but paused the effort when the idea of a high-level expression system started picking up steam as the "preferred solution" (and I became quite distracted by other things). I don't think it will be difficult to implement predicate pushdown for filter operations that come immediately after IO, but "smarter" optimizations will likely require relatively large changes in HLG/HLE (long story short: for non-trivial optimizations, we need to store/mutate a lot more high-level collection information than the current HLG layer is "aware" of). Therefore, my overall message would be that "this work is certainly in scope, but may take some time if the initial Dask graph does not just start with an IO layer followed by a filter."

I don't know much about SQL or dask-sql, but I would honestly expect an existing query-optimization engine (like Apache Calcite) to be better equipped than Dask for complex predicate-pushdown optimizations in the short term. We absolutely do plan to improve graph optimization in Dask-Dataframe, but it does seem like a successful design will essentially capture what a good query-optimization engine is already designed to do. I will try to take some time to learn a bit more about sql/dask-sql and Calcite, but I'd certainly be grateful if someone had the time to write up a simple explanation of how an SQL query is converted to a set of Dask-API calls in dask-sql :)

EDIT: After looking through dask-sql a bit (very cool btw), I do get the impression that it should be pretty straightforward to enable predicate pushdown for WHERE clauses in the case that the DataFrame collection that is updated here is just a single-layer HLG with a DataFrameIOLayer (or a two-layer graph with a DataFrameIOLayer and a Blockwise-based getitem layer, as would probably be the case after a SELECT). Roughly, the eligibility check I have in mind looks something like the sketch below (not an actual implementation):
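```python
# Rough sketch (not an actual dask/dask-sql implementation) of the check
# described above, using dask's current HLG layer types.
import dask.dataframe as dd
from dask.blockwise import Blockwise
from dask.layers import DataFrameIOLayer

def pushdown_candidate(df: dd.DataFrame) -> bool:
    """True if the graph is a single DataFrameIOLayer plus, optionally,
    Blockwise-only layers (e.g. the getitem layer a SELECT would add)."""
    layers = df.dask.layers.values()
    io_layers = [lyr for lyr in layers if isinstance(lyr, DataFrameIOLayer)]
    # DataFrameIOLayer subclasses Blockwise, so this catches everything else
    non_blockwise = [lyr for lyr in layers if not isinstance(lyr, Blockwise)]
    return len(io_layers) == 1 and not non_blockwise
```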

charlesbluca commented 2 years ago

> While true predicate pushdown is not possible within dask-sql today, an alternative is to pass filters while reading parquet/orc datasets with the Dask DataFrame API, and users could then create tables from those dataframes.

It's not true predicate pushdown, but would it be a reasonable short term solution to add support for filters in CREATE TABLE ... WITH (...) statements? This would at least allow dask-sql's read functionality to match up with Dask's in this regard.
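Hypothetically, that could look something like this (a `filters` key does not exist today, so the syntax below is purely illustrative):

```python
# Hypothetical: if CREATE TABLE ... WITH grew a ``filters`` key, dask-sql
# could forward it to dd.read_parquet along with the other read kwargs.
# Not implemented today; this syntax is illustrative only.
from dask_sql import Context

c = Context()
c.sql("""
    CREATE TABLE events WITH (
        location = '/data/events.parquet',
        format = 'parquet',
        filters = [('year', '>', 2020)]
    )
""")
```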

randerzander commented 2 years ago

It would be nice to support filters in CREATE TABLE ... WITH (CTW) statements, but the primary use case is for predicates supplied and applied "dynamically" at query time.

A data engineer "in the loop" could certainly use CTW syntax to achieve the same effect, but queries generated programmatically, or via things like a self-service BI tool, won't. For example (table and column names are made up), the engine would need to push the predicate here on its own:
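```python
# The dynamic case: the predicate only arrives in the generated query, so
# the engine itself must push it into the parquet scan.
from dask_sql import Context

c = Context()
c.create_table("transactions", "/data/transactions.parquet")

# e.g. emitted by a BI tool; ideally this never reads row groups whose
# parquet statistics exclude ``amount > 100``
result = c.sql("SELECT * FROM transactions WHERE amount > 100").compute()
```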