apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Parquet bloom filter doesn't work with nested fields #9898

Open hussein-awala opened 6 months ago

hussein-awala commented 6 months ago

Apache Iceberg version

1.4.3 (latest release)

Query engine

Spark

Please describe the bug 🐞

I have an Iceberg table, and I want to create bloom filters on two columns: a root-level string column and a string column nested in a struct. I set the properties write.parquet.bloom-filter-enabled.column.a and write.parquet.bloom-filter-enabled.column.b.c to true, then checked the resulting file with parquet-cli:

$ parquet bloom-filter /path/to/file.parquet -c a -v <not existing value>

Row group 0:
--------------------------------------------------------------------------------
value <not existing value> NOT exists.

$ parquet bloom-filter /path/to/file.parquet -c a -v <existing value>

Row group 0:
--------------------------------------------------------------------------------
value <existing value> maybe exists.

$ parquet bloom-filter /path/to/file.parquet -c b.c -v <some value>

Row group 0:
--------------------------------------------------------------------------------
column b.c has no bloom filter

# check if it's an issue with column name parsing:
$ parquet bloom-filter /path/to/file.parquet -c b.d -v <some value>
Argument error: Schema doesn't have column: b.d

However, I tried with plain Spark and Parquet, and it worked without any issue:

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row
import spark.implicits._

val schema = StructType(Array(
    StructField("a", StringType, true),
    StructField("b", StringType, true),
    StructField("nested", StructType(Array(
      StructField("c", StringType, true),
      StructField("d", StringType, true)
    )), true)
))

val data = Seq(
    Row("1", "25", Row("100", "a")),
    Row("2", "30", Row("200", "b")),
    Row("3", "35", Row("300", "c")),
    Row("4", "40", Row("400", "d")),
    Row("5", "45", Row("500", "e"))
)

val df = spark.createDataFrame(
    spark.sparkContext.parallelize(data),
    schema
)

df.write.format("parquet")
    .option("parquet.bloom.filter.enabled#a", "true")
    .option("parquet.bloom.filter.enabled#nested.c", "true")
    .save("bloom_parquet")

Check with parquet-cli

$ parquet bloom-filter bloom_parquet/part-00002-9fac4c38-7113-45df-8db9-d96c3f6b6a8e-c000.snappy.parquet -c a -v "1"

Row group 0:
--------------------------------------------------------------------------------
value 1 maybe exists.

$ parquet bloom-filter bloom_parquet/part-00002-9fac4c38-7113-45df-8db9-d96c3f6b6a8e-c000.snappy.parquet -c nested.c -v "1"

Row group 0:
--------------------------------------------------------------------------------
value 1 NOT exists.
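
For comparison with the plain Parquet options above, here is a minimal sketch of how the two Iceberg table properties named at the start of this report could be rendered as a Spark SQL statement. Only the property prefix comes from the report; the helper object, its method names, and the table name `my_catalog.db.events` are hypothetical, and the statement is printed rather than executed so the snippet runs without a Spark session.

```scala
// Hypothetical helper; only the property prefix is taken from the report above.
object BloomFilterProps {
  val Prefix = "write.parquet.bloom-filter-enabled.column."

  // One "enabled" property per column path (nested fields use dotted paths, e.g. "b.c").
  def props(columns: Seq[String]): Seq[(String, String)] =
    columns.map(c => (Prefix + c, "true"))

  // Render an ALTER TABLE ... SET TBLPROPERTIES statement for Spark SQL.
  def alterTableSql(table: String, columns: Seq[String]): String = {
    val body = props(columns).map { case (k, v) => s"'$k' = '$v'" }.mkString(", ")
    s"ALTER TABLE $table SET TBLPROPERTIES ($body)"
  }
}

// "my_catalog.db.events" is a placeholder table name, not the table from this issue.
val sql = BloomFilterProps.alterTableSql("my_catalog.db.events", Seq("a", "b.c"))
println(sql)
// In spark-shell with an Iceberg catalog configured, this would be run as: spark.sql(sql)
```

Only files written after the properties are set would be expected to carry the filters, so the parquet-cli check should be run against a newly written data file.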

amogh-jahagirdar commented 6 months ago

Thanks for reporting, @hussein-awala. I'm taking a look; I don't remember whether there's some limitation preventing us from supporting bloom filters on nested fields. At least your test shows that there is no limitation on the Parquet side.

hussein-awala commented 6 months ago

@amogh-jahagirdar I created https://github.com/apache/iceberg/pull/9902 to test if the bloom filters are added to the files, and they seem to be added for the nested field.

I will try creating my table with different catalogs to check whether the problem is related to the catalog. It could also be related to how the data is written: these tests use the FileAppender directly, so I will try using the Spark API in these tests to write the data.

amogh-jahagirdar commented 6 months ago

Sounds great! I'll take a look at the PR.

I spent some time debugging this today. I added a test in the same class to try to reproduce the problem, and I saw the same result. In ColumnWriteStoreBase we are actually initializing the Parquet BloomFilterWriter with a valid bloom filter for the nested type, so the table property for nested types is being passed through correctly.

As you said, maybe the Spark APIs go through a different path that somehow loses the configuration. I doubt the catalog changes anything, since this is more about the write path, but feel free to go ahead and try it out.

But this is promising in the sense that we already support this (our Parquet dependency supports it, etc.); we just need to identify why the Spark API (or whatever mechanism was used in the issue description) does not write the bloom filter for nested types.

amogh-jahagirdar commented 6 months ago

Also, which Spark version are you using? I just tested with Spark 3.4 and Spark 3.5, and bloom filters for nested type appear to be written out based on the parquet-cli output (just a struct with a single integer field).

hussein-awala commented 6 months ago

I use Spark 3.5, Iceberg 1.4.3, and Glue Catalog.

and bloom filters for nested type appear to be written out

Interesting, I will retest it on Monday morning.

hussein-awala commented 6 months ago

@amogh-jahagirdar Today I found out that the issue affects only a single table. On that table I tried nested and root fields, with single and multiple bloom filters, and none of them worked. The table contains a large number of columns (over 100); I don't know yet whether this is related to the issue. I will continue my investigation and update the issue once I find the cause.

I think #9902 is ready to merge.

cccs-jory commented 4 months ago

Hello @hussein-awala, if you're testing with a relatively small table with a small number of distinct values, Spark may be using dictionary encoding for the values. We have discovered in our testing that if Spark is able to dictionary encode the values in the Parquet file, it will not write the bloom filter (which is by design).
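
To make the rule in this comment concrete, here is a small sketch that models it as stated: a column chunk gets a bloom filter only if at least one of its data pages is not dictionary-encoded. This is a simplified model of the comment's claim, not parquet-mr's actual code, and it has not been checked against a specific Parquet version; the type and function names are hypothetical.

```scala
// Simplified model of the rule described above; names are illustrative only.
sealed trait PageEncoding
case object Dictionary extends PageEncoding
case object Plain extends PageEncoding

// A fully dictionary-encoded column chunk gets no bloom filter under this rule.
def writesBloomFilter(dataPages: Seq[PageEncoding]): Boolean =
  dataPages.exists(_ != Dictionary)

// Few distinct values: every data page stays dictionary-encoded.
val lowCardinality: Seq[PageEncoding] = Seq(Dictionary, Dictionary)
// Many distinct values: the writer falls back to plain encoding partway through.
val highCardinality: Seq[PageEncoding] = Seq(Dictionary, Plain, Plain)

println(writesBloomFilter(lowCardinality))   // false
println(writesBloomFilter(highCardinality))  // true
```

If this rule applies, a test table with only a handful of distinct values per column could show "has no bloom filter" even when the table properties are set correctly, so a reproduction would need enough distinct values to push the writer off dictionary encoding.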

arthurpassos commented 2 months ago

@hussein-awala which tool are you using to inspect the Parquet files? You mentioned parquet-cli, but a Google search leads to https://github.com/chhantyal/parquet-cli and/or https://pypi.org/project/parquet-cli/, which does not seem to offer the same API.

Plus, are you guys aware of any document that describes bloom filter support for Map/Struct type?

It is just a question, I am not in the issue loop.