flink-extended / flink-scala-api

Flink Scala API is a thin wrapper on top of Flink Java API which support Scala Types for serialisation as well the latest Scala version
Apache License 2.0
68 stars 14 forks source link

Fix concurrency bug in subclasses of TypeInformation #112

Closed sonowz closed 6 months ago

sonowz commented 6 months ago

Fixes #111

I made sure that all subclasses of TypeInformation in this repository creates a new instance of TypeSerializer when .createSerializer() method is called.

Besides, while working on it I found that the most of the TypeSerializers in this repository have incorrect method implementations (duplicate, copy, isImmutableType, etc.) They could lead to undesirable behaviors when utilized by Flink. I'll try to fix them when I have some time to do it.

novakov-alexey commented 6 months ago

Thanks @sonowz ! Great bug-report and PR as well. I am wondering what if we apply another strategy to make CaseClassSerializer immutable? I am not sure whether CaseClassSerializer really needs to have the var fields or it could have that fields array as a local variable.

sonowz commented 6 months ago

That's a good idea! I think the reason CaseClassSerializer is implemented with mutable array, is to reduce the memory footprint by allocating permanent array in the heap then reusing it, and therefore achieve the same memory footprint as PojoSerializer. IMHO the performance wouldn't hurt that bad in most cases, so I agree with you.

However, I think createSerializer() method should still call TypeSerializer.duplicate() as well, because if it's not changed there still exists risk of data inconsistencies which is nasty to find out. There would be cases where an user provides custom TypeSerializer such as AvroSerializer, which is thread-unsafe:

given typeInfo1: TypeInformation[Dto] = AvroSchemaConverter.convertToTypeInfo("...")
val typeInfo2: TypeInformation[List[Dto]] = deriveTypeInformation
novakov-alexey commented 6 months ago

I do not follow how the memory allocation would be reduced if that array is used only one time to serialize or deserialize a case class? We anyway have to allocate it. I think there is zero-win doing this.

It is hard to say why it was done in this way, since this code was copied from original flink-scala-api which is primarily object-oriented. I bet if that piece would be written by FP-aware person, that would be different, but maybe we do not know all of the reasons yet.

Anyway we should improve the legacy code by removing such places with mutable code, unless it is really critical to do mutation internally. For multi-threaded context, I think we definitely must not have mutable fields.

sonowz commented 6 months ago

Yep I was just guessing why it had been implemented in this way. Can't read the actual intention of the original author.

novakov-alexey commented 6 months ago

We continue in PR #114