flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of the Flink Java API which supports Scala types for serialization, as well as the latest Scala versions.
Apache License 2.0

Concurrency issue in subclasses of TypeInformation #111

Closed sonowz closed 6 months ago

sonowz commented 6 months ago

Hi, I stumbled upon a race condition in an app using this library.

Description

As far as I know, the TypeInformation class is safe to use as a singleton, whereas TypeSerializer isn't. The JavaDoc of the TypeSerializer class reads:

The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, it is recommended to call duplicate() method and use one serializer instance per thread.

However, the TypeInformation classes in this library just hand out the same TypeSerializer instance, resulting in that instance being used by multiple threads: https://github.com/flink-extended/flink-scala-api/blob/892bd718b4fb0f9c43d815095ce47ddab965b196/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala#L18

Therefore, this can lead to data inconsistency when used with a thread-unsafe TypeSerializer such as CaseClassSerializer (it has a mutable variable that is used during deserialization).
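To illustrate the failure mode (with a toy stand-in and made-up names, not the real CaseClassSerializer), a serializer that stages field values in shared mutable state can emit torn records when two callers interleave on one instance. Here the interleaving is simulated deterministically in a single thread:

```scala
// Toy stand-in for a thread-unsafe serializer: it stages field values in a
// shared mutable buffer before assembling the result, the way a serializer
// with internal scratch state might during deserialization.
class UnsafeToySerializer:
  private val scratch = new Array[String](2)
  def setField(i: Int, v: String): Unit = scratch(i) = v
  def result(): (String, String) = (scratch(0), scratch(1))

// Two callers, A and B, share ONE instance; their field writes interleave.
val torn: (String, String) =
  val s = new UnsafeToySerializer
  s.setField(0, "x") // caller A writes field 0
  s.setField(0, "y") // caller B interleaves, writing both fields
  s.setField(1, "y")
  s.setField(1, "x") // caller A writes field 1, then reads the result
  s.result()         // A observes a torn record: ("y", "x")

// With one instance per caller, the interleaving is impossible.
val ok: (String, String) =
  val s = new UnsafeToySerializer
  s.setField(0, "x")
  s.setField(1, "x")
  s.result()
```

Giving each caller its own instance rules out the interleaving entirely, which is exactly what the `duplicate()` discipline is for.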

Steps to reproduce

This example code shows that data inconsistency can occur when run in a multicore environment:

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.parallel.CollectionConverters.given
import scala.collection.parallel.ParSeq

object ExampleApp:

  case class TestClass(a: String, b: String)

  val typeInfo: TypeInformation[TestClass] = deriveTypeInformation

  def randomString: String = scala.util.Random.alphanumeric.take(100).mkString

  @main
  def main(): Unit =

    // Make field 'a' and 'b' always have the same value
    val values: ParSeq[TestClass] = (1 to 1000).map {_ =>
      val s = randomString
      TestClass(s, s)
    }.par

    values.foreach { value =>
      val serializer = typeInfo.createSerializer(null)

      val out = new ByteArrayOutputStream()
      val outView = new DataOutputViewStreamWrapper(out)
      serializer.serialize(value, outView)
      val bytes = out.toByteArray

      val in = new ByteArrayInputStream(bytes)
      val inView = new DataInputViewStreamWrapper(in)
      val value2 = serializer.deserialize(inView)

      // The assertion fails randomly
      assert(value2.a == value2.b)
    }

Suggested fix

Looking at the createSerializer() method implementation of PojoTypeInfo in flink-core, it creates a new TypeSerializer instance on every call.
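The difference can be sketched with toy stand-ins (illustrative names and shapes only, not the actual flink-core API): a type info that hands out one cached serializer versus one that duplicates it on every createSerializer() call:

```scala
// Toy stand-ins for the Flink classes involved; names are illustrative.
trait ToySerializer:
  def duplicate(): ToySerializer

class ToyCaseClassSerializer extends ToySerializer:
  def duplicate(): ToySerializer = new ToyCaseClassSerializer

// What the library did: hand out the single cached, stateful serializer.
class CachingTypeInfo(cached: ToySerializer):
  def createSerializer(): ToySerializer = cached

// PojoTypeInfo-style behavior: a fresh instance per call, obtained here
// via duplicate() as in the suggested fix.
class DuplicatingTypeInfo(cached: ToySerializer):
  def createSerializer(): ToySerializer = cached.duplicate()

val ser     = new ToyCaseClassSerializer
val caching = new CachingTypeInfo(ser)
val fixed   = new DuplicatingTypeInfo(ser)

// Every caller of the caching variant shares one instance...
val sameInstance = caching.createSerializer() eq caching.createSerializer()
// ...while the duplicating variant hands each caller its own.
val distinctInstance = !(fixed.createSerializer() eq fixed.createSerializer())
```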

In the example above, if the TypeSerializer instantiation is modified like this:

val serializer = typeInfo.createSerializer(null).duplicate()

the data inconsistency no longer occurs.
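Until such a fix is released, callers can also enforce the one-serializer-per-thread discipline themselves, for example with a ThreadLocal. This is a sketch against a toy serializer trait (a stand-in, not Flink's real TypeSerializer):

```scala
// Toy stand-in for a thread-unsafe serializer; names are illustrative.
trait ToySerializer:
  def duplicate(): ToySerializer

class ToyCaseClassSerializer extends ToySerializer:
  def duplicate(): ToySerializer = new ToyCaseClassSerializer

// One shared instance, as returned by a caching createSerializer().
val shared: ToySerializer = new ToyCaseClassSerializer

// Each thread lazily receives its own duplicate on first access.
val perThread: ThreadLocal[ToySerializer] =
  ThreadLocal.withInitial(() => shared.duplicate())

// Two threads observe distinct serializer instances, neither of which
// is the shared one.
val distinct: Boolean =
  var other: ToySerializer = null
  val t = new Thread(() => other = perThread.get())
  t.start()
  t.join() // join() makes the worker's write to `other` visible here
  val mine = perThread.get()
  !(mine eq other) && !(mine eq shared) && !(other eq shared)
```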

I'll submit a pull request to fix the issue. Feel free to ask if you have questions about the issue or the fix.

novakov-alexey commented 6 months ago

I have looked at this issue one more time and tried to understand how likely it is in a real-life scenario.

Why would you use multi-threading in a Flink program yourself, manually? We usually rely on the Flink execution environment to schedule all operators of the job graph and their respective tasks. Don't you just let Flink schedule a job graph, which would derive TypeInformation as many times as needed and thus create its own TypeSerializer for each task, even if all tasks run in the same TaskManager? I mean that the ParSeq case above is an artificial example, even though it reveals the problem.

sonowz commented 6 months ago

No, I'm afraid this bug affects Flink-controlled internals as well. It actually happened in my app, which is just an ordinary Flink app.

Here is an example of such a typical app:

source.map[SomeDto](...)
  .setParallelism(32)

When this part derives TypeInformation, all tasks refer to the same TypeInformation instance because it is cached. Therefore, all tasks call the createSerializer() method on that same instance and receive the same thread-unsafe TypeSerializer instance (this behavior can be confirmed with a Java debugger). Since tasks typically run in parallel, almost every Flink app has a chance of hitting data inconsistencies.

I recently ran a Flink app in a 48-core environment and experienced data inconsistency, and I can confirm that after fixing the code the inconsistency no longer occurs. I hope this gets fixed soon!

novakov-alexey commented 6 months ago

Fixed by #113 and #114