apache / iceberg

Apache Iceberg
https://iceberg.apache.org/
Apache License 2.0

Adding support for time-based partitioning on long column type #417

Closed: shardulm94 closed this issue 8 months ago

shardulm94 commented 5 years ago

We have datasets which store milliseconds since epoch in a column of long type. This column is currently being used for time-based partitioning. However, Iceberg only supports time-based partitioning on timestamp and date column types with microsecond precision. Also, the transforms are all internal to Iceberg and not something that can be provided externally. How can we go about adding support for such a case in our environment? Do you think this is something that can be added to Iceberg itself?
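For concreteness, a minimal sketch of the gap using the Iceberg Java API and an illustrative event_time column: the hidden hour transform applies to a timestamp column, but the same spec cannot be built against a long column.

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

public class HourPartitioningGap {
  public static void main(String[] args) {
    // Hidden hour partitioning works when the source column is a timestamp.
    Schema tsSchema = new Schema(
        Types.NestedField.required(1, "event_time", Types.TimestampType.withZone()));
    PartitionSpec tsSpec = PartitionSpec.builderFor(tsSchema)
        .hour("event_time")
        .build();
    System.out.println(tsSpec);

    // The same transform is rejected when the column is a long holding epoch
    // millis, which is the case described above.
    Schema longSchema = new Schema(
        Types.NestedField.required(1, "event_time", Types.LongType.get()));
    PartitionSpec.builderFor(longSchema)
        .hour("event_time") // fails: hour cannot be applied to a long source column
        .build();
  }
}
```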

shardulm94 commented 5 years ago

@rdblue Any thoughts here? I think the issue is that for long types, we cannot assume the granularity of the source data. This makes it harder to fit the long-based time partitioning into the current model where the source data granularity does not really matter.

rdblue commented 5 years ago

@shardulm94, do you intend to use this for tables with existing data?

If you do intend to use this with existing tables, then I'm not sure that you want to use the time-based hidden partitioning transforms. The problem with changing the partitioning of existing tables that use identity partitioning is that your queries may start to fail because you're no longer producing the old partition columns (e.g., ts_date=cast(cast(ts as date) as string)). And if you are producing the old partition columns, then there's not much point in adding extra time-based partitioning (splits will also be pruned using time ranges from min/max metadata).

If you don't intend to use existing data, then do normal timestamps work?

I guess there's another case, where you want to rebuild the table metadata, but use old data files. In that case, is there anything to distinguish the data in these columns from timestamps with a different format, like long values that store microseconds from epoch?

The problem is correctness when other people start using this. If Iceberg supports interpreting a long column as an instant, then it must be obvious what the unit of the long type is. Maybe we could allow this if the column name includes some clue, like timestamp_millis vs timestamp_micros, but that sounds hacky to me.

Another solution is to add a way to promote from long to timestamp type and store the units of the long in metadata somewhere. Then you would be able to use old data as real timestamp columns.

shardulm94 commented 5 years ago

@rdblue Let's consider new tables first. Most of the source data that we ingest has an event time which is milliseconds since the UTC epoch, stored as a long. It is not feasible to change it at the source. One option I can think of is to transform the source event time column into a data type that Iceberg supports partitioning on in a layer above Iceberg, and then provide it to Iceberg via a new column. I would like to avoid adding another column to the data, but as a last resort it may be fine.
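For illustration, a sketch of that last option, assuming Spark is the layer above and hypothetical column names event_time_millis / event_time_ts; the derived column is what the Iceberg partition spec would then reference.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.expr;

public class DeriveEventTimestamp {
  // Adds a real timestamp column derived from the epoch-millis long so that
  // Iceberg's hour/day transforms can be applied to it. The obvious downside
  // is that the same information is now stored twice.
  public static Dataset<Row> withEventTimestamp(Dataset<Row> source) {
    return source.withColumn(
        "event_time_ts",
        expr("CAST(event_time_millis / 1000 AS TIMESTAMP)"));
  }
}
```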

The idea of deriving the unit from column name seems hacky, agreed. The ability to promote long types to timestamp based on some metadata (unit, maybe tz too?) sounds good. However, I am not sure how much we can generalize this to other data types. Will that become an overfit?

With respect to tables with existing data, the issue is that the original data files don't have the partition value as a column. So I was under the assumption that identity partitioning cannot be applied here since it depends on column data to be present in the file. However looking at Iceberg reader code, it seems like for identity partitioning, the column value is derived from metadata. Will that assumption hold true generally? If yes, that probably solves the issue for existing tables. I couldn't find anything in the spec that mentions this though.

rdsr commented 5 years ago

Seems like for existing tables which rely on identity partitioning, it may be best not to change it. Both Spark and Pig seem to read the partition value from the DataFile instead of from the records, so it should work both for existing data written with Hive (which does not store the partition data in the file) and for new data/partitions written with Iceberg (which does).

rdblue commented 5 years ago

Iceberg stores identity partition data in both data files and metadata files because the values are the same. Engines are free to use either one, but for backward compatibility with existing data, Spark and other readers will use the metadata values when reading.
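As a small sketch of what readers rely on, assuming a Table handle already loaded from a catalog: identity partition values are available from file metadata at planning time, without opening the data files.

```java
import java.io.IOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

public class PrintPartitionValues {
  // Prints the partition tuple recorded for each data file. For identity
  // partitions this metadata value matches what projecting the source column
  // from the rows would return, which is why readers can rely on it.
  public static void print(Table table) throws IOException {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        System.out.println(task.file().path() + " -> " + task.file().partition());
      }
    }
  }
}
```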

dannyhnu commented 4 years ago

Ran into the same problem here. For now I had to generate a new field for partitioning: cast(timestamp_Long as timestamp) as timestamp_Timestamp. For the sake of user experience, please reconsider supporting partitioning on long column types.

ericsun2 commented 4 years ago

How about providing fine control of timestamp conversion in the year/month/day/hour Transforms as a secondary optional parameter?

.hour(timestampValue)               // current syntax
.hour(timestampValue, time_zone)    // enhanced syntax
.hour(longValue)                    // defaults to EPOCH_MILLIS
.hour(longValue, EPOCH_MILLIS)
.hour(longValue, EPOCH)             // seconds since epoch; this option should also be popular, the community can make it the default
.hour(longValue, EPOCH_MINS)        // minutes since epoch
.hour(longValue, EPOCH_10MINS)      // 10-minute units since epoch, this is just for fun here ^_^
.hour(longValue, EPOCH_HOURS)       // hours since epoch

IoT, telemetry, and Java-based logging use cases often store time as a long/int epoch offset with various precisions.

rdblue commented 4 years ago

@ericsun2, what do you think about allowing type promotion from long to timestamp instead? Then you'd be able to use the field as a timestamp and be able to partition by it.

ericsun2 commented 4 years ago

"Type promotion" is only 50% of the problem. A given numeric value can represent various precisions, and hence needs different rollup/truncate logic.

If we limit the API to only take TIMESTAMP or EPOCH_MILLIS as input, then "type promotion" will probably be OK (except for the time_zone override at the partition spec level: do we only honor spark.sql.session.timeZone or user.timezone?). But users will have to do the math to convert epoch second/minute/hour offset values to milliseconds first.

Will the extra math conversion impact the predicate evaluation and pruning at query planning time later?

rdblue commented 4 years ago

Type promotion is already supported for some formats, like TIMESTAMP_MILLIS in Parquet. Iceberg will convert those values to microseconds when reading and can interact with them like normal timestamps. For type promotion on a long column, we would need to store metadata somewhere for how to interpret the long value and do the conversion to micros on read. Everything would then work like a normal timestamp.
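To make the unit handling concrete, here is a sketch of the read-time widening being described. This is not existing Iceberg code, just the conversion a reader would apply if table metadata recorded that a promoted long column stores epoch milliseconds.

```java
public final class EpochUnits {
  private EpochUnits() {}

  // Iceberg timestamps are microseconds from the epoch, so a reader that knows
  // (from metadata) that a promoted long column stores epoch milliseconds only
  // needs to widen the value by a factor of 1000.
  public static long epochMillisToMicros(long epochMillis) {
    return Math.multiplyExact(epochMillis, 1000L);
  }
}
```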

For your time zone questions, Iceberg doesn't modify values. It is up to processing engines to modify values when it makes sense. Iceberg will store a timestamp in microseconds from epoch, UTC, and return the same value. Conversion from the session time zone or using a specific zone to UTC is always handled by processing engines.
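For example, a sketch using plain java.time (not Iceberg APIs) of that engine-side step: resolving a wall-clock value in a session time zone down to the UTC microsecond value that would actually be stored.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;

public class SessionZoneToUtcMicros {
  // The engine resolves the session time zone; the value handed to Iceberg is
  // simply microseconds from the epoch in UTC.
  public static long toUtcMicros(LocalDateTime wallClock, ZoneId sessionZone) {
    Instant instant = wallClock.atZone(sessionZone).toInstant();
    return ChronoUnit.MICROS.between(Instant.EPOCH, instant);
  }
}
```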

ericsun2 commented 4 years ago

I am not trying to change how Iceberg serializes the TIMESTAMP type. I am hoping that Iceberg can maximize expressiveness and CBO/planner efficiency with a clearer and more straightforward partition spec.

For example, for the epoch offset precision:

  1. express the epoch offset precision in the partition spec without adding an extra derived physical field in the data file
  2. convert the value to epoch milliseconds during ETL as an extra field, then use this new field in the partition spec

and likewise for the time zone:

  1. express the time zone in the partition spec without adding an extra derived physical field in the data file
  2. convert the value to TIMESTAMP in a particular time zone during ETL as an extra field, then use this new field in the partition spec

If we want to make the most of the hidden partitioning style, I still favor option (1) a bit more :-)

rdblue commented 4 years ago

I don't think I understand why you prefer extending partition specs instead of promoting from a long to a timestamp. To be clear, I'm not suggesting a solution that requires adding a new field set in ETL, or an additional derived field, or changing the way data is stored. I'm suggesting that you can convert an existing long field into a timestamp. New data would be written as a regular timestamp, existing data would be interpreted correctly and would appear to be a timestamp.

Do you want to store data as a long directly and not use it as a timestamp? If so, what is the use case for keeping the data as it is and not using timestamp?

ericsun2 commented 4 years ago

With long to timestamp promotion, we can cover the epoch_millis migration use case, but not the epoch_second or time zone cases.

> Do you want to store data as a long directly and not use it as a timestamp?

Thank you so much for this great question. In Li***n, an epoch offset in milliseconds/seconds (bigint/long) is the standard way to store time, and in the foreseeable future that will likely still be the case. The reason behind it is really related to Avro.

Until I can push the TIMESTAMP data type into any standard data model, I can neither create a new field with TIMESTAMP as its type nor replace the bigint epoch_millis with TIMESTAMP.

I understand that this choice probably does not make much sense to Netflix, because who does not want to use TIMESTAMP natively, huh? But sometimes epoch seconds/milliseconds die hard, which is why @shardulm94 raised this issue.

rdblue commented 4 years ago

Thanks for the response, @ericsun2! I think it makes sense now.

I think that it would be reasonable in that case to extend the partition spec to handle this rather than promoting a long. But I'll have to think more about whether it's a good idea to do that in Iceberg or not. While I see the utility if you have lots of code that expects this, I'm not sure I'd make it easy to do this for new use cases.

We have something similar, where we partition dates by an integer value, like 20201001. Right now, we have no way to automatically derive these and take advantage of hidden partitioning. I've been considering the idea of custom transform functions to solve this. So we would be able to define and plug in transforms specific to our use cases. That would solve a few problems. What do you think about defining these transforms as custom transforms and plugging them in for your environment?
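To make that concrete, here is a sketch of the value mapping such a custom transform would encapsulate for the 20201001-style integer dates, in plain Java with invented names; Iceberg does not currently expose a public plug-in point for this.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class IntegerDateMapping {
  // Maps an integer date like 20201001 to the days-from-epoch ordinal that
  // Iceberg's built-in date transforms produce, which is the kind of value a
  // custom transform for this column would have to emit so that predicates
  // could be translated into partition ranges.
  public static int toEpochDay(int intDate) {
    LocalDate date = LocalDate.parse(Integer.toString(intDate), DateTimeFormatter.BASIC_ISO_DATE);
    return (int) date.toEpochDay();
  }
}
```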

ericsun2 commented 4 years ago

IMHO, there are always 1.5 solutions:

Some of the popular custom Transforms can graduate into the built-in category, and even reshape the built-in API over time.

If the long-term vision of Iceberg also includes query optimization and file layout (sorting and partitioning) recommendations, then having more built-in Transforms can help accelerate that goal, because we can have a proper parser to decode the true intention of the predicates and the corresponding granularity. Custom Transforms will probably make more predicates look like black boxes.

Custom Transforms are definitely needed (e.g. a geo-region rollup hierarchy is a very useful UDT for a partitioning structure), but I don't feel that should be the go-to solution. Using time zone as an example, the Chinese internet giants can really be flagship users and contributors of Iceberg, and all of them need the time zone Transform. I don't really want to see 3 different custom Transforms from Alibaba, Tencent, and Bytedance with different names but very similar logic. Yet if the built-in Transform stays over-simplified or timezone-insensitive, then they don't have a choice.

Also, custom Transform is more like a flattened approach (roughly sketched at the end of this comment):

If Iceberg provides all 4 of the other Transforms as well, then at least we have a better built-in base. These 4 may not be used as frequently as the 1st one, but they offer a better foundation to grow the community (and a better template for writing more custom Transforms if necessary).

The optional argument is more like a nested approach (also sketched at the end of this comment):

We can continue discussing/evaluating the complexity of partition pruning implementation for the flattened approach and the nested approach.
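As a rough illustration of the two shapes being compared, with invented method names; neither API exists in Iceberg today:

```java
import java.util.concurrent.TimeUnit;

// Invented names, purely to contrast the two shapes discussed above.
interface FlattenedStyle {
  // "Flattened": one dedicated transform per unit or zone.
  void hourFromEpochMillis(String sourceColumn);
  void hourFromEpochSeconds(String sourceColumn);
  void hourInZone(String sourceColumn, String zoneId);
}

interface NestedStyle {
  // "Nested": the existing transform grows optional arguments.
  void hour(String sourceColumn);
  void hour(String sourceColumn, TimeUnit epochUnit);
  void hour(String sourceColumn, String zoneId);
}
```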

zhongyujiang commented 2 years ago

> I've been considering the idea of custom transform functions to solve this. So we would be able to define and plug in transforms specific to our use cases.

Is there any progress on this issue? @rdblue @ericsun2 We ran into the same problem, so we would like to know about the community's plans here.

murilo commented 1 year ago

That's ugly: having to run ALTER TABLE {DATABASE}.{TABLE} ADD PARTITION FIELD truncate(eventTimeMillis, 3600000) to partition hourly.

ADD PARTITION FIELD hour_from_milliseconds(eventTimeMillis) would be nicer.
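For reference, a sketch of the same truncate workaround expressed with the Java API, assuming a long column named eventTimeMillis; truncating epoch millis with a width of 3,600,000 ms floors each value to its hour boundary.

```java
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;

public class TruncateHourlyWorkaround {
  // 3,600,000 ms = 1 hour, so each partition value is the epoch millis of an
  // hour boundary. It works, but the partition column is not the friendly hour
  // ordinal that the built-in hour transform would produce.
  public static PartitionSpec hourlyByTruncate(Schema schema) {
    return PartitionSpec.builderFor(schema)
        .truncate("eventTimeMillis", 3_600_000)
        .build();
  }
}
```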

github-actions[bot] commented 9 months ago

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] commented 8 months ago

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'