AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Unexpected behavior of `to_confluent_avro` #129

Closed Vincent-Zeng closed 4 years ago

Vincent-Zeng commented 4 years ago

Hi, team.

See

  def to_confluent_avro(
    data: Column,
    schemaRegistryConf: Map[String,String]
  ): Column = {

    val schemaProvider: SchemaProvider = createSchemaProvider(schemaRegistryConf)

    new Column(CatalystDataToAvro(
      data.expr, schemaProvider, Some(schemaRegistryConf), confluentCompliant = true))
  }

case class CatalystDataToAvro(
   child: Expression,
   schemaProvider: SchemaProvider,
   schemaRegistryConf: Option[Map[String,String]],
   confluentCompliant: Boolean)
  extends UnaryExpression {

  override def dataType: DataType = BinaryType

  private lazy val schemaId = schemaRegistryConf.flatMap { _ =>
    schemaManager.schemaId
      .orElse(registerSchema(schemaProvider.wrappedSchema(child)))
      .filter(_ => confluentCompliant)
  }

  @transient private lazy val serializer: AvroSerializer =
    new AvroSerializer(child.dataType, schemaProvider.originalSchema(child), child.nullable)

  @transient private lazy val schemaManager = SchemaManagerFactory.create(schemaRegistryConf.get)

   ...
}

The schemaProvider is passed in as a parameter, whereas schemaId is a private lazy field. This means that schemaProvider is initialized once for the whole execution, while schemaId is initialized again for each structured streaming batch.

Consider this case:

Step 1. Set the config value.schema.id to latest; the latest schema ID is currently 10.
Step 2. schemaProvider fetches the schema with ID 10 from Schema Registry.
Step 3. Batch 1: the private lazy val schemaId is initialized to 10, so the schema ID written into the message header during serialization is 10.
Step 4. The schema evolves in Schema Registry; the latest schema ID is now 11.
Step 5. Batch 2: the private lazy val schemaId is initialized to 11, so the schema ID written into the message header is 11, while schemaProvider still supplies the schema with ID 10 to encode the data.

So maybe schemaId should be initialized on the driver and passed in as a parameter instead of being initialized inside the expression. Is that right?
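The mismatch described above can be illustrated with a minimal sketch of Confluent-style framing, assuming the usual layout of one magic byte (0) followed by a 4-byte big-endian schema ID and then the Avro payload. `ConfluentFraming`, `frame` and `schemaIdOf` are hypothetical names for illustration, not ABRiS code.

```scala
import java.nio.ByteBuffer

object ConfluentFraming {
  private val MagicByte: Byte = 0x0

  // Prefix an already-encoded Avro payload with the header:
  // 1 magic byte (0), then the schema ID as a 4-byte big-endian int.
  def frame(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] =
    ByteBuffer.allocate(1 + 4 + avroPayload.length)
      .put(MagicByte)
      .putInt(schemaId)
      .put(avroPayload)
      .array()

  // Read back the schema ID a consumer would use to look up the writer schema.
  def schemaIdOf(message: Array[Byte]): Int = {
    val buffer = ByteBuffer.wrap(message)
    require(buffer.get() == MagicByte, "not a Confluent-framed message")
    buffer.getInt()
  }
}
```

Nothing in the framing step checks that the ID matches the schema that produced the payload. If the payload was encoded with schema 10 but `frame(11, payload)` is called, a consumer will look up schema 11 and try to decode with it.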

Vincent-Zeng commented 4 years ago

Maybe we can pass schemaManager as a parameter.

cerveada commented 4 years ago

I see. I think the best solution would be to add an optional schemaId to SchemaProvider and populate that field in createSchemaProvider(...). Then, inside CatalystDataToAvro, we can get the ID from the provider instead of from the manager.
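A rough sketch of that idea, using hypothetical stand-in types (`PinnedSchema`, `InMemoryRegistry` and `createProvider` are not the real ABRiS classes): the schema and its ID are fetched together, once, so later registry changes cannot separate them.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a provider that carries its own schema ID.
final case class PinnedSchema(schemaJson: String, schemaId: Option[Int])

// Hypothetical in-memory registry, only here to simulate schema evolution.
final class InMemoryRegistry {
  private val versions = mutable.Map.empty[String, Vector[(Int, String)]]

  def register(subject: String, id: Int, schemaJson: String): Unit =
    versions(subject) =
      versions.getOrElse(subject, Vector.empty[(Int, String)]) :+ (id -> schemaJson)

  // Returns the most recently registered (id, schema) pair for the subject.
  def latest(subject: String): (Int, String) = versions(subject).last
}

object PinnedProviderSketch {
  // Resolve "latest" exactly once and keep the ID next to the schema.
  def createProvider(registry: InMemoryRegistry, subject: String): PinnedSchema = {
    val (id, schemaJson) = registry.latest(subject)
    PinnedSchema(schemaJson, Some(id))
  }
}
```

With this shape, registering schema 11 after the provider was created leaves the provider at ID 10 with the matching schema, so the header and the payload stay consistent.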

schemaManager is not serializable. That is intentional, so that it is initialized again in CatalystDataToAvro when it's needed. I would not pass it as a parameter.
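The re-initialization behavior can be seen in a small sketch with plain Java serialization. `RegistryClient` and `Encoder` are hypothetical stand-ins, not the ABRiS classes. A `@transient lazy val` is skipped when the object is serialized and rebuilt on first access in the deserialized copy.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical non-serializable client, standing in for a schema manager.
final class RegistryClient(val url: String)

final class Encoder(val registryUrl: String) extends Serializable {
  // Not written by Java serialization; rebuilt lazily wherever it is first used.
  @transient lazy val client: RegistryClient = new RegistryClient(registryUrl)
}

object TransientLazySketch {
  // Serialize and deserialize a value, roughly as happens when an expression
  // is shipped from the driver to an executor.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

This is also why any value derived from such a field, like a "latest" schema ID, is recomputed on each fresh copy rather than fixed once on the driver.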

Vincent-Zeng commented 4 years ago

@cerveada Yeah. Your solution is great.

racevedoo commented 4 years ago

Hi all! Is this fixed? Can we get a new release containing the fix?

Right now I'm having a minor issue, as the schema id is being resolved multiple times (not sure why). Sometimes the request to schema registry fails, leading to the following error: [image]

Maybe a solution that mitigates this is resolving the schema only once per executor.
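One way to sketch "once per executor" is a JVM-wide cache held in a Scala `object`, since an object is a singleton per class loader. `SchemaIdCache` and the `resolve` function are hypothetical; this is not how ABRiS implements it.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

object SchemaIdCache {
  private val cache = new ConcurrentHashMap[String, Integer]()

  // Returns the cached ID for the subject, calling `resolve` only on a miss.
  // If `resolve` throws, nothing is cached, so a later call will try again.
  def getOrResolve(subject: String)(resolve: String => Int): Int =
    cache.computeIfAbsent(subject, new JFunction[String, Integer] {
      override def apply(s: String): Integer = Int.box(resolve(s))
    })
}
```

A caller would pass the actual registry lookup as `resolve`, for example `SchemaIdCache.getOrResolve("topic-value")(lookupInRegistry)`, where `lookupInRegistry` is whatever function performs the request. Note that caching "latest" this way also pins the ID for the lifetime of the JVM.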

If this is not related to this issue, please tell me so I can provide a more detailed report. Thanks!

felipemmelo commented 4 years ago

Hi @racevedoo , version 3.2.2 has just been published, and it contains the fixes. Cheers.

racevedoo commented 4 years ago

> Hi @racevedoo , version 3.2.2 has just been published, and it contains the fixes. Cheers.

Great! Thanks for the quick release. Can we close this issue? :smile:

EDIT: I can't find the release on GitHub or Maven Central (https://search.maven.org/artifact/za.co.absa/abris_2.11)

cerveada commented 4 years ago

It takes some time to appear there, but it may already be available for download from command-line Maven.