spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Side inputs for SMB transforms #3314

Closed: kellen closed this issue 3 years ago

kellen commented 4 years ago

I have a few use cases where I am using SMB transforms and want to merge in some (small) external data, usually from some other SCollection. The way I can achieve this at present is to write out a sparkey or a distcache in some other job, then load that manually in the via block.

Ideally I'd be able to merge normal side inputs into an SMB transform without this additional job and code overhead, while still avoiding a shuffle penalty.

Something like:

    val mySideInput: SideInput[Map[String, String]] = ???
    sc
      .sortMergeTransform[String, MyEntity]( ... )
      .to( ... )
      .withSideInputs(mySideInput)
      .via { case (key, myEntity, outputCollector, ctx) =>
          val fromSI = ctx(mySideInput).get(myEntity.someField)
          // ...
      }
clairemcginty commented 4 years ago

Initially I thought this would be a really easy fix -- allow the user-supplied TransformFn to optionally take in a ProcessContext argument to retrieve a side input from. Unfortunately, since we rewrote this as a BoundedSource implementation instead of a plain PTransform, we no longer have access to ProcessContext and can't surface that. 🤦‍♀️

I wonder if Dataflow would still fuse transforms if we re-structured SortedBucketTransform to be a chained SortedBucketSource followed by a MapElements transform that applies the transformFn (with access to side inputs) and writes to a temp file... that could eliminate some code duplication too, if so. I can check.

clairemcginty commented 4 years ago

Ok, I did some investigation into this today. I couldn't find any way to extract the transformation logic into a PTransform without materializing the entire bucket contents into memory, which defeats the point of the SortedBucketTransform optimization. I was looking into how we could get side inputs into our BoundedSource impl, and we could wrap it in something like this:

    begin
      .getPipeline()
    -  .apply("SortedBucketSource", Read.from(boundedSource))
    +  .apply("SortedBucketSource", new PTransform<PBegin, PCollection<MergedBucket>>() {
    +    Iterable<PCollectionView<?>> sideInputs = ... // user-supplied side inputs
    +
    +    @Override
    +    public PCollection<MergedBucket> expand(PBegin input) {
    +      return input.apply(Read.from(boundedSource));
    +    }
    +
    +    @Override
    +    public Map<TupleTag<?>, PValue> getAdditionalInputs() {
    +      return PCollectionViews.toAdditionalInputs(sideInputs);
    +    }
    +  })

This will load the side inputs in, but the actual materialization logic (going from PCollectionView<T> => T) is delegated to package-private methods in each runner implementation.

So unfortunately I think side inputs are out. We might be able to support something that allows the user to load an arbitrary "resource" (like DoFnWithResource does), maybe even a specific resource per bucket, but it would have to be fully materialized rather than any kind of PCollection/side input.
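For reference, the "arbitrary resource" pattern mentioned above looks roughly like this sketch modeled on scio's DoFnWithResource. The method names are from memory and the lookup map is a placeholder, so treat this as illustrative rather than a scio API guarantee:

```scala
import com.spotify.scio.transforms.DoFnWithResource
import com.spotify.scio.transforms.DoFnWithResource.ResourceType
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement

// The resource is created once (here, per class) and fully materialized on
// the worker -- it is a plain value, not a PCollection or side input.
class LookupFn extends DoFnWithResource[String, String, Map[String, String]] {
  override def getResourceType: ResourceType = ResourceType.PER_CLASS

  // Placeholder: in practice this would read and parse a file from GCS
  override def createResource(): Map[String, String] =
    Map("a" -> "1", "b" -> "2")

  @ProcessElement
  def processElement(c: DoFn[String, String]#ProcessContext): Unit =
    c.output(getResource.getOrElse(c.element(), "unknown"))
}
```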

nevillelyh commented 4 years ago

Instead of side inputs, which require access to custom DoFn functionality, can we use, or make it easy to use, DistCache as an alternative? DistCache uses GCS to pass data to workers and lambda ser/de to pass the URI to a DoFn, so it should just work ™ in the existing SortMergeTransform. We could even add helpers to materialize an SCollection[T] into a DistCache (similar to sparkey) in the same job; we'd just need to make sure that transform (e.g. asDistCache) is executed before the SMB one.
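The asDistCache helper suggested here doesn't exist yet; a rough sketch of what it might do by hand is below. The output path, TSV format, and part-file name are all assumptions, and as discussed in this thread nothing here guarantees the write completes before the SMB transform reads it:

```scala
import java.io.File
import scala.io.Source
import com.spotify.scio.ScioContext
import com.spotify.scio.values.{DistCache, SCollection}

// Sketch of materializing a small SCollection into a DistCache in the same
// job. Hypothetical helper; ordering relative to the SMB transform is NOT
// handled here.
def asDistCacheByHand(
  sc: ScioContext,
  sideData: SCollection[(String, String)],
  uri: String // e.g. "gs://my-bucket/lookup" (assumed)
): DistCache[Map[String, String]] = {
  // Write the side data out as TSV part files
  sideData.map { case (k, v) => s"$k\t$v" }.saveAsTextFile(uri)
  // Load it back on each worker as a Map (single part file assumed)
  sc.distCache(uri + "/part-00000-of-00001.txt") { (f: File) =>
    Source
      .fromFile(f)
      .getLines()
      .map { line =>
        val Array(k, v) = line.split("\t", 2)
        k -> v
      }
      .toMap
  }
}
```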

clairemcginty commented 4 years ago

That's a good idea. We need to make sure that it's possible to execute the asDistCache transform before the SMB one, though, since Read.from(BoundedSource) creates a root-node transform. Maybe we could use Beam's Wait transform to achieve this.

kellen commented 4 years ago

A guaranteed-complete sparkey/distcache would also serve my purposes; I'm mostly trying to avoid a proliferation of jobs.

clairemcginty commented 4 years ago

Cool. I think we can pretty easily add built-in support for supplying an already-materialized DistCache to the TransformFn, in time for the next release. But it looks like the other route (materializing an SCollection[T] => DistCache[T], to be completed before the SMB transform runs) is gonna be more complex -- using the Beam Wait transform won't work, since any implementation of BoundedSource must be applied to PBegin. The Beam API designers did a good job of ensuring root transforms stay root transforms 😅 .

clairemcginty commented 4 years ago

@syodage, would you be interested in exploring this? I think the current consensus here is to just add an example of a SortedBucketTransform that utilizes a DistCache within the transformFn to https://github.com/spotify/scio/blob/master/scio-examples/src/main/scala/com/spotify/scio/examples/extra/SortMergeBucketExample.scala. @nevillelyh and I think this can be done without adding anything to the Scio SMB APIs -- we just want an example to point users to -- but we haven't actually tried it out yet.
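Putting the consensus together, such an example might look something like the sketch below. It mirrors kellen's pseudocode from the top of the thread, so the via arity and the `...`/`???` placeholders are illustrative rather than the real scio-smb signatures:

```scala
// Sketch only: sortMergeTransform arguments and the via arity follow the
// pseudocode earlier in this thread, not necessarily the released API.
val loadLookup: java.io.File => Map[String, String] = ??? // e.g. parse a TSV file
val dc: DistCache[Map[String, String]] =
  sc.distCache("gs://my-bucket/lookup.txt")(loadLookup) // assumed pre-existing file

sc
  .sortMergeTransform[String, MyEntity]( ... )
  .to( ... )
  .via { case (key, myEntity, outputCollector) =>
    // dc() materializes the cached file on the worker -- no shuffle involved
    val fromDc = dc().get(myEntity.someField)
    // ...
  }
```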

kellen commented 3 years ago

Closed by #3915