redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/

Schema Registry Issue | Serialization/Deserialization Decimal Logical Type #1040

Open AplHelp05 opened 2 years ago

AplHelp05 commented 2 years ago

Hello everyone, I am having some issues with the serialization/deserialization of a message that contains a Decimal Logical Type.

Serialization Issue

Scenario: We have an Avro schema with two fields: DepartmentId (String) & PaymentFee (Logical Type: Decimal). When we try to serialize the following message:

{ "departmentId": "123HumanResources", "paymentFee": 23.900 }

We get the following error:

Error: cannot encode binary record "payment" field "paymentFee": value does not match its schema: cannot transform to bytes, expected *big.Rat, received json.Number

Deserialization Issue

Scenario: We have a .NET producer (which sends the exact same message). When Benthos deserializes the message, the value comes back as null, and if we add .string() the value of paymentFee is "239/10":

- bloblang: |-
    root = this
    root.departmentId = this.departmentId
    root.paymentFee = this.paymentFee.string()

Questions

  1. Is Benthos able to serialize/deserialize messages that contain a Decimal Logical Type?

Notes

Here is a small PoC reproducing the issue: https://github.com/AplHelp05/SchemaRegistryPoC

Jeffail commented 2 years ago

Under the hood benthos is using github.com/linkedin/goavro/v2, specifically using https://pkg.go.dev/github.com/linkedin/goavro/v2#Codec.NativeFromBinary and https://pkg.go.dev/github.com/linkedin/goavro/v2#Codec.BinaryFromNative to do conversions.
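
For reference, here's a minimal standalone sketch (my own illustration, not Benthos source) of those two calls against the schema from this issue. goavro's native representation for a bytes field with the decimal logical type is a *big.Rat, which is why a json.Number gets rejected during encoding:

package main

import (
    "fmt"
    "math/big"

    "github.com/linkedin/goavro/v2"
)

func main() {
    codec, err := goavro.NewCodec(`{
      "type": "record", "name": "payment", "fields": [
        {"name": "departmentId", "type": "string"},
        {"name": "paymentFee", "type": {"type": "bytes", "logicalType": "decimal", "precision": 18, "scale": 3}}
      ]}`)
    if err != nil {
        panic(err)
    }

    // BinaryFromNative expects a *big.Rat for the decimal field, not a float or json.Number.
    binary, err := codec.BinaryFromNative(nil, map[string]interface{}{
        "departmentId": "123HumanResources",
        "paymentFee":   big.NewRat(239, 10), // 23.900
    })
    if err != nil {
        panic(err)
    }

    // NativeFromBinary hands the decimal back as a *big.Rat.
    native, _, err := codec.NativeFromBinary(binary)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%v\n", native)
}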

For the purpose of decoding Avro into a generic format we could probably solve this to a satisfactory level by walking the resulting structure and manually picking out these values. However, I'm not sure what we can do for serialization without either forking or contributing upstream the ability to work with json.Number values, as that's currently the standard way for Benthos internals to carry large numerical values.
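
As a rough sketch of that walking approach (a hypothetical helper, assuming a math/big import; nothing like this exists in Benthos today), the decoded native value could be traversed and any *big.Rat swapped for a plain representation:

// Hypothetical helper: recursively replace *big.Rat values (produced by goavro for
// decimal logical types) with decimal strings so the result fits the generic
// JSON-style structure. The number of decimal places is an assumption here.
func normalizeRats(v interface{}) interface{} {
    switch t := v.(type) {
    case *big.Rat:
        return t.FloatString(3) // e.g. 239/10 -> "23.900"
    case map[string]interface{}:
        for k, val := range t {
            t[k] = normalizeRats(val)
        }
        return t
    case []interface{}:
        for i, val := range t {
            t[i] = normalizeRats(val)
        }
        return t
    default:
        return v
    }
}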

I've therefore flagged this as needing investigation. At the same time, we ought to look into any other custom types that we might need to convert to/from.

AplHelp05 commented 2 years ago

Perfect, thank you so much! Also, at first we tried to implement a couple of plugins to do the conversions. I can share them with you:

Convert_float_to_bigRat

Notes: The problem is that when the result is too big we lose precision of the value, for example: 3242591731706757/72057594037927936 = 0.180 (value generated by the plugin) is pushed to Kafka as 179/1000 = 0.179 (value generated when Benthos pushes the message to the server).

// ConvertFloatToBigRat converts a float64 into the *big.Rat that goavro expects for
// the decimal logical type. Note that SetFloat64 captures the float's exact binary
// value, which is where the precision issue described above comes from.
func ConvertFloatToBigRat(floatVal float64) (interface{}, error) {
    bigRatValue := new(big.Rat).SetFloat64(floatVal)
    return bigRatValue, nil
}
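
For what it's worth, if the value can reach the plugin as a decimal string rather than a float64, big.Rat's SetString avoids that precision loss entirely. A minimal sketch of such a variant (the name and signature are made up to mirror the plugin above; assumes math/big and fmt imports):

// Hypothetical variant: "23.900" parses exactly to 239/10, whereas SetFloat64(23.9)
// captures the float64's binary approximation.
func ConvertStringToBigRat(stringVal string) (interface{}, error) {
    bigRatValue, ok := new(big.Rat).SetString(stringVal)
    if !ok {
        return nil, fmt.Errorf("cannot parse %q as a decimal", stringVal)
    }
    return bigRatValue, nil
}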

Convert_bigRat_to_float

Notes: Returns a float with three decimal places (for example: 0.100) & receives a string parameter, for example: this.paymentFee.string() = "239/10"

// ConvertBigRatToFloat parses a big.Rat string such as "239/10" and returns it as a
// float64 rounded to three decimal places.
func ConvertBigRatToFloat(bigRatValue string) (interface{}, error) {
    resultBigRat, ok := new(big.Rat).SetString(strings.Replace(bigRatValue, "\"", "", -1))
    if !ok {
        return nil, errors.New("argument BigRat cannot be undefined or null")
    }

    floatVal, exact := resultBigRat.Float64()
    if exact {
        // Round to three decimal places, e.g. 239/10 -> 23.9.
        return math.Round(floatVal*1000) / 1000, nil
    }
    return floatVal, nil
}
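
As a side note on ConvertBigRatToFloat, big.Rat can also format straight to a fixed number of decimal places via FloatString, which skips the float64 conversion and rounding altogether. A small sketch (hypothetical helper, assumes a math/big import):

// Hypothetical helper: format a big.Rat with exactly three decimal places.
func FormatBigRat(bigRatValue *big.Rat) string {
    return bigRatValue.FloatString(3) // e.g. 239/10 -> "23.900"
}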

Maybe these plugins can be useful :)

mihaitodor commented 2 years ago

First of all, thanks for reporting this bug and providing reproduction steps!

TL;DR: Note that this fix I made to schema_registry_decode, which went into Benthos v4.0.0, might impact you when you upgrade (see below), but it makes ConvertBigRatToFloat unnecessary. Also, you can use the bloblang number() method instead of ConvertFloatToBigRat, or just represent the number as a string in the input JSON, which is probably how it should be represented for this particular logical type (see below).

I did some digging and testing and here is what I found. Brace yourselves, this is going to be a doozy...

In order to test stuff I ran the following prerequisites:

> docker run --rm -p 8081:8081 -p 8082:8082 -p 9092:9092 --name redpanda docker.vectorized.io/vectorized/redpanda redpanda start --smp 1 --overprovisioned --kafka-addr 0.0.0.0:9092 --advertise-kafka-addr 0.0.0.0:9092 --pandaproxy-addr 0.0.0.0:8082 --advertise-pandaproxy-addr 0.0.0.0:8082
> curl --location --request POST 'http://localhost:8081/subjects/payment/versions' --header 'Content-Type: application/json' --data-raw '{"schema": "{ \"type\": \"record\", \"name\": \"payment\", \"fields\": [ { \"name\" : \"departmentId\" , \"type\" : \"string\" }, { \"name\" : \"paymentFee\" , \"type\" : { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 18, \"scale\": 3 }} ]}"}'

For the serialisation issue, as you already noticed, it looks like the underlying goavro library expects the number to be stored as a string in the JSON for this particular schema. For example, this works:

input:
  generate:
    mapping: |
      root = {
        "departmentId": "123HumanResources",
        "paymentFee": 23.900
      }
    interval: 0s
    count: 1

pipeline:
  processors:
    - bloblang: |
        root = this
        root.paymentFee = this.paymentFee.string()
    - schema_registry_encode:
        url: http://localhost:8081
        subject: payment
        refresh_period: 10m
        avro_raw_json: true
    - log:
        message: "Encode error: ${! error() }"

output:
  kafka:
    addresses:
      - localhost:9092
    topic: payment
    client_id: benthos

Note that I'm setting avro_raw_json: true for the schema_registry_encode processor to force Benthos to send the message as a JSON string to goavro instead of a structured object, because goavro's NativeFromTextual should be called before BinaryFromNative. I guess in simple cases it's fine to skip NativeFromTextual and call BinaryFromNative directly with a Benthos structured object (by setting avro_raw_json: false), but it definitely doesn't work for logical types.

As you can see above, I'm forcing the paymentFee field to be a string via the bloblang processor (this might be a bit problematic, because 23.900 being converted to "23.9" might be undesirable in some cases), which means that Benthos has to unmarshal the JSON string into an object, apply this transformation and then marshal it back into a JSON string in order to call NativeFromTextual, which is definitely not ideal from a performance perspective...

I can see here and here how they end up with bytes.decimal for this particular logical type and then here they're trying to parse it from the given JSON string and reject anything that doesn't start with ". I guess accepting a number in this case contradicts the bytes part of bytes.decimal?
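
To make that call order concrete, here's a small sketch (my own simplification, not the actual schema_registry_encode code) of the avro_raw_json: true path, assuming a goavro codec built from the payment schema registered above:

// Assumed simplification: the raw JSON document goes through NativeFromTextual first,
// and only then is the resulting native value handed to BinaryFromNative.
func encodeRawJSON(codec *goavro.Codec, rawJSON []byte) ([]byte, error) {
    // For bytes.decimal, goavro's textual decoder only accepts a quoted value here,
    // which is why paymentFee has to arrive as a string such as "23.9".
    native, _, err := codec.NativeFromTextual(rawJSON)
    if err != nil {
        return nil, err
    }
    return codec.BinaryFromNative(nil, native)
}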

I decided to check what Java does in this case and it barfs with error org.apache.avro.AvroTypeException: Expected bytes. Got VALUE_NUMBER_FLOAT, so I think it's probably best to not change this, given that both implementations agree with each other. If you wish to play with the Java code, here it is (I'm not a Java developer, so it's just some Stackoverflow copy / pasta):

// > export JAVA_HOME=/usr/local/opt/openjdk
// > export PATH="${JAVA_HOME}/bin:$PATH"
// > java --version
// openjdk 18.0.1 2022-04-19
// OpenJDK Runtime Environment Homebrew (build 18.0.1+0)
// OpenJDK 64-Bit Server VM Homebrew (build 18.0.1+0, mixed mode, sharing)
// > java -cp avro_1.11/avro-1.11.0.jar:avro_1.11/jackson-core-2.12.5.jar:avro_1.11/jackson-annotations-2.12.5.jar:avro_1.11/jackson-databind-2.12.5.jar:avro_1.11/slf4j-api-1.7.32.jar Main.java

import java.math.BigInteger;
import java.util.Arrays;
import java.io.*;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.*;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Conversions;
import org.apache.avro.file.DataFileReader;
import java.util.HexFormat;

public class Main {
    static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {
        InputStream input = new ByteArrayInputStream(json.getBytes());
        DataInputStream din = new DataInputStream(input);

        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        w.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }
    public static void main(String[] args) {
        try {
            String json = "{\"departmentId\": \"123HumanResources\",\"paymentFee\": 23.900}";

            String schemaJSON ="{ \"type\": \"record\", \"name\": \"payment\", \"fields\": [ { \"name\" : \"departmentId\" , \"type\" : \"string\" }, { \"name\" : \"paymentFee\" , \"type\" : { \"type\": \"bytes\", \"logicalType\": \"decimal\", \"precision\": 18, \"scale\": 3 }} ]}";

            Schema schema = new Schema.Parser().parse(schemaJSON);

            byte[] avroByteArray = fromJsonToAvro(json, schema);

            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

            Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record);
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }
}

Try replacing 23.900 with \"23.900\" in the String json line above and it will no longer emit an error.

For the deserialisation issue, now that the fix I mentioned at the beginning has gone in, we get the following output:

INFO 20                                            @service=benthos label="" path=root.pipeline.processors.1
{"departmentId":"123HumanResources","paymentFee":"23.9"}

for this config:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - payment
    consumer_group: foo
    client_id: benthos

pipeline:
  processors:
    - schema_registry_decode:
        url: http://localhost:8081
    - log:
        message: ${! json("paymentFee").number() - 3.9}

output:
  stdout: {}

Note that I had to cast the paymentFee explicitly to a number, since schema_registry_decode will deserialise it as a string.

For your particular schema, things seem OK so far, but let's look at the following example:

> curl --location --request POST 'http://localhost:8081/subjects/payment/versions' --header 'Content-Type: application/json' --data-raw '{"schema": "{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}"}'

Producer config:

input:
  generate:
    mapping: |
      root = {"pos_0_33333333": "!"}
    interval: 0s
    count: 1

pipeline:
  processors:
    - schema_registry_encode:
        url: http://localhost:8081
        subject: payment
        refresh_period: 10m
        avro_raw_json: true
    - log:
        message: "Encode error: ${! error() }"

output:
  kafka:
    addresses:
      - localhost:9092
    topic: bytesdecimal
    client_id: benthos

Consumer config:

input:
  kafka:
    addresses:
      - localhost:9092
    topics:
      - bytesdecimal
    consumer_group: foo
    client_id: benthos

pipeline:
  processors:
    - schema_registry_decode:
        url: http://localhost:8081
    - log:
        message: "Decode error: ${! error() }"

output:
  stdout: {}

On the consumer side we get the following output after my fix:

{"pos_0_33333333":{"bytes.decimal":"!"}}

However, that's not what most people would expect. First, we lose the precision and scale from the schema (though maybe that should be expected, given how this type is declared in the schema?). Second, we get that extra nesting with the type as a key, which might not be all that useful. Also, plugging this schema and JSON into the Java code above, we get this output:

// ...
String json = "{\"pos_0_33333333\":{\"bytes\":\"!\"}}";
String schemaJSON ="{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}";
// ...
{"pos_0_33333333": "!"}

But that's not what goavro's TextualFromNative outputs...

Note: I used {"pos_0_33333333":{"bytes":"!"}} because I'm not sure how to get Java to accept {"pos_0_33333333": "!"} the way goavro does for this particular schema. However, the format I used seems to get parsed correctly. This new goavro issue makes me think that maybe goavro should also expect {"pos_0_33333333":{"bytes":"!"}} instead of {"pos_0_33333333": "!"} for this schema, but I'm not sure. goavro currently rejects the {"bytes": ...} wrapped form, in case you're wondering.

The good news is that I just tested this code that the current goavro maintainer is planning to merge soon and it seems to produce the same output as Java, namely {"pos_0_33333333":"!"}, so we should finally be able to round trip...

But, this isn't the end of the story, unfortunately. For example, the Snowflake Kafka Connector uses Conversions.DecimalConversion, presumably because in cases like this one, they don't want to lose the precision and scaling. So if I modify the above Java code like so (I replaced the reader in main and added the necessary imports):

// > export JAVA_HOME=/usr/local/opt/openjdk
// > export PATH="${JAVA_HOME}/bin:$PATH"
// > java --version
// openjdk 18.0.1 2022-04-19
// OpenJDK Runtime Environment Homebrew (build 18.0.1+0)
// OpenJDK 64-Bit Server VM Homebrew (build 18.0.1+0, mixed mode, sharing)
// > java -cp avro_1.11/avro-1.11.0.jar:avro_1.11/jackson-core-2.12.5.jar:avro_1.11/jackson-annotations-2.12.5.jar:avro_1.11/jackson-databind-2.12.5.jar:avro_1.11/slf4j-api-1.7.32.jar Main.java

import java.math.BigInteger;
import java.util.Arrays;
import java.io.*;
import org.apache.avro.Schema;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.*;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.Conversions;
import org.apache.avro.file.DataFileReader;
import java.util.HexFormat;

public class Main {
    static byte[] fromJsonToAvro(String json, Schema schema) throws Exception {
        InputStream input = new ByteArrayInputStream(json.getBytes());
        DataInputStream din = new DataInputStream(input);

        Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

        DatumReader<Object> reader = new GenericDatumReader<Object>(schema);
        Object datum = reader.read(null, decoder);

        GenericDatumWriter<Object>  w = new GenericDatumWriter<Object>(schema);
        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

        Encoder e = EncoderFactory.get().binaryEncoder(outputStream, null);

        w.write(datum, e);
        e.flush();

        return outputStream.toByteArray();
    }
    public static void main(String[] args) {
        try {
            String json = "{\"pos_0_33333333\":{\"bytes\":\"!\"}}";

            String schemaJSON ="{ \"type\": \"record\", \"name\": \"bytesdecimal\", \"fields\": [ { \"default\": null, \"name\": \"pos_0_33333333\", \"type\": [ \"null\", { \"logicalType\": \"decimal\", \"precision\": 16, \"scale\": 2, \"type\": \"bytes\" } ] } ]}";

            Schema schema = new Schema.Parser().parse(schemaJSON);

            byte[] avroByteArray = fromJsonToAvro(json, schema);

            final GenericData genericData = new GenericData();
            genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
            DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema, schema, genericData);

            Decoder decoder = DecoderFactory.get().binaryDecoder(avroByteArray, null);
            GenericRecord record = reader.read(null, decoder);
            System.out.println(record);
        }
        catch (Exception e) {
            System.out.println(e);
        }
    }
}

then I get the following output: {"pos_0_33333333": 0.33}, which is what people seem to really want in this case... But I don't see any way in goavro to achieve this, at least not via the public API.
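
For reference, here's a small Go sketch of why that "!" payload decodes to 0.33. This is just the Avro decimal encoding rule (a big-endian unscaled integer divided by 10^scale), not goavro or Benthos code:

// "!" is the single byte 0x21 = 33; with scale 2 from the schema, the decoded value
// is 33 / 10^2 = 0.33. (This ignores negative values, which Avro stores as
// two's complement.) Assumes a math/big import.
func decodeDecimal(raw []byte, scale int64) *big.Rat {
    unscaled := new(big.Int).SetBytes(raw) // "!" -> 33
    divisor := new(big.Int).Exp(big.NewInt(10), big.NewInt(scale), nil)
    return new(big.Rat).SetFrac(unscaled, divisor) // 33/100
}

With this, decodeDecimal([]byte("!"), 2).FloatString(2) yields "0.33", matching the Java output above.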

Anyhow, I'll open a PR here to update the schema_registry_decode implementation once https://github.com/linkedin/goavro/pull/249 gets merged and, in the meantime, I'll ask the goavro maintainer if he's interested in maintaining some extra APIs which would allow Benthos to emit exactly what the Snowflake Kafka Connector is emitting.