confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

MySQL sink connector topic namespace error #292


mfe5003 commented 7 years ago

Summary: if there is a period "." in the topic name (as when a namespace is used), the MySQL JDBC sink connector fails, while the SQLite connector works fine.

I currently have a namespace data.* with a single topic, data.test_stream_0. If I use the SQLite connector with the following .properties file contents:

name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=data.test_stream_0
connection.url=jdbc:sqlite:test.db
pk.mode=record_value
pk.fields=CLIENT_TIMESTAMP
auto.create=true

When I start a Python AvroProducer with topic=data.test_stream_0, the connector creates the test.db file and the data.test_stream_0 table, and I can select the entries in the table as I expect.
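For example, a query along these lines should work from the sqlite3 shell (a sketch; the column names follow the CREATE TABLE statement in the MySQL log below, and SQLite accepts the dotted table name once it is double-quoted):

-- Sanity check in sqlite3: the dotted table name only needs quoting.
SELECT f1, CLIENT_TIMESTAMP FROM "data.test_stream_0";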

I then switch to MySQL (v5.7.19) with the MySQL JDBC driver (v5.1.38) and these .properties file contents:

name=test-sink-mysql
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=data.test_stream_0
#table.name.format=`${topic}`  # <- I tried this to manually escape the table name
connection.url=jdbc:mysql://localhost:3306/origin_test
connection.user=test_writer
connection.password=password
pk.mode=record_value
pk.fields=CLIENT_TIMESTAMP
auto.create=true

After starting a producer, the connector creates the table correctly (confirmed with the mysql client) but then hits an error:

[2017-09-26 20:50:12,557] INFO Checking table:data.test_stream_0 exists for product:MySQL schema:null catalog: (io.confluent.connect.jdbc.sink.DbMetadataQueries:47)
[2017-09-26 20:50:12,563] INFO product:MySQL schema:null catalog:origin_test -- table:data.test_stream_0 is absent (io.confluent.connect.jdbc.sink.DbMetadataQueries:51)
[2017-09-26 20:50:12,568] INFO Creating table:data.test_stream_0 with SQL: CREATE TABLE `data.test_stream_0` (
`f1` INT NOT NULL,
`CLIENT_TIMESTAMP` BIGINT NOT NULL,
PRIMARY KEY(`CLIENT_TIMESTAMP`)) (io.confluent.connect.jdbc.sink.DbStructure:88)
[2017-09-26 20:50:12,650] INFO Querying column metadata for product:MySQL schema:null catalog:origin_test table:data.test_stream_0 (io.confluent.connect.jdbc.sink.DbMetadataQueries:64)
[2017-09-26 20:50:12,657] INFO Updating cached metadata -- DbTable{name='data.test_stream_0', columns={}} (io.confluent.connect.jdbc.sink.metadata.TableMetadataLoadingCache:49)
[2017-09-26 20:50:12,658] ERROR Task test-sink-mysql-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{INT32}, name='f1', isPrimaryKey=false}, as it is not optional and does not have a default value
        at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:130)
        at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:72)
        at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:66)
        at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
        at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I have traced the discrepancy to the following two log lines: after creating the table, the SQLite connector correctly reads back the table columns, while the MySQL connector does not.

SQLite:

[2017-09-26 20:50:12,433] INFO Updating cached metadata -- DbTable{name='data.test_stream_0', columns={f1=DbTableColumn{name='f1', isPrimaryKey=false, allowsNull=false, sqlType=4}, CLIENT_TIMESTAMP=DbTableColumn{name='CLIENT_TIMESTAMP', isPrimaryKey=false, allowsNull=false, sqlType=4}}} (io.confluent.connect.jdbc.sink.metadata.TableMetadataLoadingCache:49)

MySQL:

[2017-09-26 20:50:12,657] INFO Updating cached metadata -- DbTable{name='data.test_stream_0', columns={}} (io.confluent.connect.jdbc.sink.metadata.TableMetadataLoadingCache:49)

The MySQL connector works correctly if I use a topic that does not have a period in it, so it seems to me to be an issue with escaping the table name. However, the problem does not occur with another delimiter that also requires escaping in MySQL, such as '-'.
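For what it's worth, the table itself is reachable from the mysql client once the name is backtick-escaped (a sketch; the column names are taken from the CREATE TABLE statement in the log above):

-- The dotted name works in MySQL itself when escaped with backticks.
SELECT f1, CLIENT_TIMESTAMP FROM `data.test_stream_0`;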

Is there a way to replace the period in the topic name with an underscore or some other symbol? That would be an acceptable workaround.

rhauch commented 7 years ago

Thanks for logging this. For the released versions, you should be able to work around this issue by using an SMT in the sink connector that "corrects" the topic name in each sink record before Connect passes it to the connector. You could use the RegexRouter SMT that ships with Connect (see the Kafka documentation); it matches a regex against the topic name and replaces the first occurrence with a specified value. If for some reason that doesn't work or is not sufficient, you could write a custom SMT.
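A minimal sketch of that suggestion (untested against the setup above; the transform alias renameTopic is arbitrary) would add the following to the sink connector's .properties file:

# Route data.test_stream_0 -> data_test_stream_0 before the sink writes it.
# The character class [.] matches a literal period, which sidesteps the
# backslash-escaping rules of .properties files.
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=data[.](.*)
transforms.renameTopic.replacement=data_$1

With auto.create=true the connector would then create and write to data_test_stream_0, avoiding the dotted table name entirely.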

okeyokoro commented 5 years ago

A friendlier example: https://docs.confluent.io/current/connect/transforms/regexrouter.html