rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[QST] `dask_cudf.read_parquet` failed with "NotImplementedError: large_string" #13039

Open stucash opened 1 year ago

stucash commented 1 year ago

I am a new user of dask/dask_cudf. I have parquet files of various sizes (11GB, 2.5GB, 1.1GB), all of which failed with NotImplementedError: large_string.

My dask.dataframe backend is cudf. When the backend is pandas, read_parquet works fine.
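
For context, switching the dask dataframe backend looks roughly like this (a sketch; my actual configuration may differ slightly):

import dask

dask.config.set({"dataframe.backend": "cudf"})      # read_parquet fails with large_string
# dask.config.set({"dataframe.backend": "pandas"})  # read_parquet works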

Here's an excerpt of what my data looks like in CSV format:

Symbol,Date,Open,High,Low,Close,Volume
AADR,17-Oct-2017 09:00,57.47,58.3844,57.3645,58.3844,2094
AADR,17-Oct-2017 10:00,57.27,57.2856,57.25,57.27,627
AADR,17-Oct-2017 11:00,56.99,56.99,56.99,56.99,100
AADR,17-Oct-2017 12:00,56.98,57.05,56.98,57.05,200
AADR,17-Oct-2017 13:00,57.14,57.16,57.14,57.16,700
AADR,17-Oct-2017 14:00,57.13,57.13,57.13,57.13,100
AADR,17-Oct-2017 15:00,57.07,57.07,57.07,57.07,200
AAMC,17-Oct-2017 09:00,87,87,87,87,100
AAU,17-Oct-2017 09:00,1.1,1.13,1.0832,1.121,67790
AAU,17-Oct-2017 10:00,1.12,1.12,1.12,1.12,100
AAU,17-Oct-2017 11:00,1.125,1.125,1.125,1.125,200
AAU,17-Oct-2017 12:00,1.1332,1.15,1.1332,1.15,27439
AAU,17-Oct-2017 13:00,1.15,1.15,1.13,1.13,8200
AAU,17-Oct-2017 14:00,1.1467,1.1467,1.14,1.1467,1750
AAU,17-Oct-2017 15:00,1.1401,1.1493,1.1401,1.1493,4100
AAU,17-Oct-2017 16:00,1.13,1.13,1.13,1.13,100
ABE,17-Oct-2017 09:00,14.64,14.64,14.64,14.64,200
ABE,17-Oct-2017 10:00,14.67,14.67,14.66,14.66,1200
ABE,17-Oct-2017 11:00,14.65,14.65,14.65,14.65,600
ABE,17-Oct-2017 15:00,14.65,14.65,14.65,14.65,836

What I did was really simple:

import dask.dataframe as dd
import cudf
import dask_cudf

# Fails with NotImplementedError: large_string
dask_cudf.read_parquet('path/to/my.parquet')
# Also fails with the same error (dataframe backend set to cudf)
dd.read_parquet('path/to/my.parquet')

The only large string I can think of is the timestamp string.

Is there a way to work around this in cudf, given that large_string is not implemented yet? The timestamp format is 2023-03-12 09:00:00+00:00.

stucash commented 1 year ago

Team, can you please take a look? This is currently a showstopper for me and all my GPU-related development is frozen. Thanks a lot!

stucash commented 1 year ago

I've got a few more bug-like issues; I'll raise them here shortly.

stucash commented 1 year ago

In the meantime, if anyone has a good introduction/tutorial for cudf other than the already-posted 10-minute series, please share it here.

stucash commented 1 year ago

I've appended the full error log below:


NotImplementedError                       Traceback (most recent call last)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/backends.py:133, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    132 try:
--> 133     return func(*args, **kwargs)
    134 except Exception as e:

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/dataframe/io/parquet/core.py:530, in read_parquet(path, columns, filters, categories, index, storage_options, engine, use_nullable_dtypes, calculate_divisions, ignore_metadata_file, metadata_task_size, split_row_groups, chunksize, aggregate_files, parquet_file_extension, filesystem, **kwargs)
    528     index = [index]
--> 530 read_metadata_result = engine.read_metadata(
    531     fs,
    532     paths,
    533     categories=categories,
    534     index=index,
    535     use_nullable_dtypes=use_nullable_dtypes,
    536     gather_statistics=calculate_divisions,
    537     filters=filters,
    538     split_row_groups=split_row_groups,
    539     chunksize=chunksize,
    540     aggregate_files=aggregate_files,
    541     ignore_metadata_file=ignore_metadata_file,
    542     metadata_task_size=metadata_task_size,
    543     parquet_file_extension=parquet_file_extension,
    544     dataset=dataset_options,
    545     read=read_options,
    546     **other_options,
    547 )
    549 # In the future, we may want to give the engine the
    550 # option to return a dedicated element for `common_kwargs`.
    551 # However, to avoid breaking the API, we just embed this
    552 # data in the first element of `parts` for now.
    553 # The logic below is inteded to handle backward and forward
    554 # compatibility with a user-defined engine.

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/io/parquet.py:41, in CudfEngine.read_metadata(*args, **kwargs)
     39 if parts:
     40     # Re-set "object" dtypes align with pa schema
---> 41     set_object_dtypes_from_pa_schema(
     42         new_meta,
     43         parts[0].get("common_kwargs", {}).get("schema", None),
     44     )
     46 # If `strings_to_categorical==True`, convert objects to int32

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/io/parquet.py:403, in set_object_dtypes_from_pa_schema(df, schema)
    402     continue
--> 403 typ = cudf_dtype_from_pa_type(schema.field(col_name).type)
    404 if (
    405     col_name in schema.names
    406     and not isinstance(typ, (cudf.ListDtype, cudf.StructDtype))
    407     and isinstance(col, cudf.core.column.StringColumn)
    408 ):

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/cudf/utils/dtypes.py:231, in cudf_dtype_from_pa_type(typ)
    230 else:
--> 231     return cudf.api.types.pandas_dtype(typ.to_pandas_dtype())

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/pyarrow/types.pxi:220, in pyarrow.lib.DataType.to_pandas_dtype()

NotImplementedError: large_string

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/backends.py:133, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    132 try:
--> 133     return func(*args, **kwargs)
    134 except Exception as e:

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/backends.py:497, in CudfBackendEntrypoint.read_parquet(engine, *args, **kwargs)
    495 from dask_cudf.io.parquet import CudfEngine
--> 497 return _default_backend(
    498     dd.read_parquet,
    499     *args,
    500     engine=CudfEngine,
    501     **kwargs,
    502 )

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/backends.py:446, in _default_backend(func, *args, **kwargs)
    445 with config.set({"dataframe.backend": "pandas"}):
--> 446     return func(*args, **kwargs)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/backends.py:135, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    134 except Exception as e:
--> 135     raise type(e)(
    136         f"An error occurred while calling the {funcname(func)} "
    137         f"method registered to the {self.backend} backend.\n"
    138         f"Original Message: {e}"
    139     ) from e

NotImplementedError: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: large_string

The above exception was the direct cause of the following exception:

NotImplementedError                       Traceback (most recent call last)
Cell In[9], line 1
----> 1 dask_cudf.read_parquet('/home/demo/Live-usb-storage/projects/.share/data/test_cudf.parquet')

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/io/parquet.py:465, in read_parquet(path, columns, **kwargs)
    462         kwargs["read"] = {}
    463     kwargs["read"]["check_file_size"] = check_file_size
--> 465 return dd.read_parquet(path, columns=columns, engine=CudfEngine, **kwargs)

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask/backends.py:135, in CreationDispatch.register_inplace.<locals>.decorator.<locals>.wrapper(*args, **kwargs)
    133     return func(*args, **kwargs)
    134 except Exception as e:
--> 135     raise type(e)(
    136         f"An error occurred while calling the {funcname(func)} "
    137         f"method registered to the {self.backend} backend.\n"
    138         f"Original Message: {e}"
    139     ) from e

NotImplementedError: An error occurred while calling the read_parquet method registered to the cudf backend.
Original Message: An error occurred while calling the read_parquet method registered to the pandas backend.
Original Message: large_string
stucash commented 1 year ago

Reading the log, I found that cudf explicitly converts the pyarrow dtypes to pandas dtypes (before they become cudf dtypes), so I tried using pandas to write the same data to a parquet file.

The pandas-written parquet file was successfully converted to a dask_cudf dataframe.
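
The rewrite itself was just a pandas round trip, roughly like this (a sketch; the paths are placeholders):

import pandas as pd
import dask_cudf

# Read the polars-written file on the CPU and write it back via pandas/pyarrow.
pd.read_parquet("written_by_polars.parquet").to_parquet("rewritten_by_pandas.parquet")

# The pandas-written copy loads without the large_string error.
ddf = dask_cudf.read_parquet("rewritten_by_pandas.parquet")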

Here are the lines in the error log where this pyarrow-to-pandas dtype conversion happens:

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/io/parquet.py:41, in CudfEngine.read_metadata(*args, **kwargs)
     39 if parts:
     40     # Re-set "object" dtypes align with pa schema
---> 41     set_object_dtypes_from_pa_schema(
     42         new_meta,
     43         parts[0].get("common_kwargs", {}).get("schema", None),
     44     )
     46 # If `strings_to_categorical==True`, convert objects to int32

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/dask_cudf/io/parquet.py:403, in set_object_dtypes_from_pa_schema(df, schema)
    402     continue
--> 403 typ = cudf_dtype_from_pa_type(schema.field(col_name).type)
    404 if (
    405     col_name in schema.names
    406     and not isinstance(typ, (cudf.ListDtype, cudf.StructDtype))
    407     and isinstance(col, cudf.core.column.StringColumn)
    408 ):

File ~/Live-usb-storage/projects/python/alpha/lib/python3.10/site-packages/cudf/utils/dtypes.py:231, in cudf_dtype_from_pa_type(typ)
    230 else:
--> 231     return cudf.api.types.pandas_dtype(typ.to_pandas_dtype())

I had assumed parquet was simply a data container: whichever library writes it should produce an equivalent file, after all.

But it turns out that when we write parquet with different libraries, the resulting files can differ! (The original parquet files that failed were written by polars.)

By the way, pandas and polars both call pa.write_table under the hood to create the parquet files.

I am not sure whether this calls for explicit polars support, or whether the pandas dtype conversion could be replaced with a direct (non-pandas) dtype conversion, but only understanding parquet written by pandas is fairly limiting at the moment.

Hope you guys can take a look, thanks a lot!

rjzamora commented 1 year ago

Thanks for raising this, @stucash - I'll take a look at this today to see how I can help.

rjzamora commented 1 year ago

@stucash - I cannot be 100% certain without having a complete write + read reproducer to run locally. However, it looks like your original dataset may contain extremely large row-groups. Unfortunately, until very recently the default row-group size in PyArrow was 64Mi rows, which can sometimes result in string columns that cannot be read back by cudf (since cudf has a 2B character limit for a single string column; see: cudf#3958).

If the problem is that the row-groups are too large, you will need to rewrite the files with polars or pandas, passing row_group_size=<something-smaller> through to the pyarrow backend.
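
For example, a pyarrow-based rewrite might look something like this (just a sketch; the row-group size of 1,000,000 rows is arbitrary, and this assumes the table fits in host memory):

import pyarrow.parquet as pq

table = pq.read_table("path/to/my.parquet")
# Write the same data back with smaller row groups.
pq.write_table(table, "path/to/my_smaller_row_groups.parquet", row_group_size=1_000_000)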

It may also be possible that your row-groups are within the cudf limit, but that pyarrow is choosing to use a large_string type when converting the dtype to pandas (note that cudf's read_parquet code currently leans on arrow's native pandas logic to figure out which cudf dtypes to use). If I can get my hands on a reproducer for this, we can probably resolve the problem in cudf/dask-cudf.
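
In the meantime, you can check what schema the file declares without reading the data, e.g.:

import pyarrow.parquet as pq

# Shows whether the on-disk columns are declared as string or large_string.
print(pq.read_schema("path/to/my.parquet"))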

rjzamora commented 1 year ago

I tried using pandas to write the same data to a parquet file... The pandas-written parquet file was successfully converted to a dask_cudf dataframe.

Hmmm, this is interesting. I was expecting the same pyarrow issue to show up for a pandas-written parquet file as well (since both are presumably using arrow as the backend). Good to know.

stucash commented 1 year ago

@rjzamora Thanks for taking the time to investigate.

I've got 7 parquet files (roughly 1.5GB each), all originally written by polars, and all of them failed with dask_cudf; once they were rewritten by pandas, dask_cudf read them in one go with no problem.

Let me prepare a reproducer with data; in the meantime I'll try your suggestion of passing row_group_size through to the pyarrow backend.

GregoryKimball commented 1 year ago

Thank you @stucash for posting this. It also occurs to me that libcudf does not support the large_string type that we see in pyarrow. Have you tried converting your column to a string type instead of large_string?
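
If it helps, here is a minimal sketch of that cast, assuming the file fits in host memory and the columns are plain (non-nested):

import pyarrow as pa
import pyarrow.parquet as pq
import cudf

table = pq.read_table("path/to/my.parquet")

# Downcast any large_string fields to string before handing the table to cudf.
fixed_schema = pa.schema(
    [
        pa.field(f.name, pa.string()) if pa.types.is_large_string(f.type) else f
        for f in table.schema
    ]
)
gdf = cudf.DataFrame.from_arrow(table.cast(fixed_schema))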

CarloNicolini commented 3 months ago

I've been having this kind of issue using NVTabular, which relies on cudf. Everything was smooth when dataframes were saved through the pandas to_parquet function, but there were lots of problems when using Polars' write_parquet.

Since pandas was not an option due to its slowness, I managed to save the polars dataframes through a manually crafted function that uses pyarrow.

This function simplifies the schema, replacing pa.types.large_string and pa.types.large_list with their non-large counterparts (and downcasting float64, which NVTabular does not support, to float32).


import pyarrow as pa
from warnings import warn


def pyarrow_simplified_schema(schema: pa.Schema) -> pa.Schema:
    """
    Simplify a PyArrow schema so the data can be read by cudf/NVTabular.

    Converts LargeString fields to String, (Large)List<LargeString> fields to
    List<String>, and downcasts float64 fields (including list values) to float32.

    Parameters
    ----------
    schema : pa.Schema
        The original schema of the PyArrow Table.

    Returns
    -------
    pa.Schema
        A new schema with the simplified field types.
    """
    fields = []
    for field in schema:
        if pa.types.is_float64(field.type):
            warn(
                f"NVTabular does not support double precision, downcasting {field.name} to float32"
            )
            fields.append(pa.field(field.name, pa.float32()))
        elif pa.types.is_large_list(field.type) or pa.types.is_list(field.type):
            if pa.types.is_large_string(field.type.value_type):
                fields.append(pa.field(field.name, pa.list_(pa.string())))
            elif pa.types.is_float64(field.type.value_type):
                warn(
                    f"NVTabular does not support double precision, downcasting {field.name} to float32"
                )
                fields.append(pa.field(field.name, pa.list_(pa.float32())))
            else:
                # passthrough on other types
                fields.append(pa.field(field.name, pa.list_(field.type.value_type)))
        elif pa.types.is_large_string(field.type):
            fields.append(pa.field(field.name, pa.string()))
        else:
            # passthrough on other types
            fields.append(field)
    return pa.schema(fields)

After this I was able to load everything correctly using the cudf implementation of NVTabular.
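
For completeness, a hypothetical usage sketch (not from the original thread): cast the polars-written table to the simplified schema before writing, assuming pyarrow supports the casts for your column types and the float64 -> float32 precision loss is acceptable:

import polars as pl
import pyarrow.parquet as pq

df = pl.read_parquet("written_by_polars.parquet")
table = df.to_arrow()
simplified = table.cast(pyarrow_simplified_schema(table.schema), safe=False)
pq.write_table(simplified, "readable_by_cudf_nvtabular.parquet")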

beckernick commented 4 days ago

@GregoryKimball @rjzamora, is this now likely resolved by the merge of large string support and pyarrow interop in v24.08?