spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

Enhance saveAsCustomOutput and provide access to POutput #4995

Open mkuthan opened 12 months ago

mkuthan commented 12 months ago

The saveAsCustomOutput function is an excellent extension point for implementing my own IO. It also gives me an abstraction for testing the whole pipeline using JobTest. I would like to thank all the Scio authors for that functionality.

But for IOs like BigQuery, POutput is not a terminal step; it delivers information about errors. The API for accessing those errors is very different for each BigQuery write method.

You can also access successfully written rows for Streaming Inserts, the Storage Write API, and Batch Loads with another three methods.
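
For reference, a short sketch of how those accessors differ on Beam's WriteResult (the method names below exist on the Beam BigQuery connector, but which ones are usable depends on the configured write method):

import com.google.api.services.bigquery.model.TableRow
import org.apache.beam.sdk.io.gcp.bigquery.{BigQueryStorageApiInsertError, WriteResult}
import org.apache.beam.sdk.values.PCollection

def inspectWriteResult(result: WriteResult): Unit = {
  // Streaming Inserts: failed rows as plain TableRows
  val streamingFailures: PCollection[TableRow] = result.getFailedInserts
  // Streaming Inserts with extended error info: row plus insert error
  val failuresWithErr = result.getFailedInsertsWithErr
  // Storage Write API: failures carry the row plus an error message
  val storageApiFailures: PCollection[BigQueryStorageApiInsertError] =
    result.getFailedStorageApiInserts
}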

I know that I could use the internal.apply(name, io) method and extract errors like this:

val io = BigQueryIO.writeTableRows().to(tableId)

val errors = scollection.transform(name) { in =>
  // apply the raw Beam transform to get access to the WriteResult
  val results = in.internal.apply("Write to BQ", io)

  // wrap the failed inserts back into an SCollection of (row, error message)
  scollection.context.wrap(results.getFailedStorageApiInserts)
    .map(failedRow => (failedRow.getRow, failedRow.getErrorMessage))
}
errors

But then I can't use output(CustomIO[Out](id)) { results => ... } from JobTest anymore. I could hack around this limitation with TransformOverride, but it is not easy to write assertions then:

transformOverride(TransformOverride.ofIter[In, Out](
  id,
  (r: In) =>
    // how to make an assertion here?
    Option.empty[Out].toList
))
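
One possible (admittedly hacky) way to make an assertion is to capture the elements in a shared buffer and check it after the run. A sketch, assuming the JobTest executes on the direct runner in a single JVM (Captured is a hypothetical helper):

import java.util.concurrent.ConcurrentLinkedQueue

object Captured { // hypothetical holder, visible to the test after run()
  val rows = new ConcurrentLinkedQueue[In]()
}

transformOverride(TransformOverride.ofIter[In, Out](
  id,
  (r: In) => {
    Captured.rows.add(r) // record the element for a post-run assertion
    Option.empty[Out].toList
  }
))
// after the JobTest run(): assert on the contents of Captured.rows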

I would love to see the following enhancement to saveAsCustomOutput. This is only a short showcase of the overall idea, and I'm glad to hear about a better API or implementation from you:

def saveAsCustomOutput[O <: POutput](
    name: String,
    transform: PTransform[PCollection[T], O]
)(outputFn: O => Unit): ClosedTap[Nothing] = {
  if (self.context.isTest) {
    // under JobTest, route the data to the registered CustomIO stub
    TestDataManager.getOutput(self.context.testId.get)(CustomIO[T](name))(self)
  } else {
    val pOutput = self.internal.apply(name, transform)
    // hand the raw POutput back to the caller, e.g. to extract failed rows
    outputFn(pOutput)
  }

  ClosedTap[Nothing](EmptyTap)
}
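
If I read the testing side correctly, the JobTest assertion would then stay exactly as it is today; a sketch where MyJob, the job argument, and expectedRows are placeholders:

JobTest[MyJob.type]
  .args("--tableId=project:dataset.table") // hypothetical job argument
  .output(CustomIO[TableRow](id)) { results =>
    results should containInAnyOrder(expectedRows) // expectedRows: Seq[TableRow]
  }
  .run()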

With such an extension I'm able to do anything I need with the WriteResult, for example:

val io = BigQueryIO.writeTableRows().to(tableId)
// a var is not ideal, but I didn't find a better way to get the output out of the closure
var errors = scol.context.empty[(TableRow, String)]

scol.saveAsCustomOutput(id, io) { writeResult =>
  errors = scol.context.wrap(writeResult.getFailedStorageApiInserts)
    .map(failedRow => (failedRow.getRow, failedRow.getErrorMessage))
}

errors

What do you think about such an extension? It looks very generic and should handle all kinds of use cases where POutput delivers something valuable.

mkuthan commented 11 months ago

Here is one more experiment with an even more generic API that allows encapsulating all steps of a custom IO as a single, Scala-friendly transform.

With a custom output like this, I could provide a composite transform that converts domain objects into the underlying storage format (for example, JSON bytes) and then saves the bytes to the database using the Beam IO connector. In the job test I will be able to use the domain object T, because the whole transform will be replaced by the stub. As a bonus, transformFn has full access to the POutput to handle errors.

val self: SCollection[T] = ...

def betterSaveAsCustomOutput[O <: POutput](name: String)(transformFn: SCollection[T] => O): ClosedTap[Nothing] = {
  if (self.context.isTest) {
    // under JobTest, route the data to the registered CustomIO stub
    TestDataManager.getOutput(self.context.testId.get)(CustomIO[T](name))(self)
  } else {
    // wrap the Scala function into a composite Beam PTransform
    self.applyInternal(
      name,
      new PTransform[PCollection[T], O]() {
        override def expand(input: PCollection[T]): O =
          transformFn(self.context.wrap(input))
      }
    )
  }

  ClosedTap[Nothing](EmptyTap)
}
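
A hypothetical call site, where events, eventToTableRow, and tableId are placeholders, showing how the BigQuery write and its error handling would live inside one transform:

events.betterSaveAsCustomOutput("WriteEvents") { in =>
  // serialize domain objects and hand them to the raw Beam connector;
  // the returned WriteResult is the POutput produced by the transform
  in.map(eventToTableRow)
    .internal
    .apply(BigQueryIO.writeTableRows().to(tableId))
}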

For the reading part I could do the same: provide a composite transform that reads bytes from the database using the Beam IO connector, deserializes the JSON, and returns domain objects. Everything is a single transform, easy to test at the job level. In the test I only need to prepare domain objects as input. The bytes representation from the Beam IO connector is fully encapsulated in the composite transform.

val self: ScioContext = ...

def betterCustomInput[T, I >: PBegin <: PInput](name: String)(transformFn: I => SCollection[T]): SCollection[T] =
  self.requireNotClosed {
    if (self.isTest) {
      // under JobTest, read the stubbed input registered for this CustomIO
      TestDataManager.getInput(self.testId.get)(CustomIO[T](name)).toSCollection(self)
    } else {
      // wrap the Scala function into a composite Beam PTransform
      self.applyTransform(
        name,
        new PTransform[I, PCollection[T]]() {
          override def expand(input: I): PCollection[T] =
            transformFn(input).internal
        }
      )
    }
  }
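
And a hypothetical call site for the reading part, where sc, subscription, decodeEvent, and Event are placeholders; the Pubsub bytes stay fully hidden inside the transform:

val events: SCollection[Event] = sc.betterCustomInput("ReadEvents") { (begin: PBegin) =>
  // read raw messages with the Beam connector, then decode to domain objects
  val messages = begin.apply(PubsubIO.readMessages().fromSubscription(subscription))
  sc.wrap(messages).map(msg => decodeEvent(msg.getPayload))
}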

Alternatively, I could put my composite transform into a plain Beam PTransform and use the existing customInput or saveAsCustomOutput. But I would prefer to use the Scio API in my code :)

RustedBones commented 10 months ago

Thanks for the feature request @mkuthan. We will look at this during our preparation of 0.14. We should probably also change the BQ IO to expose handleErrors with the raw WriteResult.

mkuthan commented 10 months ago

Wonderful, thanks!

You could also look at the (ugly) workaround I applied in my playground project:

BigQuery IO aligned to my vision of the infrastructure layer:

How it plays with JobTest: input/output is defined with domain types instead of bytes from Pubsub or TableRows from BigQuery. TestDataManager fully overrides the IO connectors from the infrastructure layer.

RustedBones commented 7 months ago

@mkuthan just a status update on this issue.

We've opted for another strategy that avoids passing a pipeline transform function to the IO. We preferred to expose the possible SCollection write results as side outputs. This way, we can keep a 'flat' pipeline definition.

In 0.14, the testing framework will mock those as empty, but we plan to let users set custom values in the future.
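
For illustration, the side-output shape could look roughly like this (the method name and return type are only a sketch of the idea, not the actual 0.14 API):

// hypothetical method: the write returns the failed rows as a regular
// SCollection, so error handling stays a flat step in the pipeline
val (tap, errors) = rows.saveAsBigQueryWithFailedRows(tableId)

errors
  .map { case (row, message) => s"$message: $row" }
  .saveAsTextFile(errorPath) // route failures to a dead-letter sink; errorPath is a placeholder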

I hope this setup fits with your needs. Let us know otherwise.