trinodb / trino

Official repository of Trino, the distributed SQL query engine for big data, formerly known as PrestoSQL (https://trino.io)

Ability to OPTIMIZE a single time-based partition in Iceberg #12362

Closed erichwang closed 2 years ago

erichwang commented 2 years ago

Iceberg connector has the ability to optimize the internal data files for specific partitions with the following command:

ALTER TABLE my_table EXECUTE optimize WHERE partition_key_1 = 'blah';

However, if a table is partitioned using the hidden partition column feature (e.g. day(timestamp)), there does not seem to be a way to target a single day for optimization. This is quite problematic, as time-based partitioning is one of the most common mechanisms for ingested data, and once you have more than 100 days, you start to hit the max open writers problem (which caps at 100 by default) -- meaning you can no longer optimize your table.
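For concreteness, a hypothetical setup reproducing the problem (table and column names are made up; the partitioning table property is how the Trino Iceberg connector declares a hidden day() transform):

    CREATE TABLE events (
        event_time timestamp(6),
        payload varchar
    )
    WITH (partitioning = ARRAY['day(event_time)']);

    -- OPTIMIZE accepts predicates on identity partition columns, but there is
    -- no column to reference for the hidden day(event_time) partition, so a
    -- single day cannot be targeted.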

findepi commented 2 years ago

I think this should be handled by

ALTER TABLE my_table EXECUTE optimize WHERE CAST(timestamp_col AS date) = DATE '...'

We can push an expression-based predicate like this into the Iceberg connector.

Do we need to know that it applies to a whole partition? This seems like the right thing to do, and should be doable. However, how do we deal with a table that underwent schema evolution, where some files are not partitioned by day(timestamp_col) at all? We can still require a full optimize, but maybe we can do better.

This is quite problematic, as time-based partitioning is one of the most common mechanisms for ingested data, and once you have more than 100 days, you start to hit the max open writers problem (which caps at 100 by default)

@erichwang this shouldn't be the case. Iceberg OPTIMIZE repartitions data between the nodes before writing, so it shouldn't hit the 100-writer limit that easily.

cc @alexjo2144 @homar @ebyhr @findinpath @losipiuk

erichwang commented 2 years ago

This is quite problematic, as time-based partitioning is one of the most common mechanisms for ingested data, and once you have more than 100 days, you start to hit the max open writers problem (which caps at 100 by default)

@erichwang this shouldn't be the case. Iceberg OPTIMIZE repartitions data between the nodes before writing, so it shouldn't hit the 100-writer limit that easily.

Yes, this is just a simplified example, but I had further partitioning underneath the day too. Regardless, the capability to target a single partition needs to exist for this to be reasonably usable in a scaling system.

alexjo2144 commented 2 years ago

I think this should be handled by ALTER TABLE my_table EXECUTE optimize WHERE CAST(timestamp_col AS date) = DATE '...' We can push an expression-based predicate like this into the Iceberg connector.

No, I don't think that works. If I remember correctly, the predicate on ALTER TABLE EXECUTE needs to be fully consumed and enforced by the connector. For Iceberg, that's only identity partition columns right now.

losipiuk commented 2 years ago

I think this should be handled by

ALTER TABLE my_table EXECUTE optimize WHERE CAST(timestamp_col AS date) = DATE '...'

We can push an expression-based predicate like this into the Iceberg connector.

Do we need to know that it applies to a whole partition? This seems like the right thing to do, and should be doable. However, how do we deal with a table that underwent schema evolution, where some files are not partitioned by day(timestamp_col) at all? We can still require a full optimize, but maybe we can do better.

In the case where the partitioning scheme changed over the life of the table, we can address this problem by making applyFilter aware of the statement context (i.e. that the query is ALTER TABLE ... EXECUTE optimize). Then it can apply different pushdown rules: we can push WHERE some_predicate_on(partition_key_1) as "the row comes from a file that contains any rows matching some_predicate_on(partition_key_1)".
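A sketch of those file-level semantics (hypothetical; reusing the names from the top of the issue):

    -- Under the proposed OPTIMIZE-aware applyFilter, the predicate would
    -- select whole files rather than rows:
    ALTER TABLE my_table EXECUTE optimize WHERE partition_key_1 = 'blah';
    -- i.e. rewrite every file containing at least one row with
    -- partition_key_1 = 'blah', even if other rows in the file do not match.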

findepi commented 2 years ago

We can push an expression-based predicate like this into the Iceberg connector.

No, I don't think that works. If I remember correctly, the predicate on ALTER TABLE EXECUTE needs to be fully consumed and enforced by the connector.

@alexjo2144 I am aware. I was referring to a change we can make (leveraging the existing SPI), not the current state. Sorry for not being clear.

we can address this problem by making applyFilter aware of the statement context (i.e. that the query is ALTER TABLE ... EXECUTE optimize). Then it can apply different pushdown rules.

@losipiuk feasible, but a dangerous direction IMO.

erichwang commented 2 years ago

Ok, I think I understand the context of this discussion now after going through the code. Is there a standard we were following when we decided to use the ALTER TABLE ... OPTIMIZE WHERE ... syntax? It doesn't seem right for us to overscan the results of a WHERE clause @losipiuk (even if it is just for an internal table procedure). Assuming that's the case, I can only think of a few more options:

  1. Filter on an internal partition ID field on each row directly - this might be an OK stopgap, but it's pretty annoying to use this way. Still, having something is better than nothing.
  2. Maybe have hidden partition stats appended to each row, and the WHERE clause can filter on the partitions stats (this seems kind of nasty too, but would be semantically correct from a row predicate perspective).
  3. Pass the expression as a hint directly to the table procedure, rather than as a WHERE clause
  4. Use an entirely different syntax than ALTER TABLE ... OPTIMIZE WHERE ....

@findepi and @losipiuk, any opinions on this? This is actually a really important question for us to solve since it represents one of the main blockers to realistically considering Iceberg as a mainstream data warehousing format for Trino.

losipiuk commented 2 years ago

Ok, I think I understand the context of this discussion now after going through the code. Is there a standard we were following when we decided to use the ALTER TABLE ... OPTIMIZE WHERE ... syntax?

No. The syntax came up as a result of a discussion between @martint, @electrum, @findepi, and myself.

It doesn't seem right for us to overscan the results of a WHERE clause @losipiuk (even if it is just for an in internal table procedure).

I get your point. I am concerned about that too. Less because of the overscanning itself (technically we overscan for most queries with a WHERE clause, e.g. if there is no partitioning we scan the whole table, even if the WHERE is very selective), and more because mutating the table's underlying format for the part of the table that does not match the WHERE expression could be a massive surprise for users.

Assuming that's the case, I can only think of a few more options:

  1. Filter on an internal partition ID field on each row directly - this might be an OK stopgap, but it's pretty annoying to use this way. Still, having something is better than nothing.

Not a big fan of this one. A partition ID feels like something very internal that should not be exposed to the user. Is there even such an entity in Iceberg already? Or would it be something Iceberg-Trino specific?

  2. Maybe have hidden partition stats appended to each row, and the WHERE clause can filter on the partition stats (this seems kind of nasty too, but would be semantically correct from a row predicate perspective).

Then we would write a WHERE clause like: $partitionStats.partColumn.minValue <= val AND $partitionStats.partColumn.maxValue >= val ???
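Rendered as SQL, that hypothetical predicate might look as follows ($partition_stats and its fields are made up for illustration):

    ALTER TABLE my_table EXECUTE optimize
    WHERE "$partition_stats".part_column.min_value <= DATE '2022-01-01'
      AND "$partition_stats".part_column.max_value >= DATE '2022-01-01';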

  3. Pass the expression as a hint directly to the table procedure, rather than as a WHERE clause
  4. Use an entirely different syntax than ALTER TABLE ... OPTIMIZE WHERE ....

[3] and [4] are kinda similar. Changing syntax is painful due to backward compatibility issues, and IIUC, when it comes to backward compatibility, SQL is what we care about the most.

Another option would be to expose the partitioning info as a hidden column. Then we could explicitly say in a query that we want to operate only on the subset of the table that is partitioned using a given scheme, and ignore the rest. This would work if we add the support for expression pushdown that @findepi suggested above.
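For example (hypothetical syntax; no such "$partition" hidden column exists in the Iceberg connector today):

    ALTER TABLE my_table EXECUTE optimize
    WHERE "$partition" = 'event_time_day=2022-01-01';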

@findepi and @losipiuk, any opinions on this? This is actually a really important question for us to solve since it represents one of the main blockers to realistically considering Iceberg as a mainstream data warehousing format for Trino.

alexjo2144 commented 2 years ago

If we improve the IcebergMetadata#applyFilter implementation to account for transforms and historical PartitionSpecs, we get the win/win of both better OPTIMIZE filters and better read performance.

Right now, only filters on columns that have always been identity partition columns are guaranteed to be enforced, but we can do better than that.
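For example, a range predicate on the source column of a day() transform aligns exactly with partition boundaries, so a transform-aware applyFilter could both prune by it and fully enforce it (a sketch, reusing the hypothetical events table from the top of the issue):

    SELECT count(*)
    FROM events
    WHERE event_time >= TIMESTAMP '2022-01-01 00:00:00'
      AND event_time < TIMESTAMP '2022-01-02 00:00:00';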

erichwang commented 2 years ago

Another option would be to expose the partitioning info as a hidden column. Then we could explicitly say in a query that we want to operate only on the subset of the table that is partitioned using a given scheme, and ignore the rest. This would work if we add the support for expression pushdown that @findepi suggested above.

Yea, that's a pretty good idea too, especially given that repartitioning is hopefully an infrequent operation. So most of the time people won't need to think about it.

I think the only minor downside is that the way users interact with this forces them to be aware of the exact underlying partitioning scheme (which is true of many of the proposals suggested), which kind of defeats the purpose of Iceberg's hidden partitioning feature. That being said, having something working is still better than nothing.

findepi commented 2 years ago

Yea, that's a pretty good idea too, especially given that repartitioning is hopefully an infrequent operation. So most of the time people won't need to think about it.

I agree we can solve the problem for "no partition evolution" first, and require full optimize in case of partitioning changes. We will need to revisit this though.

I think the only minor downside is that the way users interact with this forces them to be aware of the exact underlying partitioning scheme (which is true of many of the proposals suggested), which kind of defeats the purpose of Iceberg's hidden partitioning feature

I think hidden partitioning's main benefit is that you don't need to modify SELECT or INSERT queries (of which there can be many). OPTIMIZE is likely a centralized maintenance operation, so it's one query to update. Much less of a pain.

erichwang commented 2 years ago

We had a quick discussion today and assuming we don't want to rethink the language syntax, this seems like a reasonable path forward:

  1. Add Iceberg support for predicate pushdown that prunes hidden time-projection partitions (covering both time-zone and non-time-zone timestamp types).
  2. Publish the physical definition of the partitioning scheme for partition projection functions, so that users can determine how to write predicate expressions that align with these partitions. For example, when partitioning on day(event_time_with_time_zone), we would need to declare which time zone the day is derived from.
  3. Add a new catalog system table that enumerates all partition scheme versions.
  4. Update OPTIMIZE to take a partition version ID as an optional argument (see the sketch after this list) -- otherwise, the whole usability of OPTIMIZE can break down if the user ever changes the table partitioning. They should at least be able to process different versions separately.
  5. Optional: update UnwrapTimestampToDateCastInComparison to handle timestamp with time zone -- which would be the most common way users will probably want to provide the OPTIMIZE predicate.
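A sketch of how steps 3 and 4 might look (the system table name and the partition_spec_id argument are illustrative only, not an agreed design):

    -- Step 3 (hypothetical): enumerate historical partitioning schemes.
    SELECT * FROM "my_table$partition_specs";

    -- Step 4 (hypothetical): restrict OPTIMIZE to files written under one scheme.
    ALTER TABLE my_table EXECUTE optimize(partition_spec_id => 1)
    WHERE CAST(event_time AS date) = DATE '2022-01-01';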

cc @losipiuk @alexjo2144 @martint - did I miss anything?

findepi commented 2 years ago

Thanks @erichwang for gathering options and making a proposal.

  • Add Iceberg support for predicate pushdown that prunes hidden time-projection partitions (covering both time-zone and non-time-zone timestamp types).

Do you have an example of how a user would use it? E.g. how would this look in SQL?

Update OPTIMIZE to take a partition version ID as an optional argument

What would it do? Would it limit the operation to only files adhering to that partitioning?

Optional: update UnwrapTimestampToDateCastInComparison to handle timestamp with time zone

This is not possible. timestamptz values are ordered by their point-in-time portion. Thus, CAST(a_timestamp_tz AS date) is not a monotonic function and cannot be unwrapped.

However, in the Iceberg case we don't have this problem, as we have point-in-time values in the UTC zone only. (This would be better modeled as an "instant" type: https://github.com/trinodb/trino/issues/2273)

This means that the unwrapping can be done within the connector, but not in the engine. We should use io.trino.spi.connector.Constraint#expression and handle this on the connector side.
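A minimal sketch of the connector-side equivalence (assuming an events table whose event_time is a timestamp with time zone, and Iceberg's UTC-only storage; the Constraint#expression plumbing is not shown):

    -- The engine cannot safely unwrap this cast for timestamptz in general:
    SELECT count(*) FROM events
    WHERE CAST(event_time AS date) >= DATE '2022-01-01';

    -- ...but under the UTC-only assumption the connector may treat it as the
    -- TupleDomain-friendly range:
    SELECT count(*) FROM events
    WHERE event_time >= TIMESTAMP '2022-01-01 00:00:00 UTC';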

(Another option would be to introduce some type trait for timestamptz values so the engine knows they are UTC-only, but this would probably be a big change.)

losipiuk commented 2 years ago

Thanks @erichwang for gathering options and making a proposal.

  • Add Iceberg support for predicate pushdown that prunes hidden time-projection partitions (covering both time-zone and non-time-zone timestamp types).

Do you have an example of how a user would use it? E.g. how would this look in SQL?

I think we were thinking of the syntax you proposed above:

WHERE CAST(timestamp_col AS date) = DATE '...'

Update OPTIMIZE to take a partition version ID as an optional argument

What would it do? Would it limit the operation to only files adhering to that partitioning?

Yeah, exactly.

erichwang commented 2 years ago

Do you have an example of how a user would use it? E.g. how would this look in SQL?

ALTER TABLE my_table EXECUTE OPTIMIZE WHERE CAST(event_time AT TIME ZONE 'UTC' AS DATE) >= DATE '2022-01-01'

I don't love it (especially with the time zone having to be declared from the internal details), but it seems the most reasonable option without new language.

What would it do? Would it limit the operation to only files adhering to that partitioning?

Yes, only to a specific partitioning scheme, so that if we change the partitioning scheme completely, we aren't forced to optimize all partitions at once.

This is not possible. timestamptz values are ordered by their point-in-time portion. Thus, CAST(a_timestamp_tz AS date) is not a monotonic function and cannot be unwrapped.

Can you explain this a little bit more, with an example of where this gets weird? I believe you, but it would be good to have a concrete example to refresh my memory. I probably missed the conversation on this, and timestamp stuff is always tricky.

(This would be better modeled as an "instant" type: https://github.com/trinodb/trino/issues/2273)

Actually, in modern Java time, both Instant and OffsetDateTime are used pretty successfully for event times. So it's possible to represent timestamp with time zone using OffsetDateTime, where time zones are only hour offsets (e.g. '-07:00') rather than the political version (e.g. 'America/Los_Angeles'). If we had a way to distinguish offset zones (which are pretty common) from political ones, we could still do this optimization.

findepi commented 2 years ago

ALTER TABLE my_table EXECUTE OPTIMIZE WHERE CAST(event_time AT TIME ZONE 'UTC' AS DATE) >= DATE '2022-01-01'

I don't love it (especially with the time zone having to be declared from the internal details), but it seems the most reasonable option without new language.

I don't like having to specify the time zone either, and here it looks redundant/wrong.

In Iceberg, all timestamp_tz values are UTC, so if event_time is a timestamp with time zone, then this holds

CAST(event_time AT TIME ZONE 'UTC' AS DATE) = CAST(event_time AS DATE)

If, however, event_time is a timestamp, then AT TIME ZONE 'UTC' is wrong, as it first coerces event_time to a timestamp with time zone, attaching the session zone, and then moves it to UTC, which may result in a different date.

trino> SELECT LOCALTIMESTAMP, LOCALTIMESTAMP AT TIME ZONE 'UTC';
          _col0          |            _col1
-------------------------+-----------------------------
 2022-05-27 10:41:38.919 | 2022-05-27 08:41:38.919 UTC

This is not possible. timestamptz values are ordered by their point-in-time portion. Thus, CAST(a_timestamp_tz AS date) is not a monotonic function and cannot be unwrapped.

Can you explain this a little bit more, with an example of where this gets weird? I believe you, but it would be good to have a concrete example to refresh my memory. I probably missed the conversation on this, and timestamp stuff is always tricky.

trino> SELECT x = y, CAST(x AS date) >= d, CAST(y AS date) >= d
    -> FROM (VALUES (TIMESTAMP '2022-05-27 00:03:00 +02:00', TIMESTAMP '2022-05-26 22:03:00 UTC', DATE '2022-05-27')) t(x,y,d);
 _col0 | _col1 | _col2
-------+-------+-------
 true  | true  | false

Actually, in modern Java time, both Instant and OffsetDateTime are used pretty successfully for event times. So it's possible to represent timestamp with time zone using OffsetDateTime, where time zones are only hour offsets (e.g. '-07:00') rather than the political version (e.g. 'America/Los_Angeles'). If we had a way to distinguish offset zones (which are pretty common) from political ones, we could still do this optimization.

OffsetDateTime won't help, see above.

However, the Iceberg connector can still do the optimization on the connector side, since it knows all values are UTC. I.e. it knows the type is effectively "an instant", while the engine doesn't know that.

erichwang commented 2 years ago

If, however, event_time is a timestamp, then AT TIME ZONE 'UTC' is wrong, as it first coerces event_time to a timestamp with time zone, attaching the session zone, and then moves it to UTC, which may result in a different date.

Sorry, I meant this as an example only for the case where the source data is timestamp with time zone, which is pretty commonplace. The timestamp form would definitely be different, as you pointed out. I should have stated that assumption first.

@findepi, also, doesn't the example you showed with the date casting in the last message work if you cast to the same TZ first? In that example, extracting a date without first normalizing the TZ is definitely going to give a different result.

Thus, CAST(a_timestamp_tz AS date) is not a monotonic function and cannot be unwrapped.

What I was saying in reference to this is that the optimization of unwrapping CAST(event_time_with_tz AT TIME ZONE 'UTC' AS DATE) >= DATE '2022-01-01' to the classic TupleDomain format seems possible -- but as usual with time stuff, I'm not certain.

findepi commented 2 years ago

also, doesn't the example you showed with the date casting in the last message work if you cast to the same TZ first? In that example, extracting a date without first normalizing the TZ is definitely going to give a different result.

I agree. I was answering "update UnwrapTimestampToDateCastInComparison to handle timestamp with time zone". I didn't understand that an additional AT TIME ZONE was assumed there, besides the CAST.

Again, in the context of Iceberg, it doesn't matter. The connector doesn't have values with different zones, and we already have an SPI way of passing CAST(a_timestamp_tz AS date) > c to the connector, so we don't really need engine changes.

Perhaps having engine changes to unwrap CAST((a_timestamp_tz AT TIME ZONE x) AS date) would be a good general improvement. I would recommend filing a separate issue about this.

erichwang commented 2 years ago

@findepi, are you sure CAST(a_timestamp_tz AS date) > c gets passed through the SPI TupleDomain? The UnwrapTimestampToDateCastInComparison code only seems to work on TimestampType:

    if (sourceType instanceof TimestampType && targetType == DATE) {
        return unwrapTimestampToDateCast(session, (TimestampType) sourceType, (DateType) targetType, operator, cast.getExpression(), (long) right)
                .orElse(expression);
    }

It only handles TimestampType, not TimestampWithTimeZoneType, and this matches my local testing. Anyway, I can file that as a separate issue too, but it would technically be required for any of this syntax to be useful for Iceberg: https://github.com/trinodb/trino/issues/12729

findepi commented 2 years ago

are you sure CAST(a_timestamp_tz AS date) > c get's passed through the SPI TupleDomain?

I don't recall saying it is. (It should not be.)

I can file that as a separate issue too, but it would technically be required for any of this syntax to be useful for Iceberg: https://github.com/trinodb/trino/issues/12729

That's not a strict requirement. We have other predicate pushdown APIs that are not limited to TupleDomain.

erichwang commented 2 years ago

That's not a strict requirement. We have other predicate pushdown APIs that are not limited to TupleDomain.

You are technically correct that this is not a strict requirement, since I believe you are referring to the expression pushdown APIs. But working with the expression trees is much more annoying, especially when the optimizer is so close.

findepi commented 2 years ago

Optional: update UnwrapTimestampToDateCastInComparison to handle timestamp with time zone

This is not possible. timestamptz values are ordered by their point-in-time portion. Thus, CAST(a_timestamp_tz AS date) is not a monotonic function and cannot be unwrapped.

However, in the Iceberg case we don't have this problem, as we have point-in-time values in the UTC zone only. (This would be better modeled as an "instant" type: #2273)

This means that the unwrapping can be done within the connector, but not in the engine. We should use io.trino.spi.connector.Constraint#expression and handle this on the connector side.

This is done in https://github.com/trinodb/trino/pull/12918