mjakubowski84 / parquet4s

Read and write Parquet in Scala. Use Scala classes as schema. No need to start a cluster.
https://mjakubowski84.github.io/parquet4s/
MIT License

Update akka/pekko partition path generation to avoid unnecessary object creation #335

Closed: sndnv closed this 9 months ago

sndnv commented 10 months ago

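Updates the akka/pekko partitioned path generation to make it more efficient. Creating Hadoop paths appears to be somewhat expensive, so instead of creating a new `Path` for every partitioning column, the partition segments are collected first and a single `Path` is created per record.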

As part of this change, the akka/pekko benchmark was updated with an extra test (`writePartitionedLargeRecord`) for larger records with more partitioning columns. I ran the benchmarks for the original code, for the version in this PR, and for an extra alternative implementation, each both before and after adjusting the tests, to see how the performance changes (if at all):

The benchmarks were run on a 2019 MacBook Pro (2.4 GHz 8-core Intel Core i9, 64 GB of memory) with the following JMH settings:

    [info] # JMH version: 1.37
    [info] # VM version: JDK 17.0.8, OpenJDK 64-Bit Server VM, 17.0.8+7
    [info] # VM invoker: /Users/<user>/.sdkman/candidates/java/17.0.8-tem/bin/java
    [info] # VM options: <none>
    [info] # Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
    [info] # Warmup: 5 iterations, 1 s each
    [info] # Measurement: 12 iterations, 1 s each
    [info] # Timeout: 10 min per iteration
    [info] # Threads: 1 thread, will synchronize iterations
    [info] # Benchmark mode: Average time, time/op

Original Code

    private def partition(record: RowParquetRecord): (Path, RowParquetRecord) =
      partitionBy.foldLeft(basePath -> record) { case ((currentPath, currentRecord), partitionPath) =>
        currentRecord.removed(partitionPath) match {
          case (Some(BinaryValue(binary)), modifiedRecord) =>
            Path(currentPath, s"$partitionPath=${binary.toStringUsingUTF8}") -> modifiedRecord
          case (None, _) =>
            throw new IllegalArgumentException(s"Field '$partitionPath' does not exist.")
          case (Some(NullValue), _) =>
            throw new IllegalArgumentException(s"Field '$partitionPath' is null.")
          case _ =>
            throw new IllegalArgumentException(s"Non-string field '$partitionPath' used for partitioning.")
        }
      }
>>> Original Benchmark (before adding writePartitionedLargeRecord)
    [info] Benchmark                            (datasetSize)  Mode  Cnt     Score     Error  Units
    [info] AkkaPekkoBenchmark.read                     524288  avgt   60   494,071 ±  16,565  ms/op
    [info] AkkaPekkoBenchmark.write                    524288  avgt   60   456,750 ±  54,906  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned         524288  avgt   60  2499,189 ± 104,748  ms/op
    -----

>>> Adjusted Benchmark (with writePartitionedLargeRecord)
    [info] Benchmark                                       (datasetSize)  Mode  Cnt      Score     Error  Units
    [info] AkkaPekkoBenchmark.read                                524288  avgt   60    504,247 ±  43,806  ms/op
    [info] AkkaPekkoBenchmark.write                               524288  avgt   60    405,576 ±   4,822  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned                    524288  avgt   60   2330,558 ±  21,177  ms/op
    [info] AkkaPekkoBenchmark.writePartitionedLargeRecord         524288  avgt   60  19430,961 ± 703,004  ms/op
    -----
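
A minimal, self-contained sketch of the shape of the original fold, assuming a hypothetical `SimplePath` stand-in for Hadoop's `Path` and a plain `Map[String, String]` in place of `RowParquetRecord`. It only counts constructions to show that the fold builds one intermediate path per partitioning column for every record; it does not measure the actual cost of Hadoop's `Path`.

```scala
object OriginalFoldSketch {
  // Counts every SimplePath constructed (illustration only).
  var created: Int = 0

  // Hypothetical stand-in for a path type: a string plus a construction counter.
  final class SimplePath(val value: String) {
    created += 1
    def child(name: String): SimplePath = new SimplePath(s"$value/$name")
  }

  // Same shape as the original code: a new path object per partitioning column.
  def partition(
      basePath: SimplePath,
      partitionBy: Seq[String],
      record: Map[String, String]
  ): (SimplePath, Map[String, String]) =
    partitionBy.foldLeft(basePath -> record) { case ((currentPath, currentRecord), column) =>
      currentRecord.get(column) match {
        case Some(value) => currentPath.child(s"$column=$value") -> (currentRecord - column)
        case None        => throw new IllegalArgumentException(s"Field '$column' does not exist.")
      }
    }

  def main(args: Array[String]): Unit = {
    val base = new SimplePath("/data")
    created = 0 // only count paths created inside partition()
    val (path, rest) = partition(
      base,
      Seq("country", "city", "year"),
      Map("country" -> "PL", "city" -> "Warsaw", "year" -> "2023", "id" -> "1")
    )
    println(path.value) // /data/country=PL/city=Warsaw/year=2023
    println(rest)       // Map(id -> 1)
    println(created)    // 3
  }
}
```

With three partitioning columns, every record results in three path objects, of which only the last one is used.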

Updated version (with string interpolation)

This version keeps most of the existing code, but collects the `column=value` segments in a `Queue` and builds the path only once at the end, so a single `Path` is created per record:

    private def partition(record: RowParquetRecord): (Path, RowParquetRecord) =
      if (partitionBy.nonEmpty) {
        val (pathParts, updatedRecord) = partitionBy.foldLeft(Queue.empty[String] -> record) {
          case ((collected, currentRecord), partitionPath) =>
            currentRecord.removed(partitionPath) match {
              case (Some(BinaryValue(binary)), modifiedRecord) =>
                (collected :+ s"$partitionPath=${binary.toStringUsingUTF8}") -> modifiedRecord
              case (None, _) =>
                throw new IllegalArgumentException(s"Field '$partitionPath' does not exist.")
              case (Some(NullValue), _) =>
                throw new IllegalArgumentException(s"Field '$partitionPath' is null.")
              case _ =>
                throw new IllegalArgumentException(s"Non-string field '$partitionPath' used for partitioning.")
            }
        }

        Path(basePath, pathParts.mkString(Path.Separator)) -> updatedRecord
      } else {
        basePath -> record
      }
>>> Original Benchmark (before adding writePartitionedLargeRecord)
    [info] Benchmark                            (datasetSize)  Mode  Cnt     Score     Error  Units
    [info] AkkaPekkoBenchmark.read                     524288  avgt   60   543,831 ±  54,112  ms/op
    [info] AkkaPekkoBenchmark.write                    524288  avgt   60   397,764 ±   3,064  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned         524288  avgt   60  2512,997 ± 102,813  ms/op
    -----

>>> Adjusted Benchmark (with writePartitionedLargeRecord)
    [info] Benchmark                                       (datasetSize)  Mode  Cnt      Score     Error  Units
    [info] AkkaPekkoBenchmark.read                                524288  avgt   60    502,459 ±  11,891  ms/op
    [info] AkkaPekkoBenchmark.write                               524288  avgt   60    421,095 ±  10,046  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned                    524288  avgt   60   2530,084 ±  66,342  ms/op
    [info] AkkaPekkoBenchmark.writePartitionedLargeRecord         524288  avgt   60  12262,918 ± 459,912  ms/op
    -----
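
The collect-then-join idea can be sketched without parquet4s, assuming a plain `Map[String, String]` as the record and a hypothetical `Separator` constant (the PR uses `Path.Separator`). Only the relative partition path is built here; in the PR it is appended to `basePath` with a single `Path(...)` call.

```scala
import scala.collection.immutable.Queue

object MkStringSketch {
  // Hypothetical separator; the PR code uses parquet4s' Path.Separator.
  val Separator = "/"

  // Collects "column=value" segments first, then joins them once with mkString.
  // Returns the relative partition path and the record without partition columns.
  def partitionSegments(
      partitionBy: Seq[String],
      record: Map[String, String]
  ): (String, Map[String, String]) = {
    val (parts, remaining) = partitionBy.foldLeft(Queue.empty[String] -> record) {
      case ((collected, currentRecord), column) =>
        currentRecord.get(column) match {
          case Some(value) => (collected :+ s"$column=$value") -> (currentRecord - column)
          case None        => throw new IllegalArgumentException(s"Field '$column' does not exist.")
        }
    }
    parts.mkString(Separator) -> remaining
  }

  def main(args: Array[String]): Unit = {
    val (relative, rest) = partitionSegments(
      Seq("country", "city"),
      Map("country" -> "PL", "city" -> "Warsaw", "id" -> "1")
    )
    println(relative) // country=PL/city=Warsaw
    println(rest)     // Map(id -> 1)
  }
}
```

With an empty `partitionBy` this sketch would return an empty string, which is why the PR code checks `partitionBy.nonEmpty` and returns `basePath` unchanged in that case.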

Updated version (with StringBuilder)

This is the version in the PR. It collects `(ColumnPath, Binary)` pairs and then uses a `StringBuilder` instead of string interpolation, in an attempt to be the most performant:

    private def partition(record: RowParquetRecord): (Path, RowParquetRecord) =
      if (partitionBy.nonEmpty) {
        val (pathParts, updatedRecord) = partitionBy.foldLeft(Queue.empty[(ColumnPath, Binary)] -> record) {
          case ((collected, currentRecord), partitionPath) =>
            currentRecord.removed(partitionPath) match {
              case (Some(BinaryValue(binary)), modifiedRecord) =>
                (collected :+ (partitionPath -> binary)) -> modifiedRecord
              case (None, _) =>
                throw new IllegalArgumentException(s"Field '$partitionPath' does not exist.")
              case (Some(NullValue), _) =>
                throw new IllegalArgumentException(s"Field '$partitionPath' is null.")
              case _ =>
                throw new IllegalArgumentException(s"Non-string field '$partitionPath' used for partitioning.")
            }
        }

        val builder = new StringBuilder()
        pathParts.foreach { case (partitionPath, binary) =>
          builder.append(partitionPath.toString)
          builder.append("=")
          builder.append(binary.toStringUsingUTF8)
          builder.append(Path.Separator)
        }
        builder.setLength(builder.length - 1) // removes the trailing separator

        Path(basePath, builder.toString()) -> updatedRecord
      } else {
        basePath -> record
      }
>>> Original Benchmark (before adding writePartitionedLargeRecord)
    [info] Benchmark                            (datasetSize)  Mode  Cnt     Score    Error  Units
    [info] AkkaPekkoBenchmark.read                     524288  avgt   60   542,138 ±  7,840  ms/op
    [info] AkkaPekkoBenchmark.write                    524288  avgt   60   455,340 ± 23,143  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned         524288  avgt   60  2255,743 ± 23,189  ms/op
    -----

>>> Adjusted Benchmark (with writePartitionedLargeRecord)
    [info] Benchmark                                       (datasetSize)  Mode  Cnt     Score     Error  Units
    [info] AkkaPekkoBenchmark.read                                524288  avgt   60   488,982 ±  10,304  ms/op
    [info] AkkaPekkoBenchmark.write                               524288  avgt   60   406,276 ±   4,301  ms/op
    [info] AkkaPekkoBenchmark.writePartitioned                    524288  avgt   60  2286,137 ±  20,466  ms/op
    [info] AkkaPekkoBenchmark.writePartitionedLargeRecord         524288  avgt   60 10679,075 ±  75,044  ms/op
    -----
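
The append-then-trim loop can be checked in isolation. This is a minimal sketch assuming plain `(String, String)` pairs instead of `(ColumnPath, Binary)` and a hypothetical `Separator` constant; it trims `Separator.length` characters rather than a hard-coded `1`, which is equivalent for the single-character `/` separator.

```scala
object StringBuilderSketch {
  val Separator = "/" // hypothetical; the PR code uses Path.Separator

  // Appends every "column=value" segment followed by a separator, then drops
  // the trailing separator. Assumes `parts` is non-empty, as the PR guards
  // with `partitionBy.nonEmpty` before reaching this loop.
  def join(parts: Seq[(String, String)]): String = {
    val builder = new StringBuilder()
    parts.foreach { case (column, value) =>
      builder.append(column)
      builder.append('=')
      builder.append(value)
      builder.append(Separator)
    }
    builder.setLength(builder.length - Separator.length) // remove trailing separator
    builder.toString
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq("country" -> "PL", "city" -> "Warsaw")
    println(join(parts)) // country=PL/city=Warsaw
    // Same result as building "column=value" strings and joining with mkString.
    println(join(parts) == parts.map { case (c, v) => s"$c=$v" }.mkString(Separator)) // true
  }
}
```

Both approaches produce identical strings; the difference is only in how many intermediate strings are allocated along the way.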

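Overall, the largest impact comes from reducing the number of paths being created: with the adjusted benchmark, `writePartitionedLargeRecord` drops from about 19.4 s/op with the original code to about 12.3 s/op with string interpolation and about 10.7 s/op with `StringBuilder`. The `StringBuilder` and string-interpolation implementations are fairly close (`mkString` uses a `StringBuilder` underneath, so similar performance is expected), but in the end using `StringBuilder` directly appears to be the fastest.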
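To put the `writePartitionedLargeRecord` numbers into relative terms, the small calculation below uses the average times copied from the adjusted benchmark runs (ms/op). It compares means only; the reported error margins (703,004, 459,912 and 75,044 ms/op) are small compared with the gaps between them.

```scala
import java.util.Locale

object BenchmarkDeltas {
  // Average times (ms/op) for writePartitionedLargeRecord from the adjusted runs.
  val largeRecordMsPerOp: Seq[(String, Double)] = Seq(
    "original" -> 19430.961,
    "string interpolation" -> 12262.918,
    "StringBuilder" -> 10679.075
  )

  // Fraction by which the average time dropped relative to `before`.
  def reduction(before: Double, after: Double): Double = (before - after) / before

  def main(args: Array[String]): Unit = {
    val baseline = largeRecordMsPerOp.head._2
    largeRecordMsPerOp.tail.foreach { case (impl, msPerOp) =>
      val pct = reduction(baseline, msPerOp) * 100
      // Locale.ROOT keeps '.' as the decimal separator regardless of system locale.
      println(s"$impl: ${"%.1f".formatLocal(Locale.ROOT, pct)}% lower average time")
    }
    // string interpolation: 36.9% lower average time
    // StringBuilder: 45.0% lower average time
  }
}
```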

Let me know what you think or if I missed something.

Fixes #332