confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases

JDBC sink to MySQL using upsert becomes very slow #471

Open artiship opened 6 years ago

artiship commented 6 years ago

Hi, my JDBC sink connector writes data into MySQL in upsert mode. As the table grows, the inserts become very slow, and eventually the sink task fails with a timeout exception.

Is there any solution to this issue?

org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.BatchUpdateException: Lock wait timeout exceeded; try restarting transaction
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:78)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
    ... 10 more
Caused by: java.sql.BatchUpdateException: Lock wait timeout exceeded; try restarting transaction
    at sun.reflect.GeneratedConstructorAccessor161.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1162)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1773)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1257)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:958)
    at io.confluent.connect.jdbc.sink.BufferedRecords.flush(BufferedRecords.java:120)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:69)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:69)
    ... 11 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Lock wait timeout exceeded; try restarting transaction
    at sun.reflect.GeneratedConstructorAccessor160.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:951)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3970)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3906)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2677)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1751)
    ... 16 more
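The trace shows the failure coming out of `BufferedRecords.flush`, which hands the buffered rows to the MySQL driver as a single JDBC batch. One statement in that batch then hits "Lock wait timeout exceeded".

The sketch below is a rough illustration only. It models a sink that buffers rows until a batch size is reached, then retries a failed flush a bounded number of times with a fixed backoff. The knobs loosely correspond to the sink's `batch.size`, `max.retries` and `retry.backoff.ms` options. However, `BufferedRetrySketch` and `BatchFlusher` are hypothetical names, and this is not the connector's actual code.

```java
import java.sql.SQLException;
import java.sql.SQLTransactionRollbackException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a sink that buffers rows and retries a failed batch flush.
class BufferedRetrySketch {

    // Stand-in for "execute these rows as one JDBC batch".
    interface BatchFlusher {
        void flush(List<String> rows) throws SQLException;
    }

    private final int batchSize;   // loosely analogous to batch.size
    private final int maxRetries;  // loosely analogous to max.retries
    private final long backoffMs;  // loosely analogous to retry.backoff.ms
    private final BatchFlusher flusher;
    private final List<String> buffer = new ArrayList<>();

    BufferedRetrySketch(int batchSize, int maxRetries, long backoffMs, BatchFlusher flusher) {
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.backoffMs = backoffMs;
        this.flusher = flusher;
    }

    // Buffers one row and flushes as soon as the buffer reaches batchSize.
    void add(String row) throws SQLException, InterruptedException {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Flushes the buffer. Only transaction-rollback errors (such as a lock wait
    // timeout) are retried; any other SQLException propagates immediately.
    void flush() throws SQLException, InterruptedException {
        if (buffer.isEmpty()) {
            return;
        }
        int attempt = 0;
        while (true) {
            try {
                flusher.flush(new ArrayList<>(buffer));
                buffer.clear();
                return;
            } catch (SQLTransactionRollbackException e) {
                if (attempt >= maxRetries) {
                    throw e; // retries exhausted: surface the error to the caller
                }
                attempt++;
                Thread.sleep(backoffMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        List<Integer> flushedSizes = new ArrayList<>();
        BufferedRetrySketch sink = new BufferedRetrySketch(3, 2, 10, rows -> {
            calls[0]++;
            if (calls[0] == 1) { // simulate one lock wait timeout on the first flush
                throw new SQLTransactionRollbackException(
                        "Lock wait timeout exceeded; try restarting transaction");
            }
            flushedSizes.add(rows.size());
        });
        for (int i = 0; i < 7; i++) {
            sink.add("row-" + i);
        }
        sink.flush(); // flush the final partial batch
        System.out.println("flush calls=" + calls[0] + " batch sizes=" + flushedSizes);
    }
}
```

In `main`, the first flush fails once and is retried. The seven rows end up written in batches of 3, 3 and 1, using four flush calls in total.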
{
    "name": "target_table-mysql_sink",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "table.name.format": "target_table",
    "auto.evolve": "true",
    "topics": "my_topic_name",
    "connection.url": "jdbc:mysql://target-host:3306/target?user=***&password=***&useUnicode=true&characterEncoding=utf8",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "id",
    "auto.create": "true"
}
insert into `target_table` (
    `id`,
    `nid`,
    `code`,
    `open_account_date`,
    `amount`,
    `status`,
    `create_time`,
    `update_time`,
    `create_user`,
    `update_user`,
    `_d`
) values (
    '0d2f402ace404de39b781733a636ec5f',
    '201808231401015006515587',
    'MANAOWAN',
    17766,
    2900.0000,
    'PROCESSING',
    1535035316000,
    1535035494000,
    'system',
    'system',
    '1'
) 
on duplicate key update 
`nid`=values(`nid`),
`code`=values(`code`),
`open_account_date`=values(`open_account_date`),
`amount`=values(`amount`),
`status`=values(`status`),
`create_time`=values(`create_time`),
`update_time`=values(`update_time`),
`create_user`=values(`create_user`),
`update_user`=values(`update_user`),
`_d`=values(`_d`)
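The statement above follows MySQL's `INSERT ... ON DUPLICATE KEY UPDATE` pattern. Every column is inserted, and when a row with the same `id` primary key already exists, every non-key column is overwritten from `VALUES(...)`.

The sketch below assembles such a parameterized statement from three inputs: the table name, the `pk.fields` key columns and the remaining value columns. `MySqlUpsertSketch` and its methods are hypothetical names. The sketch shows the statement's shape and is not the connector's own implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: renders a MySQL upsert with ? placeholders for a JDBC PreparedStatement.
class MySqlUpsertSketch {

    // Wraps an identifier in MySQL backticks (embedded backticks are not escaped in this sketch).
    static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    // Key columns are inserted but never updated; every non-key column is
    // overwritten from VALUES(col) when the key already exists.
    static String upsert(String table, List<String> keyColumns, List<String> valueColumns) {
        if (valueColumns.isEmpty()) {
            // Key-only tables are out of scope for this sketch.
            throw new IllegalArgumentException("need at least one non-key column");
        }
        List<String> all = new ArrayList<>(keyColumns);
        all.addAll(valueColumns);
        String columns = all.stream()
                .map(MySqlUpsertSketch::quote)
                .collect(Collectors.joining(","));
        String placeholders = String.join(",", Collections.nCopies(all.size(), "?"));
        String updates = valueColumns.stream()
                .map(c -> quote(c) + "=values(" + quote(c) + ")")
                .collect(Collectors.joining(","));
        return "insert into " + quote(table) + " (" + columns + ") values (" + placeholders + ")"
                + " on duplicate key update " + updates;
    }

    public static void main(String[] args) {
        // Column names taken from the statement in the issue; pk.fields=id.
        List<String> keys = Arrays.asList("id");
        List<String> values = Arrays.asList("nid", "code", "open_account_date", "amount", "status",
                "create_time", "update_time", "create_user", "update_user", "_d");
        System.out.println(upsert("target_table", keys, values));
    }
}
```

Running `main` prints the parameterized form of the statement above, with 11 `?` placeholders in place of the literal values. The key column `id` appears only in the insert list, never in the update clause.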
katty0924 commented 3 years ago

Hi, I recently ran into the same problem. I'm also using upsert mode, and the connector becomes very slow once the data volume is large. How did you solve it? Thank you.