scylladb / kafka-connect-scylladb

Kafka Connect Scylladb Sink
Apache License 2.0

Not quoted column names in `CREATE TABLE` query #53

Closed avelanarius closed 2 years ago

avelanarius commented 3 years ago

The Scylla Sink Connector can automatically create a Scylla table based on the Kafka message schema.

However, the generated `CREATE TABLE` query does not quote column names. For example, a column named `one two three` (with spaces) must be written in double quotes as `"one two three"`; see the `quoteIfNecessary` method in the Java Driver.
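To illustrate the quoting rule, here is a minimal, self-contained sketch. It is not the connector's code and not the Java Driver's `quoteIfNecessary` implementation; the class name `CqlIdentifiers` and the short reserved-word list are assumptions made for this example. It leaves a name unquoted only when it is a plain lowercase identifier, and otherwise wraps it in double quotes, doubling any embedded double quote.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not taken from the connector or the driver.
final class CqlIdentifiers {
    // A name is safe unquoted if it starts with a lowercase letter and contains only
    // lowercase letters, digits, and underscores (unquoted names are case-insensitive).
    private static final Pattern UNQUOTED = Pattern.compile("[a-z][a-z0-9_]*");

    // Assumption: a deliberately incomplete sample of reserved CQL keywords.
    private static final Set<String> RESERVED = new HashSet<>(Arrays.asList(
            "select", "from", "where", "table", "primary", "order", "set"));

    private CqlIdentifiers() {}

    static String quoteIfNecessary(String name) {
        if (UNQUOTED.matcher(name).matches() && !RESERVED.contains(name)) {
            return name;
        }
        // Inside a quoted identifier, a double quote is escaped by doubling it.
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(quoteIfNecessary("v"));             // v
        System.out.println(quoteIfNecessary("primary key"));   // "primary key"
        System.out.println(quoteIfNecessary("ck!@#$%^&*()"));  // "ck!@#$%^&*()"
    }
}
```

In a real fix, the driver's own method would be preferable to a hand-rolled keyword list, since it tracks the full set of reserved words.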

Reproducer:

  1. Scylla (cqlsh): CREATE KEYSPACE ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
  2. In Kafka, create a `mytable` topic.
  3. Start the connector with the following configuration:
    {
      "key.converter.schemas.enable": "true",
      "value.converter.schemas.enable": "true",
      "name": "mytableConnector",
      "connector.class": "io.connect.scylladb.ScyllaDbSinkConnector",
      "key.converter": "org.apache.kafka.connect.json.JsonConverter",
      "value.converter": "org.apache.kafka.connect.json.JsonConverter",
      "topics": [
        "mytable"
      ],
      "scylladb.contact.points": "127.0.0.2",
      "scylladb.port": "9042",
      "scylladb.keyspace": "ks"
    }
  4. Write the message to Kafka using the console producer (the columns that show the issue are `primary key` and `ck!@#$%^&*()`):
    $ bin/kafka-console-producer --topic mytable --bootstrap-server localhost:9092 --property parse.key=true --property "key.separator=?"
    {"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "primary key"}, {"type": "int64", "optional": false, "field": "ck!@#$%^&*()"} ], "optional": false, "name": "mytablePK"}, "payload": {"primary key": 5, "ck!@#$%^&*()": 10}}?{"schema": {"type": "struct", "fields": [{"type": "int64", "optional": false, "field": "primary key"}, {"type": "int64", "optional": false, "field": "ck!@#$%^&*()"}, {"type": "int64", "optional": false, "field": "v"} ], "optional": false, "name": "mytableVAL"}, "payload": {"primary key": 5, "ck!@#$%^&*()": 10, "v": 15}} 
  5. The connector fails. In the logs (/tmp/confluent.XXXXXX/connect/logs/connect.log in the case of Confluent) you can see the generated query:
    CREATE TABLE ks.mytable(
               primary key bigint,
               ck!@#$%^&*() bigint,
               v bigint,
               PRIMARY KEY((primary key, ck!@#$%^&*())))

    The correct query should be:

    CREATE TABLE ks.mytable (
              "primary key" bigint, 
              "ck!@#$%^&*()" bigint, 
              v bigint, 
              PRIMARY KEY("primary key", "ck!@#$%^&*()"));
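As a rough sketch of how a statement like the corrected one could be assembled, the example below quotes every identifier unconditionally, which is a simpler alternative to quoting only when needed. The class `CreateTableSketch` and its method names are hypothetical and do not reflect the connector's actual table-creation code. The primary key layout follows the corrected query above.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch; the connector's real schema-building code is not shown in this issue.
final class CreateTableSketch {
    private CreateTableSketch() {}

    // Always wrap the identifier in double quotes, doubling any embedded double quote.
    static String quote(String name) {
        return "\"" + name.replace("\"", "\"\"") + "\"";
    }

    // columns maps column name to CQL type; iteration order decides column order.
    static String createTable(String keyspace, String table,
                              Map<String, String> columns, List<String> primaryKey) {
        String cols = columns.entrySet().stream()
                .map(e -> quote(e.getKey()) + " " + e.getValue())
                .collect(Collectors.joining(", "));
        String pk = primaryKey.stream()
                .map(CreateTableSketch::quote)
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + quote(keyspace) + "." + quote(table)
                + " (" + cols + ", PRIMARY KEY (" + pk + "))";
    }

    public static void main(String[] args) {
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("primary key", "bigint");
        columns.put("ck!@#$%^&*()", "bigint");
        columns.put("v", "bigint");
        System.out.println(createTable("ks", "mytable", columns,
                java.util.Arrays.asList("primary key", "ck!@#$%^&*()")));
    }
}
```

Quoting everything avoids maintaining a keyword list, but quoted identifiers are case-sensitive, so a field named `V` would become a column distinct from `v`. Whether that is acceptable depends on how the connector maps field names elsewhere.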