AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.ParsedSchema #163

Closed GopinathMC closed 3 years ago

GopinathMC commented 3 years ago

Hi,

I am passing the ABRiS package as a jar file when opening the spark shell (--jars /home/dev/Documents/Plugins/jars/abris_2.11-4.0.0.jar), and I can successfully import the ABRiS library in my Spark code. But when I try to run the piece of code below, I get a class-not-found exception. Please find the detailed error log below.

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(kafkaTopicName)
  .usingSchemaRegistry("http://localhost:8081")

ERROR LOG:

java.lang.NoClassDefFoundError: io/confluent/kafka/schemaregistry/ParsedSchema
  at za.co.absa.abris.config.FromStrategyConfigFragment.andTopicNameStrategy(Config.scala:165)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: io.confluent.kafka.schemaregistry.ParsedSchema
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
  ... 50 more

Also, when I try to import this package into my Databricks workspace by passing the Maven coordinate (za.co.absa:abris_2.11:4.0.0), some of the dependencies are not resolved.

ERROR in Databricks:

Maven Dependencies not found:
  unresolved dependency: io.confluent:kafka-avro-serializer:5.5.1: not found
  unresolved dependency: io.confluent:kafka-schema-registry-client:5.5.1: not found
  unresolved dependency: io.confluent:common-config:5.5.1: not found
  unresolved dependency: io.confluent:common-utils:5.5.1: not found
Please try again by excluding these dependencies.

Could you please help me fix these issues? I am not able to find much documentation on this. Thanks in advance!

GopinathMC commented 3 years ago

I tried passing the ABRiS package along with the Confluent repository URL as suggested in issue #134 (https://github.com/AbsaOSS/ABRiS/issues/134), but I am still having some issues with the latest ABRiS package. Please see #134 for my latest comment.

cerveada commented 3 years ago

Hello,

ABRiS is a library designed to be used via Maven. It declares its dependencies, and Maven can fetch them for you. But if you pass it to Spark as a single jar, Spark will not do this for you, so you must declare the dependencies manually. (The other alternative would be to create an "uber jar" and include the dependencies in the jar.)

All dependencies are declared in the pom.xml, so you can take a look there. ParsedSchema specifically comes from this library:

io.confluent:kafka-schema-registry-client:5.5.1

If you add it to the packages (the same way it's done in #134), it should solve your problem.
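For the "uber jar" route, here is a minimal sketch of what declaring the missing dependency could look like in an sbt build. This is not from the thread: the Confluent repository URL and the versions are assumptions to adjust for your environment.

// build.sbt (sketch): declare ABRiS plus the Confluent client that --jars alone does not bring in.
// The Confluent artifacts are hosted in Confluent's own repository, not Maven Central (assumed URL below).
resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "za.co.absa"  %% "abris"                        % "4.0.0",
  "io.confluent" % "kafka-schema-registry-client" % "5.5.1"
)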

GopinathMC commented 3 years ago

Thanks @cerveada, it worked quite well. However, I am facing another issue now. When I change the schema of the table (add one column), the new version of the schema is successfully stored in the Confluent Schema Registry, but this is not reflected in my Spark streaming application on the fly; whenever a schema change occurs, the Spark streaming data frame still points to the old schema.

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.avro.registry.SchemaSubject
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.abris.avro.functions.from_avro

val kafkaTopicName = "myTopic"
val schemaManager = SchemaManagerFactory.create(Map(AbrisConfig.SCHEMA_REGISTRY_URL ->"http://localhost:8081"))

val kafkaRead = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "staging-development-rds-cluster.ml.sample_api_audit_logs")
  .option("startingOffsets", "latest").load()

val abrisConfig = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(kafkaTopicName)
  .usingSchemaRegistry("http://localhost:8081")

val deserialized = kafkaRead.select(from_avro(col("value"), abrisConfig) as 'data)

deserialized.writeStream.format("console").option("truncate", "false").start().awaitTermination()

In the spark-shell console, I am not able to see the newly added column (whenever a schema change happens), even though the new column is visible at the Schema Registry URL. Could you please clarify what I am missing?

Thanks in Advance!!

cerveada commented 3 years ago

I am not 100% sure I understand what you want to do, but the reader schema is downloaded just once and must stay the same. (This is because a Spark data frame also has only one schema, which cannot be changed during its lifetime.)

The writer schema can change, but it must be compatible with the reader schema, so that in the end ABRiS can convert the data to the reader schema format.

Also, please read this, it might help: https://github.com/AbsaOSS/ABRiS#multiple-schemas-in-one-topic
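To make the behaviour concrete, here is a small sketch reusing the names from the snippet above (kafkaRead, kafkaTopicName); it uses only the API calls already shown in this thread. The reader schema is resolved when the config is built, so a running query keeps whatever was the latest version at start time; picking up a newly registered version requires rebuilding the config and restarting the query.

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Builds a fresh config; downloadReaderSchemaByLatestVersion fetches the latest
// schema version registered at the moment this runs, and the DataFrame keeps it.
def latestReaderConfig(topic: String) = AbrisConfig.fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic)
  .usingSchemaRegistry("http://localhost:8081")

// After registering a new schema version, stop the running query and start a new
// one with a freshly built config to see the added column.
val restarted = kafkaRead
  .select(from_avro(col("value"), latestReaderConfig(kafkaTopicName)) as 'data)
  .writeStream.format("console").option("truncate", "false").start()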

GopinathMC commented 3 years ago

Thank you @cerveada, I really appreciate your quick responses. I will look into other possibilities to meet my requirement. Thanks to the team for this wonderful product, ABRiS.