fengguangyuan opened this issue 2 years ago
This shouldn't be implemented as an optimizer rule. Instead, the Iceberg connector should support `applyAggregation`.
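For reference, a rough sketch of what such a hook could look like. The method signature is meant to mirror `ConnectorMetadata#applyAggregation` from the Trino SPI, but the surrounding class and the decision logic are illustrative only, not the actual Iceberg implementation.

```java
import io.trino.spi.connector.AggregateFunction;
import io.trino.spi.connector.AggregationApplicationResult;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableHandle;

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch only: in a real connector this would be an override in the
// connector's ConnectorMetadata implementation (e.g. IcebergMetadata).
public class AggregationPushdownSketch
{
    public Optional<AggregationApplicationResult<ConnectorTableHandle>> applyAggregation(
            ConnectorSession session,
            ConnectorTableHandle handle,
            List<AggregateFunction> aggregates,
            Map<String, ColumnHandle> assignments,
            List<List<ColumnHandle>> groupingSets)
    {
        // Illustrative guard: only consider a global aggregation (a single,
        // empty grouping set); anything else is left to the engine.
        if (groupingSets.size() != 1 || !groupingSets.get(0).isEmpty()) {
            return Optional.empty();
        }
        // A real implementation would inspect `aggregates` (e.g. a lone count(*)),
        // fold whatever it can answer from file metadata into a new table handle,
        // and return an AggregationApplicationResult describing the substitution.
        // Returning empty tells the engine to keep the original plan.
        return Optional.empty();
    }
}
```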
Things to keep in mind:

- ideally we should support "mixed mode" where some files are skipped (e.g. we have `max` for a given column) and some files are still processed (we don't have `max`). Today this would require implementing the aggregations on the connector side, but it would be nice if the SPI allowed the connector and the engine to cooperate. For example, a connector would tell the SPI it's accepting the pushdown, but will return a "partial aggregation" of some sort. cc @sopel39 @martint @losipiuk
- `min` and `max` are not guaranteed to be exact:
  - `varchar` values can be truncated
  - `timestamp` values are rounded up or down to millisecond precision (in the case of ORC)
- `count` is the easiest, as it doesn't suffer from exactness doubts, but it's also probably the least important. `count(*)` requires reading no columns, so only file metadata is touched. Still, there is room for improvement.

cc @alexjo2144 @homar
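To make the "mixed mode" idea in the first bullet above concrete, here is a hypothetical sketch: the connector splits files into those whose metadata can answer the aggregation exactly and those that still need to be scanned, and hands the engine a partial result plus the remaining files. Everything in it (`FileStats`, `PartialResult`, `plan`) is made up for illustration; nothing like this exists in the SPI today.

```java
import java.util.List;
import java.util.OptionalLong;

// Hypothetical illustration of "mixed mode": files whose metadata carries an
// exact max contribute to a partial result computed up front, the rest still
// have to be scanned. FileStats and PartialResult are made-up types.
public final class MixedModeSketch
{
    public record FileStats(String path, OptionalLong exactMax) {}

    public record PartialResult(OptionalLong maxFromMetadata, List<FileStats> filesToScan) {}

    private MixedModeSketch() {}

    public static PartialResult plan(List<FileStats> files)
    {
        OptionalLong maxFromMetadata = files.stream()
                .filter(file -> file.exactMax().isPresent())
                .mapToLong(file -> file.exactMax().getAsLong())
                .max();
        List<FileStats> filesToScan = files.stream()
                .filter(file -> file.exactMax().isEmpty())
                .toList();
        // The engine would combine maxFromMetadata with the max computed by
        // scanning filesToScan -- this is the connector/engine cooperation that
        // the SPI would need a way to express ("partial aggregation").
        return new PartialResult(maxFromMetadata, filesToScan);
    }
}
```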
> ideally we should support "mixed mode" where some files are skipped (e.g. we have `max` for a given column) and some files are still processed (we don't have `max`). Today this would require implementing the aggregations on the connector side, but it would be nice if the SPI allowed the connector and the engine to cooperate. For example, a connector would tell the SPI it's accepting the pushdown, but will return a "partial aggregation" of some sort.
See also https://github.com/trinodb/trino/pull/10964, which could benefit from a similar concept.
@findepi Thanks for your reply. Yes, indeed, the points you mentioned are the key ones, and I couldn't agree more; that's why I mentioned the limitations.
I think there are two possible ways to do the optimization:

1. A rule like `ShowStatsRewrite`, which just extracts the aggregated values from the stats, if the stats are not `NULL` or `NaN` and the column stats are reliable.
2. A local aggregation in each split (simply regarded as a data file), so that whether or not a file has stats, accurate values can be computed or extracted from each data file adaptively. This needs more work on the SPI side.

Both approaches rest on the same assumption: if the expected stats are not NULLs or NaNs, they should be correct, at least for the Iceberg connector; otherwise they are not reliable and should be calculated from the real data.
Considering the implementation complexity, we simply implemented the easier approach, which skips rewriting the plan once it finds unreliable stats, or finds min/max on non-numeric columns (except the timestamp type), because Trino only carries double ranges. In those cases the plan still aggregates the real data, so correctness is guaranteed at both the table level and the column level.

After all, the rule approach is the cheapest, while the mixed-mode approach is the ideal one, but it has many more nuts to crack. :)
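As a side note on "Trino only carries double ranges": a minimal, illustrative helper over `io.trino.spi.statistics.ColumnStatistics` shows why such a rule has to skip min/max for non-numeric columns. The only range the SPI exposes is a `DoubleRange`, and even that is an estimate. The helper class itself is made up for illustration.

```java
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.DoubleRange;

import java.util.Optional;

// Illustrative helper: read a min/max pair out of ColumnStatistics.
// The SPI only carries a DoubleRange, and the values are estimates, so a rule
// built on this has to bail out for non-numeric columns and unreliable stats.
public final class StatsRange
{
    private StatsRange() {}

    public record MinMax(double min, double max) {}

    public static Optional<MinMax> minMaxFromStats(ColumnStatistics stats)
    {
        Optional<DoubleRange> range = stats.getRange();
        if (range.isEmpty()) {
            // No range recorded (e.g. varchar columns): fall back to scanning the data.
            return Optional.empty();
        }
        DoubleRange r = range.get();
        return Optional.of(new MinMax(r.getMin(), r.getMax()));
    }
}
```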
> which skips rewriting the plan once it finds unreliable stats, or finds min/max on non-numeric columns (except the timestamp type), because Trino only carries double ranges
Oh, you mean base the logic on `io.trino.spi.statistics.ColumnStatistics`? That's not the right API, as statistics are defined to allow them to be inexact, also for numeric types.

Per #18, the API to use for aggregation pushdown is `ConnectorMetadata#applyAggregation`.
> Oh, you mean base the logic on `io.trino.spi.statistics.ColumnStatistics`? That's not the right API, as statistics are defined to allow them to be inexact, also for numeric types.
Yep, this optimization is only for queries, and is based on `ColumnStatistics`.
> Per https://github.com/trinodb/trino/issues/18, the API to use for aggregation pushdown is `ConnectorMetadata#applyAggregation`.
Thanks for the tips, but I think the mixed-mode implementation involves far more than this one interface.

So you guys prefer the mixed-mode implementation? :)
@findepi can you please share any update on this? Are we planning to use the min/max, if present, for query optimization? Thanks.
I would probably start with an implementation of `applyAggregation` for `count(*)`. Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting, but count should be a bit simpler. As long as all files report row count stats in the manifest, we don't have to read any data files (as long as there are no unenforced predicates).
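A rough sketch of the metadata-only `count(*)` idea, expressed against the Iceberg API rather than Trino code: sum `recordCount()` over the planned data files. It assumes there are no delete files and no unenforced predicates; a real implementation would have to check both before trusting the number.

```java
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

import java.io.IOException;
import java.io.UncheckedIOException;

// Sketch: summing record counts from data-file metadata, the way a count(*)
// pushdown could avoid reading data files. Assumes no deletes and no
// unenforced predicates.
public final class ManifestRowCount
{
    private ManifestRowCount() {}

    public static long totalRecords(Table table)
    {
        long total = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                total += task.file().recordCount();
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return total;
    }
}
```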
> Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting
These must not be different, otherwise predicate pushdown would be totally wrong.
But min/max can be inaccurate (varchars truncated, timestamps rounded).
> As long as all files report row count stats in the manifest, we don't have to read any data files.
When some files have row count in the manifest, but some do not, microplans could be useful -- https://github.com/trinodb/trino/issues/13534
> These must not be different, otherwise predicate pushdown would be totally wrong.
IIRC in Delta we had to skip using min/max values for pushdown of Double types in certain situations.
It's not strictly because of ordering (as in `ORDER BY`). Rather, it's because of the semantics of double comparisons with NaN (`5 < NaN` and `5 >= NaN` are both false). This is behind a `Domain`'s refusal to handle NaNs (these would need to be handled explicitly, just like NULLs).
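A tiny, self-contained demonstration of the comparison semantics being referred to:

```java
public class NanComparisonDemo
{
    public static void main(String[] args)
    {
        double nan = Double.NaN;
        System.out.println(5 < nan);   // prints false
        System.out.println(5 >= nan);  // prints false
        // Both comparisons are false, so a [min, max] range derived from file
        // statistics says nothing about whether NaN values are present; they
        // have to be accounted for explicitly, much like NULLs.
    }
}
```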
@findepi @alexjo2144 even if only an aggregation like `count` can be pushed down and use the metrics (if available), it will be very useful. So, shall we start with supporting aggregate pushdown for Iceberg and handle the `count` case?
> I would probably start with an implementation of `applyAggregation` for `count(*)`. Min/max are going to have the problem of making sure that Parquet/ORC's sorting is the same as what we want for Trino sorting, but count should be a bit simpler. As long as all files report row count stats in the manifest, we don't have to read any data files (as long as there are no unenforced predicates).
@alexjo2144 I have started working on a PR for `count(*)` support. Thanks!
Hi. I am new to Iceberg and I was also thinking along similar lines, though from a different perspective. Currently Spark allows Dynamic Partition Pruning (DPP), and the underlying data sources return the filter columns only if they are partitioned. If we allow non-partition columns to also participate in DPP, then the DPP query becomes expensive and is not worth it. I am wondering, if the DPP query were lightweight and approximate (needing only the min/max values for the non-partition column used as the join key), whether we would see a benefit.
Quick update: I'm refining some of the work locally, then will open a WIP PR, and will continue to add test cases, etc.
> Hi. I am new to Iceberg and I was also thinking along similar lines, though from a different perspective. Currently Spark allows Dynamic Partition Pruning (DPP), and the underlying data sources return the filter columns only if they are partitioned. If we allow non-partition columns to also participate in DPP, then the DPP query becomes expensive and is not worth it. I am wondering, if the DPP query were lightweight and approximate (needing only the min/max values for the non-partition column used as the join key), whether we would see a benefit.
@ahshahid Sorry, I didn't get everything. Are you asking whether we can use Iceberg metrics/metadata min/max for DPP or joins? Maybe you can share an example here? Thanks.
@osscm Well, what I meant was that if this PR is able to provide min/max support at the Iceberg level using the stats (at least in simple scenarios), then it may be possible to leverage it to make Spark's Dynamic Partition Pruning (DPP) mechanism work for non-partition columns too. Right now, when I tried to make use of DPP for a non-partition column (by modifying the Iceberg code), the performance degraded because the cost of the DPP query is too high. But if the min/max gets evaluated using the stats of the manifest files, then possibly the cost of the DPP query for non-partition columns can be brought down.
@ahshahid I'm afraid not; I think you are talking about the output of `SHOW STATS FOR <tbl>`. This PR will target queries like `SELECT count(*) FROM <tbl> WHERE part1=1`, and subsequent PRs will cover min/max as well.
Right now the stats output looks like this (after running `ANALYZE`); I'm not sure why the low/high values for some of the columns come back as NULL.
show stats FOR sample_partitionedv2;

 column_name | data_size | distinct_values_count | nulls_fraction | row_count | low_value  | high_value
-------------+-----------+-----------------------+----------------+-----------+------------+------------
 userid      | NULL      |                   3.0 |            0.0 | NULL      | -3         | -1
 country     |     332.0 |                   2.0 |            0.0 | NULL      | NULL       | NULL
 event_date  | NULL      |                   2.0 |            0.0 | NULL      | 2022-01-01 | 2022-11-01
 city        |     332.0 |                   1.0 |            0.0 | NULL      | NULL       | NULL
Started an issue to target `count(*)` first: https://github.com/trinodb/trino/issues/15745
> Quick update: I'm refining some of the work locally, then will open a WIP PR, and will continue to add test cases, etc.

Thanks for taking the first sub-task of this BIG issue.

Anyway, I will post a rough implementation from my branch soon, for mixed min/max/count queries, and I hope it helps us work out the possibilities and impossibilities of this idea. :)
Just FYI, I have started a PR for count agg pushdown.
Regards, Manish
Thanks, that's great. It's my pleasure to know that you have been working on `count`. I will try to figure out the possibilities for the other aggregations.
@osscm What is the status of Iceberg aggregate pushdown? Are you still working on count pushdown? Are other aggregate functions like min/max also being worked on by someone?
@atifiu there is a simpler proposal for `count(*)` handling here: https://github.com/trinodb/trino/pull/19303

The min/max case has potential correctness issues due to the bounds not being exact min/max values.
OK, got it. I am aware of that issue. Thanks.
FWIW, https://github.com/trinodb/trino/pull/19303 should improve performance for `count(*)` queries (without a filter, or when filtering over partition values).
Purpose

This issue aims to add a basic optimizer rule for min/max/count queries on connectors that have accurate table/partition/column statistics, like Iceberg backed by ORC/Parquet files.

Reason

Nowadays, most storage engines and self-describing file formats store table-level/partition-level/column-level statistics to enable more efficient data retrieval, e.g. Iceberg and Hive.

We know that Iceberg currently supports ORC/Parquet files, whose table metrics are aggregated from each data file, so its table metrics are trustworthy for calculating min(T)/max(T)/count(T)/count(*), no matter whether the stored data was written by Trino or Spark. Hence, for queries with only min/max/count aggregations, we can construct the results directly from metadata.

For example, for the query `select count(x) from test`, if column x has precomputed statistics of 2 total rows, 0 null values and a [0, 9] range, the query could be rewritten to `select 2`, where 2 is the difference between the total row count and the null count.
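To make that rewrite concrete, here is a minimal sketch of the computation in terms of the SPI statistics types (`io.trino.spi.statistics.TableStatistics` / `ColumnStatistics`). It assumes the estimates are exact, which, as discussed earlier in the thread, the SPI does not actually guarantee; the class and method names are made up for illustration.

```java
import io.trino.spi.statistics.ColumnStatistics;
import io.trino.spi.statistics.Estimate;
import io.trino.spi.statistics.TableStatistics;

import java.util.OptionalLong;

// Illustrative only: derive count(x) = total rows - null rows from statistics,
// bailing out whenever either estimate is unknown.
public final class StatsBasedCount
{
    private StatsBasedCount() {}

    public static OptionalLong countFromStats(TableStatistics tableStats, ColumnStatistics columnStats)
    {
        Estimate rowCount = tableStats.getRowCount();
        Estimate nullsFraction = columnStats.getNullsFraction();
        if (rowCount.isUnknown() || nullsFraction.isUnknown()) {
            return OptionalLong.empty();
        }
        double nonNullRows = rowCount.getValue() * (1.0 - nullsFraction.getValue());
        return OptionalLong.of(Math.round(nonNullRows));
    }
}
```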
Conclusion

Trino should supply an optimizer rule that rewrites such queries from metadata, doing the same kind of thing as HIVE-2847. Obviously, this rule only applies to simple queries without complex syntax such as group by, distinct, join, etc.

We now have a basic implementation of this and have tested it on the Iceberg connector (rather than on all connectors, considering that their statistics may be inaccurate). If Trino would welcome this improvement, please let me know, and it would be my pleasure to open a PR.