confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

Schema Registration Failure (due to RecordTooLargeException) #277

Open pronzato opened 8 years ago

pronzato commented 8 years ago

Hi All,

We have a large schema that is failing to register due to what looks to be a RecordTooLargeException being thrown by the KafkaStore (full stack trace below).

The cause being reported back to my client is "The message is 1724096 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration".

I've tried changing the MAX_REQUEST_SIZE_CONFIG (max.request.size) of my client KafkaProducer to 104857600 and have added the following configs for the Kafka broker (server.properties) but the registration still fails:

server.properties:

    replica.fetch.max.bytes=104857600
    message.max.bytes=104857600

I'm guessing the issue might be with the KafkaProducer used by the registry itself? If so, is there a way to pass a MAX_REQUEST_SIZE_CONFIG (max.request.size) through to it?

Regards

Gianni

Full stack trace (reflection, Jersey, and Jetty dispatch frames elided for readability):

    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register schema operation failed while writing to the Kafka store
    io.confluent.kafka.schemaregistry.rest.exceptions.RestSchemaRegistryStoreException: Register schema operation failed while writing to the Kafka store
        at io.confluent.kafka.schemaregistry.rest.exceptions.Errors.storeException(Errors.java:80)
        at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:160)
        ... [reflection, Jersey, and Jetty dispatch frames elided] ...
        at java.lang.Thread.run(Thread.java:745)
    Caused by: io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException: Error while registering the schema in the backend Kafka store
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:335)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.registerOrForward(KafkaSchemaRegistry.java:351)
        at io.confluent.kafka.schemaregistry.rest.resources.SubjectVersionsResource.register(SubjectVersionsResource.java:154)
        ... 44 more
    Caused by: io.confluent.kafka.schemaregistry.storage.exceptions.StoreException: Put operation failed while waiting for an ack from Kafka
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.put(KafkaStore.java:272)
        at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.register(KafkaSchemaRegistry.java:326)
        ... 46 more
    Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1724096 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:686)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:449)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
        at io.confluent.kafka.schemaregistry.storage.KafkaStore.put(KafkaStore.java:262)
        ... 47 more
    Caused by: org.apache.kafka.common.errors.RecordTooLargeException: The message is 1724096 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.
    ; error code: 50001
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:157)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:174)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:225)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:217)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:212)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:57)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:89)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:50)
        at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:61)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:425)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:339)
        at com.baml.ficc.xa.kafka.confluent.ConfluentXAProducer.main(ConfluentXAProducer.java:50)

ewencp commented 8 years ago

@pronzato That definitely looks like you're exceeding the permitted size. Unfortunately, addressing this is more complicated than changing that one setting, because the message is larger than the default size Kafka permits. You'd need to change the maximum message size permitted on the _schemas topic with a topic-level config, then have the schema registry allow larger messages in both its producer and consumer.

Currently the schema registry doesn't allow overriding the relevant producer and consumer settings -- usually you should not need to override any of these settings, so there's no reason to pass configs along the way we do for services like the REST proxy. We'd need to expose that functionality or add new schema registry configs for specifying the maximum message size.

pronzato commented 8 years ago

Hi Ewen,

Thanks for getting back to me so quickly. I was able to build the schema registry from source and add support for passing in "max.request.size" for the producer and "fetch.message.max.bytes" for the consumer. That, combined with changing the Kafka broker configs "message.max.bytes" and "replica.fetch.max.bytes", means we're now able to register our large schemas.
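
For illustration, the combined settings look roughly like this (the kafkastore.* property names come from our local patch, so treat them as hypothetical):

    # schema-registry.properties (our patched build -- property names are hypothetical)
    kafkastore.max.request.size=104857600          # passed to the producer as max.request.size
    kafkastore.fetch.message.max.bytes=104857600   # passed to the consumer as fetch.message.max.bytes

    # server.properties (broker-wide)
    message.max.bytes=104857600
    replica.fetch.max.bytes=104857600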

Would it be possible to expose those Producer/Consumer configs in your next release so we would be able to register large schemas out of the box?

In the meantime, when you say create a topic-level config for _schemas, does that mean I could then remove the Kafka server settings I added ("message.max.bytes" and "replica.fetch.max.bytes"), which I assume are global? If so, how would I go about doing that for _schemas?

Again, thank you for your quick response.

Regards

Gianni

ewencp commented 8 years ago

@pronzato Glad that worked. I'm not sure we want to expose every producer/consumer config. That makes more sense for something like the REST proxy, where it's supposed to be a light wrapper around the clients. Here, the use of Kafka is an implementation detail, and even which version of the clients (old vs new producer/consumer) may change, which would mean the config names could also change.

I think in this case, it might make more sense to expose a specific schema registry config for maximum message size (or maybe even use terminology different from "message") and translate it to the appropriate client configs. We don't really need access to all the configs, only a couple of very specific ones.

Re: the topic-level config, yes, some settings can be configured on a per-topic basis so you don't have to change them globally. This is useful both because it limits the scope of the change to that single topic (so, for example, you don't have to allow such large messages in topics that should never contain them) and because you can make the change without having to bounce all your brokers with a new config. Details are here: http://kafka.apache.org/documentation.html#topic-config. You'd want to set the max.message.bytes property (whose name is, unfortunately, inconsistent with the broker config, message.max.bytes).
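
For example, with the per-topic config tooling (assuming ZooKeeper at localhost:2181 and the default _schemas topic name), the override would look something like:

    # Raise the message size limit on _schemas only (0.9.x-era syntax;
    # newer Kafka releases use --bootstrap-server instead of --zookeeper)
    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type topics --entity-name _schemas \
      --add-config max.message.bytes=104857600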

pronzato commented 8 years ago

Yes, I totally agree that not all configs should be exposed -- just the ones that allow the larger message sizes (and possibly any timing-related ones that might be affected by the larger messages). Using different terminology is also a good idea, to remove the tie-in to the Kafka-specific config names.

Thanks for the topic config link, will take a look.

Regards

Gianni

StephenZeng-Wonder commented 2 years ago

For anyone who runs into this issue: if you deploy with Docker (Kubernetes YAML), adding the config below solves the problem:

            - name: SCHEMA_REGISTRY_KAFKASTORE_MAX_REQUEST_SIZE
              value: "10485760"

This isn't documented, but you can find the feature in the schema-registry image source and in the schema-registry source itself: https://github.com/confluentinc/schema-registry/blob/master/core/src/main/java/io/confluent/kafka/schemaregistry/storage/KafkaStore.java#L126
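
For a non-Docker deployment, the Confluent images translate SCHEMA_REGISTRY_* environment variables into entries in the registry's properties file, so the equivalent should be something like (file path may vary by install):

    # etc/schema-registry/schema-registry.properties
    kafkastore.max.request.size=10485760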