streamnative / pulsar-archived

Apache Pulsar - distributed pub-sub messaging system
https://pulsar.apache.org
Apache License 2.0
73 stars 25 forks source link

ISSUE-9922: Create c++ producer/consumer with protobuf schema #2288

Open sijie opened 3 years ago

sijie commented 3 years ago

Original Issue: apache/pulsar#9922


Expected behavior

I would like to use the schema registry for my c++ consumers/producers with protobuf schemas. There are examples/test for AVRO and Json but not for protobuf and I'm seeing failures creating the producer/consumer.

Actual behavior

c++ test app output:

2021-03-16 15:31:46.224 WARN  [140638990731008] ClientConnection:959 | [127.0.0.1:35056 -> 127.0.0.1:6650] Received error response from server: IncompatibleSchema -- req_id: 0
2021-03-16 15:31:46.224 ERROR [140638990731008] ConsumerImpl:257 | [persistent://public/default/schema-publisher23, adg_sub, 0] Failed to create consumer: IncompatibleSchema

pulsar-standalone container output:

pulsar-standalone    | 15:41:43.175 [ForkJoinPool.commonPool-worker-1] WARN  org.apache.pulsar.broker.service.ServerCnx - [/10.0.2.2:45928][persistent://public/default/schema-publisher23][adg_sub] Failed to create consumer: Invalid schema definition data for PROTOBUF schema

Steps to reproduce

Code snippet:

void createConsumer()
{
    /*
        message MyProto{
            int a = 1;
            double b = 2;
        }
    */
  ::pulsar::Client client("pulsar://0.0.0.0:6650");
  ::pulsar::ConsumerConfiguration config;
  MyProto msg{};
  msg.set_a(22);
  msg.set_b(1.0);
  std::string msgJson;
  google::protobuf::util::MessageToJsonString(msg, &msgJson);
  ::pulsar::SchemaInfo schema(::pulsar::SchemaType::PROTOBUF, "Protobuf", msgJson);
  config.setSchema(schema);
  ::pulsar::Consumer consumer;
  auto result = client.subscribe("persistent://public/default/schema-publisher23", "adg_sub",
                                 config, consumer);
}

System configuration

Using pulsar-standalone docker image

adg25 commented 3 years ago

I wondered if I needed to manually upload the schema. Appears to not be able to though:

$ apache-pulsar-2.7.0/bin/pulsar-admin schemas upload --filename /path/to/MyMessageSchema persistent://public/default/schema-publisher23
Picked up JAVA_TOOL_OPTIONS: -Xmx1879m
Warning: Nashorn engine is planned to be removed from a future JDK release
18:54:29.816 [AsyncHttpClient-7-1] WARN  org.apache.pulsar.client.admin.internal.BaseResource - [http://localhost:8080/admin/v2/schemas/public/default/schema-publisher23/schema] Failed to perform http post request: javax.ws.rs.InternalServerErrorException: HTTP 500 Internal Server Error
HTTP 500 Internal Server Error

Reason: HTTP 500 Internal Server Error
 $ apache-pulsar-2.7.0/bin/pulsar-admin clusters get standalone
Picked up JAVA_TOOL_OPTIONS: -Xmx1879m
Warning: Nashorn engine is planned to be removed from a future JDK release
{
  "serviceUrl" : "http://localhost:8080",
  "brokerServiceUrl" : "pulsar://localhost:6650"
}
$ apache-pulsar-2.7.0/bin/pulsar-admin clusters list
Picked up JAVA_TOOL_OPTIONS: -Xmx1879m
Warning: Nashorn engine is planned to be removed from a future JDK release
"standalone"

Contents of MyMessageSchema file:

{
    "type": "Protobuf",
    "schema": "{\"a\":\"1\", \"b\":\"2\"}",
    "properties": {}
}

Proto:

message MyMessage
{
    int32 a = 1;
    double b = 2;
}