dask / fastparquet

python implementation of the parquet columnar file format.
Apache License 2.0

Partitioned column not found when metadata is not available. #749

Open dakenblack opened 2 years ago

dakenblack commented 2 years ago

What happened: dask.dataframe.read_parquet is not able to find partitioned column names when metadata is not available and the fastparquet engine is used. pandas (with the fastparquet engine) and dask (with the pyarrow engine) are both able to find them, so the problem appears to lie with dask's fastparquet engine.

I've removed the metadata because, for my use case, it is faster to update the parquet dataset without it (i.e. periodic metadata updates are too expensive).

What you expected to happen: All columns to be found by dask.

Minimal Complete Verifiable Example:

import pandas as pd
import dask.dataframe as dd
import fastparquet
import dask

def containsall(arr, elems):
    return all([e in arr for e in elems])

print(dask.__version__, fastparquet.__version__)
# OUT: 2022.01.1 0.8.0

df = pd.DataFrame([
    ('A', 'B', 1),
    ('A', 'C', 3),
], columns=['group1', 'group2', 'value'])

dd.from_pandas(df, npartitions=1).to_parquet(
    'test', 
    partition_on=['group1', 'group2'], 
    engine='fastparquet',
    write_metadata_file=False, # metadata NOT written
    overwrite=True,
    append=False,
)

# engine = 'pyarrow'
engine='fastparquet'
from_dd = dd.read_parquet('test', engine=engine)
from_pd = pd.read_parquet('test', engine=engine)
expected = ['group1', 'group2', 'value']

assert(containsall(from_pd.columns, expected))
assert(containsall(from_dd.columns, expected)) # fails

print('Success')

Anything else we need to know?: I've also tried specifying the columns to load in the columns argument, but that returns an error:

ValueError: The following columns were not found in the dataset {'group1'}
The following columns were found Index(['value', 'group2'], dtype='object')

Environment:

martindurant commented 2 years ago

duplicate of https://github.com/dask/dask/issues/8666 ?

dakenblack commented 2 years ago

Hi @martindurant, thanks for your response but I don't think it is a duplicate because

I've also further minimised my example.

I've also noticed that only the partitioned columns with a single value are missing. If, for example, I change my dataset to the one below, my code succeeds.

df = pd.DataFrame([
    ('A', 'B', 1),
    ('A', 'C', 3),
    ('B', 'C', 3),
], columns=['group1', 'group2', 'value'])

martindurant commented 2 years ago

@rjzamora, the fp engine allows a base_path argument to be able to infer the top of the parquet dataset tree correctly for this case, but I don't see how to pass it via dd.read_parquet.

martindurant commented 2 years ago

The situation is that the root of the parquet dataset is not obvious when there is no _metadata file and the top level of partitioning has only one option.
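To illustrate the ambiguity, here is a minimal pure-Python sketch. It is not the actual fastparquet or dask code; the helper names infer_root and partition_columns and the part.0.parquet file names are assumptions made up for the illustration. It guesses the root as the deepest directory shared by all data files, which is roughly what any reader has to fall back on without a _metadata file.

```python
import posixpath


def infer_root(paths):
    """Guess the dataset root as the deepest directory shared by all files.

    Hypothetical helper for illustration only, not a library function.
    """
    return posixpath.commonpath([posixpath.dirname(p) for p in paths])


def partition_columns(paths, root):
    """Collect hive-style key=value directory names found below root."""
    cols = []
    for p in paths:
        # Directory part of the file path, relative to the chosen root.
        rel = posixpath.dirname(p)[len(root):].strip("/")
        for part in rel.split("/"):
            if "=" in part:
                key = part.split("=", 1)[0]
                if key not in cols:
                    cols.append(key)
    return cols


# Assumed layout for the two-row example: group1 has the single value 'A'.
paths = [
    "test/group1=A/group2=B/part.0.parquet",
    "test/group1=A/group2=C/part.0.parquet",
]

guessed = infer_root(paths)
print(guessed)                            # test/group1=A
print(partition_columns(paths, guessed))  # ['group2']
print(partition_columns(paths, "test"))   # ['group1', 'group2']
```

With the three-row dataset, the files sit under both group1=A and group1=B, so the shared directory is test itself and both partition columns are recovered, which matches the behaviour reported above. Supplying the root explicitly removes the guesswork.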

rjzamora commented 2 years ago

the fp engine allows a base_path argument to be able to infer the top of the parquet dataset tree correctly for this case, but I don't see how to pass it via dd.read_parquet.

This seems like the kind of thing I was hoping 8765 could help us with.