provectus / kafka-ui

Open-Source Web UI for Apache Kafka Management
Apache License 2.0
9.91k stars 1.2k forks source link

Schema validation error when using AVRO schema with union types #4288

Open emad-eldeen opened 1 year ago

emad-eldeen commented 1 year ago

Issue submitter TODO list

Describe the bug (actual behavior)

I am trying to publish messages in the topic test with Value Serde Schemaregistry.

The following message is published successfully:

{"offerPrice": null,"customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}

image

image

However, the following message is faild to be published:

{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}

image

Expected behavior

the message

{"offerPrice": "teststring","customerOrderId":"6b39d152-c94c-13c4-8116-7f64136f2fa5"}

should pass the schema validation

Your installation details

I am using the following docker-compose:

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:7.5.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:53a6553765a806eda9905c43bfcfe09da6812035
    ports:
      - 8089:8080
    environment:
      DYNAMIC_CONFIG_ENABLED: 'true'
      KAFKA_CLUSTERS_0_NAME: LOCAL
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: broker:29092
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: "http://schema-registry:8081"
      KAFKA_CLUSTERS_0_PROPERTIES_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: '' # DISABLE COMMON NAME VERIFICATION
    depends_on:
      - broker
      - schema-registry

and the following AVRO schema test-value

{
    "type": "record",
    "name": "test",
    "namespace": "com.test.model.avro",
    "fields": [
        {
            "name": "customerOrderId",
            "type": {
                "type": "string",
                "logicalType": "uuid"
            }
        },
        {
            "name": "offerPrice",
            "type": [
                "null",
                "string"
            ],
            "default": null
        }
    ]
}

Steps to reproduce

Screenshots

No response

Logs

No response

Additional context

No response

github-actions[bot] commented 1 year ago

Hello there emad-eldeen! šŸ‘‹

Thank you and congratulations šŸŽ‰ for opening your very first issue in this project! šŸ’–

In case you want to claim this issue, please comment down below! We will try to get back to you as soon as we can. šŸ‘€

emad-eldeen commented 1 year ago

I tried bypass the frontend and send a POST request to the backend. image

I got error code 500 with the following response:

{
    "code": 5000,
    "message": "Failed to serialize record for topic test",
    "timestamp": 1697609387664,
    "requestId": "a61877db-220",
    "fieldsErrors": null,
    "stackTrace": "java.lang.RuntimeException: Failed to serialize record for topic test\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\tSuppressed: The stacktrace has been enhanced by Reactor, refer to additional information below: \nError has been observed at the following site(s):\n\t*__checkpoint ā‡¢ Handler com.provectus.kafka.ui.controller.MessagesController#sendTopicMessages(String, String, Mono, ServerWebExchange) [DispatcherHandler]\n\t*__checkpoint ā‡¢ com.provectus.kafka.ui.config.CorsGlobalConfiguration$$Lambda$1025/0x00000008016838c8 [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ com.provectus.kafka.ui.config.CustomWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ com.provectus.kafka.ui.config.ReadOnlyModeFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ AuthorizationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ ExceptionTranslationWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ LogoutWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ ServerRequestCacheWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ SecurityContextServerWebExchangeWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ ReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ HttpHeaderWriterWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ ServerWebExchangeReactorContextWebFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ org.springframework.security.web.server.WebFilterChainProxy [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ org.springframework.web.filter.reactive.ServerHttpObservationFilter [DefaultWebFilterChain]\n\t*__checkpoint ā‡¢ HTTP POST \"/api/clusters/LOCAL/topics/test/messages\" [ExceptionHandlingWebHandler]\nOriginal Stack Trace:\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:40)\n\t\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\t\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\t\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\t\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\t\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\t\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\t\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\t\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\t\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\t\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\t\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\t\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: org.apache.avro.AvroTypeException: Expected start-union. Got VALUE_STRING\n\tat org.apache.avro.io.JsonDecoder.error(JsonDecoder.java:511)\n\tat org.apache.avro.io.JsonDecoder.readIndex(JsonDecoder.java:430)\n\tat org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)\n\tat org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)\n\tat org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)\n\tat org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)\n\tat io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils.toObject(AvroSchemaUtils.java:214)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.AvroSchemaRegistrySerializer.serialize(AvroSchemaRegistrySerializer.java:38)\n\tat com.provectus.kafka.ui.serdes.builtin.sr.SchemaRegistrySerializer.serialize(SchemaRegistrySerializer.java:29)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$3(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.wrapWithClassloader(SerdeInstance.java:34)\n\tat com.provectus.kafka.ui.serdes.SerdeInstance.lambda$serializer$4(SerdeInstance.java:63)\n\tat com.provectus.kafka.ui.serdes.ProducerRecordCreator.create(ProducerRecordCreator.java:27)\n\tat com.provectus.kafka.ui.service.MessagesService.sendMessageImpl(MessagesService.java:137)\n\tat com.provectus.kafka.ui.service.MessagesService.lambda$sendMessage$8(MessagesService.java:112)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:132)\n\tat reactor.core.publisher.MonoPublishOn$PublishOnSubscriber.run(MonoPublishOn.java:181)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)\n\tat reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\n"
}