apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++][Parquet] Support nested data conversions for chunked array #32723

Open asfimport opened 2 years ago

asfimport commented 2 years ago

FileReaderImpl::ReadRowGroup fails with "Nested data conversions not implemented for chunked array outputs". The failure occurs in ChunksToSingle.

Data schema is: 


  optional group fields_map (MAP) = 217 {
    repeated group key_value {
      required binary key (STRING) = 218;
      optional binary value (STRING) = 219;
    }
  }
fields_map.key_value.value -> Size In Bytes: 13243589, Size In Ratio: 0.20541047
fields_map.key_value.key -> Size In Bytes: 3008860, Size In Ratio: 0.046667963

Is there a way to work around this issue in the C++ library?

In any case, I am willing to implement this, but I need some guidance. I am very new to Parquet (as in, I started reading about it yesterday).

 

Probably related to: https://issues.apache.org/jira/browse/ARROW-10958

Reporter: Arthur Passos / Assignee: Arthur Passos

Note: This issue was originally created as ARROW-17459. Please see the migration documentation for further details.

asfimport commented 2 years ago

Will Jones / @wjones127: I haven't tried this, but perhaps GetRecordBatchReader will work instead: https://github.com/wjones127/arrow/blob/895e2da93c0af3a1525c8c75ec8d612d96c28647/cpp/src/parquet/arrow/reader.h#L165

It sounds like there are some code paths that do work and some that don't.
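
A rough pyarrow sketch of the same two code paths, for reference (the file name is illustrative; iter_batches streams record batches instead of materializing the whole row group as a Table):

import pyarrow.parquet as pq

pf = pq.ParquetFile("data.parquet")  # illustrative file name

# Analogous to FileReader::ReadRowGroup: decode row group 0 into one Table.
# This is the path that can hit "Nested data conversions not implemented for
# chunked array outputs" when a column's output would need chunking.
table = pf.read_row_group(0)

# Analogous to GetRecordBatchReader: stream the same row group as record
# batches, decoding a limited number of rows at a time.
for batch in pf.iter_batches(row_groups=[0]):
    pass  # process each batch here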

asfimport commented 2 years ago

Arthur Passos: @wjones127 At first glance, it seems to be working. The client code I had was something like the below:


std::shared_ptr<arrow::Table> table;
arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table);
if (!read_status.ok())
    throw ParsingException{"Error while reading Parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};
++row_group_current;

 

Now it's the below:


std::shared_ptr<arrow::Table> table;

std::unique_ptr<::arrow::RecordBatchReader> rbr;
std::vector<int> row_group_indices { row_group_current };
arrow::Status get_batch_reader_status = file_reader->GetRecordBatchReader(row_group_indices, column_indices, &rbr);

if (!get_batch_reader_status.ok())
    throw ParsingException{"Error while reading Parquet data: " + get_batch_reader_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};

arrow::Status read_status = rbr->ReadAll(&table);

if (!read_status.ok())
    throw ParsingException{"Error while reading Parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA};

++row_group_current;

 

Question: Should I expect any regressions or different behaviour by changing the code path to the latter?

 

asfimport commented 2 years ago

Arthur Passos: I am also trying to write a test to cover this case, but failing to do so. For some reason, the files I generate with the very same schema and size don't get chunked when read. The original file was provided by a customer and it's confidential data, so it can't be used.

 

All the files I generated contain the above mentioned schema. The differences are in the data length. Some had maps of 50~300 elements with keys of random strings of 20~50 characters and values of random strings of 50~5000 characters. I also tried a low cardinality example and a large string example (2^30 characters).

 

I'd be very thankful if someone could give me some tips on how to generate a file that will trigger the exception.

asfimport commented 2 years ago

Will Jones / @wjones127: Hi Arthur,

Here's a simple repro I created in Python:


import pyarrow as pa
import pyarrow.parquet as pq

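# Each chunk holds a single 1 GiB string key; written out, both chunks land in
# one row group, and 2 GiB of key bytes overflows the int32 offsets a single
# string array can address, so reading it back requires a ChunkedArray.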
arr = pa.array([[("a" * 2**30, 1)]], type = pa.map_(pa.string(), pa.int32()))
arr = pa.chunked_array([arr, arr])
tab = pa.table({ "arr": arr })

pq.write_table(tab, "test.parquet")

pq.read_table("test.parquet")
#Traceback (most recent call last):
#  File "<stdin>", line 1, in <module>
#  File "/Users/willjones/mambaforge/envs/notebooks/lib/python3.10/site-#packages/pyarrow/parquet/__init__.py", line 2827, in read_table
#    return dataset.read(columns=columns, use_threads=use_threads,
#  File "/Users/willjones/mambaforge/envs/notebooks/lib/python3.10/site-#packages/pyarrow/parquet/__init__.py", line 2473, in read
#    table = self._dataset.to_table(
#  File "pyarrow/_dataset.pyx", line 331, in pyarrow._dataset.Dataset.to_table
#  File "pyarrow/_dataset.pyx", line 2577, in pyarrow._dataset.Scanner.to_table
#  File "pyarrow/error.pxi", line 144, in pyarrow.lib.pyarrow_internal_check_status
#  File "pyarrow/error.pxi", line 121, in pyarrow.lib.check_status
#pyarrow.lib.ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs
asfimport commented 2 years ago

Arthur Passos: @wjones127 Thank you for sharing this!

 

While your GetRecordBatchReader suggestion works for the use case I shared, it won't work for this one. Are there any docs I could read to understand the internals of the Arrow library in order to implement this? Any tips would be appreciated. The only thing that comes to mind right now is to somehow build a giant array with all the chunks, but that certainly has a set of implications.

asfimport commented 2 years ago

Will Jones / @wjones127: We have a section of our docs devoted to developer setup and guidelines. And we have documentation describing the Arrow in-memory format (it may be worth reviewing the structure of nested arrays, for example). For the internals of the Parquet Arrow code, it's best to read through the source headers at cpp/src/parquet/arrow/.

asfimport commented 2 years ago

Arthur Passos: Hi @emkornfield. I see you are one of the authors of https://github.com/apache/arrow/pull/8177, which introduced the following snippet:


      // ARROW-3762(wesm): If item reader yields a chunked array, we reject as
      // this is not yet implemented
      return Status::NotImplemented(
          "Nested data conversions not implemented for chunked array outputs");

I wonder why this wasn't implemented. Is there a technical limitation, or was the approach just not well defined?

I am pretty new to Parquet and to the Arrow library, so it's very hard for me to reason about all of these concepts and code. Off the top of my head, I got a couple of silly ideas:

  1. Find a way to convert a ChunkedArray into a single Array. That requires a processing step that allocates a contiguous chunk of memory big enough to hold all chunks. Plus, there is no clear interface to do so.
  2. Create a new ChunkedArray class that can hold ChunkedArrays. As of now, it can only hold raw Arrays. That would require a LOT of changes in other Arrow classes and, of course, it's not guaranteed to work.
  3. Make the chunk memory limit configurable (not sure it's feasible).

Do you see any of these as a path forward? If not, what would be the path forward?

asfimport commented 2 years ago

Micah Kornfield / @emkornfield:

  1. ChunkedArrays have a Flatten method that will do this, but I don't think it will help in this case. IIRC the challenge here is that Parquet only yields chunked arrays if the underlying column data cannot fit into the right Arrow structure. In this case, for Utf8 arrays, it means the sum of bytes across all strings has to be less than INT_MAX length. Otherwise it would need to flatten to LargeUtf8, which has implications for schema conversion. Structs and lists always expect Arrays as their inner element types, not chunked arrays.
  2. This doesn't necessarily seem like the right approach.
  3. Per 1, this isn't really the issue, I think. The approach here that could work (I don't remember all the code paths) is to vary the number of rows read back (if not all rows are huge).

One way forward here could be to add an option for reading back arrays that always uses the Large* variants (or maybe on a per-column basis) to avoid chunking.
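
To make the offset-width difference concrete, here is a small pyarrow sketch (toy values; the sizes in the trailing comment are what one would expect for a 2-element array): regular string/binary arrays address their bytes with int32 offsets, which caps a single array at roughly 2 GiB of string data, while the Large* variants use int64 offsets.

import pyarrow as pa

small = pa.array(["foo", "barbaz"], type=pa.string())        # int32 offsets
large = pa.array(["foo", "barbaz"], type=pa.large_string())  # int64 offsets

# Buffers are [validity bitmap, offsets, data]; the offsets buffer of the
# large variant is twice as wide (8 bytes per offset instead of 4).
print(small.buffers()[1].size, large.buffers()[1].size)  # expect 12 vs 24 for 3 offsets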

asfimport commented 2 years ago

Arthur Passos: @emkornfield thank you for your answer. Can you clarify what you mean by "read back arrays to always use the Large* variant"? I don't know what "back array" and "large variant" refer to, though I can speculate what the latter means.

asfimport commented 2 years ago

Micah Kornfield / @emkornfield: i.e. LargeBinary, LargeString, LargeList; these are distinct types that use int64 offsets instead of int32.

asfimport commented 2 years ago

Arthur Passos: @emkornfield if I understand correctly, this could help with the original case I shared. In the case @wjones127 shared, where he creates a ChunkedArray and then serializes it, it wouldn't help. Is that correct?

I am stating this based on my current understanding of the inner workings of Arrow: the ChunkedArray data structure is used in two or more situations:

  1. The data in a row group exceeds the INT_MAX limit (the case I initially shared).
  2. The serialized data/table is a chunked array, thus it makes sense to use a chunked array when reading it back.

 

edit:

I have just tested the snippet shared by Will Jones using type = pa.map_(pa.large_string(), pa.int64()) instead of type = pa.map_(pa.string(), pa.int32()) and the issue persists. 

 

asfimport commented 2 years ago

Micah Kornfield / @emkornfield: Yes, I think there are some code changes needed; we hard-code a non-large BinaryBuilder for accumulating chunks, which is then used when decoding to Arrow.

To answer your question: I don't think the second case applies. As far as I know, Parquet C++ does its own chunking and doesn't try to read back the exact chunking that the values were written with.

asfimport commented 2 years ago

Arthur Passos: I am a bit lost right now. I have made some changes to use LargeBinaryBuilder, but there is always an inconsistency that throws an exception. Are you aware of any place in the code where, instead of taking the String path, it would take the LargeString path? I went all the way back to where it reads the schema in the hope of finding a place where I could change the DataType from STRING to LARGE_STRING, but couldn't do so.

asfimport commented 2 years ago

Micah Kornfield / @emkornfield: You would have to follow this up the stack from the previous comments. Without seeing the stack trace it is a bit hard to give guidance, but I'd guess there are a few places in the linked code that always expect BinaryArray/BinaryBuilder and might down_cast; these would need to be adjusted accordingly.

asfimport commented 2 years ago

Arthur Passos: @emkornfield I have changed a few places to use LargeBinary/LargeString and also commented out this type assertion. After that, I am able to read the parquet file. Would a PR that forces the use of LargeBinary/LargeString by default be acceptable? Plus, if you have any tips on how to work around that assertion without commenting it out, that would be great.

asfimport commented 2 years ago

Micah Kornfield / @emkornfield: @arthurpassos awesome, nice work. IMO, I don't think we can change the default to LargeBinary/LargeString; as you can see from the assertion, there is an expectation that the types produced match the schema. Also, for most use cases they aren't necessary, and they require extra memory (and might be less well supported in other implementations).

I think the right way of approaching this is to have an option users can set (maybe one for each type) that will work on two levels:

  1. Translate any non-large types in the schema to their large variants.
  2. Make the changes at the decoder level that you have already done.

So we keep the assertion, but if users run into this issue we can provide guidance on how to set this option.
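
As a rough sketch of what step 1 could look like on the schema side (the helper below is hypothetical, written against the public pyarrow API rather than the C++ reader, and the set of handled types is not exhaustive):

import pyarrow as pa

def to_large_variants(ty: pa.DataType) -> pa.DataType:
    # Recursively swap 32-bit-offset types for their 64-bit "large" variants.
    if pa.types.is_string(ty):
        return pa.large_string()
    if pa.types.is_binary(ty):
        return pa.large_binary()
    if pa.types.is_list(ty):
        return pa.large_list(to_large_variants(ty.value_type))
    if pa.types.is_map(ty):
        # the map itself keeps int32 offsets, but its key/item types can be widened
        return pa.map_(to_large_variants(ty.key_type), to_large_variants(ty.item_type))
    if pa.types.is_struct(ty):
        return pa.struct([pa.field(f.name, to_large_variants(f.type), f.nullable) for f in ty])
    return ty

schema = pa.schema([("fields_map", pa.map_(pa.string(), pa.string()))])
print(pa.schema([pa.field(f.name, to_large_variants(f.type), f.nullable) for f in schema]))
# fields_map: map<large_string, large_string>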

asfimport commented 2 years ago

Arthur Passos: I see. That seems like a long journey for someone who is not an Arrow developer or Parquet expert. Given the timeline I am working with, in the short term I think I'll resort to the first suggestion by @wjones127. While it doesn't fix the second case, it fixes the one I originally shared. Which makes me curious: why does that fix the Map<String, String> case but not the one generated by the above script?

asfimport commented 2 years ago

Micah Kornfield / @emkornfield: It's probably a case of different batch sizes.

Suppose you have 100 rows that together have 3 GB of string data, evenly distributed. If you try to read all 100 rows at once, it will overflow and create a ChunkedArray. If you read 50 rows at a time, it wouldn't be an issue because chunking wouldn't be necessary.
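
In pyarrow terms, that reasoning translates to capping the batch size when iterating (file name and numbers are illustrative; this only helps when no single batch's string data exceeds the int32 offset limit):

import pyarrow.parquet as pq

# Say the file has ~100 rows with ~30 MB of string data each in one row group
# (~3 GB total). Reading 50 rows per batch keeps each decoded batch around
# 1.5 GB, so no column has to be split into a ChunkedArray.
pf = pq.ParquetFile("big_strings.parquet")  # illustrative file name
for batch in pf.iter_batches(batch_size=50):
    pass  # process ~50 rows at a time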

asfimport commented 2 years ago

Arthur Passos: Hi @wjones127. I have implemented your suggestion of GetRecordBatchReader and, at first, things seemed to work as expected. Recently, an issue regarding Parquet data was reported, and reverting to the ReadRowGroup solution seems to address it. This might be a misuse of the Arrow library on my side, even though I have read the API docs and it looks correct.

 

My question is pretty much: should there be any difference in the output when using the two APIs?

arthurpassos commented 1 year ago

Hi @emkornfield. I am the OP of this issue and I came across this again, so I think it's time to work on it. I have re-implemented your suggestion to use the LARGE* variants of the String and Binary types in order to avoid chunking, see https://github.com/arthurpassos/arrow/pull/1/files. Of course, this is just a hard-coded version to validate that it fixes the issue, and apparently it did.

Based on that, I have a couple of questions:

  1. Is that still your recommendation to address this issue?
  2. You have mentioned this should be backed by a setting, can you point to any examples on how to create a setting?
  3. Is memory limit the only thing that would lead to chunking? I wonder if there are other reasons. In that case, this fix wouldn't completely solve the problem.
emkornfield commented 1 year ago
  1. Yes, I think so.
  2. I think ArrowReaderProperties is probably where this belongs. For per-column settings you can probably find inspiration from ParquetProperties (global might be fine for an initial implementation).
  3. IIRC it's not really a memory limit as much as a limitation of the underlying address space of the Binary/String arrays, which allow for at most 2GB of data in a row group. I don't recall the code well enough to know if there are other edge cases that you might encounter, but I think this would solve most issues.

arthurpassos commented 1 year ago
> 1. Yes, I think so.
> 2. I think ArrowReaderProperties is probably where this belongs. For per-column settings you can probably find inspiration from ParquetProperties (global might be fine for an initial implementation).
> 3. IIRC it's not really a memory limit as much as a limitation of the underlying address space of the Binary/String arrays, which allow for at most 2GB of data in a row group. I don't recall the code well enough to know if there are other edge cases that you might encounter, but I think this would solve most issues.

Cool, thanks. I have updated the draft PR with some refactorings, but it's no longer working. I suspect it's related to the dictionary encoding/decoding classes; they seem to be hard-coded to int, which might not work for the LARGE* variants. Do you know if it's necessary to have a 64-bit version of the dictionaries?

The encoding/decoding code is huge and somewhat complex, so it would be great if I could avoid changing it. Tons of changes, and I am kind of afraid of introducing bugs: https://github.com/arthurpassos/arrow/blob/main/cpp/src/parquet/encoding.h https://github.com/arthurpassos/arrow/blob/main/cpp/src/parquet/encoding.cc

mapleFU commented 1 year ago

I guess you can split the patch into multiple parts, and support reading from the Decoder in one of them?

lhoestq commented 3 months ago

Experiencing this issue on datasets like https://huggingface.co/datasets/mlfoundations/MINT-1T-HTML that we can't read in Python:

File "pyarrow/_parquet.pyx", line 1587, in iter_batches
File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status
    pyarrow.lib.ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs