AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

Jobs are failing during the schema registration while writing to kafka from a batch dataframe #165

Closed srinugajjala closed 3 years ago

srinugajjala commented 3 years ago

Hi,

We have built an ETL tool (https://github.com/homeaway/datapull) for moving data across many data platforms, and we are trying to include Kafka as part of our platform.

We are trying to use this library to write data to Kafka from a batch dataframe, but we couldn't get it to work because of the following error.


Exception in thread "main" java.lang.NoSuchFieldError: FACTORY
        at org.apache.avro.Schemas.toString(Schemas.java:36)
        at org.apache.avro.Schemas.toString(Schemas.java:30)
        at io.confluent.kafka.schemaregistry.avro.AvroSchema.canonicalString(AvroSchema.java:140)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
        at io.confluent.kafka.schemaregistry.client.SchemaRegistryClient.register(SchemaRegistryClient.java:42)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.register(SchemaManager.scala:77)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.$anonfun$getIfExistsOrElseRegisterSchema$1(SchemaManager.scala:124)
        at scala.runtime.java8.JFunction0$mcI$sp.apply(JFunction0$mcI$sp.java:23)
        at scala.Option.getOrElse(Option.scala:189)
        at za.co.absa.abris.avro.read.confluent.SchemaManager.getIfExistsOrElseRegisterSchema(SchemaManager.scala:124)
        at za.co.absa.abris.config.ToSchemaRegisteringConfigFragment.usingSchemaRegistry(Config.scala:135)
        at za.co.absa.abris.config.ToSchemaRegisteringConfigFragment.usingSchemaRegistry(Config.scala:131)
        at org.example.App$.main(App.scala:37)
        at org.example.App.main(App.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Before implementing this in our tool, we wrote a bare-minimum project to test it: https://github.com/markovarghese/simplespark

Any immediate help would be greatly appreciated, as we are trying to onboard a few users who are on hold waiting for this functionality. Please feel free to contact us with any questions.

Thanks for looking into this.

Regards,
Srini

cerveada commented 3 years ago

Hello, can you check which version of the org.apache.avro library is on your classpath? Ideally it should be 1.9.2 or later.
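One way to check this from inside the running application is to ask the JVM where it loaded the class from. The following is only a minimal sketch and not part of ABRiS: `ClasspathProbe` and `describe` are hypothetical names, and the result is only meaningful if the code runs with the same classpath as the failing job (for example, called from the Spark driver). The version is read from the jar manifest, so it may be missing.

```java
import java.security.CodeSource;

// Hypothetical helper, not part of ABRiS or Avro.
final class ClasspathProbe {

    // Reports the jar (or directory) a class was loaded from and the
    // Implementation-Version recorded in that jar's manifest, if any.
    static String describe(String className) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> c = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            CodeSource src = c.getProtectionDomain().getCodeSource();
            String location = (src == null || src.getLocation() == null)
                    ? "unknown (bootstrap class loader)"
                    : src.getLocation().toString();
            Package p = c.getPackage();
            String version = (p == null) ? null : p.getImplementationVersion();
            return className + " loaded from " + location
                    + ", version " + (version == null ? "not recorded in manifest" : version);
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " not found on classpath";
        }
    }

    public static void main(String[] args) {
        // The class named in the stack trace above
        System.out.println(describe("org.apache.avro.Schema"));
    }
}
```

If the reported location is a jar shipped with Spark rather than the one bundled with the application, the application's Avro version is probably not the one being used at runtime.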

markovarghese commented 3 years ago

Thank you for looking into this issue. The version of the org.apache.avro library seems to be 1.9.2; see the Maven dependency tree at https://github.com/markovarghese/simplespark#maven-dependency-tree . We have only 2 dependencies in the pom file, both at their latest versions (ref https://github.com/markovarghese/simplespark/blob/main/pom.xml#L29-L40 ), and we are running this in a Docker Spark container at Spark 3.0.1 (ref https://github.com/markovarghese/simplespark/blob/main/docker_spark_server/Dockerfile#L52 ).

cerveada commented 3 years ago

I built the simplespark project locally and Schema.FACTORY is in the jar, so the problem must be somewhere else. I suspect it has something to do with the way Spark loads these classes.

Could you try providing Abris to spark-submit using --packages and then using your application as a normal jar (without dependencies included)?

cerveada commented 3 years ago

This may be related https://github.com/AbsaOSS/ABRiS/issues/142

markovarghese commented 3 years ago

Thank you for your advice. On a Docker Spark container with Spark 3.0.1, Scala 2.12, Hadoop 3.2.1, I tried to provide Abris to spark-submit using --packages and then use the application as a normal jar (without dependencies included). The command I used was

docker run -v $(pwd):/core -w /core -it --rm --network docker_kafka_server_default  spark3.0.1-scala2.12-hadoop3.2.1:latest spark-submit --repositories https://packages.confluent.io/maven --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,za.co.absa:abris_2.12:4.0.0 --deploy-mode client --class org.example.App target/simplespark-1.0-SNAPSHOT.jar

It fails while spark-submit loads the Abris 4.0.0 package (i.e. even before reaching the App code with Schema.FACTORY etc.), with the error

Exception in thread "main" java.lang.RuntimeException: [download failed: javax.ws.rs#javax.ws.rs-api;2.1.1!javax.ws.rs-api.${packaging.type}]
        at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1389)
       ...

I have documented the full error at https://github.com/markovarghese/simplespark/tree/main#running-the-jar-without-dependencies

We get the same error when we try to load spark-shell using the command

docker run -v $(pwd):/core -w /core -it --rm --network docker_kafka_server_default  spark3.0.1-scala2.12-hadoop3.2.1:latest spark-shell --repositories https://packages.confluent.io/maven --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,za.co.absa:abris_2.12:4.0.0

However, if we use Abris 3.2.2 instead of 4.0.0, spark shell loads fine with the command

docker run -v $(pwd):/core -w /core -it --rm --network docker_kafka_server_default  spark3.0.1-scala2.12-hadoop3.2.1:latest spark-shell --repositories https://packages.confluent.io/maven --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,za.co.absa:abris_2.12:3.2.2

After seeing this behavior and #142 , we tried the same logic, but using Abris 3.2.2 and a docker spark container with Spark 2.4.6, Scala 2.11, Hadoop 2.10.0 (ref https://github.com/markovarghese/simplespark/tree/spark2.4.6-scala2.11-hadoop2.10.0-abris3.2.2 ) . Barring a relatively minor (and probably self-inflicted) issue related to schema registration, it works fine, when using either the JAR with dependencies, or the JAR without dependencies.

cerveada commented 3 years ago

I will update Abris to use version 5.5.2 of kafka-schema-registry-client, but for now you can try specifying it directly in --packages.

--packages io.confluent:kafka-schema-registry-client:5.5.2, ...

markovarghese commented 3 years ago

Thank you for looking into this. Your suggestion to use io.confluent:kafka-schema-registry-client:5.5.2 removed the error regarding javax.ws.rs-api:2.1.1.

Using the suggestion, I tried to provide Abris to spark-submit using --packages and then use the application as a normal jar (without dependencies included). The command I used was

docker run -v $(pwd):/core -w /core -it --rm --network docker_kafka_server_default  spark3.0.1-scala2.12-hadoop3.2.1:latest spark-submit --repositories https://packages.confluent.io/maven --packages io.confluent:kafka-schema-registry-client:5.5.2,org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1,za.co.absa:abris_2.12:4.0.0 --deploy-mode client --class org.example.App target/simplespark-1.0-SNAPSHOT.jar

It still gave the same Exception in thread "main" java.lang.NoSuchFieldError: FACTORY error as with the JAR with the dependencies.

I also tried running the JARs (both with dependencies and without) against a Spark node at Spark 2.4.6, Scala 2.11, Hadoop 2.10.0. The results were the same. I've documented this experiment at https://github.com/markovarghese/simplespark/blob/spark2.4.6-scala2.11-hadoop2.10.0-abris4.0.0/README.md

cerveada commented 3 years ago

I found a hidden breaking change between Avro 1.8 and 1.9. The FACTORY field in the Schema class has a different type in each version:

1.8.2  org.codehaus.jackson.JsonFactory
1.9.2  com.fasterxml.jackson.core.JsonFactory

This explains the java.lang.NoSuchFieldError: FACTORY error.

As far as I know, this makes schema registry 5.4.x+ incompatible with Spark (because Spark uses Avro 1.8.x).

So I am going to downgrade to schema registry 5.3.x, which should make everything compatible again.
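To see which variant of the field a given classpath actually provides, reflection can report its declared type. The JVM resolves a field reference by both name and type, which is why a FACTORY field of the other type counts as missing even though a field with that name exists in the jar. The following is only a minimal sketch and not part of ABRiS: `FactoryFieldProbe` and `fieldType` are hypothetical names, and it assumes it is run with the same classpath as the failing job.

```java
import java.lang.reflect.Field;

// Hypothetical helper, not part of ABRiS or Avro.
final class FactoryFieldProbe {

    // Returns the fully qualified type name of a field declared on a class,
    // or a short message if the class or field cannot be found.
    static String fieldType(String className, String fieldName) {
        try {
            // initialize=false: inspect the class without running its static initializers
            Class<?> c = Class.forName(className, false, FactoryFieldProbe.class.getClassLoader());
            // getDeclaredField also finds non-public fields declared on the class itself
            Field f = c.getDeclaredField(fieldName);
            return f.getType().getName();
        } catch (ClassNotFoundException | LinkageError e) {
            return "class not found: " + className;
        } catch (NoSuchFieldException e) {
            return "field not found: " + fieldName;
        }
    }

    public static void main(String[] args) {
        // Class and field taken from the stack trace in this issue
        System.out.println(fieldType("org.apache.avro.Schema", "FACTORY"));
    }
}
```

Given the two types listed above, a result of org.codehaus.jackson.JsonFactory would point to Avro 1.8.x being loaded, while com.fasterxml.jackson.core.JsonFactory would point to 1.9.x.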

cerveada commented 3 years ago

Another library causing an issue, although only for Abris internal tests, is:

com.thoughtworks.paranamer : paranamer : 2.7

Version 2.7 comes with Avro 1.8 and seems to have a bug; version 2.8, used by Spark, seems to be OK. I will change the import order and put Spark first, which will give it priority so that 2.8 is used.

cerveada commented 3 years ago

@markovarghese version 4.0.1, which should fix this issue, has been released.

Would you mind testing if it works as expected?

markovarghese commented 3 years ago

Thank you for debugging and fixing this issue! It works now!

I've documented the successful run at https://github.com/markovarghese/simplespark/tree/spark3.0.1-scala2.12-hadoop2.10.0-abris4.0.1

There is just one gotcha, probably related to Hadoop rather than this library.

Minor Issue

If you run the Spark application on a Spark cluster with Hadoop 3.0.0 to 3.2.1, you will get the following runtime error:

2020-11-14 23:32:33,626 ERROR executor.Executor: Exception in task 6.0 in stage 3.0 (TID 22)
java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.KafkaProducer.flush()V
        at org.apache.spark.sql.kafka010.KafkaWriteTask.$anonfun$close$1(KafkaWriteTask.scala:61)
        at org.apache.spark.sql.kafka010.KafkaWriteTask.$anonfun$close$1$adapted(KafkaWriteTask.scala:60)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.sql.kafka010.KafkaWriteTask.close(KafkaWriteTask.scala:60)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$3(KafkaWriter.scala:73)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1386)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1(KafkaWriter.scala:73)
        at org.apache.spark.sql.kafka010.KafkaWriter$.$anonfun$write$1$adapted(KafkaWriter.scala:70)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:994)
        at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:994)
        at org.apache.spark.SparkContext.$anonfun$runJob$5(SparkContext.scala:2139)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:446)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:449)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Workaround

Run the Spark application on a Spark cluster with Hadoop 3.3.0 or Hadoop 2.x (I've tested successfully with 2.8.5 and 2.10.0).

cerveada commented 3 years ago

Thanks for the help. I will close this issue; feel free to open a new one if needed.