farmdawgnation / registryless-avro-converter

An avro converter for Kafka Connect without a Schema Registry
Apache License 2.0

Using Binary Encode/Decoder #11

Open malhomaid opened 3 years ago

malhomaid commented 3 years ago

Hi Matt, thank you for this project.

I had to use BinaryDecoder to decode my messages, but I didn't know how to tell DataFileReader to use the BinaryDecoder, so I used GenericDatumReader directly. I'm assuming you are using DataFileReader in case the Avro file contains more than one message (which isn't my case, at least for now). Is this something from the Connect framework?
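
From what I can tell, DataFileReader is part of the Avro API rather than Connect: it reads the object container file format, which begins with a magic header, embeds the writer schema, and may hold many records, while binaryDecoder reads a bare datum that has no header and needs its schema supplied up front. A minimal sketch of the two paths against the stock Avro 1.x Java API, for contrast (the helper class and method names are mine, just for illustration):

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.SeekableByteArrayInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

class AvroDecodeSketch {
  // Container-file path: DataFileReader validates the "Obj" magic header,
  // reads the writer schema from the payload itself, and can iterate
  // over any number of records.
  static void readContainerFile(byte[] value) throws IOException {
    try (DataFileReader<GenericRecord> reader = new DataFileReader<>(
        new SeekableByteArrayInput(value), new GenericDatumReader<GenericRecord>())) {
      while (reader.hasNext()) {
        GenericRecord rec = reader.next();
        // ... handle rec ...
      }
    }
  }

  // Bare-datum path: BinaryEncoder output carries no header, so the schema
  // must be known ahead of time and the bytes are decoded directly.
  static GenericRecord readBareDatum(byte[] value, Schema schema) throws IOException {
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(value, null);
    return new GenericDatumReader<GenericRecord>(schema).read(null, decoder);
  }
}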

Here is my code:

public SchemaAndValue toConnectData(String topic, byte[] value) {
  DatumReader<GenericRecord> datumReader;
  if (avroSchema != null) {
    datumReader = new GenericDatumReader<>(avroSchema);
  } else {
    datumReader = new GenericDatumReader<>();
  }
  GenericRecord instance;

  // Reuse the decoder held in a field so its buffer is recycled across calls.
  binaryDecoder = DecoderFactory.get().binaryDecoder(value, binaryDecoder);

  try {
    instance = datumReader.read(null, binaryDecoder);
  } catch (IOException ioe) {
    throw new DataException(String.format("Failed to deserialize Avro data from topic %s", topic), ioe);
  }

  if (instance == null) {
    logger.warn("Instance was null");
  }

  if (avroSchema != null) {
    return avroDataHelper.toConnectData(avroSchema, instance);
  } else {
    return avroDataHelper.toConnectData(instance.getSchema(), instance);
  }
}
Brettuss commented 2 years ago

I will preface my comment with: "I AM NOT A JAVA DEVELOPER, AND I DON'T KNOW JAVA." I will also preface it with: "I AM NOT A DEVELOPER AT ALL."

I am a DBA who is dabbling in Data Engineering, but coding is my weakness.

I ran into an issue with a Python producer that was out of my jurisdiction: it was producing to a topic using Avro, but without Schema Registry.

The serialization of the message value looked something like this:

import io
from avro.io import BinaryEncoder, DatumWriter

def serialize(myschema, myobject):
    # Encode the record as a bare Avro binary datum: no container-file
    # header and no Schema Registry wire format, just the datum bytes.
    writer = DatumWriter(myschema)
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer.write(myobject, encoder)
    raw_bytes = bytes_writer.getvalue()
    return raw_bytes

I was running into an issue with the RegistrylessAvroConverter: it was complaining that the message value was not a valid Avro file. So I went looking, found this issue from @mhomaid1, and got some feedback from the folks at r/ApacheKafka:

https://www.reddit.com/r/apachekafka/comments/qfpxb9/schema_registryless_avro_python_produced_messages/
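
In hindsight the error makes sense: an Avro object container file starts with the magic bytes "Obj" plus a version byte, and the bare BinaryEncoder output from the Python producer has no such header. A rough sketch of the kind of check that would reject it (illustrative only, not this project's actual code):

import java.util.Arrays;

import org.apache.avro.file.DataFileConstants;

class ContainerFileCheck {
  // True only when the payload begins with the object container file magic
  // ("Obj" + version byte); bare BinaryEncoder output never does.
  static boolean looksLikeContainerFile(byte[] value) {
    byte[] magic = DataFileConstants.MAGIC;
    return value.length >= magic.length
        && Arrays.equals(Arrays.copyOfRange(value, 0, magic.length), magic);
  }
}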

His solution worked, for the most part. Here is what mine ended up looking like:

public SchemaAndValue toConnectData(String topic, byte[] value) {
  DatumReader<GenericRecord> datumReader;
  if (avroSchema != null) {
    datumReader = new GenericDatumReader<>(avroSchema);
  } else {
    datumReader = new GenericDatumReader<>();
  }
  GenericRecord instance;

  // Passing null creates a fresh decoder for every message instead of
  // reusing one held in a field.
  Decoder binaryDecoder = DecoderFactory.get().binaryDecoder(value, null);

  try {
    instance = datumReader.read(null, binaryDecoder);
  } catch (IOException ioe) {
    throw new DataException(String.format("Failed to deserialize Avro data from topic %s", topic), ioe);
  }

  if (instance == null) {
    logger.warn("Instance was null");
  }

  if (avroSchema != null) {
    return avroDataHelper.toConnectData(avroSchema, instance);
  } else {
    return avroDataHelper.toConnectData(instance.getSchema(), instance);
  }
}

I also had to add the correct imports:

import org.apache.avro.io.*;

I then had to compile while skipping the tests. I am guessing this was because I changed the .java file, and the existing tests no longer matched how the class was coded.

./gradlew build -x test

After that, my connector worked and was able to move data downstream. I am not happy about it, but I do appreciate the work of both @farmdawgnation and @mhomaid1.
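
For anyone else wiring this up end to end, the converter itself is configured on the worker or connector roughly like the README shows. The class name and schema.path key below follow the project's documented settings (double-check against your version); the path is just a placeholder:

value.converter=me.frmr.kafka.connect.RegistrylessAvroConverter
value.converter.schema.path=/path/to/schema.avsc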