apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

Druid error when using protobuf and Schema Registry #16881

Closed Ycallaer closed 2 months ago

Ycallaer commented 2 months ago

Hi, as part of a POC we are trying to ingest Kafka protobuf messages whose schema is stored in Schema Registry. We are following these 2 blogs:

Since this is a POC, there is no security on the Kafka cluster (we are using Confluent CFK, via the demo repo that they made available).

To run Druid we are using https://github.com/datainfrahq/druid-operator.

The ingestion configuration we are submitting from the UI is the following:

{
"type": "kafka",
"spec": {
    "dataSchema": {
        "dataSource": "etf_dummy_data",
        "timestampSpec": {
            "column": "timestamp",
            "format": "auto"
        },
        "dimensionsSpec": {
            "dimensions": [
                "unit",
                "http_method",
                "http_code",
                "page",
                "metricType",
                "server"
            ],
            "dimensionExclusions": [
                "timestamp",
                "value"
            ]
        },
        "metricsSpec": [
            {
                "name": "count",
                "type": "count"
            },
            {
                "name": "value_sum",
                "fieldName": "value",
                "type": "doubleSum"
            },
            {
                "name": "value_min",
                "fieldName": "value",
                "type": "doubleMin"
            },
            {
                "name": "value_max",
                "fieldName": "value",
                "type": "doubleMax"
            }
        ],
        "granularitySpec": {
            "type": "uniform",
            "segmentGranularity": "HOUR",
            "queryGranularity": "NONE"
        }
    },
    "tuningConfig": {
        "type": "kafka",
        "maxRowsPerSegment": 5000000
    },
    "ioConfig": {
        "topic": "etf_dummy_data",
        "consumerProperties": {
            "bootstrap.servers": "kafka.confluent.svc.cluster.local:9092"
        },
        "inputFormat": {
            "type": "protobuf",
            "protoBytesDecoder": {
                "url": "http://schemaregistry.confluent.svc.cluster.local:8081",
                "type": "schema_registry",
                "capacity": 100
            },
            "flattenSpec": {
                "useFieldDiscovery": true
            },
            "binaryAsString": false
        },
        "taskCount": 1,
        "replicas": 1,
        "taskDuration": "PT1H",
        "type": "kafka"
    }
}
}
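
Because the spec is pasted into the UI by hand, a quick local check can rule out formatting problems before submission. The following is a minimal sketch in Python, not Druid's own validation: the function name `check_protobuf_decoder` and the set of fields it checks are assumptions made for illustration, based only on the fields used in the spec above.

```python
import json


def check_protobuf_decoder(spec_text):
    """Parse a supervisor spec as strict JSON and return a list of problems found."""
    try:
        # json.loads is strict: trailing commas and similar slips are rejected.
        spec = json.loads(spec_text)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg} at line {exc.lineno}, column {exc.colno}"]
    if not isinstance(spec, dict):
        return ["top level is not a JSON object"]

    problems = []
    input_format = spec.get("spec", {}).get("ioConfig", {}).get("inputFormat", {})
    if input_format.get("type") != "protobuf":
        problems.append("inputFormat.type is not 'protobuf'")

    decoder = input_format.get("protoBytesDecoder")
    if not isinstance(decoder, dict):
        problems.append("protoBytesDecoder is missing")
    else:
        if decoder.get("type") != "schema_registry":
            problems.append("protoBytesDecoder.type is not 'schema_registry'")
        # Assumption: the decoder needs a registry address under 'url' or 'urls'.
        if not decoder.get("url") and not decoder.get("urls"):
            problems.append("protoBytesDecoder has neither 'url' nor 'urls'")
    return problems
```

An empty list means the text parsed as strict JSON and the decoder block has the expected shape. It says nothing about whether the required classes are on the classpath.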

When this is submitted, the UI returns the following error:

Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`,
 problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 302]
 (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]
 ->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]
 ->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]
 ->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])

If we provide only the topic and broker, the connection succeeds but we get binary data, so we know this is not a network or connectivity issue. The errors start the moment the schema registry part is added.

Affected Version

I have tested this with the Apache Druid images 25.0.0, 28.0.1 and 29.0.1.

Description

If we change the keyword inputFormat to parser (the old syntax), the job submits, but the schema registry configuration disappears and the job eventually fails, stating that the inputFormat block is missing.

I had already reached out in the Slack channel, and they asked me to log an issue.

Additionally, when these jobs are triggered I can see the following libraries being loaded, but not the 3 libs that I add through the side container. I am not sure whether that is relevant.

12:34:12.739 [main] WARN  org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
12:34:12.743 [main] WARN  org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
2024-08-12T12:34:13,312 INFO [main] org.hibernate.validator.internal.util.Version - HV000001: Hibernate Validator 6.2.5.Final
2024-08-12T12:34:14,710 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
2024-08-12T12:34:14,732 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
2024-08-12T12:34:14,740 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
2024-08-12T12:34:14,762 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
2024-08-12T12:34:14,765 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
2024-08-12T12:34:14,772 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
2024-08-12T12:34:14,808 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
2024-08-12T12:34:15,534 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
2024-08-12T12:34:15,541 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
2024-08-12T12:34:15,547 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
2024-08-12T12:34:15,554 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
2024-08-12T12:34:15,564 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
2024-08-12T12:34:15,588 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
2024-08-12T12:34:15,603 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
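
In the log lines above, druid-protobuf-extensions reports only its own JAR, which suggests the extra libraries were not picked up from that extension's directory. A short script can summarize such output per extension. This is a rough sketch in Python: the helper name `jars_by_extension` is hypothetical, and the regular expression assumes the "Loading extension [...], jars: ..." wording shown in the log.

```python
import re

# Matches the ExtensionsLoader message format seen in the log above.
LINE = re.compile(r"Loading extension \[([^\]]+)\], jars: (.*)$")


def jars_by_extension(log_text):
    """Map each extension name to the set of JAR file names the loader reported."""
    result = {}
    for line in log_text.splitlines():
        match = LINE.search(line)
        if match:
            name, jars = match.groups()
            names = (j.strip() for j in jars.split(","))
            # A set merges repeated log entries for the same extension.
            result.setdefault(name, set()).update(j for j in names if j)
    return result
```

Looking up `jars_by_extension(log_text)["druid-protobuf-extensions"]` then shows at a glance whether the added JARs appear next to the extension's own JAR.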

Ycallaer commented 2 months ago

Hi, I was able to solve it. The blogs I listed were a bit contradictory as to where the JARs needed to be.

I refactored the code to have all the JARs in druid-protobuf-extensions and now I am able to submit the job. The UI no longer throws an error and from the supervisor I can see the schema registry definition is kept intact.
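
For anyone hitting the same error, one way to confirm that the JARs are in the right place is to check whether any JAR in the extension directory actually contains the class named in the error message. This is a minimal sketch in Python using only the standard library. The helper name `jars_containing` is hypothetical, and the directory path in the usage note is an assumption about the image layout, not something confirmed in this thread.

```python
import zipfile
from pathlib import Path


def jars_containing(extension_dir, class_name):
    """Return the names of JARs directly under extension_dir that contain class_name."""
    # A JAR is a zip archive; classes are stored as path/to/Name.class entries.
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(Path(extension_dir).glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as archive:
                if entry in archive.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            continue  # skip files that are not readable archives
    return hits
```

For example, `jars_containing("/opt/druid/extensions/druid-protobuf-extensions", "io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider")` should return a non-empty list once the JARs are in place (the path shown is assumed). An empty list would be consistent with the "Cannot construct instance" error above.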