tidb-incubator / TiBigData

TiDB connectors for Flink/Hive/Presto
Apache License 2.0
216 stars 56 forks source link

Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] #23

Open lookhua opened 4 years ago

lookhua commented 4 years ago

public class TidbAlarmClearUp {

public static void main(String[] args) throws Exception {

    //运行job
    String jobName = TidbAlarmClearUp.class.getSimpleName();

    // set up the streaming execution environment
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

    bsTableEnv.executeSql("CREATE TABLE flinkTest(\n"
            + " id     bigint,\n"
            + " ps_id  int,\n"
            + " ps_key string,\n"
            + " happen_time timestamp\n"
            + ") WITH (\n"
            + "  'connector' = 'tidb',\n"
            + "  'tidb.database.url' = 'jdbc:tidb://tc-fat-tidb.tidb-fat:4000/health',\n"
            + "  'tidb.driver' = 'com.mysql.jdbc.Driver',\n"
            + "  'tidb.database.name' = 'xxxx',\n"
            + "  'tidb.table.name' = 'flink_test',\n"
            + "  'tidb.maximum.pool.size' = '10',\n"
            + "  'tidb.minimum.idle.size' = '10',"
            + "  'tidb.username' = 'xxxx',\n"
            + "  'tidb.password' = 'xxxxx'\n"
            + ")"
    );

    // execute Table
    TableResult tableResult1 = bsTableEnv.executeSql(
            "INSERT INTO flinkTest(id,ps_id,ps_key,happen_time) " +
                     "SELECT id,ps_id,ps_key,happen_time FROM HealthOnlineAlarm ");
}

}

2020-11-20 09:02:47,044 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: TableSourceScan(table=[[default_catalog, default_database, HealthOnlineAlarm]], fields=[id, ps_id, ps_key, happen_time]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (e28717441b6b8eb1ba8013dc2e6a5527) switched from RUNNING to FAILED. java.lang.IllegalArgumentException: Timestamp format must be yyyy-mm-dd hh:mm:ss[.fffffffff] at java.sql.Timestamp.valueOf(Timestamp.java:204) ~[?:1.8.0_265] at com.zhihu.tibigdata.flink.tidb.TypeUtils.getObjectWithDataType(TypeUtils.java:146) ~[flink-tidb-connector-0.0.1.jar:?] at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:167) ~[flink-tidb-connector-0.0.1.jar:?] at com.zhihu.tibigdata.flink.tidb.TiDBRowDataInputFormat.nextRecord(TiDBRowDataInputFormat.java:53) ~[flink-tidb-connector-0.0.1.jar:?] at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:91) ~[flink-dist_2.12-1.11.2.jar:1.11.2] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.12-1.11.2.jar:1.11.2] at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.12-1.11.2.jar:1.11.2] at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213) ~[flink-dist_2.12-1.11.2.jar:1.11.2]

humengyu2012 commented 4 years ago

It seems that the datatype of column happen_time is varchar in tidb, but you want convert it to timestamp. We will add a config which can be used to specify timestamp format, or you could try (happen_time string ).

humengyu2012 commented 4 years ago

Thanks for your issue, now you can specify timestamp format by config timestamp-format.${columnName}, like timestamp-format.happen_time='yyyy-MM-dd HH:mm:ss.SSS'.

lookhua commented 4 years ago

I build this project with flink verion 1.11.2 not 1.11.1 and the datatype of column happen_time is timestamp in tidb

humengyu2012 commented 4 years ago

can you provide your tidb create table sql and some sample data? I want to test your case in flink 1.11.2