confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Ksqldb Query with Value Schema ID expects data type of keys and headers to be from Schema Registry #9577

Open nbonavia opened 1 year ago

nbonavia commented 1 year ago

I'm seeing strange behaviour in ksqldb 0.27.2 when creating a stream with a specific value_schema_id:

CREATE SOURCE STREAM IF NOT EXISTS user_profile_updated (
    "_tenant_id" BYTES HEADER('tenant_id')
    , "_traceparent" BYTES HEADER('traceparent')
    , "_is_performance_test" BYTES HEADER('is_performance_test')
    , "_is_test" BYTES HEADER('is_test')
    , `Key` string key
) WITH (
    kafka_topic='user_profile_updated'
    ,value_format='avro'
    ,partitions=${partitions}
    ,value_schema_id = ${user_profile_updated_value_schema_id}
);

This creates a valid stream, and I can query it without any problems. However, the problem appears when I run the query below, which involves a GROUP BY on `Key`, a column that is not in the Schema Registry value schema (because it is the Kafka message's key):

select    
    latest_by_offset(`CreatedOn`) as "CreatedOn"
from user_profile_updated
GROUP BY `Key`
HAVING `Key` is not null
EMIT CHANGES
limit 1;

I get a stack trace informing me that the Schema from Schema Registry misses field with name: Key. I know it is missing from Schema Registry because it was declared as the key column. Should I have done something to tell ksqldb the data type of the key? I also get the same error if I include any of the columns that come from the Kafka headers. The stack trace is as follows:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=user_profile_updated, partition=0, offset=191386, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: _confluent-ksql-endeavourtransient_transient_USER_PROFILE_UPDATED_3700099330102127396_1663586982533-Aggregate-Aggregate-Materialize-changelog. Schema from Schema Registry misses field with name: Key
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
    at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
    at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:330)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:121)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:100)
    at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:81)
    at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1182)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:768)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:588)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:550)
Caused by: io.confluent.ksql.util.KsqlException: Schema from Schema Registry misses field with name: Key
    at io.confluent.ksql.serde.connect.ConnectSRSchemaDataTranslator.validate(ConnectSRSchemaDataTranslator.java:48)
    at io.confluent.ksql.serde.avro.AvroSRSchemaDataTranslator.toConnectRow(AvroSRSchemaDataTranslator.java:50)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:52)
    ... 39 more
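
In case it helps while this is being looked at: one possible mitigation (an untested sketch, not a confirmed fix) is to materialize the source into a derived stream first, so that the aggregate's changelog no longer serializes against the pinned value_schema_id. The stream name user_profile_updated_copy is hypothetical:

-- Hypothetical derived stream (untested sketch): its value schema is
-- registered fresh by ksqldb, without the source's pinned value_schema_id
CREATE STREAM user_profile_updated_copy AS
    SELECT `Key`, `CreatedOn`
    FROM user_profile_updated
    EMIT CHANGES;

-- The aggregate then validates against the derived stream's own schema
select
    latest_by_offset(`CreatedOn`) as "CreatedOn"
from user_profile_updated_copy
GROUP BY `Key`
HAVING `Key` is not null
EMIT CHANGES
limit 1;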
suhas-satish commented 1 year ago

@nbonavia, we fixed a number of schema_id bugs in 0.28.2. Can you check whether it reproduces there? Thanks

nbonavia commented 1 year ago

> @nbonavia, we fixed a number of schema_id bugs in 0.28.2. Can you check whether it reproduces there? Thanks

I have upgraded from ksqldb 0.27.2 to 0.28.2 and unfortunately the issue is still present. For reference, I have the following source stream:

CREATE SOURCE STREAM IF NOT EXISTS user_profile_updated (
    "_tenant_id" BYTES HEADER('tenant_id')
    , "_traceparent" BYTES HEADER('traceparent')
    , "_is_performance_test" BYTES HEADER('is_performance_test')
    , "_is_test" BYTES HEADER('is_test')
    , `Key` string key
) WITH (
    kafka_topic='user_profile_updated'
    ,value_format='avro'
    ,partitions=${partitions}
    ,value_schema_id = ${user_profile_updated_value_schema_id}
);

When I run the query below, ksqldb again tries to find Key in the Schema Registry schema, where it does not exist because it is declared as the key column in the user_profile_updated stream definition:

SELECT
    `Key`
    ,AS_VALUE(`Key`) as "UserProfileId"
    ,TOEPOCH(Coalesce(latest_by_offset(`Current`->`LastUpdatedOn`),latest_by_offset(`CreatedOn`))) as "TimestampEpoch"
    ,TOTICKS(Coalesce(latest_by_offset(`Current`->`LastUpdatedOn`),latest_by_offset(`CreatedOn`))) as "TimestampTicks"
    ,Coalesce(latest_by_offset(`Current`->`LastUpdatedOn`),latest_by_offset(`CreatedOn`)) as "Timestamp"
FROM user_profile_updated
GROUP BY `Key`
EMIT CHANGES
limit 1;

I got the following error in the log:

org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-0000000000, topic=user_profile_updated, partition=0, offset=250397, stacktrace=io.confluent.ksql.serde.KsqlSerializationException: Error serializing message to topic: _confluent-ksql-endeavourtransient_transient_USER_PROFILE_UPDATED_4794616796213001027_1665561969788-Aggregate-Aggregate-Materialize-changelog. Schema from Schema Registry misses field with name: Key
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:56)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:188)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:151)
    at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
    at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:83)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:74)
    at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:31)
    at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:195)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:330)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:330)
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:127)
    at org.apache.kafka.streams.kstream.internals.KStreamAggregate$KStreamAggregateProcessor.process(KStreamAggregate.java:121)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.kstream.internals.KStreamTransformValues$KStreamTransformValuesProcessor.process(KStreamTransformValues.java:66)
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:290)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:269)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:228)
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
    at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:793)
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:872)
    at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:793)
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:724)
slanj commented 1 year ago

I also see this strange behaviour. When I use value_schema_id to reference a protobuf schema, I cannot use GROUP BY. But if I create the protobuf stream without value_schema_id (just using the same name for the stream and for the schema in Schema Registry, so ksqldb knows where to find the schema), GROUP BY works.
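
Applied to the Avro stream from this issue, that workaround would look roughly like the sketch below. It assumes the value schema is registered under the default TopicNameStrategy subject user_profile_updated-value, and that this ksqldb version supports partial schema inference (declaring only the key column, plus any header columns, while the value columns are inferred from Schema Registry):

-- Sketch of the workaround: omit value_schema_id so ksqldb resolves the
-- value schema from Schema Registry by subject name
-- (user_profile_updated-value under the default TopicNameStrategy).
-- Header columns from the original definition can be declared as before.
CREATE SOURCE STREAM IF NOT EXISTS user_profile_updated (
    `Key` STRING KEY
) WITH (
    kafka_topic='user_profile_updated'
    ,value_format='avro'
);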