spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Improve data colocation support for Parquet writes #5466

Open clairemcginty opened 2 months ago

clairemcginty commented 2 months ago

Parquet files benefit greatly when similar data values are colocated in the same page, row group, or file.

This is very hard to do in Scio, or in distributed data processing engines in general, because the data is by default parallelized and unordered. The closest we have right now is SMB, where you can group and sort by up to 2 columns.

However, for non-SMB use cases, we should be able to leverage Beam's ShardingFunction to colocate data efficiently. We could offer a custom ShardingFunction implementation that assigns the shard number based on a hash of user-specified column(s), for example:

```scala
case class User(userId: String, date: DateTime, age: Int)

val data: SCollection[User] = ...
data.saveAsTypedParquetFile(
  path,
  shardBy = ShardBy[User](numShards = 1024, columns = Set(_.userId, _.age))
)

class ShardBy[T](numShards: Int, columns: Set[FilteringColumn]) extends ShardingFunction[T] { ... }
```

(...basically a low-powered SMB that doesn't cost much on the write side.)
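
To make the hashing step concrete, here is a minimal, dependency-free sketch of how a shard number could be derived from user-selected columns. Everything in it is an assumption for illustration: `ColumnShardAssigner` and `shardFor` are hypothetical names, not Scio or Beam APIs, and `User` uses `java.time.LocalDate` in place of `DateTime` only to avoid an extra dependency.

```scala
import java.time.LocalDate
import scala.util.hashing.MurmurHash3

// Illustrative record type; mirrors the User example above.
final case class User(userId: String, date: LocalDate, age: Int)

// Hypothetical helper, not part of Scio or Beam.
// `columns` are accessor functions for the fields to colocate on.
final class ColumnShardAssigner[T](numShards: Int, columns: Seq[T => Any])
    extends Serializable {
  require(numShards > 0, "numShards must be positive")

  def shardFor(element: T): Int = {
    // Hash the selected column values in a fixed order, so elements with
    // equal values in those columns always produce the same hash.
    val hash = MurmurHash3.orderedHash(columns.map(f => f(element)))
    // floorMod keeps the result in [0, numShards) even when hash is negative.
    Math.floorMod(hash, numShards)
  }
}

object ColumnShardAssignerExample {
  def main(args: Array[String]): Unit = {
    val assigner =
      new ColumnShardAssigner[User](1024, Seq[User => Any](_.userId, _.age))
    val a = User("u1", LocalDate.of(2024, 1, 1), 30)
    val b = User("u1", LocalDate.of(2024, 6, 1), 30)
    // Same userId and age, different date: both land in the same shard.
    println(assigner.shardFor(a) == assigner.shardFor(b))
  }
}
```

In Beam, this logic would presumably live inside `ShardingFunction#assignShardKey(destination, element, shardCount)`, which returns a `ShardedKey`. The sketch relies on the selected column types having value-based, stable `hashCode` implementations (as `String` and `Int` do); types such as arrays would need explicit handling.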

We should also look into z-ordering, which would incur more penalty on write performance but potentially unlock even greater downstream performance gains.
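
For reference, the core of z-ordering is a Morton code: the bits of two (or more) column values are interleaved into a single sort key, so that records close in both dimensions tend to sort near each other. The sketch below is illustrative only; `ZOrder` and `interleave` are hypothetical names, it handles two columns, and it treats the inputs as unsigned 32-bit values, so it assumes the columns have already been mapped to non-negative integers.

```scala
// Hypothetical helper, not part of Scio, Beam, or Parquet.
object ZOrder {
  // Spread the low 32 bits of x so a zero bit sits between each original bit.
  private def spread(x: Long): Long = {
    var v = x & 0xFFFFFFFFL
    v = (v | (v << 16)) & 0x0000FFFF0000FFFFL
    v = (v | (v << 8)) & 0x00FF00FF00FF00FFL
    v = (v | (v << 4)) & 0x0F0F0F0F0F0F0F0FL
    v = (v | (v << 2)) & 0x3333333333333333L
    v = (v | (v << 1)) & 0x5555555555555555L
    v
  }

  // Bits of `a` occupy even positions, bits of `b` occupy odd positions.
  def interleave(a: Int, b: Int): Long =
    spread(a.toLong) | (spread(b.toLong) << 1)
}
```

Sorting records by `ZOrder.interleave(colA, colB)` before writing would be one way to approximate a z-ordered layout; the sort itself is where the extra write-side cost mentioned above would come from.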