Closed by srikanth-db 3 years ago
When using Abris 4, be sure to configure it properly.
The schema registry configuration lives in a separate map, as shown here: https://github.com/AbsaOSS/ABRiS#schema-registry-security-and-other-additional-settings. That's where all the security settings go.
The concrete schema coordinates are set using the configurator, as you can see in the Abris examples.
I can't help you much with the security configuration. It is managed by the Confluent Schema Registry client that Abris uses. You can read more in https://github.com/AbsaOSS/ABRiS/issues/179
I also found this with a quick search: https://stackoverflow.com/questions/9619030/resolving-javax-net-ssl-sslhandshakeexception-sun-security-validator-validatore
I was able to fix it by adding:

```scala
val registryConfig = Map(
  AbrisConfig.SCHEMA_REGISTRY_URL -> schemaRegistry,
  "schema.registry.topic" -> topic,
  "schema.registry.ssl.truststore.location" -> "ssl.truststore.location",
  "schema.registry.ssl.truststore.password" -> "ssl.truststore.password",
  "schema.registry.ssl.keystore.location" -> "ssl.keystore.location",
  "schema.registry.ssl.keystore.password" -> "ssl.keystore.password",
  "schema.registry.ssl.key.password" -> "ssl.key.password"
)

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic)
  .usingSchemaRegistry(registryConfig)
```
Now I am able to deserialize the value from Avro, but I'm getting the error below while trying to read the key. Any suggestions on this?
```scala
inputDf.select(from_avro(col("key"), abrisConfig).as("key"))
```

```
Caused by: org.apache.avro.AvroTypeException: Found
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
	at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2429)
	at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:392)
	... 56 more
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing.
```
Add `isKey = true` when you want to deserialize key columns; `false` is the default:

```scala
.andTopicNameStrategy("topicName", isKey = true)
// Use isKey = true for the key schema and isKey = false for the value schema
```
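Putting the pieces together, reading both the key and the value could look roughly like the sketch below. This is an illustration, not code from the thread: it assumes the `registryConfig`, `topic`, and `inputDf` from the earlier snippets, and the variable names `keyConfig`/`valueConfig`/`decodedDf` are made up here.

```scala
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Reader config for the key: with TopicNameStrategy this resolves
// the subject "<topic>-key" in the schema registry.
val keyConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic, isKey = true)
  .usingSchemaRegistry(registryConfig)

// Reader config for the value: isKey = false is the default,
// so this resolves the subject "<topic>-value".
val valueConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic)
  .usingSchemaRegistry(registryConfig)

// Decode both columns in one select.
val decodedDf = inputDf.select(
  from_avro(col("key"), keyConfig).as("key"),
  from_avro(col("value"), valueConfig).as("value")
)
```

The point of the two separate configs is that key and value are registered under different subjects, so each needs its own reader schema lookup.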
Also, you don't need `"schema.registry.topic" -> topic` in `registryConfig`, since the topic is already specified via `andTopicNameStrategy`.
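A trimmed-down version of the map might then look like the sketch below. The SSL values are placeholders for your real store paths and passwords, and `"schema.registry.url"` is used here as the literal key behind `AbrisConfig.SCHEMA_REGISTRY_URL`.

```scala
// Sketch of registryConfig without the redundant "schema.registry.topic" entry.
// All values below are placeholders, not real credentials or paths.
val schemaRegistry = "https://my-registry:8081" // placeholder registry URL
val registryConfig = Map(
  "schema.registry.url" -> schemaRegistry, // same key as AbrisConfig.SCHEMA_REGISTRY_URL
  "schema.registry.ssl.truststore.location" -> "/path/to/truststore.jks",
  "schema.registry.ssl.truststore.password" -> "truststore-password",
  "schema.registry.ssl.keystore.location" -> "/path/to/keystore.jks",
  "schema.registry.ssl.keystore.password" -> "keystore-password",
  "schema.registry.ssl.key.password" -> "key-password"
)
// The topic no longer appears in the map; it is passed to andTopicNameStrategy instead.
```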
Awesome, it worked, thanks for the response. Is there any advantage to running the latest version of the ABRiS library (4.2 over 3.1)?
Yes, there were some bug fixes and performance improvements.
You can see the full list of changes here: https://github.com/AbsaOSS/ABRiS/releases
We have an application consuming data from Kafka (in Avro format) using Spark streaming, but Kafka has two-way SSL enabled.
Our Spark code had been working fine with the Scala 2.11 build of ABRiS (za.co.absa:abris_2.11:4.2.0) for a long time. We had to upgrade to Scala 2.12 and tried the library za.co.absa:abris_2.12:4.2.0, and it started throwing an SSL error while deserializing.
Here are the SSL settings that work with the Scala 2.11 version, but the same settings throw the error below with the 2.12 version.
Can you please share some input on this?