confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

Can't create a KSQL stream or table with inferred column values on cp all-in-one docker v 6.0.0 (1de4e42) #6435

Closed erikologic closed 4 years ago

erikologic commented 4 years ago

Describe the bug
I am trying to create a KSQLDB Stream from a JSON event, with value columns inferred from the Schema Registry.

If I use JSON as VALUE_FORMAT, the error "No columns supplied" is reported and the Stream is not created. If I use JSON_SR, the Stream is created, but it does not show any records.

Creating a Stream with KEY set to the event key and VALUE_FORMAT JSON_SR adds a new stream that shows just the event key in each record.

Creating a Table faces similar issues.

To Reproduce
Follow the Confluent docs to get cp-all-in-one (Docker) running. Control Center reports v 6.0.0 (1de4e42).

Create a topic via the webUI.

Add a JSON value schema, e.g.:

{
  "$id": "http://example.com/myURI.schema.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "description": "Sample schema to help you get started.",
  "properties": {
    "a": {
      "description": "The integer type is used for integral numbers.",
      "type": "integer"
    },
    "b": {
      "description": "The number type is used for any numeric type, either integers or floating point numbers.",
      "type": "number"
    }
  },
  "title": "value_topic",
  "type": "object"
}

Produce an event into the topic - I'm using KafkaJS 1.14.0:

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-first-kafka-app',
    brokers: ['localhost:9092']
});

const producer = kafka.producer();
...
await producer.connect();
await producer.send({
    topic: "topic",
    messages: [{
        // plain, unframed JSON value
        value: JSON.stringify({ a: 1, b: 2 })
    }]
});
await producer.disconnect();

Confirm the message arrived in the topic on the webUI

Try creating a JSON inferred Stream in KSQLDB:

ksql> CREATE STREAM topic WITH(KAFKA_TOPIC='topic', VALUE_FORMAT='json');
No columns supplied <-- ouch!

Try creating a JSON_SR stream:

ksql> CREATE STREAM topic WITH(KAFKA_TOPIC='topic', VALUE_FORMAT='json_sr');
...
Stream created

Query the created stream:

ksql> SELECT * FROM topic EMIT CHANGES;
| A | B |

I can try sending more events and/or confirm that the property auto.offset.reset = "Earliest" is set... there will be no records here.

I found the following by inspecting the stream:

ksql> DESCRIBE EXTENDED TOPIC;
...
"statement": "CREATE STREAM TOPIC (A BIGINT, B DOUBLE) WITH (KAFKA_TOPIC='topic', SCHEMA_ID=1, VALUE_FORMAT='json_sr');"

If I use that statement to create a JSON stream:

ksql> CREATE STREAM TOPIC_JSON (A BIGINT, B DOUBLE) WITH (KAFKA_TOPIC='topic', SCHEMA_ID=1, VALUE_FORMAT='json');

I can query the stream just fine:

ksql> SELECT * FROM TOPIC_JSON EMIT CHANGES;
| A | B |
| 1 | 2 |

Also, checking the logs, I couldn't find any entry mentioning a parsing error, as described in the KSQLDB troubleshooting documentation.

Expected behavior
Reading the docs https://docs.ksqldb.io/en/latest/concepts/schemas/#schema-inference, I would have expected not to have to declare the value columns at all.

Additional context
I'm working on importing a complicated JSON. If I use the statement returned by DESCRIBE EXTENDED, I'll hit the 2500+ char limit... :(

Also, I have tried digging out from the docs what the difference is between VALUE_FORMAT JSON and JSON_SR. I'm sorry, but I'm not able to understand it - it would be nice if somebody could clarify it.

agavra commented 4 years ago

Also, I have tried digging out from the docs what the difference is between VALUE_FORMAT JSON and JSON_SR.

I think this probably gets to the root of your confusion, and we can probably improve the documentation around it. JSON is vanilla JSON, which does not integrate with Schema Registry in any way. JSON_SR is a schema-registry-specific format that is not byte-for-byte compatible with JSON. Namely, schema registry serializers prepend a magic byte (0x00) and then four bytes indicating the schema id (0x00 0x00 0x00 0x01, for example) to anything that uses the JSON_SR format. Furthermore, the deserializer expects to read these first 5 bytes for any incoming JSON_SR data.

While these formats at face value seem to be compatible, they are not byte-for-byte compatible.
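To make that concrete, here is a rough, purely illustrative sketch (not the actual serializer code) of what the same value looks like with and without the schema-registry framing, assuming schema id 1:

// Illustration only: the JSON vs. JSON_SR framing described above,
// assuming schema id 1. Not the actual Confluent serializer code.
const plainJson = Buffer.from(JSON.stringify({ a: 1, b: 2 }));

// JSON_SR prepends a magic byte (0x00) plus a 4-byte big-endian schema id.
const header = Buffer.alloc(5);
header.writeUInt8(0x00, 0);   // magic byte
header.writeUInt32BE(1, 1);   // schema id 1
const jsonSr = Buffer.concat([header, plainJson]);

console.log(plainJson.slice(0, 5).toString('hex')); // 7b2261223a -> starts with '{'
console.log(jsonSr.slice(0, 5).toString('hex'));    // 0000000001 -> magic byte + schema id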

We have some documentation around it here: https://docs.ksqldb.io/en/latest/developer-guide/serialization/#json but it doesn't seem to cover it very well; I'll work on updating that a little bit today.

cc @dlfaraujo @MichaelDrogalis @JimGalasyn we need to figure out a plan to best communicate these differences.

erikologic commented 4 years ago

Thanks for the clarification @agavra.

This indeed confirms my feeling that there is nothing wrong with KSQL; the behaviour is due to how I am producing data with KafkaJS, which does not support JSON_SR.

I had the (wrong) naive assumption that registering a JSON Schema on the topic was enough to then produce unencoded JSON into the topic and have it properly inferred by KSQL. In this scenario, I can see the message on the Kafka topic but KSQL cannot - that is what confused me: my assumption was that if I can see it in a nice table on the topic view of the Confluent UI, then KSQL should be able to pick it up correctly. The problem is that the message was not encoded properly.

A message encoded with a registered schema can be produced from KafkaJS using @kafkajs/confluent-schema-registry:

const { Kafka } = require('kafkajs')
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry({ host: 'http://localhost:8081' })

const kafka = new Kafka({
  brokers: ['localhost:9092'],
  clientId: 'example-consumer',
})

const producer = kafka.producer()

// ... (`id` is the registered schema id; `message` and `outgoingTopic` come from the surrounding code)

const outgoingMessage = {
  key: message.key,
  // registry.encode frames the payload with the magic byte and schema id
  value: await registry.encode(id, message)
}

await producer.send({
  topic: outgoingTopic,
  messages: [outgoingMessage]
})

At this time, this works with AVRO, although Protobuf and JSON_SR are not yet supported: https://github.com/kafkajs/confluent-schema-registry/issues/79

My solution to overcome this problem is to use the Kafka REST Proxy API:

// Produce a JSON Schema record through the Kafka REST Proxy (default port 8082).
const myHeaders = new Headers();
myHeaders.append("Content-Type", "application/vnd.kafka.jsonschema.v2+json");
myHeaders.append("Accept", "application/vnd.kafka.v2+json");

const data = {
  key_schema_id: "6",
  value_schema_id: "4",
  records: [
    {
      key: "aKey",
      value: {
        myField1: 2,
        myField2: 3.0,
        myField3: "4"
      }
    }
  ]
};

const requestOptions = {
  method: 'POST',
  headers: myHeaders,
  body: JSON.stringify(data),
  redirect: 'follow'
};

fetch("http://localhost:8082/topics/test", requestOptions)
  .then(response => response.text())
  .then(result => console.log(result))
  .catch(error => console.log('error', error));

Eventually, I'll try converting to AVRO - it sounds more useful in any case.
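Roughly, that would look like the following (just a sketch, assuming an AVRO schema with fields a and b is already registered under the subject topic-value):

const { Kafka } = require('kafkajs')
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry')

const registry = new SchemaRegistry({ host: 'http://localhost:8081' })
const kafka = new Kafka({ clientId: 'avro-producer', brokers: ['localhost:9092'] })
const producer = kafka.producer()

async function produce() {
  await producer.connect()
  // Assumes an AVRO schema with fields "a" (long) and "b" (double)
  // is already registered under the subject "topic-value".
  const schemaId = await registry.getLatestSchemaId('topic-value')
  await producer.send({
    topic: 'topic',
    messages: [{ value: await registry.encode(schemaId, { a: 1, b: 2 }) }]
  })
  await producer.disconnect()
}

produce().catch(console.error)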

I'm pretty sure this is obvious to you guys, but I'm leaving my notes here for anybody else banging their head against this problem...