rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] GPU-accelerated Feather support. #11407

Open · bdice opened this issue 2 years ago

bdice commented 2 years ago

Is your feature request related to a problem? Please describe.
A user with existing Feather data files asked about GPU-accelerated Feather support. cuDF currently supports CPU-based Feather reading but does not use GPU acceleration.

Describe the solution you'd like
cudf.read_feather should be GPU accelerated.

Describe alternatives you've considered
Converting files to another format like Parquet is probably ideal for performance and can leverage existing GPU-accelerated I/O. This conversion can be done with pandas (a minimal sketch follows below).

Additional context
Feather format description: https://github.com/wesm/feather/blob/master/doc/FORMAT.md
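
A minimal sketch of that workaround, with hypothetical file paths (not part of the original report): read the Feather file with pandas on the CPU, convert it to Parquet once, then use cuDF's GPU-accelerated Parquet reader from there on.

import pandas as pd
import cudf

# Hypothetical paths; 'data.feather' stands in for the user's existing Feather file.
pdf = pd.read_feather('data.feather')      # one-time CPU read of the Feather file
pdf.to_parquet('data.parquet')             # one-time CPU conversion to Parquet
gdf = cudf.read_parquet('data.parquet')    # GPU-accelerated reads from here on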

kkraus14 commented 2 years ago

FYI -- https://issues.apache.org/jira/browse/ARROW-17092

I believe the link above is for Feather V1 and the IPC File Format is Feather V2.


mjshare commented 1 year ago

This is a valuable need. Does the community have a plan to address it?

beckernick commented 1 year ago

Problem description: When there are a large number of string columns, data in Feather format is much smaller than the same data in Arrow format. If data could be transmitted to the GPU in Feather format and decompressed there, the PCIe bandwidth pressure would be much lower. This requirement is valuable. Can the GPU support decompressing the Feather format, as it does for Parquet?

Originally posted by @mjshare in https://github.com/rapidsai/cudf/issues/12453#issue-1514306164

We have a similar requirement. Does the community have a plan to address it? This would reduce the GPU's bandwidth consumption. @davidwendt

Originally posted by @mjshare in https://github.com/rapidsai/cudf/issues/12453#issuecomment-1369787656

@mjshare, would you be able to provide an example in which Feather vs. Arrow makes a significant difference in a workload? Are you working in C++ or Python?

infzo commented 1 year ago

@mjshare, would you be able to provide an example in which Feather vs. Arrow makes a significant difference in a workload? Are you working in C++ or Python?

>>> import pyarrow
>>> import numpy as np
>>>
>>> def create_table(n_rows, n_cols):
...     table = pyarrow.Table.from_pydict({f'col_{c}': np.random.randint(0, 10000, size=[n_rows]) for c in range(n_cols)})
...     return table
...
>>>
>>> tbl = create_table(8000 * 10000, 6)
>>> print(f'arrow table size {tbl.nbytes // 1024 // 1024} MB')
arrow table size 3662 MB
>>>
>>> import pyarrow.feather as feather
>>> feather.write_feather(tbl, '/tmp/data.feather')
>>>
>>> import os
>>> feather_size = os.path.getsize('/tmp/data.feather')
>>> print(f'feather file size {feather_size // 1024 // 1024} MB')
feather file size 1755 MB
>>> def create_table_with_str(n_rows, n_cols, n_strs, n_strs_cols):
...     prefix = 'xxxx_' * ((n_strs - 10) // 5)
...     cdf = create_table(n_rows, n_cols).to_pandas()
...     for i in range(n_strs_cols):
...         cdf[f'col_{i}'] = cdf[f'col_{i}'].apply(lambda x: f'{prefix}{x:010}')
...     return pyarrow.Table.from_pandas(cdf)
...
>>>
>>> tbl = create_table_with_str(2000 * 10000, 2, 40, 2)
>>> print(tbl.to_pandas())
                                             col_0                                     col_1
0         xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000008295  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000007592
1         xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000004599  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000002469
2         xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000004553  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000008704
3         xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000004059  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000003143
4         xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000006622  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000000062
...                                            ...                                       ...
19999995  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000009539  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000002897
19999996  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000000073  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000009834
19999997  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000002616  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000008283
19999998  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000001085  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000005927
19999999  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000008378  xxxx_xxxx_xxxx_xxxx_xxxx_xxxx_0000002684

[20000000 rows x 2 columns]
>>>
>>> print(f'arrow table size {tbl.nbytes // 1024 // 1024} MB')
arrow table size 1678 MB
>>>
>>> import pyarrow.feather as feather
>>> feather.write_feather(tbl, '/tmp/data.feather')
>>>
>>> import os
>>> feather_size = os.path.getsize('/tmp/data.feather')
>>> print(f'feather file size {feather_size // 1024 // 1024} MB')
feather file size 352 MB
>>>
mjshare commented 1 year ago

In typical scenarios, integer data compresses to roughly half of its in-memory Arrow size (3662 MB → 1755 MB in the first example above), and string data to roughly a quarter or less (1678 MB → 352 MB in the second).

beckernick commented 1 year ago

Writing the file out to Feather will compress the data with LZ4 by default. Writing out to Parquet will compress the data with Snappy by default, but you can select LZ4 if it matters for your dataset / use case.

cuDF provides accelerated readers for Parquet. Could you use that for your use case?
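
A minimal illustration of selecting LZ4 for Parquet and reading it back with cuDF; the paths are placeholders and the tiny table merely stands in for the pyarrow.Table used elsewhere in this thread.

import pyarrow
import pyarrow.parquet as pq
import cudf

# Stand-in for the large pyarrow.Table from the examples above.
tbl = pyarrow.table({'col_0': ['xxxx_0000000001', 'xxxx_0000000002']})

# Write to Parquet with LZ4 compression instead of the default Snappy.
pq.write_table(tbl, '/tmp/data_lz4.parquet', compression='LZ4')

# Read it back with cuDF's GPU-accelerated Parquet reader.
gdf = cudf.read_parquet('/tmp/data_lz4.parquet')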

infzo commented 1 year ago


>>> import os
>>> import time
>>> import numpy as np
>>> import pyarrow
>>> import cudf
>>>
>>> def create_table(n_rows, n_cols):
...     table = pyarrow.Table.from_pydict({f'col_{c}': np.random.randint(0, 10000, size=[n_rows]) for c in range(n_cols)})
...     return table
...
>>> def create_table_with_str(n_rows, n_cols, n_strs, n_strs_cols):
...     prefix = 'xxxx_' * ((n_strs - 10) // 5)
...     cdf = create_table(n_rows, n_cols).to_pandas()
...     for i in range(n_strs_cols):
...         cdf[f'col_{i}'] = cdf[f'col_{i}'].apply(lambda x: f'{prefix}{x:010}')
...     return pyarrow.Table.from_pandas(cdf)
...
>>> tbl = create_table_with_str(2000 * 10000, 2, 40, 2)
>>> print(f'arrow table size {tbl.nbytes // 1024 // 1024} MB')
arrow table size 1678 MB
>>>
>>> def arrow_cudf():
...     start = time.time()
...     df = cudf.DataFrame.from_arrow(tbl)
...     print(f'arrow_cudf cost {time.time() - start} s')
...
>>> arrow_cudf()
arrow_cudf cost 0.3969125747680664 s
>>>
>>> arrow_cudf()
arrow_cudf cost 0.39836621284484863 s
>>>
>>> def feather():
...     import pyarrow.feather as feather
...     start = time.time()
...     feather.write_feather(tbl, '/tmp/data.feather')
...     print(f'write_feather cost {time.time() - start} s')
...     feather_size = os.path.getsize('/tmp/data.feather')
...     print(f'feather file size {feather_size // 1024 // 1024} MB')
...     start = time.time()
...     df = cudf.read_feather('/tmp/data.feather')
...     print(f'read_feather cost {time.time() - start} s')
...
>>> feather()
write_feather cost 0.8628420829772949 s
feather file size 352 MB
read_feather cost 1.010714054107666 s
>>>
>>> feather()
write_feather cost 0.8496232032775879 s
feather file size 352 MB
read_feather cost 1.0259618759155273 s
>>>
>>> def parquet():
...     import pyarrow.parquet as parquet
...     start = time.time()
...     parquet.write_table(tbl, '/tmp/data.parquet', compression='LZ4')
...     print(f'write_parquet cost {time.time() - start} s')
...     parquet_size = os.path.getsize('/tmp/data.parquet')
...     print(f'parquet file size {parquet_size // 1024 // 1024} MB')
...     start = time.time()
...     df = cudf.read_parquet('/tmp/data.parquet')
...     print(f'read_parquet cost {time.time() - start} s')
...     start = time.time()
...     df.to_parquet('/tmp/data_second.parquet')
...     print(f'to_parquet cost {time.time() - start} s')
...
>>> parquet()
write_parquet cost 2.4992175102233887 s
parquet file size 67 MB
read_parquet cost 0.03415870666503906 s
to_parquet cost 0.05026721954345703 s
>>>
>>> parquet()
write_parquet cost 2.4414985179901123 s
parquet file size 67 MB
read_parquet cost 0.034529924392700195 s
to_parquet cost 0.05629706382751465 s
>>>

#################################
arrow table size 1678 MB

arrow   -> cudf         0.39    s

arrow   -> feather      0.84    s
feather -> cudf         1.02    s

arrow   -> parquet      2.4     s
parquet -> cudf         0.034   s  
cudf    -> parquet      0.05    s
  1. Writing the Arrow table out as Parquet takes too long. If Feather supported GPU reads, could we combine Feather's fast write with a fast GPU read to get better conversion performance?
  2. Could the Arrow-to-cuDF transfer be compressed on the host and decompressed on the GPU, to reduce PCIe bandwidth pressure and improve transfer efficiency?
vyasr commented 1 year ago

What about doing arrow->cudf->parquet? Based on your timings that looks to be nearly optimal in terms of both timing and compression.
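
A minimal sketch of that arrow -> cudf -> parquet path; the tiny table stands in for the large pyarrow.Table from the transcript above, the output path is hypothetical, and the timings in the comments are quoted from that transcript rather than re-measured.

import pyarrow
import cudf

# Stand-in for the large pyarrow.Table built in the transcript above.
tbl = pyarrow.table({'col_0': ['xxxx_0000000001', 'xxxx_0000000002']})

# arrow -> cudf: move the Arrow table onto the GPU (~0.4 s for the large table above).
gdf = cudf.DataFrame.from_arrow(tbl)

# cudf -> parquet: write compressed Parquet directly from the GPU (~0.05 s above).
gdf.to_parquet('/tmp/data_from_gpu.parquet')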