flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API which supports Scala types for serialisation, as well as the latest Scala versions
Apache License 2.0

`scala.MatchError: None (of class scala.None$)` during `CaseClassSerializer` #148

Closed: chollinger93 closed this issue 3 weeks ago

chollinger93 commented 2 months ago

Direct follow-up to #106, but I figured a new issue would be helpful. Using "1.18.1_1.1.5" w/ Scala 3.4.0 on AWS' managed Flink (i.e., little control over their setup) w/ Flink 1.18.

I am seeing `scala.MatchError: None (of class scala.None$)` issues just like in #106, and I'm also having a hard time replicating it locally at all, so I can't test against this nor run a debugger through it. I think @novakov-alexey's classpath suspicion is spot on.

My flow is essentially []byte => A (via Kafka) => B (via ProcessFunction), both A and B being case classes w/ Option values.

The Kafka deserializer works flawlessly.
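
For context, the mapping step looks roughly like the sketch below. All names here (A, B, AToB, sideTag) are made up for illustration, the Kafka source is elided, and the implicit TypeInformation derivation is the one flink-scala-api provides via its serializers import:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.{Collector, OutputTag}
import org.apache.flinkx.api.serializers.* // implicit TypeInformation derivation for case classes

case class A(id: String, note: Option[String])
case class B(id: String, normalized: Option[String])

// Side-output tag with an explicit, derived TypeInformation
val sideTag = new OutputTag[B]("side-b", summon[TypeInformation[B]])

class AToB extends ProcessFunction[A, B]:
  override def processElement(a: A, ctx: ProcessFunction[A, B]#Context, out: Collector[B]): Unit =
    val b = B(a.id, a.note.map(_.trim))
    ctx.output(sideTag, b) // copying/serializing b here is where the error below surfaces
    out.collect(b)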

It seems to fail on the custom mapping step, which is a ProcessFunction[A, B]. The stacktrace points me to a ctx.output(tag, p) w/in the ProcessFunction, where the following happens:

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:92)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:61)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:95)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:93)
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:103)
    at (... ctx.output)
...
Caused by: scala.MatchError: None (of class scala.None$)
    at org.apache.flinkx.api.serializer.OptionSerializer.copy(OptionSerializer.scala:50)
    at org.apache.flinkx.api.serializer.OptionSerializer.copy(OptionSerializer.scala:48)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.$anonfun$2(CaseClassSerializer.scala:76)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.$anonfun$adapted$2(CaseClassSerializer.scala:76)
    at scala.collection.immutable.Range.map(Range.scala:59)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.copy(CaseClassSerializer.scala:76)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.copy(CaseClassSerializer.scala:72)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
    ... 34 more

The line in question is https://github.com/flink-extended/flink-scala-api/blob/v1.18.1_1.1.5/src/main/scala/org/apache/flinkx/api/serializer/OptionSerializer.scala#L50
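
Paraphrased, the copy in question pattern-matches on the Option constructors, roughly in this shape (a sketch, not the library's exact source):

class OptionCopier[A](copyA: A => A):
  // A paraphrase of the failing shape, not the actual OptionSerializer code
  def copy(from: Option[A]): Option[A] = from match
    case Some(a) => Some(copyA(a))
    case None    => None

The `case None` branch compiles to an equality check against the scala.None singleton loaded by this class's own classloader, so a None instance coming from a different classloader fails both cases and produces exactly this MatchError.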

In the past, my A => B map has failed on something seemingly simple like

case class Foo(bar: Option[Bar])
case class Bar(id: String, baz: Option[String])
// imagine
val x = Foo(Some(Bar("ID", None)))

So I resorted to (essentially):

val x: Foo = ???
val y: Foo = x.bar match
  case Some(b) => x.copy(bar = Some(b))
  case _       => x.copy(bar = None)

(Note the match on _, rather than matching on None)

Which fixed the issue and provided the downstream processing with an actual None value.

Today, working with a new message type, I had to deal with an even more nested case class that was partially None. I'll spare you the full example, but doing basically this

val x: Foo = ???
val y: Foo = x.bar match
  case Some(b) => x.copy(bar = Some(b.couldBeNone match
    case Some(c) => .... // you get the idea

Did work.

Which would support the theory that, unless you manually re-write every None value, something is making it so that None != None. The only way I can "explain" that (using that term liberally) is "something something classpath".
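
For illustration, here is a self-contained way to make None != None happen: load the Scala library in a second, isolated classloader, and its None fails a pattern match compiled against your own None. (A demonstration of the theory, not AWS' actual setup.)

import java.net.URLClassLoader

@main def duplicatedNone(): Unit =
  val scalaLib = classOf[Option[?]].getProtectionDomain.getCodeSource.getLocation
  val isolated = new URLClassLoader(Array(scalaLib), null) // parent = null: no delegation to our loader
  val foreignNone = isolated.loadClass("scala.None$").getField("MODULE$").get(null)
  println(foreignNone.getClass.getName) // "scala.None$": same name, different Class object
  foreignNone match
    case None => println("matched our None")
    case _    => println("not our None; drop this case and you get a scala.MatchError")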

I haven't read through the OptionSerializer code in this repo in its entirety, but one could argue for pattern matching against _ there too.

If I get the time I'll do more digging, but for now my super-cursed pattern matching against _ works around this (I think?)

novakov-alexey commented 2 months ago

Hi @chollinger93, thank you for the detailed investigation and for the spotted workaround. Do I understand correctly that you can reproduce this issue consistently?

I think it really needs to be tried on a Flink app/job cluster environment. This probably can't be spotted in Flink local mode, that is, within a single JVM.

One question: when you run on AWS Flink, do you remove Flink's Scala JAR somehow? The official recipe is to remove the Scala JAR from the flink/lib folder. Not sure if that is feasible in AWS.

Another option to suppress Flink's old Scala is to use this property: classloader.parent-first-patterns.default. It works fine for me so far. Please check the idea here: https://ververica.zendesk.com/hc/en-us/articles/13583223554460-How-to-run-my-Flink-job-with-Scala-2-13-and-Scala-3-version-in-VVP

chollinger93 commented 2 months ago

Yes, this is reproducible, provided you get a message that has a None field that isn't covered by my "workaround".

I don't think you can remove jars from the AWS Flink runners - I think it's all managed K8s pods with no custom init actions.

The link you provided is a good idea. Unfortunately, I don't think the AWS runner gives me access to that property (or anything in the cluster config, outside of raising a support ticket for some of them) and setting classloader.parent-first-patterns.default programmatically isn't really an option.

novakov-alexey commented 2 months ago

Got it. I will try your code example on one of my session clusters (with and without Flink's Scala around).

It would be great if you could verify on AWS whether changing the classloader.parent-first-patterns.default property helps. Yes, AWS requires raising a ticket for that, unfortunately.

novakov-alexey commented 1 month ago

Hi @chollinger93 ,

I've tried to reproduce your scenario with an in-memory data generator, but I could not get the error in the Scala Option serialiser. Could you check if this project's code is similar to your scenario? https://github.com/novakov-alexey/flink-scala-option

I suppress Flink's embedded Scala by removing the scala package from this Flink configuration property:

classloader.parent-first-patterns.default
java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback

Alternatively, could you give me a complete example of a Flink job which I could build and deploy on any Flink-based platform?

chollinger93 commented 1 month ago

Hey, sorry for not responding earlier. I have it on my backlog to try Flink via K8s/locally, but haven't gotten around to it yet. Setting classloader.parent-first-patterns.default via the AWS support seems like too big of an operational burden, assuming we won't have a way to self-service this.

Your example is pretty spot on, assuming FakeProcessFunction runs on a different executor than the source.

My real code at work has the added complexity of using protobuf, but scalapb generates regular case classes, so I don't think that should have any impact.

I can see if I can put together a minimal example soon.

Sorry about the delays on my end - really appreciate you looking into this!

chollinger93 commented 1 month ago

I spent the morning trying to get a minimal example to work (and by "work", I mean "get it to break"), but I'm afraid that between us having multiple modules, Kafka, protobuf (and using protobuf to parse from raw []byte), various dependencies, and our build.sbt (which is being a bit too clever about caching), I'm not able to reproduce it in isolation and had to timebox it. I wrote about a bit of that complexity here, in case you're curious.

That being said, I did try your sbt adjustments and I believe the missing piece was ExclusionRule(organization = "org.scalameta").

While poking around the jar, I saw coursierapi/shaded/scala/MatchError.class and others being included. They come from org.scalameta. A diff between 2 assemblies with and without that ExclusionRule yields 2120 classes now excluded.

On a related note, I don't think we can actually use assemblyPackageScala / assembleArtifact, since AWS' Flink doesn't appear to include the Scala standard lib, so that would yield Caused by: java.util.concurrent.CompletionException: java.lang.NoClassDefFoundError: scala/MatchError. It is within the realm of possibility that our dependencies.scala is messing something up there, but it doesn't seem to be necessary anyway. I don't think I've ever used that rule, so maybe I'm misunderstanding it.

Here are the relevant sections:

def excludeJars(cp: Classpath) = cp filter { f =>
  // Returns the jars to exclude: any jar whose file name matches one of these patterns
  Set(
    "scala-asm-.+-scala-1\\.jar",
    "interface-.+\\.jar",
    "jline-terminal-.+\\.jar",
    "jline-reader-.+\\.jar",
    "jline-.+\\.jar",
    "scala-compiler-.+\\.jar",
    "scala3-compiler_.+\\.jar",
    "flink-shaded-zookeeper-.+\\.jar",
    "flink-shaded-jackson-.+\\.jar",
    "annotations-.+\\.jar"
  ).exists(p => f.data.getName.matches(p))
}

lazy val commonSettings = Seq(
  // ...
  assembly / assemblyExcludedJars := {
    val cp = (assembly / fullClasspath).value
    excludeJars(cp)
  }
)
// dependencies.scala
 "org.apache.flink" % "flink-core" % V.flink,
("org.flinkextended" % "flink-scala-api_3" % V.flinkScala)
  .excludeAll(
    ExclusionRule(organization = "org.apache.flink"),
    ExclusionRule(organization = "org.scalameta")
  )
  .exclude("com.thesamet.scalapb", "lenses_2.13")
  .exclude("com.thesamet.scalapb", "scalapb-runtime_2.13"),

If you have a docs page/repo anywhere, I'd be happy to contribute there to make this easier to discover for future AWS users.

novakov-alexey commented 1 month ago

  1. Thanks for sharing the blog post. It looks very interesting, I will definitely read it.
  2. Do I understand correctly that the issue goes away once the "org.scalameta" package is excluded in the assembly phase, and that it no longer happens for you on AWS?
  3. Indeed, I usually exclude those jars in the Set of the excludeJars function to get a much smaller assembly JAR; otherwise it is insanely big.
  4. Nice approach using regexps for the JAR names, I did not know it supports that. Thanks for the hint!
  5. I use assemblyPackageScala / assembleArtifact := false, because I attach the Scala standard library manually as additional dependencies, i.e. as separate JAR files on the job classpath (see the sketch after this list). Scala libraries:
    • https://mvnrepository.com/artifact/org.scala-lang/scala3-library_3/3.4.1
    • https://mvnrepository.com/artifact/org.scala-lang/scala-library/2.13.13
    It works flawlessly this way. One more important setting for this approach is to remove the "scala" package from the "classloader.parent-first-patterns.default" Flink property. As for AWS, I think it should work there as well, if you get AWS support to set this property for you. Otherwise, you have to include the Scala standard library in your assembly JAR. By the way, Flink 2.x promises to remove Scala 2.12 from the distribution. If that happens, then I think we won't need to change the "classloader.parent-first-patterns.default" property anymore, and supporting newer user Scala versions will be simpler.
  6. I of course support the idea of contributing docs to the repo. What would be the main idea of the issue documentation? Just so I understand the right way to document it.
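
A build.sbt sketch of the approach in point 5 (an assumed typical setup, not a verbatim config):

// keep the Scala standard library out of the assembly JAR
assemblyPackageScala / assembleArtifact := false
// then upload scala3-library_3-3.4.1.jar and scala-library-2.13.13.jar (the
// artifacts linked above) next to the job JAR on the cluster classpath, and
// remove "scala" from the classloader.parent-first-patterns.default property
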
chollinger93 commented 1 month ago

I'm afraid this caused some premature celebration - after letting this job run in our dev environment for a while, I just got the ping for messages being dropped due to the same error from the original post.

I'm going to have to table the investigation here. Looking at the full rendered dependency graph, I don't see anything blatantly wrong with my jar, so I must assume that, without control over the Flink classpath in the AWS runner, there's nothing else I can do.

novakov-alexey commented 1 month ago

If you have a chance to try your job with the Apache Flink Kubernetes Operator or the Ververica Community version, that would be another way to confirm whether classpath control is exactly the root cause of the problem.

chollinger93 commented 1 month ago

I will try that in the next weeks when I get a chance to set it up on a K8s cluster!

arnaud-daroussin commented 1 month ago

Hi @novakov-alexey,

I'm able to reproduce this bug with a minimalist project: https://github.com/arnaud-daroussin/DuplicatedNoneBug

Somehow, there are 2 instances of the scala.None case object in the classloader, causing the scala.MatchError: None (of class scala.None$) error.

A simple ProcessFunction outputting None, given to a harness test, allows reproducing the issue.

I didn't spot where the problem is, but this minimal project should help you pinpoint the bug!
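
Roughly, the test has this shape (a sketch with assumed names, not the repo's actual code; it needs Flink's streaming test utilities on the test classpath):

import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.operators.ProcessOperator
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness
import org.apache.flink.util.Collector

class NoneEmitter extends ProcessFunction[String, Option[String]]:
  override def processElement(
      value: String,
      ctx: ProcessFunction[String, Option[String]]#Context,
      out: Collector[Option[String]]): Unit =
    out.collect(None) // a plain None flows into the output copy/serialization path

@main def reproduce(): Unit =
  val harness = new OneInputStreamOperatorTestHarness(new ProcessOperator(new NoneEmitter))
  harness.open()
  harness.processElement("any input", 0L) // a duplicated-None MatchError would surface here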

novakov-alexey commented 1 month ago

Hi @arnaud-daroussin , thanks, I will have a look.

novakov-alexey commented 4 weeks ago

@arnaud-daroussin once again, thanks for creating a separate project with a test. I think the problem is not in the library code but in the user-side code. It can be fixed by: https://github.com/arnaud-daroussin/DuplicatedNoneBug/pull/1

arnaud-daroussin commented 4 weeks ago

Of course, you are right, I should have seen that my simplified project doesn't use flink-scala-api at all!

But thanks to your remark, I eventually found a missing serializer import for a ProcessFunction in my main project.

novakov-alexey commented 3 weeks ago

I will then close this ticket, as we still do not have clear evidence that this issue is a bug and not a user-side misconfiguration.