spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0
2.55k stars 513 forks

Job stats estimation and join strategy? #1996

Open nevillelyh opened 5 years ago

nevillelyh commented 5 years ago

This is a broad discussion of job statistics estimation and join strategy inspired by Scalding's estimation API.

cc @anish749 @ClaireMcGinty

We want to estimate basic stats, e.g. size, # of items, key distribution, of an SCollection[T] in order to do automatic optimizations like best join strategy, etc.

There are probably 2 types of estimation: A, over an arbitrary SCollection[T] at any stage of the graph, and B, only at the input stage, associated with a source (Avro, BQ, etc.) and a few map/filter steps, right before a join.

A is more flexible since it allows us to estimate stats of SCollection[T]s that are derived after some complex logic, e.g. after *ByKey. The downside is that there's no easy way to estimate until we run the pipeline and store historical results in some storage/service.

  1. Store the underlying PCollection[T] name at graph construction, and retrieve statistics from Dataflow service after pipeline completion. This is easy but probably not runner agnostic.
  2. Add extra steps to sample elements from the SCollection[T], estimate stats by encoding elements to Array[Byte], summing to a singleton SCollection[Estimation], and saving the result as a text file (or in a history service), as sketched in 3ca150e234a30a2a1c4df2623522a0ec68dcc414. This is runner agnostic but complex.
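The sampling idea in option 2 can be sketched in plain Scala, outside of any pipeline. This is only an illustration under stated assumptions: `Estimation`, `estimate`, and the `encode` function are hypothetical names, and a real implementation would use the collection's coder and run as pipeline steps rather than on a local `Seq`.

```scala
object SizeEstimate {
  // Hypothetical summary type for illustration; not part of Scio.
  final case class Estimation(count: Long, bytes: Long)

  // Scale up the count and encoded size of a sample that was taken at a known rate.
  // `encode` stands in for whatever coder the pipeline would use.
  def estimate[T](sample: Seq[T], sampleRate: Double)(encode: T => Array[Byte]): Estimation = {
    require(sampleRate > 0.0 && sampleRate <= 1.0, "sampleRate must be in (0, 1]")
    val sampledBytes = sample.iterator.map(x => encode(x).length.toLong).sum
    Estimation(
      count = math.round(sample.size / sampleRate),
      bytes = math.round(sampledBytes / sampleRate)
    )
  }
}
```

The estimate is only as good as the sample: a skewed key distribution or highly variable record sizes would need more than a single scaling factor.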

B doesn't require historical data since we can infer stats from the source directly at graph construction time. It'll probably look like this:

  1. An EstSource[T] that wraps an existing source that we can estimate stats with, e.g. Avro (file size), BQ (table size, # of rows).

    import com.spotify.scio.ScioContext
    import com.spotify.scio.values.SCollection

    // `Estimation` is the stats summary type sketched in this proposal
    trait EstSCollection[T] {
      // compute stats at graph construction time, and produce corresponding SCollection
      // after applying delayed map/flatMap/filter transforms
      def materialize(sc: ScioContext): (SCollection[T], Estimation)

      def size: Long // estimate size at graph construction time

      // sample items at graph construction time, e.g. by using Beam `FileSystems` API
      // with raw Avro reader or raw BigQuery client.
      def sample: Seq[T]

      // delayed transform on the underlying collection, scale `size` accordingly by
      // sampling, i.e. by computing the ratio between input & output size from encoded
      // bytes
      def map[U](f: T => U): EstSCollection[U]
      def flatMap[U](f: T => TraversableOnce[U]): EstSCollection[U]
      def filter(p: T => Boolean): EstSCollection[T]
    }
  2. Add methods like EstSCollection[(K, V)]#join(that: EstSCollection[(K, W)])(strategy: JoinStrategy) that materialize LHS/RHS into SCollections and feed the stats into strategy to determine the best join approach.
  3. This could potentially work with the ongoing SMB #1346 work as well, if we define SMB source as EstSource[(K, T)] which encapsulates key extraction (part of source metadata), and the strategy will just be delegating to Java SmbSource.
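To make item 2 concrete, here is a minimal sketch of how a strategy might pick a join approach from two estimates. All names (`Estimation`, `JoinPlan`, `SizeThreshold`, `maxBroadcastBytes`) are hypothetical and only illustrate the shape of the decision; the thread does not define what `JoinStrategy` actually looks like.

```scala
object JoinPlanning {
  // Hypothetical types for illustration; none of these exist in Scio.
  final case class Estimation(count: Long, bytes: Long)

  sealed trait JoinPlan
  case object ShuffleJoin extends JoinPlan    // regular shuffle-based join
  case object BroadcastLeft extends JoinPlan  // replicate the LHS to every worker
  case object BroadcastRight extends JoinPlan // replicate the RHS to every worker

  trait JoinStrategy {
    def choose(lhs: Estimation, rhs: Estimation): JoinPlan
  }

  // Broadcast the smaller side if it fits under `maxBroadcastBytes`, otherwise shuffle.
  final class SizeThreshold(maxBroadcastBytes: Long) extends JoinStrategy {
    def choose(lhs: Estimation, rhs: Estimation): JoinPlan =
      if (rhs.bytes <= maxBroadcastBytes && rhs.bytes <= lhs.bytes) BroadcastRight
      else if (lhs.bytes <= maxBroadcastBytes) BroadcastLeft
      else ShuffleJoin
  }
}
```

In Scio terms the broadcast plans would presumably map to a side-input based join such as `hashJoin`, and a richer strategy could also look at key distribution to detect hot keys.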

The downside is that not all stats can be computed for all sources, for example:

clairemcginty commented 5 years ago

I think a third option could be to create an Estimation trait (in the style of EstSCollection) wrapping around the transforms themselves, with specific implementations for different kinds of transforms like map/filter/joins. That way we can do custom analyses depending on the operation; for example, hotspot analysis for joins or average record size for .avroFile[T](...) read transforms. However, I think the implementation would get pretty complex since Scio doesn't have a unified trait that all transforms implement (i.e. we wrap PCollection in SCollection but we don't have a similar STransform-style wrapper). So it would be a lot of custom logic cluttering every transform site which might be more trouble than it's worth.

As for use case, I could see job profiling being run ad-hoc at development time by passing in some kind of --profileJob flag to ScioOptions...not sure if that conflicts with what you had in mind?

nevillelyh commented 5 years ago

Agree that a new STransform might be too complex. The goal of this is to allow power users to dynamically produce the job graph for certain tricky transforms like join based on estimates, not to fully automate all jobs & transforms.

Profiling arbitrary transforms should be done via runner-provided stats? DF already has # elements & size, and potentially key distribution once they open it up to the public.

anish749 commented 5 years ago

If we are using ScioOptions, will it not become runner (Dataflow) specific, unless we have a job profiling API in Beam?

Runners have access to a Beam API which allows us to specify PTransformOverride. Choosing an implementation of a particular transform is usually done at the optimization layer in SQL, so we might think about using the runner APIs to change the job graph and add some PTransform overrides before submitting to the runner (very hacky). Beam has internal / private APIs which allow runners to traverseTopologically a Pipeline, and then we can have our own matchers which replace transforms with some other transform. Even though these are internal APIs, they are used by runners and have been very stable (e.g. traverseTopologically has been stable for the last 4 years since Beam was released).

A complete Scio implementation of the above would however be more difficult, I believe, and would require having a layer where we traverse the whole graph and decide how to change things before creating a Beam graph using PTransforms. This basically means having SCollection as an entity instead of a wrapper around PCollection.

Throwing around another idea, similar to @ClaireMcGinty's: we can think of Estimator as a type class parameterized on PTransform, something along the lines of Estimator[T <: PTransform], and have implicit Estimators for each PTransform. This would need joins to be written as PTransforms, and then we can have Estimator[GBK], Estimator[InnerJoin], and Estimator[ParDo]. This way we wouldn't need a common interface for all transforms or another wrapper over PTransform.
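A rough sketch of the type class idea, using stand-in transform types so it runs without Beam on the classpath. `Transform`, `Filter`, `Flatten`, `Estimation`, and the `selectivity` parameter are all hypothetical; a real version would be parameterized on Beam's `PTransform` and would need the selectivity to come from sampling or history.

```scala
object EstimatorSketch {
  // Hypothetical summary type for illustration.
  final case class Estimation(count: Long, bytes: Long)

  // Stand-ins for Beam transforms.
  sealed trait Transform
  final case class Filter(selectivity: Double) extends Transform
  final case class Flatten() extends Transform

  // Type class: how a transform of type T turns input estimates into an output estimate.
  trait Estimator[T <: Transform] {
    def estimate(t: T, inputs: Seq[Estimation]): Estimation
  }

  object Estimator {
    // A filter keeps roughly `selectivity` of its single input.
    implicit val filterEstimator: Estimator[Filter] = new Estimator[Filter] {
      def estimate(t: Filter, inputs: Seq[Estimation]): Estimation = {
        val in = inputs.head
        Estimation(math.round(in.count * t.selectivity), math.round(in.bytes * t.selectivity))
      }
    }

    // A flatten outputs the union of all its inputs.
    implicit val flattenEstimator: Estimator[Flatten] = new Estimator[Flatten] {
      def estimate(t: Flatten, inputs: Seq[Estimation]): Estimation =
        Estimation(inputs.map(_.count).sum, inputs.map(_.bytes).sum)
    }
  }

  // Resolves the right Estimator instance from the transform's static type.
  def estimate[T <: Transform](t: T, inputs: Estimation*)(implicit e: Estimator[T]): Estimation =
    e.estimate(t, inputs)
}
```

Because instances are resolved implicitly per transform type, a transform without an `Estimator` simply fails to compile at the call site, which avoids needing a common interface across all transforms.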

anish749 commented 5 years ago

@nevillelyh @ClaireMcGinty

I had a (wild) thought around this.. more like a quick brain dump at this stage, but here is how I am thinking this:

Requirements at pipeline construction time:

Given a transform T with Inputs I1 to In and Outputs O1 to On,

Given the above knowledge, figure out a strategy and accordingly execute the PTransform

Thoughts around number 2

The first one is relatively simple, we can have the data profiles stored externally and use something like this to find the data profile. However one major problem is identifying the SCollection

Is the profile still relevant? This depends on how we identify the SCollection. One way is by using the name as here. The drawback to this is that it doesn't answer the relevancy of the profile. It would also fail to identify an SCollection if the transform name changed, or if there is a name collision.

Let us think of something like a fingerprint (not hashCode) for PCollections and PTransforms. The fingerprint of a Beam transform uniquely identifies the lineage of the creation of the PCollection. Each fingerprint is then uniquely associated with a data profile, on a given date (partition / job run id).

Fingerprinting: Let's define a merkle tree (or a hash tree) for each PCollection with the current PCollection at the root and the parent PTransform which produced it as the first child of the merkle tree's root. Each of the PTransform's inputs forms a child of that first child node in the merkle tree, and so on, leading to the PInput as a leaf.

The hash (for the merkle tree) of each PTransform is calculated based on the DoFn it encloses (with no relation to data). Hence, if the code changes, the hash changes. If a blank line was added in the user's code which doesn't change the enclosed function, the hash doesn't change, because the SerializableFunction it enclosed didn't change. If the closure in which the function was defined changes, the function has changed and the hash changes.

After calculating the root of the merkle tree, it should be relatively easy to figure out which transform has changed and which data profile should be invalidated.
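A minimal sketch of the fingerprinting scheme described above, assuming each transform can be reduced to some stable identifier of its enclosed function. `Node` and `codeId` are hypothetical; how to derive a stable identifier from a serialized DoFn or closure is the hard part and is not addressed here.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object Fingerprint {
  // A node in the lineage tree: a transform (or source) plus the nodes feeding into it.
  // `codeId` is a hypothetical stable identifier of the enclosed function.
  final case class Node(codeId: String, inputs: List[Node] = Nil)

  private def sha256Hex(s: String): String =
    MessageDigest
      .getInstance("SHA-256")
      .digest(s.getBytes(StandardCharsets.UTF_8))
      .map(b => "%02x".format(b & 0xff))
      .mkString

  // Merkle-style hash: a node's fingerprint covers its own code and the fingerprints
  // of all its inputs, so any upstream change propagates to the root.
  // The length prefix keeps the encoding of `codeId` unambiguous.
  def of(node: Node): String =
    sha256Hex((s"${node.codeId.length}:${node.codeId}" :: node.inputs.map(of)).mkString("|"))
}
```

With this shape, comparing the fingerprints of two versions of a graph node by node from the root down shows which subtree changed, and every profile keyed by a fingerprint above the change is no longer found. Input order is significant here, which seems appropriate for joins where LHS and RHS are not interchangeable.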

This also helps in mapping PCollections across pipelines. When writing a pipeline's output, we write this fingerprint alongside the data. This is more like a checksum / md5 of the pipeline producing it instead of the data. A ScioIO[T] reading this data can now know the fingerprint of the PInput (a leaf node in the next pipeline). Once we apply a ParDo on this, it calculates the fingerprint of the output PTransform as a merkle tree. This means that if there was a code change in an upstream pipeline, we can identify that all downstream data profiles are invalidated, because the fingerprints changed.

Fingerprint to Data Profile mapping: This draws from Beam's expansion service, which can be an external service running in the cloud that provides strategies for expanding a transform. I think we can have something similar which uses the fingerprint to find an appropriate data profile and then expands accordingly.

nevillelyh commented 5 years ago

Beam SQL's BeamSqlTable interface actually has a notion of table statistics, so maybe we can leverage that. Then again, graph optimizations are probably easier in SQL than in Scala code.

https://github.com/apache/beam/blob/master/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamTableStatistics.java

nevillelyh commented 4 years ago

After discussion IRL: this should be handled in Beam SQL. Adding it to the Scala layer brings too much complexity. Will leave this open as a reminder.