DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug][chunjun-connection] DateTime type conversion fails in different modes #1217

Open kinoxyz1 opened 2 years ago

kinoxyz1 commented 2 years ago

Search before asking

Description

When I run the task in Flink yarn (session) mode it throws an exception, but when I run the same task in yarn per-job mode it works fine.

case

create table statement

-- MySQL CREATE TABLE(source)
CREATE TABLE `source` (
  `business_date` date NOT NULL,
  `tenant_id` bigint(20) NOT NULL,
  `etl_time` datetime DEFAULT NULL
);

-- MySQL CREATE TABLE(sink)
CREATE TABLE `sink` (
  `business_date` date DEFAULT NULL ,
  `tenant_id` bigint(20) DEFAULT NULL,
  `etl_time` varchar(255) DEFAULT NULL
);

-- data
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-01', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-01', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-01', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-02', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-02', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-02', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-03', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-03', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-03', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-04', 177, '2022-05-24 17:47:36');
INSERT INTO `source` (`business_date`, `tenant_id`, `etl_time`) VALUES ('2020-01-04', 177, '2022-05-24 17:47:36');

chunjun json

{
  "job" : {
    "content" : [ {
      "reader" : {
        "parameter" : {
          "password" : "000000",
          "startLocation" : "",
          "increColumn" : "",
          "column" : [ {
            "format" : "",
            "name" : "business_date",
            "type" : "DATE",
            "key" : "business_date"
          }, {
            "format" : "",
            "name" : "tenant_id",
            "type" : "BIGINT",
            "key" : "tenant_id"
          }, {
            "format" : "",
            "name" : "etl_time",
            "type" : "DATETIME",
            "key" : "etl_time"
          } ],
          "connection" : [ {
            "schema" : "test",
            "jdbcUrl" : [ "jdbc:mysql://localhost:3308/test?characterEncoding=UTF-8&useSSL=false&allowMultiQueries=true" ],
            "type" : 1,
            "table" : [ "source" ]
          } ],
          "username" : "root"
        },
        "name" : "mysqlreader"
      },
      "writer" : {
        "parameter" : {
          "postSql" : [ ],
          "mode" : "insert",
          "password" : "000000",
          "column" : [ {
            "name" : "business_date",
            "format" : "",
            "isPart" : false,
            "type" : "DATE",
            "key" : "business_date"
          }, {
            "name" : "tenant_id",
            "format" : "",
            "isPart" : false,
            "type" : "BIGINT",
            "key" : "tenant_id"
          }, {
            "name" : "etl_time",
            "format" : "",
            "isPart" : false,
            "type" : "VARCHAR",
            "key" : "etl_time"
          } ],
          "connection" : [ {
            "jdbcUrl" : "jdbc:mysql://localhost:3308/test1?characterEncoding=UTF-8&useSSL=false&allowMultiQuerie",
            "table" : [ "sink" ]
          } ],
          "writeMode" : "insert",
          "preSql" : [ ],
          "username" : "root"
        },
        "name" : "mysqlwriter"
      }
    } ],
    "setting" : {
      "restore" : {
        "isRestore" : false,
        "isStream" : false
      },
      "speed" : {
        "readerChannel" : 1,
        "writerChannel" : 1,
        "bytes" : -20971520,
        "channel" : 1
      }
    }
  }
}

exception information

The exception information is as follows:

2022-09-05 16:13:48.175 [Legacy Source Thread - Source: mysqlsourcefactory -> Sink: mysqlsinkfactory (1/1)#0] ERROR com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter  - value [2022-05-24T17:47:36] convent failed 
2022-09-05 16:13:48.225 [dirty-consumer-pool-2-thread-2] WARN  com.dtstack.chunjun.dirty.log.LogDirtyDataCollector  - 
====================Dirty Data=====================
DirtyDataEntry[jobId='ddc99f057f9d827ee95b3a8ca1ad861e', jobName='Flink_Job', operatorName='Source: mysqlsourcefactory', dirtyContent='{"extHeader":[],"byteSize":1,"arity":0,"rowKind":"INSERT","headerInfo":null,"string":"()","headers":null}', errorMessage='com.dtstack.chunjun.throwable.ReadRecordException: 
java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
    at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.nextRecordInternal(JdbcInputFormat.java:309)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:197)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.nextRecord(BaseRichInputFormat.java:67)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:133)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:267)
Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
    at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.lambda$createInternalConverter$e77f182$2(JdbcColumnConverter.java:176)
    at com.dtstack.chunjun.converter.AbstractRowConverter.lambda$wrapIntoNullableInternalConverter$66e1293c$1(AbstractRowConverter.java:97)
    at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.toInternal(JdbcColumnConverter.java:114)
    at com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.toInternal(JdbcColumnConverter.java:56)
    at com.dtstack.chunjun.connector.jdbc.source.JdbcInputFormat.nextRecordInternal(JdbcInputFormat.java:287)
    ... 6 more
', fieldName='null', createTime=2022-09-05 16:13:48.179]

===================================================
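For reference, the cast failure in the stack trace can be reproduced outside chunjun with the minimal sketch below. It assumes (not confirmed in this issue) that newer MySQL Connector/J versions return java.time.LocalDateTime from ResultSet.getObject() for DATETIME columns while older versions return java.sql.Timestamp; the toTimestamp helper is purely illustrative, not part of chunjun:

import java.sql.Timestamp;
import java.time.LocalDateTime;

public class DatetimeCastDemo {

    // Illustrative helper: accept whatever the JDBC driver hands back for a
    // DATETIME column and normalize it to java.sql.Timestamp.
    static Timestamp toTimestamp(Object val) {
        if (val instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) val);
        }
        return (Timestamp) val;
    }

    public static void main(String[] args) {
        Object newDriverValue = LocalDateTime.parse("2022-05-24T17:47:36"); // e.g. a newer Connector/J
        Object oldDriverValue = Timestamp.valueOf("2022-05-24 17:47:36");   // e.g. an older Connector/J

        System.out.println(toTimestamp(newDriverValue)); // 2022-05-24 17:47:36.0
        System.out.println(toTimestamp(oldDriverValue)); // 2022-05-24 17:47:36.0

        // A blind cast, like the one in the lambda built by
        // JdbcColumnConverter.createInternalConverter, fails on the LocalDateTime value:
        try {
            System.out.println((Timestamp) newDriverValue);
        } catch (ClassCastException e) {
            System.out.println(e); // ... LocalDateTime cannot be cast to ... Timestamp
        }
    }
}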

Code of Conduct

kinoxyz1 commented 2 years ago

Currently, after changing line 177 of com.dtstack.chunjun.connector.jdbc.converter.JdbcColumnConverter.java on the latest master branch as shown below, session mode runs normally. What is the difference between per-job and session execution?

before: (Timestamp) val, ((TimestampType) (type)).getPrecision());
after:  Timestamp.valueOf((LocalDateTime) val), ((TimestampType) (type)).getPrecision());
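For context, the patched converter lambda presumably reads roughly as in the sketch below; TimestampColumn and the lambda shape are inferred from the one-line diff above, not copied from the chunjun source:

// Inferred sketch of the patched branch in JdbcColumnConverter.createInternalConverter();
// the surrounding switch and exact constructor may differ in the actual source.
return val ->
        new TimestampColumn(
                Timestamp.valueOf((LocalDateTime) val),
                ((TimestampType) type).getPrecision());

Note that this version assumes the driver always delivers a java.time.LocalDateTime; if an older driver returns java.sql.Timestamp it would fail the other way, so a guard such as the toTimestamp sketch above would cover both driver behaviours.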