confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

Got "Error retrieving Avro schema for id" after the reboot of Schema registry #667

Open gvdm90 opened 6 years ago

gvdm90 commented 6 years ago

Hi all,

I'm experiencing a blocking issue using the Schema Registry with Kafka and a Kafka Streams application. I posted it in detail on the Confluent Google group:

https://groups.google.com/forum/#!topic/confluent-platform/0L-kbkkoAbo

For reference, I summarize the problem here (a detailed analysis is in the link above).

I have a Kafka/Confluent 3.3.1 stack made up of a Kafka broker (0.11.0.1, the Confluent one), a Schema Registry and a Kafka Streams application I wrote. All the agents run on the same machine, with only one instance per type (one broker, one registry, one streams app), for test purposes. The stack had been working for a while, but suddenly I noticed that no data was being processed by the Kafka Streams application.

After restarting the Schema Registry I tried to restart the Streams application, but it fails with this exception:

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 221
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)

The error is SerializationException: Error retrieving Avro schema for id 221. It seems that the Schema Registry crash caused the schema with ID 221 not to be saved, or that it is not available after the reboot. I also checked the URL http://10.17.36.101:8081/schemas/ids/221 and got the same response:

{
 error_code: 40403,
 message: "Schema not found"
}
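
For context on where the ID in this error comes from: with the Confluent Avro serializer, each message carries the schema ID in its first bytes (one magic byte `0`, then a 4-byte big-endian ID), and the consumer asks the registry for that ID. The following is only a minimal sketch of reading that prefix; the class name `WireFormat` and the sample payload are hypothetical and not taken from the thread.

```java
import java.nio.ByteBuffer;

final class WireFormat {
    private static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID stored in the 5-byte prefix of a serialized payload. */
    static int schemaId(byte[] payload) {
        if (payload == null || payload.length < 5) {
            throw new IllegalArgumentException("payload too short to carry a schema ID");
        }
        ByteBuffer buffer = ByteBuffer.wrap(payload); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hypothetical payload: magic byte, schema ID 221, then one byte of Avro data.
        byte[] payload = {0, 0, 0, 0, (byte) 221, 42};
        System.out.println(schemaId(payload)); // prints 221
    }
}
```

If the registry no longer knows the ID found in old messages, those messages cannot be deserialized even though the bytes in the topic are intact.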

Regarding the producer, it generates the schema dynamically according to the data structure. In fact, the structure of the value written to Kafka may change, with fields added or removed, so I wrote a method which creates the JSON schema starting from the Class definition of the value (the implementation is in the Google Groups discussion).

The Schema is not recreated each time we send the data but it is cached in a Map in the Producer application.
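
A producer-side cache of this kind could look roughly like the sketch below. It is not the implementation from the Google Groups discussion: the class name `SchemaCache`, the generic parameter `S` (standing in for the Avro `Schema` type so the example needs no external library) and the builder function are all assumptions made for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

final class SchemaCache<S> {
    private final Map<Class<?>, S> byClass = new ConcurrentHashMap<>();
    private final Function<Class<?>, S> builder;

    /** builder derives a schema from a value class; it runs at most once per class. */
    SchemaCache(Function<Class<?>, S> builder) {
        this.builder = builder;
    }

    /** Returns the same schema object on every call for a given class. */
    S schemaFor(Class<?> type) {
        return byClass.computeIfAbsent(type, builder);
    }

    public static void main(String[] args) {
        // A String stands in for the parsed schema in this demo.
        SchemaCache<String> cache = new SchemaCache<>(type -> "schema-for-" + type.getSimpleName());
        String first = cache.schemaFor(Integer.class);
        String second = cache.schemaFor(Integer.class);
        System.out.println(first == second); // prints true
    }
}
```

The point of the design is that every send for the same value class reuses one schema object instead of building a new, equal one.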

Are there known bugs in the registry in such a situation?

I temporarily fixed this problem by "hard resetting" the Kafka stack.

ghost commented 6 years ago

  1. As far as I understand, you do not need schema-registry at all. From the use case you have described, you are not using it the way it is supposed to be used.
  2. Schema Registry, in fact, won't create duplicate schemas. You can check the _schemas topic: in the Value section you will find the field id, which refers to the schema ID.
  3. Have you tried restarting the producer after the SR restart? Just to exclude the possibility that the CachedSchemaRegistryClient used in your producer holds incorrect values for the schemas.

gvdm90 commented 6 years ago

Hi @lashakv, thanks for the response

  1. What is the way it is supposed to be used? I followed the general Confluent guidelines: adding the property schema.registry.url, creating the Parser.Schema objects for the schemas and then creating the GenericData objects with those schemas. My use case needs multiple schemas for the same topic because the data may vary. The schemas get created in the registry, and in fact a Kafka Streams app I wrote is able to deserialize the data (if the SR works). What is wrong with this implementation?
  2. Yes, I already checked the _schemas topic and understood the general behaviour. I implemented a schema cache because after a while I ran into this problem, where the advice is to cache the schemas so as not to create too many of them. One more detail: before implementing the cache, even though I got the "too many schema objects created" error, only a few schemas were present in the Schema Registry. That was strange behaviour, but I fixed it with the cache. Is the advice still valid?
  3. No, I didn't restart the producers, as that is not a solution for us. We use the producers as part of the classpath of a company application, so restarting the producer means restarting the application.

I would like to help you help me/us understand this issue, so please feel free to ask for any detail you need.

gvdm90 commented 6 years ago

Hi,

I tried disabling the schema cache and performing a lot of writes with the producer. I'm using the kafka-schema-registry-client version 3.3.1 and the kafka-client 0.11.0.1.

This is the exception about too many schema objects being created:

16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole) org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole) Caused by: java.lang.IllegalStateException: Too many schema objects created for TRACE_AUX-value!
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91) ~[kafka-schema-registry-client-3.3.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74) ~[kafka-avro-serializer-3.3.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53) ~[kafka-avro-serializer-3.3.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:65) ~[kafka-clients-0.11.0.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at org.apache.kafka.common.serialization.ExtendedSerializer$Wrapper.serialize(ExtendedSerializer.java:55) ~[kafka-clients-0.11.0.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:768) ~[kafka-clients-0.11.0.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:745) ~[kafka-clients-0.11.0.1.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at test.KafkaTracerHelper.lambda$0(KafkaTracerHelper.java:342) ~[kafka-tracer-shared-1.4.11-SNAPSHOT.jar:?]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:1.8.0_151]
16:45:30,624 INFO  [stdout] (AsyncAppender-asyncConsole)    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:1.8.0_151]
16:45:30,625 INFO  [stdout] (AsyncAppender-asyncConsole)    at java.lang.Thread.run(Unknown Source) [?:1.8.0_151]

But if I use the Kafka Topics UI application I installed, I can see only three different schemas registered (the ones that my application generates) and a few entries of this kind:

Key: {
  magic: 0,
  keytype: NOOP
}
Value: {}

So it seems that creating multiple Parser.Schema objects instead of reusing them will cause that exception (which is blocking), and a Schema cache is at the moment required.
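
One plausible explanation for seeing this exception while only three schemas are registered, stated here as an assumption based on the stack trace and on my reading of the 3.3.x client rather than something confirmed in this thread: `CachedSchemaRegistryClient` appears to key its per-subject cache by object identity, so every freshly parsed schema object takes a new slot even when it is equal to an earlier one. The sketch below only illustrates that identity-versus-equality effect with plain JDK maps; the class name `IdentityCacheDemo` and the use of strings as stand-ins for schema objects are hypothetical.

```java
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.Map;

final class IdentityCacheDemo {
    /** Inserts n equal-but-distinct keys and returns how many entries the map holds. */
    static int entriesAfter(Map<Object, Integer> cache, int n) {
        for (int i = 0; i < n; i++) {
            // new String(...) forces a fresh object each time, like re-parsing a schema.
            cache.put(new String("{\"type\":\"string\"}"), 1);
        }
        return cache.size();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfter(new HashMap<>(), 1000));         // prints 1
        System.out.println(entriesAfter(new IdentityHashMap<>(), 1000)); // prints 1000
    }
}
```

Under that assumption, the registry itself deduplicates the schemas, while the client-side map keeps growing until its capacity check fails, which would match the behaviour described above.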

nicktoker commented 6 years ago

I had the same problem using schema-registry 3.3.0. The cause was a difference between the producer's and the consumer's "schema.registry.url": in the producer I used a hostname (http://hostname:8081) and in the consumer an IP address (for example http://192.168.3.2:8081). I changed both to use the hostname (i.e. to be the same) and it solved the problem. Hope it helps.

gvdm90 commented 6 years ago

Hi @nicktoker, thanks for the response.

I'm pretty sure that the properties were set to the same value. Moreover, I had been able to write to Kafka using the Schema Registry for a while, so the stack (with the same properties) was working as usual.

scriperdjq commented 6 years ago

I'm having the same issue. I had to restart the server (I have a test setup built by following the quickstart). After the restart, the connectors and console consumers throw 404 for any schema. They try to fetch data using the old ID, but it is not present. I have added the schema to the Schema Registry again, but how do I make kafka-avro-console-consumer / Kafka Connect use the new schema?

i-doit181 commented 6 years ago

We are using Confluent Platform 3.3.0 and we have the same issue. For a few topics it was enough to restart the source, but for other topics I had to delete the whole topic to register a new schema.

gvdm90 commented 6 years ago

What do you think @lashakv ?

ghost commented 6 years ago

@gvdm90

  1. Regarding the usage of schema-registry: maybe I am wrong, but if you have very dynamic schemas that can be changed by several producers, you could try not using schema-registry at all. Schema Registry is not a required service; you can use JSON for your keys and values and take responsibility for the keys/values in the Kafka topics yourself.
  2. I had the same situation as @scriperdjq described. Moreover, I had a lot of errors with invalid deserialization. The root cause of both problems for me was CachedSchemaRegistry.

My case was:

If you can share some test example that causes your problem, I can take a look at it.

gvdm90 commented 6 years ago

My schemas are not very dynamic. Starting from a clean state, all the schemas get created dynamically in the first few minutes (there are three or four of them, not many), and then no other schema needs to be created because the data written to Kafka has the same structure as the previous data. Using Avro is better than JSON for us because it handles schema changes and versions, and in general has built-in schema management which I would otherwise have to implement from scratch. Moreover, the other reasons written here convince us to use it.

I do not have a fixed test example to reproduce the problem because it occurred when the schema-registry Linux service crashed. Sometimes I managed to reproduce the issue by restarting the schema-registry service myself, but it does not always occur.

i-doit181 commented 6 years ago

@gvdm90 @lashakv For example, I have a default retention of 1h on my message bus. This morning I restarted all Connect sources and the schemas could be found in the _schemas topic. Now the topic is completely empty. I think that if I restart the Schema Registry, all schemas will be gone. The cleanup.policy of "_schemas" is compact (that was my first guess), so the data should be present.

mageshn commented 6 years ago

@gvdm90 are you using confluent cli to start your ecosystem?

gvdm90 commented 6 years ago

Hi,

No, I'm starting Kafka, the streams application and the schema registry manually.

Igosuki commented 6 years ago

@salvax86 If your data is gone it is likely that the topic retention is not properly set.

i-doit181 commented 6 years ago

@Igosuki do you need retention for the _schemas topic? I thought the topic's cleanup.policy should be compact.

gvdm90 commented 6 years ago

Yes, according to the documentation you only need to set the cleanup policy to compact instead of delete.

i-doit181 commented 6 years ago

Yesterday we updated our cluster to Confluent 4. I will tell you in a few days whether everything is working fine.

mageshn commented 6 years ago

If you are losing schemas and you are managing the cluster start/stop using the Confluent CLI, then it's a known limitation: the CLI puts all the topic data in /tmp, and hence you lose data on reboot.

If you would like to use a different directory, use export CONFLUENT_CURRENT=

gvdm90 commented 6 years ago

So that could be the problem! :) I will try the solution as soon as I can. One question: if I change the CONFLUENT_CURRENT variable on an existing cluster, from /tmp to something else, will I lose all my previous data in /tmp, or will the CLI scripts re-create the topic data in the new location?

mageshn commented 6 years ago

@gvdm90 you would lose everything. You could try copying the data to your desired location and it should likely work.

gvdm90 commented 6 years ago

Hi @mageshn

I checked the /tmp folder, but there is nothing related to the Schema Registry there. There is only a kafka-streams folder, which contains the temporary data of my Kafka Streams agent (although in some trials, deleting this folder did not trigger the problem in question).

This is the command I use in a systemd conf file to launch the schema registry:

/usr/bin/schema-registry-start /root/kafka-conf/schema-registry.properties

Are you sure that this command writes the temp data in /tmp?

gvdm90 commented 6 years ago

A quick update: before writing the previous message I stopped all my Confluent services (kafka, kafka-connect-elasticsearch, schema-registry, kafka-streams agent) with the command systemctl stop <service>.

Then I restarted the services with a simple systemctl restart <service>, and now the kafka-streams agent starts with the error from this issue:

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema for id 1
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:457)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:480)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:604)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:483)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:84)

So again I got this error, simply by restarting the Linux services of the Confluent applications. What should I do now? I have a locked Kafka stack again.

mageshn commented 6 years ago

@gvdm90 your issue is certainly different from the Confluent CLI issue. If you are using the Schema Registry start scripts directly with your own props file, nothing should be written into the /tmp folder. More than the Schema Registry, I'm interested in how you start/stop Kafka and what the log dir for Kafka is. Also, when you restart your stack, are the messages in the other topics intact? If so, can you please provide the Schema Registry startup logs? I will need more information to help you out, since Schema Registry by design doesn't lose schemas unless something happens to the topic itself.

gvdm90 commented 6 years ago

@mageshn I start and stop Kafka as a systemd service:

systemctl start kafka
systemctl stop kafka

This is the configuration used by systemd:

[Unit]
Description=Kafka service
After=zookeeper.service
Requires=zookeeper.service
BindsTo=zookeeper.service

[Service]
ExecStart=/usr/bin/kafka-server-start /root/kafka-conf/kafka.properties
Type=simple
Restart=on-failure
SuccessExitStatus=143
RestartSec=10

[Install]
WantedBy=default.target

This is the kafka.properties file

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://the.hostname:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
group.initial.rebalance.delay.ms=0
confluent.support.customer.id=anonymous
inter.broker.protocol.version=1.0

log.dirs=/var/lib/kafka

As you can see, I write the Kafka data to /var/lib/kafka.

When the error occurs the messages of all the topics are corrupted because no schema is present in _schemas.

Unfortunately, yesterday I reset the Kafka installation (and upgraded to Confluent 4.0/Kafka 1.0), so at the moment I have no error. I needed a working Kafka stack as soon as possible and the error was blocking me again.

soxofaan commented 6 years ago

Related: #698 (summary: make sure the _schemas topic is configured with cleanup.policy=compact)

Inylschek commented 4 years ago

After a cluster update we had this issue. Error retrieving Avro schema for id X where X is greater than any schema id remaining in the _schemas topic. The error was happening when attempting to use Kafka Streams.

To fix this we deleted all of the Kafka Streams topics in the Kafka cluster (the topics with STATE-STORE in the name). Some metadata was persisting in those topics which was causing the error.

After deleting the topics normal service was resumed.

alok87 commented 4 years ago

Really sad that such issues stay open while so many people keep reporting them. :(

akshayhanda commented 3 years ago

Could anyone please help in above issue. I am also stuck in the same.

foolOnTheHill commented 2 years ago

The same happened to me.

akshayhanda commented 2 years ago

Could anyone please help in above issue. I am also stuck in the same.

I was able to resolve this issue by registering the schema in the Schema Registry. I also changed the key converter to StringConverter instead of Avro.