flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API that supports Scala types for serialisation as well as the latest Scala versions
Apache License 2.0

How to package with sbt assembly? #122

Closed · qcfu-bu closed 5 months ago

qcfu-bu commented 5 months ago

Following the g8 template, I can run the word count example successfully using sbt run. However, when I package the code into a fat JAR with sbt assembly and submit it to a local Flink cluster, I run into the following error:

java.lang.NoSuchMethodError: 'scala.collection.immutable.ArraySeq scala.runtime.ScalaRunTime$.wrapRefArray(java.lang.Object[])'
        at WordCount$package$.main(WordCount.scala:11)
        at main.main(WordCount.scala:4)
        at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
        at java.base/java.lang.reflect.Method.invoke(Method.java:580)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:105)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:851)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1095)
        at org.apache.flink.client.cli.CliFrontend.lambda$mainInternal$9(CliFrontend.java:1189)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.mainInternal(CliFrontend.java:1189)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1157)

The following are the contents of my build.sbt file.

val scala3Version = "3.3.3"

lazy val root = project
  .in(file("."))
  .settings(
    name := "flink-test",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := scala3Version,
    libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.5",
    libraryDependencies += "org.apache.flink" % "flink-clients" % "1.18.1" % "provided"
  )

Compile / run := Defaults
  .runTask(
    Compile / fullClasspath,
    Compile / run / mainClass,
    Compile / run / runner
  )
  .evaluated

// stays inside the sbt console when we press "ctrl-c" while a Flink programme executes with "run" or "runMain"
Compile / run / fork := true
Global / cancelable := true

assembly / assemblyMergeStrategy := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}
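
(The NoSuchMethodError above is the classic symptom of a Scala binary mismatch: Scala 3 bytecode calls the Scala 2.13+ variant of ScalaRunTime.wrapRefArray, which returns scala.collection.immutable.ArraySeq, while the Scala 2.12 scala-library bundled with the Flink 1.x distribution still returns mutable.WrappedArray. A minimal sketch of the call site the compiler emits for varargs, which compiles against 2.13/3 but fails to link against 2.12:)

// Compiled against Scala 2.13 or Scala 3, this resolves to
// wrapRefArray(Object[]): immutable.ArraySeq -- exactly the method
// the stack trace above reports as missing at runtime.
val wrapped: scala.collection.immutable.ArraySeq[String] =
  scala.runtime.ScalaRunTime.wrapRefArray(Array("to", "be", "or", "not"))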
novakov-alexey commented 5 months ago

@qcfu-bu thanks for the question.

This is what I usually do to run a Scala-based Flink job:

// I do not usually package the Scala standard libraries with sbt-assembly, so I set this to false
assemblyPackageScala / assembleArtifact := false
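
For context, a minimal sketch of how this setting could slot into the build.sbt from the question (all other values are copied from above; only the last setting is new):

lazy val root = project
  .in(file("."))
  .settings(
    name := "flink-test",
    version := "0.1.0-SNAPSHOT",
    scalaVersion := "3.3.3",
    libraryDependencies += "org.flinkextended" %% "flink-scala-api" % "1.18.1_1.1.5",
    libraryDependencies += "org.apache.flink" % "flink-clients" % "1.18.1" % "provided",
    // keep scala-library / scala3-library out of the fat JAR;
    // they are provided on the cluster classpath instead
    assemblyPackageScala / assembleArtifact := false
  )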

Then, when you run your Flink job in any mode other than the local one (i.e. a session cluster, application cluster, or job cluster):

  1. When running with Scala 3, add both Scala standard libraries (2.13 and Scala 3) to the dependency classpath:
    • scala3-library_3-3.x.y.jar
    • scala-library-2.13.x.jar
    When running with Scala 2.13, the Scala 2.13 library alone is enough.
  2. Change this Flink property by removing the scala. entry from it, so that it reads:

classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback

This is needed so that Scala is loaded from the child classloader, which avoids picking up the old Scala 2.12 that still ships with the Apache Flink 1.x distribution. Scala 2.12 won't be packaged with Flink 2.x, so from then on this property no longer needs to be modified.
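
To check which scala-library actually wins at runtime, one option is to print the origin of a Scala runtime class from inside the job (a hypothetical debugging snippet, not a flink-scala-api feature):

// Prints the JAR from which ScalaRunTime was loaded; if this points at
// Flink's bundled Scala 2.12 JAR, the classloader change above has not
// taken effect.
println(scala.runtime.ScalaRunTime.getClass.getProtectionDomain.getCodeSource.getLocation)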

qcfu-bu commented 5 months ago

Thanks for the reply! But I'm not quite sure what you mean by adding the Scala standard library to the dependency classpath. I downloaded the JARs of the Scala libraries and put them in the flink/lib directory as follows:

[screenshot: Scala library JARs placed in the flink/lib directory]

I've added the settings you've written to my build.sbt and flink-conf.yaml respectively.

But after submitting the rebuilt JAR to Flink, I still get the same error as before.

novakov-alexey commented 5 months ago

Can you try two tests to see which works better?

  1. Remove flink-scala_2.12-1.18.1.jar from flink/lib and start your job again.
  2. Do not put the Scala JARs into flink/lib, but add them to the user classpath instead. If you submit your job to a Flink standalone cluster, both Scala JAR files can be passed via the flink CLI (see the sketch after this list) as
    flink run --classpath <url>
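
For illustration, a hypothetical invocation (the paths, version numbers, and JAR name are placeholders, not taken from this thread):

flink run \
  --classpath file:///opt/scala-jars/scala3-library_3-3.3.3.jar \
  --classpath file:///opt/scala-jars/scala-library-2.13.12.jar \
  target/scala-3.3.3/flink-test-assembly-0.1.0-SNAPSHOT.jar

Note that --classpath takes a URL that must be accessible from all nodes of the cluster, not only from the machine running the CLI.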
qcfu-bu commented 5 months ago

I've tried both methods.

  1. After removing the flink-scala JAR, Flink complains that it can't load the Scala API, so I put the flink-scala JAR back into the flink/lib directory.
  2. After moving the two Scala JARs out of flink/lib and passing them to the flink CLI manually, I get a different error message. [screenshot of the new error]
novakov-alexey commented 5 months ago

This example works fine for me: https://github.com/novakov-alexey/flink-sandbox/blob/main/modules/core/src/main/scala/org/example/connectedStreams.scala

flink --version
Version: 1.18.0, Commit ID: a5548cc

> flink run --class org.example.ConnectedStreams ../flink-sandbox/modules/core/target/scala-3.3.1/core-assembly-0.1.0-SNAPSHOT.jar
Job has been submitted with JobID 35ac392accbb978f07034bd6753ffffe
[screenshot]

Both Scala libs are in the flink/lib folder.
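
For reference, flink/lib then looks roughly like this (the version numbers below are placeholders):

flink/lib/
  ...stock Flink 1.18 JARs, including flink-scala_2.12-1.18.0.jar...
  scala-library-2.13.x.jar
  scala3-library_3-3.3.x.jar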

Can you also try it?

novakov-alexey commented 5 months ago
[image attachment]
qcfu-bu commented 5 months ago

I've tried building JARs from your sandbox repo and submitting them, and it still does not work. What's frustrating is that Flink clearly sees the two Scala libraries on the classpath.

[screenshot: the error output, with both Scala libraries visible on the classpath]

I'm giving up on getting this to work. Thanks for the help.

novakov-alexey commented 5 months ago

Perhaps try with Docker containers to get some reproducibility.
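
For example, a minimal session cluster with the official Flink images (a hypothetical sketch; the image tag and container names are assumptions, not from this thread):

docker network create flink
docker run -d --name jobmanager --network flink -p 8081:8081 \
  -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.18.1-scala_2.12 jobmanager
docker run -d --name taskmanager --network flink \
  -e FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \
  flink:1.18.1-scala_2.12 taskmanager
# submit from the host once the UI is reachable on localhost:8081:
# flink run -m localhost:8081 path/to/your-assembly.jar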