AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

schema registry being called with http instead of https #323

Closed tomas-sedlon closed 9 months ago

tomas-sedlon commented 1 year ago

Hi, I am using the ABRiS library with a schema registry to read data from Kafka and deserialize it. This works well with a Confluent schema registry that is reachable over plain http, but breaks with an https one. I am unable to force ABRiS to make an https call.

My code:

val df = spark.readStream.format("kafka") ....

val schemaRegistry: String = "https://MY_SCHEMA_REGISTRY_URL:443"

val registryConfig = Map(
  AbrisConfig.SCHEMA_REGISTRY_URL -> schemaRegistry,
  "basic.auth.credentials.source" -> "USER_INFO",
  "basic.auth.user.info" -> s"$user:$pass",
  "schema.registry.ssl.enabled" -> "true",
  "ssl.endpoint.identification.algorithm" -> "",
  "schema.registry.ssl.truststore.type" -> "sth",
  "schema.registry.ssl.keystore.type" -> "sth",
  "schema.registry.ssl.truststore.location" -> "sth",
  "schema.registry.ssl.truststore.password" -> "sth",
  "schema.registry.ssl.keystore.location" -> "sth",
  "schema.registry.ssl.keystore.password" -> "sth"
)

val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topic)
  .usingSchemaRegistry(registryConfig)

val deserializedStream = df.select(
  from_avro(col("value"), abrisConfig) as "data",
  col("topic"),
  col("timestamp") as "kafka_timestamp"
)

deserializedStream 
  .writeStream 
  .format("console") ...

produces:

org.apache.spark.SparkException: [WRITING_JOB_ABORTED] Writing job aborted

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 15) (10.61.26.6 executor 1): org.apache.spark.SparkException: Malformed record detected.

Caused by: java.lang.RuntimeException: Not able to download writer schema

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: RESTEASY003210: Could not find resource for full path: http://MY_SCHEMA_REGISTRY+URL/schemas/ids/102?fetchMaxId=false&subject=; error code: 404
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:314)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:853)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:826)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:311)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:433)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaById(CachedSchemaRegistryClient.java:409)
    at za.co.absa.abris.avro.registry.AbstractConfluentRegistryClient.getById(AbstractConfluentRegistryClient.scala:44)
    at za.co.absa.abris.avro.read.confluent.SchemaManager.getSchemaById(SchemaManager.scala:49)
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.$anonfun$downloadWriterSchema$1(AvroDataToCatalyst.scala:160)
    at scala.util.Try$.apply(Try.scala:213)
    at za.co.absa.abris.avro.sql.AvroDataToCatalyst.downloadWriterSchema(AvroDataToCatalyst.scala:160)

Notice the http:// in the URL used to call the schema registry. How can I force ABRiS to use https?

Thanks a lot for any help!

tomas-sedlon commented 1 year ago

I also found https://github.com/AbsaOSS/ABRiS/issues/208, but the docs mentioned there do not seem to say anything about how to enable https.

cerveada commented 1 year ago

Hello, ABRiS just takes this URL and passes it to io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient. Since the problem shows up there, I would recommend debugging it using CachedSchemaRegistryClient alone; once that works, you can go back to full ABRiS.

You can see exactly how the client is created here: https://github.com/AbsaOSS/ABRiS/blob/master/src/main/scala/za/co/absa/abris/avro/registry/ConfluentRegistryClient.scala
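For anyone following the suggestion above, here is a minimal sketch of calling CachedSchemaRegistryClient directly, outside Spark and ABRiS. It assumes a Confluent client version in which `getSchemaById` returns a `ParsedSchema` (5.5 or later). The object name `RegistryDebug`, the helper `scheme`, and the cache size of 128 are made up for illustration and are not part of ABRiS or the Confluent API. The config map is meant to be the same `registryConfig` as in the original post.

```scala
import java.net.URI
import java.util.{HashMap => JHashMap}
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

object RegistryDebug {

  // Hypothetical helper: the lower-cased scheme of the configured URL,
  // or an empty string if the URL has no scheme at all.
  def scheme(url: String): String =
    Option(new URI(url).getScheme).map(_.toLowerCase).getOrElse("")

  // Builds the client by hand with the given settings and downloads one
  // schema by id. Any exception thrown here comes from the Confluent
  // client alone, with no Spark or ABRiS code involved.
  def fetchSchema(url: String, config: Map[String, String], schemaId: Int): String = {
    require(scheme(url) == "https", s"expected an https URL, got: $url")

    // Copy the Scala map into a java.util.Map for the Java constructor.
    val javaConfig = new JHashMap[String, String]()
    config.foreach { case (k, v) => javaConfig.put(k, v) }

    // 128 is an arbitrary cache capacity chosen for this sketch.
    val client = new CachedSchemaRegistryClient(url, 128, javaConfig)
    client.getSchemaById(schemaId).canonicalString()
  }
}
```

Calling `RegistryDebug.fetchSchema(schemaRegistry, registryConfig, 102)` with the id from the stack trace should either return the schema text or fail with the same RestClientException, which would show whether the problem sits in the client configuration or in ABRiS.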