apache / parquet-java

Apache Parquet Java
https://parquet.apache.org/
Apache License 2.0

Refactor the configuration for bloom filters #2464

Closed asfimport closed 4 years ago

asfimport commented 4 years ago

Refactor the hadoop configuration for bloom filters according to PARQUET-1784.

Reporter: Gabor Szadovszky / @gszadovszky Assignee: Gabor Szadovszky / @gszadovszky

Related issues:

Note: This issue was originally created as PARQUET-1805. Please see the migration documentation for further details.

asfimport commented 3 years ago

Yuming Wang / @wangyum: It seems that the previous configuration was better; enabling the bloom filter seriously affects write performance:


val numRows = 1024 * 1024 * 15
val df = spark.range(numRows).selectExpr(
  "id",
  "cast(id as string) as s",
  "cast(id as timestamp) as ts",
  "cast(cast(id as timestamp) as date) as td",
  "cast(id as decimal) as dec")
val benchmark = new org.apache.spark.benchmark.Benchmark(
  "Benchmark bloom filter write",
  numRows,
  minNumIters = 5)
Seq(false, true).foreach { pushDownEnabled =>
  val name = s"Write parquet ${if (pushDownEnabled) "(bloom filter)" else ""}"
  benchmark.addCase(name) { _ =>
    withSQLConf(org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> s"$pushDownEnabled") {
      df.write.mode("overwrite").parquet("/tmp/spark/parquet")
    }
  }
}
benchmark.run()

Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark bloom filter write:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Write parquet                                      5531           6001         503          2.8         351.6       1.0X
Write parquet (bloom filter)                      10529          11633        1113          1.5         669.4       0.5X
asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: @wangyum, I think this performance issue is not related to this jira but to the bloom filter feature as a whole (PARQUET-41). If you turn on writing bloom filters for all columns, it will impact write performance. (You may check the related configuration parameters at https://github.com/apache/parquet-mr/tree/master/parquet-hadoop for details.)

I am not an expert on this feature, and maybe we can improve the write performance, but generating bloom filters will always have some performance impact. It is up to the user to decide whether this impact is worth the potential benefit at read time. That's why it is highly suggested to specify exactly which columns bloom filters are required for, and also to set the other bloom filter parameters.

@chenjunjiedada, any comments on this?
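A sketch of the per-column tuning he refers to, in the Hadoop-configuration key style used later in this thread. The `parquet.bloom.filter.expected.ndv` and `parquet.bloom.filter.max.bytes` key names are taken from the parquet-hadoop README; verify them against your Parquet version before relying on them:

```
# Write a bloom filter only for the 'ts' column
parquet.bloom.filter.enabled#ts=true
# Size that filter for roughly 1M distinct values (per-column key)
parquet.bloom.filter.expected.ndv#ts=1000000
# Cap the filter size per column chunk (global key)
parquet.bloom.filter.max.bytes=1048576
```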

asfimport commented 3 years ago

Junjie Chen / @chenjunjiedada: I think @wangyum's concern is that we enable bloom filters for all columns when parquet.bloom.filter.enabled is set to true. That behaviour is a bit odd if you consider a table with a large number of columns. We could instead use parquet.bloom.filter.enabled#column.path to enable the bloom filter for a specific column after setting parquet.bloom.filter.enabled.

asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: Oh, I got it, thanks @chenjunjiedada. I felt it was more logical this way: the "major" configuration applies to all columns, and the "column-specific" one overrides it. Since the "major" one is false by default, you only need to enable bloom filters for the desired columns one by one. You don't even need to set parquet.bloom.filter.enabled, only the column-specific keys. We've tried to describe this in the README.
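The precedence described here (a column-specific key overrides the global key, which defaults to false) can be sketched in plain Scala, independent of Parquet itself. The key names match this thread; the `BloomFilterConf` helper is hypothetical, for illustration only:

```scala
// Hypothetical helper mirroring the described precedence:
// "parquet.bloom.filter.enabled#<column>" overrides the global
// "parquet.bloom.filter.enabled", which defaults to false.
object BloomFilterConf {
  val GlobalKey = "parquet.bloom.filter.enabled"

  def isEnabled(conf: Map[String, String], column: String): Boolean =
    conf.get(s"$GlobalKey#$column")   // column-specific key wins if present
      .orElse(conf.get(GlobalKey))    // otherwise fall back to the global key
      .map(_.toBoolean)
      .getOrElse(false)               // global default is false
}

// Only column-specific keys are set; the global key stays unset.
val conf = Map(
  "parquet.bloom.filter.enabled#ts"  -> "true",
  "parquet.bloom.filter.enabled#dec" -> "true")

assert(BloomFilterConf.isEnabled(conf, "ts"))   // explicitly enabled
assert(BloomFilterConf.isEnabled(conf, "dec"))  // explicitly enabled
assert(!BloomFilterConf.isEnabled(conf, "id"))  // falls back to default false
```

This mirrors why Wang's third benchmark case below only pays the bloom filter cost for the two columns it names.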

asfimport commented 3 years ago

Yuming Wang / @wangyum: Thank you @gszadovszky @chenjunjiedada. This is what I want:


set parquet.bloom.filter.enabled=false;
set parquet.bloom.filter.enabled#ts=true;
set parquet.bloom.filter.enabled#dec=true;

Benchmark and benchmark result:


val numRows = 1024 * 1024 * 15
val df = spark.range(numRows).selectExpr(
  "id",
  "cast(id as string) as s",
  "cast(id as timestamp) as ts",
  "cast(cast(id as timestamp) as date) as td",
  "cast(id as decimal) as dec")
val benchmark = new org.apache.spark.benchmark.Benchmark(
  "Benchmark bloom filter write",
  numRows,
  minNumIters = 5)

benchmark.addCase("default") { _ =>
  withSQLConf() {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for ts column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "false",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#ts" -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for ts and dec column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "false",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#ts" -> "true",
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED + "#dec" -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}

benchmark.addCase("Build bloom filter for all column") { _ =>
  withSQLConf(
    org.apache.parquet.hadoop.ParquetOutputFormat.BLOOM_FILTER_ENABLED -> "true") {
    df.write.mode("overwrite").parquet("/tmp/spark/parquet")
  }
}
benchmark.run()

Java HotSpot(TM) 64-Bit Server VM 1.8.0_251-b08 on Mac OS X 10.15.7
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Benchmark bloom filter write:             Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
default                                            5207           5314          72          3.0         331.1       1.0X
Build bloom filter for ts column                   5808           6065         245          2.7         369.2       0.9X
Build bloom filter for ts and dec column           6685           6776          79          2.4         425.0       0.8X
Build bloom filter for all column                  9077           9889         629          1.7         577.1       0.6X

cc @dongjoon-hyun

asfimport commented 3 years ago

Gabor Szadovszky / @gszadovszky: @wangyum, sorry, but I don't get what the problem is here. Could you please describe your problem in more detail? (If you think it is really a bug, please create a separate jira.)

asfimport commented 3 years ago

Yuming Wang / @wangyum: Thank you @gszadovszky. No issue for now.