apache / hudi

Upserts, Deletes And Incremental Processing on Big Data.
https://hudi.apache.org/
Apache License 2.0

'read.utc-timezone=false' has no effect on writes #9424

Open JingFengWang opened 1 year ago

JingFengWang commented 1 year ago

Hudi 0.14.0, hudi-flink-bundle. Writing timestamp data to a COW/MOR table still uses the UTC time zone even when read.utc-timezone=false is set: the timestamp time zone conversion in AvroToRowDataConverters and RowDataToAvroConverters is hardcoded to UTC.

Describe the problem you faced

  1. hudi-flink1.13-bundle-0.14.0-rc1 does not support configuring the time zone used when writing timestamps
  2. The read.utc-timezone option only takes effect when data is read

To Reproduce

Steps to reproduce the behavior:

  1. ./bin/sql-client.sh embedded -j hudi-flink1.13-bundle-0.14.0-rc1.jar shell
  2. With read.utc-timezone=true, writing and then querying timestamp data works as expected
  3. With read.utc-timezone=false, the written timestamp is queried back 8 hours behind (see the DDL sketch after the output below):
    Flink SQL> select LOCALTIMESTAMP as tm, timestamph from hudi_mor_all_datatype_2 where inth=44;
    +----+-------------------------+-------------------------+
    | op |                      tm |              timestamph |
    +----+-------------------------+-------------------------+
    | +I | 2023-08-11 10:36:38.793 | 2023-08-11 03:10:17.267 |
    +----+-------------------------+-------------------------+
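
For context, a minimal sketch of a Flink SQL table definition for steps 2 and 3 could look like the following. The table and column names follow the query above; the path, table type, and remaining options are assumptions, not taken from the thread:

    CREATE TABLE hudi_mor_all_datatype_2 (
      inth INT,
      timestamph TIMESTAMP(3),
      PRIMARY KEY (inth) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///path/to/hudi_mor_all_datatype_2',  -- hypothetical path
      'table.type' = 'MERGE_ON_READ',
      'read.utc-timezone' = 'false'  -- as reported, this only affects reads, not writes
    );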

Expected behavior

hudi-flink1.13-bundle should support writing timestamps in a non-UTC time zone via configuration.

Environment Description

Related code location

public class AvroToRowDataConverters {
// ...
  private static AvroToRowDataConverter createTimestampConverter(int precision) {
    // ...
    return avroObject -> {
      final Instant instant;
      if (avroObject instanceof Long) {
        instant = Instant.EPOCH.plus((Long) avroObject, chronoUnit);
      } else if (avroObject instanceof Instant) {
        instant = (Instant) avroObject;
      } else {
        JodaConverter jodaConverter = JodaConverter.getConverter();
        if (jodaConverter != null) {
          // joda time has only millisecond precision
          instant = Instant.ofEpochMilli(jodaConverter.convertTimestamp(avroObject));
        } else {
          throw new IllegalArgumentException(
              "Unexpected object type for TIMESTAMP logical type. Received: " + avroObject);
        }
      }
      // TODO:Hardcoded to UTC here
      return TimestampData.fromInstant(instant);
    };
  }
// ...
}
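
For illustration only, the following is a minimal sketch (not Hudi code) of what a local-timezone-aware variant of this read-path conversion could look like if read.utc-timezone were propagated to it; the class name and the utcTimezone flag are hypothetical:

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import org.apache.flink.table.data.TimestampData;

public class TimestampReadConversionSketch {
  // Hypothetical helper: convert an Avro instant to Flink TimestampData,
  // honoring a utcTimezone flag instead of always assuming UTC.
  static TimestampData toTimestampData(Instant instant, boolean utcTimezone) {
    if (utcTimezone) {
      // Current behavior: the instant is interpreted as UTC wall-clock time.
      return TimestampData.fromInstant(instant);
    }
    // Assumption: "local time zone" means the JVM default zone of the Flink task.
    return TimestampData.fromLocalDateTime(
        LocalDateTime.ofInstant(instant, ZoneId.systemDefault()));
  }
}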

public class RowDataToAvroConverters {
// ...
  public static RowDataToAvroConverter createConverter(LogicalType type) {
  // ...
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        final int precision = DataTypeUtils.precision(type);
        if (precision <= 3) {
          converter =
              new RowDataToAvroConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(Schema schema, Object object) {
                  // TODO:Hardcoded to UTC here
                  return ((TimestampData) object).toInstant().toEpochMilli();
                }
              };
        } else if (precision <= 6) {
          converter =
              new RowDataToAvroConverter() {
                private static final long serialVersionUID = 1L;

                @Override
                public Object convert(Schema schema, Object object) {
                  // TODO:Hardcoded to UTC here
                  Instant instant = ((TimestampData) object).toInstant();
                  return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000);
                }
              };
        } else {
          throw new UnsupportedOperationException("Unsupported timestamp precision: " + precision);
        }
        break;
  // ...
  }
// ...
}
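
Similarly, a minimal sketch (again not Hudi code, with a hypothetical utcTimezone flag) of what a local-timezone-aware write-path conversion could look like:

import java.time.Instant;
import java.time.ZoneId;
import org.apache.flink.table.data.TimestampData;

public class TimestampWriteConversionSketch {
  // Hypothetical helper: encode Flink TimestampData as Avro epoch millis,
  // honoring a utcTimezone flag instead of always assuming UTC.
  static long toEpochMillis(TimestampData ts, boolean utcTimezone) {
    if (utcTimezone) {
      // Current behavior: the wall-clock value is treated as UTC.
      return ts.toInstant().toEpochMilli();
    }
    // Interpret the wall-clock value in the JVM default zone before encoding.
    Instant instant = ts.toLocalDateTime().atZone(ZoneId.systemDefault()).toInstant();
    return instant.toEpochMilli();
  }
}
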
danny0405 commented 1 year ago

You are right, the write path currently only supports the UTC timezone. cc @SteNicholas, do you think we should support local-timezone writes?

@JingFengWang Did you try declaring the table's timestamp column as TIMESTAMP_LTZ?

SteNicholas commented 1 year ago

@JingFengWang, you could use the TIMESTAMP_LTZ type to solve the above problem. I have tested with TIMESTAMP_LTZ and it worked well. Meanwhile, I think we could support local-timezone writes to help users avoid this problem.
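
For illustration, a hedged sketch of a Flink DDL that declares the timestamp column as TIMESTAMP_LTZ, as suggested above (the path and other options are assumptions, not from the thread):

    CREATE TABLE hudi_mor_all_datatype_2 (
      inth INT,
      timestamph TIMESTAMP_LTZ(3),
      PRIMARY KEY (inth) NOT ENFORCED
    ) WITH (
      'connector' = 'hudi',
      'path' = 'hdfs:///path/to/hudi_mor_all_datatype_2',  -- hypothetical path
      'table.type' = 'MERGE_ON_READ'
    );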

JingFengWang commented 1 year ago

You are right, the write path currently only supports the UTC timezone. cc @SteNicholas, do you think we should support local-timezone writes?

@JingFengWang Did you try declaring the table's timestamp column as TIMESTAMP_LTZ?

Yes.

  1. We use Spark for batch computing, because the batch computing capability of Flink 1.13.2 is unstable
  2. Spark needs to read and write Hive and Hudi tables at the same time, and spark.sql.session.timeZone is configured as the local time zone (see the example below)
  3. Given the existing Spark + Hive setup, 'spark.sql.session.timeZone=UTC' cannot be configured on the platform side
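
For reference, the session time zone mentioned in item 2 is typically set like this in spark-sql; the Asia/Shanghai value is an assumption based on the 8-hour offset shown in the reproduction:

    -- assumed local zone; substitute the actual platform time zone
    SET spark.sql.session.timeZone=Asia/Shanghai;
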
xuzifu666 commented 1 year ago

You are right, the write path currently only supports the UTC timezone. cc @SteNicholas, do you think we should support local-timezone writes?

@JingFengWang Did you try declaring the table's timestamp column as TIMESTAMP_LTZ?

Flink upserts to Hudi with the TIMESTAMP type currently use the UTC timezone. For now you can work around it with a UDF (see the sketch below) or extend Flink as in https://github.com/apache/flink/pull/23220, so that the Hudi timezone stays consistent with Hive. @danny0405
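
A hedged sketch of the UDF workaround mentioned above; the class name, function name, and registration are illustrative, not from the thread:

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import org.apache.flink.table.functions.ScalarFunction;

// Hypothetical scalar function: shifts a timestamp that was written as UTC
// into the local (JVM default) time zone when it is read back.
// Could be registered e.g. via
//   tableEnv.createTemporarySystemFunction("utc_to_local", UtcToLocalTimestamp.class);
public class UtcToLocalTimestamp extends ScalarFunction {
  public LocalDateTime eval(LocalDateTime utcTs) {
    if (utcTs == null) {
      return null;
    }
    return utcTs.atZone(ZoneOffset.UTC)
        .withZoneSameInstant(ZoneId.systemDefault())
        .toLocalDateTime();
  }
}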

JingFengWang commented 1 year ago

@JingFengWang, you could use the TIMESTAMP_LTZ type to solve the above problem. I have tested with TIMESTAMP_LTZ and it worked well. Meanwhile, I think we could support local-timezone writes to help users avoid this problem.

This solution is not applicable in our usage scenario:

  1. We use Flink and Spark together to implement streaming and batch computing
  2. We have unified metadata and use Spark to create the tables:
    
    spark-sql> create table hudi_mor_all_datatype_2 (
         >   booleanh BOOLEAN,
         >   inth INT,
         >   longh LONG,
         >   floath FLOAT,
         >   doubleh DOUBLE,
         >   timestamph TIMESTAMP_LTZ,
         >   stringh STRING,
         >   decimalh DECIMAL(3, 2),
         >   listh ARRAY<INT>,
         >   structh STRUCT<strg STRING, intg INT>,
         >   maph MAP<STRING, INT>
         > ) using hudi
         > tblproperties (
         >   hoodie.metadata.enable = 'false',
         >   hoodie.datasource.hive_sync.enable = 'false',
         >   hoodie.datasource.meta.sync.enable = 'false',
         >   hoodie.datasource.write.hive_style_partitioning = 'false',
         >   hoodie.index.type = 'BUCKET',
         >   hoodie.bucket.index.num.buckets = '61',
         >   hoodie.bucket.index.max.num.buckets = '127',
         >   hoodie.bucket.index.min.num.buckets = '31',
         >   hoodie.bucket.index.merge.threshold = '0.2',
         >   hoodie.bucket.index.split.threshold = '0.2',
         >   hoodie.index.bucket.engine = 'SIMPLE',
         >   hoodie.finalize.write.parallelism = '40',
         >   hoodie.write.buffer.limit.bytes = '419430400',
         >   type = 'mor',
         >   primaryKey = 'inth',
         >   preCombineField = 'longh'
         >  )
         > partitioned by (stringh);
    Error in query: 
    DataType timestamp_ltz is not supported.(line 7, pos 13)

== SQL == (the CREATE TABLE statement above, echoed by Spark; the error pointer ^^^ marks the timestamph TIMESTAMP_LTZ column)

spark-sql>

danny0405 commented 1 year ago

@JingFengWang Does Spark assume local datetime by default?

JingFengWang commented 1 year ago

@JingFengWang Does Spark assume local datetime by default?

Yes, Spark does not use the UTC time zone by default.

danny0405 commented 1 year ago

Fine, we may need to consider a general solution for timestamps on both the Flink read and write paths.

JingFengWang commented 1 year ago

Fine, we may need to consider a general solution for timestamps on both the Flink read and write paths.

OK, thanks!