j3-signalroom / apache_flink-kickstarter

Examples of Apache Flink® applications showcasing the DataStream API, Table API in Java and Python, and Flink SQL, featuring AWS, GitHub, Terraform, Streamlit, and Apache Iceberg.
https://linkedin.com/in/jeffreyjonathanjennings
MIT License

Add an Avro version of the `airline.skyone` and `airline.sunset` Kafka topics. #463

Closed: j3-signalroom closed this 1 week ago

j3-signalroom commented 1 week ago

The Avro versions of the Kafka topics are airline.skyone_avro and airline.sunset_avro.

j3-signalroom commented 1 week ago
2024-11-17 14:46:31
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
reserved (org.apache.avro.Schema$Field)
fieldMap (org.apache.avro.Schema$RecordSchema)
schema (org.apache.avro.generic.GenericData$Record)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:143)
    at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:21)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:528)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:311)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:74)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:90)
    at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collectAndCheckIfChained(ChainingOutput.java:40)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:83)
    at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:34)
    at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
    at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
    at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
    at org.apache.flink.api.connector.source.util.ratelimit.RateLimitedSourceReader.pollNext(RateLimitedSourceReader.java:69)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:422)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:909)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:858)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.UnsupportedOperationException
    at java.base/java.util.Collections$UnmodifiableCollection.add(Collections.java:1067)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:679)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:106)
    ... 35 more
j3-signalroom commented 1 week ago

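The exception you’re encountering, com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException, occurs because Flink is handling an Avro GenericData.Record as a generic type and therefore copying it with Kryo. The trace shows KryoSerializer.copy (called from CopyingChainingOutput to hand the record to the next chained operator) reading the record back field by field, down to Schema$Field.reserved. When Kryo's CollectionSerializer recreates that collection, it calls add() on a Collections$UnmodifiableCollection, which always throws UnsupportedOperationException. In other words, Kryo cannot rebuild the read-only collections inside Avro's Schema object.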
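The failing step can be reproduced with the JDK alone. The sketch below is a simplified stand-in for what Kryo's CollectionSerializer does on the read side (create the target collection, then call add() for each element); the class name, method name, and element values are hypothetical, and it uses neither Kryo nor Avro.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;

public class UnmodifiableAddDemo {

    /**
     * Mimics the last step of a generic collection deserializer: take a freshly
     * created target collection and add() every element that was read back.
     * Returns "ok" on success or the simple name of the exception thrown.
     */
    static String refill(Collection<String> target, Collection<String> elements) {
        try {
            for (String element : elements) {
                target.add(element);
            }
            return "ok";
        } catch (UnsupportedOperationException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // Hypothetical values standing in for the collection being rebuilt.
        Collection<String> elements = Arrays.asList("doc", "name", "type");

        // A mutable target can be refilled element by element.
        System.out.println(refill(new HashSet<>(), elements)); // prints "ok"

        // An unmodifiable wrapper rejects add(), matching the
        // Collections$UnmodifiableCollection.add frame in the trace.
        System.out.println(refill(Collections.unmodifiableSet(new HashSet<>()), elements)); // prints "UnsupportedOperationException"
    }
}
```

Since the unmodifiable wrapper rejects add() unconditionally, no choice of element values avoids the error; the only way out is to not rebuild such collections with a generic collection serializer.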

Possible Causes and Solutions:

  1. Immutable Collections:

    • The error trace shows java.util.Collections$UnmodifiableCollection.add, suggesting Kryo is trying to deserialize a collection that was made immutable using methods like Collections.unmodifiableList().
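    • Solution: Here the unmodifiable collection belongs to Avro's own Schema class, so you cannot replace it with a mutable one in your code. The practical options are to keep Avro objects out of Kryo (item 2) or to register a custom serializer that can handle them (item 3).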
  2. Unsupported Avro Object Serialization:

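    • The serialization trace also points into Avro internals: GenericData$Record.schema, then Schema$RecordSchema.fieldMap, then Schema$Field.reserved. Because the record is handled as a generic type, Kryo serializes the whole embedded Schema object along with the data, and it cannot rebuild that object graph.
    • Solution: Use Flink's Avro serializer (AvroSerializer, in the flink-avro module) instead of Kryo by giving Flink Avro type information. For GenericRecord, supply a GenericRecordAvroTypeInfo built from the schema (for example with .returns(...) or a source's TypeInformation argument); for generated SpecificRecord classes, Flink picks AvroTypeInfo automatically when flink-avro is on the classpath.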
  3. Custom Kryo Serializer:

    • You may need to create and register a custom Kryo serializer for the problematic data types.
    • Example:
      env.getConfig().addDefaultKryoSerializer(MyClass.class, new MyCustomSerializer());
    • This approach can help if you have specific needs for serializing and deserializing your objects.
  4. Flink Configuration for Serialization:

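    • Flink falls back to Kryo for any type it cannot analyze with its own serializers (basic types, tuples, POJOs, Avro types, and so on), and generic types are already enabled by default, so enableGenericTypes() does not change the serializer. Disabling generic types instead makes every Kryo fallback fail fast with an exception that names the offending type.
    • Example of disabling generic types (configuration option pipeline.generic-types: false, or in code):
      env.getConfig().disableGenericTypes();
    • This has no runtime cost; it turns a silent fallback into an explicit error, which you then fix by supplying proper type information.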
  5. Avoiding Complex Structures in State:

    • Consider simplifying the data structures passed between operators or kept in state, if feasible. Nested or complex objects can lead to serialization problems, especially third-party objects (e.g., Avro records) when Flink cannot infer a dedicated serializer for them and hands them to Kryo.
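The RateLimitedSourceReader and IteratorSourceReaderBase frames in the trace suggest the records come from a rate-limited DataGeneratorSource, although the issue does not show the job code. Assuming that, here is a minimal sketch of the item 2 fix: the source's TypeInformation argument is a GenericRecordAvroTypeInfo, so Flink serializes and copies the records with its Avro serializer instead of Kryo, and pipeline.generic-types is set to false so any remaining Kryo fallback fails fast. The schema, class names, and field values are hypothetical; it assumes the flink-avro and flink-connector-datagen dependencies.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AvroTypedSourceSketch {

    // Hypothetical one-field schema, for illustration only.
    static final String SCHEMA_JSON =
            "{\"type\":\"record\",\"name\":\"FlightData\",\"namespace\":\"example\","
                    + "\"fields\":[{\"name\":\"flight_number\",\"type\":\"string\"}]}";

    /** Generates one record per index; the schema travels as JSON and is parsed lazily on the task. */
    static final class FlightGenerator implements GeneratorFunction<Long, GenericRecord> {
        private final String schemaJson;
        private transient Schema schema;

        FlightGenerator(String schemaJson) {
            this.schemaJson = schemaJson;
        }

        @Override
        public GenericRecord map(Long index) {
            if (schema == null) {
                schema = new Schema.Parser().parse(schemaJson);
            }
            GenericRecord record = new GenericData.Record(schema);
            record.put("flight_number", "SKY1-" + index);
            return record;
        }
    }

    /** The last constructor argument makes Flink use its Avro serializer instead of Kryo. */
    static DataGeneratorSource<GenericRecord> buildSource(String schemaJson, long count) {
        Schema schema = new Schema.Parser().parse(schemaJson);
        return new DataGeneratorSource<>(
                new FlightGenerator(schemaJson),
                count,
                RateLimiterStrategy.perSecond(10),
                new GenericRecordAvroTypeInfo(schema));
    }

    public static void main(String[] args) throws Exception {
        // Turn any remaining Kryo fallback into an immediate error.
        Configuration conf = new Configuration();
        conf.set(PipelineOptions.GENERIC_TYPES, false);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);

        env.fromSource(buildSource(SCHEMA_JSON, 100), WatermarkStrategy.noWatermarks(), "flight-generator")
                .print();
        env.execute("avro-typed-source-sketch");
    }
}
```

Keeping the schema as a JSON string inside the generator avoids depending on whether Avro's Schema is Java-serializable in the Avro version on the classpath.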

Immediate Steps:

  1. Investigate the Data Types: Find where an Avro object (here GenericData.Record) is treated as a generic type. Avro's internal classes such as org.apache.avro.Schema$Field cannot be changed, so the fix lies in how the record's type is declared, not in the data structure itself.
  2. Switch Serializer: Use a serializer compatible with the data structure (e.g., Flink's AvroSerializer for Avro objects, selected through GenericRecordAvroTypeInfo or AvroTypeInfo).
  3. Custom Serializer: Create and register custom serializers if you need to work with objects that have complex serialization requirements.
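For the investigation step, one quick check is whether a type's TypeInformation is a GenericTypeInfo, which is what Flink uses when it falls back to Kryo. This sketch compares what Flink infers for GenericRecord with an explicitly declared GenericRecordAvroTypeInfo; the class name and schema are hypothetical, and it assumes the flink-avro dependency.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;

public class KryoFallbackCheck {

    /** GenericTypeInfo creates a KryoSerializer, so this flags a Kryo fallback. */
    static boolean fallsBackToKryo(TypeInformation<?> typeInfo) {
        return typeInfo instanceof GenericTypeInfo;
    }

    public static void main(String[] args) {
        // Hypothetical schema, for illustration only.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"FlightData\",\"fields\":["
                        + "{\"name\":\"flight_number\",\"type\":\"string\"}]}");

        // What Flink infers on its own for GenericRecord (an interface, so not a POJO).
        TypeInformation<GenericRecord> inferred = TypeInformation.of(GenericRecord.class);
        // What you get by declaring the Avro type explicitly.
        TypeInformation<GenericRecord> declared = new GenericRecordAvroTypeInfo(schema);

        System.out.println("inferred falls back to Kryo: " + fallsBackToKryo(inferred));
        System.out.println("declared falls back to Kryo: " + fallsBackToKryo(declared));
    }
}
```

The same instanceof check can be applied to stream.getType() on any DataStream in the job to spot where records still go through Kryo.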
j3-signalroom commented 1 week ago

The code should not use Kryo serialization. The data should be serialized in Avro.
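If the same requirement also covers what is written to the airline.skyone_avro and airline.sunset_avro topics, a sink along these lines would write each GenericRecord as Avro binary. This is a sketch assuming the flink-connector-kafka and flink-avro dependencies; the class name and bootstrap address are hypothetical. AvroSerializationSchema writes plain Avro without a schema-registry header, so consumers that expect the Confluent wire format would need ConfluentRegistryAvroSerializationSchema (from flink-avro-confluent-registry) instead.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.formats.avro.AvroSerializationSchema;

public final class AvroKafkaSinkSketch {

    private AvroKafkaSinkSketch() {}

    /** Builds a sink that encodes each GenericRecord value with the given Avro schema. */
    public static KafkaSink<GenericRecord> buildSink(String bootstrapServers, String topic, Schema schema) {
        return KafkaSink.<GenericRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<GenericRecord>builder()
                                .setTopic(topic)
                                // Plain Avro binary encoding, no schema-registry framing.
                                .setValueSerializationSchema(AvroSerializationSchema.forGeneric(schema))
                                .build())
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();
    }
}
```

A stream typed with GenericRecordAvroTypeInfo could then be attached with stream.sinkTo(AvroKafkaSinkSketch.buildSink("localhost:9092", "airline.skyone_avro", schema)).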

j3-signalroom commented 1 week ago

Deprecated. Registering data types and serializers in hard-coded form is deprecated, because you need to modify the code when upgrading the job version. You should configure this with the config option PipelineOptions.SERIALIZATION_CONFIG.

Registers the given type with a Kryo Serializer.

j3-signalroom commented 1 week ago

Use

PipelineOptions.SERIALIZATION_CONFIG
j3-signalroom commented 1 week ago

https://flink.apache.org/2020/04/15/flink-serialization-tuning-vol.-1-choosing-your-serializer-if-you-can/

j3-signalroom commented 1 week ago

https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/formats/avro/

j3-signalroom commented 1 week ago

See Issue #469