AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.

[WARNING] Avro: Invalid default for field metadata: "null" not a ["null" #34

Closed madhuhad closed 5 years ago

madhuhad commented 5 years ago

Hi, we have a use case where we need to consume Avro data from a Kafka topic and ingest it into HDFS using Spark 2.2. I have followed all the instructions in the README page, but the job has started failing with the warning below:

[WARNING] Avro: Invalid default for field metadata: "null" not a ["null"

java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:347)
  at scala.None$.get(Option.scala:345)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$$anonfun$1$$anonfun$apply$12.apply(TypeCoercion.scala:107)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$$anonfun$1$$anonfun$apply$12.apply(TypeCoercion.scala:102)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$$anonfun$1.apply(TypeCoercion.scala:102)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$$anonfun$1.apply(TypeCoercion.scala:82)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$.findWiderTypeForTwo(TypeCoercion.scala:150)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion$$anonfun$apply$9.applyOrElse(TypeCoercion.scala:649)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion$$anonfun$apply$9.applyOrElse(TypeCoercion.scala:645)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsDown$1.apply(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:279)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:289)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:293)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
  at scala.collection.immutable.List.map(List.scala:285)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:293)
  at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$6.apply(QueryPlan.scala:298)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:298)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDown(QueryPlan.scala:258)
  at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressions(QueryPlan.scala:249)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveExpressions$1.applyOrElse(LogicalPlan.scala:80)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveExpressions$1.applyOrElse(LogicalPlan.scala:79)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:62)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:61)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveExpressions(LogicalPlan.scala:79)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion$.apply(TypeCoercion.scala:645)
  at org.apache.spark.sql.catalyst.analysis.TypeCoercion$IfCoercion$.apply(TypeCoercion.scala:644)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
  at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
  at scala.collection.immutable.List.foldLeft(List.scala:84)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:69)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:67)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:164)
  at org.apache.spark.sql.Dataset.<init>(Dataset.scala:170)
  at org.apache.spark.sql.Dataset.mapPartitions(Dataset.scala:2253)
  at za.co.absa.abris.avro.serde.AvroDecoder.fromAvroToRow(AvroDecoder.scala:313)
  at za.co.absa.abris.avro.serde.AvroDecoder.fromAvroToRow(AvroDecoder.scala:326)
  at za.co.absa.abris.avro.AvroSerDe$StreamDeserializer.fromAvro(AvroSerDe.scala:127)
  ... 52 elided

Code:

export JAR_FILES=/home/madhuhad04/config-1.3.1.jar,/home/madhuhad04/spark-sql-kafka-0-10_2.11-2.2.0.jar,/home/madhuhad04/spark-streaming-kafka-0-10-assembly_2.11-2.2.1.jar,/home/madhuhad04/abris_2.11-2.2.2.jar,/home/madhuhad04/spark-avro_2.11-4.0.0.jar

export SPARK_MAJOR_VERSION=2

spark-shell --verbose --conf spark.ui.port=4096 --jars ${JAR_FILES}

import za.co.absa.abris.avro.AvroSerDe._
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies._

import org.apache.spark.sql.avro._

val stream = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "test-topic")
  .fromAvro("value", "/db/madhuhad04/test.avsc")(RETAIN_SELECTED_COLUMN_ONLY)
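
Since the stated use case is ingestion into HDFS, the deserialized stream still needs a sink. Below is a minimal sketch of writing it out as Parquet; the output and checkpoint paths are hypothetical placeholders, not part of the original report:

// Minimal sink sketch for the deserialized stream.
// Both paths below are hypothetical placeholders.
val query = stream
  .writeStream
  .format("parquet")
  .option("path", "/db/madhuhad04/output")                    // hypothetical HDFS output directory
  .option("checkpointLocation", "/db/madhuhad04/checkpoint")  // hypothetical checkpoint directory
  .outputMode("append")
  .start()

query.awaitTermination()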

Could you please help me resolve this issue?

felipemmelo commented 5 years ago

Hi @madhuhad, can I see the content of "/db/madhuhad04/test.avsc"?
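
While the schema itself is not posted in this thread, a warning of the form 'Invalid default for field metadata: "null" not a ["null", ...]' usually means the field's default value is the JSON string "null" rather than a JSON null, which is what a union whose first branch is "null" requires. The sketch below is only an illustration of that fix: the record name and field layout are assumptions (only the field name comes from the warning), and it parses the corrected schema with the standard Avro parser.

// Illustrative only: the record name and surrounding structure are assumed,
// not the actual contents of /db/madhuhad04/test.avsc.
//
// Shape that typically triggers the warning (default is the *string* "null"):
//   {"name": "metadata", "type": ["null", "string"], "default": "null"}
//
// Corrected shape (default is a JSON null, matching the union's first branch):
import org.apache.avro.Schema

val fixedSchemaJson =
  """{
    |  "type": "record",
    |  "name": "TestRecord",
    |  "fields": [
    |    {"name": "metadata", "type": ["null", "string"], "default": null}
    |  ]
    |}""".stripMargin

// Parses cleanly; with the quoted "null" default, the Avro parser typically
// logs the "Invalid default for field" warning instead.
val parsed = new Schema.Parser().parse(fixedSchemaJson)
println(parsed.toString(true))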

felipemmelo commented 5 years ago

Closing for now; if the issue persists, feel free to reopen it.