tchiotludo / akhq

Kafka GUI for Apache Kafka to manage topics, topics data, consumers group, schema registry, connect and more...
https://akhq.io/
Apache License 2.0

Support references in schema registry #304

Closed dblooman closed 4 years ago

dblooman commented 4 years ago

Error

com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"]) at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 596] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["references"])

This happens when a schema is stored in the registry with references, for example:

{
    "schema": "{\"type\":\"record\",\"name\":\"ProductView\",\"namespace\":\"com.depop.model\",\"fields\":[{\"name\":\"eventType\",\"type\":\"string\"},{\"name\":\"eventTimestamp\"}",
    "references": [
        {
            "name": "ActivityEvent",
            "subject": "activity-base-event-value",
            "version": 1
        }
    ]
}

Registry code - https://github.com/confluentinc/schema-registry/blob/ac6843e3e7244163bb50f79a60ca8d6a847c24f9/client/src/main/java/io/confluent/kafka/schemaregistry/client/rest/entities/SchemaReference.java

API reference - https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions

This currently causes the front end to error.
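
To illustrate the failure mode in the error above, here is a minimal sketch of strict versus lenient handling of unknown fields. It is not Jackson's implementation; `StrictFieldCheck` and `bind` are hypothetical names, and the only detail taken from the report is the list of four known properties in the error message.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only: mimics a binder that knows four fields.
final class StrictFieldCheck {
    // The four properties listed as known in the reported error message.
    static final Set<String> KNOWN = Set.of("version", "schema", "id", "subject");

    /** Copies known fields; unknown ones are rejected, or dropped if ignoreUnknown is true. */
    static Map<String, Object> bind(Map<String, Object> payload, boolean ignoreUnknown) {
        Map<String, Object> bound = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : payload.entrySet()) {
            if (KNOWN.contains(entry.getKey())) {
                bound.put(entry.getKey(), entry.getValue());
            } else if (!ignoreUnknown) {
                throw new IllegalArgumentException(
                        "Unrecognized field \"" + entry.getKey() + "\"");
            }
        }
        return bound;
    }
}
```

In this sketch, lenient mode makes the error disappear but also drops "references", so anything that later needs the referenced types would still lack them.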

tchiotludo commented 4 years ago

On which AKHQ URL does this happen? Are you trying to produce with AKHQ?

dblooman commented 4 years ago

URL: baseurl/clustername/schema

The issue occurs when visiting that page. Visiting a specific schema, such as baseurl/clustername/schema/schemaName-value, works.

Full stack trace:

java.lang.RuntimeException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 596] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["references"])
    at org.akhq.repositories.SchemaRegistryRepository.lambda$toSchemasLastestVersion$1(SchemaRegistryRepository.java:47)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(Unknown Source)
    at java.base/java.util.ArrayList$SubList$2.forEachRemaining(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown Source)
    at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source)
    at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source)
    at org.akhq.repositories.SchemaRegistryRepository.toSchemasLastestVersion(SchemaRegistryRepository.java:50)
    at org.akhq.repositories.SchemaRegistryRepository.lambda$list$0(SchemaRegistryRepository.java:34)
    at org.akhq.utils.PagedList.of(PagedList.java:70)
    at org.akhq.repositories.SchemaRegistryRepository.list(SchemaRegistryRepository.java:34)
    at org.akhq.controllers.SchemaController.list(SchemaController.java:55)
    at org.akhq.controllers.$SchemaControllerDefinition$$exec1.invokeInternal(Unknown Source)
    at io.micronaut.context.AbstractExecutableMethod.invoke(AbstractExecutableMethod.java:146)
    at io.micronaut.context.DefaultBeanContext$BeanExecutionHandle.invoke(DefaultBeanContext.java:2995)
    at io.micronaut.web.router.AbstractRouteMatch.execute(AbstractRouteMatch.java:286)
    at io.micronaut.web.router.RouteMatch.execute(RouteMatch.java:122)
    at io.micronaut.http.server.netty.RoutingInBoundHandler.lambda$buildResultEmitter$16(RoutingInBoundHandler.java:1474)
    at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:71)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty.subscribeActual(FlowableSwitchIfEmpty.java:32)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableSwitchMap.subscribeActual(FlowableSwitchMap.java:49)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.reactivex.internal.operators.flowable.FlowableDefer.subscribeActual(FlowableDefer.java:42)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FlowableSwitchIfEmpty.java:71)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:97)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:338)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:97)
    at io.reactivex.internal.operators.maybe.MaybeToFlowable$MaybeToFlowableSubscriber.onComplete(MaybeToFlowable.java:80)
    at io.micronaut.reactive.rxjava2.RxInstrumentedMaybeObserver.onComplete(RxInstrumentedMaybeObserver.java:96)
    at io.reactivex.internal.operators.maybe.MaybeDoOnEvent$DoOnEventMaybeObserver.onComplete(MaybeDoOnEvent.java:115)
    at io.micronaut.reactive.rxjava2.RxInstrumentedMaybeObserver.onComplete(RxInstrumentedMaybeObserver.java:96)
    at io.reactivex.internal.operators.flowable.FlowableElementAtMaybe$ElementAtSubscriber.onComplete(FlowableElementAtMaybe.java:102)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:97)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onComplete(FlowableFlatMap.java:338)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onComplete(RxInstrumentedSubscriber.java:97)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorSubscription.slowPath(FlowableFromIterable.java:255)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:124)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:117)
    at io.micronaut.reactive.rxjava2.RxInstrumentedSubscriber.onSubscribe(RxInstrumentedSubscriber.java:54)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:69)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:47)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableElementAtMaybe.subscribeActual(FlowableElementAtMaybe.java:36)
    at io.reactivex.Maybe.subscribe(Maybe.java:4290)
    at io.micronaut.reactive.rxjava2.RxInstrumentedMaybe.subscribeActual(RxInstrumentedMaybe.java:53)
    at io.reactivex.Maybe.subscribe(Maybe.java:4290)
    at io.reactivex.internal.operators.maybe.MaybeDoOnEvent.subscribeActual(MaybeDoOnEvent.java:39)
    at io.reactivex.Maybe.subscribe(Maybe.java:4290)
    at io.micronaut.reactive.rxjava2.RxInstrumentedMaybe.subscribeActual(RxInstrumentedMaybe.java:53)
    at io.reactivex.Maybe.subscribe(Maybe.java:4290)
    at io.reactivex.internal.operators.maybe.MaybeToFlowable.subscribeActual(MaybeToFlowable.java:45)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:53)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableSwitchIfEmpty.subscribeActual(FlowableSwitchIfEmpty.java:32)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14868)
    at io.micronaut.configuration.metrics.binder.web.WebMetricsPublisher.subscribe(WebMetricsPublisher.java:153)
    at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableSwitchMap.subscribeActual(FlowableSwitchMap.java:49)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.internal.operators.flowable.FlowableMap.subscribeActual(FlowableMap.java:37)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.micronaut.reactive.rxjava2.RxInstrumentedFlowable.subscribeActual(RxInstrumentedFlowable.java:58)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14868)
    at io.micronaut.http.context.ServerRequestTracingPublisher.lambda$subscribe$0(ServerRequestTracingPublisher.java:52)
    at io.micronaut.http.context.ServerRequestContext.with(ServerRequestContext.java:68)
    at io.micronaut.http.context.ServerRequestTracingPublisher.subscribe(ServerRequestTracingPublisher.java:52)
    at io.reactivex.internal.operators.flowable.FlowableFromPublisher.subscribeActual(FlowableFromPublisher.java:29)
    at io.reactivex.Flowable.subscribe(Flowable.java:14918)
    at io.reactivex.Flowable.subscribe(Flowable.java:14865)
    at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
    at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:288)
    at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:253)
    at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:79)
    at io.micrometer.core.instrument.Timer.lambda$wrap$0(Timer.java:144)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at io.micronaut.scheduling.instrument.InvocationInstrumenterWrappedRunnable.run(InvocationInstrumenterWrappedRunnable.java:48)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "references" (class io.confluent.kafka.schemaregistry.client.rest.entities.Schema), not marked as ignorable (4 known properties: "version", "schema", "id", "subject"])
 at [Source: (sun.net.www.protocol.http.HttpURLConnection$HttpInputStream); line: 1, column: 596] (through reference chain: io.confluent.kafka.schemaregistry.client.rest.entities.Schema["references"])
    at com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException.from(UnrecognizedPropertyException.java:61)
    at com.fasterxml.jackson.databind.DeserializationContext.handleUnknownProperty(DeserializationContext.java:840)
    at com.fasterxml.jackson.databind.deser.std.StdDeserializer.handleUnknownProperty(StdDeserializer.java:1192)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperty(BeanDeserializerBase.java:1592)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.handleUnknownProperties(BeanDeserializerBase.java:1542)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer._deserializeUsingPropertyBased(BeanDeserializer.java:438)
    at com.fasterxml.jackson.databind.deser.BeanDeserializerBase.deserializeFromObjectUsingNonDefault(BeanDeserializerBase.java:1287)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserializeFromObject(BeanDeserializer.java:326)
    at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:159)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4202)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3250)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:259)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:331)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:599)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.getLatestVersion(RestService.java:591)
    at org.akhq.repositories.SchemaRegistryRepository.getLatestVersion(SchemaRegistryRepository.java:97)
    at org.akhq.repositories.SchemaRegistryRepository.lambda$toSchemasLastestVersion$1(SchemaRegistryRepository.java:45)
    ... 126 more

dblooman commented 4 years ago

It also happens when visiting that schema page directly.

tchiotludo commented 4 years ago

OK, I understand better now. I wasn't aware of this SR feature! PRs are welcome on that!

To help, can you also provide the referenced schema? I tried adding a schema like this with a few tricks on my SR, but found no way to get it inserted.

dblooman commented 4 years ago

Schema to be referenced:

{
    "type": "record",
    "name": "ActivityEvent",
    "namespace": "com.depop.model",
    "fields": [
        {
            "name": "schemaVersion",
            "type": "string"
        },
        {
            "name": "eventType",
            "type": "string"
        },
        {
            "name": "eventTime",
            "type": "long"
        },
        {
            "name": "loadedComponents",
            "type": {
                "type": "array",
                "items": "string"
            }
        }
    ]
}

Schema with reference:

{
    "type": "record",
    "name": "Foo",
    "namespace": "com.depop.model",
    "fields": [
        {
            "name": "eventType",
            "type": "string"
        },
        {
            "name": "eventTimestamp",
            "type": "long"
        },
        {
            "name": "baseEvent",
            "type": "com.depop.model.ActivityEvent"
        }
    ]
}

dblooman commented 4 years ago

I am also using version 5.5.0 of Schema Registry.

tchiotludo commented 4 years ago

@dblooman I just merged a new version into dev. I think it will work with references, since I updated the SR client.

Can you test it, please?

mredjem commented 4 years ago

Hi, I am currently facing a similar issue with schema references. I tested with Confluent SR 5.5.1 and the latest AKHQ dev Docker image. When opening the SR tab in the UI, I get the following error:

org.apache.avro.SchemaParseException: Undefined name: "ServiceActivation"
    at org.apache.avro.Schema.parse(Schema.java:1633)
    at org.apache.avro.Schema.parse(Schema.java:1751)
    at org.apache.avro.Schema.parse(Schema.java:1668)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1425)
    at org.apache.avro.Schema$Parser.parse(Schema.java:1413)
    at org.akhq.models.Schema.<init>(Schema.java:53)
    at org.akhq.repositories.SchemaRegistryRepository.getLatestVersion(SchemaRegistryRepository.java:101)
    at org.akhq.repositories.SchemaRegistryRepository.lambda$toSchemasLastestVersion$1(SchemaRegistryRepository.java:47)

My Avro schema is:

{
    "subject": "service-value",
    "version": 1,
    "id": 4,
    "references": [
        {
            "name": "ServiceActivation",
            "subject": "service_activation-value",
            "version": 1
        }
    ],
    "schema": "{\"type\":\"record\",\"name\":\"Service\",\"namespace\":\"com.models\",\"doc\":\"A service managed by ???.\",\"fields\":[{\"name\":\"id\",\"type\":\"int\",\"doc\":\"The numeric identifier for the service.\"},{\"name\":\"label\",\"type\":\"string\",\"doc\":\"The display name for the service.\"},{\"name\":\"type\",\"type\":\"string\",\"doc\":\"\"},{\"name\":\"activation\",\"type\":[\"null\",\"ServiceActivation\"]}]}"
}
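
The "Undefined name" error above is what an Avro parser reports when a named type such as "ServiceActivation" is used before it has been defined, so referenced schemas have to be parsed first. As a rough sketch of that ordering step only (no registry or Avro calls), the hypothetical helper below walks a subject's references depth-first and returns the subjects in the order they would need to be parsed. `ReferenceOrder`, `parseOrder`, and the map of subject to referenced subjects are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: orders subjects so that dependencies come first.
final class ReferenceOrder {
    /** Returns subjects so that every referenced subject precedes the one using it. */
    static List<String> parseOrder(String root, Map<String, List<String>> referencesBySubject) {
        Set<String> ordered = new LinkedHashSet<>();
        visit(root, referencesBySubject, ordered, new LinkedHashSet<>());
        return new ArrayList<>(ordered);
    }

    private static void visit(String subject, Map<String, List<String>> refs,
                              Set<String> ordered, Set<String> inProgress) {
        if (ordered.contains(subject)) {
            return; // already scheduled for parsing
        }
        if (!inProgress.add(subject)) {
            throw new IllegalStateException("Cyclic reference at " + subject);
        }
        // Schedule every referenced subject before the subject itself.
        for (String dependency : refs.getOrDefault(subject, List.of())) {
            visit(dependency, refs, ordered, inProgress);
        }
        inProgress.remove(subject);
        ordered.add(subject);
    }
}
```

With the schema above, calling `parseOrder("service-value", Map.of("service-value", List.of("service_activation-value")))` would place "service_activation-value" before "service-value".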

tchiotludo commented 4 years ago

OK 😢 So there is still some work to do on it 😢

mredjem commented 4 years ago

I managed to make it work in a snippet, but it takes a lot more code than before:

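import java.util.Collections;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import org.apache.avro.Schema.Parser;

// Fetch the latest registered version of the subject, including its references.
RestService restService = new RestService("http://localhost:8081");
restService.setHttpHeaders(Collections.singletonMap("Accept", "application/json"));

Schema schema = restService.getLatestVersion("store-value");
Parser parser = new Parser().setValidateDefaults(false);

// Give the provider a client so it can fetch the referenced schema versions.
AvroSchemaProvider provider = new AvroSchemaProvider();
provider.configure(Collections.singletonMap(
  "schemaVersionFetcher", new CachedSchemaRegistryClient(restService, 100)
));

// Parse the schema together with its references.
ParsedSchema parsedSchema = provider
  .parseSchema(schema.getSchema(), schema.getReferences())
  .orElseThrow(() -> new RuntimeException("invalid schema"));

// Re-parse the resolved schema as a plain Avro schema.
org.apache.avro.Schema avroSchema = parser.parse(parsedSchema.rawSchema().toString());

System.out.println(avroSchema.getFullName());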

tchiotludo commented 4 years ago

Hi @mredjem! Thanks, this will help to implement the feature!

(Hey, PRs are also welcome if you have time :smile:)

mredjem commented 4 years ago

Hi @tchiotludo, I created PR #408 to handle Avro schema references. Can you check it out?