
[Python][Dataset] The first table schema becomes a common schema for the full Dataset #27905

Open asfimport opened 3 years ago

asfimport commented 3 years ago

The first table schema becomes a common schema for the full Dataset. This can cause problems with sparse data.

Consider the example below: when the first chunk is full of NA values, pyarrow ignores the dtypes from pandas for the whole dataset:


# get dataset
!wget https://physionet.org/files/mimiciii-demo/1.4/D_ITEMS.csv

import pandas as pd 
import pyarrow.parquet as pq
import pyarrow as pa
import pyarrow.dataset as ds
import shutil
from pathlib import Path

def foo(input_csv='D_ITEMS.csv', output='tmp.parquet', chunksize=1000):
    if Path(output).exists():
        shutil.rmtree(output)

    # write dataset
    d_items = pd.read_csv(input_csv, index_col='row_id',
                          usecols=['row_id', 'itemid', 'label', 'dbsource', 'category', 'param_type'],
                          dtype={'row_id': int, 'itemid': int, 'label': str, 'dbsource': str,
                                 'category': str, 'param_type': str}, chunksize=chunksize)
    for i, chunk in enumerate(d_items):
        table = pa.Table.from_pandas(chunk)
        if i == 0:
            schema1 = pa.Schema.from_pandas(chunk)
            schema2 = table.schema
        # print(table.field('param_type'))
        pq.write_to_dataset(table, root_path=output)

    # read dataset
    dataset = ds.dataset(output)

    # compare schemas
    print('Schemas are equal: ', dataset.schema == schema1 == schema2)
    print(dataset.schema.types)
    print('Should be string', dataset.schema.field('param_type'))    
    return dataset

dataset = foo()
dataset.to_table()

>>>Schemas are equal:  False
[DataType(int64), DataType(string), DataType(string), DataType(null), DataType(null), DataType(int64)]
Should be string pyarrow.Field<param_type: null>
---------------------------------------------------------------------------
ArrowTypeError: fields had matching names but differing types. From: category: string To: category: null

If you list the schemas of the individual parquet files, you'll see that almost all of them ignored the pandas dtypes:


import os

for i in os.listdir('tmp.parquet/'):
    print(ds.dataset(os.path.join('tmp.parquet/', i)).schema.field('param_type'))

>>>pyarrow.Field<param_type: null>
pyarrow.Field<param_type: string>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: string>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: string>
pyarrow.Field<param_type: string>
pyarrow.Field<param_type: null>
pyarrow.Field<param_type: null>

But if we take a bigger chunk of data, one that contains non-NA values, everything is OK:


dataset = foo(chunksize=10000)
dataset.to_table()

>>>Schemas are equal:  True
[DataType(int64), DataType(string), DataType(string), DataType(string), DataType(string), DataType(int64)]
Should be string pyarrow.Field<param_type: string>
pyarrow.Table
itemid: int64
label: string
dbsource: string
category: string
param_type: string
row_id: int64

Checking for NA values in the data:


pd.read_csv('D_ITEMS.csv', nrows=1000)['param_type'].unique()
>>>array([nan])

pd.read_csv('D_ITEMS.csv', nrows=10000)['param_type'].unique()
>>>array([nan, 'Numeric', 'Text', 'Date time', 'Solution', 'Process',
       'Checkbox'], dtype=object)

PS: switching issue reporting from GitHub to Jira is an outstanding move.

Reporter: Borys Kabakov

Note: This issue was originally created as ARROW-12080. Please see the migration documentation for further details.

asfimport commented 3 years ago

Borys Kabakov: Probably things are even worse, because the code snippet below crashes at random (presumably depending on which parquet file the dataset happens to inspect first):

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pandas as pd
import numpy as np
import shutil
from pathlib import Path

output='./tmp.parquet'
if Path(output).exists():
    shutil.rmtree(output)

pd.DataFrame({'A': [np.nan, np.nan, '3', np.nan],
              'B': ['1', '2', '3', np.nan]},
             dtype=str).to_csv('tmp.csv', index=False)

for i, chunk in enumerate(pd.read_csv('tmp.csv', dtype={'A': str, 'B': str}, chunksize=1)):
    table = pa.Table.from_pandas(chunk)
    print(table.schema.types)
    pq.write_to_dataset(table, root_path=output)

dataset = ds.dataset('./tmp.parquet')
dataset.to_table().to_pandas()

Examples of output:

# first example
[DataType(null), DataType(string)]
[DataType(null), DataType(string)]
[DataType(string), DataType(string)]
[DataType(null), DataType(null)]
      A     B
0     3     3
1  None     2
2  None  None
3  None     1

# second example
[DataType(null), DataType(string)]
[DataType(null), DataType(string)]
[DataType(string), DataType(string)]
[DataType(null), DataType(null)]
-----------------------------
ArrowTypeError: fields had matching names but differing types. From: A: string To: A: null
asfimport commented 3 years ago

Borys Kabakov: It affects not only datasets, but also writing to a single file:


import pandas as pd 
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path

def bar(input_csv='D_ITEMS.csv', output='tmp.parquet', chunksize=1000):
    Path(output).unlink(missing_ok=True)
    pqwriter = None

    # write file
    d_items = pd.read_csv(input_csv, index_col='row_id',
                          usecols=['row_id', 'itemid', 'label', 'dbsource', 'category', 'param_type'],
                          dtype={'row_id': int, 'itemid': int, 'label': str, 'dbsource': str,
                                 'category': str, 'param_type': str}, chunksize=chunksize)
    for i, chunk in enumerate(d_items):
        table = pa.Table.from_pandas(chunk)
        if i == 0:
            # create a parquet writer, giving it the output file and the first chunk's schema
            pqwriter = pq.ParquetWriter(output, table.schema)
        pqwriter.write_table(table)

    # close the parquet writer
    if pqwriter:
        pqwriter.close()

    df = pd.read_parquet(output)
    return df

# everything will be fine:
# the returned dataframe is equal to 'D_ITEMS.csv'
df = bar(chunksize=10000)

# it will crash for the same reason as before -- only NAs in the first chunk
df = bar(chunksize=1000)

>>>
---------------------------------------------------------------------------
ValueError: Table schema does not match schema used to create file: 
table:
itemid: int64
label: string
dbsource: string
category: null
param_type: null
row_id: int64
-- schema metadata --
pandas: '{"index_columns": ["row_id"], "column_indexes": [{"name": null, ' + 877 vs. 
file:
itemid: int64
label: string
dbsource: string
category: string
param_type: null
row_id: int64
-- schema metadata --
pandas: '{"index_columns": ["row_id"], "column_indexes": [{"name": null, ' + 879
asfimport commented 3 years ago

Joris Van den Bossche / @jorisvandenbossche: [~banderlog] sorry for the slow response.

The first table schema becomes a common schema for the full Dataset

That's indeed the current behaviour (but I see that this should be documented better). See ARROW-8221 for a general issue about expanding this (to e.g. unifying the schema across all files).
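Until something like that lands, a rough sketch of unifying the per-file schemas by hand might look like this (assuming a pyarrow version that provides pa.unify_schemas, which by default promotes null-typed fields to the non-null type found in the other schemas):

import os
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds

# collect the physical schema of every parquet file in the dataset directory
paths = [os.path.join('tmp.parquet', f) for f in os.listdir('tmp.parquet')]
schemas = [pq.read_schema(p) for p in paths]

# unify them, promoting the all-NA null columns to their real types
unified = pa.unify_schemas(schemas)

# read the dataset with the unified schema instead of the first file's schema
dataset = ds.dataset('tmp.parquet', schema=unified)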

A workaround for now is to manually specify the schema (of course, in the case of CSV you actually need to parse the data to get the schema ..). You could read a bigger chunk once to get the proper schema, and then pass that schema to ds.dataset(..). Or, if you know the schema of the file, you can create it manually with pa.schema(..) (similar to how you pass a dict of types to pandas.read_csv).
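For example, a minimal sketch of that second option, writing out the schema by hand for the columns used above (the explicit_schema name is just illustrative) and passing it to ds.dataset(..):

import pyarrow as pa
import pyarrow.dataset as ds

# the types we intended for the D_ITEMS columns
explicit_schema = pa.schema([
    ('itemid', pa.int64()),
    ('label', pa.string()),
    ('dbsource', pa.string()),
    ('category', pa.string()),
    ('param_type', pa.string()),
    ('row_id', pa.int64()),
])

# with an explicit schema, an all-NA first file can no longer dictate null types
dataset = ds.dataset('tmp.parquet', schema=explicit_schema)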

In your specific case, you can actually already specify the schema in table = pa.Table.from_pandas(chunk) before writing to parquet. By doing that, you can ensure that the parquet files have the proper types, and subsequent reading of the Parquet dataset will work fine without needing to specify the schema manually.
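A sketch of that change in the write loop from the reproduction (reusing the illustrative explicit_schema from above; this assumes from_pandas can pick up the named row_id index as a schema field, which it does when the index is named):

for i, chunk in enumerate(d_items):
    # force the intended types instead of inferring them from a possibly all-NA chunk
    table = pa.Table.from_pandas(chunk, schema=explicit_schema)
    pq.write_to_dataset(table, root_path=output)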