sksamuel / avro4s

Avro schema generation and serialization / deserialization for Scala
Apache License 2.0

Apache Flink & SchemaRegistry with GenericRecords and Avro4s #783

Open andrelu opened 1 year ago

andrelu commented 1 year ago

Hello folks, first of all, thanks for this awesome library. It has helped me build a lot of automation around Scala case classes and Avro. I know this library is in maintenance mode; nevertheless, I'm here with some doubts about it. That said, let's say I have a Flink job that outputs data to many topics, with each output represented by a case class as follows:

case class Topic1(attributeA: String, attributeB: String)
case class Topic2(attributeA: String, attributeB: String, attributeC: Int)
case class Topic3(attributeA: String, attributeB: String, attributeD: Long)
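(For context, avro4s derives the Avro schema that gets registered for each of these, e.g. the following, with the JSON in the comment abbreviated:)

import com.sksamuel.avro4s.AvroSchema

// {"type":"record","name":"Topic1","fields":[{"name":"attributeA","type":"string"}, ...]}
val topic1Schema: org.apache.avro.Schema = AvroSchema[Topic1]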

All my outputs are backed by Schema Registry so we can keep track of the schemas our jobs produce. To output the data to Kafka with Schema Registry, I had to create a SerializationSchema that first converts the case class to a GenericRecord and then uses ConfluentRegistryAvroSerializationSchema to properly produce the data to Kafka. The problem is that I'm still facing serialization issues with the RecordEncoder class from avro4s: Flink ships the SerializationSchema instance to the task managers via Java serialization, and the derived encoder is not Serializable. I have managed to handle the org.apache.avro.Schema serialization issues, but I could not find a way of handling this situation with RecordEncoder. Here is how my SerializationSchema class looks right now:

import com.sksamuel.avro4s.{Encoder, SchemaFor}
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema

class CustomAvroSerializationSchema[T >: Null: SchemaFor: Encoder](subjectName: String, schemaRegistryUrl: String) extends SerializationSchema[T]:

  val schema = SchemaFor[T]
  val encoder = Encoder[T]

  private val confluentSerialization: ConfluentRegistryAvroSerializationSchema[GenericRecord] =
    ConfluentRegistryAvroSerializationSchema
      .forGeneric(
        subjectName,
        schema.schema,
        schemaRegistryUrl
      )

  override def open(context: SerializationSchema.InitializationContext): Unit =
    confluentSerialization.open(context)
    super.open(context)

  override def serialize(element: T): Array[Byte] =
    // encode(schema) yields a T => Any, which must be applied to the element
    val record = encoder.encode(schema.schema)(element).asInstanceOf[GenericRecord]
    confluentSerialization.serialize(record)
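For reference, the direction I have been sketching to dodge the serialization issue is to keep only Serializable state in the instance (the schema as a JSON string plus a factory function, since Scala function literals are Serializable) and rebuild the avro4s machinery in @transient lazy vals on the task manager. The class name is illustrative, and whether the Encoder derivation really runs inside the lambda on the task side, rather than being captured as a closed-over instance, is exactly the part I am unsure about:

import com.sksamuel.avro4s.{Encoder, SchemaFor}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema

class LazyAvroSerializationSchema[T](
    subjectName: String,
    schemaRegistryUrl: String,
    schemaJson: String,               // the Avro schema as JSON, always Serializable
    encoderFactory: () => Encoder[T]  // deferred derivation, ideally capturing nothing
) extends SerializationSchema[T]:

  // Rebuilt lazily on each task manager after deserialization; never shipped with the job graph.
  @transient private lazy val schema: Schema = new Schema.Parser().parse(schemaJson)
  @transient private lazy val encodeFn: T => Any = encoderFactory().encode(schema)
  @transient private lazy val confluentSerialization: ConfluentRegistryAvroSerializationSchema[GenericRecord] =
    ConfluentRegistryAvroSerializationSchema.forGeneric(subjectName, schema, schemaRegistryUrl)

  override def open(context: SerializationSchema.InitializationContext): Unit =
    confluentSerialization.open(context)

  override def serialize(element: T): Array[Byte] =
    confluentSerialization.serialize(encodeFn(element).asInstanceOf[GenericRecord])

At the call site the lambda is written at a concrete type, so derivation should only happen when the factory is invoked:

val topic1Serialization = new LazyAvroSerializationSchema[Topic1](
  "topic1-value",
  "http://schema-registry:8081",
  SchemaFor[Topic1].schema.toString,
  () => Encoder[Topic1]
)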

My question is: is there any other way to use avro4s with Schema Registry and Kafka serializers? I saw the SerDe implementation, but I could not figure out how to adapt it to my situation.
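Concretely, for the Kafka-serializer route I was imagining something like the wrapper below, which reuses the same case-class-to-GenericRecord conversion and delegates the registry lookup and wire framing to Confluent's KafkaAvroSerializer (assuming io.confluent:kafka-avro-serializer is on the classpath; the class name is made up). Is that roughly how the SerDe approach is meant to be adapted?

import com.sksamuel.avro4s.{Encoder, SchemaFor}
import io.confluent.kafka.serializers.KafkaAvroSerializer
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serializer
import scala.jdk.CollectionConverters.*

class RegistryAvroSerializer[T >: Null: SchemaFor: Encoder](schemaRegistryUrl: String, isKey: Boolean = false)
    extends Serializer[T]:

  private val schema = SchemaFor[T].schema
  private val encodeFn: T => Any = Encoder[T].encode(schema)

  // Confluent's generic serializer handles subject registration and the wire format.
  private val inner = new KafkaAvroSerializer()
  inner.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

  override def serialize(topic: String, data: T): Array[Byte] =
    if data == null then null
    else inner.serialize(topic, encodeFn(data).asInstanceOf[GenericRecord])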