JuliaIO / Parquet.jl

Julia implementation of Parquet columnar file format reader

read_parquet not picking up column names from partitioned dataset #154

Open mahiki opened 2 years ago

mahiki commented 2 years ago

I'm a package user, not a developer so I apologize in advance if I am missing something obvious in the API descriptions.

My use case: I'm consuming output of some data engineers to produce aggregated reports using Julia, my preferred language.

I have datasets written from Spark in Parquet format, partitioned by certain columns in the typical fashion. The file layout is shown below. The original complete dataset includes the partition columns dataset_date and device_family. The first is the date of observations, which allows an incremental build of historical data; the second is a natural partition of the data because of upstream logic.

Here are the Parquet file and DataFrame load operations that do not detect the partitions.

Julia Version 1.6.1 Parquet v0.8.3 DataFrames v1.2.0

parquet files written by Spark: Spark version 2.4.4 Scala version 2.11.12 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)

shell> tree 
datasets
└── parquet
    └── jobs
        ├── d_table
        │   ├── dataset_date=2021-06-30
        │   │   ├── device_family=ABC
        │   │   │   └── part-00000-6655178e-0bbb-4741-ab6a-8efe6aebbded.c000.snappy.parquet
        │   │   ├── device_family=DEF
        │   │   │   └── part-00000-358d8620-c4e3-44cb-89d5-d00eb23ad52b.c000.snappy.parquet
        │   │   └── device_family=GHI
        │   │       └── part-00000-c10db044-3b38-4032-ad22-aab3edb8e1ba.c000.snappy.parquet
        │   └── dataset_date=2021-06-30_$folder$
        └── d_table_$folder$

datapath = "datasets/parquet/jobs/d_table/dataset_date=2021-06-30/device_family=ABC/part-00000-358d8620-c4e3-44cb-89d5-d00eb23ad52b.c000.snappy.parquet"

dataset_root = "datasets/parquet/jobs/d_table"

parquetfile = Parquet.File(datapath)
Parquet file: datasets/parquet/jobs/d_table/dataset_date=2021-06-30/device_family=ABC/part-00000-358d8620-c4e3-44cb-89d5-d00eb23ad52b.c000.snappy.parquet
    version: 1
    nrows: 19739
    created by: parquet-mr version 1.10.1 (build ae1ebff313617fb6ec367d105aeaf306bf27939c)
    cached: 0 column chunks

# Columns 'dataset_date' and 'device_family' are missing
colnames(parquetfile)
14-element Vector{Vector{String}}:
 ["country_cd"]
 ["month_end"]
 ["agg_level"]
 ["vert"]
 ["ten_group"]
 ["total_dialog"]
 ["users"]
 ["dialogs"]
 ["actives"]
 ["regist_cnt"]
 ["dia_user_rate"]
 ["act_pct"]
 ["penpct"]
 ["totaldia_userrate"]

# same for dataframe
df = DataFrame(read_parquet(dataset_root))

55919×14 DataFrame      # <-- note the number of rows includes data from all three partitions {ABC, DEF, GHI}
                                          #       but the partition columns are not present

propertynames(df)
14-element Vector{Symbol}:
:country_cd
:month_end
:agg_level
:vert
:ten_group
:total_dialog
:users
:dialogs
:actives
:regist_cnt
:dia_user_rate
:act_pct
:penpct
:totaldia_userrate
# same names, missing the partition columns "dataset_date" and "device_family"

The expected behavior is the same result as reading from Spark, shown below. Note that the last two columns are derived from the partition directories in the file paths.

val df = spark.read.parquet(dataset_root)

df.show(5)
+----------+-------------+---------+----------- ... -----------+------------+-------------+
|country_cd|month_end    |agg_level|     vert   ... totaldi    |dataset_date|device_family|
+----------+-------------+---------+----------- ... -----------+------------+-------------+
|        AU|   2021-01-31|  MONTHLY| XYZabc1234 ...   30.209284|  2021-06-30|          ABC|
|        BR|   2020-12-31|  MONTHLY| XYZabc1234 ...    73.46783|  2021-05-31|          DEF|
|        BR|   2020-02-29|  MONTHLY| XYZabc1234 ...   21.850622|  2021-05-31|          DEF|
|        BR|   2020-05-31|  MONTHLY| XYZabc1234 ...    377.8711|  2021-06-30|          GHI|
|        DE|   2020-08-31|  MONTHLY| XYZabc1234 ...   71.680115|  2021-06-30|          ABC|
+----------+-------------+---------+----------- ... -----------+------------+-------------+

mahiki commented 2 years ago

Related to: https://github.com/JuliaIO/Parquet.jl/issues/139 https://github.com/JuliaIO/Parquet.jl/pull/138

The docstring for read_parquet says the following for column_generator kwarg, so I think this does not need to be set:

column_generator: Function to generate a partitioned column when not found in the partitioned table. Parameters provided to the function: table, column index, length of column to generate. Default implementation determines column values from the table path

(Emphasis mine) I'm pretty sure the intention is for read_parquet to build column names from partitions in just the same way Spark does, which would be great 😊. It must just be a bug in the implementation, perhaps related to the change above about metadata files.

mahiki commented 2 years ago

This is a helpful reference about spark partition discovery, from @tk3369 https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery
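To make the partition discovery idea concrete, here is a minimal sketch of how the key=value directory names could be pulled out of a file path. The function name partition_values and the shortened file name are hypothetical, and this is not how Parquet.jl or Spark implements it; it only illustrates the convention described in the Spark docs.

```julia
# Hypothetical helper (not part of Parquet.jl): collect Hive-style
# `key=value` directory names from a file path, in path order.
function partition_values(path::AbstractString)
    found = Pair{Symbol,String}[]
    # Only directory components are inspected; the file name itself is skipped.
    for part in splitpath(dirname(path))
        kv = split(part, '='; limit=2)
        if length(kv) == 2 && !isempty(kv[1])
            push!(found, Symbol(kv[1]) => String(kv[2]))
        end
    end
    return found
end

# Shortened, hypothetical file name; the directory layout matches the tree above.
path = "datasets/parquet/jobs/d_table/dataset_date=2021-06-30/device_family=ABC/part-00000.snappy.parquet"
partition_values(path)
```

For the layout in this issue, that yields one pair for dataset_date and one for device_family, both still as strings.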

mahiki commented 2 years ago

Spark also infers the partition column types from the path. This is from the Spark repo doc sql-data-sources-parquet.md:

Notice that the data types of the partitioning columns are automatically inferred. Currently, numeric data types, date, timestamp and string type are supported.

I'm pretty sure Parquet files can encode metadata in the files themselves, and maybe the column type is stored in metadata by some Parquet writer applications, but I think for Spark the column names come from the path and the type is inferred.

If Parquet.jl inferred column types the way CSV.jl does, I think that would be a fine way to solve the problem, matching what Spark provides:

# types inferred for columns
numeric data types
date
timestamp
string
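A rough sketch of what that inference could look like for a single partition value, using only Base and the Dates standard library. The function name infer_partition_value, the order of attempts, and the two date formats are my assumptions, not Spark's exact rules and not anything Parquet.jl does today.

```julia
using Dates

# Hypothetical helper: try progressively more general types and fall back
# to String. The order and the date formats are assumptions.
function infer_partition_value(s::AbstractString)
    v = tryparse(Int, s)
    v === nothing || return v
    v = tryparse(Float64, s)
    v === nothing || return v
    # Date is tried before DateTime so a plain date is not widened to a timestamp.
    v = tryparse(Date, s, dateformat"yyyy-mm-dd")
    v === nothing || return v
    v = tryparse(DateTime, s, dateformat"yyyy-mm-dd HH:MM:SS")
    v === nothing || return v
    return String(s)
end

infer_partition_value("2021-06-30")  # a Date
infer_partition_value("ABC")         # stays a String
```

One caveat with this kind of per-value inference: a value such as "007" becomes the integer 7, so a real implementation would probably need to look at all values of a partition column before settling on a type.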

tanmaykm commented 2 years ago

read_parquet depends on the table metadata being available in a file named _common_metadata or _metadata in the dataset folder. If that is not available, then it assumes that each of the partitioned parquet files has the complete metadata.

I do not have much experience with how Spark does it, but it does seem possible: https://arrow.apache.org/docs/python/parquet.html#writing-metadata-and-common-medata-files

Does the dataset you are consuming have the metadata files?
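A quick way to check, as a minimal sketch using only Base functions. The helper name metadata_files is hypothetical, and it only looks directly under the dataset root for the two file names mentioned above.

```julia
# Hypothetical helper: report which of the two metadata file names
# exist directly under the dataset root.
function metadata_files(dataset_root::AbstractString)
    candidates = ("_common_metadata", "_metadata")
    return [name for name in candidates if isfile(joinpath(dataset_root, name))]
end

# Returns an empty vector when neither file is present.
metadata_files("datasets/parquet/jobs/d_table")
```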

mahiki commented 2 years ago

Hi -- sorry for the long lapse.

Generally there are no _metadata or _common_metadata files. I think this is a controllable configuration in Spark, but the typical data lakes I'm working with do not have them. I work with many datasets produced by other teams, so it seems that in order to incorporate Julia processing steps I will need to pre-process these files again in Spark or Python.

I know this is less of a burden in the data science area, where it is expected to pre-process data before ingesting to models or stats work. I would like to use Julia in various data transformations as part of the data engineering components of the work.
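In the meantime, a Julia-only workaround might be to walk the dataset directory and attach the partition values to each file manually. This is a minimal sketch under that assumption: partitioned_files is a hypothetical helper, not part of Parquet.jl, and the commented usage relies only on the DataFrame(read_parquet(...)) call already shown in this issue.

```julia
# Hypothetical helper: pair every .parquet file under `root` with the
# `key=value` directory names found between `root` and that file.
function partitioned_files(root::AbstractString)
    result = Tuple{String,Vector{Pair{Symbol,String}}}[]
    for (dir, _, files) in walkdir(root)
        parts = Pair{Symbol,String}[]
        for name in splitpath(relpath(dir, root))
            kv = split(name, '='; limit=2)
            length(kv) == 2 && push!(parts, Symbol(kv[1]) => String(kv[2]))
        end
        for f in files
            # Marker entries that do not end in .parquet are skipped.
            endswith(f, ".parquet") && push!(result, (joinpath(dir, f), parts))
        end
    end
    return result
end

# Assumed usage (needs Parquet.jl and DataFrames.jl, so it is left commented out):
# frames = map(partitioned_files(dataset_root)) do (file, parts)
#     df = DataFrame(read_parquet(file))
#     for (col, val) in parts
#         df[!, col] .= val   # partition value repeated for every row
#     end
#     df
# end
# full = vcat(frames...)
```

The partition values stay as strings here; converting them to dates or numbers would be a separate step.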