outrauk / dataiku-plugin-snowflake-hdfs

DSS plugin for fast loading between Snowflake and HDFS
https://github.com/outrauk/dataiku-plugin-snowflake-hdfs
MIT License

sync_snowflake_to_hdfs recipe doesn't handle dates correctly #12

Open · mklaber opened this issue 4 years ago

mklaber commented 4 years ago

Snowflake tables with date and timestamp columns (with timezone or not), when synced back to Dataiku via sync_snowflake_to_hdfs, are imported as int and bigint, respectively. This appears to be an issue with how Dataiku reads Parquet files rather than the plugin itself. In particular, it appears DSS expects dates and times will only be stored using the legacy and now deprecated int96 representation. Its deprecation is described in PARQUET-323 and implemented in https://github.com/apache/parquet-format/pull/86. Snowflake's Parquet implementation outputs dates as annotated int32 and timestamps as int64 (as LogicalTypes).

Additional information about dates, times, and Parquet is described in this article.

Snowflake's Parquet schema:

message schema {
  optional int32 PROPERTY_VALUE_DATE (DATE);
  optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
  optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}

DSS's Parquet schema:

message hive_schema {
  optional int96 PROPERTY_VALUE_DATE;
  optional int96 PROPERTY_VALUE_TIMESTAMP;
  optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}

Requested DSS Fix

Even if DSS continues to output int96, at least make sure the importer recognises and supports the more modern int32 and int64 representations.

Update: this is on Dataiku's backlog but is not a high priority.

Possible Workarounds

Under the current implementation, a user could work around this with a Prepare recipe. In Dataiku's words:

You can convert the int32 version to a date with a prepare recipe using 86400 * the_int32_column and parsing this as a Unix timestamp. The int64 is likely a Unix timestamp (in milliseconds) too that you can also parse with a prepare recipe.
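
For illustration, here is the equivalent conversion expressed in Python/pandas (a sketch with made-up example values; column names match the example schema above, and in DSS this would be done with Prepare recipe steps rather than code):

import pandas as pd

df = pd.DataFrame({
    "PROPERTY_VALUE_DATE": [18262, 18263],                        # int32: days since 1970-01-01
    "PROPERTY_VALUE_TIMESTAMP": [1577880000000, 1577966400000],   # int64: milliseconds since epoch
})

# DATE arrives as days since the Unix epoch: multiply by 86400 to get seconds,
# then parse as a Unix timestamp and keep the date part.
df["PROPERTY_VALUE_DATE"] = pd.to_datetime(df["PROPERTY_VALUE_DATE"] * 86400, unit="s").dt.date

# TIMESTAMP arrives as milliseconds since the Unix epoch.
df["PROPERTY_VALUE_TIMESTAMP"] = pd.to_datetime(df["PROPERTY_VALUE_TIMESTAMP"], unit="ms")

print(df)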

A potential workaround would be to use PyArrow to read Snowflake's Parquet files and write them back out with the use_deprecated_int96_timestamps flag on pyarrow.parquet.write_table.
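
A minimal sketch of that rewrite (file names are placeholders):

import pyarrow.parquet as pq

# Read the file Snowflake produced...
table = pq.read_table("part_0_0_0.snappy.parquet")

# ...and write a copy whose timestamp columns use the legacy int96 encoding
# that DSS expects.
pq.write_table(
    table,
    "part_0_0_0.int96.snappy.parquet",
    compression="snappy",
    use_deprecated_int96_timestamps=True,
)

Note that, as far as I can tell, the flag only affects timestamp columns; the DATE column (stored as annotated int32) would likely still need an explicit cast to a timestamp before writing.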

We could also choose to convert any date/time columns to strings and then rely on users to add a Prepare recipe that parses them back into dates and timestamps.

We could also fallback on CSV rather than Parquet, but that's undesirable for many reasons. (And defeats the whole point of this plugin...)

How to replicate

Prereqs: you can either use Snowflake and DSS to create the example Parquet files yourself, or use the two files in the attached parquet_examples.zip.

First, create a sample parquet file in Snowflake:

COPY INTO '@PUBLIC.MY_S3_STAGE/dss_bug/part'
FROM (
  SELECT 
    t.example_ts::DATE AS property_value_date, 
    t.example_ts::TIMESTAMP AS property_value_timestamp, 
    t.example_ts::TIMESTAMP_NTZ AS property_value_timestamp_ntz 
  FROM (
    VALUES (GETDATE()), (DATEADD(HOUR, 2, GETDATE())), (DATEADD(DAY, -3, GETDATE()))
  ) AS t (example_ts)
)
FILE_FORMAT = (TYPE = PARQUET)
OVERWRITE = TRUE
HEADER = TRUE
;

Copy the file locally and inspect the schema using parquet-tools schema:

$ parquet-tools schema part_0_0_0.snappy.parquet
message schema {
  optional int32 PROPERTY_VALUE_DATE (DATE);
  optional int64 PROPERTY_VALUE_TIMESTAMP (TIMESTAMP_MILLIS);
  optional int64 PROPERTY_VALUE_TIMESTAMP_NTZ (TIMESTAMP_MILLIS);
}
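
If parquet-tools isn't available, PyArrow shows the same information (a sketch; file name as above):

import pyarrow.parquet as pq

# Print the Arrow view of the Parquet schema; DATE should appear as date32
# (annotated int32) and the timestamps as timestamp[ms] (annotated int64).
print(pq.read_schema("part_0_0_0.snappy.parquet"))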

Also, create a "Hadoop HDFS dataset" in Dataiku that points to this file. Note that the schema gets loaded as int for dates and bigint for timestamps (see attached screenshot).

NB: we've tried picking various flavours of Parquet (Hive, Pig, and Spark) and manually setting the schema's data type to date with no success.

Now, in Dataiku, create a dummy dataset with the same types. It can be Snowflake or anything else (see attached screenshot).

Add a Sync recipe with "Parquet" as the Format, and run it (see attached screenshot).

Copy the file locally and again run parquet-tools schema (this output also appears in the Activity Log of the sync execution):

$ parquet-tools schema part-r-00000.snappy.parquet
message hive_schema {
  optional int96 PROPERTY_VALUE_DATE;
  optional int96 PROPERTY_VALUE_TIMESTAMP;
  optional int96 PROPERTY_VALUE_TIMESTAMP_NTZ;
}

As the Parquet schema shows, Dataiku writes the date and timestamp values as int96.

mklaber commented 4 years ago

I've received confirmation from Dataiku that this is a known limitation and there is no timeline to resolve it. Here's what was said:

We confirm that DSS does not try to interpret the new Parquet logical types, so reads the int32 and int64 as int32 and int64. You can convert the int32 version to a date with a prepare recipe using 86400 * the_int32_column and parsing this as a Unix timestamp. The int64 is likely a Unix timestamp (in milliseconds) too that you can also parse with a prepare recipe.

mklaber commented 4 years ago

This is also tracked in Jira as AOBS-524.