AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0
228 stars 74 forks source link

Error while trying to use Schema Reference! #232

Closed detoxfarm3 closed 3 years ago

detoxfarm3 commented 3 years ago

Hi

I tried to use multiple schema for different event in single topic.

1 So far, I have explored TopicRecordNameStrategy, where I can use a multiple schemas in a topic, but, this doesn't work with PySpark; it throws exception when trying to de-serialize multiple types of messages present in a single topic. This is due to the fact that a stream is bounded to a single schema only; I have seem similar question in the Issues as well where the conclusion is that we cant use it for this purpose!

Adding the error log for reference-

2021-07-19 11:24:21 ERROR TaskSetManager:73 - Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/home/ronak/Desktop/POC_KAFKA/avro_abris_test.py", line 117, in <module>
    withColumn('value', to_json(col('value'))).show()
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 484, in show
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o94.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (10.0.2.15 executor driver): 
org.apache.spark.SparkException: Malformed records are detected in record parsing.
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:82)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:476)
        at org.apache.spark.sql.catalyst.expressions.UnaryExpression.eval(Expression.scala:472)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:346)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.avro.AvroTypeException: Found com.test.High, expecting com.test.Test, missing required field test
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:292)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readFieldOrder(ResolvingDecoder.java:130)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:215)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decodeConfluentAvro(AvroDataToCatalyst.scala:145)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.decode(AvroDataToCatalyst.scala:122)
        at za.co.absa.abris.avro.sql.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:74)
        ... 19 more

2 Tried Schema reference with TopicNameStrategy; where top level schema is a union of reference schemas ex. schema: ["<namespace>.<schema name>"]. But, this is failing with the following error; which to me looks as if reference schema is not yet supported. I have tried exploring some of the Abris code/example & didn't see any such mention of schema reference.

For, from_avro below config code is used-

    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    scala_map = jvm_gateway.PythonUtils.toScalaMap(config_map)
    return jvm_gateway.za.co.absa.abris.config \
        .AbrisConfig \
        .fromConfluentAvro() \
        .downloadReaderSchemaByLatestVersion() \
        .andTopicNameStrategy(topic, is_key) \
        .usingSchemaRegistry(scala_map)

from_avro code-

    jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))jvm_gateway = SparkContext._active_spark_context._gateway.jvm
    abris_avro = jvm_gateway.za.co.absa.abris.avro
    return Column(abris_avro.functions.from_avro(_to_java_column(col), config))

Below is the error I got when trying to use schema reference

Traceback (most recent call last):
  File "/home/ronak/Desktop/POC_KAFKA/avro_topic_name.py", line 66, in <module>
    f_c = from_avro_abris_config({'schema.registry.url': 'http://localhost:8081'}, 'apic')
  File "/home/ronak/Desktop/POC_KAFKA/abris_topic_name.py", line 34, in from_avro_abris_config
    .usingSchemaRegistry(scala_map)
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 111, in deco
  File "/home/ronak/Desktop/POC_KAFKA/.venv/lib/python3.7/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o69.usingSchemaRegistry.
: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 142] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["references"])
        at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
        at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
        at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1179)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:438)
        at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
        at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
        at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
        at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3250)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:221)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:265)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:524)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:516)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:275)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaMetadataBySubjectAndVersion(SchemaManager.scala:64)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaBySubjectAndVersion(SchemaManager.scala:53)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchema(SchemaManager.scala:44)
        at za.co.absa.abris.config.FromSchemaDownloadingConfigFragment.usingSchemaRegistry(Config.scala:249)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
cerveada commented 3 years ago

1) Yes, the only way to do this is to separate the data in two dataframes each with one schema and then call abris separately on each of them. If the key is same for both it would be possible to do it by key, I think. But it depends on your use case.

2) Abris currently uses confluent 5.3. To be compatible with spark libraries. Schema reference are available in confluent 5.5 and higher. (for more details look at #175)

detoxfarm3 commented 3 years ago

@cerveada Thanks for the info. We were able to create multiple streams by filtering the message. But we used Kafka headers to add metadata and used the same for filtering.