Aiven-Open / opensearch-connector-for-apache-kafka

Aiven's OpenSearch® Connector for Apache Kafka®
Apache License 2.0

Protobuf message with oneof generates too verbose document #187

Open yvesk opened 1 year ago

yvesk commented 1 year ago

Hi there, a Protobuf schema like the following produces overly verbose documents in OpenSearch:

syntax = "proto3";
option java_package = "com.test";
option java_multiple_files = true;

message Event {
    message Value {
      oneof value {
        int32 int_value = 1;
        int64 long_value = 2;
        float float_value = 3;
        double double_value = 4;
        bool boolean_value = 5;
        bytes bytes_value = 6;
        string string_value = 7;
      }
    }
   map<string, Value> values = 1;
}

A message containing a values map with each type set once produces the following document:

"value": {
        "doubleVal": {
          "value_0": {
            "int_value": null,
            "long_value": null,
            "float_value": null,
            "double_value": 1.23,
            "boolean_value": null,
            "bytes_value": null,
            "string_value": null
          }
        },
        "intVal": {
          "value_0": {
            "int_value": 123,
            "long_value": null,
            "float_value": null,
            "double_value": null,
            "boolean_value": null,
            "bytes_value": null,
            "string_value": null
          }
        },
        "floatVal": {
          "value_0": {
            "int_value": null,
            "long_value": null,
            "float_value": 2.34,
            "double_value": null,
            "boolean_value": null,
            "bytes_value": null,
            "string_value": null
          }
        },
        "boolVal": {
          "value_0": {
            "int_value": null,
            "long_value": null,
            "float_value": null,
            "double_value": null,
            "boolean_value": true,
            "bytes_value": null,
            "string_value": null
          }
        },
        "longVal": {
          "value_0": {
            "unit": null,
            "int_value": null,
            "long_value": 112233,
            "float_value": null,
            "double_value": null,
            "boolean_value": null,
            "bytes_value": null,
            "string_value": null
          }
        }
}

gharris1727 commented 1 year ago

The Kafka Connect schema system does not natively support the "oneof" semantics that Protobuf does, and it appears that whatever has performed the translation (your value.converter) is doing the next best thing: treating each of the elements of the oneof as a field in the containing message, and leaving the unset fields as null.
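To illustrate the flattening described above, here is a minimal Python sketch. It is not the converter's actual code: `ONEOF_FIELDS` and `flatten_oneof` are hypothetical names, and a plain dict stands in for a Connect struct. It shows how a oneof with a single member set becomes a record in which every member is present and the unset ones are null:

```python
import json

# Field names taken from the oneof in the Value message of the schema above.
ONEOF_FIELDS = [
    "int_value", "long_value", "float_value", "double_value",
    "boolean_value", "bytes_value", "string_value",
]


def flatten_oneof(set_field, set_value):
    """Model a oneof as a flat record with one optional field per member.

    Hypothetical helper: only the member that is set carries a value,
    every other member is kept in the record as None.
    """
    if set_field not in ONEOF_FIELDS:
        raise ValueError(f"unknown oneof member: {set_field}")
    return {
        name: (set_value if name == set_field else None)
        for name in ONEOF_FIELDS
    }


record = flatten_oneof("double_value", 1.23)
# json.dumps writes None as an explicit JSON null, so the unset
# members survive serialization instead of being omitted.
print(json.dumps(record, indent=2))
```

Because the unset members are ordinary fields in this model, nothing downstream can tell them apart from fields that were deliberately set to null.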

Looking at the connector's code, it's using the JsonConverter to re-serialize the data, which will preserve these explicit nulls in the output JSON sent to OpenSearch. I don't see any logic which would filter out the unset fields, either from the schema or the value. I also don't think such logic is appropriate to add to the connector, as this is a Protobuf-specific problem and may appear in other connectors in a similar fashion.

@yvesk If you wish to remove these unset fields from your output, I think you will need a custom Single Message Transform (SMT) that drops from the schema every field whose value is null. This would mean that every message which gets to the connector will have a different schema, containing a definition for only one of the fields defined in the oneof. This SMT may be useful to add to https://github.com/aiven/transforms-for-apache-kafka-connect, so if you or anyone else would like to contribute this fix, we can discuss it more there.
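As a rough sketch of the logic such a transform would need, the following Python uses plain dicts rather than the Kafka Connect API. A real SMT would be written in Java against the `org.apache.kafka.connect.transforms.Transformation` interface and would rebuild the Connect schema. The names `prune_nulls` and `derive_schema` are hypothetical, and the sample document is abbreviated from the one in the report:

```python
def prune_nulls(value):
    """Recursively drop dict entries whose value is None."""
    if isinstance(value, dict):
        return {k: prune_nulls(v) for k, v in value.items() if v is not None}
    if isinstance(value, list):
        return [prune_nulls(v) for v in value]
    return value


def derive_schema(value):
    """Derive a per-message 'schema' (field name -> type name).

    Stand-in for rebuilding a Connect schema: after pruning, the
    schema only describes the fields that are actually present.
    """
    if isinstance(value, dict):
        return {k: derive_schema(v) for k, v in value.items()}
    return type(value).__name__


# Abbreviated version of the document from the report.
doc = {
    "doubleVal": {"value_0": {"int_value": None, "double_value": 1.23,
                              "string_value": None}},
    "intVal": {"value_0": {"int_value": 123, "double_value": None,
                           "string_value": None}},
}

pruned = prune_nulls(doc)
schema = derive_schema(pruned)
print(pruned)
print(schema)
```

Note that the two map entries end up with different derived schemas, which is the per-message schema variation mentioned above.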

Alternatively, the deserializing converter could be adjusted to produce varying schemas which only define the field that is set, essentially performing the operation of the SMT inside the converter. What value.converter are you using for this example, and how is the translation from Protobuf to Connect schema being performed (Karapace, Schema Registry, etc.)?