confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io
Other
127 stars 1.04k forks source link

Schema Registry 409 when issuing pull query against table with multi-column keys #7174

Closed agavra closed 2 years ago

agavra commented 3 years ago

Describe the bug

When issuing the follwoing pull query against a table with a multi-column key

SELECT * FROM  REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR 
WHERE fishing_vessel_mmsi = '273219900' and reefer_mmsi='273219900';

ksqlDB returns the following error:

io.confluent.ksql.util.KsqlStatementException: Error serializing message to topic: _confluent-ksql-pksqlc-k8m02query_CTAS_REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_23-Aggregate-GroupBy-repartition. Failed to serialize Avro data from topic _confluent-ksql-pksqlc-k8m02query_CTAS_REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_23-Aggregate-GroupBy-repartition

When digging into the stack trace, it looks like we're trying to register a different schema during the pull query step than we do during the persistent query processing:

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema for subject _confluent-ksql-pksqlc-k8m02query_CTAS_REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_23-Aggregate-GroupBy-repartition-key and schema: {"type":"record","name":"ReefersAndVesselsCloseForGt2hrKey","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"FISHING_VESSEL_MMSI","type":["null","string"],"default":null},{"name":"REEFER_MMSI","type":["null","string"],"default":null}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "_confluent-ksql-pksqlc-k8m02query_CTAS_REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_23-Aggregate-GroupBy-repartition-key"; error code: 409; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:295)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:355)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:498)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:489)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:462)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:214)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:276)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:252)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:77)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:143)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
    at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
    at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:173)
    at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:136)
    at io.confluent.ksql.serde.GenericSerializer.serialize(GenericSerializer.java:62)
    at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
    at org.apache.kafka.streams.processor.internals.DefaultStreamPartitioner.partition(DefaultStreamPartitioner.java:38)
    at org.apache.kafka.streams.processor.internals.StreamsMetadataState.getKeyQueryMetadataForKey(StreamsMetadataState.java:370)
    at org.apache.kafka.streams.processor.internals.StreamsMetadataState.getKeyQueryMetadataForKey(StreamsMetadataState.java:237)
    at org.apache.kafka.streams.processor.internals.StreamsMetadataState.getKeyQueryMetadataForKey(StreamsMetadataState.java:195)
    at org.apache.kafka.streams.KafkaStreams.queryMetadataForKey(KafkaStreams.java:1288)
    at io.confluent.ksql.execution.streams.materialization.ks.KsLocator.locate(KsLocator.java:88)
    at io.confluent.ksql.physical.pull.HARouting.handlePullQuery(HARouting.java:106)
    at io.confluent.ksql.engine.EngineExecutor.executePullQuery(EngineExecutor.java:171)
    at io.confluent.ksql.engine.KsqlEngine.executePullQuery(KsqlEngine.java:283)
    at io.confluent.ksql.rest.server.resources.streaming.PullQueryPublisher.lambda$subscribe$1(PullQueryPublisher.java:93)
    at io.confluent.ksql.rest.server.resources.streaming.PullQueryPublisher$PullQuerySubscription.request(PullQueryPublisher.java:140)
    at io.confluent.ksql.rest.server.resources.streaming.WebSocketSubscriber.onSubscribe(WebSocketSubscriber.java:47)
    at io.confluent.ksql.rest.server.resources.streaming.PullQueryPublisher.subscribe(PullQueryPublisher.java:109)
    at io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.startPullQueryPublisher(WSQueryEndpoint.java:385)
    at io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.handleQuery(WSQueryEndpoint.java:297)
    at io.confluent.ksql.rest.server.resources.streaming.WSQueryEndpoint.executeStreamQuery(WSQueryEndpoint.java:216)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeWebsocketStream$20(KsqlServerEndpoints.java:293)
    at io.confluent.ksql.rest.server.KsqlServerEndpoints.lambda$executeOnWorker$21(KsqlServerEndpoints.java:304)
    at io.vertx.core.impl.ContextImpl.lambda$executeBlocking$2(ContextImpl.java:313)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

This is extra concerning because we should not be registering anything for a pull query!

To Reproduce

I have not reproduced this locally, but is the table definition for the one that caused the issue:

CREATE TABLE REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR 
  WITH (KAFKA_TOPIC='REEFERS_AND_VESSELS_CLOSE_FOR_GT_2HR_V00') AS
SELECT   FISHING_VESSEL_MMSI,
         REEFER_MMSI,
         STRUCT("lat":=LATEST_BY_OFFSET(FISHING_VESSEL_LAT),"lon":=LATEST_BY_OFFSET(FISHING_VESSEL_LON)) AS LATEST_FISHING_VESSEL_LOCATION,
         STRUCT("lat":=LATEST_BY_OFFSET(REEFER_LAT),        "lon":=LATEST_BY_OFFSET(REEFER_LON))         AS LATEST_REEFER_LOCATION,
         MIN(DISTANCE_KM)                                                                                AS CLOSEST_DISTANCE_KM,
         MAX(DISTANCE_KM)                                                                                AS FURTHEST_DISTANCE_KM,
         MIN(FISHING_VESSEL_TS)                                                                          AS FIRST_TS,
         MAX(FISHING_VESSEL_TS)                                                                          AS LAST_TS,
         (MAX(FISHING_VESSEL_TS) - MIN(FISHING_VESSEL_TS)) / 1000                                        AS DIFF_SEC
