Last summer:
The exception I am mainly encountering is:
Caused by: java.lang.ClassCastException: [B cannot be cast to org.apache.flink.api.java.tuple.Tuple
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:124)
at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.serialize(TupleSerializer.java:30)
at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
at org.apache.flink.python.api.streaming.data.PythonReceiver.collectBuffer(PythonReceiver.java:108)
at org.apache.flink.python.api.streaming.data.PythonStreamer.streamBufferWithoutGroups(PythonStreamer.java:272)
at org.apache.flink.python.api.functions.PythonMapPartition.mapPartition(PythonMapPartition.java:54)
at org.apache.flink.runtime.operators.MapPartitionDriver.run(MapPartitionDriver.java:103)
at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:486)
at org.apache.flink.runtime.iterative.task.AbstractIterativeTask.run(AbstractIterativeTask.java:146)
at org.apache.flink.runtime.iterative.task.IterationIntermediateTask.run(IterationIntermediateTask.java:92)
at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:351)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
at java.lang.Thread.run(Thread.java:745)
I have exhausted all of the workarounds I could think of that might trick Flink into accepting the data. I am still not sure why this issue happens, but it appears to originate in the deserialization code that handles Python-Java communication.
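For reference, `[B` is the JVM's internal name for `byte[]`, so the exception means a raw byte array reached code that expected a `Tuple`. A minimal, self-contained illustration of how that kind of ClassCastException arises (using a toy `Tuple2` stand-in, not Flink's actual class):

```java
// Demonstrates how a "[B cannot be cast" ClassCastException arises:
// a byte[] held as Object is force-cast to a tuple type.
public class CastDemo {
    // toy stand-in for org.apache.flink.api.java.tuple.Tuple2
    static class Tuple2 {
        Object f0, f1;
    }

    public static void main(String[] args) {
        Object record = new byte[] {1, 2, 3};  // what was actually produced
        try {
            Tuple2 t = (Tuple2) record;        // what the serializer expected
            System.out.println(t.f0);
        } catch (ClassCastException e) {
            // the message names the offending class: "[B" is byte[]
            System.out.println(e.getMessage());
        }
    }
}
```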
I put in a workaround to convert a tuple, if one was found, into a byte array, but then I got an exception showing that at some point a byte array was being passed into the tuple deserializer. In other words, both types are being handed to the wrong deserializers at the wrong times. Adding the same type-conversion workaround to both functions didn't help either, so the problem must lie somewhere deeper.
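The workaround amounted to a defensive type check at the collection point, roughly like the following sketch (toy types and names, not the actual PythonReceiver code):

```java
import java.nio.ByteBuffer;

// Toy sketch of the defensive conversion: if the deserializer handed back
// a tuple where raw bytes were expected, flatten it into a byte[] instead.
public class ConvertWorkaround {
    // hypothetical stand-in for a deserialized 2-tuple (not Flink's Tuple2)
    static class Tuple2 {
        int f0, f1;
        Tuple2(int f0, int f1) { this.f0 = f0; this.f1 = f1; }
    }

    static byte[] asBytes(Object record) {
        if (record instanceof byte[]) {
            return (byte[]) record;            // already the expected type
        }
        if (record instanceof Tuple2) {        // wrong type slipped through:
            Tuple2 t = (Tuple2) record;        // re-serialize it to bytes
            return ByteBuffer.allocate(8).putInt(t.f0).putInt(t.f1).array();
        }
        throw new IllegalStateException("unexpected record type: " + record.getClass());
    }

    public static void main(String[] args) {
        System.out.println(asBytes(new Tuple2(1, 2)).length);  // prints 8
        System.out.println(asBytes(new byte[] {9}).length);    // prints 1
    }
}
```

As the text above notes, patching over the symptom this way only moved the mismatch elsewhere, which is why the real fix has to address why the wrong deserializer is selected in the first place.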
To find the source of the mismatched data types, I need a simpler program that is easier to debug. I am working on writing a minimal Flink plan in Python that still reproduces the same issue.
Up to 9/14: I placed some print statements inside https://github.com/GEOFBOT/flink/blob/master/flink-libraries/flink-python/src/main/java/org/apache/flink/python/api/streaming/data/PythonReceiver.java#L92 (which appears in the stack trace) to see what was going on. Interestingly, the deserializer in question receives the correct data type until it reaches the join operation between the iterative data set and the newly calculated iteration at the end of the pyflink-r1dl algorithm. At that point, the byte array deserializer receives a tuple (as seen in the exception).
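The print statements amounted to logging the runtime class of each deserialized record before it is collected, along the lines of this sketch (a toy collector, not the real collectBuffer):

```java
import java.util.ArrayList;
import java.util.List;

// Toy sketch of the instrumentation: before collecting each deserialized
// record, print its runtime class so type mismatches show up in the logs.
public class TracingCollector {
    private final List<Object> out = new ArrayList<>();

    void collect(Object record) {
        // a byte[] logs as "[B"; a tuple logs as its class name
        System.out.println("collecting " + record.getClass().getName());
        out.add(record);
    }

    public static void main(String[] args) {
        TracingCollector c = new TracingCollector();
        c.collect(new byte[] {1, 2, 3});   // logs "[B"
        c.collect("not-a-byte-array");     // logs "java.lang.String"
    }
}
```

With logging like this, the first record whose printed class disagrees with the deserializer being used pinpoints where in the plan the types go wrong.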