AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Python binding results in RuntimeException from org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType #37

Closed: florissmit10 closed this issue 5 years ago

florissmit10 commented 5 years ago

We are calling your library from Python, as mentioned in this StackOverflow post.

I have changed the function to use a schema path, and it now looks as follows:

from pyspark.sql import DataFrame

def expand_avro_local(spark_context, sql_context, data_frame, avsc_file_path):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    # wrap the local .avsc path in a Scala Some; pass None for the Schema Registry config
    schema_path = j.scala.Some.apply(avsc_file_path)
    conf = j.scala.Option.apply(None)
    # fetch the RETAIN_SELECTED_COLUMN_ONLY retention policy singleton from the JVM
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    jdf = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    return DataFrame(jdf, sql_context)

raw_df = apply_kafka_settings(spark_session.read).load().limit(100).cache()

raw_df.schema # StructType(List(StructField(value,BinaryType,true)))

extract_df = expand_avro_local(sc, spark_session, raw_df, transaction_schema_path)

extract_df.schema # StructType(List(StructField(operations,ArrayType(BinaryType,false),false)))

extract_df.count() # 100

extract_df.show()

Everything up to the last line works, but then the following exception is thrown:

19/06/26 13:43:24 ERROR Executor: Exception in task 0.0 in stage 31.0 (TID 16)
java.lang.RuntimeException: java.nio.HeapByteBuffer is not a valid external type for schema of binary
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.serializefromobject_doConsume_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:212)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
19/06/26 13:43:24 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 16, localhost, executor driver): java.lang.RuntimeException: java.nio.HeapByteBuffer is not a valid external type for schema of binary
        ... (stack trace identical to the executor exception above)

This error seems very strange to me, because a ByteBuffer should be convertible to a binary field. I found that the check happens in org.apache.spark.sql.catalyst.expressions.objects.ValidateExternalType, but I cannot see why it fails here.
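
For what it's worth, here is my reading of what that check enforces (a minimal sketch, assuming an active SparkSession named spark; this is my interpretation, not documented ABRiS behaviour): Spark's BinaryType expects a plain byte array as the external object (bytes/bytearray on the Python side), so values that arrive on the JVM side as java.nio.HeapByteBuffer objects are rejected by ValidateExternalType:

from pyspark.sql.types import StructType, StructField, BinaryType

# A BinaryType column built from bytes/bytearray passes the check, because the
# external type Spark expects for BinaryType is a plain byte array.
schema = StructType([StructField("value", BinaryType(), True)])
ok_df = spark.createDataFrame([(bytearray(b"\x00\x01\x02"),)], schema)
ok_df.show()
# The failure above looks analogous: the Avro "bytes" items seem to arrive as
# HeapByteBuffer objects instead of byte arrays, which the same check rejects.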

Does anyone have an idea what might be causing this? Am I using the Python binding incorrectly, or is it a deeper issue? Thanks!

felipemmelo commented 5 years ago

Hi @florissmit10, could I have a look at the schema you're using, so that I can try to replicate the issue? Also, are you writing the data in the "Confluent" way, i.e. attaching the schema id to the top of the payload?
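
For reference, by the "Confluent" way I mean the wire format where every Kafka message value is one magic byte (0x00), a 4-byte big-endian schema id, and then the raw Avro body. A quick sanity check you could run from Python on one raw value (just a sketch, not part of ABRiS; split_confluent_payload is a made-up helper):

import struct

def split_confluent_payload(raw):
    """Split a Confluent-framed message into (schema_id, avro_body).
    Wire format: magic byte 0x00 + 4-byte big-endian schema id + Avro bytes."""
    if not raw or raw[0] != 0:
        raise ValueError("not a Confluent-framed message")
    schema_id = struct.unpack(">I", bytes(raw[1:5]))[0]
    return schema_id, bytes(raw[5:])

# e.g. on one value pulled into the driver:
# schema_id, body = split_confluent_payload(raw_df.first()["value"])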

florissmit10 commented 5 years ago

Hi @felipemmelo,

Thanks for responding!

We are writing the data from a Java application, using the Confluent serializers. We use a nested binary Avro message (don't ask me why), which might be what causes this specific error. This is the schema:

{
  "type": "record",
  "name": "Transaction",
  "namespace": "transaction",
  "fields": [
    {
      "name": "operations",
      "type": { "type": "array", "items": "bytes" },
      "doc": "Subject operation, encoded with Confluent-Avro wire format."
    }
  ]
}
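
Even if the outer record deserializes correctly, I expect we will need a second decoding pass over the nested payloads anyway. A rough sketch of what that could look like on the Python side (assuming fastavro is available and a hypothetical inner schema file operation.avsc; none of this is ABRiS API):

import json
from io import BytesIO

from fastavro import parse_schema, schemaless_reader
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import StringType

# hypothetical schema for the inner records carried in the "operations" array
with open("operation.avsc") as f:
    inner_schema = parse_schema(json.load(f))

def decode_operation(payload):
    # skip the 5-byte Confluent header (magic byte + schema id), decode the rest
    record = schemaless_reader(BytesIO(bytes(payload[5:])), inner_schema)
    return json.dumps(record)

decode_udf = udf(decode_operation, StringType())
decoded_df = (extract_df
              .select(explode("operations").alias("op"))
              .select(decode_udf("op").alias("operation_json")))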

In the meantime I have also tried using Scala, but I get yet another error there, so maybe there is something wrong with our Avro data. I am going to give Databricks a go; maybe that will work better!