spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0

BinaryIO reads break when file size > 64 MB #5472

Closed: clairemcginty closed this issue 5 days ago

clairemcginty commented 1 week ago

Dataflow tries to break the file into offset-range splits of desiredBundleSizeBytes, which we've set to 64 MB, even though binary files can't be read starting from an arbitrary offset and should not be split.

Sample repro:

import java.io.InputStream
import java.util.UUID

import com.spotify.scio._
import com.spotify.scio.io.BinaryIO.BinaryFileReader
import org.apache.beam.sdk.coders.AvroCoder // on Beam >= 2.46: org.apache.beam.sdk.extensions.avro.coders.AvroCoder
import org.apache.beam.sdk.util.CoderUtils

object TestBinaryWrite {
  // Account is a compiled Avro SpecificRecord class (schema omitted here)
  val coder = AvroCoder.of(classOf[Account]) // used to map avro -> bytes

  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    args("method") match {
      // produces a single file of about 280 MB
      case "write" =>
        sc.parallelize(1 to 10000)
          .flatMap(i => (1 to 250).map(_ * i))
          .map { i =>
            Account
              .newBuilder()
              .setId(i)
              .setAmount(i.toDouble)
              .setName(UUID.randomUUID().toString)
              .setType("checking")
              .build()
          }
          .map(CoderUtils.encodeToByteArray(coder, _))
          .saveAsBinaryFile(args("output"), numShards = 1)
      // read the file we just wrote
      case "read" =>
        sc.binaryFile(args("input"), reader = MyBinaryReader)
    }

    sc.run()
  }

  // A completely meaningless reader implementation; its logic doesn't matter for demoing this bug
  case object MyBinaryReader extends BinaryFileReader {
    override type State = Int
    override def start(is: InputStream): Int = 1
    override def readRecord(state: Int, is: InputStream): (Int, Array[Byte]) = {
      val buf = new Array[Byte](1000)
      is.read(buf)
      (state, buf)
    }
  }
}
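
(For context: the job is run twice, e.g. with hypothetical placeholder paths, once with --method=write --output=gs://<bucket>/binary-out to produce the ~280 MB file, then with --method=read --input=gs://<bucket>/binary-out/* to read it back.)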

On read, the job fails on Dataflow with this error:

Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
    org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
    org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:304)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:276)
    org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:206)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:150)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:130)
    org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:117)
    java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
    java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
    org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
    org.apache.beam.sdk.io.ReadAllViaFileBasedSource$ReadFileRangesFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:432)
    org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:89)
    org.apache.beam.sdk.transforms.MapElements$2.processElement(MapElements.java:151)
    org.apache.beam.sdk.transforms.MapElements$2$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:276)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:86)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:432)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:412)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
    org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
    org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
    org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:54)
    org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
    ... 22 more
Caused by: java.lang.IllegalArgumentException: CompressedSources must start reading at offset 0. Requested offset: 201326592
    org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument(Preconditions.java:204)
    org.apache.beam.sdk.io.CompressedSource.<init>(CompressedSource.java:232)
    org.apache.beam.sdk.io.CompressedSource.createForSubrangeOfFile(CompressedSource.java:259)
    org.apache.beam.sdk.io.ReadAllViaFileBasedSourceTransform$AbstractReadFileRangesFn.process(ReadAllViaFileBasedSourceTransform.java:131)

My guess is that the splittability of the BinaryIO reader implementation is misconfigured somewhere, or that there's some method to direct ReadAllViaFileBasedSource not to split the source into offset ranges. Notably, the requested offset in the error, 201326592, is exactly 3 × 64 MiB, i.e. the start of the fourth 64 MB bundle of the ~280 MB file. We should compare against a sample non-splittable Beam source (I think TFRecordIO is such an example?)
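
To make the failure mode concrete, here is a small self-contained sketch (my own illustration, not scio or Beam code) of how the range-splitting step behind ReadAllViaFileBasedSource carves each matched file into offset ranges of at most desiredBundleSizeBytes. With a ~280 MB file and a 64 MiB bundle size, the fourth range starts at the exact offset from the stack trace:

object SplitMath {
  // Mimics the offset-range computation: [0, b), [b, 2b), ... up to the file size.
  def ranges(fileSizeBytes: Long, desiredBundleSizeBytes: Long): Seq[(Long, Long)] =
    (0L until fileSizeBytes by desiredBundleSizeBytes)
      .map(start => (start, math.min(start + desiredBundleSizeBytes, fileSizeBytes)))

  def main(args: Array[String]): Unit =
    // Prints (0,67108864), (67108864,134217728), (134217728,201326592),
    // (201326592,268435456), (268435456,280000000); note 201326592.
    ranges(280L * 1000 * 1000, 64L * 1024 * 1024).foreach(println)
}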

clairemcginty commented 1 week ago

Upon following my own advice, I tried copying TFRecordIO's technique of setting desiredBundleSizeBytes to Long.MAX_VALUE to avoid splitting, and it solved the problem 👍
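
For anyone reading along, the workaround amounts to something like the sketch below when constructing the read transform. `createBinarySource` is a hypothetical stand-in for however scio's BinaryIO builds its per-file FileBasedSource; the ReadAllViaFileBasedSource constructor shape is Beam's:

import org.apache.beam.sdk.coders.ByteArrayCoder
import org.apache.beam.sdk.io.{FileBasedSource, ReadAllViaFileBasedSource}
import org.apache.beam.sdk.transforms.SerializableFunction

object BinaryReadWorkaround {
  // Passing Long.MaxValue as desiredBundleSizeBytes means each file becomes a
  // single [0, fileSize) range, so no reader is ever asked to start mid-file.
  def readAllUnsplit(
    createBinarySource: SerializableFunction[String, FileBasedSource[Array[Byte]]]
  ): ReadAllViaFileBasedSource[Array[Byte]] =
    new ReadAllViaFileBasedSource[Array[Byte]](
      Long.MaxValue, // previously 64 MiB, which produced the offset splits above
      createBinarySource,
      ByteArrayCoder.of()
    )
}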