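Reflection-based workaround from the Stack Overflow answer linked under "Potential Solution?":
```java
ParameterizedType pType = (ParameterizedType) subclass.getGenericSuperclass();
Class<T> tClass = (Class<T>) pType.getActualTypeArguments()[0];
// In the case where the constructor for `T` takes no arguments.
// (Generated protobuf classes have a private constructor, so newInstance() throws IllegalAccessException for them.)
parser = tClass.newInstance().getParserForType();
```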
Imported from Jira BEAM-7938. Original Jira may contain additional context.
Reported by: paliendroom.
Has there been any update or fix planned for this? I'm facing the same issue when using T extends Message generics with a PCollectionTuple output (multiple PCollections in the output) on the Flink runner.
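Both reports involve a type parameter T extends Message. One plausible cause, which is an assumption and not confirmed anywhere in this thread, is Java type erasure: inside generic code, T exists only as its declared bound, so anything that resolves the element type by reflection (coder inference, for example) sees the Message interface rather than the concrete generated class. The standard-library-only sketch below demonstrates just that erasure step. FakeMessage and OrderEvent are hypothetical stand-ins for com.google.protobuf.Message and a generated message class, and ErasureDemo is an illustrative name.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;

// Hypothetical stand-in for com.google.protobuf.Message (NOT the real interface).
interface FakeMessage {}

// Hypothetical stand-in for one generated message type carried by the wrapper's oneof.
final class OrderEvent implements FakeMessage {}

class ErasureDemo {
  // Mirrors the shape of a generic "extract the oneof payload" step: the caller
  // knows the concrete type, but inside this method T is only a type variable.
  static <T extends FakeMessage> T extract(Object wrapperPayload) {
    @SuppressWarnings("unchecked")
    T message = (T) wrapperPayload; // unchecked: no runtime check against T
    return message;
  }

  // Returns the class that reflection reports for T on extract(...):
  // the declared upper bound, not the concrete type used at the call site.
  static Class<?> erasedTypeOfT() {
    try {
      Method m = ErasureDemo.class.getDeclaredMethod("extract", Object.class);
      TypeVariable<Method> t = m.getTypeParameters()[0];
      Type bound = t.getBounds()[0]; // the declared upper bound of T
      return (Class<?>) bound;
    } catch (NoSuchMethodException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    OrderEvent event = extract(new OrderEvent());
    System.out.println("runtime object: " + event.getClass().getSimpleName());
    System.out.println("reflection sees T as: " + erasedTypeOfT().getSimpleName());
  }
}
```

The runtime object is still an OrderEvent, but the only type information reflection can recover for T is FakeMessage, which is all a reflection-based lookup would have to work with.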
Context
I have a Beam pipeline running on Dataflow with the Java SDK. It pulls Proto wrapper messages from a Pub/Sub subscription, partitions them by the oneof value, and then applies a MapElements to extract the underlying Proto message, so that I end up with a PCollectionList<T extends Message>. I then do some more processing and try to write them to different sinks. BigQueryIO works fine. However, when I try to use PubsubIO or ParquetIO, I get this error when using FileIO (for Parquet):
and this for PubsubIO:
Source Code for Error
ProtoCoder.java (lines 278-292: https://github.com/apache/beam/blob/968a80611d424764962e79b726bfa4fd17ced104/sdks/java/extensions/protobuf/src/main/java/org/apache/beam/sdk/extensions/protobuf/ProtoCoder.java#L278)
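Assuming the code at that location obtains the parser by reflectively calling the static getDefaultInstance() on the class the coder was constructed with (an assumption, not checked line by line here), the erased type would explain the failure: a generated message class has that static method, while the Message interface does not. The sketch below mimics only that lookup with the standard library. FakeMessage, OrderEvent and DefaultInstanceLookup are hypothetical names, and returning null instead of throwing is a simplification for the demo.

```java
import java.lang.reflect.InvocationTargetException;

// Hypothetical stand-in for com.google.protobuf.Message (NOT the real interface).
interface FakeMessage {
  String typeName();
}

// Hypothetical stand-in for a generated message class: like protobuf-java output,
// it has a private constructor and a static getDefaultInstance() factory.
final class OrderEvent implements FakeMessage {
  private static final OrderEvent DEFAULT = new OrderEvent();

  private OrderEvent() {}

  public static OrderEvent getDefaultInstance() {
    return DEFAULT;
  }

  @Override
  public String typeName() {
    return "OrderEvent";
  }
}

class DefaultInstanceLookup {
  // Assumed lookup strategy: reflectively call the static getDefaultInstance()
  // on whatever class the coder was built with. Returns null if no such method exists.
  static FakeMessage defaultInstanceOf(Class<? extends FakeMessage> clazz) {
    try {
      return clazz.cast(clazz.getMethod("getDefaultInstance").invoke(null));
    } catch (NoSuchMethodException e) {
      return null; // e.g. clazz is the interface itself: it declares no such static method
    } catch (IllegalAccessException | InvocationTargetException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(defaultInstanceOf(OrderEvent.class).typeName());
    System.out.println(defaultInstanceOf(FakeMessage.class));
  }
}
```

Under that assumption, a coder built for OrderEvent can find its default instance, while one built for the interface cannot.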
Potential Solution?
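Could the "hack" proposed on Stack Overflow (https://stackoverflow.com/questions/44134712/generic-protobuf-deserializer-in-java) solve the problem? It recovers the concrete message class from a subclass's generic superclass via reflection and then obtains the parser from an instance of that class.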
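A self-contained sketch of that technique (often called a "super type token"), under stated assumptions: FakeMessage and OrderEvent are hypothetical stand-ins for com.google.protobuf.Message and a generated class, and TypeCapture and TypeCaptureDemo are illustrative names, not Beam or protobuf APIs. Like the Stack Overflow snippet, it reads T from an anonymous subclass's generic superclass, but it calls the static getDefaultInstance() instead of newInstance(), because generated-style classes have a private constructor.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for com.google.protobuf.Message (NOT the real interface).
interface FakeMessage {
  String typeName();
}

// Hypothetical generated-style class: private constructor, static default instance.
final class OrderEvent implements FakeMessage {
  private static final OrderEvent DEFAULT = new OrderEvent();

  private OrderEvent() {}

  public static OrderEvent getDefaultInstance() {
    return DEFAULT;
  }

  @Override
  public String typeName() {
    return "OrderEvent";
  }
}

// Super type token: an anonymous subclass such as new TypeCapture<OrderEvent>() {}
// records T in its generic superclass, where reflection can read it back.
abstract class TypeCapture<T extends FakeMessage> {
  final Class<T> messageClass;

  @SuppressWarnings("unchecked")
  protected TypeCapture() {
    Type superType = getClass().getGenericSuperclass();
    if (!(superType instanceof ParameterizedType)) {
      throw new IllegalStateException("Create as an anonymous subclass with a concrete T");
    }
    Type arg = ((ParameterizedType) superType).getActualTypeArguments()[0];
    if (!(arg instanceof Class)) {
      // Happens when T is still a type variable, e.g. inside a generic method.
      throw new IllegalStateException("T is not a concrete class: " + arg);
    }
    this.messageClass = (Class<T>) arg;
  }

  // Calls the static factory instead of newInstance(), which a private constructor blocks.
  T defaultInstance() {
    try {
      return messageClass.cast(messageClass.getMethod("getDefaultInstance").invoke(null));
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }
}

class TypeCaptureDemo {
  // Same trick inside a generic method: T is only a type variable here, so the constructor throws.
  static <T extends FakeMessage> TypeCapture<T> captureInsideGeneric() {
    return new TypeCapture<T>() {};
  }

  // True if the no-arg-constructor approach from the original snippet is rejected.
  static boolean newInstanceRejected(Class<?> c) {
    try {
      c.getDeclaredConstructor().newInstance();
      return false;
    } catch (IllegalAccessException e) {
      return true;
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    TypeCapture<OrderEvent> capture = new TypeCapture<OrderEvent>() {};
    System.out.println(capture.messageClass.getSimpleName());
    System.out.println(capture.defaultInstance().typeName());
    System.out.println(newInstanceRejected(OrderEvent.class));
  }
}
```

captureInsideGeneric shows the limitation: the token only works where the concrete type is spelled out in source, so it does not help code that is itself generic in T. In Beam, a more direct route (an assumption, not tested against this pipeline) is to state the concrete type explicitly, for example by calling setCoder(ProtoCoder.of(YourMessage.class)) on each partition after the MapElements step instead of relying on inference from T.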
Thank you for taking the time to read this. I'd be more than happy to contribute to a solution, but I'm not a Protobuf, Beam, or even Java power user, so I would need some assistance.