confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

JDBC Sink fails when key is binary and "pk.mode": "none" #591

Open rmoff opened 5 years ago

rmoff commented 5 years ago

Following on from https://github.com/confluentinc/ksql/issues/2250, there seems to be a problem with the JDBC Sink connector.

The topic's data is written from KSQL, with an Avro value and a binary key. For other connectors (e.g. Elasticsearch) it is sufficient to set "key.converter": "org.apache.kafka.connect.storage.StringConverter", and the connector is then happy either to ignore the key or to use it in the sink as-is.

With the JDBC Sink connector and "pk.mode": "none", the connector aborts with:

org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: PAGEVIEWS_REGIONS
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:125)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:62)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:77)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:126)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)

Full details follow.

Sample Kafka message key:

$ kafkacat -b localhost:9092 -C -K: -f '%k' -t PAGEVIEWS_REGIONS -c1|hexdump -C
00000000  46 45 4d 41 4c 45 7c 2b  7c 52 65 67 69 6f 6e 5f  |FEMALE|+|Region_|
00000010  36 00 00 01 68 b8 7c 24  10                       |6...h.|$.|
00000019

Sample Kafka message value:

$ kafka-avro-console-consumer \
        --bootstrap-server localhost:9092 \
        --property schema.registry.url=http://localhost:8081 \
        --topic PAGEVIEWS_REGIONS --from-beginning
{"GENDER":{"string":"FEMALE"},"REGIONID":{"string":"Region_3"},"NUMUSERS":{"long":21}}

Streamed to Elasticsearch:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "dev-dsb-errors-es-sink2",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "PAGEVIEWS_REGIONS",
    "connection.url": "http://localhost:9200",
    "type.name": "kafkaconnect",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.ignore": "false",
    "schema.ignore": "true"
  }
}'

Works fine. The key is taken as a string and used as the document id. The point is less that the key is handled than that it doesn't break the connector.

$ curl -s -XGET "http://localhost:9200/pageviews_regions/_search"|jq '.hits.hits[]._id'
"FEMALE|+|Region_3\u0000\u0000\u0001h�|$\u0010"
"FEMALE|+|Region_4\u0000\u0000\u0001h��:�"

Now in the JDBC sink:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "dev-dsb-errors-mysql-sink2",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "PAGEVIEWS_REGIONS",
    "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
    "auto.create": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "pk.mode": "none"
  }
}'

Log:

[2019-02-06 17:23:49,379] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:86)
[2019-02-06 17:23:49,476] INFO JdbcDbWriter Connected (io.confluent.connect.jdbc.sink.JdbcDbWriter:48)
[2019-02-06 17:23:49,516] INFO Checking MySql dialect for existence of table "PAGEVIEWS_REGIONS" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:490)
[2019-02-06 17:23:49,595] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" absent (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:498)
[2019-02-06 17:23:49,597] INFO Creating table with sql: CREATE TABLE `PAGEVIEWS_REGIONS` (
`NUMUSERS` BIGINT NULL,
`GENDER` VARCHAR(256) NULL,
`REGIONID` VARCHAR(256) NULL) (io.confluent.connect.jdbc.sink.DbStructure:91)
[2019-02-06 17:23:49,810] INFO Checking MySql dialect for existence of table "PAGEVIEWS_REGIONS" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:490)
[2019-02-06 17:23:49,814] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:498)
[2019-02-06 17:23:49,826] INFO Setting metadata for table "PAGEVIEWS_REGIONS" to Table{name='"PAGEVIEWS_REGIONS"', columns=[Column{'GENDER', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'REGIONID', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'NUMUSERS', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions:63)
[2019-02-06 17:23:49,826] INFO Closing BufferedRecords with preparedStatement: null (io.confluent.connect.jdbc.sink.BufferedRecords:184)
[2019-02-06 17:23:49,853] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink2-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask:585)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: PAGEVIEWS_REGIONS
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:125)
        at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:62)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:77)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:126)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Note that the table is created in MySQL, but not populated.

Drop the table, and then switch to "pk.mode": "kafka":

mysql> drop TABLE PAGEVIEWS_REGIONS;
Query OK, 0 rows affected (0.12 sec)

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
  "name": "dev-dsb-errors-mysql-sink6",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "PAGEVIEWS_REGIONS",
    "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
    "auto.create": "true",
    "auto.evolve": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "pk.mode": "kafka"
  }
}'

This works.

[2019-02-06 16:17:06,142] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" absent (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:498)
[2019-02-06 16:17:06,164] INFO Creating table with sql: CREATE TABLE `PAGEVIEWS_REGIONS` (
`__connect_topic` VARCHAR(256) NOT NULL,
`__connect_partition` INT NOT NULL,
`__connect_offset` BIGINT NOT NULL,
`NUMUSERS` BIGINT NULL,
`GENDER` VARCHAR(256) NULL,
`REGIONID` VARCHAR(256) NULL,
PRIMARY KEY(`__connect_topic`,`__connect_partition`,`__connect_offset`)) (io.confluent.connect.jdbc.sink.DbStructure:91)
[2019-02-06 16:17:06,468] INFO Checking MySql dialect for existence of table "PAGEVIEWS_REGIONS" (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:490)
[2019-02-06 16:17:06,472] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect:498)
[2019-02-06 16:17:06,502] INFO Setting metadata for table "PAGEVIEWS_REGIONS" to Table{name='"PAGEVIEWS_REGIONS"', columns=[Column{'GENDER', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'REGIONID', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'__connect_topic', isPrimaryKey=true, allowsNull=false, sqlType=VARCHAR}, Column{'__connect_offset', isPrimaryKey=true, allowsNull=false, sqlType=BIGINT}, Column{'NUMUSERS', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}, Column{'__connect_partition', isPrimaryKey=true, allowsNull=false, sqlType=INT}]} (io.confluent.connect.jdbc.util.TableDefinitions:63)
[2019-02-06 16:17:06,511] INFO Closing BufferedRecords with preparedStatement: null (io.confluent.connect.jdbc.sink.BufferedRecords:184)
[2019-02-06 16:17:06,659] INFO Closing BufferedRecords with preparedStatement: com.mysql.cj.jdbc.ClientPreparedStatement: INSERT INTO `PAGEVIEWS_REGIONS`(`__connect_topic`,`__connect_partition`,`__connect_offset`,`GENDER`,`REGIONID`,`NUMUSERS`) VALUES('PAGEVIEWS_REGIONS',2,23,'FEMALE','Region_3',68) (io.confluent.connect.jdbc.sink.BufferedRecords:184)

But we now have a bunch of extraneous columns (the Kafka message coordinates) in the target table:

mysql> select * from pageviews_regions;
+-------------------+---------------------+------------------+----------+--------+----------+
| __connect_topic   | __connect_partition | __connect_offset | NUMUSERS | GENDER | REGIONID |
+-------------------+---------------------+------------------+----------+--------+----------+
| PAGEVIEWS_REGIONS |                   0 |                0 |       51 | FEMALE | Region_6 |
| PAGEVIEWS_REGIONS |                   0 |                1 |      125 | FEMALE | Region_6 |
| PAGEVIEWS_REGIONS |                   0 |                2 |      136 | FEMALE | Region_6 |
| PAGEVIEWS_REGIONS |                   0 |                3 |      118 | FEMALE | Region_6 |
| PAGEVIEWS_REGIONS |                   0 |                4 |      115 | FEMALE | Region_6 |
| PAGEVIEWS_REGIONS |                   0 |                5 |      138 | FEMALE | Region_6 |

k1th commented 5 years ago

Can anyone verify how this relates to the different database dialects?

One option is to use a bytea column at the database level, which is created automatically when using "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter" (see the configuration sketch at the end of this comment).

Apart from that, I did a few tests after replacing null characters in the case STRING: branch of maybeBindPrimitive in GenericDatabaseDialect (roughly the change sketched after the screenshot below). The replacement would apply to every String value, which isn't too bad, as Postgres doesn't accept \0 in text fields anyway. This works fine in Postgres, resulting in unreadable but working record_keys:

(screenshot: resulting record_key values in Postgres)
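A minimal sketch of the kind of change tested, assuming the existing case STRING: branch simply calls statement.setString (paraphrased, not the exact source):

// Paraphrased sketch of a possible change in GenericDatabaseDialect.maybeBindPrimitive():
// strip NUL characters from String values before binding them, since Postgres
// rejects \0 in text columns. Note this would affect every String value, not just keys.
case STRING:
  statement.setString(index, ((String) value).replace("\u0000", ""));
  break;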

Not sure about other dialects, though.
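For completeness, a sketch of what the ByteArrayConverter/bytea approach mentioned above might look like as a sink config. The connection URL, database name, and the MESSAGE_KEY column name are illustrative, not taken from the original setup:

{
  "name": "jdbc-sink-bytea-key",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "PAGEVIEWS_REGIONS",
    "connection.url": "jdbc:postgresql://localhost:5432/demo",
    "auto.create": "true",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "pk.mode": "record_key",
    "pk.fields": "MESSAGE_KEY"
  }
}

With a bytes key schema, auto.create should map the key column to bytea in Postgres; the key is then stored as-is rather than being forced through a string converter.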