apache / datafusion-comet

Apache DataFusion Comet Spark Accelerator
https://datafusion.apache.org/comet
Apache License 2.0

feat: Implement bloom_filter_agg #987

Closed mbutrovich closed 1 month ago

mbutrovich commented 1 month ago

Which issue does this PR close?

Closes #846.

Rationale for this change

What changes are included in this PR?
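
For context on what implementing bloom_filter_agg involves on the JVM side: the serde layer has to recognize Spark's BloomFilterAggregate and forward the hashed input plus the two sizing parameters to the native plan. A minimal sketch of that match, assuming Spark 3.3+; `NativeAggExpr` and `serializeExpr` are hypothetical placeholders, not Comet's actual QueryPlanSerde API:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, BloomFilterAggregate}

// Hypothetical placeholder for a native/protobuf aggregate representation; not a Comet type.
case class NativeAggExpr(name: String, children: Seq[Array[Byte]])

// Stand-in for the real expression serde (here we just take the SQL text's bytes).
def serializeExpr(e: Expression): Array[Byte] = e.sql.getBytes("UTF-8")

// The essential shape of the match: recognize BloomFilterAggregate and forward its three
// children (hashed input, estimated number of items, number of bits) to the native side.
def aggToNative(agg: AggregateFunction): Option[NativeAggExpr] = agg match {
  case bf: BloomFilterAggregate =>
    Some(NativeAggExpr("bloom_filter_agg", bf.children.map(serializeExpr)))
  case _ =>
    None // unsupported aggregates fall back to Spark
}
```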

How are these changes tested?

codecov-commenter commented 1 month ago

Codecov Report

Attention: Patch coverage is 76.47059% with 4 lines in your changes missing coverage. Please review.

Project coverage is 34.41%. Comparing base (c3023c5) to head (bf22902). Report is 19 commits behind head on main.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 76.47% | 2 Missing and 2 partials |

Additional details and impacted files:

```diff
@@             Coverage Diff              @@
##               main     #987      +/-   ##
============================================
+ Coverage     34.03%   34.41%   +0.38%
- Complexity      875      889      +14
============================================
  Files           112      112
  Lines         43289    43428     +139
  Branches       9572     9627      +55
============================================
+ Hits          14734    14947    +213
+ Misses        25521    25437     -84
- Partials       3034     3044     +10
```

Flags with carried forward coverage won't be shown.


mbutrovich commented 1 month ago

Results from the benchmark I just added:

BloomFilterAggregate Exec (cardinality 100):       Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         117            136          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  117            134          18         89.4          11.2       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             71             78           9        148.3           6.7       1.7X

BloomFilterAggregate Exec (cardinality 1024):      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         111            128          11         94.7          10.6       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  110            135          15         95.7          10.4       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)             69             78          12        152.9           6.5       1.6X

BloomFilterAggregate Exec (cardinality 1048576):   Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
---------------------------------------------------------------------------------------------------------------------------------
SQL Parquet - Spark (BloomFilterAgg)                         165            183          14         63.6          15.7       1.0X
SQL Parquet - Comet (Scan) (BloomFilterAgg)                  169            184          11         62.0          16.1       1.0X
SQL Parquet - Comet (Scan, Exec) (BloomFilterAgg)            117            126           9         89.2          11.2       1.4X
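
For anyone wanting to reproduce numbers like these, the rough shape of such a benchmark query is an aggregation-only scan over Parquet that folds a column into a single bloom filter. A minimal sketch, assuming Spark 3.3+ and local Parquet data; the path is a placeholder and this is not the benchmark code added in this PR. Since BloomFilterAggregate is an internal expression, the sketch builds the Column by hand rather than going through the SQL function registry:

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.XxHash64
import org.apache.spark.sql.catalyst.expressions.aggregate.BloomFilterAggregate
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("bloom-filter-agg-bench").getOrCreate()

// Illustrative data set: one long column written to Parquet (path is a placeholder).
spark.range(10L * 1000 * 1000).write.mode("overwrite").parquet("/tmp/bloom_bench")
val df = spark.read.parquet("/tmp/bloom_bench")

// BloomFilterAggregate is an internal expression (Spark 3.3+), so wrap it in a Column
// directly. The single-argument constructor picks up Spark's default estimated item
// count and bit count, matching the 1000000 / 8388608 values visible in the plans below.
val agg = new BloomFilterAggregate(new XxHash64(Seq(col("id").expr))).toAggregateExpression()

// Aggregate the whole column into one bloom filter, as the benchmark above does.
df.select(new Column(agg)).collect()
```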
alamb commented 1 month ago

> Results from the benchmark I just added:

Looks like a 40% improvement.


mbutrovich commented 1 month ago

Just adding notes on the test failure. One Spark test in InjectRuntimeFilterSuite is failing: "Merge runtime bloom filters". It fails a check in CometArrayImporter while bringing Arrow data from native code back over to the JVM.

The plan is a bit of a monster, but I'll provide it below:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=739]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#610].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#678].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#610]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :           +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=605]
   :        :  :              +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#678]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- CometProject [mergedValue#45952], [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- !CometHashAggregate [buf#45954, buf#45955], Final, [bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :              +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=673]
   :        :                 +- !CometHashAggregate [c2#45926, b2#45925], Partial, [partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)]
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=742]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>

This is what it looks like on the main branch:

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- SortMergeJoin [c1#45920, b1#45919], [c2#45926, b2#45925], Inner
   :- Sort [c1#45920 ASC NULLS FIRST, b1#45919 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(c1#45920, b1#45919, 5), ENSURE_REQUIREMENTS, [plan_id=729]
   :     +- Filter (((isnotnull(c1#45920) AND isnotnull(b1#45919)) AND might_contain(Subquery subquery#45946, [id=#605].bloomFilter, xxhash64(c1#45920, 42))) AND might_contain(Subquery subquery#45949, [id=#668].bloomFilter, xxhash64(b1#45919, 42)))
   :        :  :- Subquery subquery#45946, [id=#605]
   :        :  :  +- AdaptiveSparkPlan isFinalPlan=false
   :        :  :     +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :  :        +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :           +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=601]
   :        :  :              +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :  :                 +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :  :                    +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :  :                       +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/Users/matt/git/spark/spark-warehouse/org.apache.spark.sql.Inject..., PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        :  +- Subquery subquery#45949, [id=#668]
   :        :     +- AdaptiveSparkPlan isFinalPlan=false
   :        :        +- Project [named_struct(bloomFilter, bloomFilter#45945, bloomFilter, bloomFilter#45948) AS mergedValue#45952]
   :        :           +- ObjectHashAggregate(keys=[], functions=[bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :              +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=664]
   :        :                 +- ObjectHashAggregate(keys=[], functions=[partial_bloom_filter_agg(xxhash64(c2#45926, 42), 1000000, 8388608, 0, 0), partial_bloom_filter_agg(xxhash64(b2#45925, 42), 1000000, 8388608, 0, 0)])
   :        :                    +- CometProject [c2#45926, b2#45925], [c2#45926, b2#45925]
   :        :                       +- CometFilter [a2#45924, b2#45925, c2#45926], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
   :        :                          +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int>
   :        +- CometScan parquet spark_catalog.default.bf1[a1#45918,b1#45919,c1#45920,d1#45921,e1#45922,f1#45923] Batched: true, DataFilters: [isnotnull(c1#45920), isnotnull(b1#45919)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(c1), IsNotNull(b1)], ReadSchema: struct<a1:int,b1:int,c1:int,d1:int,e1:int,f1:int>
   +- CometSort [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], [c2#45926 ASC NULLS FIRST, b2#45925 ASC NULLS FIRST]
      +- CometExchange hashpartitioning(c2#45926, b2#45925, 5), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=732]
         +- CometFilter [a2#45924, b2#45925, c2#45926, d2#45927, e2#45928, f2#45929], (((isnotnull(a2#45924) AND (a2#45924 = 62)) AND isnotnull(c2#45926)) AND isnotnull(b2#45925))
            +- CometScan parquet spark_catalog.default.bf2[a2#45924,b2#45925,c2#45926,d2#45927,e2#45928,f2#45929] Batched: true, DataFilters: [isnotnull(a2#45924), (a2#45924 = 62), isnotnull(c2#45926), isnotnull(b2#45925)], Format: CometParquet, Location: ... PartitionFilters: [], PushedFilters: [IsNotNull(a2), EqualTo(a2,62), IsNotNull(c2), IsNotNull(b2)], ReadSchema: struct<a2:int,b2:int,c2:int,d2:int,e2:int,f2:int>
mbutrovich commented 1 month ago

Debugger output from the failing state in CometArrayImporter.

this = {CometArrayImporter@18451} 
snapshot = {ArrowArray$Snapshot@18448} 
 length = 1
 null_count = 0
 offset = 0
 n_buffers = 1
 n_children = 2
 buffers = 105553139081104
 children = 105553139081088
 dictionary = 0
 release = 6002611972
 private_data = 105553130135808
children = {long[2]@18449} [105553174820048, 105553174820208]
 0 = 105553174820048
 1 = 105553174820208
childVectors = {ArrayList@18450}  size = 1
 0 = {VarBinaryVector@22881} "[]"
vector = {StructVector@18452} "[]"
 reader = {NullableStructReaderImpl@22883} 
 writer = {NullableStructWriter@22884} "org.apache.comet.shaded.arrow.vector.complex.impl.NullableStructWriter@8a493b[index = 0]"
 validityBuffer = {ArrowBuf@22885} "ArrowBuf[1], address:0, capacity:0"
 validityAllocationSizeInBytes = 497
 NonNullableStructVector.reader = {SingleStructReaderImpl@22886} 
 field = {Field@22887} "Struct not null"
 valueCount = 0
 ephPair = null
 vectors = {PromotableMultiMapWithOrdinal@22888} 
 allowConflictPolicyChanges = true
 conflictPolicy = {AbstractStructVector$ConflictPolicy@22889} "CONFLICT_REPLACE"
 name = null
 allocator = {RootAllocator@18453} "Allocator(ROOT) 0/2176/4196904/9223372036854775807 (res/actual/peak/limit)\n"
 callBack = null
children.length = 2

The entire subquery now runs in native code, so my guess is that the output of that projection, which should be a struct containing two binary values, is wrong. I'm not sure whether the bug is in the projection itself or somewhere further downstream.
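
To make the expected shape concrete: the CometProject in the plan builds named_struct(bloomFilter, ..., bloomFilter, ...), so the imported column should arrive on the JVM as one struct with two binary children. A small illustrative sketch using Spark types only; the actual importer-side check lives in CometArrayImporter and is not reproduced here:

```scala
import org.apache.spark.sql.types.{BinaryType, StructField, StructType}

// Expected output type of the merged-value projection: a single struct column whose
// two children are the serialized bloom filters.
val mergedValueType = StructType(Seq(
  StructField("bloomFilter", BinaryType),
  StructField("bloomFilter", BinaryType)))

// The debugger output above shows the native side exporting a struct with
// children.length = 2 while the importer only built childVectors of size 1, i.e. one
// binary child goes missing between the native projection and the Arrow import.
assert(mergedValueType.fields.length == 2)
```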

viirya commented 1 month ago

I do not have time to look at this error yet. I may take a look after the conference.

mbutrovich commented 1 month ago

Can't say I see a huge difference in TPC-H or TPC-DS locally, but the plans I looked at were typically building filters over very small relations.

kazuyukitanimura commented 1 month ago

@mbutrovich Is it possible to trace back where children and childVectors are populated?

viirya commented 1 month ago

The Spark SQL test failure can be fixed by #1016.

viirya commented 1 month ago

I merged the fix. You can rebase and re-trigger CI now.

mbutrovich commented 1 month ago

Merged in updated main, thanks for the fix!

andygrove commented 1 month ago

I tested with TPC-H q5 and can see that we are now running the bloom filter agg natively.
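
For reference, one way to confirm this locally is to enable Comet and runtime bloom filter injection, then look for a CometHashAggregate carrying bloom_filter_agg in the executed plan. A hedged sketch; the config names are the commonly used Comet/Spark switches and may differ by version, and `tpchQuery5` is assumed to hold the q5 SQL text over already-registered TPC-H tables:

```scala
// Assumed setup: `spark` is a SparkSession with Comet on the classpath and the
// TPC-H tables registered; `tpchQuery5` holds the q5 SQL text.
spark.conf.set("spark.comet.enabled", "true")
spark.conf.set("spark.comet.exec.enabled", "true")
// Runtime bloom filter injection is what produces bloom_filter_agg in the first place.
spark.conf.set("spark.sql.optimizer.runtime.bloomFilter.enabled", "true")

val plan = spark.sql(tpchQuery5).queryExecution.executedPlan.toString
// Native execution shows up as CometHashAggregate nodes wrapping bloom_filter_agg,
// rather than Spark's ObjectHashAggregate.
assert(plan.contains("CometHashAggregate") && plan.contains("bloom_filter_agg"))
```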