confluentinc / ksql

The database purpose-built for stream processing applications.
https://ksqldb.io

KSQL prints AVRO topic as STRING #1553

Closed rmoff closed 5 years ago

rmoff commented 6 years ago
Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> ./bin/ksql-datagen quickstart=orders format=avro schemaRegistryUrl=http://localhost:8081

Schema exists:

Robin@asgard02 ~/c/confluent-5.0.0-beta180702222458> curl -s localhost:8081/subjects/orders_kafka_topic_avro-value/versions/latest
{"subject":"orders_kafka_topic_avro-value","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName\",\"fields\":[{\"name\":\"ordertime\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"orderid\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"itemid\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"orderunits\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"address\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"KSQLDefaultSchemaName_address\",\"fields\":[{\"name\":\"city\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"state\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"zipcode\",\"type\":[\"null\",\"long\"],\"default\":null}]}],\"default\":null}]}"}
ksql> print 'orders_kafka_topic_avro' from beginning;
Format:STRING
7/7/18 6:22:40 PM UTC , 0 , \x00\x00\x00\x00\x01\x02\xFC\xD7\xA8\xE1\xB2W\x02\x00\x02\x10Item_374\x02F\xC1\xEA.w\x5C\x1E@\x02\x02\x0ECity_17\x02\x10State_74\x02\xC6\xC0\x0A
7/7/18 6:22:40 PM UTC , 1 , \x00\x00\x00\x00\x01\x02\xCC\xB3\xE8\xD2\xD2V\x02\x02\x02\x10Item_459\x02\xCC\xC7\x0F\xE7\xA5m\xF5?\x02\x02\x0ECity_87\x02\x10State_97\x02\xC4\xC5\x0A
7/7/18 6:22:40 PM UTC , 2 , \x00\x00\x00\x00\x01\x02\xE4\xBD\xAC\xBF\x8EW\x02\x04\x02\x10Item_764\x02x\xEAnl\xF7\x09\x0D@\x02\x02\x0ECity_38\x02\x10State_37\x02\xB4\x93\x0A
7/7/18 6:22:41 PM UTC , 3 , \x00\x00\x00\x00\x01\x02\xDE\xA5\xC0\xCB\x84X\x02\x06\x02\x10Item_542\x02\x16\xA9\x81\xF4\xC5\x9D\x14@\x02\x02\x0ECity_64\x02\x10State_84\x02\xC8\x96\x06
7/7/18 6:22:41 PM UTC , 4 , \x00\x00\x00\x00\x01\x02\xAA\xCD\xFA\xB3\x8DX\x02\x08\x02\x10Item_837\x02\xFD\xC0\xE1\xD6n\x86\x1D@\x02\x02\x0ECity_62\x02\x10State_84\x02\xD6\x8F\x07
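For reference, the \x00\x00\x00\x00\x01 prefix on every value above is the Confluent Avro wire format: a zero magic byte followed by the schema ID as a 4-byte big-endian integer - here ID 1, matching the "id":1 in the Schema Registry response. A quick way to eyeball those raw bytes - a sketch assuming a local broker on 9092 and the stock console consumer:

./bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic orders_kafka_topic_avro --from-beginning --max-messages 1 | xxd | head -3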

Maybe the Schema Registry was unreachable, because straight afterwards I tried:

ksql> create stream orders with (kafka_topic='orders_kafka_topic_avro', value_format='avro');
 Unable to verify the AVRO schema is compatible with KSQL. Connection refused (Connection refused)
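A quick way to test that hypothesis - a sketch, run from wherever the KSQL server is running, against whatever ksql.schema.registry.url points at:

curl -s http://localhost:8081/subjects || echo "Schema Registry unreachable"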

I restarted the entire stack and re-ran datagen, and it works fine now:

ksql> print 'orders_kafka_topic_avro' from beginning;
Format:AVRO
07/07/18 19:44:58 BST, 0, {"ordertime": 1489341619565, "orderid": 0, "itemid": "Item_705", "orderunits": 6.424277452435259, "address": {"city": "City_17", "state": "State_49", "zipcode": 78468}}
07/07/18 19:44:58 BST, 1, {"ordertime": 1508347188454, "orderid": 1, "itemid": "Item_432", "orderunits": 2.764169052800083, "address": {"city": "City_54", "state": "State_19", "zipcode": 98638}}
07/07/18 19:44:58 BST, 2, {"ordertime": 1513264604593, "orderid": 2, "itemid": "Item_884", "orderunits": 1.5032338216341627, "address": {"city": "City_49", "state": "State_21", "zipcode": 59780}}
07/07/18 19:44:59 BST, 3, {"ordertime": 1496294298548, "orderid": 3, "itemid": "Item_827", "orderunits": 8.711524279478931, "address": {"city": "City_57", "state": "State_45", "zipcode": 51950}}
blueedgenick commented 6 years ago

Interesting - I saw the same thing (Format: STRING) for a JSON topic a couple of days ago, but forgot to log it.

apurvam commented 6 years ago

@rmoff can you share the server logs from the print which returned the string format?

rmoff commented 6 years ago

Unfortunately not, I don't have them anymore. I'll be sure to save them if it recurs.

rohit-chowdhary commented 5 years ago

I am getting (Format: STRING) for a JSON topic created from a FileStreamSource connector. I am reading a Logback JSON-format log file for processing in Kafka. Can I create/specify a JSON or AVRO schema for a FileStreamSource connector for this? How?

This is the output from the topic with the kafka-console-consumer:

{"schema":null,"payload":null} {"schema":{"type":"string","optional":false},"payload":"{\"@timestamp\":\"2018-09-18T15:07:43.455-04:00\",\"@version\":1,\"message\":\"Debug message\",\"logger_name\":\"jsonLogger\",\"thread_name\":\"main\",\"level\":\"DEBUG\",\"level_value\":10000,\"HOSTNAME\":\"RohitCho-mbpr15\"}"}

This is the output from the topic in ksql:

ksql> print 'json-log' from beginning;
Format:STRING
9/19/18 4:09:42 PM EDT , {"schema":null,"payload":null} , {"schema":{"type":"string","optional":false},"payload":"{\x5C"@timestamp\x5C":\x5C"2018-09-18T15:07:43.455-04:00\x5C",\x5C"@version\x5C":1,\x5C"message\x5C":\x5C"Debug message\x5C",\x5C"logger_name\x5C":\x5C"jsonLogger\x5C",\x5C"thread_name\x5C":\x5C"main\x5C",\x5C"level\x5C":\x5C"DEBUG\x5C",\x5C"level_value\x5C":10000,\x5C"HOSTNAME\x5C":\x5C"RohitCho-mbpr15\x5C"}"}

Here is my connector definition:

{ "name": "json-file-src", "config": { "connector.class": "FileStreamSource", "file": "/Users/rohit.chowdhary/tmp/json-log.json", "topic": "json-log", "key.converter":"org.apache.kafka.connect.json.JsonConverter", "value.converter":"org.apache.kafka.connect.json.JsonConverter" } }

rohit-chowdhary commented 5 years ago

Hi, I wanted to see if there is any resolution to this issue, or whether it is really an issue at all? Is there a workaround? Thanks

rdinkel commented 5 years ago

Hey, I also have this issue at work. It should be AVRO format, but it comes through as a string. I was not able to fix it! Now I am trying to get things working with JSON instead, so as not to bother with the format and schema stuff. Please help

rmoff commented 5 years ago

@rdinkel can you post details of the version you're running, a sample of the actual message from the topic (using e.g. kafkacat), and what you're seeing in KSQL?

rdinkel commented 5 years ago

Yes, of course - as soon as I get to work tomorrow morning.

rdinkel commented 5 years ago

Short info about the setup: it is a demo for an evaluation for a new project, with 1x Postgres and 1x MongoDB. We want to join the CDC streams and look up data to enrich them. Currently Postgres holds a "comments" table and Mongo the "customers" collection. The following is just enough to get data from Postgres into Kafka and KSQL.

.env

DEBEZIUM_VERSION=0.9.0.Beta1

docker-compose:

services:
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
    networks:
      app_net:
        ipv4_address: 172.18.18.05
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    networks:
      app_net:
        ipv4_address: 172.18.18.06
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
  postgres:
    image: debezium/postgres:11-alpine
    networks:
      app_net:
        ipv4_address: 172.18.18.60
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
  mongodb:
    image: debezium/example-mongodb:0.9
    hostname: mongodb
    ports:
      - 27017:27017
    networks:
      app_net:
        ipv4_address: 172.18.18.65
    environment:
      - MONGODB_USER=debezium
      - MONGODB_PASSWORD=dbz
  connect:
    image: debezium/connect:${DEBEZIUM_VERSION}
    ports:
      - 8083:8083
    networks:
      app_net:
        ipv4_address: 172.18.18.07
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - KEY_CONVERTER=io.confluent.connect.avro.AvroConverter
      - VALUE_CONVERTER=io.confluent.connect.avro.AvroConverter
      - INTERNAL_KEY_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - INTERNAL_VALUE_CONVERTER=org.apache.kafka.connect.json.JsonConverter
      - LOG_LEVEL=ERROR
      - CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
      - CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL=http://schema-registry:8081
  schema-registry:
    image: confluentinc/cp-schema-registry
    ports:
      - 8181:8181
      - 8081:8081
    networks:
      app_net:
        ipv4_address: 172.18.18.08
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081

  ksql-server:
    image: confluentinc/cp-ksql-server:5.0.1
    hostname: ksql-server
    networks:
      app_net:
        ipv4_address: 172.18.18.70
    depends_on:
      - kafka
      - connect
    ports:
      - "8088:8088"
    environment:
      KSQL_CONFIG_DIR: "/etc/ksql"
      KSQL_LOG4J_OPTS: "-Dlog4j.configuration=file:/etc/ksql/log4j-rolling.properties"
      KSQL_BOOTSTRAP_SERVERS: "kafka:9092"
      KSQL_HOST_NAME: ksql-server
      KSQL_APPLICATION_ID: "cp-all-in-one"
      KSQL_LISTENERS: "http://0.0.0.0:8088"
      KSQL_CACHE_MAX_BYTES_BUFFERING: 0
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.0.1
    networks:
      app_net:
        ipv4_address: 172.18.18.75
    depends_on:
      - kafka
      - connect
      - ksql-server
    entrypoint: /bin/sh
    tty: true

networks:
  app_net:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"
    ipam:
      config:
        - subnet: 172.18.18.0/24

Postgres Connector:

{
  "name": "comments-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname" : "postgres",
    "database.server.name": "dbserver1",
    "database.whitelist": "comments",

    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",

    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"

  }
}

add the comments table

create table comments
(
  id serial not null,
  comment VARCHAR(255),
  customer_id int default 1
);

create unique index comments_id_uindex
  on comments (id);

alter table comments
  add constraint comments_pk
    primary key (id);

Add Connector config:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @kafka_connectors/postgres_register_comments.json
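To confirm the connector actually started before moving on, the standard Connect REST status endpoint can be checked:

curl -s http://localhost:8083/connectors/comments-connector/status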

Start ksql shell

docker-compose exec ksql-cli ksql http://ksql-server:8088

kafkacat command:

docker run --tty --interactive --network demo_app_net --rm confluentinc/cp-kafkacat kafkacat -b kafka:9092 -C -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\nPartition: %p\tOffset: %o\n--\n' -t dbserver1.public.comments

kafkacat output:

Key (6 bytes):  
Value (16 bytes): 
                  asdads
Partition: 0    Offset: 0
--
% Reached end of topic dbserver1.public.comments [0] at offset 1

Key (6 bytes):  
Value (33 bytes): .Hello, This is Patrick!
Partition: 0    Offset: 1
--
% Reached end of topic dbserver1.public.comments [0] at offset 2

Key (6 bytes):  
Value (32 bytes): ,Hello, This is Sponge!
Partition: 0    Offset: 2
--
% Reached end of topic dbserver1.public.comments [0] at offset 3

Key (6 bytes): 
Value (31 bytes):*Hello, This is Doggo!

Partition: 0    Offset: 3
--
% Reached end of topic dbserver1.public.comments [0] at offset 4

Key (6 bytes): 

Value (33 bytes): 
.Hello, This is Sparta!!

Partition: 0    Offset: 4
--
% Reached end of topic dbserver1.public.comments [0] at offset 5

Add Comment:

INSERT INTO comments(comment, customer_id)
VALUES ('Hello, This is Sparta!!', 5);

KSQL

ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 8:42:37 AM UTC ,  , \x00\x00\x00\x00\x02\x02\x02\x0Casdads\x02\x02
12/18/18 8:43:28 AM UTC ,  , \x00\x00\x00\x00\x02\x04\x02.Hello, This is Patrick!\x02\x02
12/18/18 8:45:33 AM UTC ,  , \x00\x00\x00\x00\x02\x06\x02,Hello, This is Sponge!\x02\x06
12/18/18 8:46:04 AM UTC , , \x00\x00\x00\x00\x02\x08\x02*Hello, This is Doggo!\x02\x0A
12/18/18 9:01:08 AM UTC , 
 , \x00\x00\x00\x00\x02\x0A\x02.Hello, This is Sparta!!\x02\x0A

I don't know where the dot symbol in front of the word Hello comes from. Also, I am missing the primary key and customer_id.
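(For what it's worth, that leading character is Avro's string encoding showing through: Avro prefixes each string with its length as a zigzag varint, and zigzag(23) = 46 = 0x2E, the ASCII dot - "Hello, This is Patrick!" is exactly 23 characters. Likewise the ',' (length 22) before "Hello, This is Sponge!" and the '*' (length 21) before "Hello, This is Doggo!".)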

rmoff commented 5 years ago

@rdinkel thanks, this is great info. From the looks of things it should certainly just be Avro coming through from Connect into the topic, and thus handled fine by KSQL. Since it's Avro, I would expect kafkacat to show 'funny' characters, since it's not plain text.

What's the output from kafka-avro-console-consumer against the topic?

rdinkel commented 5 years ago

recently registered schema value

curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-value/versions/latest

{"subject":"dbserver1.public.comments-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"comment\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"customer_id\",\"type\":[\"null\",\"int\"],\"default\":null}],\"connect.name\":\"dbserver1.public.comments.Value\"}"}

recently registered schema key

curl -X GET http://localhost:8081/subjects/dbserver1.public.comments-key/versions/latest

{"subject":"dbserver1.public.comments-key","version":1,"id":1,"schema":"{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"dbserver1.public.comments\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"}],\"connect.name\":\"dbserver1.public.comments.Key\"}"}

docker-compose exec schema-registry /usr/bin/kafka-avro-console-consumer \
>     --bootstrap-server kafka:9092 \
>     --from-beginning \
>     --property print.key=true \
>     --property schema.registry.url=http://schema-registry:8081 \
>     --topic dbserver1.public.comments

[2018-12-18 09:42:43,443] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2018-12-18 09:42:43,757] INFO ConsumerConfig values: 
        auto.commit.interval.ms = 5000
        auto.offset.reset = earliest
        bootstrap.servers = [kafka:9092]
        check.crcs = true
        client.id = 
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = console-consumer-55265
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
 (org.apache.kafka.clients.consumer.ConsumerConfig)
[2018-12-18 09:42:43,831] INFO Kafka version : 2.0.1-cp1 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,831] INFO Kafka commitId : 815feb8a888d39d9 (org.apache.kafka.common.utils.AppInfoParser)
[2018-12-18 09:42:43,972] INFO Cluster ID: 6neLt4bCRnqCWUimz4O4yg (org.apache.kafka.clients.Metadata)
[2018-12-18 09:42:43,973] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Discovered group coordinator 172.18.18.6:9092 (id: 2147483646 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,975] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Revoking previously assigned partitions [] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:43,976] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:43,998] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Successfully joined group with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2018-12-18 09:42:44,000] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Setting newly assigned partitions [dbserver1.public.comments-0] (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
[2018-12-18 09:42:44,032] INFO [Consumer clientId=consumer-1, groupId=console-consumer-55265] Resetting offset for partition dbserver1.public.comments-0 to offset 0. (org.apache.kafka.clients.consumer.internals.Fetcher)

{"id":1}        {"id":1,"comment":{"string":"Hello, This is Sparta!!"},"customer_id":{"int":5}}

rmoff commented 5 years ago

@rdinkel interesting. So does this work?

CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');
DESCRIBE COMMENTS;
SET 'auto.offset.reset'='earliest';
SELECT * FROM COMMENTS;

rdinkel commented 5 years ago

It doesn't.

ksql> CREATE STREAM COMMENTS WITH (KAFKA_TOPIC='dbserver1.public.comments', VALUE_FORMAT='AVRO');

Unable to verify if the Avro schema for topic dbserver1.public.comments is compatible with KSQL.
Reason: Connection refused (Connection refused)

The connection is OK, I guess; I can still do this:

ksql> PRINT 'dbserver1.public.comments' FROM BEGINNING;
Format:STRING
12/18/18 10:10:55 AM UTC ,  , \x00\x00\x00\x00\x02\x02\x02,Hello, This is Doggo!!\x02\x0A

rdinkel commented 5 years ago

This issue is probably connected to https://github.com/confluentinc/ksql/issues/1691? But unfortunately a restart doesn't help.

rdinkel commented 5 years ago

@rmoff Can you provide a working example/demo from a fresh setup, please?

rmoff commented 5 years ago

I've reproduced this behaviour, and it occurs when the KSQL Server cannot reach the Schema Registry. I've logged #2293 suggesting that KSQL should warn the user if it fails to deserialise what it has identified as Avro data.

rmoff commented 5 years ago

@rdinkel so, to your specific issue: your Schema Registry is not reachable. This can actually be seen in the output of the CREATE STREAM that you tried to run:

Reason: Connection refused (Connection refused)

Looking at your Docker Compose, you've not specified the Schema Registry for KSQL Server, so I think it will be defaulting to localhost. You can verify this in the KSQL server logs, which in my environment read:

[2018-12-19 21:21:00,817] INFO KafkaAvroDeserializerConfig values:
   schema.registry.url = [http://schema-registry:8081]
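
With this Docker Compose setup, one way to pull that line out of the logs - a sketch assuming the ksql-server service name from the compose file above:

docker-compose logs ksql-server | grep -i 'schema.registry.url'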

See here for an example of the stack with Schema Registry correctly configured.

jeffbeagley commented 5 years ago

@rmoff I have literally been chasing my tail on this for entirely too long, until I read your recommendation to register the Schema Registry with KSQL. Thank you so much, glad I stumbled upon this.

My data was also coming in and displaying as a string, but as soon as I applied schema.registry.url it displays as Avro, as expected.

stauvel commented 5 years ago

@rmoff the link does not work anymore. Please update: https://github.com/confluentinc/demo-scene/blob/e05ca9b85497bde042bf268e1f7604f43c4bc554/cos/docker-compose.yml#L54

In summary, adding this line worked for me:

ksql-server:
  ...
  environment:
    ...
    KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    ...

Thanks

ShahNewazKhan commented 2 years ago

Had to add the KSQL prefix twice for KSQL_KSQL_SCHEMA_REGISTRY_URL 🤷🏽
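
(The doubled KSQL is expected with the Confluent Docker images: the leading KSQL_ marks the variable as one the image should translate into a server config property, and the remainder maps to the property name, so KSQL_KSQL_SCHEMA_REGISTRY_URL becomes ksql.schema.registry.url.)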