wgzhao / Addax

Addax is a versatile open-source ETL tool that can seamlessly transfer data between various RDBMS and NoSQL databases, making it an ideal solution for data migration.
https://wgzhao.github.io/Addax/
Apache License 2.0

[Bug]: Incorrect value conversion for the Parquet Decimal type #1123

Closed: svea-vip closed this issue 2 months ago

svea-vip commented 2 months ago

What happened?

Reader: MySQL, Writer: HDFS Parquet

The job runs normally, but the converted values are wrong.

MySQL data (columns v, d1, d2, d3, d4, d5, d):
12  2024-04-20  2024-04-20 00:00:00 2024-04-20 00:00:00 00:00:00    2024    2323.00000
12  2024-04-20  2024-04-20 21:33:43 2024-04-20 21:33:43 21:33:43    2024    32.43000
43  2024-04-20  2024-04-20 00:00:00 2024-04-20 00:00:00 00:00:00    2024    22.43400
    2024-04-20  2024-04-20 21:33:43 2024-04-20 00:00:00 21:33:43    2024    22.43400
HDFS Parquet output (the trailing dt field is an extra partition column; ignore it). Note that every value in the d column is shifted one decimal place, e.g. 2323.00000 became 232.300000.
+-------+-------------+------------------------+------------------------+-----------+-------+-------------+-------------+
| v     | d1          | d2                     | d3                     | d4        | d5    | d           | dt          |
+-------+-------------+------------------------+------------------------+-----------+-------+-------------+-------------+
| 12    | 2024-04-20  | 2024-04-20 00:00:00.0  | 2024-04-20 00:00:00.0  | 00:00:00  | 2024  | 232.300000  | 2024-09-11  |
| 12    | 2024-04-20  | 2024-04-20 21:33:43.0  | 2024-04-20 21:33:43.0  | 21:33:43  | 2024  | 3.243000    | 2024-09-11  |
| 43    | 2024-04-20  | 2024-04-20 00:00:00.0  | 2024-04-20 00:00:00.0  | 00:00:00  | 2024  | 2.243400    | 2024-09-11  |
| NULL  | 2024-04-20  | 2024-04-20 21:33:43.0  | 2024-04-20 00:00:00.0  | 21:33:43  | 2024  | 2.243400    | 2024-09-11  |
+-------+-------------+------------------------+------------------------+-----------+-------+-------------+-------------+

MySQL table DDL

CREATE TABLE `lgw` (
  `v` varchar(100) DEFAULT NULL,
  `d1` date NOT NULL,
  `d2` datetime NOT NULL,
  `d3` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  `d4` time NOT NULL,
  `d5` year(4) NOT NULL,
  `d` decimal(38,5) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8

Writer column configuration

"column":[
{
"name":"v"
"type":"string"
}
{
"name":"d1"
"type":"string"
}
{
"name":"d2"
"type":"string"
}
{
"name":"d3"
"type":"string"
}
{
"name":"d4"
"type":"string"
}
{
"name":"d5"
"type":"int"
}
{
"name":"d"
"type":"decimal(10,6)"
}
]

Version

4.1.7 (Default)

OS Type

Linux (Default)

Java JDK Version

Oracle JDK 1.8.0

Relevant log output

No response

wgzhao commented 2 months ago
{
  "name": "d",
  "type": "decimal(10,6)"
}

should be changed to

{
  "name": "d",
  "type": "decimal(38,5)"
}

to match the source column's decimal(38,5).
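
Parquet stores a decimal column as its unscaled integer only; the scale lives in the schema, so declaring decimal(10,6) for values produced at scale 5 shifts every value one decimal place on read. A minimal demonstration of the observed corruption (class name hypothetical):

import java.math.BigDecimal;
import java.math.BigInteger;

public class ScaleMismatchDemo {
    public static void main(String[] args) {
        // the unscaled value of 2323.00000 (scale 5) is 232300000
        BigInteger unscaled = new BigDecimal("2323.00000").unscaledValue();
        // a reader reapplies the scale declared in the Parquet schema
        System.out.println(new BigDecimal(unscaled, 5)); // 2323.00000 with decimal(38,5)
        System.out.println(new BigDecimal(unscaled, 6)); // 232.300000 with decimal(10,6), the reported bug
    }
}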
svea-vip commented 2 months ago

I modified the handling of DECIMAL fields in the buildRecord and generateParquetSchema methods.

buildRecord

public Group buildRecord(
            Record record, List<Configuration> columns,
            TaskPluginCollector taskPluginCollector, SimpleGroupFactory simpleGroupFactory) {
        Column column;
        Group group = simpleGroupFactory.newGroup();
        for (int i = 0; i < record.getColumnNumber(); i++) {
            column = record.getColumn(i);
            String colName = columns.get(i).getString(Key.NAME);
            String typename = columns.get(i).getString(Key.TYPE).toUpperCase();
            if (null == column || column.getRawData() == null) {
                continue;
            }
            SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);
            switch (columnType) {
                case INT:
                case INTEGER:
                    group.append(colName, Integer.parseInt(column.getRawData().toString()));
                    break;
                case LONG:
                    group.append(colName, column.asLong());
                    break;
                case FLOAT:
                    group.append(colName, column.asDouble().floatValue());
                    break;
                case DOUBLE:
                    group.append(colName, column.asDouble());
                    break;
                case STRING:
                    group.append(colName, column.asString());
                    break;
                case BOOLEAN:
                    group.append(colName, column.asBoolean());
                    break;
                case DECIMAL:
                    group.append(colName, decimalToBinary(column.asString()));
                    break;
                case TIMESTAMP:
                    SimpleDateFormat sdf = new SimpleDateFormat(Constant.DEFAULT_DATE_FORMAT);
                    try {
                        group.append(colName, tsToBinary(sdf.format(column.asDate())));
                    } catch (ParseException e) {
                        // dirty data
                        taskPluginCollector.collectDirtyRecord(record, e);
                    }
                    break;
                case DATE:
                    group.append(colName, (int) Math.round(column.asLong() * 1.0 / 86400000));
                    break;
                default:
                    logger.warn("convert type[{}] into string", column.getType());
                    group.append(colName, column.asString());
                    break;
            }

        }
        return group;
    }
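
The decimalToBinary helper is not shown in this excerpt. For a FIXED_LEN_BYTE_ARRAY(16) decimal, the Parquet format expects the unscaled value as big-endian two's-complement, sign-extended to the declared length; a hypothetical sketch of such a helper (the real implementation may differ):

import java.math.BigDecimal;
import java.util.Arrays;
import org.apache.parquet.io.api.Binary;

public class FixedDecimalSketch {
    // hypothetical stand-in for decimalToBinary: pack the unscaled
    // two's-complement bytes into the declared 16-byte fixed array
    static Binary decimalToFixedBinary(String s) {
        byte[] unscaled = new BigDecimal(s).unscaledValue().toByteArray();
        if (unscaled.length > 16) {
            throw new IllegalArgumentException("value does not fit in 16 bytes");
        }
        byte[] out = new byte[16];
        // sign-extend: negative values pad with 0xFF, non-negative with 0x00
        Arrays.fill(out, (byte) (unscaled[0] < 0 ? 0xFF : 0x00));
        System.arraycopy(unscaled, 0, out, 16 - unscaled.length, unscaled.length);
        return Binary.fromConstantByteArray(out);
    }
}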

I changed buildRecord to:

public Group buildRecord(
            Record record, List<Configuration> columns,
            TaskPluginCollector taskPluginCollector, SimpleGroupFactory simpleGroupFactory) {
        Column column;
        Group group = simpleGroupFactory.newGroup();
        for (int i = 0; i < record.getColumnNumber(); i++) {
            column = record.getColumn(i);
            Configuration configuration = columns.get(i);
            String colName = configuration.getString(Key.NAME);
            String typename = configuration.getString(Key.TYPE).toUpperCase();
            if (null == column || column.getRawData() == null) {
                continue;
            }
            SupportHiveDataType columnType = SupportHiveDataType.valueOf(typename);
            switch (columnType) {
                case INT:
                case INTEGER:
                    group.append(colName, Integer.parseInt(column.getRawData().toString()));
                    break;
                case LONG:
                    group.append(colName, column.asLong());
                    break;
                case FLOAT:
                    group.append(colName, column.asDouble().floatValue());
                    break;
                case DOUBLE:
                    group.append(colName, column.asDouble());
                    break;
                case STRING:
                    group.append(colName, column.asString());
                    break;
                case BOOLEAN:
                    group.append(colName, column.asBoolean());
                    break;
                case DECIMAL:
                    // validate precision/scale first, then write the unscaled bytes as BINARY
                    BigDecimal decimal = new BigDecimal(column.asString());
                    BigDecimal value = decimalValueValidate(configuration, decimal);
                    group.append(colName, Binary.fromReusedByteArray(
                            value.setScale(configuration.getInt(Key.SCALE), RoundingMode.UP)
                                    .unscaledValue().toByteArray()));
                    break;
                case TIMESTAMP:
                    SimpleDateFormat sdf = new SimpleDateFormat(Constant.DEFAULT_DATE_FORMAT);
                    try {
                        group.append(colName, tsToBinary(sdf.format(column.asDate())));
                    } catch (ParseException e) {
                        // dirty data
                        taskPluginCollector.collectDirtyRecord(record, e);
                    }
                    break;
                case DATE:
                    group.append(colName, (int) Math.round(column.asLong() * 1.0 / 86400000));
                    break;
                default:
                    logger.warn("convert type[{}] into string", column.getType());
                    group.append(colName, column.asString());
                    break;
            }
        }
        return group;
    }
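
One detail worth noting: Binary.fromReusedByteArray signals that the backing array may be mutated later, so Parquet copies it whenever it needs to retain a reference. Since unscaledValue().toByteArray() allocates a fresh array per value, Binary.fromConstantByteArray would also be safe here and lets Parquet skip those defensive copies. A quick round-trip check of the BINARY encoding, assuming a declared scale of 5 (class name hypothetical):

import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;

public class RoundTripCheck {
    public static void main(String[] args) {
        int declaredScale = 5;
        byte[] bytes = new BigDecimal("2323.00000")
                .setScale(declaredScale, RoundingMode.UP)
                .unscaledValue().toByteArray();
        // a reader reconstructs the value from the bytes plus the schema scale
        System.out.println(new BigDecimal(new BigInteger(bytes), declaredScale)); // 2323.00000
    }
}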

In generateParquetSchema, the original branch:


case "DECIMAL":
                    // use fixed 16 bytes array
                    int prec = column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION);
                    int scale = column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE);
                    t = Types.primitive(PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
                            .length(16)
                            .as(decimalType(scale, prec))
                            .named(fieldName);
                    break;

was changed to:

case "DECIMAL":
                    int prec = column.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION);
                    int scale = column.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE);
                    t = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                            .as(decimalType(scale, prec))
                            .named(fieldName);
                    break;
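
With BINARY the field no longer hard-codes a 16-byte width. A quick way to verify what this branch produces, assuming parquet-column is on the classpath and the (38,5) declaration suggested above (class name hypothetical):

import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.LogicalTypeAnnotation.decimalType;

public class SchemaCheck {
    public static void main(String[] args) {
        Type t = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, Type.Repetition.OPTIONAL)
                .as(decimalType(5, 38)) // scale first, then precision
                .named("d");
        System.out.println(t); // optional binary d (DECIMAL(38,5))
    }
}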

Then I added a decimal value validation:

/**
     * Validate the decimal value against the configured precision and scale.
     *
     * @param conf the column configuration carrying precision and scale
     * @param value the decimal value to validate
     * @return {@link BigDecimal} the value rescaled to the configured scale
     * @throws RuntimeException when the value cannot be represented without rounding or overflow
     */
    private static BigDecimal decimalValueValidate(final Configuration conf, BigDecimal value) {
        int scale = conf.getInt(Key.SCALE, Constant.DEFAULT_DECIMAL_MAX_SCALE);
        int valueScale = value.scale();
        boolean scaleAdjusted = false;
        if (valueScale != scale) {
            try {
                value = value.setScale(scale, RoundingMode.UNNECESSARY);
                scaleAdjusted = true;
            } catch (ArithmeticException ae) {
                throw new RuntimeException("Cannot encode decimal with scale " + valueScale + " as scale " + scale + " without rounding");
            }
        }

        int precision = conf.getInt(Key.PRECISION, Constant.DEFAULT_DECIMAL_MAX_PRECISION);
        int valuePrecision = value.precision();
        if (valuePrecision > precision) {
            if (scaleAdjusted) {
                throw new RuntimeException("Cannot encode decimal with precision " + valuePrecision + " as max precision " + precision + ". This is after safely adjusting scale from " + valueScale + " to required " + scale);
            } else {
                throw new RuntimeException("Cannot encode decimal with precision " + valuePrecision + " as max precision " + precision);
            }
        } else {
            return value;
        }
    }
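
To illustrate what the validator rejects, here is a standalone sketch with the writer config's (10,6) inlined in place of the Configuration lookups (class and method names hypothetical):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class ValidateDemo {
    public static void main(String[] args) {
        System.out.println(check(new BigDecimal("2323.00000")));   // 2323.000000, fits (10,6) exactly
        System.out.println(check(new BigDecimal("12345.678901"))); // throws: precision 11 exceeds 10
    }

    static BigDecimal check(BigDecimal v) {
        // throws ArithmeticException if rescaling to 6 would lose digits
        BigDecimal adjusted = v.setScale(6, RoundingMode.UNNECESSARY);
        if (adjusted.precision() > 10) {
            throw new RuntimeException("Cannot encode decimal with precision "
                    + adjusted.precision() + " as max precision 10");
        }
        return adjusted;
    }
}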

In my tests this appears to run correctly. I'd like to know why DECIMAL was previously received as FIXED_LEN_BYTE_ARRAY: is there some known bug with BINARY?

svea-vip commented 2 months ago

It's worth testing more type combinations; it looks like only decimal(38,10) is recognized accurately, and my tests here were ad hoc. If the schema receives the value as BINARY without validating the actual decimal's length, then when the target's declared decimal precision is smaller than the original value's, the write does not report an error but the field comes back NULL on the target side. That's why I added validation modeled on the earlier validation mechanism.
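
The out-of-range condition is easy to check numerically: the largest unscaled value that fits precision 10 is 10^10 - 1. A small sketch (class name hypothetical; surfacing out-of-range decimals as NULL on read is typical of readers such as Hive):

import java.math.BigDecimal;
import java.math.BigInteger;

public class OverflowDemo {
    public static void main(String[] args) {
        BigInteger unscaled = new BigDecimal("12345.678901").unscaledValue(); // 12345678901, 11 digits
        BigInteger max = BigInteger.TEN.pow(10).subtract(BigInteger.ONE);     // 9999999999, precision 10
        System.out.println(unscaled.compareTo(max) > 0); // true: does not fit decimal(10,6)
    }
}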

wgzhao commented 2 months ago

Every storage format handles "abnormal" decimal values a little differently; I implemented the handling based on the practices my industry uses for decimal precision and scale anomalies. Generally speaking:

When we ingest data, we always declare the precision and scale as (38,10).