delta-io / delta

An open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive, as well as APIs
https://delta.io
Apache License 2.0

[BUG][Flink] Caused by: java.lang.RuntimeException: Type not supported DATE #2331

Open quangnguyen-icariohealth opened 7 months ago

quangnguyen-icariohealth commented 7 months ago

Bug

Describe the problem

I know this is not strictly a bug, but I would like to raise the issue here. When I use the Flink DeltaSink to write a DataStream<RowData> with the following RowType, partitioned by the effective_date column of type DATE, an exception is thrown.

public static final RowType ROW_TYPE = new RowType(Arrays.asList(
        new RowType.RowField("word", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("frequency", new IntType()),
        new RowType.RowField("effective_date", new DateType())
));
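For context, a minimal job wiring that reproduces this might look like the following sketch (the table path and the sample row are placeholders I made up; 18993 is 2022-01-01 in Flink's internal days-since-epoch DATE representation):

import io.delta.flink.sink.DeltaSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hadoop.conf.Configuration;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// A single sample row matching ROW_TYPE: (word, frequency, effective_date)
DataStream<RowData> source = env.fromElements(
        (RowData) GenericRowData.of(StringData.fromString("hello"), 1, 18993));
DeltaSink<RowData> sink = DeltaSink
        .forRowData(
                new Path("/tmp/delta-table"), // placeholder table path
                new Configuration(),          // Hadoop configuration
                ROW_TYPE)
        .withPartitionColumns("effective_date") // the DATE partition column
        .build();
source.sinkTo(sink);
env.execute();

Running the job then fails with: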
Caused by: java.lang.RuntimeException: Type not supported DATE
    at io.delta.flink.sink.internal.DeltaPartitionComputer$DeltaRowDataPartitionComputer.generatePartitionValues(DeltaPartitionComputer.java:103)
    at io.delta.flink.sink.internal.DeltaPartitionComputer$DeltaRowDataPartitionComputer.generatePartitionValues(DeltaPartitionComputer.java:43)
    at io.delta.flink.sink.internal.DeltaBucketAssigner.getBucketId(DeltaBucketAssigner.java:100)
    at io.delta.flink.sink.internal.DeltaBucketAssigner.getBucketId(DeltaBucketAssigner.java:87)
    at io.delta.flink.sink.internal.writer.DeltaWriter.write(DeltaWriter.java:288)

I checked the source at https://github.com/delta-io/connectors/blob/master/flink/src/main/java/io/delta/flink/sink/internal/DeltaPartitionComputer.java and found that, currently, the Flink Delta connector only supports five logical types as partition keys:

if (staticPartitionSpec.containsKey(partitionKey)) {
    // We want the output partition values to be String's anyways, so no need
    // to parse/cast the staticPartitionSpec value
    partitionValues.put(partitionKey, staticPartitionSpec.get(partitionKey));
} else if (keyType.getTypeRoot() == LogicalTypeRoot.VARCHAR) {
    partitionValues.put(partitionKey, element.getString(keyIndex).toString());
} else if (keyType.getTypeRoot() == LogicalTypeRoot.INTEGER) {
    partitionValues.put(partitionKey, String.valueOf(element.getInt(keyIndex)));
} else if (keyType.getTypeRoot() == LogicalTypeRoot.BIGINT) {
    partitionValues.put(partitionKey, String.valueOf(element.getLong(keyIndex)));
} else if (keyType.getTypeRoot() == LogicalTypeRoot.SMALLINT) {
    partitionValues.put(partitionKey, String.valueOf(element.getShort(keyIndex)));
} else if (keyType.getTypeRoot() == LogicalTypeRoot.TINYINT) {
    partitionValues.put(partitionKey, String.valueOf(element.getByte(keyIndex)));
} else {
    throw new RuntimeException("Type not supported " + keyType.getTypeRoot());
}

What I expected

Why not utilize the converters in the org.apache.flink.table.data.conversion.* package to support more logical types? For example, I could use DateDateConverter and modify the code so that the internal INT representation of a DATE is converted to its yyyy-MM-dd string form.

The code below is just for demonstration:

} else if (keyType.getTypeRoot() == LogicalTypeRoot.TINYINT) {
    partitionValues.put(partitionKey, String.valueOf(element.getByte(keyIndex)));
} else if (keyType.getTypeRoot() == LogicalTypeRoot.DATE) {
    // DateDateConverter maps Flink's internal int (days since epoch)
    // to a java.sql.Date, whose toString() is yyyy-MM-dd.
    DateDateConverter converter = new DateDateConverter();
    String value = String.valueOf(converter.toExternal(element.getInt(keyIndex)));
    partitionValues.put(partitionKey, value); // value is now a yyyy-MM-dd string
} else {
    throw new RuntimeException("Type not supported " + keyType.getTypeRoot());
}
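The conversion itself is easy to verify. Here is a standalone sketch (19000 is just an arbitrary example value) showing that Flink's internal days-since-epoch int maps to the yyyy-MM-dd form, which is also how Delta serializes DATE partition values in the log:

import java.time.LocalDate;

public class DatePartitionDemo {
    public static void main(String[] args) {
        // Flink represents DATE internally as an int counting days since 1970-01-01.
        int epochDays = 19000; // arbitrary example value
        // LocalDate.toString() is ISO-8601, i.e. yyyy-MM-dd.
        System.out.println(LocalDate.ofEpochDay(epochDays)); // prints 2022-01-08
    }
}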

Environment information

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

pabloFloresHdz commented 3 weeks ago

I like this approach. I've been using partitionValues.put(partitionKey, LocalDate.ofEpochDay(element.getInt(keyIndex)).toString()); for a couple of months without issues, but it seems better to me to use DateDateConverter.
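For reference, that workaround slotted into the connector's if/else chain would look roughly like this (a sketch, assuming java.time.LocalDate is imported in DeltaPartitionComputer):

} else if (keyType.getTypeRoot() == LogicalTypeRoot.DATE) {
    // Flink stores DATE as an int of days since 1970-01-01;
    // LocalDate.ofEpochDay(...).toString() yields the yyyy-MM-dd form.
    partitionValues.put(partitionKey,
            LocalDate.ofEpochDay(element.getInt(keyIndex)).toString());
} else {
    throw new RuntimeException("Type not supported " + keyType.getTypeRoot());
}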