alibaba / Alink

Alink is the Machine Learning algorithm platform based on Flink, developed by the PAI team of Alibaba computing platform.
Apache License 2.0
3.57k stars 799 forks source link

throw java.lang.ClassCastException: java.util.Date cannot be cast to java.sql.Timestamp when use odps catalog #182

Open wysstartgo opened 3 years ago

wysstartgo commented 3 years ago

1. 问题原因分析:

在odps中有一张表中有两列的数据类型为:

 expired_time              DATETIME,
    create_time               DATETIME

上述类型在odps的com.aliyun.odps.commons.proto.ProtobufRecordStreamReader#readField方法中会将datatime类型的字段处理成java.util.Date类型,而在Alink的com.alibaba.alink.common.io.catalog.odps.OdpsTableUtil类中有一段静态代码块,具体如下:

static {
        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BOOLEAN, Types.BOOLEAN());
        FLINK_TYPE_TO_ODPS_MAP.put(Types.BOOLEAN(), OdpsType.BOOLEAN);

        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.BIGINT, Types.LONG());
        FLINK_TYPE_TO_ODPS_MAP.put(Types.LONG(), OdpsType.BIGINT);

        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DOUBLE, Types.DOUBLE());
        FLINK_TYPE_TO_ODPS_MAP.put(Types.DOUBLE(), OdpsType.DOUBLE);

        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.STRING, Types.STRING());
        FLINK_TYPE_TO_ODPS_MAP.put(Types.STRING(), OdpsType.STRING);

        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());
        FLINK_TYPE_TO_ODPS_MAP.put(Types.SQL_TIMESTAMP(), OdpsType.DATETIME);

        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.TINYINT, Types.BYTE());
        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.SMALLINT, Types.SHORT());
        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.INT, Types.INT());
        ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.FLOAT, Types.FLOAT());
    }

在这里会将odpsType为OdpsType.DATETIME类型的转换成flink的Types.SQL_TIMESTAMP()类型。而由于上面datetime类型的已经被转换成java.util.Date,于是便出现了上面的类强转异常。

2. 问题尝试解决方案一:

将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, BasicTypeInfo.DATE_TYPE_INFO); 运行时发现,报了新的错误:Type is not supported:Date。 报错源码位于org.apache.flink.table.calcite.FlinkTypeFactory#typeInfoToSqlTypeName方法,在里面是找不到java.util.Date的,它支持的time类型主要有java.sql.Date和Timestamp等。 于是这种解决方案宣告失败。

3. 解决方案二:

将ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_TIMESTAMP());修改为ODPS_TYPE_TO_FLINK_MAP.put(OdpsType.DATETIME, Types.SQL_DATE());同时用一个新的ProtobufRecordStreamReader类覆盖原有的代码,并将java.util.Date改为java.sql.Date。

请问还有更好的解决办法吗?

wysstartgo commented 3 years ago

注:用的是当前的master分支的代码,发现使用的是flink old planner,在blink planner中是没有org.apache.flink.table.planner.calcite.FlinkTypeFactory中typeInfoToSqlTypeName方法的。