confluentinc / libserdes

Avro Serialization/Deserialization C/C++ library with Confluent schema-registry support
Apache License 2.0

serdes->deserialize() gives error: Invalid CP1 magic byte 96, expected 0 #26

Closed jnarego closed 4 years ago

jnarego commented 4 years ago

Hello,

On a CentOS Linux system, I have a C++ test program to publish and consume JSON messages to/from a Kafka cluster using a schema registry. I don't have any control over the Kafka cluster.

I'm using libserdes and rdkafka, with most of the code taken from the libserdes examples kafka-serdes-avro-console-producer.cpp and kafka-serdes-avro-console-consumer.cpp. I can publish an Avro-serialized JSON message validated against the fetched schema, but when I consume it, serdes->deserialize(...) fails.

The output of running in consumer mode is:

Serdes schema.registry.url = https://schema-registry.xxxxxx.com
Serdes serializer.framing = cp1
Serdes deserializer.framing = cp1
Query schema-registry by name [xxxxx-test-value]
Get Schema [xxxxx-test-value], id [1]
New consumer created
consuming partition: 0
Message on topic [topic-test] partition [0] at offset [0]:
Key: 01
Value: `�r
ERROR deserializing message payload: Invalid CP1 magic byte 96, expected 0
Message on [topic-test] partition [0] at offset [1]:
Key: 02
Value: �B
ERROR deserializing message payload: Invalid CP1 magic byte 144, expected 0

Is this a problem with my producer or with some configuration of the Kafka connectors? I read about the wire format of Avro-serialized data, and I think libserdes sends the correct 5 framing bytes (a 1-byte magic byte = 0, followed by the 4-byte schema ID) at the start of the payload.
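For reference, here is a minimal sketch of how that 5-byte CP1 header could be checked on a raw message payload (an illustrative helper of mine, not libserdes code):

#include <cstdint>
#include <cstdio>

static void inspect_cp1_header(const void *payload, size_t len)
{
  /* CP1 framing: 1 magic byte (0x00), then the schema ID as a
   * 4-byte big-endian integer, then the Avro-encoded data. */
  if (len < 5) {
    fprintf(stderr, "payload too short for CP1 framing (%zu bytes)\n", len);
    return;
  }
  const uint8_t *p = (const uint8_t *)payload;
  uint32_t schema_id = ((uint32_t)p[1] << 24) | ((uint32_t)p[2] << 16) |
                       ((uint32_t)p[3] << 8)  |  (uint32_t)p[4];
  printf("magic byte %u (expected 0), schema id %u\n", p[0], schema_id);
}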

Can someone help me please? Many thanks in advance. Joao Rego

edenhill commented 4 years ago

That indeed looks like the producer did not use the cp1 serializer framing. Were you using kafka-serdes-avro-console-producer.cpp to produce the messages?

jnarego commented 4 years ago

Hello @edenhill ,

I'm using parts of code from that example. Here is my producer:

static void producer()
{
  // Config kafka Topic
  rd_kafka_topic_conf_t *topic_conf = rd_kafka_topic_conf_new();

  // Create Kafka producer handler
  rd_kafka_t *rk;
  if (!(rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr)))) {
    cout << "Failed to create new producer - " << errstr << endl;
    exit(1);
  }
  else
    cout << "KAFKA New producer created" << endl;

  // Create topic to produce to 
  rd_kafka_topic_t *rkt = rd_kafka_topic_new(rk, myKafkaTopic, topic_conf);

  // testing simple json & schema: mysim-tv-test-value
  string payload = "{\"id\":\"01\", \"amount\":123.45}";
  cout << "json to send: " << payload << endl;

  avro::GenericDatum *datum = NULL;
  std::vector<char> out;

  // Convert JSON to Avro object
  if (json2avro(schema, payload, &datum) == -1)
    cout << "Failed to convert json2avro " << endl;
  else
    cout << "Convert json2avro OK" << endl;

  // Serialize Avro out
  ssize_t numBytes = 0;
  if ((numBytes = serdes->serialize(schema, datum, out, errcod)) == -1)
    cout << "% Avro serialization failed: " << errcod << endl;
  else
    cout << "Avro serialization OK [" << numBytes << " bytes]" << endl;

  delete datum;

  string myKey = "jrego";

  // producing to kafka topic
  if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &out, sizeof(out), (char*)myKey.c_str(), myKey.size(), NULL) == -1)
    cout << "Failed to produce to topic [" << myKafkaTopic << "] : " << rd_kafka_err2str(rd_kafka_last_error()) << endl;
  else
    cout << "Send msg [" << (char*)payload.c_str() << "] to topic [" << myKafkaTopic << "]" << endl;

  if (rd_kafka_flush(rk, 2*1000) == -1)
    cout << "Failed to flush " << rd_kafka_err2str(rd_kafka_last_error()) << endl;
  else
    cout << "flush - OK" << endl;

  cout << "rd_kafka_destroy..." << endl;
  rd_kafka_topic_destroy(rkt);
  rd_kafka_destroy(rk);
  cout << "rd_kafka_destroy OK" << endl;
}

Besides the default serdes configuration, I also tried to force serializer.framing/deserializer.framing = cp1 and got the same error.
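For completeness, this is roughly how I forced it, following the pattern in the bundled C++ examples (a sketch with error handling shortened; I'm treating a non-zero return from set() as failure):

std::string errstr;
Serdes::Conf *sconf = Serdes::Conf::create();
if (sconf->set("schema.registry.url", "https://schema-registry.xxxxxx.com", errstr) ||
    sconf->set("serializer.framing", "cp1", errstr) ||
    sconf->set("deserializer.framing", "cp1", errstr))
  cout << "Serdes conf failed: " << errstr << endl;
// then create the handle from this conf, as in the examples:
Serdes::Avro *serdes = Serdes::Avro::create(sconf, errstr);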

Am I missing something?

Note: if I publish the JSON as a plain string (not Avro-serialized), everything works fine.

Thanks a lot for your help. Joao Rego

edenhill commented 4 years ago

You're mixing C++ with the librdkafka C API; you will probably want out.size() rather than sizeof(out), and out.data() rather than &out here:

if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, &out, sizeof(out), (char*)myKey.c_str(), myKey.size(), NULL) == -1)
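A sketch of the corrected call, reusing the variable names from the snippet above (out.data() requires C++11; &out[0] works otherwise):

if (rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY,
                     out.data(), out.size(),   /* the serialized bytes, not the vector object */
                     (char*)myKey.c_str(), myKey.size(), NULL) == -1)
  cout << "Failed to produce: " << rd_kafka_err2str(rd_kafka_last_error()) << endl;

Passing &out with sizeof(out) sends the raw bytes of the std::vector object itself (its internal pointers), which is why the consumer sees arbitrary leading bytes (96, 144) instead of the CP1 magic byte 0.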

jnarego commented 4 years ago

Hello @edenhill

Thanks a lot for your help. That was the problem.

I'm mixing the rdkafka C API with the libserdes C++ API because I need to set the SSL configs, and I couldn't figure out how to do that in rdkafka C++ with RdKafka::Conf::set_ssl_cert. I tried every combination of RdKafka::CertificateType and RdKafka::CertificateEncoding and always got an "invalid encoding" error.

// client config
rd_kafka_conf_set(conf, "security.protocol", "SSL", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.ca.location", "/home/mysim/tmp/ca.crt", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.certificate.location", "/home/mysim/tmp/user.crt", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "ssl.key.location", "/home/mysim/tmp/user.key", errstr, sizeof(errstr));
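(For what it's worth, the same file-location properties can also be set through the C++ API via RdKafka::Conf::set(), bypassing set_ssl_cert() entirely; a sketch with the same paths:)

std::string errstr;
RdKafka::Conf *cppconf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (cppconf->set("security.protocol", "SSL", errstr) != RdKafka::Conf::CONF_OK ||
    cppconf->set("ssl.ca.location", "/home/mysim/tmp/ca.crt", errstr) != RdKafka::Conf::CONF_OK ||
    cppconf->set("ssl.certificate.location", "/home/mysim/tmp/user.crt", errstr) != RdKafka::Conf::CONF_OK ||
    cppconf->set("ssl.key.location", "/home/mysim/tmp/user.key", errstr) != RdKafka::Conf::CONF_OK)
  cout << "Conf set failed: " << errstr << endl;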

I can't find any examples of how to do that with set_ssl_cert. Thanks a lot. Joao Rego

edenhill commented 4 years ago

Can you create a separate issue on librdkafka with the problems you find with C++ set_ssl_cert()?