FROM     REEFERS_AND_VESSELS_WITHIN_1KM WINDOW SESSION (10 MINUTES)
WHERE    IN_RANGE_AND_SPEED = 1
GROUP BY FISHING_VESSEL_MMSI,
         REEFER_MMSI
HAVING   (MAX(ROWTIME) - MIN(ROWTIME)) / 1000 > 7200;

The source for this was a stream that's a result of a stream-stream join and some filtering.

Expected behavior

We can issue a pull query.

Actual behaviour

See above

Additional context

The problem occurs on ccloud master at time of writing (which I believe is 0.15)

vvcephei commented 3 years ago

This seems like it's probably related to https://github.com/confluentinc/ksql/issues/7211 . It seems like what must be happening is that when we create a serde for the pull query, we create it differently than the one we made in the persistent query.

I'll try to repro when I get the chance.

vvcephei commented 3 years ago

Ok, I did manage a repro with a pretty simple table. Here's the repro:

docker-compose.yml

---
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:${CP_VERSION}
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka:${CP_VERSION}
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1 
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
  schema-registry:
    image: confluentinc/cp-schema-registry:${CP_VERSION}
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  ksqldb-server:
    #image: confluentinc/cp-ksqldb-server:${CP_VERSION}
    image: confluentinc/ksqldb-server:${KSQLDB_VERSION}
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - broker
    ports:
      - "8088:8088"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_BOOTSTRAP_SERVERS: broker:9092
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"

.env

CP_VERSION=6.0.0
KSQLDB_VERSION=0.15.

then:

$ docker-compose up

then:

$ docker-compose exec ksqldb-server ksql http://ksqldb-server:8088
ksql> CREATE STREAM TEST_B (ID1 BIGINT KEY, ID2 BIGINT KEY, IGNORED STRING) WITH (kafka_topic='test_topic', partitions=1, format='avro');

 Message        
----------------
 Stream created 
----------------
ksql> CREATE TABLE S2_B as SELECT ID1, ID2, WindowStart as wstart, WindowEnd as wend, COUNT(1) AS Count FROM TEST_B WINDOW SESSION (30 SECONDS) group by id1, id2;

 Message                            
------------------------------------
 Created query with ID CTAS_S2_B_11 
------------------------------------
ksql> insert into test_b (id1, id2, ignored) values (1, 2, 'a');
ksql> select * from s2_b where id1 = 1 and id2 = 2;
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|ID1                |ID2                |WINDOWSTART        |WINDOWEND          |WSTART             |WEND               |COUNT              |
+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+-------------------+
|1                  |2                  |1617055488830      |1617055488830      |1617055488830      |1617055488830      |1                  |
Query terminated
ksql> 
Exiting ksqlDB.

Looking at schema registry:

[john@arcturus repro-7174-2]$ curl http://localhost:8081/subjects/S2_B-key/versions/1 | jq .
{
  "subject": "S2_B-key",
  "version": 1,
  "id": 7,
  "schema": "{\"type\":\"record\",\"name\":\"S2BKey\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"ID1\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ID2\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.confluent.ksql.avro_schemas.S2BKey\"}"
}
[john@arcturus repro-7174-2]$ curl http://localhost:8081/subjects/S2_B-key/versions/2 | jq .
{
  "subject": "S2_B-key",
  "version": 2,
  "id": 11,
  "schema": "{\"type\":\"record\",\"name\":\"S2BKey\",\"namespace\":\"io.confluent.ksql.avro_schemas\",\"fields\":[{\"name\":\"ID1\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"ID2\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}

I think that there should only be one version. It looks like the second version is because the pull query actually generated a new schema, which is textually different because of \"connect.name\":\"io.confluent.ksql.avro_schemas.S2BKey\"}.

mikebin commented 3 years ago

@vvcephei - is the pull query causing the new schema version to be created in the your example, or the INSERT VALUES which precedes it?

vvcephei commented 3 years ago

Hey @mikebin, sorry, I missed your question. I'm actually not sure. When someone picks up this ticket, it would be a good start to re-play my repro and check SR after every command to see where exactly it gets created.

mikebin commented 3 years ago

Another simple repro, using ksqlDB 6.2.0:

create table source (id1 int primary key, id2 string primary key, val string) with (kafka_topic='source', format='avro', partitions=1);

insert into source (id1, id2, val) values (1, 'a', 'val1');

create table materialized_table as select * from source;

select * from materialized_table where id1 = 1 and id2 = 'a';

Error:

Error serializing message to topic: source. Failed to access Avro data from topic source : Schema being registered is incompatible with an earlier schema for subject "source-key"; error code: 409

Works fine if only querying by a single key field above:

