confluentinc / kafka-rest

Confluent REST Proxy for Kafka
https://docs.confluent.io/current/kafka-rest/docs/index.html

Avro RecordNameStrategy not working properly #666

Open wtrzcinski opened 4 years ago

wtrzcinski commented 4 years ago

Hi, I'm trying to send messages to Kafka through the kafka-rest proxy with custom Avro schemas per record. I tried to set up the proper SubjectNameStrategy using properties in the /etc/kafka-rest/kafka-rest.properties configuration file:

KAFKA_REST_client.key.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
KAFKA_REST_client.value.subject.name.strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
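For reference, the keys above look like Docker environment-variable overrides; in a plain kafka-rest.properties file the equivalent lines would presumably be written without the KAFKA_REST_ prefix (this is an assumption about the intended config, not taken from the report):

```properties
# Prefixed client configs forwarded by kafka-rest to its embedded serializers
client.key.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
client.value.subject.name.strategy=io.confluent.kafka.serializers.subject.RecordNameStrategy
```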

I then ran into very strange behavior: the kafka-rest proxy seemed to register my schemas twice, once under the "topic-name-key"/"topic-name-value" subject names and once under the actual record names, as visible in the following logs from the schema-registry container:

...
kafka-schemaregistry_1  | [2020-05-06 09:30:18,570] INFO Wait to catch up until the offset of the last message at 1 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,605] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/search.hit-key/versions HTTP/1.1" 200 8  261 (io.confluent.rest-utils.requests)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,609] INFO Wait to catch up until the offset of the last message at 2 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,619] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/search.hit-value/versions HTTP/1.1" 200 8  12 (io.confluent.rest-utils.requests)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,655] INFO Wait to catch up until the offset of the last message at 3 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,670] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/com.mycompany.search.monitoring.model.SearchKey/versions HTTP/1.1" 200 8  17 (io.confluent.rest-utils.requests)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,675] INFO Wait to catch up until the offset of the last message at 4 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,684] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/com.mycompany.search.monitoring.model.JobStart/versions HTTP/1.1" 200 8  11 (io.confluent.rest-utils.requests)
...

For the next records, which have a different Avro schema, the registry complains with the following logs:

...
kafka-schemaregistry_1  | [2020-05-06 09:30:18,814] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/search.hit-value/versions HTTP/1.1" 409 93  19 (io.confluent.rest-utils.requests)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,845] INFO Wait to catch up until the offset of the last message at 5 (io.confluent.kafka.schemaregistry.storage.KafkaStore)
kafka-schemaregistry_1  | [2020-05-06 09:30:18,850] INFO 172.20.0.4 - - [06/May/2020:09:30:18 +0000] "POST /subjects/search.hit-value/versions HTTP/1.1" 409 93  9 (io.confluent.rest-utils.requests)
...

And the kafka-rest proxy fails with the following exception:

kafka-rest_1            | [2020-05-06 09:30:18,825] INFO 172.20.0.1 - - [06/May/2020:09:30:18 +0000] "POST /topics/search.hit HTTP/1.1" 408 69  37 (io.confluent.rest-utils.requests)
kafka-rest_1            | [2020-05-06 09:30:18,850] ERROR Request Failed with exception  (io.confluent.rest.exceptions.DebuggableExceptionMapper)
kafka-rest_1            | io.confluent.rest.exceptions.RestException: Schema registration or lookup failed
kafka-rest_1            |   at io.confluent.kafkarest.SchemaRestProducer.produce(SchemaRestProducer.java:109)
kafka-rest_1            |   at io.confluent.kafkarest.ProducerPool.produce(ProducerPool.java:209)
kafka-rest_1            |   at io.confluent.kafkarest.resources.v2.TopicsResource.produce(TopicsResource.java:165)
kafka-rest_1            |   at io.confluent.kafkarest.resources.v2.TopicsResource.produceSchema(TopicsResource.java:206)
kafka-rest_1            |   at io.confluent.kafkarest.resources.v2.TopicsResource.produceAvro(TopicsResource.java:119)
kafka-rest_1            |   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
kafka-rest_1            |   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
kafka-rest_1            |   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
kafka-rest_1            |   at java.lang.reflect.Method.invoke(Method.java:498)
kafka-rest_1            |   at org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)
kafka-rest_1            |   at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:124)
kafka-rest_1            |   at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:167)
kafka-rest_1            |   at org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)
kafka-rest_1            |   at org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:79)
kafka-rest_1            |   at org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:469)
kafka-rest_1            |   at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:391)
kafka-rest_1            |   at org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:80)
kafka-rest_1            |   at org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:253)
kafka-rest_1            |   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)
kafka-rest_1            |   at org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)
kafka-rest_1            |   at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
kafka-rest_1            |   at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
kafka-rest_1            |   at org.glassfish.jersey.internal.Errors.process(Errors.java:244)
kafka-rest_1            |   at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)
kafka-rest_1            |   at org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:232)
kafka-rest_1            |   at org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:680)
kafka-rest_1            |   at org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)
kafka-rest_1            |   at org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:386)
kafka-rest_1            |   at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:561)
kafka-rest_1            |   at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:502)
kafka-rest_1            |   at org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:439)
kafka-rest_1            |   at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1604)
kafka-rest_1            |   at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:545)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
kafka-rest_1            |   at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1607)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1297)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
kafka-rest_1            |   at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:485)
kafka-rest_1            |   at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1577)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1212)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:173)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:221)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:767)
kafka-rest_1            |   at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
kafka-rest_1            |   at org.eclipse.jetty.server.Server.handle(Server.java:500)
kafka-rest_1            |   at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:383)
kafka-rest_1            |   at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:547)
kafka-rest_1            |   at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:375)
kafka-rest_1            |   at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:270)
kafka-rest_1            |   at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
kafka-rest_1            |   at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:103)
kafka-rest_1            |   at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:388)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:806)
kafka-rest_1            |   at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:938)
kafka-rest_1            |   at java.lang.Thread.run(Thread.java:748)
kafka-rest_1            | Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:293)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
kafka-rest_1            |   at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
kafka-rest_1            |   at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.register(AbstractKafkaSchemaSerDe.java:146)
kafka-rest_1            |   at io.confluent.kafkarest.SchemaRestProducer.produce(SchemaRestProducer.java:102)
kafka-rest_1            |   ... 64 more

At first glance, the issue seems to be that your SchemaRestProducer doesn't understand the configured subject name strategy, but your KafkaAvroSerializer does.
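For context, the two strategies derive the subject name differently. A minimal sketch (the function names are illustrative, not the actual Java API):

```python
def topic_name_subject(topic: str, is_key: bool, record_name: str) -> str:
    """TopicNameStrategy (the default): subject depends only on the topic."""
    return f"{topic}-{'key' if is_key else 'value'}"

def record_name_subject(topic: str, is_key: bool, record_name: str) -> str:
    """RecordNameStrategy: subject is the fully-qualified Avro record name."""
    return record_name

# The logs above show BOTH subjects being registered for the same message,
# e.g. 'search.hit-value' and 'com.mycompany.search.monitoring.model.JobStart'.
```

With RecordNameStrategy, records with different schemas can share a topic because each schema gets its own subject; registering under the topic-derived subject as well reintroduces the single-schema-per-topic restriction, which matches the 409 errors above.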

Do you support this schema-per-record scenario and this is just a bug? Or, if the scenario is unsupported, should the "subject.name.strategy" configuration not be accepted by the KafkaAvroSerializer, or should the KafkaAvroSerializer not auto-register schemas? Edit: I'm just guessing; I don't know the root cause, but it doesn't work properly.

Best Regards Wojtek

benhag commented 4 years ago

We have the same problem: the key and value schemas are registered twice when RecordNameStrategy is used, once under the default TopicNameStrategy subjects (e.g. <topic-name>-key and <topic-name>-value) and once under the configured RecordNameStrategy subjects. The first message therefore defines the schema for the whole topic, and it's not possible to send messages with a different schema to the same topic.

Any news on this?

noraab commented 4 years ago

@wtrzcinski did you receive any response or find any solution?

wtrzcinski commented 4 years ago

Hi, sorry for the late reply. No, I didn't get any response. It was a long time ago and I don't remember exactly, but as far as I remember the workaround was as follows:

At the very beginning we tried to send the message:

{
   "value_schema": ...,
   "records": [...]
}

We expected that the endpoint would return the id of the registered schema, which we could reuse later, but we hit the issue described above: this scenario simply didn't work for us. So, as a workaround, for every schema the client first sends a request directly to the Schema Registry to register the schema; that request returns a "schema_id", which can then be used in the Kafka produce request. We then send the records with the already-registered schema ids:

{
    "key_schema_id": ...,
    "value_schema_id": ...,
    "records": [...]
}
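
The two steps of this workaround can be sketched as request builders; hosts, subjects, and payloads below are illustrative placeholders, not taken from our actual setup:

```python
import json

# Illustrative endpoints; adjust to your environment.
REGISTRY = "http://schema-registry:8081"
REST_PROXY = "http://rest-proxy:8082"

def register_schema_request(subject: str, avro_schema: dict) -> dict:
    """Step 1: register a schema directly with the Schema Registry.

    The registry's response body contains {"id": <schema_id>}.
    """
    return {
        "url": f"{REGISTRY}/subjects/{subject}/versions",
        "headers": {"Content-Type": "application/vnd.schemaregistry.v1+json"},
        # The registry expects the schema itself as an escaped JSON string.
        "body": json.dumps({"schema": json.dumps(avro_schema)}),
    }

def produce_request(topic: str, key_schema_id: int,
                    value_schema_id: int, records: list) -> dict:
    """Step 2: v2 produce request referencing the already-registered ids."""
    return {
        "url": f"{REST_PROXY}/topics/{topic}",
        "headers": {"Content-Type": "application/vnd.kafka.avro.v2+json"},
        "body": json.dumps({
            "key_schema_id": key_schema_id,
            "value_schema_id": value_schema_id,
            "records": records,
        }),
    }
```

Because the schemas are registered up front under the subjects we choose, the rest-proxy never attempts its own registration under the topic-derived subjects, so the 409 conflict never occurs.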

Best Regards Wojciech Trzciński

benhag commented 4 years ago

Requesting the schema ids from the schema-registry and sending them to the rest-proxy instead of the schemas works. Thanks for the workaround @wtrzcinski

cobolbaby commented 4 months ago

Does the v3 API already support this capability? Does v2 only support io.confluent.kafka.serializers.subject.TopicNameStrategy?

curl --location 'http://10.190.81.165:8082/topics/lwtest3' \
--header 'Content-Type: application/vnd.kafka.jsonschema.v2+json' \
--data-raw '{
    "records": [
        {
            "value": {
                "email": "cobolbaby@qq.com",
                "favorite_color": "green",
                "favorite_number": 17,
                "name": "itc180012"
            }
        }
    ],
    "value_schema_id": 13
}'

{
    "error_code": 42207,
    "message": "Error serializing message. Error when fetching schema version. subject = lwtest3-value, schema = {\"$schema\":\"http://json-schema.org/draft-07/schema#\",\"description\":\"A Confluent Kafka Python User\",\"properties\":{\"email\":{\"description\":\"User's email\",\"format\":\"email\",\"type\":\"string\"},\"favorite_color\":{\"description\":\"User's favorite color\",\"enum\":[\"red\",\"green\"],\"type\":\"string\"},\"favorite_number\":{\"description\":\"User's favorite number\",\"exclusiveMinimum\":0,\"type\":\"number\"},\"name\":{\"description\":\"User's name\",\"minLength\":1,\"pattern\":\"^itc\\\\d{6}$\",\"type\":\"string\"}},\"required\":[\"name\",\"favorite_number\",\"favorite_color\",\"email\"],\"title\":\"User\",\"type\":\"object\"}\nSchema not found io.confluent.rest.exceptions.RestNotFoundException: Schema not found\nio.confluent.rest.exceptions.RestNotFoundException: Schema not found\n\tat io.confluent.kafka.schemaregistry.rest.exceptions.Errors.schemaNotFoundException(Errors.java:121)\n\tat io.confluent.kafka.schemaregistry.rest.resources.SubjectsResource.lookUpSchemaUnderSubject(SubjectsResource.java:132)\n\tat jdk.internal.reflect.GeneratedMethodAccessor79.invoke(Unknown Source)\n\tat java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.base/java.lang.reflect.Method.invoke(Method.java:566)\n\tat org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory.lambda$static$0(ResourceMethodInvocationHandlerFactory.java:52)\n\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:134)\n\tat org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:177)\n\tat org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$VoidOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:159)\n\tat 
org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:81)\n\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:475)\n\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:397)\n\tat org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:81)\n\tat org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:255)\n\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:248)\n\tat org.glassfish.jersey.internal.Errors$1.call(Errors.java:244)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:292)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:274)\n\tat org.glassfish.jersey.internal.Errors.process(Errors.java:244)\n\tat org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:265)\n\tat org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:234)\n\tat org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:684)\n\tat org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:394)\n\tat org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:378)\n\tat org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:553)\n\tat org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:494)\n\tat org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:431)\n\tat org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)\n\tat org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1626)\n\tat org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:552)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)\n\tat org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)\n\tat 
org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:143)\n\tat org.eclipse.jetty.server.handler.RequestLogHandler.handle(RequestLogHandler.java:54)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat io.confluent.kafka.schemaregistry.rest.RequestIdHandler.handle(RequestIdHandler.java:51)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:235)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1440)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)\n\tat org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:505)\n\tat org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)\n\tat org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1355)\n\tat org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)\n\tat org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)\n\tat org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:146)\n\tat org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:181)\n\tat org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:234)\n\tat org.eclipse.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:772)\n\tat org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)\n\tat org.eclipse.jetty.server.Server.handle(Server.java:516)\n\tat org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:487)\n\tat org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:732)\n\tat org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:479)\n\tat 
org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)\n\tat org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)\n\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)\n\tat org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:883)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1034)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\n; error code: 40403"
}

curl --location 'http://10.190.81.165:8082/v3/clusters/6nzE5kIdQY-IaxDsadO30g/topics/lwtest3/records' \
--header 'Content-Type: application/json' \
--data-raw '{
    "value": {
        "subject_name_strategy": "RECORD_NAME",
        "schema_id": 13,
        "data":  {
            "email": "cobolbaby@qq.com",
            "favorite_color": "green",
            "favorite_number": 17,
            "name": "itc180012"
        }
    }
}'

{
    "error_code": 200,
    "cluster_id": "6nzE5kIdQY-IaxDsadO30g",
    "topic_name": "lwtest3",
    "partition_id": 0,
    "offset": 6,
    "timestamp": "2024-04-13T06:25:21.330Z",
    "value": {
        "type": "JSONSCHEMA",
        "subject": "User",
        "schema_id": 13,
        "schema_version": 5,
        "size": 98
    }
}

Fryie commented 3 months ago

@cobolbaby To me, it looks like kafka-rest previously supported setting key.subject.name.strategy and value.subject.name.strategy at a global level, which appears to have worked in conjunction with setting the schema IDs explicitly. That configuration seems to have been removed in newer versions of kafka-rest (I don't know exactly when), and I don't know if there is any other way to get v2 to work. In v3, you can set the subject name strategy in the payload, which appears to work.

At least this is what I found out after attempting a pretty major update of kafka-rest.