Apicurio / apicurio-registry

An API/Schema registry - stores APIs and Schemas.
https://www.apicur.io/registry/
Apache License 2.0

Error when using the ProtobufKafkaDeserializer - InvalidWireTypeException: Protocol message tag had invalid wire type. #1186

Open ebbnflow opened 3 years ago

ebbnflow commented 3 years ago

I have a Kafka consumer reading messages and deserializing them with the ProtobufKafkaDeserializer (I subclassed it so it supports using the headers for storing the global ID). When reading messages, the deserializer throws an exception at line 59, Serde.Schema s = Serde.Schema.parseFrom(schema);, in the readData(byte[] schema, ByteBuffer buffer, int start, int length) method.

    at com.google.protobuf.InvalidProtocolBufferException.invalidWireType(InvalidProtocolBufferException.java:111)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:557)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:521)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:634)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFrom(UnknownFieldSet.java:308)
    at com.google.protobuf.CodedInputStream$ArrayDecoder.readGroup(CodedInputStream.java:833)
    at com.google.protobuf.UnknownFieldSet$Builder.mergeFieldFrom(UnknownFieldSet.java:548)
    at com.google.protobuf.GeneratedMessageV3.parseUnknownField(GeneratedMessageV3.java:320)
    at io.apicurio.registry.common.proto.Serde$Schema.<init>(Serde.java:130)
    at io.apicurio.registry.common.proto.Serde$Schema.<init>(Serde.java:63)
    at io.apicurio.registry.common.proto.Serde$Schema$1.parsePartialFrom(Serde.java:1015)
    at io.apicurio.registry.common.proto.Serde$Schema$1.parsePartialFrom(Serde.java:1009)
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:158)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:191)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:203)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:208)
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:48)
    at io.apicurio.registry.common.proto.Serde$Schema.parseFrom(Serde.java:349)
    at com.mywork.events.registry.serde.ProtobufKafkaSerdeTest.shouldSerdeMessage(ProtobufKafkaSerdeTest.java:103)

I wrote a unit test in an attempt to discover the issue. It started out with a simple serialize and deserialize call, which of course failed with the above error. I then started picking apart the serializer code and placed some of those lines in my unit test. It's saying that my schema has unknown fields. I've converted the result of the API call to a string and I don't see anything out of sorts.

Any ideas? Is this a bug in the protobuf serializer code?

@Test
void shouldSerdeMessage() throws Exception {
  Map<String, Object> producerParams = new HashMap<>();
  producerParams.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, schemaRegistryProperties.getUrl());
  producerParams.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, schemaRegistryProperties.getArtifactIdStrategy());
  producerParams.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, schemaRegistryProperties.getGlobalIdStrategy());
  producerParams.put(AbstractKafkaSerDe.REGISTRY_ID_HANDLER_CONFIG_PARAM, schemaRegistryProperties.getIdHandler());
  producerParams.put(AbstractKafkaSerDe.USE_HEADERS, schemaRegistryProperties.getUseHeaders()); //global Id location

ProtobufKafkaSerializerWithHeaders<Ptc2080> serializer = new ProtobufKafkaSerializerWithHeaders<>();
serializer.configure(producerParams, false);

Map<String, Object> consumerParams = new HashMap<>();
consumerParams.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, schemaRegistryProperties.getUrl());
consumerParams.put(AbstractKafkaSerDe.REGISTRY_ID_HANDLER_CONFIG_PARAM, schemaRegistryProperties.getIdHandler());
consumerParams.put(AbstractKafkaSerDe.USE_HEADERS, schemaRegistryProperties.getUseHeaders()); //global Id location

ProtobufKafkaDeserializerWithHeaders deserializer = new ProtobufKafkaDeserializerWithHeaders();
deserializer.configure(consumerParams, false);

Ptc2080 ptc2080 = Ptc2080
    .newBuilder()
    .setHeadEndPtcSubdivisionDistrictId(1)
    .setLocoPtcState(1)
    .setLocoPtcStateSummary(3)
    .setPtcAuthorityReferenceNumber(4333)
    .setHeader(Header.newBuilder().setMessageTypeId("ptc2080").setUuid("12345").build())
    .build();

Serde.Schema expected_schemaFromFileDescriptor = toSchemaProto(ptc2080.getDescriptorForType().getFile());
byte[] binarySchemaFromJavaProtoObj = expected_schemaFromFileDescriptor.toByteArray();
String schemaStringFromJavaObject = new String(binarySchemaFromJavaProtoObj);
Serde.Schema parsedSchemaFromJavaProtoObject = Serde.Schema.parseFrom(binarySchemaFromJavaProtoObj);
assertEquals(expected_schemaFromFileDescriptor, parsedSchemaFromJavaProtoObject, "expected_schemaFromFileDescriptor from the java object and the parsed expected_schemaFromFileDescriptor from the binary should equal");

Headers headers = new RecordHeaders();
final byte[] ptc2080SerializedData = serializer.serialize(Ptc2080.class.getCanonicalName().toLowerCase(), headers, ptc2080);

byte[] queriedSchema;
HeaderUtils headerUtils = new HeaderUtils((Map<String, Object>) consumerParams, false);

try (RegistryRestClient restClient = RegistryRestClientFactory.create(schemaRegistryProperties.getUrl())) {
  Long id = headerUtils.getGlobalId(headers);
  InputStream artifactResponse = restClient.getArtifactByGlobalId(id);
  queriedSchema = IoUtil.toBytes(artifactResponse);
}

// this line fails
Serde.Schema queriedSchemaObj = Serde.Schema.parseFrom(queriedSchema);
assertEquals(expected_schemaFromFileDescriptor, queriedSchemaObj, "expected_schemaFromFileDescriptor from the java object and the parsed queriedSchemaObj from the web api call should equal");

assertEquals(schemaStringFromJavaObject, new String(queriedSchema), "the queried binarySchemaFromJavaProtoObj must match the binarySchemaFromJavaProtoObj from the proto java object file descriptor");

//this line will also fail
final DynamicMessage deserialize = deserializer.deserialize(Ptc2080.class.getCanonicalName().toLowerCase(), headers, ptc2080SerializedData);

assertNotNull(deserialize, "deserialized 2080 must not be null");

}

 For clarity:

public class ProtobufKafkaDeserializerWithHeaders extends ProtobufKafkaDeserializer {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    super.configure(configs, isKey);
    if (Utils.isTrue(configs.get(USE_HEADERS))) {
      headerUtils = new HeaderUtils((Map<String, Object>) configs, isKey);
    }
  }
}

public class ProtobufKafkaSerializerWithHeaders extends ProtobufKafkaSerializer {

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    super.configure(configs, isKey);
    if (Utils.isTrue(configs.get(USE_HEADERS))) {
      headerUtils = new HeaderUtils((Map<String, Object>) configs, isKey);
    }
  }
}
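
For context, here is how these subclasses get wired into an actual client - a minimal sketch only: the bootstrap server, topic, and registry URL are placeholders, Ptc2080 and the serializer subclass are assumed to be importable, and the config keys are the same AbstractKafkaSerDe / AbstractKafkaStrategyAwareSerDe constants used in the test above.

import io.apicurio.registry.utils.serde.AbstractKafkaSerDe;
import io.apicurio.registry.utils.serde.AbstractKafkaStrategyAwareSerDe;
import io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class ProducerWiringSketch {

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");                 // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ProtobufKafkaSerializerWithHeaders.class);
        props.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, "http://localhost:8081/api"); // placeholder
        props.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class);
        props.put(AbstractKafkaSerDe.USE_HEADERS, true);                                      // global ID travels in headers

        try (KafkaProducer<String, Ptc2080> producer = new KafkaProducer<>(props)) {
            Ptc2080 message = Ptc2080.newBuilder().setLocoPtcState(1).build();                // placeholder payload
            producer.send(new ProducerRecord<>("ptc2080", message));                          // placeholder topic
        }
    }
}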

EricWittmann commented 3 years ago

@alesj Would you be able to take a look at this?

alesj commented 3 years ago

I would also need Ptc2080 class. Or will any Protobuf class do (to fail this test)?

ebbnflow commented 3 years ago

I would also need Ptc2080 class. Or will any Protobuf class do (to fail this test)?

Any one will do, I think. Here:


syntax = "proto3";
package com.mywork.asset;
option java_multiple_files=true;

import "google/protobuf/timestamp.proto";
import "com/mywork/header.proto";
import "com/mywork/location/point.proto";

message Ptc2080 {
  Header header = 1;
  double d1 = 2;
  double d2 = 3;
  string s1 = 4;
  string s2 = 5;
  string s3 = 6;
  int32 i1 = 7;
  int32 i2 = 8;
  double d3 = 9;
  int32 i3 = 10;
  string s4 = 11;
  int64 bi1 = 12;
  int32 i4 = 13;
  string s5 = 14;
  string s6 = 15;
  int32 i5 = 16;
  int32 i6 = 17;
  int32 i7 = 18;
  int32 i8 = 19;
  string s7 = 20;
  int32 i9 = 21;
  google.protobuf.Timestamp state_time = 22;
  int32 i10 = 23;
  string s8 = 24;
  string s9 = 25;
  int32 i11 = 26;
  string s10 = 27;
  int32 i12 = 28;
  string s11 = 29;
  com.mywork.location.Point point = 30;
}

syntax = "proto3";
package com.mywork;

/*
* This is the header for all
*/
import "google/protobuf/timestamp.proto";

message Header {
  google.protobuf.Timestamp time = 1; 
  string source = 2; 
  string destination = 3; 
  string uuid = 4; 
  repeated string source_uuids = 5; 
  string message_type_id  = 6; 
  string raw_message = 7;
}

syntax = "proto3";
package com.mywork.location;

/* 
*  This is the point message (longitude/latitude)
*/

message Point {
  oneof longitude_oneof {
     double longitude = 1; 
  }
  oneof latitude_oneof {
      double latitude = 2; 
  }
  oneof altitude_oneof {
      double altitude = 3; 
  }       

}

I'm using this dependency.

 <dependency>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-utils-serde</artifactId>
      <version>1.3.2.Final-redhat-00002</version>
 </dependency>
alesj commented 3 years ago

This passes for me upstream ...

/*
 * Copyright 2021 Red Hat
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package io.apicurio.registry;

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.Message;
import io.apicurio.registry.support.TestCmmn;
import io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer;
import io.apicurio.registry.utils.serde.ProtobufKafkaSerializer;
import io.apicurio.registry.utils.serde.SerdeConfig;
import io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy;
import io.apicurio.registry.utils.serde.util.HeaderUtils;
import io.apicurio.registry.utils.serde.util.Utils;
import io.apicurio.registry.utils.tests.TestUtils;
import io.quarkus.test.junit.QuarkusTest;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

import static io.apicurio.registry.utils.serde.SerdeConfig.USE_HEADERS;
import static org.junit.jupiter.api.Assertions.assertNotNull;

/**
 * @author Ales Justin
 * @author Steve Collins (GitHub issue 1186)
 */
@QuarkusTest
public class ProtobufSerdeTest extends AbstractResourceTestBase {

    @Test
    void testIssue1886() {
        Map<String, Object> producerParams = new HashMap<>();
        producerParams.put(SerdeConfig.REGISTRY_URL, TestUtils.getRegistryApiUrl());
        producerParams.put(SerdeConfig.GLOBAL_ID_STRATEGY, GetOrCreateIdStrategy.class);
        producerParams.put(USE_HEADERS, true); //global Id location
        ProtobufKafkaSerializerWithHeaders<TestCmmn.UUID> serializer = new ProtobufKafkaSerializerWithHeaders<>();
        serializer.configure(producerParams, false);

        Map<String, Object> consumerParams = new HashMap<>();
        consumerParams.put(SerdeConfig.REGISTRY_URL, TestUtils.getRegistryApiUrl());
        consumerParams.put(USE_HEADERS, true); //global Id location
        ProtobufKafkaDeserializerWithHeaders deserializer = new ProtobufKafkaDeserializerWithHeaders();
        deserializer.configure(consumerParams, false);

        TestCmmn.UUID record = TestCmmn.UUID.newBuilder().setLsb(2).setMsb(1).build();

        Headers headers = new RecordHeaders();
        byte[] bytes = serializer.serialize("i1186", headers, record);
        //this line will fail
        final DynamicMessage deserialize = deserializer.deserialize("i1186", headers, bytes);
        assertNotNull(deserialize, "deserialized i1186 must not be null");
    }

    static class ProtobufKafkaDeserializerWithHeaders extends ProtobufKafkaDeserializer {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            super.configure(configs, isKey);
            if (Utils.isTrue(configs.get(USE_HEADERS))) {
                //noinspection unchecked
                headerUtils = new HeaderUtils((Map<String, Object>) configs, isKey);
            }
        }
    }

    static class ProtobufKafkaSerializerWithHeaders<U extends Message> extends ProtobufKafkaSerializer<U> {

        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            super.configure(configs, isKey);
            if (Utils.isTrue(configs.get(USE_HEADERS))) {
                //noinspection unchecked
                headerUtils = new HeaderUtils((Map<String, Object>) configs, isKey);
            }
        }
    }

}
alesj commented 3 years ago

I stripped out some stuff, but imho, it shouldn't change behavior too much ...

@ebbnflow can you try this as well - against your env? (TestCmmn class is part of app module test classes)

alesj commented 3 years ago

OK, also mocked Protobuf classes ...

ebbnflow commented 3 years ago

I created a proto with the same fields in my repo, compiled and generated the Java classes, then used the maven plugin to upload it to my local registry. I created a new unit test and replaced the model with the TestCmmn one. I'm still getting the same error.

   @Test
  void shouldSerdeTestMessage() throws Exception {
    //configure producer
    Map<String, Object> producerParams = new HashMap<>();
    producerParams.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, schemaRegistryProperties.getUrl());
    producerParams.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_ARTIFACT_ID_STRATEGY_CONFIG_PARAM, schemaRegistryProperties.getArtifactIdStrategy());
    producerParams.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, schemaRegistryProperties.getGlobalIdStrategy());
    producerParams.put(AbstractKafkaSerDe.REGISTRY_ID_HANDLER_CONFIG_PARAM, schemaRegistryProperties.getIdHandler());
    producerParams.put(AbstractKafkaSerDe.USE_HEADERS, schemaRegistryProperties.getUseHeaders()); //global Id location

    ProtobufKafkaSerializerWithHeaders<TestCmmn> serializer = new ProtobufKafkaSerializerWithHeaders<>();
    serializer.configure(producerParams, false);

    //configure consumer
    Map<String, Object> consumerParams = new HashMap<>();
    consumerParams.put(AbstractKafkaSerDe.REGISTRY_URL_CONFIG_PARAM, schemaRegistryProperties.getUrl());
    consumerParams.put(AbstractKafkaSerDe.REGISTRY_ID_HANDLER_CONFIG_PARAM, schemaRegistryProperties.getIdHandler());
    consumerParams.put(AbstractKafkaSerDe.USE_HEADERS, schemaRegistryProperties.getUseHeaders()); //global Id location

    ProtobufKafkaDeserializerWithHeaders deserializer = new ProtobufKafkaDeserializerWithHeaders();
    deserializer.configure(consumerParams, false);

    TestCmmn testCmmn = TestCmmn
        .newBuilder()
        .setLsb(1)
        .setMsb(2)
        .build();

    Serde.Schema expected_schemaFromFileDescriptor = toSchemaProto(testCmmn.getDescriptorForType().getFile());
    byte[] binarySchemaFromJavaProtoObj = expected_schemaFromFileDescriptor.toByteArray();
    String schemaStringFromJavaObject = new String(binarySchemaFromJavaProtoObj);
    Serde.Schema parsedSchemaFromJavaProtoObject = Serde.Schema.parseFrom(binarySchemaFromJavaProtoObj);
    assertEquals(expected_schemaFromFileDescriptor, parsedSchemaFromJavaProtoObject, "expected_schemaFromFileDescriptor from the java object and the parsed expected_schemaFromFileDescriptor from the binary should equal");

    Headers headers = new RecordHeaders();
    final byte[] serializedData = serializer.serialize("com.csx.testcmmn", headers, testCmmn);

    byte[] queriedSchema;
    HeaderUtils headerUtils = new HeaderUtils((Map<String, Object>) consumerParams, false);

    try (RegistryRestClient restClient = RegistryRestClientFactory.create(schemaRegistryProperties.getUrl())) {
      Long id = headerUtils.getGlobalId(headers);
      InputStream artifactResponse = restClient.getArtifactByGlobalId(id);
      queriedSchema = IoUtil.toBytes(artifactResponse);
    }
    //test still fails here
    Serde.Schema queriedSchemaObj = Serde.Schema.parseFrom(queriedSchema);
    assertEquals(expected_schemaFromFileDescriptor, queriedSchemaObj, "expected_schemaFromFileDescriptor from the java object and the parsed queriedSchemaObj from the web api call should equal");

    assertEquals(schemaStringFromJavaObject, new String(queriedSchema), "the queried binarySchemaFromJavaProtoObj must match the binarySchemaFromJavaProtoObj from the proto java object file descriptor");

    final DynamicMessage deserialize = deserializer.deserialize(Ptc2080.class.getCanonicalName().toLowerCase(), headers, serializedData);

    assertNotNull(deserialize, "deserialized 2080 must not be null");

  }

my config

schema-registry:
  url: http://localhost:8081/api
  artifact-id-strategy: io.apicurio.registry.utils.serde.strategy.SimpleTopicIdStrategy
  global-id-strategy: io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy
  use-headers: true
  id-handler: io.apicurio.registry.utils.serde.strategy.DefaultIdHandler

My dockerfile, compose and supporting scripts

  FROM apicurio/apicurio-registry-streams:1.3.2.Final
  USER root:root
  RUN mkdir /scripts
  COPY ./scripts/schema-registry-entrypoint.sh /scripts/
  RUN ["chmod", "+x", "/scripts/schema-registry-entrypoint.sh"]
  ENTRYPOINT ["sh", "/scripts/schema-registry-entrypoint.sh"]

The entry script. Waits for topics to be created before kicking off the registry jar

#!/bin/sh
until curl 'http://kafka-rest:8082/topics' | grep -q 'global-id-topic' ;
do
  echo 'topics are not available';
  sleep 1;
done;
echo 'topics are available!'
java -Dquarkus.http.host=0.0.0.0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -javaagent:/opt/agent-bond/agent-bond.jar=jmx_exporter{{9779:/opt/agent-bond/jmx_exporter_config.yml}} -XX:+UseParallelGC -XX:GCTimeRatio=4 -XX:AdaptiveSizePolicyWeight=90 -XX:MinHeapFreeRatio=20 -XX:MaxHeapFreeRatio=40 -XX:+ExitOnOutOfMemoryError -cp . -jar /deployments/apicurio-registry-storage-streams-1.3.2.Final-runner.jar

schemaregistry:
  container_name: schemaregistry
    #image: apicurio/apicurio-registry-streams:1.3.2.Final 
  build:
    context: ./aqueduct/schema-registry
    dockerfile: Dockerfile
  depends_on:
    - broker
    - zookeeper
    - schemaregistrytopics
  ports:
    - 8081:8080
    - 9000
  environment:
    KAFKA_BOOTSTRAP_SERVERS: broker:9092
    QUARKUS_PROFILE: prod
    APPLICATION_ID: registry_id
    APPLICATION_SERVER: localhost:9000
  networks:
    - testharnessnetwork

schemaregistrytopics:
  image: docker.csx.com/confluentinc/cp-kafka
  container_name: schemaregistrytopics
  depends_on:
    - broker
    - zookeeper
  volumes:
    - ./schema-registry/scripts:/tmp
  command: bash -c "sleep 45s && chmod 777 ./tmp/create-schema-registry-topics.sh && ./tmp/create-schema-registry-topics.sh"
  networks:
    - testharnessnetwork
alesj commented 3 years ago

Does the upstream also fail for you?


ebbnflow commented 3 years ago

So I'm continuing to investigate... It may have something to do with the api server client.

I'll back up a little. When I run my unit test with only the following entry in my pom:

     <dependency>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-utils-serde</artifactId>
      <version>1.3.2.Final-redhat-00002</version>
    </dependency>

I get the following error:


 java.lang.NullPointerException
    at io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy.findId(FindLatestIdStrategy.java:30)
    at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:75)
    at com.csx.events.registry.serde.ProtobufKafkaSerdeTest.shouldSerdeTestMessage(ProtobufKafkaSerdeTest.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:513)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    ...

09:48:56.801 [main] ERROR i.a.r.c.r.RequestHandler$ResultCallback - Error getting call result
java.lang.RuntimeException: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8081
    at io.apicurio.registry.utils.ConcurrentUtil.get(ConcurrentUtil.java:43)
    at io.apicurio.registry.utils.ConcurrentUtil.get(ConcurrentUtil.java:27)
    at io.apicurio.registry.client.request.RequestHandler$ResultCallback.getResult(RequestHandler.java:49)
    at io.apicurio.registry.client.request.RequestHandler.execute(RequestHandler.java:20)
    at io.apicurio.registry.client.RegistryRestClientImpl.getArtifactMetaData(RegistryRestClientImpl.java:253)
    at io.apicurio.registry.client.CompatibleClient.getArtifactMetaData(CompatibleClient.java:104)
    at io.apicurio.registry.utils.serde.strategy.FindLatestIdStrategy.findId(FindLatestIdStrategy.java:29)
    at io.apicurio.registry.utils.serde.AbstractKafkaSerializer.serialize(AbstractKafkaSerializer.java:75)
    at com.csx.events.registry.serde.ProtobufKafkaSerdeTest.shouldSerdeTestMessage(ProtobufKafkaSerdeTest.java:88)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:513)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:170)
    at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:166)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:113)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:58)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:113)
    at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:121)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:121)
    at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:121)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
    at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
    at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:121)
    at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:55)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:43)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:170)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:154)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:90)
    at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:71)
    at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
    at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
    at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
Caused by: java.net.ConnectException: Failed to connect to localhost/0:0:0:0:0:0:0:1:8081
    at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:265)
    at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:183)
    at okhttp3.internal.connection.ExchangeFinder.findConnection(ExchangeFinder.java:224)
    at okhttp3.internal.connection.ExchangeFinder.findHealthyConnection(ExchangeFinder.java:108)
    at okhttp3.internal.connection.ExchangeFinder.find(ExchangeFinder.java:88)
    at okhttp3.internal.connection.Transmitter.newExchange(Transmitter.java:169)
    at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:41)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:94)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:88)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:142)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:117)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:229)
    at okhttp3.RealCall$AsyncCall.execute(RealCall.java:172)
    at okhttp3.internal.NamedRunnable.run(NamedRunnable.java:32)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused (Connection refused)
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at okhttp3.internal.platform.Platform.connectSocket(Platform.java:130)
    at okhttp3.internal.connection.RealConnection.connectSocket(RealConnection.java:263)

So something's missing as far as the API client goes. I added this entry to the pom and the registry lookup started succeeding... but this could be part of the problem with loading the schema.

     <dependency>
      <groupId>org.glassfish.jersey.core</groupId>
      <artifactId>jersey-client</artifactId>
      <version>2.27</version>
    </dependency>

What pom entries do I need to use the apicurio-registry-utils-serde Protobuf serdes? Apparently the apicurio-registry-utils-serde entry alone isn't enough.

dependencies:

 <dependencyManagement>
    <dependencies>
      <!-- JUnit 5.x -->
      <dependency>
        <groupId>org.junit</groupId>
        <artifactId>junit-bom</artifactId>
        <version>5.2.0</version>
        <scope>import</scope>
        <type>pom</type>
      </dependency>

      <!-- Mockito 2.x -->
      <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-core</artifactId>
        <version>2.21.0</version>
      </dependency>

    </dependencies>
  </dependencyManagement>

  <dependencies>
    <!-- core dependencies  -->
    <dependency>
      <groupId>io.apicurio</groupId>
      <artifactId>apicurio-registry-utils-serde</artifactId>
      <version>1.3.2.Final-redhat-00002</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-core</artifactId>
      <version>${fuse.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-kafka</artifactId>
      <version>${fuse.version}</version>
    </dependency>
    <dependency>
      <groupId>org.glassfish.jersey.core</groupId>
      <artifactId>jersey-client</artifactId>
      <version>2.27</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot</artifactId>
      <version>${spring-boot.version}</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <version>${spring-boot.version}</version>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>org.junit.vintage</groupId>
          <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-test-spring</artifactId>
      <version>${fuse.version}</version>
      <scope>test</scope>
    </dependency>
    <!-- Junit Test  -->
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-launcher</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.platform</groupId>
      <artifactId>junit-platform-runner</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-api</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mockito</groupId>
      <artifactId>mockito-core</artifactId>
      <scope>test</scope>
    </dependency>
    <!-- slf4j - log4j jar -->
    <dependency>
      <groupId>org.apache.logging.log4j</groupId>
      <artifactId>log4j-slf4j-impl</artifactId>
      <version>2.11.1</version>
      <scope>test</scope>
    </dependency>

  </dependencies>
alesj commented 3 years ago

OK, so you're saying if I run this as a completely separate project I should see this error? OK, will try this first thing tomorrow ...

ebbnflow commented 3 years ago

OK, so you're saying if I run this as a completely separate project I should see this error? OK, will try this first thing tomorrow ...

I do believe so - and thank you for helping!

We have a multitude of Spring Boot / Red Hat Fuse (Apache Camel) projects that read from and write to Kafka. I'm integrating the schema registry into those microservices. The only thing I need for these Fuse services is the protobuf serdes from Apicurio. I then configure the Kafka producers and consumers with those (your) serdes. This is where I get my original error: the serdes aren't working fully (as shown by the errors linked in this thread).

To test locally, I'm spinning up an actual Kafka broker, ZooKeeper, and the KStreams flavor of the Apicurio registry. I'm pumping messages with one of my Boot/Camel services and then consuming them with another. The producer serde puts the messages on the topic fine, and it also writes the global ID into the header.

As an aside, I had to extend the proto serializer and deserializer so they write the global ID to the header, because the base proto serdes write the global ID in the body of the Kafka message (I don't want that sort of message intrusion, for backwards-compatibility reasons).

The problem comes when the consumer pulls the message off the topic: it uses the header to get the global ID and successfully queries the schema, but when it attempts to create the schema Java object from that query result, it fails with that "wire type" error.
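
To isolate that failing step outside of Kafka entirely, a standalone check along these lines can be run against the registry. This is a sketch only: the registry URL and global ID are placeholders (in the real flow the ID comes from the record header), and the IoUtil and client-factory packages are assumed to match the ones used elsewhere in this thread.

import io.apicurio.registry.client.RegistryRestClient;
import io.apicurio.registry.client.RegistryRestClientFactory;
import io.apicurio.registry.common.proto.Serde;
import io.apicurio.registry.utils.IoUtil;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class SchemaFormatCheck {

    public static void main(String[] args) throws Exception {
        String registryUrl = "http://localhost:8081/api"; // placeholder registry URL
        long globalId = 1L;                               // placeholder: taken from the record header in the real flow

        try (RegistryRestClient client = RegistryRestClientFactory.create(registryUrl)) {
            InputStream content = client.getArtifactByGlobalId(globalId);
            byte[] bytes = IoUtil.toBytes(content);

            // If the artifact was registered as plain text, this prints readable .proto source
            // and the parseFrom() call below throws the same InvalidWireTypeException.
            // If it was registered by the serializer as a binary file descriptor, parseFrom() succeeds.
            System.out.println(new String(bytes, StandardCharsets.UTF_8));
            Serde.Schema schema = Serde.Schema.parseFrom(bytes);
            System.out.println("Parsed binary schema, imports: " + schema.getImportList().size());
        }
    }
}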

alesj commented 3 years ago

While running Streams storage ...

java -jar target/apicurio-registry-storage-streams-1.3.3-SNAPSHOT-runner.jar
ebbnflow commented 3 years ago

I pulled down your fork and ran it. I've made some adjustments and I'm able to reproduce it with your branch.

Add this to your code:

Change your strategy to: producerParams.put(AbstractKafkaStrategyAwareSerDe.REGISTRY_GLOBAL_ID_STRATEGY_CONFIG_PARAM, FindLatestIdStrategy.class);

Now delete your proto from the registry, then upload it again with either the maven plugin or by copy/pasting it by hand.

Now run your test. It will fail with that "wire type" error.

What I found is that when the GetOrCreate strategy is used, it uploads a (proto) binary version of the schema; in the registry UI it looks like this:

 �
    log.proto io.apicurio.registry.demo.domain"2
LogMerge
fst (
snd (
info (  B'
 io.apicurio.registry.demo.domainBLogbproto3

yet when I upload it via the maven plugin or copy/paste, it looks like a normal string when viewed in the registry UI.

     syntax = "proto3";

      package io.apicurio.registry.demo.domain;

      option java_package = "io.apicurio.registry.demo.domain";

      option java_outer_classname = "Log";

      message LogMerge {
          fixed64 fst = 1;
          fixed64 snd = 2;
          string info = 3;
      }
ebbnflow commented 3 years ago

My plugin pom entry. To upload: mvn compile -Denv=local

    <profile>
      <id>env-local</id>
      <activation>
        <property>
          <name>env</name>
          <value>local</value>
        </property>
      </activation>
      <build>
        <plugins>
          <plugin>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-maven-plugin</artifactId>
            <version>${registry.version}</version>
            <executions>
              <execution>
                <phase>generate-sources</phase>
                <goals>
                  <goal>register</goal>
                </goals>
                <configuration>
                  <registryUrl>http://localhost:8081/api</registryUrl>
                  <artifactType>PROTOBUF</artifactType>
                  <artifacts>
                    <com.csx.testcmmn>${project.basedir}/src/main/protobuf/com/mywork/testcmmn.proto</com.csx.testcmmn>
                  </artifacts>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>
ebbnflow commented 3 years ago

I believe the root of the problem lies in the proto deserializer, on the line where it calls Schema.parseFrom(). The deserializer will need either another configuration setting so it knows whether to parse a binary proto schema into a schema object, or some other code to convert a plain-text schema string into a Java schema object.

 @Override
    protected DynamicMessage readData(byte[] schema, ByteBuffer buffer, int start, int length) {
        try {
            //if binary
            Serde.Schema s = Serde.Schema.parseFrom(schema); // <-- fails here when the stored schema is plain text

            //else if config.getStorageType() == string
            //parse string 
            //Serde.Schema s = Serde.Schema.fromString(schemaString)

            Descriptors.FileDescriptor fileDescriptor = toFileDescriptor(s);

            byte[] bytes = new byte[length];
            System.arraycopy(buffer.array(), start, bytes, 0, length);
            ByteArrayInputStream is = new ByteArrayInputStream(bytes);

            Serde.Ref ref = Serde.Ref.parseDelimitedFrom(is);

            Descriptors.Descriptor descriptor = fileDescriptor.findMessageTypeByName(ref.getName());
            return DynamicMessage.parseFrom(descriptor, is);
        } catch (IOException | Descriptors.DescriptorValidationException e) {
            throw new IllegalStateException(e);
        }
    }

That assumes the bytes returned by the REST client are in binary proto form (it's a little paradoxical that the proto schema is itself proto'ed), yet the ones uploaded by hand are in plain-text format, as are the ones uploaded via the maven plugin. Two different stored formats. The serdes will need to know how the schema is stored if multiple storage formats are supported, and will need to convert it into a proto schema Java object.
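
As a stopgap on the consumer side, a deserializer subclass could probe the fetched artifact and fail fast with a clearer message when it isn't a binary file descriptor. This is purely a sketch (class name and wording are hypothetical, not part of the library); it doesn't make plain-text artifacts deserializable, it only makes the mismatch obvious instead of surfacing as a wire-type error.

import com.google.protobuf.DynamicMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import io.apicurio.registry.common.proto.Serde;
import io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FormatAwareProtobufKafkaDeserializer extends ProtobufKafkaDeserializer {

    @Override
    protected DynamicMessage readData(byte[] schema, ByteBuffer buffer, int start, int length) {
        try {
            // Probe the stored artifact first: a PROTOBUF_FD artifact is a binary Serde.Schema
            // message, while a PROTOBUF artifact is plain-text .proto source.
            Serde.Schema.parseFrom(schema);
        } catch (InvalidProtocolBufferException e) {
            String head = new String(schema, StandardCharsets.UTF_8).trim();
            throw new IllegalStateException(
                "Registry artifact is not a binary file descriptor (PROTOBUF_FD); content starts with: "
                    + head.substring(0, Math.min(60, head.length())), e);
        }
        // Binary case: defer to the stock implementation.
        return super.readData(schema, buffer, start, length);
    }
}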

alesj commented 3 years ago

Yeah good catch. The moment you posted plugin upload + FindLatest I knew what the problem was :) Just dunno now how to properly solve this. I guess the plugin needs to be smarter ...


ebbnflow commented 3 years ago

Yeah good catch. The moment you posted plugin upload + FindLatest I knew what the problem was :) Just dunno now how to properly solve this. I guess the plugin needs to be smarter ...

If that's the approach, then uploading via copy/paste in the web console will have to be changed too?

ebbnflow commented 3 years ago

After reading the API doc, there are two different proto artifact types: https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/assets-attachments/registry-rest-api.htm#operation/createArtifact

Protobuf (PROTOBUF)
Protobuf File Descriptor (PROTOBUF_FD)

GetOrCreate uploads a PROTOBUF_FD type; the plugin and the web console must upload PROTOBUF.

What's the difference?

Does the original intention/design not support uploading plain-text proto schemas?

famarting commented 3 years ago

Hi @ebbnflow I'm not sure if I missed something but maybe a solution would be to just configure the maven plugin to create the artifact with the type PROTOBUF_FD

something like

    <profile>
      <id>env-local</id>
      <activation>
        <property>
          <name>env</name>
          <value>local</value>
        </property>
      </activation>
      <build>
        <plugins>
          <plugin>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-maven-plugin</artifactId>
            <version>${registry.version}</version>
            <executions>
              <execution>
                <phase>generate-sources</phase>
                <goals>
                  <goal>register</goal>
                </goals>
                <configuration>
                  <registryUrl>http://localhost:8081/api</registryUrl>
                  <artifactType>PROTOBUF_FD</artifactType>
                  <artifacts>
                    <com.csx.testcmmn>${project.basedir}/src/main/protobuf/com/mywork/testcmmn.proto</com.csx.testcmmn>
                  </artifacts>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </profile>

have you tried this? and sorry for the late response :)

EricWittmann commented 3 years ago

Protobuf (PROTOBUF)
Protobuf File Descriptor (PROTOBUF_FD)

GetOrCreate uploads a PROTOBUF_FD type, the plugin and the web console must upload PROTOBUF

What's the difference?

The former is the plain text protobuf schema. The latter is the binary form of it (file descriptor). Unfortunately I'm not experienced enough with proto to know more than that.

Does original intention/design not support uploading plain text proto schemas?

Perhaps an exploration of your desired use case would be useful here. What do you want your Protobuf Serde to do, ideally? It's possible that the Apicurio Registry Protobuf Serde is designed to solve a different use case than yours. If so, perhaps a custom Serde is in order (but still integrating with Apicurio Registry). We could certainly help with this!

EricWittmann commented 3 years ago

OK after reading through this thread again, I think I understand the scenario more than I thought. :)

You're 100% right - the protobuf serdes we have assume that the artifact in the repository is the binary (_FD) description of the schema. I think the reason for this is that the use case we were trying to address is the more Avro-style case, where the serializer is responsible for registering the schema definition in the registry. And the serializer typically only has the binary format, not the textual format, so that is what gets registered.

I assume that in your use-case, you want to register the protobuf schema either via maven or some other external flow. And then you want both the serializer and deserializer to use that textual protobuf schema. Correct?

If yes, then this is where my lack of protobuf experience gets in my way. I thought the protobuf textual schema was used to generate java classes that were ultimately used for serializing and deserializing. And so any change to the schema would require re-generating those classes and deploying new consumers/producers.

In any case, what I will say is that I don't believe we have a way to register a binary protobuf schema in the registry other than the protobuf Serializer itself. So if what we have today is actually what you want (i.e. protobuf Serdes classes that serialize/deserialize using DynamicMessage instead of generated java classes), then I think there are two possibilities:

1) Update the maven plugin to support PROTOBUF_FD
2) Update the Serdes to support PROTOBUF schemas

I would prefer solution (2) but I don't know if it's possible to parse a .proto file into a Serde.Schema. I know that solution (1) is possible, but honestly I would prefer to deprecate PROTOBUF_FD if a solution to (2) can be found.
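
For what it's worth, one rough way option (2) could be prototyped - hypothetical only, and assuming protoc is installed and on the PATH - is to compile the registry's plain-text .proto into a binary FileDescriptorSet at configuration time and feed that to the existing descriptor-based code. It produces a FileDescriptorSet rather than a Serde.Schema, so the serde would still need adapting, and imports only resolve if they are well-known types or present alongside the temp file.

import com.google.protobuf.DescriptorProtos.FileDescriptorSet;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class ProtoTextToDescriptorSet {

    // Compiles plain-text .proto source into a binary FileDescriptorSet by shelling out to protoc.
    public static FileDescriptorSet compile(String protoSource) throws Exception {
        Path workDir = Files.createTempDirectory("registry-proto");
        Path protoFile = workDir.resolve("schema.proto");
        Path descFile = workDir.resolve("schema.desc");
        Files.write(protoFile, protoSource.getBytes(StandardCharsets.UTF_8));

        Process protoc = new ProcessBuilder(
                "protoc",
                "--include_imports",
                "--descriptor_set_out=" + descFile,
                "-I", workDir.toString(),
                protoFile.toString())
            .inheritIO()
            .start();
        if (protoc.waitFor() != 0) {
            throw new IllegalStateException("protoc failed for " + protoFile);
        }
        return FileDescriptorSet.parseFrom(Files.readAllBytes(descFile));
    }
}

Whether that belongs in the serde, the maven plugin, or the registry itself is exactly the design question here.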

ebbnflow commented 3 years ago

I am following along with the internal JIRA but am unable to post comments on there, so I'll post some feedback here.

I am able to upload protobuf file descriptors via the apicurio maven plugin. Our protobuf Java project generates the FDs and the .java classes using the proto compiler v3.11.1. I have the plugin set to look in the folder where the file descriptors are placed.

 <build>
        <plugins>
          <plugin>
            <groupId>io.apicurio</groupId>
            <artifactId>apicurio-registry-maven-plugin</artifactId>
            <version>${registry.version}</version>
            <executions>
              <execution>
                <phase>generate-sources</phase>
                <goals>
                  <goal>register</goal>
                </goals>
                <configuration>
                  <registryUrl>http://localhost:8081/api</registryUrl>
                  <artifactType>PROTOBUF_FD</artifactType>
                  <artifacts>
                    <com.mywork.artifactName>${project.basedir}/target/classes/my_Model.desc</com.mywork.artifactName>
                  </artifacts>
                </configuration>
              </execution>
            </executions>
          </plugin>
        </plugins>
      </build>

However, the apicurio proto deserializer still breaks, but in another part of the code.

java.lang.IllegalStateException: com.google.protobuf.Descriptors$DescriptorValidationException: com.mywork.asset.rolling.Header.time: ".google.protobuf.Timestamp" is not defined.

    at io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer.readData(ProtobufKafkaDeserializer.java:71)
    at io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer.readData(ProtobufKafkaDeserializer.java:77)
    at io.apicurio.registry.utils.serde.ProtobufKafkaDeserializer.readData(ProtobufKafkaDeserializer.java:38)
    at io.apicurio.registry.utils.serde.AbstractKafkaDeserializer.deserialize(AbstractKafkaDeserializer.java:103)
    at com.csx.events.registry.serde.ProtobufKafkaSerdeTest.shouldSerde2080Message(ProtobufKafkaSerdeTest.java:186)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:513)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:170)
    at org.junit.jupiter.engine.execution.ThrowableCollector.execute(ThrowableCollector.java:40)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:166)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:113)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:58)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$3(HierarchicalTestExecutor.java:113)
    at org.junit.platform.engine.support.hierarchical.SingleTestExecutor.executeSafely(SingleTestExecutor.java:66)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.executeRecursively(HierarchicalTestExecutor.java:108)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.execute(HierarchicalTestExecutor.java:79)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor$NodeExecutor.lambda$executeRecursively$2(HierarchicalTestExecutor.java:121)

The problem lies in this method of the deserializer.

It's not constructing the file descriptors in the same manner as the proto compiler creates them. In addition, the apicurio code builds the file descriptors from the runtime model itself rather than from the file descriptor produced by the protobuf compiler. In my personal opinion that is a risky design: I would trust the compiler's FD over the runtime model's FD. They are in fact different - I'll paste the differences below.

    private Descriptors.FileDescriptor toFileDescriptor(Serde.Schema s) throws Descriptors.DescriptorValidationException {
        List<Descriptors.FileDescriptor> imports = new ArrayList<>();
        for (Serde.Schema i : s.getImportList()) {
            imports.add(toFileDescriptor(i));
        }
        return Descriptors.FileDescriptor.buildFrom(s.getFile(), imports.toArray(new Descriptors.FileDescriptor[0]));
    }

I did some digging and came up with this: I extended the apicurio proto deserializer and overrode a method, which works for the deserializer. It works with the FDs generated by the proto compiler but does not work with the FDs generated by the apicurio proto serializer's GetOrCreate strategy.


  private Descriptors.FileDescriptor toFileDescriptor(byte[] schema) throws DescriptorValidationException, IOException {
    // the bytes are a compiler-generated descriptor set (protoc --descriptor_set_out --include_imports)
    DescriptorProtos.FileDescriptorSet set = DescriptorProtos.FileDescriptorSet.parseFrom(schema);

    // build every dependency file first, handing each one the descriptors already built
    List<FileDescriptor> dependencyFileDescriptorList = new ArrayList<>();
    for (int i = 0; i < set.getFileCount() - 1; i++) {
      dependencyFileDescriptorList
          .add(Descriptors.FileDescriptor.buildFrom(set.getFile(i), dependencyFileDescriptorList.toArray(new FileDescriptor[i])));
    }
    // the last file in the set is the one that defines the message being deserialized
    return Descriptors.FileDescriptor.buildFrom(set.getFile(set.getFileCount() - 1), dependencyFileDescriptorList.toArray(new FileDescriptor[0]));
  }
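
Note that this override appears to rely on two things: the descriptor set listing dependencies before the files that import them (which is how protoc emits them with --include_imports, as the compiler FD dump below shows - timestamp, header, point, then the message's own file), and the message's own file being the last entry in the set. The payload produced by the serializer's GetOrCreate strategy is a nested Serde.Schema rather than a FileDescriptorSet, which is presumably why the same code can't read it.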

Apicurio FD

�
)com/mywork/asset/rolling/loco/ptc_2080.protocom.mywork.asset.rolling.locogoogle/protobuf/timestamp.protocom/mywork/header.protocom/mywork/location/point.proto"�
Ptc2080
header (2.com.csx.Header
altitude (
rear_end_milepost (
scac (  
rear_end_railroad_scac (    
head_end_track_name (   
speed (,
$head_end_ptc_subdivision_district_id (
head_end_milepost  ()
!distance_lapsed_from_1080_message
 ( 
rear_end_milepost_suffix (  &
ptc_authority_reference_number ($
current_position_uncertainty
 ( 
head_end_milepost_prefix (   
rear_end_milepost_prefix (  ,
$rear_end_ptc_subdivision_district_id (
control_brake (%
time_lapsed_from_1080_message (
loco_ptc_state (
head_end_railroad_scac (    
direction_of_travel (3
loco_state_time (2.google.protobuf.Timestamp
loco_ptc_state_summary ( 
head_end_milepost_suffix (  
rear_end_track_name (   
gps_position_validity (

track_name ( 
reason_for_ptc_report (
loco_id (   &
point (2.com.csx.location.PointBPbproto3�
�
google/protobuf/timestamp.protogoogle.protobuf"+
    Timestamp
seconds (
nanos (B~
com.google.protobufBTimestampProtoPZ+github.com/golang/protobuf/ptypes/timestamp��GPB�Google.Protobuf.WellKnownTypesbproto3�
�
com/mywork/header.protocom.csxgoogle/protobuf/timestamp.proto"�
Header(
time (2.google.protobuf.Timestamp
source (    
destination (   
uuid (  
source_uuids (  
message_type_id (   
raw_message (   bproto3�
�
google/protobuf/timestamp.protogoogle.protobuf"+
    Timestamp
seconds (
nanos (B~
com.google.protobufBTimestampProtoPZ+github.com/golang/protobuf/ptypes/timestamp��GPB�Google.Protobuf.WellKnownTypesbproto3�
�
com/mywork/location/point.protocom.mywork.location"{
Point
    longitude (H

protobuf compiler FD


�
google/protobuf/timestamp.protogoogle.protobuf";
    Timestamp
seconds (Rseconds
nanos (RnanosB~
com.google.protobufBTimestampProtoPZ+github.com/golang/protobuf/ptypes/timestamp��GPB�Google.Protobuf.WellKnownTypesbproto3
�
com/mywork/header.protocom.csxgoogle/protobuf/timestamp.proto"�
Header.
time (2.google.protobuf.TimestampRtime
source (    Rsource 
destination (   Rdestination
uuid (  Ruuid!
source_uuids (  RsourceUuids&
message_type_id (   R
messageTypeId
raw_message (   R
rawMessagebproto3
�
com/mywork/location/point.protocom.mywork.location"�
Point
    longitude (HR  longitude
latitude (HRlatitude
altitude (HRaltitudeB
longitude_oneofB
latitude_oneofB
altitude_oneofbproto3
�
)com/mywork/asset/rolling/loco/ptc_2080.protocom.csx.asset.rolling.locogoogle/protobuf/timestamp.protocom/mywork/header.protocom/mywork/location/point.proto"�
Ptc2080'
header (2.com.csx.HeaderRheader
altitude (Raltitude*
rear_end_milepost (RrearEndMilepost
scac (  Rscac3
rear_end_railroad_scac (    RrearEndRailroadScac-
head_end_track_name (   RheadEndTrackName
speed (RspeedM
$head_end_ptc_subdivision_district_id (RheadEndPtcSubdivisionDistrictId*
head_end_milepost  (RheadEndMilepostH
!distance_lapsed_from_1080_message
 (RdistanceLapsedFrom1080Message7
rear_end_milepost_suffix (  RrearEndMilepostSuffixC
ptc_authority_reference_number (RptcAuthorityReferenceNumber@
current_position_uncertainty
 (RcurrentPositionUncertainty7
head_end_milepost_prefix (  RheadEndMilepostPrefix7
rear_end_milepost_prefix (  RrearEndMilepostPrefixM
$rear_end_ptc_subdivision_district_id (RrearEndPtcSubdivisionDistrictId#

control_brake (RcontrolBrake@
time_lapsed_from_1080_message (RtimeLapsedFrom1080Message$
loco_ptc_state (RlocoPtcState3
head_end_railroad_scac (    RheadEndRailroadScac.
direction_of_travel (RdirectionOfTravelB
loco_state_time (2.google.protobuf.TimestampR
locoStateTime3
loco_ptc_state_summary (RlocoPtcStateSummary7
head_end_milepost_suffix (  RheadEndMilepostSuffix-
rear_end_track_name (   RrearEndTrackName2
gps_position_validity (RgpsPositionValidity

track_name ( R   trackName1
reason_for_ptc_report (RreasonForPtcReport
loco_id (   RlocoId-
point (2.com.csx.location.PointRpointBPbproto3
famarting commented 3 years ago

Hi @ebbnflow, could you share your target/classes/my_Model.desc file? I would like to use your example for my testing. Plus, I'm going to try to fix the current serdes or create some specific classes to solve your issues.

ebbnflow commented 3 years ago

Hi @ebbnflow could you share your target/classes/my_Model.desc file? I would like to use your example for my testing, plus I'm going to try fix the current serdes or create some specific classes to solve your issues

I can upload them to my Red Hat ticket. Can you find them there? My dev manager suggests doing it that way because we have an NDA with Red Hat. https://access.redhat.com/support/cases/#/case/02865657

EricWittmann commented 3 years ago

Yes we can access that!