philipschm1tt opened this issue 3 years ago
Hello @maeh2k, the fact that INSERT INTO ... VALUES is failing is expected (as outlined in the other issue you linked, #5801), but that you cannot perform a JOIN is unexpected. Can you give us the information regarding that specific error (with the JOIN)?
I attempted to reproduce this locally and could not:
$ cat test.avsc
{
"schema": "{\r\n \"namespace\": \"com.philip.test\",\r\n \"type\": \"record\",\r\n \"name\": \"TestSchema\",\r\n \"doc\": \"Test schema to reproduce bug with minimal schema.\",\r\n \"fields\": [\r\n {\r\n \"name\": \"someAttribute\",\r\n \"type\": \"string\",\r\n \"doc\": \"Some non-nullable attribute in a schema with FULL_TRANSITIVE compatibility.\"\r\n }\r\n ]\r\n}"
}
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "@test.avsc" http://localhost:8081/subjects/test-value/versions
{"id":1}
$ curl -X PUT -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"compatibility": "FULL_TRANSITIVE"}' http://localhost:8081/config/test-value
{"compatibility":"FULL_TRANSITIVE"}%
Insert some data:
./bin/kafka-avro-console-producer --bootstrap-server localhost:9092 --topic test --property value.schema='{
"namespace": "com.philip.test",
"type": "record",
"name": "TestSchema",
"doc": "Test schema to reproduce bug with minimal schema.",
"fields": [
{
"name": "someAttribute",
"type": "string",
"doc": "Some non-nullable attribute in a schema with FULL_TRANSITIVE compatibility."
}
]
}' --property key.serializer=org.apache.kafka.common.serialization.StringSerializer --property parse.key=true --property key.separator=,
1,{"someAttribute": "foo"}
Then I spun up ksql:
ksql> create table test (id varchar primary key) with (kafka_topic='test', value_format='avro', partitions=2);
Message
---------------
Table created
---------------
ksql> create stream s (id varchar key, col int) with (kafka_topic='s', value_format='avro', partitions=2);
Message
----------------
Stream created
----------------
ksql> create stream j as select * from s join test on s.id = test.id;
Message
--------------------------------
Created query with ID CSAS_J_9
--------------------------------
ksql> insert into s (id, col) values ('1', 1);
I confirmed that the INSERT VALUES call does not work:
ksql> insert into test (id, someattribute) values ('1', 'bar');
Failed to insert values into 'TEST'. Could not serialize value: [ 'bar' ]. Error serializing message to topic: test. Failed to serialize Avro data from topic test :
But the join does work, as confirmed by the fact that it contains data (from the manual insert):
ksql> SET 'auto.offset.reset'='earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> select * from j emit changes;
+-------------------+-------------------+-------------------+-------------------+
|S_ID |S_COL |TEST_ID |TEST_SOMEATTRIBUTE |
+-------------------+-------------------+-------------------+-------------------+
|1 |1 |1 |foo |
Thank you for looking into it, @agavra.
I have not had time to compile a minimal example from scratch for the join error yet, but I can give you the details:
The actual schema where I experience the issue is this (with an anonymized package name):
{
"namespace": "test.philip.avroschemas",
"type": "record",
"name": "CmpCustomerConsent",
"doc": "Customer's consent to handling of cookies and other private data.",
"fields": [
{
"name": "messageId",
"type": "string",
"doc": "Unique message-id. Maxlen(64)."
},
{
"name": "messageTime",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
},
"doc": "Time when message was produced."
},
{
"name": "correlationId",
"type": [
"null",
"string"
],
"default": null,
"doc": "Correlation-id. Maxlen(64)."
},
{
"name": "producer",
"type": [
"null",
"string"
],
"default": null,
"doc": "Application that generated this message. Maxlen(64)."
},
{
"name": "producerVersion",
"type": [
"null",
"string"
],
"default": null,
"doc": "Version of the producer application, e.g. '0.17.22'. Maxlen(32)."
},
{
"name": "producerHost",
"type": [
"null",
"string"
],
"default": null,
"doc": "Host name (DNS name or IP) of the server that produced the message. Maxlen(64)."
},
{
"name": "data",
"type": {
"type": "record",
"name": "CmpCustomerConsentData",
"fields": [
{
"name": "webshopCustomerId",
"type": "string",
"doc": "Webshop customer-id. E.g 12345678-123456. Maxlen(32)."
},
{
"name": "hashedWebshopCustomerUid",
"type": ["null", "string"],
"default": null,
"doc": "Hashed webshop customer-uid (login name) - used to track the customer at Criteo or opt out of tracking. Maxlen(32)."
},
{
"name": "customerContactEmailAddress",
"type": ["null", "string"],
"default": null,
"doc": "Contact email address of the customer. Maxlen(256)."
},
{
"name": "customerCountry",
"type": ["null", "string"],
"default": null,
"doc": "The country of residence of the customer in Alpha-2 code as of ISO 3166-1."
},
{
"name": "cmpConsentId",
"type": "string",
"doc": "usercentrics controllerId - used for cross-device, cross-browser and cross-domain consent sharing. E.g. ef5d0cafe6c19fee51915cba162826ad94f78822bf353572ebdf5844516539d5. Maxlen(64)."
},
{
"name": "consents",
"type": {
"type": "map",
"values": "boolean"
},
"default": {},
"doc": "Consent for different services. By default no consent is given. The key of the map is composed by a unique identifier of the service at usercentrics followed by a human readable name of this service (separated by a space)."
}
]
}
}
]
}
It’s the fifth version of the schema – we have only added attributes so far with FULL_TRANSITIVE compatibility.
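For reference, the subject's version history and configured compatibility can be checked directly against the Schema Registry. This is only a sketch assuming a local registry; our Confluent Cloud SR endpoint would additionally need --user <api-key>:<api-secret>:
$ curl http://localhost:8081/subjects/philip.cmpCustomerConsents-value/versions
$ curl http://localhost:8081/config/philip.cmpCustomerConsents-value
The first call should list five versions, and the second should report FULL_TRANSITIVE (if the level is only set globally rather than per subject, query /config instead).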
Here are the commands I used to join this topic to another table:
CREATE TABLE customer_contact_emails (key_customerid VARCHAR PRIMARY KEY, contactEmailAddress VARCHAR)
WITH (kafka_topic='philip.customerContactMails', value_format='JSON', wrap_single_value=false);
CREATE TABLE customer_consents (key_customerid VARCHAR PRIMARY KEY)
WITH (kafka_topic='philip.cmpCustomerConsents', value_format='avro');
SELECT customer_consents.key_customerid AS key_customerid,
customer_consents.data->webshopCustomerId,
customer_consents.data->hashedWebshopCustomerUid,
customer_contact_emails.contactEmailAddress AS customerContactEmailAddress,
customer_consents.data->consents['Company1'] AS consentToCompany1
FROM customer_consents
LEFT JOIN customer_contact_emails ON customer_consents.key_customerid = customer_contact_emails.key_customerid
EMIT CHANGES LIMIT 100;
This results in an error:
ksqldb-server | [2020-12-07 06:38:03,485] ERROR Exception occurred while writing to connection stream: (io.confluent.ksql.rest.server.resources.streaming.QueryStreamWriter:105)
ksqldb-server | org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_3, processor=KSTREAM-SOURCE-0000000001, topic=philip.cmpCustomerConsents, partition=3, offset=0, stacktrace=org.apache.kafka.common.errors.SerializationException: Error serializing message to topic: philip.cmpCustomerConsents. Failed to serialize Avro data from topic philip.cmpCustomerConsents :
ksqldb-server | Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic philip.cmpCustomerConsents :
ksqldb-server | at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:91)
ksqldb-server | at io.confluent.ksql.serde.connect.KsqlConnectSerializer.serialize(KsqlConnectSerializer.java:53)
ksqldb-server | at io.confluent.ksql.serde.tls.ThreadLocalSerializer.serialize(ThreadLocalSerializer.java:37)
ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:250)
ksqldb-server | at io.confluent.ksql.serde.connect.ConnectFormat$ListToStructSerializer.serialize(ConnectFormat.java:213)
ksqldb-server | at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:146)
ksqldb-server | at io.confluent.ksql.serde.GenericRowSerDe$GenericRowSerializer.serialize(GenericRowSerDe.java:122)
ksqldb-server | at io.confluent.ksql.logging.processing.LoggingSerializer.serialize(LoggingSerializer.java:47)
ksqldb-server | at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:104)
ksqldb-server | at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:95)
ksqldb-server | at org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
ksqldb-server | at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
ksqldb-server | at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.lambda$putIfDifferentValues$1(MeteredTimestampedKeyValueStore.java:83)
ksqldb-server | at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:898)
ksqldb-server | at org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore.putIfDifferentValues(MeteredTimestampedKeyValueStore.java:81)
ksqldb-server | at org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:143)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:46)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:157)
ksqldb-server | at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:157)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:237)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:217)
ksqldb-server | at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:169)
ksqldb-server | at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:676)
ksqldb-server | at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:676)
ksqldb-server | at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1084)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:648)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
ksqldb-server | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
ksqldb-server | Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"KsqlDataSourceSchema","namespace":"io.confluent.ksql.avro_schemas","fields":[{"name":"MESSAGEID","type":["null","string"],"default":null},{"name":"MESSAGETIME","type":["null","long"],"default":null},{"name":"CORRELATIONID","type":["null","string"],"default":null},{"name":"PRODUCER","type":["null","string"],"default":null},{"name":"PRODUCERVERSION","type":["null","string"],"default":null},{"name":"PRODUCERHOST","type":["null","string"],"default":null},{"name":"DATA","type":["null",{"type":"record","name":"KsqlDataSourceSchema_DATA","fields":[{"name":"WEBSHOPCUSTOMERID","type":["null","string"],"default":null},{"name":"HASHEDWEBSHOPCUSTOMERUID","type":["null","string"],"default":null},{"name":"CUSTOMERCONTACTEMAILADDRESS","type":["null","string"],"default":null},{"name":"CUSTOMERCOUNTRY","type":["null","string"],"default":null},{"name":"CMPCONSENTID","type":["null","string"],"default":null},{"name":"CONSENTS","type":["null",{"type":"array","items":{"type":"record","name":"KsqlDataSourceSchema_DATA_CONSENTS","fields":[{"name":"key","type":["null","string"],"default":null},{"name":"value","type":["null","boolean"],"default":null}],"connect.internal.type":"MapEntry"}}],"default":null}]}],"default":null}]}
ksqldb-server | Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "philip.cmpCustomerConsents-value"; error code: 409; error code: 409
...
The Problem
I create a table from an existing topic that uses Avro with FULL_TRANSITIVE compatibility, relying on full schema inference. Creating the table is successful, but inserting data into it, or just joining it to another table, fails with the error: "Schema being registered is incompatible with an earlier schema for subject"
Disclaimer
This might be a duplicate of #5801; @MichaelDrogalis suggested opening a new issue so that this gets another look. The error is the same: the original schema is converted to a KSQL schema and checked for compatibility. In that issue the problem occurs when creating a stream, as opposed to using a table that was already created. But that issue also notes that "FULL_TRANSITIVE makes it almost certain that ksql registered schemas will not be compatible with previous schemas because the fields are nullable."
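One way to verify that hypothesis would be to ask the Schema Registry's compatibility endpoint about the exact schema ksqlDB tried to register, i.e. the KsqlDataSourceSchema from the stack trace above wrapped in a {"schema": "..."} payload. A sketch only; the registry URL and the ksql-schema.json file name are assumptions, and Confluent Cloud again needs --user API-key auth:
$ curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data "@ksql-schema.json" http://localhost:8081/compatibility/subjects/philip.cmpCustomerConsents-value/versions/latest
With the subject at FULL_TRANSITIVE this should come back with "is_compatible": false, matching the 409 in the error above.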
To Reproduce
I'm using ksqlDB 0.13 in Docker against Confluent Cloud, including the Confluent Cloud Schema Registry.
Additional context
Originally, I did not even want to insert data into the table; I wanted to join the table with another table into a new topic. So I was surprised when it looked like ksqlDB was trying to register a new schema on my input topic.
My workaround was to create a STREAM from the topic instead, and then create a new table (new topic) from that stream.
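Roughly, that workaround could look like the following. This is a sketch only; the object names and the exact statements are illustrative, not the commands I actually ran:

-- 1. Source the original Avro topic as a stream; the value schema is still inferred from the registry.
CREATE STREAM customer_consents_src (key_customerid VARCHAR KEY)
  WITH (kafka_topic='philip.cmpCustomerConsents', value_format='avro');

-- 2. Copy it into a new, ksql-managed topic (CUSTOMER_CONSENTS_COPY by default).
--    ksqlDB registers its generated schema on the new topic's subject, so the
--    FULL_TRANSITIVE subject of the original topic is never touched.
CREATE STREAM customer_consents_copy AS SELECT * FROM customer_consents_src;

-- 3. Declare the table over the copied topic and insert into / join against this one instead.
CREATE TABLE customer_consents_tbl (key_customerid VARCHAR PRIMARY KEY)
  WITH (kafka_topic='CUSTOMER_CONSENTS_COPY', value_format='avro');

This avoids the incompatible registration because the ksql-generated schema is only ever registered on the new topic's subject, but it means duplicating the data.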
Our schemas use FULL_TRANSITIVE compatibility by default. If this issue occurs with any of our topics, we do not want to work around the issue by copying the same data to a new topic every time.