spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Support for withTemplateCompatibility #1426

Open gianmoretti opened 5 years ago

gianmoretti commented 5 years ago

When a BigQuery input method is used (e.g. toTable), it would be nice to have the option to set template compatibility on the typed read, so that the template feature can be used and a template launched multiple times on GCP.


We built a pipeline as shown below:


 def main(cmdlineArgs: Array[String]): Unit = {
   val (sc, args) = ContextAndArgs(cmdlineArgs)
   sc.typedBigQuery[SearchTerm]("some sql query...")
     .someComputation
     .saveToBigQuery("an output table")

   sc.close()
 }

We deployed the pipeline as a template to GCP by the following command:

./target/pack/bin/my-pipeline \
    --inputTable=an-input-table \
    --outputTable=an-output-table \
    --project=my-project \
    --maxNumWorkers=1000 \
    --runner=DataflowRunner \
    --stagingLocation=gs://my-project/some-staging-location \
    --templateLocation=gs://my-project/some-template-location/my-template

The first time we launched the template (via the Google Cloud Console or the Dataflow REST API), everything went fine. The second launch failed with this exception:

Request failed with code 409, performed 0 retries due to IOExceptions, performed 0 retries due to unsuccessful status codes, HTTP framework says request can be retried, (caller responsible for retrying): https://www.googleapis.com/bigquery/v2/projects/my-project/jobs

Searching for similar problems on the Internet, we found that the problem seems to be related to the option ".withTemplateCompatibility()" (see Beam's documentation: https://beam.apache.org/documentation/sdks/javadoc/2.3.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.html under "Usage with templates"). The API doc says that when you use a template with "read()" or "readTableRows()", template compatibility must be enabled.

In the Scio code the template compatibility is always set to false.
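Our reading of the error (an assumption on our part, not something the stack trace states explicitly) is that, without template compatibility, the BigQuery read is prepared once when the template is constructed, so every launch re-submits a BigQuery job under the same identity, and BigQuery rejects the duplicate with 409 Conflict. The following self-contained sketch models that idempotent-insert behavior; JobRegistry, HttpError, and launch are illustrative names, not Beam or BigQuery APIs:

```scala
// Hypothetical sketch (no Beam dependency): why re-submitting a job with the
// same ID yields HTTP 409. All names here are illustrative.
import scala.collection.mutable

final case class HttpError(code: Int)
    extends Exception(s"Request failed with code $code")

class JobRegistry {
  private val jobs = mutable.Set.empty[String]
  // BigQuery job inserts are idempotent per job ID: a second insert with the
  // same ID is rejected with 409 Conflict.
  def insert(jobId: String): Unit =
    if (!jobs.add(jobId)) throw HttpError(409)
}

// A template that bakes a fixed job ID in at construction time fails on
// relaunch; one that generates a fresh ID per launch succeeds every time.
def launch(registry: JobRegistry, jobId: String): Either[Int, String] =
  try { registry.insert(jobId); Right(jobId) }
  catch { case HttpError(code) => Left(code) }
```

With ".withTemplateCompatibility()" the read is re-evaluated at each launch, which corresponds to generating a fresh job identity every time instead of replaying the one baked into the template.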

After applying the following patch to the Scio library (v.0.6.1), the problem was resolved:

File "scio/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala", starting at line 694:

def typedBigQuery[T <: HasAnnotation : ClassTag : TypeTag](newSource: String = null)
  : SCollection[T] = {
    val bqt = BigQueryType[T]
    // Patch: forcefully apply withTemplateCompatibility
    val typedRead = avroBigQueryRead[T].withTemplateCompatibility()
    if (newSource == null) {
    [...]

Is it possible to modify the Scio library in order to support this feature?

brodin commented 5 years ago

Is this being worked on? :) It would be super valuable to have – or is there some workaround to get something similar to templates?

nevillelyh commented 5 years ago

Not at the moment. We don't use templates, and this API (especially how it works within Cloud Dataflow) might change, so it's not a high priority. Feel free to look into it, though.

brodin commented 5 years ago

@nevillelyh rather than using templates – what would be a good way to run a job multiple times with different configs?
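For what it's worth, one non-template alternative is to re-submit the packaged pipeline itself, passing a different set of --key=value arguments on each run; since nothing is frozen at template-construction time, every launch plans its BigQuery reads fresh. A minimal sketch of that per-run configuration style (the parser below is illustrative only, not Scio's actual ContextAndArgs implementation):

```scala
// Illustrative only: a toy --key=value parser in the style that Scio's
// ContextAndArgs accepts. Each pipeline submission parses its own
// arguments, so no configuration is baked in ahead of time.
def parseArgs(cmdlineArgs: Array[String]): Map[String, String] =
  cmdlineArgs.iterator
    .filter(a => a.startsWith("--") && a.contains('='))
    .map { a =>
      val parts = a.stripPrefix("--").split("=", 2)
      parts(0) -> parts(1)
    }
    .toMap

// One "launch" = one fresh submission with its own config.
def runOnce(cmdlineArgs: Array[String]): (String, String) = {
  val args = parseArgs(cmdlineArgs)
  (args("inputTable"), args("outputTable"))
}
```

Running the same binary several times with different flags (e.g. ./target/pack/bin/my-pipeline --inputTable=a --outputTable=b --runner=DataflowRunner) then submits an independent Dataflow job per invocation.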