spotify / scio

A Scala API for Apache Beam and Google Cloud Dataflow.
https://spotify.github.io/scio
Apache License 2.0
2.55k stars 513 forks source link

SCollectionMatcher issue when targeting Java 17+ #5483

Open f-loris opened 5 days ago

f-loris commented 5 days ago

I recently upgraded one project from Java 8 to Java 21 and I'm experiencing issues in my tests when targeting Java 17 or 21 bytecode. In this case some tests started to fail using the matchers satisfy/satisfySingleValue in my JobTests.

Therefore, I built some simple reproducer that shows that something is dropping the local variable in my lambda expression as it's value is null when executed.

Scio: 0.14.7 Java: 21 Scala: 2.13.14 Scala Compiler flags: -release 21 -Ydelambdafy:inline -Ymacro-annotations -language:higherKinds -language:implicitConversions

import com.spotify.scio.ContextAndArgs
import com.spotify.scio.io.TextIO
import com.spotify.scio.testing._

object TestPipeline {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    sc.read(TextIO(args("source")))(TextIO.ReadParam())
      .map(v => s"Hello $v")
      .write(TextIO(args("destination")))(TextIO.WriteParam())

    sc.run().waitUntilFinish()
  }
}

object IssueReproducer {
  val WORLD = "World"
}

class IssueReproducer extends PipelineSpec {

  // fails now
  it should "work with local value" in {
    val world = "World"
    JobTest[TestPipeline.type]
      .args("--source=source.csv", "--destination=destination.csv")
      .input(TextIO("source.csv"), Seq(world))
      .output(TextIO("destination.csv"))(
        _ should satisfySingleValue[String] { value =>
          println(s"Hello $world") // shows "Hello null"
          value == s"Hello $world"
        }
      )
      .run()
  }

  // works
  it should "work with global value" in {
    JobTest[TestPipeline.type]
      .args("--source=source.csv", "--destination=destination.csv")
      .input(TextIO("source.csv"), Seq(IssueReproducer.WORLD))
      .output(TextIO("destination.csv"))(
        _ should satisfySingleValue[String] {
          _ == s"Hello ${IssueReproducer.WORLD}"
        }
      )
      .run()
  }

}

The output of the failing test is Hello null as the variable world is then nullwith the test result being of course a failing assertion:

pply@{Matchers.scala:7291}:2/ParMultiDo(Anonymous).output: assertion failed
java.lang.AssertionError: apply@{Matchers.scala:7291}:2/ParMultiDo(Anonymous).output: assertion failed
    at org.apache.beam.sdk.testing.PAssert$PAssertionSite.capture(PAssert.java:175)
    at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:498)
    at org.apache.beam.sdk.testing.PAssert.thatSingleton(PAssert.java:490)
    at com.spotify.scio.testing.SCollectionMatchers$$anon$19$$anon$20.apply(SCollectionMatchers.scala:488)
    at com.spotify.scio.testing.SCollectionMatchers$$anon$19$$anon$20.apply(SCollectionMatchers.scala:482)
    ....

I already experimented with ClosureCleaner and Externalizer but I couldn't reproduce it with them alone. When changing the Scala compiler release flag to 8 or even 11 it works fine, but fails with 17 or 21. Isn't it supposed to work when targeting newer Java versions? Is there a general risk that targeting Java 21 might break things like this also in productive pipelines and not just only tests? At least right now I haven't discovered any other issue.

clairemcginty commented 5 days ago

thanks for reporting! I can reproduce running sbt scio-test-core/test with release version set to 17, will look into what's going on.

Also, maybe we should be setting release version as well in our GHA java matrix workflows

clairemcginty commented 5 days ago

looks to be closure-related; I modified scio's satisfy() test to catch the assertion exception, like so:

runWithContext {
  _.parallelize(Seq(newTR(1))) should satisfy[TestRecord] { l =>
    try {
      val newTr = newTR(1) // exception is actually thrown here, not on assertion
      // l.toList.contains(newTr)
    } catch {
      case e: Exception =>
        e.printStackTrace()
        throw e
    }
  }
}

and get:

java.lang.NullPointerException: Cannot invoke "com.spotify.scio.testing.SCollectionMatchersTest$$anonfun$9.com$spotify$scio$testing$SCollectionMatchersTest$$anonfun$$$outer()" because the return value of "com.spotify.scio.testing.SCollectionMatchersTest$$anonfun$9$$anonfun$apply$91.com$spotify$scio$testing$SCollectionMatchersTest$$anonfun$$anonfun$$$outer()" is null
    at com.spotify.scio.testing.SCollectionMatchersTest$$anonfun$9$$anonfun$apply$91$$anonfun$apply$92.apply(SCollectionMatchersTest.scala:334)
    at com.spotify.scio.testing.SCollectionMatchersTest$$anonfun$9$$anonfun$apply$91$$anonfun$apply$92.apply(SCollectionMatchersTest.scala:332)
    at com.spotify.scio.testing.SCollectionMatchers$$anon$17$$anon$18$$anonfun$8.apply(SCollectionMatchers.scala:461)
    at com.spotify.scio.testing.SCollectionMatchers$$anon$17$$anon$18$$anonfun$8.apply(SCollectionMatchers.scala:461)
    at com.spotify.scio.testing.ScioMatchers$$anonfun$assert$1.apply(SCollectionMatchers.scala:144)
    at com.spotify.scio.testing.ScioMatchers$$anonfun$assert$1.apply(SCollectionMatchers.scala:144)
    at com.spotify.scio.testing.ScioMatchers$$anon$3.apply(SCollectionMatchers.scala:114)
    at com.spotify.scio.testing.ScioMatchers$$anon$3.apply(SCollectionMatchers.scala:108)
    at org.apache.beam.sdk.testing.PAssert.doChecks(PAssert.java:1621)
    at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn.processElement(PAssert.java:1588)
    at org.apache.beam.sdk.testing.PAssert$GroupedValuesCheckerDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
    at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
    at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
    at org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner.processElementInReadyWindows(SimplePushbackSideInputDoFnRunner.java:79)
    at org.apache.beam.runners.direct.ParDoEvaluator.processElement(ParDoEvaluator.java:244)
    at org.apache.beam.runners.direct.DoFnLifecycleManagerRemovingTransformEvaluator.processElement(DoFnLifecycleManagerRemovingTransformEvaluator.java:54)
    at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
    at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

Unfortunately, following the stack trace to SCollectionMatchers.scala:461, this speaks to an issue with ClosureCleaner, which isn't maintained