Closed lciolecki closed 1 week ago
The changes in this pull request introduce updates primarily focused on version 1.19.0 for the documentation and code related to Kafka components in Flink. The changelog and migration guide have been enhanced to reflect new features and improvements, including handling missing TypeInformation and modifications to method signatures across various classes. New classes and methods have been added to improve serialization and deserialization of Kafka ConsumerRecord
objects, along with enhancements to error handling and type information management.
File | Change Summary |
---|---|
docs/Changelog.md | Added a new section for version 1.19.0 detailing improvements in Flink Kafka Source/Sink TypeInformation and lifting TypingResult information for dictionaries. |
docs/MigrationGuide.md | Updated to include changes in version 1.19.0, detailing removal of old ConsumerRecord constructor support and various enhancements in handling TypeInformation. Method signatures updated for several classes. |
engine/flink/components-api/src/main/scala/pl/touk/nussknacker/engine/flink/api/typeinformation/TypeInformationDetection.scala | Modified forClass method to handle Any type explicitly. |
engine/flink/components/kafka/src/main/scala/pl/touk/nussknacker/engine/flink/util/transformer/FlinkKafkaComponentProvider.scala | Updated imports and changed instantiation of universalSerdeProvider to use FlinkUniversalSchemaBasedSerdeProvider . |
engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/FlinkUniversalSchemaBasedSerdeProvider.scala | Introduced new object with a create method for schema-based serialization/deserialization. |
engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeInfo.scala | Added ConsumerRecordTypeInfo and ConsumerRecordSerializer classes for managing Kafka ConsumerRecord serialization. |
engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/flink/FlinkKafkaUniversalSink.scala | Restructured registerSink method to include type information handling. |
engine/flink/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/source/flink/FlinkKafkaSchemaRegistryBasedKeyValueDeserializationSchemaFactory.scala | Introduced new class for creating deserializers that leverage schema registry. |
engine/flink/schemed-kafka-components-utils/src/test/scala/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordSerializerSpec.scala | Added a test suite for ConsumerRecordSerializer . |
utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/ConsumerRecordDeserializationSchemaFactory.scala | Restructured deserialization logic to improve separation of concerns. |
utils/kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/kafka/consumerrecord/SerializableConsumerRecord.scala | Removed deprecated annotation from toKafkaConsumerRecord method, clarifying handling of leaderEpoch . |
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/schemaregistry/universal/UniversalSchemaBasedSerdeProvider.scala | Changed visibility of createSchemaIdFromMessageExtractor from private to public. |
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/serialization/KafkaSchemaBasedKeyValueDeserializationSchemaFactory.scala | Added createKeyOrUseStringDeserializer method to enhance key deserialization flexibility. |
utils/schemed-kafka-components-utils/src/main/scala/pl/touk/nussknacker/engine/schemedkafka/sink/UniversalKafkaSinkFactory.scala | Enhanced error handling and validation logic in schema extraction methods. |
utils/test-utils/src/main/scala/pl/touk/nussknacker/test/RandomImplicits.scala | Introduced RandomImplicits object with methods for generating random strings. |
engine/flink/schemed-kafka-components-utils/src/main/java/pl/touk/nussknacker/engine/schemedkafka/flink/typeinfo/ConsumerRecordTypeSerializerSnapshot.java | Added ConsumerRecordTypeSerializerSnapshot class for managing serialization of ConsumerRecord . |
🐰 In the fields where code does play,
New features hop and dance today.
Kafka's records, swift and bright,
With serializers taking flight.
In version nineteen, joy we find,
A world of types, all intertwined! 🌼
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?
Describe your changes
Checklist before merge
Summary by CodeRabbit
New Features
FlinkUniversalSchemaBasedSerdeProvider
for enhanced schema-based serialization/deserialization.ConsumerRecordTypeInfo
andConsumerRecordSerializer
classes for managing KafkaConsumerRecord
serialization.Bug Fixes
UniversalKafkaSinkFactory
.Documentation
Tests
ConsumerRecordSerializer
to validate serialization/deserialization processes.