databricks / iceberg-kafka-connect

Error deserializing AVRO - schema evolution example #238

Closed dynamike2010 closed 7 months ago

dynamike2010 commented 7 months ago

My setup:

I push sample messages (the Redpanda car schema example, produced with the rpk tool) from a topic into tables using schema v1, and everything works fine: I can see Parquet files in S3 no matter how many times I send a message. The key is a String and the value is Avro (detailed config below). For example, I send a message like this:

echo "{\"model\":\"rs6\",\"make\":\"audi\",\"year\":2021}" | rpk topic produce cars --schema-id=1 -k 1

and later I send like this:

echo "{\"model\":\"rs3\",\"make\":\"audi\",\"year\":2022,\"type\":{\"string\":\"sport\"}}" | rpk topic produce cars --schema-id=2 -k 2

The new version adds a new optional field to the schema (schema ID == 2, version == 2). However, when I push messages with either --schema-id=2 or --schema-id=topic, I always get the following error/stack on the Connect machine:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
(...)
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema version for id 2
(...)
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema not found; error code: 40403

On the registry machine I get this (I don't even know what is being sent, or why): "POST /subjects/cars-value?normalize=false&deleted=true HTTP/1.1" 404 49 "-" "Java/11.0.22" 122 (io.confluent.rest-utils.requests)

Schema v1 - cars-value subject:

{
  "type": "record",
  "name": "car",
  "fields": [
    {
      "name": "model",
      "type": "string"
    },
    {
      "name": "make",
      "type": "string"
    },
    {
      "name": "year",
      "type": "float"
    }
  ]
}

Schema v2 - cars-value subject:

{
  "type": "record",
  "name": "car",
  "fields": [
    {
      "name": "model",
      "type": "string"
    },
    {
      "name": "make",
      "type": "string"
    },
    {
      "name": "year",
      "type": "float"
    },
    {
      "name": "type",
      "default": null,
      "type": [
        "null",
        {
          "type": "string"
        }
      ]
    }
  ]
}

Here is the config for the sink:

{
    "connector.class": "io.tabular.iceberg.connect.IcebergSinkConnector",
    "errors.deadletterqueue.context.headers.enable": "true",
    "errors.deadletterqueue.topic.name": "errors",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "iceberg.catalog.client.region": "us-east-1",
    "iceberg.catalog.s3.access-key-id": "s3.access-key-id",
    "iceberg.catalog.s3.endpoint": "http://minio:9000",
    "iceberg.catalog.s3.path-style-access": "true",
    "iceberg.catalog.s3.secret-access-key": "s3.secret-access-key",
    "iceberg.catalog.type": "rest",
    "iceberg.catalog.uri": "http://rest:8181",
    "iceberg.control.commit.interval-ms": "5000",
    "iceberg.control.topic": "iceberg-connector-control-cars",
    "iceberg.tables": "orders.cars",
    "iceberg.tables.auto-create-enabled": "true",
    "iceberg.tables.dynamic-enabled": "false",
    "iceberg.tables.evolve-schema-enabled": "true",
    "iceberg.tables.upsert-mode-enabled": "false",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "name": "iceberg-sink-connector-c39u-3121",
    "schema.registry.url": "http://redpanda:8081",
    "topics": "cars",
    "use.latest.version": "true",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda:8081",
    "value.converter.schemas.enable": "false",
    "value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "value.serializer.auto.register.schemas": "false",
    "value.subject.name.strategy": "topic"
}

Current docker compose (I tried many variations):

version: "3"

services:
  redpanda:
    image: vectorized/redpanda:latest
    container_name: redpanda
    ports:
      - "9092:9092"
#      - "8081:8081"
#      - "8082:8082"
      - "29092:29092"
    command:
      - redpanda
      - start
      - --overprovisioned
      - --smp
      - "1"
      - --memory
      - "1G"
      - --reserve-memory
      - "0M"
      - --node-id
      - "0"
      - --kafka-addr
      - PLAINTEXT://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      - --advertise-kafka-addr
      - PLAINTEXT://redpanda:29092,OUTSIDE://localhost:9092
      - --check=false

  registry:
    image: confluentinc/cp-schema-registry:7.6.1
    hostname: registry
    container_name: registry
    restart: always
    depends_on:
      - redpanda
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: registry
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: redpanda:29092
      SCHEMA_REGISTRY_LOG4J_LOGGERS: "io.confluent.rest-utils.requests=TRACE"

#  connect:
#    image: redpandadata/connectors:v1.0.21
#    depends_on:
#      - redpanda
#    hostname: connect
#    container_name: connect
#    ports:
#      - "8083:8083"
#    environment:
#      CONNECT_CONFIGURATION: |
#        key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
#        value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
#        group.id=connectors-group
#        offset.storage.topic=_connectors_offsets
#        config.storage.topic=_connectors_configs
#        status.storage.topic=_connectors_status
#        config.storage.replication.factor=-1
#        offset.storage.replication.factor=-1
#        status.storage.replication.factor=-1
#      CONNECT_BOOTSTRAP_SERVERS: "redpanda:29092"
#      CONNECT_PLUGIN_PATH: "/opt/kafka/connect-plugins"
#    volumes:
#      - ./connect-plugins:/opt/kafka/connect-plugins

  connect:
    image: confluentinc/cp-kafka-connect-base:7.6.1
    depends_on:
      - redpanda
      - registry
    hostname: connect
    container_name: connect
    ports:
      - 8083:8083
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'redpanda:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: "connect"
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: connect-cluster-group
      CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-status
      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://registry:8081"
#      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
      CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://registry:8081"
#      CONNECT_INTERNAL_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
      CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
      CONNECT_LOG4J_LOGGERS: "org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: "1"
      CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
      AWS_ACCESS_KEY_ID: "minioadmin"
      AWS_SECRET_ACCESS_KEY: "minioadmin"
    command:
      - bash
      - -c
      - |
        #
        echo "Installing connector plugins"
        confluent-hub install --no-prompt tabular/iceberg-kafka-connect:0.6.15
        #
        echo "Launching Kafka Connect worker"
        /etc/confluent/docker/run &
        #
        echo "Waiting for Kafka Connect to start listening on localhost ⏳"
        while : ; do
          curl_status=$$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
          echo -e $$(date) " Kafka Connect listener HTTP state: " $$curl_status " (waiting for 200)"
          if [ $$curl_status -eq 200 ] ; then
            break
          fi
          sleep 5
        done
        sleep infinity

  console:
    image: vectorized/console
    entrypoint: /bin/sh
    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:29092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda:8081"]
        connect:
          enabled: true
          clusters:
            - name: local-connect-cluster
              url: http://connect:8083
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda:9644"]
    ports:
      - 18080:8080
    depends_on:
      - redpanda
      - connect

  minio:
    image: minio/minio
    hostname: minio
    container_name: minio
    environment:
      - MINIO_ROOT_USER=minioadmin
      - MINIO_ROOT_PASSWORD=minioadmin
      - MINIO_DOMAIN=minio
    networks:
      default:
        aliases:
          - warehouse.minio
    ports:
      - 9001:9001
      - 9000:9000
    command: ["server", "/data", "--console-address", ":9001"]

  aws:
    image: amazon/aws-cli
    container_name: aws-cli
    command: |
      -c "sleep 5 && \
      aws --endpoint-url http://minio:9000 s3 mb s3://warehouse --region eu-west-1 && \
      sleep infinity"
    entrypoint: [/bin/bash]
    environment:
      AWS_ACCESS_KEY_ID: "minioadmin"
      AWS_SECRET_ACCESS_KEY: "minioadmin"
    depends_on:
      - minio

  rest:
    image: tabulario/iceberg-rest
    hostname: rest
    container_name: rest
    ports:
      - 8181:8181
    depends_on:
      - aws
    environment:
      - AWS_ACCESS_KEY_ID=minioadmin
      - AWS_SECRET_ACCESS_KEY=minioadmin
      - AWS_REGION=eu-west-1
      - CATALOG_WAREHOUSE=s3://warehouse/
      - CATALOG_IO__IMPL=org.apache.iceberg.aws.s3.S3FileIO
      - CATALOG_S3_ENDPOINT=http://minio:9000
      - CATALOG_S3_PATH__STYLE__ACCESS=True

What am I missing?

Thank you!

fqtab commented 7 months ago

This is not a connector issue. The errors are all happening before they reach the actual connector code. You should see the same thing happen with any other connector. This is an issue with your value.converter settings:

    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda:8081",
    "value.converter.schemas.enable": "false",
    "value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "value.serializer.auto.register.schemas": "false",
    "value.subject.name.strategy": "topic"

You're better off reaching out to Confluent or Redpanda folks for help with this.


That said, I can give you some pointers based on my experience.

I suspect these are not the correct names for two of the configs: value.serializer.auto.register.schemas and value.subject.name.strategy. These should usually be prefixed with value.converter.

In addition, you will want to check that value.converter.subject.name.strategy is actually io.confluent.kafka.serializers.subject.TopicNameStrategy, to make sure the registry client resolves schemas correctly.
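
To illustrate the prefix point: as far as I understand, Kafka Connect hands a converter only the properties that sit under its own prefix (value.converter. here), with the prefix stripped. The Python sketch below is not Connect code; converter_props is a hypothetical helper that mimics that filtering on a few keys copied from the config above, to show which settings the AvroConverter would plausibly see.

```python
def converter_props(connector_config, prefix="value.converter."):
    """Return the keys a converter would receive, with the prefix stripped.

    Hypothetical helper: it only mimics the prefix filtering, it is not Connect code.
    """
    return {
        key[len(prefix):]: value
        for key, value in connector_config.items()
        if key.startswith(prefix)
    }


# A few keys copied from the sink config in the issue.
sink_config = {
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://redpanda:8081",
    "value.converter.schemas.enable": "false",
    "value.converter.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
    "value.serializer.auto.register.schemas": "false",
    "value.subject.name.strategy": "topic",
}

# What the converter would be configured with under this assumption.
seen_by_converter = converter_props(sink_config)

# "value."-style keys that fall outside the converter prefix.
ignored = [
    key for key in sink_config
    if key.startswith("value.") and not key.startswith("value.converter")
]
```

Under that assumption, the two keys collected in ignored never reach the converter, which is why renaming them with the value.converter. prefix is worth trying.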


> On registry machine I'm getting this (don't even know what is being sent and why?): "POST /subjects/cars-value?normalize=false&deleted=true HTTP/1.1" 404 49 "-" "Java/11.0.22" 122 (io.confluent.rest-utils.requests)

This is a bit weird. I'm not familiar with that route: POST /subjects/cars-value?normalize=false&deleted=true. I'm familiar with the POST /subjects/cars-value?normalize=false route, but I didn't know you could add deleted=true to the end of it, TBH. It doesn't make much sense to me 🤷 It might be worth digging into that more.
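
For anyone digging into that request: in the Confluent Schema Registry REST API, POST /subjects/{subject} checks whether a given schema is already registered under the subject, and error code 40403 means it was not found. The Python sketch below only builds such a request, without sending it, so the URL can be compared with the one in the registry log. build_lookup_request is a hypothetical helper, and the registry URL and the sample schema are assumptions taken from this thread.

```python
import json
from urllib.parse import urlencode


def build_lookup_request(registry_url, subject, schema, normalize=False, deleted=True):
    """Build (method, url, headers, body) for a schema lookup under a subject.

    Hypothetical helper: nothing is sent over the network.
    """
    query = urlencode({
        "normalize": str(normalize).lower(),
        "deleted": str(deleted).lower(),
    })
    url = f"{registry_url.rstrip('/')}/subjects/{subject}?{query}"
    headers = {"Content-Type": "application/vnd.schemaregistry.v1+json"}
    # The registry expects the schema itself as a JSON-encoded string.
    body = json.dumps({"schema": json.dumps(schema)})
    return "POST", url, headers, body


# Schema v1 from the issue.
schema_v1 = {
    "type": "record",
    "name": "car",
    "fields": [
        {"name": "model", "type": "string"},
        {"name": "make", "type": "string"},
        {"name": "year", "type": "float"},
    ],
}

method, url, headers, body = build_lookup_request(
    "http://registry:8081", "cars-value", schema_v1
)
```

Because the body carries the schema the client has in hand, a 404 here suggests that the schema text being looked up does not match what is stored under cars-value, rather than that the subject itself is missing.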

dynamike2010 commented 7 months ago

That's the point. At first, when I was using Redpanda as the SR, it could have been a compatibility issue, but I have since switched to the Confluent SR, so now both Connect and the SR are from the same "provider" and in theory it SHOULD work and be well tested. I came across this &deleted=true in https://github.com/redpanda-data/redpanda/issues/11912, where someone had a similar problem, but the issue wasn't solved there.

dynamike2010 commented 7 months ago

OK. I searched for "&deleted=true" and found a dozen entries, one of them pointing to a bug that changes YOUR SCHEMA ;-) It seems that it converts every field of type="string", so my schema needs to be something like this:

{
    "type": "record",
    "name": "car",
    "fields": [
        {
            "name": "model",
            "type": "string",
            "avro.java.string": "String"
        },
        {
            "name": "make",
            "type": "string",
            "avro.java.string": "String"
        },
        {
            "name": "year",
            "type": "float"
        },
        {
            "name": "type",
            "type": [
                "null",
                {
                    "type": "string",
                    "avro.java.string": "String"
                }
            ],
            "default": null
        }
    ]
}

So it has nothing to do with IcebergSink, because the error happens between Kafka Connect and the schema registry.

If you have this problem, just make sure you enrich your string type from:

{
  "type": "string"
}

to

{
  "type": "string",
  "avro.java.string": "String" 
}
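
A rough Python sketch of that rewrite, in case you have many schemas to update. annotate_strings is a hypothetical helper that walks a schema loaded as a dict and turns every plain string type into the annotated object shown just above. It is only an illustration under that assumption: it has not been checked against the registry, and it places the property on the type object, which is not exactly how the model and make fields are written in the full schema earlier in this comment.

```python
JAVA_STRING = {"type": "string", "avro.java.string": "String"}


def annotate_strings(node):
    """Return a copy of an Avro schema with string types annotated.

    Hypothetical helper: handles primitives, unions, records, arrays and maps.
    """
    if node == "string":
        return dict(JAVA_STRING)
    if isinstance(node, list):  # union: annotate every branch
        return [annotate_strings(branch) for branch in node]
    if isinstance(node, dict):
        out = dict(node)  # shallow copy so the input is left untouched
        kind = out.get("type")
        if kind == "string":
            out.setdefault("avro.java.string", "String")
        elif kind == "record":
            out["fields"] = [
                {**field, "type": annotate_strings(field["type"])}
                for field in out.get("fields", [])
            ]
        elif kind == "array":
            out["items"] = annotate_strings(out["items"])
        elif kind == "map":
            out["values"] = annotate_strings(out["values"])
        return out
    return node


# Schema v2 from the issue.
schema_v2 = {
    "type": "record",
    "name": "car",
    "fields": [
        {"name": "model", "type": "string"},
        {"name": "make", "type": "string"},
        {"name": "year", "type": "float"},
        {"name": "type", "default": None, "type": ["null", {"type": "string"}]},
    ],
}

annotated = annotate_strings(schema_v2)
```

The function returns a new dict and leaves the input as it was, so the original and the annotated schema can be compared side by side before registering anything.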

I will try to find an "official" response or articles on this and will paste them here.

dynamike2010 commented 7 months ago

More details here: https://github.com/confluentinc/schema-registry/issues/868 https://github.com/confluentinc/schema-registry/issues/2479