confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Using compiled class with protobuf #9554

Open hjwalt opened 1 year ago

hjwalt commented 1 year ago

Is your feature request related to a problem? Please describe.

We use raw protobuf bytes for our Kafka record keys and payloads, without the Confluent wire format. With the addition of protobuf_nosr, we can rely on an implicit mapping from schema to table / stream, assuming:

  1. The same data types are used
  2. The field order is maintained
  3. There are no skipped field numbers

Example:

Proto schema:

message Payload {
  int64 id  = 1;
  string some_value = 2;
}

Implicitly correct protobuf_nosr stream:

CREATE STREAM payload (
  id BIGINT,
  some_value STRING
) WITH (
  KAFKA_TOPIC='some.existing.topic', 
  VALUE_FORMAT='PROTOBUF_NOSR'
);

The first assumption is acceptable. The remaining two assumptions make correctness depend on an implicit mechanism that we cannot control.

To illustrate: with the following schema and stream combination, and a Kafka payload generated by another producer, the stream will silently fail because of column ordering.

message Payload {
  int64 id  = 1;
  string some_value = 2;
}

CREATE STREAM payload (
  some_value STRING,
  id BIGINT
) WITH (
  KAFKA_TOPIC='some.existing.topic', 
  VALUE_FORMAT='PROTOBUF_NOSR'
);

The following combination will also silently fail, because of a skipped field number:

message Payload {
  int64 id  = 1;
  string some_value = 3;
}

CREATE STREAM payload (
  id BIGINT,
  some_value STRING
) WITH (
  KAFKA_TOPIC='some.existing.topic', 
  VALUE_FORMAT='PROTOBUF_NOSR'
);
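
Both failures follow from how protobuf encodes a message: each field is written with its field number in the tag, and declaration order never appears on the wire. Below is a minimal Python sketch that hand-encodes the second payload using only the standard library. The positional reader (column i reads field number i + 1) is an assumption based on the behaviour described above, not the actual protobuf_nosr implementation, and all function names are hypothetical.

```python
def encode_varint(n: int) -> bytes:
    """Encode a non-negative integer as a protobuf varint."""
    out = bytearray()
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> tuple[int, int]:
    """Decode a varint starting at pos; return (value, next position)."""
    result, shift = 0, 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_int64(field_no: int, value: int) -> bytes:
    # Tag = (field number << 3) | wire type; wire type 0 is varint.
    return encode_varint(field_no << 3 | 0) + encode_varint(value)


def encode_string(field_no: int, value: str) -> bytes:
    # Wire type 2 is length-delimited.
    data = value.encode("utf-8")
    return encode_varint(field_no << 3 | 2) + encode_varint(len(data)) + data


def parse_fields(buf: bytes) -> dict:
    """Return {field number: raw value} for varint and length-delimited fields."""
    fields, pos = {}, 0
    while pos < len(buf):
        tag, pos = decode_varint(buf, pos)
        field_no, wire_type = tag >> 3, tag & 7
        if wire_type == 0:
            value, pos = decode_varint(buf, pos)
        elif wire_type == 2:
            length, pos = decode_varint(buf, pos)
            value = buf[pos:pos + length]
            pos += length
        else:
            raise ValueError(f"wire type {wire_type} not handled in this sketch")
        fields[field_no] = value
    return fields


def read_positionally(buf: bytes, columns: list) -> dict:
    """ASSUMPTION: column i is read from field number i + 1."""
    fields = parse_fields(buf)
    return {name: fields.get(i + 1) for i, name in enumerate(columns)}


# Producer side: message Payload { int64 id = 1; string some_value = 3; }
payload = encode_int64(1, 42) + encode_string(3, "hello")

# Consumer side: columns declared as (id, some_value).
row = read_positionally(payload, ["id", "some_value"])
print(row)  # {'id': 42, 'some_value': None}
```

The value for some_value is present in the bytes under field number 3, but a positional reader looks for field number 2 and returns nothing, without raising an error.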

Describe the solution you'd like

This problem can be resolved by inferring each field's number from the protobuf descriptor information, provided we can supply:

  1. A JAR of protobuf classes, which should be scanned as if it were a library JAR (similar to the UDF construct)
  2. The fully qualified name of the class that the table or stream data structure refers to

CREATE STREAM payload (
  id BIGINT,
  some_value STRING
) WITH (
  KAFKA_TOPIC='some.existing.topic', 
  VALUE_FORMAT='PROTOBUF_CLASS',
  VALUE_PROTOBUF_CLASS='com.foo.Payload'
);

This way, the protobuf field numbers of id and some_value can be inferred from the field descriptor information in the class.
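
As a rough sketch of the lookup this would enable, the Python below resolves columns by name instead of by position. The dictionary PAYLOAD_FIELD_NUMBERS is a hypothetical stand-in for what the descriptor of com.foo.Payload would provide, parsed stands for an already-decoded message keyed by field number, and the function names are illustrative only.

```python
# Hypothetical stand-in for descriptor information from com.foo.Payload:
# field name -> field number, as declared in the .proto file.
PAYLOAD_FIELD_NUMBERS = {"id": 1, "some_value": 3}


def resolve_field_numbers(columns: list, field_numbers: dict) -> dict:
    """Map each declared column to its field number, failing loudly if unknown."""
    missing = [c for c in columns if c not in field_numbers]
    if missing:
        raise KeyError(f"columns not found in descriptor: {missing}")
    return {c: field_numbers[c] for c in columns}


def read_by_name(parsed_fields: dict, columns: list, field_numbers: dict) -> dict:
    """Read each column from the field number given by the descriptor."""
    resolved = resolve_field_numbers(columns, field_numbers)
    return {c: parsed_fields.get(n) for c, n in resolved.items()}


# A decoded message keyed by field number (field 2 is skipped in the schema).
parsed = {1: 42, 3: b"hello"}

# Column order differs from the schema and a field number is skipped,
# yet both columns resolve correctly.
row = read_by_name(parsed, ["some_value", "id"], PAYLOAD_FIELD_NUMBERS)
print(row)  # {'some_value': b'hello', 'id': 42}
```

A column that does not exist in the descriptor raises an error at resolution time, so a mismatch surfaces when the stream is defined instead of as silently missing data.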

Describe alternatives you've considered

Use schema registry format (PROTOBUF)

An option, but a separate concern from this feature

Use schema registry url instead of VALUE_PROTOBUF_CLASS with protostuff / protoparser

Schema registry stores the protobuf schema as a text file, not as the compiled proto descriptor. The standard protobuf library cannot use it directly to get field descriptor information.

Protoparser: already archived
Protostuff: an option, but still not the standard way to use protobuf in Java

Use other kind of schema registry

An example is stencil. This might work, but I think from the perspective of most people in the community, this is even more non-standard.

Get schema registry to store protobuf descriptors

Maybe? but not the right repository.

Additional context

I, of course, do not know yet how complex the field mapping and descriptor assignment will be based on the data structure and classpath scanning that is internally used by ksqldb. However, if this sounds like something useful for ksqldb I am happy to work on this.

suhas-satish commented 1 year ago

@hjwalt, it would be great if you could work on it. Let me know if you need any help getting started.

hjwalt commented 1 year ago

I'll start on it and let you know if I need any help, either from here or from the community slack!

hjwalt commented 1 year ago

I'm considering yet another thing that will extend this concept to add dynamic serde, but that might need a KLIP. So the idea for this implementation will be to create a path towards that.

Plan as of now:

  1. Create a SerdeClassLoader, similar to the UDF class loader, in the serde package of ksqldb-serde. It will scan the directory given by KSQL_KSQL_SERDE_DIR, defaulting to /opt/ksqldb-serdes
  2. Create the necessary classes for the protobuf_class format (Translator, Converter, etc.) in the protobuf package of ksqldb-serde
  3. Add configuration documentation for VALUE_PROTOBUF_CLASS, which is practically just the full class name of the protobuf class

Is there anything else I need to take note of @suhas-satish ?