select * from materialized_table where id1 = 1;

+--------------------+--------------------+--------------------+
|ID1                 |ID2                 |VAL                 |
+--------------------+--------------------+--------------------+
|1                   |a                   |val1                |
agavra commented 3 years ago

Raising the priority of this one to P0 since multiple users have complained about it. We should pick it up for the next release

mikebin commented 3 years ago

Setting schema compatibility to NONE on the source topic provides a workaround. For some reason, ksqlDB attempts to register a new key schema version on the source topic with the materialized table's key record name:

image

cprasad1 commented 3 years ago

This happens because with multi-column AVRO keys, we query the key metadata in kafka streams here https://github.com/confluentinc/ksql/blob/66b311d7044b67fb473d6a9d054a9acab41327bf/ksqldb-streams/src/main/java/io/confluent/ksql/execution/streams/materialization/ks/KsLocator.java#L145 which uses the connect serdes to serialise the keys. This, in turn tries to register the schema of the key in schema registry because we have a default config ksql.schema.registry.auto.register.schemas set to true. We hit an issue here because we try to register the schema associated with the keySerializer that we get from the materialization provider (MaterializedTableKey which is the keySerializer for the table sink) against the source topic for the state store (SourceKey in the screenshot above). We should never be trying to register any schemas for pull queries, but sidestepping it would require not relying on the connect serdes or unwrapping the state store and using reflection to get the appropriate serdes.

hli21 commented 2 years ago

When creating a stream or table, we add an entry into the command topic, which contains the key/value formats and schemas of the stream or table and steps (or a plan) to create the stream or table. During ksqlDB server startup, we re-populate the QueryRegistry and PersistentQueryMetadata for each persistent query from the command topic and put it in memory. The QueryRegistry and PersistentQueryMetadata contains schemas, serializers and topology information of the persistent queries. Those information in QueryRegistry and PersistentQueryMetadata will be utilized by subsequent pull queries.

The problems found:

  1. Given a lookup query, kafka stream tries to find the partitions for the key with the schema registration entry of the corresponding persistent query. It finds the schema registry with two inputs: subject and schema. The kafka stream partitioner gets the subject from the state store (source), and gets the schema from the serializer of the sink. The state store (source) also has its own schema/serializer and it may not match the ones of the sink. If the auto.register.schemas config is on (by default), it may fail with the 409 error due to the mismatch of subject and schema. If the auto.register.schemas config is off, it will try to retrieve the subject and schema from schema registry, but it still fails because of the mismatch.

  2. The "create stream" and "create table" cases have different behaviors.

For a "create stream" example as below:

create stream data (col1 int key, col2 string key, col3 string) with (kafka_topic='data', format='avro', partitions=1);
create table materialized_data as select col1, col2, count(1) as cnt from data group by col1, col2;  
select * from materialized_data where col1 = 10 and col2 = 'b';

Kafka stream uses a subject key created from changelog topic to find the schema from schema registry: _confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition-key.

For a "create table" example as below:

create table source (id1 int primary key, id2 string primary key, val string) with (kafka_topic='source', format='avro', partitions=1);
create table materialized_table as select * from source;
select * from materialized_table where id1 = 1 and id2 = 'a'; 

Kafka stream uses a subject key created from source topic to find the schema from schema registry. The key is source-key. Kafka stream created a change log topic for the query:_confluent-ksql-default_query_CTAS_MATERIALIZED_TABLE_9-KsqlTopic-Reduce-changelog. But the key and schema of this change log topic is empty.

  1. For "create stream", the order of SQLs leads to different subject/schema, which causes problems to the subsequent pull queries. For the following SQLs,
    create stream data (col1 int key, col2 string key, col3 string) with (kafka_topic='data', format='avro', partitions=1);
    create table materialized_data as select col1, col2, count(1) as cnt from data group by col1, col2;
    insert into data (col1, col2, col3) values (10, 'b', 'c');

curl -X GET http://localhost:8081/subjects/_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition-key/versions/1

{
"subject": "_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition-key",
"version": 1,
"id": 23,
"schema": "{"type":"record","name":"DataKey","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"COL1","type":["null","int"],"default":null},{"name":"COL2","type":["null","string"],"default":null}]}"
}

curl -X GET http://localhost:8081/subjects/_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog-key/versions/1

{
"subject": "_confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog-key",
"version": 1,
"id": 23,
"schema": "{"type":"record","name":"DataKey","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"COL1","type":["null","int"],"default":null},{"name":"COL2","type":["null","string"],"default":null}]}"
}

For the following SQLs, create stream data (col1 int key, col2 string key, col3 string) with (kafka_topic='data', format='avro', partitions=1); insert into data (col1, col2, col3) values (10, 'b', 'c'); create table materialized_data as select col1, col2, count(1) as cnt from data group by col1, col2;

No schema is created for the two change log topics: _confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-GroupBy-repartition, _confluent-ksql-default_query_CTAS_MATERIALIZED_DATA_21-Aggregate-Aggregate-Materialize-changelog.