confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
23 stars 958 forks source link

Time type synchronization problem #745

Open qding-developer opened 5 years ago

qding-developer commented 5 years ago

Background: kafka connect 5.3.1 + mysql 5.6.36 + debezium 0.10 + kafka_2.12

Issue: After the value (1000-11-18 02:06:45) was synchronized into kafka as the value (-30582511138000), it was turned into 1000-11-12 02:01:02 in sink stage;

As the same, 1000-01-01 was turned into -354285 in kafka but 0999-12-27 after processed by jdbc sink stage;

And new Object such as java.sql.timestamp(1000-11-18 02:06:45), the millisecond was also turned into 1000-11-12 02:01:02. Could this be the cause?

Another value 1000-11-18 02:06:45 with type as timestamp was normal but error was occurred as follows:

org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: java.sql.BatchUpdateException: Data truncation: Incorrect datetime value: '2019-11-17T18:07:09Z' for column 'timestampstr' at row 1 com.mysql.jdbc.MysqlDataTruncation: Data truncation: Incorrect datetime value: '2019-11-17T18:07:09Z' for column 'timestampstr' at row 1

    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:538)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Caused by: java.sql.SQLException: java.sql.BatchUpdateException: Data truncation: Incorrect datetime value: '2019-11-17T18:07:09Z' for column 'timestampstr' at row 1

Source params: { "name": "mysql-source-binlog-snapshot_year-jobId", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "snapshot.locking.mode": "none", "database.user": "", "database.server.id": "60761620", "database.history.kafka.topic": "dbhistory.mysql-source-binlog-snapshot_year-jobId", "database.history.kafka.bootstrap.servers": "10.37.253.81:9092,10.37.253.31:9092,10.37.253.80:9092", "database.server.name": "mysql-source-binlog-snapshot_year-jobId", "include.schema.changes": "false", "database.port": "3336", "enable.time.adjuster": "false", "table.whitelist": "connect_source_test.source_all_type_datetime_4", "decimal.handling.mode": "double", "database.serverTimezone": "Asia/Shanghai", "database.hostname": "", "database.password": "", "name": "mysql-source-binlog-snapshot_year-jobId", "database.whitelist": "connect_source_test", "time.precision.mode":"connect" } } Sink params { "name": "mysql-sink-binlog-snapshot_year-taskId", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "table.name.format": "sink_all_type_datetime_4", "transforms.unwrap.delete.handling.mode": "rewrite", "topics": "mysql-source-binlog-snapshot_year-jobId.connect_source_test.source_all_type_datetime_4", "db.timezone": "Asia/Shanghai", "name": "mysql-sink-binlog-snapshot_year-taskId", "transforms": "unwrap", "transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope", "connection.url": "jdbc:MYSQL://10.37.253.19:3338/devds?user=qd&password=N01rf982", "insert.mode": "upsert", "pk.mode": "record_value", "pk.fields": "int" } }

Peng-666 commented 3 years ago

Hi,has this problem been solved? I have the same problem recently