NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

[FEA] Support timestamp transitions to and from UTC for single time zones with no repeating rules #6831

Closed revans2 closed 8 months ago

revans2 commented 1 year ago

Is your feature request related to a problem? Please describe. We need a way to support transitions to and from UTC for various time zones. In the short term we should support this for any time zone that only has a hard-coded set of transitions, not extra rules that repeat.

Describe the solution you'd like A lot of this work should be done in spark-rapids-jni, as I think the most efficient way to do this is with a custom kernel. It will also become a necessity in the future when we want to support repeating rules so we can fully support time zones.

The time zone database in Java is accessible through the java.time.ZoneId class, specifically the ZoneRules class associated with it. ZoneRules exposes three different sources of rules that can be used to get transitions to/from UTC: fixed offsets, ZoneOffsetTransition, and ZoneOffsetTransitionRule. This issue is intended to cover only the first two; any rules that include ZoneOffsetTransitionRule instances will be covered by a separate issue.
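For reference, all three sources are reachable from the standard java.time API; a minimal example:

    import java.time.ZoneId;
    import java.time.zone.ZoneRules;

    public class ZoneRulesExample {
        public static void main(String[] args) {
            ZoneRules rules = ZoneId.of("America/Los_Angeles").getRules();
            System.out.println("fixed offset:     " + rules.isFixedOffset());
            System.out.println("transitions:      " + rules.getTransitions().size());
            System.out.println("transition rules: " + rules.getTransitionRules().size());
        }
    }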

But we are going to write the code in such a way that hopefully the API will not need to change much when we add in full support for all time zones.

Please note that the API that follows is just a suggestion, not a hard requirement. The general API should be a GpuTimeZoneDB class with mostly static APIs. We are not going to support the time zone database being reloaded dynamically at this time, and we need to make sure that we document this limitation as we are going to cache the information.

This class should provide a few APIs.

class GpuTimeZoneDB {
  /**
   * Start to cache the database. This should be called on startup of an executor. It should
   * start to cache the data on the CPU in a background thread and return immediately, allowing
   * the other APIs to be called. Depending on what we want to do, we can have the other APIs
   * block until caching is done, or we can have private APIs that would let us load and use
   * specific parts of the database. I prefer the former solution, at least until we see a
   * performance hit where we are waiting on the database to finish loading.
   */
  public static void cacheDatabase();

  /** Convert a timestamp column from the given time zone to UTC. */
  public static ColumnVector convertToUTC(ColumnVector input, ZoneId currentTimeZone);

  /** Convert a UTC timestamp column to the given time zone. */
  public static ColumnVector convertFromUTC(ColumnVector input, ZoneId desiredTimeZone);
}

We probably also want a separate API in another class that would let us check whether a ZoneId is supported without loading the native code. This would let us programmatically enable GPU support for the time zones we handle and fall back to the CPU for those that are not currently supported.
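A minimal sketch of such a check, limited to the scope of this issue (the class and method names here are hypothetical), could be pure JVM code with no native dependency:

    import java.time.ZoneId;
    import java.time.zone.ZoneRules;

    public class TimeZoneSupport {
        /**
         * True if the zone only needs what this issue covers: a fixed offset,
         * or hard-coded transitions with no repeating ZoneOffsetTransitionRule
         * entries.
         */
        public static boolean isSupported(ZoneId zoneId) {
            ZoneRules rules = zoneId.getRules();
            return rules.isFixedOffset() || rules.getTransitionRules().isEmpty();
        }
    }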

Internally we will want to dedupe the zone IDs. Unfortunately, equality for ZoneId really only comes down to the name of the zone, not the rules associated with it, so we will want to use the ZoneRules to determine whether we need to cache something or not.
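As a sketch of that deduplication (names here are illustrative), ZoneRules implements equals/hashCode over the actual transition data, so it can key a map directly:

    import java.time.ZoneId;
    import java.time.zone.ZoneRules;
    import java.util.HashMap;
    import java.util.Map;

    public class ZoneDedupe {
        public static Map<String, Integer> buildZoneIdToTable() {
            Map<ZoneRules, Integer> rulesToTable = new HashMap<>();
            Map<String, Integer> zoneIdToTable = new HashMap<>();
            for (String id : ZoneId.getAvailableZoneIds()) {
                ZoneRules rules = ZoneId.of(id).getRules();
                // ZoneRules equality covers the actual transition data, so zones
                // with identical rules (e.g. "Etc/UTC" and "UTC") share one table.
                int table = rulesToTable.computeIfAbsent(rules, r -> rulesToTable.size());
                zoneIdToTable.put(id, table);
            }
            return zoneIdToTable;
        }
    }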

For cacheDatabase we want to produce two things. The first is a Map<String, Integer> that maps the string ID of a ZoneId to where the table for that zone is stored. Long term I think it would be good to have a single column for the transitions, which would be a List<Struct<instant: int64, offset: int32>>, with the Integer in the map pointing to the row that holds the zone's transition rules. We could instead cache each time zone separately and just have a lot of really small columns for the instants and offsets. The problem is that long term we need a way to have a kernel that can access all of the time zones at once (e.g. parsing a string to a timestamp), so having them all in a single table would make that a lot easier to deal with.
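As an illustration only (field names are mine, not a committed design), the per-zone data behind each row of that column is just two parallel arrays:

    // Illustrative only: the host-side shape of one zone's row before it is
    // packed into the LIST<STRUCT<instant: int64, offset: int32>> column.
    final class ZoneTransitions {
        final long[] instantsUtcSeconds; // transition instants, seconds since the epoch in UTC
        final int[] offsetsSeconds;      // offset in seconds applied at and after each instant

        ZoneTransitions(long[] instantsUtcSeconds, int[] offsetsSeconds) {
            this.instantsUtcSeconds = instantsUtcSeconds;
            this.offsetsSeconds = offsetsSeconds;
        }
    }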

Either way cacheDatabase should keep all of the data on the CPU, ideally as a HostColumnVector so we can send it to the GPU very quickly. instant is the time that a transition happens, in seconds since the epoch in UTC. offset is the offset in seconds that is applied at and after the transition for the conversion. I think the offset applies at exactly that instant, meaning a greater-than-or-equal comparison determines when it applies. This is true except for the first entry: as far as I can tell, the first entry is the offset to use if the timestamp is before the first official transition. I tried to match the Python time zone database by doing the following:

        // Sentinel entry: one second before the first real transition, carrying
        // the offset that applied before any transition (see the caveat below).
        ZoneOffsetTransition transition = transitions.get(0);
        instants.append(transition.getInstant().getEpochSecond() - 1);
        offsets.append(transition.getOffsetBefore().getTotalSeconds());
        for (int i = 0; i < transitions.size(); i++) {
          transition = transitions.get(i);
          instants.append(transition.getInstant().getEpochSecond());
          offsets.append(transition.getOffsetAfter().getTotalSeconds());
        }

But I don't think it will work properly for all transitions because of that first entry. Its instant is just 1 second before the first real transition, and I think we are going to have to have a special case for when this entry applies.

convertToUTC would need to take the currentTimeZone and figure out if it is a fixed offset or not. If it is, then it would convert the offset to the same time scale as the input (seconds or microseconds; I don't think we support anything else with Spark) and apply it to the input column to convert it back to UTC. This should be a simple add or subtract operation. We need a bunch of tests to verify that this all works properly, and we need to pay special attention to really old dates when testing, particularly dates before the Julian/Gregorian calendar switch. Hopefully it just works and matches what Spark does.
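A minimal CPU-side sketch of the fixed-offset semantics, assuming microsecond timestamps (class and method names are hypothetical; the real code would apply the same operation to the whole column on the GPU):

    import java.time.ZoneOffset;

    class FixedOffsetSketch {
        // A zone offset is seconds east of UTC, so local - offset = UTC.
        static long toUtcMicros(long localMicros, ZoneOffset offset) {
            return localMicros - offset.getTotalSeconds() * 1_000_000L;
        }
    }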

If it is not a fixed offset, then we need to copy the time zone database to the GPU and call into JNI to do the transition. The transition itself should be fairly simple. I think it would do a lower bound on the timestamp against instant + offset, because instant is in UTC and we need to convert it to the given time zone before comparing it against the input timestamp. We would also have to convert the rule's values to the appropriate scale (seconds or microseconds, or possibly others). Once we have the index we care about, we can look up the offset that corresponds to it and apply it to the value to produce the answer we want. We might need a special case for the first rule so it covers index 0 as well as index 1, as described above.
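A CPU reference for that lookup, under the assumptions above (a linear scan standing in for the GPU lower_bound; names and the seconds scale are illustrative):

    class TransitionLookupSketch {
        // instants/offsets are the cached per-zone arrays in seconds, including
        // the sentinel first entry described above.
        static long localToUtcSeconds(long localSeconds, long[] instants, int[] offsets) {
            int idx = 0;
            // Last transition whose local-time instant (instant + offset) is
            // <= the input local timestamp. If the input precedes every
            // transition, idx stays 0 and the sentinel entry's offset applies.
            for (int i = 0; i < instants.length; i++) {
                if (instants[i] + offsets[i] <= localSeconds) {
                    idx = i;
                }
            }
            return localSeconds - offsets[idx];
        }
    }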

convertFromUTC would be exactly the same as convertToUTC, except instead of subtracting the offset (I think) we would add it, and instead of comparing the timestamp to instant + offset we would compare it directly to the instant.
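The mirrored sketch for that direction, under the same assumptions:

    class FromUtcSketch {
        // Same shape as the toUTC sketch, but compare directly against the raw
        // UTC instants and add the offset instead of subtracting it.
        static long utcToLocalSeconds(long utcSeconds, long[] instants, int[] offsets) {
            int idx = 0;
            for (int i = 0; i < instants.length; i++) {
                if (instants[i] <= utcSeconds) {
                    idx = i;
                }
            }
            return utcSeconds + offsets[idx];
        }
    }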

We need to be very careful in our testing and try to compare things as closely as possible to what Spark does so we don't hit too many surprises in the future.

NVnavkumar commented 8 months ago

Completed in #9810