JuliaIO / Parquet.jl

Julia implementation of Parquet columnar file format reader

Date type columns are being read as integers. #155

Open mahiki opened 3 years ago

mahiki commented 3 years ago

I can't use Parquet.jl because there is a problem reading Date-typed columns: they are read into Julia DataFrames as Int32. I'm pretty sure parquet files are supposed to carry their schema and data types; when I'm working in Spark I don't have to pass explicit schema/type definitions.

Here is a simple example using parquet files written by Spark, probably a very common use case.

Read a small DataFrame in Spark

// scala>
val df2 = spark.read.parquet("test_date_column_data")

df2.show
+---+----------+-----+---------+---+------+
| id|  date_col|group|     item| ts|amount|
+---+----------+-----+---------+---+------+
|  1|2020-03-11|    A|BOO00OXXX|  1|   1.1|
|  2|2020-03-11|    B|BOO00OXXY|  2|   2.1|
|  3|2020-03-12|    B|BOO00OXXZ|  3| 17.08|
|  4|2020-03-12|    D|BOO00OXXA|  4|   9.3|
|  5|2020-03-13|    E|BOO00OXXB|  5|  1.99|
|  6|2020-03-13|    A|BOO00OXXY|  1|   0.1|
|  7|2020-03-13|    C|BOO00OXXY|  2|   2.0|
+---+----------+-----+---------+---+------+

df2.printSchema
//    root
//    |-- id: integer (nullable = true)
//    |-- date_col: date (nullable = true)
//    |-- group: string (nullable = true)
//    |-- item: string (nullable = true)
//    |-- ts: integer (nullable = true)
//    |-- amount: double (nullable = true)

// write to single parquet file
df2.coalesce(1).write.parquet("test2_date_column_data")

Read DataFrame using Parquet.jl

# julia>
using Parquet, DataFrames

dff = DataFrame(read_parquet("test2_date_column_data"))
7×6 DataFrame
 Row │ id      date_col  group    item       ts      amount
     │ Int32?  Int32?    String?  String?    Int32?  Float64?
─────┼────────────────────────────────────────────────────────
   1 │      1     18332  A        BOO00OXXX       1      1.1
   2 │      2     18332  B        BOO00OXXY       2      2.1
   3 │      3     18333  B        BOO00OXXZ       3     17.08
   4 │      4     18333  D        BOO00OXXA       4      9.3
   5 │      5     18334  E        BOO00OXXB       5      1.99
   6 │      6     18334  A        BOO00OXXY       1      0.1
   7 │      7     18334  C        BOO00OXXY       2      2.0

eltype.(eachcol(dff))
# 6-element Vector{Union}:
#  Union{Missing, Int32}
#  Union{Missing, Int32}
#  Union{Missing, String}
#  Union{Missing, String}
#  Union{Missing, Int32}
#  Union{Missing, Float64}

The dates in date_col are all messed up: they come back as raw day counts since the Unix epoch (Parquet's DATE logical type is physically an INT32 day count) instead of Date values.
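
For now I can convert manually; a minimal workaround sketch, assuming the Int32 values are days since the Unix epoch (day 18332 is indeed 2020-03-11):

# julia>
using Dates

# Convert raw Parquet DATE day counts to Date, passing missings through.
dff.date_col = [ismissing(d) ? missing : Date(1970, 1, 1) + Day(d) for d in dff.date_col]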

mahiki commented 3 years ago

Addendum

This is perhaps related to the root cause of the dates problem, but it's not a priority for me.

When parquet columns are written as non-nullable, the Int/Float columns are read in as garbage.

First, create a Spark DataFrame with a date-typed column and write it to parquet

val dfq = List(
      (1,"2020-03-11","A","BOO00OXXX",1, 1.10),
      (2,"2020-03-11","B","BOO00OXXY",2, 2.10),
      (3,"2020-03-12","B","BOO00OXXZ",3, 17.08),
      (4,"2020-03-12","D","BOO00OXXA",4, 9.3),
      (5,"2020-03-13","E","BOO00OXXB",5, 1.99),
      (6,"2020-03-13","A","BOO00OXXY",1, 0.10),
      (7,"2020-03-13","C","BOO00OXXY",2, 2.0)
    )
    .toDF("id","da_te","group","item","ts","amount")
    .select($"id"
        , to_date($"da_te","yyyy-MM-dd").as("date_col")
        , $"group"
        , $"item"
        , $"ts"
        , $"amount")

dfq.printSchema
//    root
//    |-- id: integer (nullable = false)
//    |-- date_col: date (nullable = true)
//    |-- group: string (nullable = true)
//    |-- item: string (nullable = true)
//    |-- ts: integer (nullable = false)
//    |-- amount: double (nullable = false)

dfq.show
+---+----------+-----+---------+---+------+
| id|  date_col|group|     item| ts|amount|
+---+----------+-----+---------+---+------+
|  1|2020-03-11|    A|BOO00OXXX|  1|   1.1|
|  2|2020-03-11|    B|BOO00OXXY|  2|   2.1|
|  3|2020-03-12|    B|BOO00OXXZ|  3| 17.08|
|  4|2020-03-12|    D|BOO00OXXA|  4|   9.3|
|  5|2020-03-13|    E|BOO00OXXB|  5|  1.99|
|  6|2020-03-13|    A|BOO00OXXY|  1|   0.1|
|  7|2020-03-13|    C|BOO00OXXY|  2|   2.0|
+---+----------+-----+---------+---+------+

dfq.coalesce(1).write.parquet("test_date_column_data")

Notice the data is the same as the original set, but some columns are nullable = false; this is the default behavior of toDF() for Int/Double columns.

Read the parquet data as a Julia DataFrame

(v1.6) pkg> generate pq_test_date
(v1.6) pkg> activate .
(pq_test_date) pkg> add DataFrames, Parquet
(pq_test_date) pkg> st
#        Project pq_test_date v0.1.0
#         Status `~/../pq_test_date/Project.toml`
#     [a93c6f00] DataFrames v1.2.0
#     [626c502c] Parquet v0.8.3

using Parquet, DataFrames
dfq = DataFrame(read_parquet("test_date_column_data"))

7×6 DataFrame
 Row │ id         date_col  group    item       ts         amount
     │ Int32      Int32?    String?  String?    Int32      Float64
─────┼──────────────────────────────────────────────────────────────────
   1 │         0     18332  A        BOO00OXXX          0  2.22659e-314
   2 │        13     18332  B        BOO00OXXY        150  3.0e-323
   3 │         0     18333  B        BOO00OXXZ          0  2.24929e-314
   4 │ 263668488     18333  D        BOO00OXXA  322394659  2.25631e-314
   5 │         1     18334  E        BOO00OXXB          1  2.24929e-314
   6 │        28     18334  A        BOO00OXXY          3  2.24916e-314
   7 │         0     18334  C        BOO00OXXY          0  2.24916e-314

eltype.(eachcol(dfq))
# 6-element Vector{Type}:
#  Int32
#  Union{Missing, Int32}
#  Union{Missing, String}
#  Union{Missing, String}
#  Int32
#  Float64

# reading it twice gives different numbers
dfq = DataFrame(read_parquet("test_date_column_data"))
7×6 DataFrame
 Row │ id         date_col  group    item       ts         amount
     │ Int32      Int32?    String?  String?    Int32      Float64
─────┼──────────────────────────────────────────────────────────────────
   1 │         0     18332  A        BOO00OXXX          0  2.25523e-314
   2 │       894     18332  B        BOO00OXXY        914  2.27273e-314
   3 │         0     18333  B        BOO00OXXZ          0  2.21165e-314
   4 │ 267651619     18333  D        BOO00OXXA  863662672  2.218e-314
   5 │         1     18334  E        BOO00OXXB          1  2.21052e-314
   6 │         4     18334  A        BOO00OXXY  877323664  2.21052e-314
   7 │         0     18334  C        BOO00OXXY          1  5.0e-324

Reading the file twice gives different values, which looks like uninitialized memory being returned. I think the Int/Float columns declared non-nullable (required, in parquet terms) are the ones affected.


ExpandingMan commented 3 years ago

Any chance you know whether it is writing Parquet's Int96 or Int64 timestamps? I'm trying to figure out what the status quo is here. Rather to my surprise, Int96 seems to work for me, but Int64 does not; I was expecting it to be the other way around.

Oh, part of my problem is that I'm only testing with full timestamps and these are dates, so maybe we just can't read any dates right now :disappointed:.
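
For reference, here is how an Int96 timestamp decodes under the common Impala/Spark convention. This is just the layout, not Parquet.jl API, and it assumes a little-endian host:

# julia>
using Dates

# Int96 timestamp layout: bytes 1-8 are nanoseconds within the day,
# bytes 9-12 are the Julian day number, both little-endian.
function int96_to_datetime(raw::AbstractVector{UInt8})
    nanos = only(reinterpret(UInt64, raw[1:8]))
    jday  = only(reinterpret(UInt32, raw[9:12]))
    days = Int(jday) - 2440588     # 2440588 = Julian day of 1970-01-01
    DateTime(1970, 1, 1) + Day(days) + Millisecond(nanos ÷ 1_000_000)
end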

mahiki commented 3 years ago

The date columns are Spark DateType. I can attach the sample data to verify; I'm pretty ignorant about how that maps to the binary encoding in the physical file.

I can change the date column to timestamp in my production sources, so perhaps this is a workaround.

mahiki commented 3 years ago

Attaching the parquet output from Spark, with a DateType date column and all columns nullable

$> ls -l test2_date_column_data
total 4
-rw-r--r-- 1 user  staff    0 Jul 18 22:46 _SUCCESS
-rw-r--r-- 1 user  staff 1679 Jul 18 22:46 part-00000-dee59adb-aa01-46ea-9215-fa3f296bfd5b-c000.snappy.parquet

test2_date_column_data-zip.zip

mahiki commented 3 years ago

It looks like timestamps are working. DateTime doesn't support time zones without TimeZones.jl, so I'm ignoring the shift of the timestamp hours from 00:00 to 19:00 (see the TimeZones.jl sketch after the output below).

Spark write to parquet with TimestampType column

Test whether DateType is unsupported while TimestampType works.

I guess if I had to choose, I would support timestamp columns ahead of DateType.

// scala>
val df_ts = spark.read.parquet("test_date_column_data")
   .withColumn("timestamp_col", $"date_col".cast("timestamp"))

scala> df_ts.show
+---+----------+-----+---------+---+------+-------------------+
| id|  date_col|group|     item| ts|amount|      timestamp_col|
+---+----------+-----+---------+---+------+-------------------+
|  1|2020-03-11|    A|BOO00OXXX|  1|   1.1|2020-03-11 00:00:00|
|  2|2020-03-11|    B|BOO00OXXY|  2|   2.1|2020-03-11 00:00:00|
|  3|2020-03-12|    B|BOO00OXXZ|  3| 17.08|2020-03-12 00:00:00|
|  4|2020-03-12|    D|BOO00OXXA|  4|   9.3|2020-03-12 00:00:00|
|  5|2020-03-13|    E|BOO00OXXB|  5|  1.99|2020-03-13 00:00:00|
|  6|2020-03-13|    A|BOO00OXXY|  1|   0.1|2020-03-13 00:00:00|
|  7|2020-03-13|    C|BOO00OXXY|  2|   2.0|2020-03-13 00:00:00|
+---+----------+-----+---------+---+------+-------------------+

df_ts.printSchema
// root
//  |-- id: integer (nullable = true)
//  |-- date_col: date (nullable = true)
//  |-- group: string (nullable = true)
//  |-- item: string (nullable = true)
//  |-- ts: integer (nullable = true)
//  |-- amount: double (nullable = true)
//  |-- timestamp_col: timestamp (nullable = true)

df_ts.coalesce(1).write.parquet("test_timestamp_column_data")

// shell
$> ls -l test_timestamp_column_data
total 4
-rw-r--r-- 1 user  staff    0 Jul 20 15:40 _SUCCESS
-rw-r--r-- 1 user  staff 1917 Jul 20 15:40 part-00000-7938c7c8-cead-410f-b001-5c0d9301880c-c000.snappy.parquet

Now read it as a Julia DataFrame via Parquet.jl

# julia>
using Parquet, DataFrames

dfts = DataFrame(read_parquet("test_timestamp_column_data"))
7×7 DataFrame
 Row │ id      date_col  group    item       ts      amount    timestamp_col
     │ Int32?  Int32?    String?  String?    Int32?  Float64?  DateTime…?
─────┼─────────────────────────────────────────────────────────────────────────────
   1 │      1     18332  A        BOO00OXXX       1      1.1   2020-03-11T19:00:00
   2 │      2     18332  B        BOO00OXXY       2      2.1   2020-03-11T19:00:00
   3 │      3     18333  B        BOO00OXXZ       3     17.08  2020-03-12T19:00:00
   4 │      4     18333  D        BOO00OXXA       4      9.3   2020-03-12T19:00:00
   5 │      5     18334  E        BOO00OXXB       5      1.99  2020-03-13T19:00:00
   6 │      6     18334  A        BOO00OXXY       1      0.1   2020-03-13T19:00:00
   7 │      7     18334  C        BOO00OXXY       2      2.0   2020-03-13T19:00:00

eltype.(eachcol(dfts))
# 7-element Vector{Union}:
#  Union{Missing, Int32}
#  Union{Missing, Int32}
#  Union{Missing, String}
#  Union{Missing, String}
#  Union{Missing, Int32}
#  Union{Missing, Float64}
#  Union{Missing, Dates.DateTime}
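
If the hours shift is just the writer's session time zone, zone-aware values can be recovered with TimeZones.jl. A sketch, assuming the stored instants are UTC (that assumption may not hold for every writer):

# julia>
using Dates, TimeZones

# Attach UTC to the raw DateTimes, then view them in the system's local zone.
zoned = [ismissing(t) ? missing : astimezone(ZonedDateTime(t, tz"UTC"), localzone())
         for t in dfts.timestamp_col]
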
mahiki commented 3 years ago

Attaching parquet output from spark write above

$> ls -l test_timestamp_column_data
total 4
-rw-r--r-- 1 user  staff    0 Jul 20 15:40 _SUCCESS
-rw-r--r-- 1 user  staff 1917 Jul 20 15:40 part-00000-7938c7c8-cead-410f-b001-5c0d9301880c-c000.snappy.parquet

test_timestamp_column_data-zip.zip

tanmaykm commented 3 years ago

Thanks for the detailed report! Yes, Parquet.jl does not support all data types yet. From what I recollect regarding dates and timestamps, it is missing support for the following (each again with some underlying variants):
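
For reference, the parquet format spec maps these roughly as below; the helper names are illustrative only, not Parquet.jl API:

# julia>
using Dates

# DATE: INT32 days since the Unix epoch (1970-01-01)
from_parquet_date(days::Int32) = Date(1970, 1, 1) + Day(days)

# TIMESTAMP_MILLIS: INT64 milliseconds since the Unix epoch
from_parquet_ts_millis(ms::Int64) = DateTime(1970, 1, 1) + Millisecond(ms)

# TIMESTAMP_MICROS: INT64 microseconds since the Unix epoch
# (Julia's DateTime has millisecond precision, so extra digits are dropped)
from_parquet_ts_micros(us::Int64) = DateTime(1970, 1, 1) + Millisecond(us ÷ 1000)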

kainkad commented 1 year ago

As there are quite a few other issues raised regarding DateTime support for reader (#133) and writer (#102), I was wondering if you have any plans to include DateTime support in Parquet.jl?