AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

How to deserialize multiple kafka topics consuming from same structured streaming stream. #139

Closed sivasai-quartic closed 4 years ago

sivasai-quartic commented 4 years ago

I'm reading data from multiple Kafka topics in a structured streaming job, and the schemas of these topics are not the same. How can I deserialize all the messages without filtering the data by topic and applying deserialization separately? Is there a built-in method to deserialize each topic with its own schema, like below?

batchDF.withColumn("value", from_confluent_avro(col("value"),schemaConfigByTopic(col("topic"), config)))

Thanks

cerveada commented 4 years ago

So you want to save data from multiple Kafka topics with different schemas into one dataframe? Wouldn't that cause problems with the dataframe's schema?

sivasai-quartic commented 4 years ago

I can cast the deserialized value to a single string, like below.

batchDF.withColumn("value", from_confluent_avro(col("value"),schemaConfigByTopic(col("topic"), config)).cast(StringType()))

Then that column will have the same schema for every topic.

cerveada commented 4 years ago

There is no schemaConfigByTopic. In this case I would recommend creating one dataframe per topic, each with its own Abris config. Then you can cast them and merge them together.
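The suggestion above could be sketched roughly as follows. This is a minimal sketch, not a tested implementation: it assumes the ABRiS 3.x `from_confluent_avro(column, config)` API with `SchemaManager` configuration keys, a hypothetical registry URL (`http://localhost:8081`), hypothetical topic names (`topicA`, `topicB`), and that `batchDF` is the Kafka source dataframe from the question (with `topic` and `value` columns).

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Hypothetical per-topic schema-registry config (ABRiS 3.x-style keys; adjust to your setup).
def registryConfig(topic: String): Map[String, String] = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> "http://localhost:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topic,
  SchemaManager.PARAM_VALUE_SCHEMA_ID       -> "latest"
)

// One dataframe per topic: filter by topic, deserialize with that topic's own
// config, then cast the struct to a common type (string) so the schemas align.
def deserializeTopic(batchDF: DataFrame, topic: String): DataFrame =
  batchDF
    .filter(col("topic") === topic)
    .withColumn("value",
      from_confluent_avro(col("value"), registryConfig(topic)).cast(StringType))
    .select("topic", "value")

// Merge the per-topic dataframes back into one with a uniform schema.
// batchDF is assumed to be the Kafka source dataframe from the question.
def mergeTopics(batchDF: DataFrame): DataFrame =
  deserializeTopic(batchDF, "topicA")
    .unionByName(deserializeTopic(batchDF, "topicB"))
```

Because each per-topic dataframe ends with the same columns and types after the cast, `unionByName` can combine them without schema conflicts.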

sivasai-quartic commented 4 years ago

Ok thanks.