confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Stream created using value_schema_id for protobuf schema fails to deserialise records when used in join #9162

Closed: SamiShaikh closed this issue 2 years ago.

SamiShaikh commented 2 years ago

Describe the bug

A stream created with VALUE_SCHEMA_ID for a Protobuf schema fails to process records when it is used in a join. See the illustration below: first_stream_2 is created without VALUE_SCHEMA_ID, while first_stream is created with VALUE_SCHEMA_ID.

ksql> select `FIELD1` from first_stream_2 s1 LEFT JOIN country_table t1 ON t1.FIELD1 = s1.FIELD2  emit changes;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|FIELD1                                                                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1                                                                                                                                                                                                                |
^CQuery terminated
ksql> select `FIELD1` from first_stream s1 LEFT JOIN country_table t1 ON t1.FIELD1 = s1.FIELD2  emit changes;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|FIELD1                                                                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Error in processing query. Check server logs for details.
Query terminated

To Reproduce

Register the schema:

syntax = "proto3";

message Message123 {
  string FIELD1 = 1;
  string FIELD2 = 2;
  string FIELD3 = 3;
}
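The report does not show how the schema was registered. A minimal sketch, assuming Schema Registry's REST endpoint POST /subjects/{subject}/versions at http://localhost:8081 and a hypothetical subject name first_topic-value (the default TopicNameStrategy naming for a topic's value schema). It only builds the registration request and does not send it. The registry assigns the schema ID itself, so ID 12 in this report is presumably the value that registry returned on the reporter's setup.

```python
import json
import urllib.request

# Protobuf schema from the report, as a plain string.
PROTO_SCHEMA = """syntax = "proto3";

message Message123 {
  string FIELD1 = 1;
  string FIELD2 = 2;
  string FIELD3 = 3;
}
"""


def build_register_request(registry_url: str, subject: str, schema: str) -> urllib.request.Request:
    """Build (but do not send) a request that registers a Protobuf schema.

    Targets Schema Registry's POST /subjects/{subject}/versions endpoint.
    """
    body = json.dumps({"schemaType": "PROTOBUF", "schema": schema}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/subjects/{subject}/versions",
        data=body,
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
        method="POST",
    )


if __name__ == "__main__":
    # Hypothetical subject name, following the default TopicNameStrategy.
    req = build_register_request("http://localhost:8081", "first_topic-value", PROTO_SCHEMA)
    print(req.get_method(), req.full_url)
    # Sending it with urllib.request.urlopen(req) would return {"id": <assigned id>}.
```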

Create a table for the right-hand side (RHS) of the join, and produce a record whose key matches the stream column used in the join condition (FIELD2, here 'DE'):

create table country_table(FIELD1 string primary key, FIELD2 string) with(kafka_topic='countrytopic', partitions=1, value_format='protobuf');
ksql> select * from country_table emit changes;
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|FIELD1                                                                                        |FIELD2                                                                                            |
+---------------------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------------------+
|DE                                                                                                 |mcc                                                                                                |
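The report does not show how the country_table row was produced. One way is the ksqlDB statement INSERT INTO country_table (FIELD1, FIELD2) VALUES ('DE', 'mcc'); run from the CLI or sent to the server's POST /ksql REST endpoint. The sketch below only builds that REST request, assuming a ksqlDB server at the hypothetical address http://localhost:8088; the helper name build_ksql_request is hypothetical.

```python
import json
import urllib.request


def build_ksql_request(server_url: str, statement: str) -> urllib.request.Request:
    """Build (but do not send) a ksqlDB REST request for POST /ksql."""
    body = json.dumps({"ksql": statement, "streamsProperties": {}}).encode("utf-8")
    return urllib.request.Request(
        url=f"{server_url.rstrip('/')}/ksql",
        data=body,
        headers={
            "Accept": "application/vnd.ksql.v1+json",
            "Content-Type": "application/vnd.ksql.v1+json",
        },
        method="POST",
    )


# Produces the ('DE', 'mcc') row shown in the table output above.
INSERT_COUNTRY = "INSERT INTO country_table (FIELD1, FIELD2) VALUES ('DE', 'mcc');"

if __name__ == "__main__":
    # Hypothetical server address; 8088 is ksqlDB's default listener port.
    req = build_ksql_request("http://localhost:8088", INSERT_COUNTRY)
    print(req.get_method(), req.full_url)
```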

Produce a message using the registered schema (ID 12):

./bin/kafka-protobuf-console-producer --bootstrap-server localhost:9092 \
  --topic first_topic \
  --property schema.registry.url=http://localhost:8081 \
  --property value.schema.id=12
{"FIELD1":"1","FIELD2":"DE","FIELD3":"One"}
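To make value.schema.id=12 concrete: the Confluent Protobuf serializer frames each record value as a magic byte 0, the 4-byte big-endian schema ID, a message-index list (a single 0x00 byte when the message is the first one in the .proto file, as Message123 is), and then the Protobuf-encoded message. The sketch below hand-encodes the produced record under that assumption using only the standard library. The helper names are hypothetical, and the bytes were derived by hand, not captured from the topic.

```python
import struct


def encode_varint(value: int) -> bytes:
    """Encode a non-negative int as a Protobuf base-128 varint."""
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_string_field(field_number: int, text: str) -> bytes:
    """Encode a proto3 string field: key (wire type 2), length, UTF-8 bytes."""
    data = text.encode("utf-8")
    key = (field_number << 3) | 2
    return encode_varint(key) + encode_varint(len(data)) + data


def frame_confluent_protobuf(schema_id: int, payload: bytes) -> bytes:
    """Prefix a serialized message with the Confluent wire-format header.

    Layout: magic byte 0, 4-byte big-endian schema ID, message indexes, payload.
    A single 0x00 byte is the shorthand for message index [0].
    """
    return b"\x00" + struct.pack(">I", schema_id) + b"\x00" + payload


# The record from the console producer: {"FIELD1":"1","FIELD2":"DE","FIELD3":"One"}
message = (
    encode_string_field(1, "1")
    + encode_string_field(2, "DE")
    + encode_string_field(3, "One")
)
record_value = frame_confluent_protobuf(12, message)
print(record_value.hex())
```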

Create two streams over the same topic, one without and one with VALUE_SCHEMA_ID:

ksql> CREATE STREAM first_stream_2
>WITH
>(
>  KAFKA_TOPIC = 'first_topic',
>  VALUE_FORMAT = 'PROTOBUF',
>  KEY_FORMAT = 'KAFKA',
>  PARTITIONS = 1
>);

ksql> CREATE STREAM first_stream
>WITH
>(
>  KAFKA_TOPIC = 'first_topic',
>  VALUE_FORMAT = 'PROTOBUF',
>  KEY_FORMAT = 'KAFKA',
>  PARTITIONS = 1,
>  VALUE_SCHEMA_ID = '12'
>);

Run the same join against each stream:

ksql> select `FIELD1` from first_stream_2 s1 LEFT JOIN country_table t1 ON t1.FIELD1 = s1.FIELD2  emit changes;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|FIELD1                                                                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1                                                                                                                                                                                                                |
^CQuery terminated
ksql> select `FIELD1` from first_stream s1 LEFT JOIN country_table t1 ON t1.FIELD1 = s1.FIELD2  emit changes;
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|FIELD1                                                                                                                                                                                                         |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Error in processing query. Check server logs for details.
Query terminated

Expected behavior

Both joins should emit records.

Actual behavior

Only the join that uses the stream created without VALUE_SCHEMA_ID emits records; the other query terminates with "Error in processing query."

Additional context

The join that uses the stream created with VALUE_SCHEMA_ID fails while serializing records to the join's internal repartition topic. The server log shows:

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-stdalonetransient_transient_S1_2748697664827909033_1654093227745-Join-repartition. Schema from Schema Registry misses field with name: S1_FIELD1
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:186)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:149)
    at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
    at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:157)
    ... 38 more
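The error message suggests that, for the stream created with VALUE_SCHEMA_ID, the serializer for the repartition topic checks the repartitioned row against the registered schema (ID 12), whose fields are FIELD1 to FIELD3, while the row's columns carry the S1_ alias prefix. The sketch below reproduces only that name check. It is not ksqlDB's code: the exception class, the function, and the exact repartition column list are hypothetical, and only S1_FIELD1 is confirmed by the log.

```python
from typing import Sequence


class SchemaMismatchError(Exception):
    """Hypothetical stand-in for the SerializationException in the log above."""


def check_columns_against_schema(row_columns: Sequence[str], schema_fields: Sequence[str]) -> None:
    """Raise if any row column has no field of the same name in the registered schema."""
    known = set(schema_fields)
    for column in row_columns:
        if column not in known:
            raise SchemaMismatchError(
                f"Schema from Schema Registry misses field with name: {column}"
            )


# Fields of Message123, the schema registered with ID 12.
REGISTERED_FIELDS = ["FIELD1", "FIELD2", "FIELD3"]

# Hypothetical repartition row columns: source columns prefixed with the alias S1.
REPARTITION_COLUMNS = ["S1_FIELD1", "S1_FIELD2", "S1_FIELD3"]

if __name__ == "__main__":
    try:
        check_columns_against_schema(REPARTITION_COLUMNS, REGISTERED_FIELDS)
    except SchemaMismatchError as err:
        print(err)
```

Checking the unprefixed names FIELD1 and FIELD2 passes, so in this sketch the failure comes from the column names alone, not from the record's data.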
suhas-satish commented 2 years ago

Closing as a duplicate of https://github.com/confluentinc/ksql/issues/9163.