AbsaOSS / ABRiS

Avro SerDe for Apache Spark structured APIs.
Apache License 2.0

java.util.NoSuchElementException: key not found: value.schema.id #9

Closed sanori closed 5 years ago

sanori commented 6 years ago

I have tried ABRiS on spark-shell as follows (on Spark 2.3.1, Confluent 4.1.0):

Invoking spark-shell:

spark-shell \
--repositories "https://packages.confluent.io/maven/" \
--packages "org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,io.confluent:kafka-avro-serializer:4.1.1,za.co.absa:abris:2.0.0"

spark-shell script (formatted for reader):

import za.co.absa.abris.avro.AvroSerDe._
import za.co.absa.abris.avro.read.confluent.SchemaManager
import za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies._

val schemaRegistryConfs = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://schema-registry:8081/", 
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topic1"
)
val df = spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka1:9092")
    .option("subscribe", "topic1")
    .fromConfluentAvro("value", None, Some(schemaRegistryConfs))(RETAIN_SELECTED_COLUMN_ONLY)

But it produces the following error:

java.util.NoSuchElementException: key not found: value.schema.id
  at scala.collection.MapLike$class.default(MapLike.scala:228)
  at scala.collection.AbstractMap.default(Map.scala:59)
  at scala.collection.MapLike$class.apply(MapLike.scala:141)
  at scala.collection.AbstractMap.apply(Map.scala:59)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:65)
  at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.load(AvroSchemaUtils.scala:50)
  at za.co.absa.abris.avro.serde.AvroToRowEncoderFactory$.createRowEncoder(AvroToRowEncoderFactory.scala:44)
  at za.co.absa.abris.avro.serde.AvroDecoder.fromConfluentAvroToRow(AvroDecoder.scala:58)
  at za.co.absa.abris.avro.AvroSerDe$StreamDeserializer.fromConfluentAvro(AvroSerDe.scala:154)
  ... 53 elided

What could be causing this problem?

felipemmelo commented 6 years ago

Hi Joo-Won, the schema id is missing, as explained here.

However, the documentation is outdated; I'm updating it right away. For now, you can do:

val schemaRegistryConfs = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://schema-registry:8081/",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topic1",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
)

This will fetch the latest schema id for the value column.

sanori commented 6 years ago

Thank you for your prompt answer. I tried what you suggested:

val schemaRegistryConfs = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://schema-registry:8081/", 
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topic1",
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest"
)

but I got another error as follows:

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
  at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:438)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:430)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:211)
  at za.co.absa.abris.avro.read.confluent.SchemaManager$.getLatestVersion(SchemaManager.scala:83)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.getSchemaId(SchemaLoader.scala:84)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:72)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:67)
  at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.load(AvroSchemaUtils.scala:50)
  at za.co.absa.abris.avro.serde.AvroToRowEncoderFactory$.createRowEncoder(AvroToRowEncoderFactory.scala:44)
  at za.co.absa.abris.avro.serde.AvroDecoder.fromConfluentAvroToRow(AvroDecoder.scala:58)
  at za.co.absa.abris.avro.AvroSerDe$StreamDeserializer.fromConfluentAvro(AvroSerDe.scala:154)
  ... 53 elided

felipemmelo commented 6 years ago

What are you getting from this URL? http://schema-registry:8081/subjects/topic1-value/versions/

sanori commented 6 years ago

> What are you getting from this URL? http://schema-registry:8081/subjects/topic1-value/versions/

{
  "error_code": 40401,
  "message": "Subject not found."
}

By the way,

  1. The raw content of the "value" column starts with 0x00 0x00 0x00 0x00 0x7A, which means the value is prefixed with the magic byte and the schema ID 122 (0x7A). Can I configure ABRiS to read the schema ID from the stream and parse the Avro data automatically, instead of specifying a schema ID or topic name?

  2. Does SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC mean the Kafka topic name or the registered schema (= subject) name? In my case, Kafka topic1 is encoded with schema1. Therefore, http://schema-registry:8081/subjects/schema1/versions/ returns the array of version numbers, but http://schema-registry:8081/subjects/schema1-value/versions/ and http://schema-registry:8081/subjects/topic1-value/versions/ both return 404.

felipemmelo commented 6 years ago

So the problem is this: ABRiS expects the topic name, derives the subject from it, and uses that subject to get the schema from Schema Registry.

  1. No, it cannot, since the schema is needed up front to create the row encoder that will serialize data to the executors.

  2. What do you get if you try SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "schema1"?

sanori commented 6 years ago

I'm sorry for the insufficient explanation and the late reply. I'm trying to parse a Kafka stream using a Confluent Schema Registry that is provided by another party. I'm testing only the deserialization part.

> What do you get if you try SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "schema1"?

Similar to https://github.com/AbsaOSS/ABRiS/issues/9#issuecomment-408375615

io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
  at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:202)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:229)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:438)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:430)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:211)
  at za.co.absa.abris.avro.read.confluent.SchemaManager$.getLatestVersion(SchemaManager.scala:83)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.getSchemaId(SchemaLoader.scala:84)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:72)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:67)
  at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.load(AvroSchemaUtils.scala:50)
  at za.co.absa.abris.avro.serde.AvroToRowEncoderFactory$.createRowEncoder(AvroToRowEncoderFactory.scala:44)
  at za.co.absa.abris.avro.serde.AvroDecoder.fromConfluentAvroToRow(AvroDecoder.scala:58)
  at za.co.absa.abris.avro.AvroSerDe$StreamDeserializer.fromConfluentAvro(AvroSerDe.scala:154)
  ... 53 elided

I succeeded in parsing my stream (= topic1) by setting the schema ID directly, as follows:

val schemaRegistryConfs = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://schema-registry:8081/", 
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "topic1", 
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "122")

It seems that SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC is not relevant to parsing the stream in this case.

Anyway, it works! Thank you for ABRiS!

This may be off-topic for this issue, but a new problem occurs when a stream (= topic2) uses two or more schemas. That is why I asked whether ABRiS can be configured to fetch the schema using the schema ID in the message prefix. The prefix format (magic byte 0 + schema ID) seems to be an official feature of the Confluent Platform, according to https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format .
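
For readers following along, here is a minimal sketch of how the prefix quoted earlier in the thread (0x00 0x00 0x00 0x00 0x7A) can be decoded. It assumes the Confluent wire format of one magic byte (0) followed by a 4-byte big-endian schema ID. The names WireFormat and schemaIdOf are illustrative only and are not part of ABRiS or the Confluent libraries.

```scala
import java.nio.ByteBuffer

// Assumed layout: 1 magic byte (0x00) + 4-byte big-endian schema ID + Avro payload.
object WireFormat {
  private val MagicByte: Byte = 0x0
  private val HeaderSize = 5

  /** Returns Some(schemaId) when the bytes start with a Confluent-style header, otherwise None. */
  def schemaIdOf(bytes: Array[Byte]): Option[Int] =
    if (bytes.length >= HeaderSize && bytes(0) == MagicByte)
      Some(ByteBuffer.wrap(bytes, 1, 4).getInt) // ByteBuffer reads big-endian by default
    else
      None
}

// The five header bytes reported for the "value" column in this thread.
val header = Array[Byte](0x00, 0x00, 0x00, 0x00, 0x7A)
println(WireFormat.schemaIdOf(header)) // Some(122)
```

This only extracts the ID from a single record; it does not by itself let a Spark row encoder handle a stream whose records carry different schema IDs.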

felipemmelo commented 6 years ago

Hey Joo-Won, thanks a lot for the detailed feedback.

Regarding the off-topic you mentioned, if I got it right, maybe the comment 5) on this class can shed some light? https://github.com/AbsaOSS/ABRiS/blob/641cdda10cbc52630a66091dbe03297673f81868/src/main/scala/za/co/absa/abris/avro/read/confluent/ScalaConfluentKafkaAvroDeserializer.scala#L79

sanori commented 6 years ago

Ok, I see that ABRiS checks the schema ID.

Is there any way to specify the subject name directly, instead of the <topic name>-(key|value) subject name? I now understand that using <topic name>-(key|value) as the subject name is the default subject name strategy, but we use a strategy like io.confluent.kafka.serializers.subject.RecordNameStrategy.

Thanks
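
To make the difference between the two naming strategies concrete, here is a small sketch of how each one derives a subject. It is plain Scala for illustration and does not call the Confluent classes; SubjectStrategy, TopicName, RecordName, and subjectFor are hypothetical names, and "schema1" is assumed to stand in for the fully qualified Avro record name.

```scala
// Illustrative model of two Confluent subject name strategies.
sealed trait SubjectStrategy
case object TopicName extends SubjectStrategy  // subject = <topic name>-(key|value)
case object RecordName extends SubjectStrategy // subject = fully qualified record name

def subjectFor(strategy: SubjectStrategy, topic: String, recordFullName: String, isKey: Boolean): String =
  strategy match {
    case TopicName  => s"$topic-${if (isKey) "key" else "value"}"
    case RecordName => recordFullName
  }

println(subjectFor(TopicName, "topic1", "schema1", isKey = false))  // topic1-value
println(subjectFor(RecordName, "topic1", "schema1", isKey = false)) // schema1
```

Under the record-name strategy the topic plays no part in the subject, which is consistent with .../subjects/schema1/versions/ returning versions while .../subjects/topic1-value/versions/ returns 404.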

felipemmelo commented 6 years ago

Thanks a lot for the reply, Joo-Won.

It does not support that naming strategy yet, but I'll look into it and come back in the next couple of days, ok?

sterkh66 commented 5 years ago

Hi! In any case, SchemaManager.PARAM_VALUE_SCHEMA_ID fails to handle the "latest" value. It would be nice either to make it behave as documented or to correct the docs.

felipemmelo commented 5 years ago

Hi Joo-Won, how are you?

That is strange, since I've been using this flag myself :)

Is it throwing or returning an incorrect value?

sterkh66 commented 5 years ago

Yes, it's throwing an exception about a missing schema id. Maybe in your case version_id just happens to equal schema_id, so everything looks fine? :) My guess is that getLatestVersion returns the latest version_id rather than the schema_id.

felipemmelo commented 5 years ago

May I see the stack trace so that I can see exactly where the problem is?

sterkh66 commented 5 years ago

Here it is:


io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
  at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:187)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:352)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:344)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getLatestSchemaMetadata(CachedSchemaRegistryClient.java:136)
  at za.co.absa.abris.avro.read.confluent.SchemaManager$.getLatestVersion(SchemaManager.scala:83)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.getSchemaId(SchemaLoader.scala:84)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:72)
  at za.co.absa.abris.avro.schemas.SchemaLoader$.loadFromSchemaRegistry(SchemaLoader.scala:67)
  at za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils$.load(AvroSchemaUtils.scala:50)

felipemmelo commented 5 years ago

Hi Joo-Won, thanks for coming back. A couple of comments on this.

  1. The "latest" version is indeed supported as you can see from the logs. It invoked SchemaLoader$.getSchemaId which invoked SchemaManager$.getLatestVersion, which means the keyword "latest" was understood. You can check it happening here

  2. There is no confusion between version and schema ids. As you can see here, the id is chosen from the response from Schema Registry Client, which is produced by this piece of code.

  3. Finally, the message says Subject not found. If you navigate to http://schema_registry_url:port/subjects, can you see the very same subject there?

sterkh66 commented 5 years ago

Hi, Felipe! All right, my point is very simple. Look at this snippet of my code

    val schemaRegistryInConfig = Map(
      SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> conf.schemaRegistryUrl,
      SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> conf.schemaRegistryTopic,
      SchemaManager.PARAM_VALUE_SCHEMA_ID       -> conf.schemaRegistrySchemaId
    )

Whenever I set conf.schemaRegistrySchemaId to a valid schema id, it works great, but when I set it to "latest" it fails with the exception above. What else can I say?

felipemmelo commented 5 years ago

Yes, and that is because the subject is not there. We can see the problem by taking the steps in the code.

Case 1 - Passing the schema id as a number

  1. Following the stack trace you posted above, the flow will eventually get here. Since the value is not the keyword "latest", it will return the id itself, as an integer.

  2. It will then follow the path to here, inside Confluent's own library. At this point, since neither the subject nor the id is cached, the schema will be retrieved by id. Nothing here depends on the subject, so it works.

Case 2 - Passing the schema id as "latest"

  1. It will get to this line, and then it will try to retrieve the latest metadata by subject, so that it can read the latest id as an integer from there. This is when "latest" gets converted to a number.

  2. Since your subject is not found, everything breaks. But please notice that it breaks while trying to access data about the subject. So the same question remains: is your subject actually there?
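
The two cases above can be sketched as follows. This is a simplified model, not the ABRiS source: registry is a hypothetical in-memory stand-in for Schema Registry, resolveSchemaId is an illustrative name, and the "-value" suffix is assumed from the topic1-value subject URL used earlier in the thread.

```scala
import scala.util.Try

// Hypothetical stand-in for Schema Registry: subject -> latest schema id.
val registry: Map[String, Int] = Map("topic1-value" -> 122)

// Case 1: a numeric id is used as-is and the topic is never consulted.
// Case 2: "latest" requires a subject lookup, which fails when the derived subject is missing.
def resolveSchemaId(topic: String, schemaIdParam: String): Either[String, Int] =
  if (schemaIdParam == "latest") {
    val subject = s"$topic-value" // assumed default: <topic name>-value
    registry.get(subject).toRight(s"Subject not found: $subject")
  } else {
    Try(schemaIdParam.toInt).toOption.toRight(s"Invalid schema id: $schemaIdParam")
  }

println(resolveSchemaId("schema1", "122"))         // Right(122)
println(resolveSchemaId("topic1", "latest"))       // Right(122)
println(resolveSchemaId("topic1-value", "latest")) // Left(Subject not found: topic1-value-value)
```

The last call shows why a topic parameter that already ends in "-value" would miss the subject under this assumption, while a numeric id succeeds regardless of the topic setting.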

sterkh66 commented 5 years ago

The subject does exist, because the following code works just fine:

        import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient

        val client = new CachedSchemaRegistryClient(url, 0)
        client.getLatestSchemaMetadata(s"$topic-value").getId.toString

I also tried different values for PARAM_SCHEMA_REGISTRY_TOPIC here, namely SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> s"$topic" and SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> s"$topic-value", but still had no success.

sterkh66 commented 5 years ago

So far, my workaround is to get the actual schema id with CachedSchemaRegistryClient, as in the snippet above.

sterkh66 commented 5 years ago

OK, it seems I finally got it working. The problem was indeed in SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC (which I missed in the first place): it was set to s"$topic-value" instead of s"$topic". So thank you, Felipe, and sorry.

felipemmelo commented 5 years ago

Amazing! Happy it worked. Will be closing this thread then. Cheers.