AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

AvroTypeException when reading Confluent Avro Records #121

Closed mostafa-asg closed 4 years ago

mostafa-asg commented 4 years ago

Hello, thanks for open-sourcing this project. We are using ABRiS to read Confluent Avro records from Kafka. The records are Salesforce data produced by Kafka Connect. When we read the data with the help of from_confluent_avro, we get this error: org.apache.avro.AvroTypeException: Found io.confluent.ksql.avro_schemas.KsqlDataSourceSchema, expecting defaultNamespace.defaultName, missing required field LP_PostalProvince__c

Here are the configs:

    import za.co.absa.abris.avro.read.confluent.SchemaManager

    // Shared Schema Registry settings (auth values redacted)
    val commonRegistryConfig = Map(
      SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "registry URL",
      SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topicName",
      "basic.auth.credentials.source" -> "USER_INFO",
      "schema.registry.basic.auth.user.info" -> "some value"
    )
    // Value-schema resolution: topic.name naming strategy, latest version
    val valueRegistryConfig = commonRegistryConfig ++ Map(
      SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
      SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest",
      SchemaManager.PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "value"
    )

And the usage:

val newDf = df.select(from_confluent_avro(df("value"), valueRegistryConfig).alias("data")).select("data.*")

Full error:

Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing.
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$2(WriteToDataSourceV2Exec.scala:117)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.$anonfun$doExecute$2(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroTypeException: Found io.confluent.ksql.avro_schemas.KsqlDataSourceSchema, expecting defaultNamespace.defaultName, missing required field LP_PostalProvince__c
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
    at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:128)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decodeConfluentAvro(AvroDataToCatalyst.scala:122)
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decode(AvroDataToCatalyst.scala:101)
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:59)
    ... 15 more

Any idea what the problem is?

cerveada commented 4 years ago

Hello, the Avro data is missing the required field LP_PostalProvince__c. That means the data is incorrect or you have the wrong schema. Either way, it's not an ABRiS issue.

Could you double-check that the schema coordinates you are using refer to the correct schema?
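For example, a minimal sketch of such a check, assuming the Confluent schema-registry client is on the classpath (the URL and subject are placeholders; with the topic.name strategy the value subject is "<topic>-value"):

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

    // Placeholder URL and subject; substitute your own values.
    val client = new CachedSchemaRegistryClient("http://registry:8081", 16)
    val metadata = client.getLatestSchemaMetadata("topicName-value")
    println(metadata.getSchema) // compare against the writer schema in the error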

If you still think it's an ABRiS issue, please let us know the version you are using.

mostafa-asg commented 4 years ago

Thanks for your answer. I debugged the code: in the class AvroDataToCatalyst, method decodeConfluentAvro, it gets writerSchema correctly, but avroSchema on the next line is totally different from writerSchema:

reader = new GenericDatumReader[Any](writerSchema, avroSchema)

Here are just the heads of those schemas. writerSchema:

{
  "type": "record",
  "name": "Account",
  "namespace": "io.confluent.salesforce",
  "fields": [
    {
      "name": "Id",
      "type": {
        "type": "string",
        "connect.doc": "Unique identifier for the object."
      },
      "doc": "Unique identifier for the object."
    },
    {
      "name": "IsDeleted",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    .....

avroSchema:

{
  "type": "record",
  "name": "defaultName",
  "namespace": "defaultNamespace",
  "fields": [
    {
      "name": "Id",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "Company",
      "type": [
        "string",
        "null"
      ]
    },
    .....

I also deserialized the record manually and it deserialized correctly, so the binary format is correct. I think I missed something in the configuration. Where is avroSchema populated from?
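For reference, a minimal sketch of that manual check, assuming the writer schema has already been fetched from the registry (the helper name and arguments are illustrative):

    import java.nio.ByteBuffer
    import org.apache.avro.Schema
    import org.apache.avro.generic.GenericDatumReader
    import org.apache.avro.io.DecoderFactory

    // Confluent wire format: magic byte 0x0, 4-byte schema id, then Avro binary.
    def decodeManually(bytes: Array[Byte], writerSchema: Schema): Any = {
      val buffer = ByteBuffer.wrap(bytes)
      require(buffer.get() == 0, "unknown magic byte")
      val schemaId = buffer.getInt() // id of the registered writer schema
      val decoder = DecoderFactory.get()
        .binaryDecoder(bytes, buffer.position(), bytes.length - buffer.position(), null)
      // Reader schema == writer schema, so no schema resolution can fail here.
      new GenericDatumReader[Any](writerSchema).read(null, decoder)
    }

Because this uses the writer schema on both sides, it succeeds even when the reader schema that gets resolved (avroSchema above) is incompatible.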

cerveada commented 4 years ago

What version of ABRiS are you using? It may be important.

Both schemas must be compatible. If you don't care about schema evolution, you can just use the same schema.

To get the correct schema you have to fix the config. Take a look at the documentation at https://docs.confluent.io/current/schema-registry/serdes-develop/index.html; the naming strategies are what you need to configure, as well as the schema id or version.
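For illustration, a sketch of a record.name-strategy config matching the writer schema shown above (untested; the namespace parameter name is an assumption, mirroring the ...NAME_FOR_RECORD_STRATEGY parameter used earlier):

    // Untested sketch: record.name derives the subject from the record's
    // fully qualified name instead of the topic.
    val recordNameConfig = commonRegistryConfig ++ Map(
      SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "record.name",
      SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest",
      SchemaManager.PARAM_VALUE_SCHEMA_NAME_FOR_RECORD_STRATEGY -> "Account",
      // assumed parameter, by analogy with ...NAME_FOR_RECORD_STRATEGY
      SchemaManager.PARAM_VALUE_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY -> "io.confluent.salesforce"
    )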

You can try it like this:

val schema = SchemaManagerFactory.create(schemaRegistryConf).downloadSchema()

If you provide the correct schemaRegistryConf, it should return the correct schema.
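For instance, a minimal sketch using the value config from the first comment to inspect what will be used as the reader schema (assuming the same classpath as above):

    // Download the schema resolved from the registry config and
    // pretty-print it for comparison against the writer schema.
    val schema = SchemaManagerFactory.create(valueRegistryConfig).downloadSchema()
    println(schema.toString(true))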

mostafa-asg commented 4 years ago

@cerveada Thanks for your help. Sorry, that was my mistake. I set the topic name wrongly here: SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topicName"