delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, and APIs
https://delta.io
Apache License 2.0

[PROTOCOL RFC] Specify which table properties will control writing of Apache Parquet Bloom filters #2751

Open jkylling opened 6 months ago

jkylling commented 6 months ago

Protocol Change Request

Description of the protocol change

In https://github.com/delta-io/delta/issues/1347#issuecomment-1246214693 it is suggested that Bloom filters can be supported through Apache Parquet Bloom filters. Given that we want to support Bloom filters this way, we would want persistent Bloom filter configuration through table properties on the Delta table. Which table properties should be set to instruct Delta writers to write Parquet files with Bloom filters for certain columns?
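For illustration only, a hypothetical sketch of what setting such a property could look like; the property name below is Iceberg-style and made up, since choosing the actual names is precisely what this request asks to specify (assumes an existing Delta table events with a user_id column):

// Hypothetical property name; not part of the current Delta protocol.
spark.sql("""
  ALTER TABLE events SET TBLPROPERTIES (
    'write.parquet.bloom-filter-enabled.column.user_id' = 'true'
  )
""")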

For inspiration, Iceberg configures the writing of Parquet Bloom filters with the table properties

write.parquet.bloom-filter-enabled.column.<a-column-name>
write.parquet.bloom-filter-max-bytes

(https://github.com/apache/iceberg/blob/732fbfd516a3dfb2028fd6795f8f564f70e44742/core/src/main/java/org/apache/iceberg/TableProperties.java#L166-L171), and it uses

write.orc.bloom.filter.columns
write.orc.bloom.filter.fpp

for configuring Bloom filters on ORC files.
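For example, on an Iceberg table these properties are set like any other table properties (a sketch; the catalog, table, and column names are assumptions):

// Assumes a Spark session with an Iceberg catalog and an existing table
// db.events with a user_id column.
spark.sql("""
  ALTER TABLE db.events SET TBLPROPERTIES (
    'write.parquet.bloom-filter-enabled.column.user_id' = 'true',
    'write.parquet.bloom-filter-max-bytes' = '1048576'
  )
""")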

Parquet-mr uses this per-column configuration:

parquet.bloom.filter.enabled
parquet.bloom.filter.expected.ndv
parquet.bloom.filter.max.bytes
parquet.bloom.filter.fpp

which are used as

parquet.bloom.filter.enabled#column=true
parquet.bloom.filter.expected.ndv#column=100000
parquet.bloom.filter.max.bytes#column=1024000
parquet.bloom.filter.fpp#column=0.01

https://github.com/apache/parquet-mr/blob/20d43639b5a380335953742ad6c9b3dd98e09f29/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java#L152-L155

These can be enabled in Apache Spark when writing with parquet-mr:

// Here "column" stands for the actual name of the column the Bloom
// filter should be built for.
df.write.mode("overwrite")
  .option("parquet.bloom.filter.enabled#column", "true")
  .option("parquet.bloom.filter.expected.ndv#column", "1000000")
  .parquet("output")
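A more self-contained sketch of the same write, plus a point lookup that the Bloom filter can accelerate by letting the reader skip row groups whose filter rules out the value (the column name, row count, and output path are assumptions):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("bloom-filter-demo").getOrCreate()

// One million distinct values in the column the filter is built for.
val df = spark.range(1000000L).withColumnRenamed("id", "user_id")

// "#user_id" scopes each property to the user_id column.
df.write.mode("overwrite")
  .option("parquet.bloom.filter.enabled#user_id", "true")
  .option("parquet.bloom.filter.expected.ndv#user_id", "1000000")
  .parquet("/tmp/bloom-demo")

// Readers can skip row groups whose Bloom filter excludes the value.
spark.read.parquet("/tmp/bloom-demo").where("user_id = 424242").count()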

Either the Iceberg approach or the parquet-mr approach would work well. Using the same properties as Iceberg might enable further compatibility between the formats.

When a column mapping mode is set, we require that the column names of the table property be resolved using the algorithm described in https://github.com/delta-io/delta/blob/c046547b5721ee096e5c0beb04bd9b2059021630/PROTOCOL.md#reader-requirements-for-column-mapping.
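To illustrate the resolution step (a sketch assuming column mapping mode "name" and a made-up physical name): the table property would reference the logical column name, and the writer would resolve it to the physical Parquet column name stored in the field metadata.

import org.apache.spark.sql.types._

// Trimmed, hypothetical Delta table schema with column mapping metadata.
val schema = StructType(Seq(
  StructField("user_id", LongType, nullable = true,
    metadata = new MetadataBuilder()
      .putLong("delta.columnMapping.id", 1L)
      .putString("delta.columnMapping.physicalName", "col-7f2a")
      .build())
))

// Resolve the logical name used in a property key such as
// "<prefix>.column.user_id" to the physical Parquet column name.
def physicalName(schema: StructType, logicalName: String): String = {
  val field = schema(logicalName)
  if (field.metadata.contains("delta.columnMapping.physicalName"))
    field.metadata.getString("delta.columnMapping.physicalName")
  else logicalName
}

println(physicalName(schema, "user_id")) // col-7f2a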

Willingness to contribute

The Delta Lake Community encourages protocol innovations. Would you or another member of your organization be willing to contribute this feature to the Delta Lake code base?

findinpath commented 3 months ago

@tdas trying to revive this feature request here to help community user @jkylling speed up some of their ad-hoc queries with Bloom filters.

This seems like a straightforward addition. Could you please prioritize it, or reject it if it doesn't make sense in the Delta protocol?

jkylling commented 2 months ago

@dennyglee could this be assigned a reviewer please? 🙏