confluentinc / schema-registry

Confluent Schema Registry for Kafka
https://docs.confluent.io/current/schema-registry/docs/index.html

Cannot make avro deserialization with KafkaAvroDeserializer #636

Open margitbomholt opened 6 years ago

margitbomholt commented 6 years ago

Hi, I'm a bit confused, but I'm also fairly new to Kafka and Schema Registry. We have written several consumers and producers for Kafka using "ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");", and they work well together and can deserialize records from topics produced by a "node-red" producer (node-red: https://nodered.org/).

We have also recently started to experiment with Kafka Streams, and it seems that for the streams applications to work we have to produce records using "ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());".

If we try to consume/stream data from a topic generated by "node-red" using "io.confluent.kafka.serializers.KafkaAvroDeserializer", we get the following error:

"Exception in thread "main" org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition xxxx at offset 0 Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!"

The schema is in the schema registry (we uploaded it using Postman).

What are we doing wrong?

Has anyone seen this before: reading from a topic generated by "node-red" using "io.confluent.kafka.serializers.KafkaAvroDeserializer"?

The test consumer code is attached.

Any input will be highly appreciated.

yarshad commented 6 years ago

Can you share your test consumer code?

margitbomholt commented 6 years ago

TestNodeRedAvroConsumer.zip

margitbomholt commented 6 years ago

Hi

I have added the file on GitHub and here it is. Thanks.


mageshn commented 6 years ago

@margitbomholt The messages need to be serialized the right way before they can be deserialized. Are you using KafkaAvroSerializer when producing the messages? If not, the serialized messages have to conform to the wire format that KafkaAvroSerializer produces.
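
For context, the format KafkaAvroSerializer writes (and KafkaAvroDeserializer expects) is a single 0x00 "magic" byte, followed by the 4-byte big-endian schema id assigned by the Schema Registry, followed by the Avro binary payload. A minimal sketch of that framing, assuming the schema has already been registered and schemaId is the id the registry returned (the helper name is made up for illustration):

import java.nio.ByteBuffer;

  // Sketch: wrap an Avro-encoded payload in the Schema Registry wire format
  // (0x00 magic byte + 4-byte big-endian schema id + Avro binary payload).
  public static byte[] toWireFormat(int schemaId, byte[] avroPayload) {
    ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length);
    buffer.put((byte) 0x0);   // magic byte
    buffer.putInt(schemaId);  // schema id (ByteBuffer writes big-endian by default)
    buffer.put(avroPayload);  // Avro binary-encoded record, no container-file header
    return buffer.array();
  }

A payload without this prefix is exactly what triggers the "Unknown magic byte!" error.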

IRobL commented 6 years ago

Here's a link to the gist for the uploaded code. I have a similar error message and am currently trying to figure out why it won't deserialize in a Spark job, while it works with kafka-avro-console-consumer.sh.

https://gist.github.com/IRobL/3452c7c33548cd72e1659897289e584e
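
For reference, a minimal consumer wired up with KafkaAvroDeserializer looks roughly like the sketch below; the broker address, registry URL, group id and topic name are placeholders, not values taken from the gist:

import java.util.Arrays;
import java.util.Properties;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class TestAvroConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-avro-consumer");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    // KafkaAvroDeserializer needs to know where the Schema Registry lives.
    props.put("schema.registry.url", "http://localhost:8081");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("my-topic"));
    try {
      while (true) {
        ConsumerRecords<String, GenericRecord> records = consumer.poll(1000);
        for (ConsumerRecord<String, GenericRecord> record : records) {
          System.out.println(record.key() + " -> " + record.value());
        }
      }
    } finally {
      consumer.close();
    }
  }
}

Even with this configuration, the consumer will still fail with "Unknown magic byte!" if the producer did not write the wire format described above.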

loicdescotte commented 6 years ago

Hi, I have the same issue. Did you find a solution? I use the Confluent HDFS connector (HDFS sink) to deserialize the Avro messages.

margitbomholt commented 6 years ago

We decided not to use Avro serialization for this project. It's a while back now, as these were experiments we made 8 months ago, but we did manage to get data from node-red into Kafka and on to a Kafka Streams process. We found it was necessary to add the missing "magic" byte in Node-RED as part of the data block that was sent to Kafka. Hope this helps you.

jeroenr commented 6 years ago

I got the same issue. Any update / pointers on this?

marcintustin commented 6 years ago

+1 to this. The deserializer appears unable to read data written with the code below:

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DatumReader;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Decoder;
import org.apache.kafka.connect.errors.DataException;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.EOFException;

  // Reads JSON-encoded records and rewrites them as an Avro object container
  // file. Note that DataFileWriter emits the container-file header, not the
  // per-record registry wire format (magic byte + schema id) that
  // KafkaAvroDeserializer expects.
  public byte[] jsonToAvro(byte[] data, org.apache.avro.Schema schema) {
    try {
      String strData = new String(data, "UTF-8");
      DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
      Decoder decoder = DecoderFactory.get().jsonDecoder(schema, strData);

      ByteArrayOutputStream output = new ByteArrayOutputStream();
      DataFileWriter<GenericRecord> writer =
          new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>());
      writer.create(schema, output);

      // Keep reading records from the JSON decoder until the input is exhausted.
      GenericRecord datum;
      while (true) {
        try {
          datum = reader.read(null, decoder);
        } catch (EOFException eofe) {
          break;
        }
        writer.append(datum);
      }
      writer.flush();
      System.out.println("Avroified datum: " + new String(output.toByteArray(), "UTF-8"));
      return output.toByteArray();
    } catch (IOException e) {
      throw new DataException("Failed to convert to AVRO.", e);
    }
  }
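
For comparison, a sketch of the same conversion without DataFileWriter: a BinaryEncoder produces the bare Avro payload (assuming a single record per input, which is my assumption, not the original code's), and that payload still has to be wrapped in the 5-byte registry framing shown earlier in the thread before KafkaAvroDeserializer will accept it:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

  // Sketch: convert one JSON-encoded record to bare Avro binary, i.e. no
  // container-file header. The result still needs the registry wire format
  // (0x00 magic byte + 4-byte schema id) prepended before KafkaAvroDeserializer
  // can read it; KafkaAvroSerializer does all of this automatically.
  public byte[] jsonToAvroBinary(byte[] json, Schema schema) throws IOException {
    GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    Decoder decoder = DecoderFactory.get().jsonDecoder(schema, new String(json, "UTF-8"));
    GenericRecord datum = reader.read(null, decoder);

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(schema).write(datum, encoder);
    encoder.flush();
    return out.toByteArray();
  }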
rgilles commented 6 years ago

Hi all, I had the same issue while using kafka-connect-jdbc, and it turned out to be an error in the configuration of the connector. The documentation (Installing and Configuring Kafka Connect) advises you to configure the converter with AvroConverter. This is where I went wrong: I was using the default converter configuration, which is JsonConverter. With a KafkaProducer you must do the same thing, but with the serializer this time:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", "http://localhost:8081");
new KafkaProducer<>(props);

In conclusion: make sure you have set up the Avro serializer (or, for Connect, the Avro converter) when producing records to the topic. For me that solved the "Unknown magic byte!" issue.

Regards, Romain

bnymnsntrk commented 2 years ago

(Quoting rgilles' comment above about configuring the Avro converter and KafkaAvroSerializer.)

I'm already using this configuration and I still get the error. The thing is, I can consume (deserialize) messages from the producer without a problem; it only gives that error when I try to receive them through the connector. Probably there is a configuration mistake in the connector.
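
For reference, when the data was produced with KafkaAvroSerializer, the Connect side needs the matching Avro converter; a sketch of the relevant worker or connector properties (the registry URL is a placeholder):

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

A mismatch between the serializer used to produce the topic and the converter configured on the Connect worker or connector is the usual cause of this kind of error on the Connect side.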