apache / arrow

Apache Arrow is the universal columnar format and multi-language toolbox for fast data interchange and in-memory analytics
https://arrow.apache.org/
Apache License 2.0

[C++] Add Byte Range to CSV Reader ReadOptions #32590

Open asfimport opened 2 years ago

asfimport commented 2 years ago

Sometimes it's desirable to read just a portion of a CSV. The best way to do that is to pass a list of byte ranges to the CSV read options, specifying which parts of the file to read. These byte ranges don't necessarily have to be aligned on line-break boundaries: the CSV reader should skip anything before the first line break in a byte range and keep reading past the end of the range until the end of the line.
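A minimal sketch of the range semantics described above, in plain Python rather than the Arrow C++ reader. The function name `lines_in_byte_range` is hypothetical, and the convention that a range owns every line whose first byte falls inside it is an assumption. Quotes and escapes are ignored.

```python
def lines_in_byte_range(data: bytes, start: int, length: int) -> list[bytes]:
    """Return the lines whose first byte lies in [start, start + length)."""
    end = min(start + length, len(data))
    if start == 0:
        pos = 0
    else:
        # Search from start - 1 so that a line beginning exactly at `start`
        # is kept; everything before the first line break is skipped.
        nl = data.find(b"\n", start - 1)
        if nl == -1:
            return []
        pos = nl + 1
    lines = []
    while pos < end:
        # Read through to the end of the line, even past `end`.
        nl = data.find(b"\n", pos)
        stop = len(data) if nl == -1 else nl + 1
        lines.append(data[pos:stop])
        pos = stop
    return lines
```

With this convention, adjacent unaligned ranges cover every line exactly once, so a file can be split at arbitrary offsets without losing or duplicating rows.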

Based on discussion, the scope is going to be reduced here. The first implementation will support a single byte range that is already assumed to be aligned on line boundaries.

Will not handle quotes/returns and other edge cases.

Reporter: Ziheng Wang / @marsupialtail


Note: This issue was originally created as ARROW-17313. Please see the migration documentation for further details.

asfimport commented 2 years ago

Weston Pace / @westonpace: For additional motivation, this overlaps with the way Substrait expresses partitioning information. Substrait allows any file type to include "start byte" and "length" to slice the file. For file types like parquet & IPC this would involve grabbing all row groups whose first byte falls in that range (even though this may mean reading beyond the end of the specified range). The advantage is that there is then a uniform API for partitioning files across formats.

Another advantage here is that this would allow us to potentially parallelize chunking at a minor cost of overreading a bit for each block. This overreading could be avoided if we knew we were going to read multiple blocks. For example, if we know we want to read blocks 20-30 then we issue reads for blocks 20-31. As soon as any two consecutive blocks are loaded we can start parsing the lower block of the pair.

So the algorithm for each block boils down to:

Although...now that I type this up...I remember a potential flaw in this logic. Finding the "first line delimiter" in a block can be an impossible problem if newlines are allowed inside of delimiters. Though maybe we don't need to support that case, I don't recall.
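The pairing idea above (a block can be parsed as soon as it and its successor are loaded) could look roughly like the sketch below. It is plain Python over in-memory blocks, not Arrow code. The helper name `rows_for_block` is hypothetical, it assumes every block after the first contains at least one newline, and it deliberately ignores the quoted-newline flaw just mentioned.

```python
def rows_for_block(blocks: list[bytes], i: int) -> list[bytes]:
    """Rows owned by block i: from its first line break (or offset 0 for the
    first block) through the first line break of block i + 1."""
    block = blocks[i]
    if i == 0:
        begin = 0
    else:
        nl = block.find(b"\n")
        if nl == -1:
            raise ValueError("block contains no line break")
        begin = nl + 1
    data = block[begin:]
    if i + 1 < len(blocks):
        nxt = blocks[i + 1]
        nl = nxt.find(b"\n")
        if nl == -1:
            raise ValueError("next block contains no line break")
        # Over-read: finish the row that straddles the block boundary.
        data += nxt[: nl + 1]
    return data.splitlines(keepends=True)
```

Each call only touches blocks `i` and `i + 1`, so the calls are independent and could be dispatched to separate threads once both blocks have arrived.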

CC @pitrou for additional thoughts

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: My proposal is that we will allow additional fields in ScanOptions that specify the byte ranges to read for each fragment.

Those byte ranges will be aligned to line breaks when OpenReaderAsync is called, potentially in another async function that samples the file around the byte range boundaries and figures out where the line breaks are.

Then these aligned byte ranges will be used to create a MaskedRandomAccessFile object, a new object that exposes the same interface as RandomAccessFile, except that it uses seek to skip the bytes it's not supposed to read. The skipped bytes are never read, whether the file is on disk or on the network.

We pass this MaskedRandomAccessFile object to make a BufferedInputStream and a StreamingReader without any further change in code. The CSV StreamingReader has no idea that it is only reading partial chunks in the underlying file.

The alternative is to deal with this in the CSV StreamingReaderImpl. However, this is very complicated, as it can only access a BufferedInputStream, which is not seekable. Adding seek functionality to InputStream probably doesn't make sense when the underlying InputStream is not a file.
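To illustrate the masked-file idea, here is a rough Python analogue built on the standard `io` module. It is not the proposed C++ MaskedRandomAccessFile: the class name `MaskedFile` and its constructor are hypothetical, and the ranges are assumed to be half-open, non-overlapping, and already aligned to line breaks.

```python
import io


class MaskedFile(io.RawIOBase):
    """Read-only view of a seekable file that exposes only the given ranges."""

    def __init__(self, raw, ranges):
        self._raw = raw
        self._ranges = sorted(ranges)  # list of (start, end), end exclusive
        self._idx = 0
        self._pos = 0

    def readable(self):
        return True

    def readinto(self, b):
        while self._idx < len(self._ranges):
            start, end = self._ranges[self._idx]
            if self._pos < start:
                self._pos = start  # skip masked bytes without reading them
            if self._pos >= end:
                self._idx += 1
                continue
            self._raw.seek(self._pos)
            chunk = self._raw.read(min(len(b), end - self._pos))
            if not chunk:
                return 0  # underlying file ended early
            b[: len(chunk)] = chunk
            self._pos += len(chunk)
            return len(chunk)
        return 0  # all ranges consumed: end of stream


raw = io.BytesIO(b"a,b\n1,2\n3,4\n5,6\n")
reader = io.BufferedReader(MaskedFile(raw, [(0, 4), (8, 12)]))
```

The buffered reader on top sees one contiguous stream (`a,b` followed by `3,4`), which mirrors the point that the downstream CSV reader needs no changes.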

asfimport commented 2 years ago

Yibo Cai / @cyb70289: I have the same concern as Weston. CSV parsing is stateful. AFAIK, figuring out the line breaks has to be done sequentially if we support "quote" or "escape" or a customized delimiter, etc. Some examples:

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: I think if you support things like this, then it's impossible to read a CSV file in parallel.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: It's not too late to change the Substrait spec, is it?

Or we can raise NotImplemented if the offset is ever non-zero.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: That said, it can also be implemented in a "dumb" way: call RandomAccessFile::GetStream to get a subset of the input file, and let the CSV reader try to make sense of it.

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: @pitrou can you elaborate a bit on your way?

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: There's not much to elaborate. RandomAccessFile::GetStream gives you a stream over a subrange of a file. See API docs: https://arrow.apache.org/docs/cpp/api/io.html#_CPPv4N5arrow2io16RandomAccessFile9GetStreamENSt10shared_ptrI16RandomAccessFileEE7int64_t7int64_t

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: Ah, I meant what we should do about the line breaks, quotes, etc.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Nothing. The Substrait producer should produce valid ranges. There is no reason to accept random invalid ranges.

asfimport commented 2 years ago

Weston Pace / @westonpace:

> It's not too late to change the Substrait spec, is it? Or we can raise NotImplemented if the offset is ever non-zero.

Raising "not implemented" in this case is fine I'm sure. If it can't be done then it can't be done. Perhaps we can avoid most of these cases by reading a little bit (e.g. 32 bytes) before the beginning of the block as well.

> The sample block starts inside a "quoted" field

I think this is only a problem if we allow newlines in values. We should reject a partial read if newlines_in_values is false.

> The first char of a block is "\n" but the last char of previous block is an "escape"

Reading a bit early would help here as long as it isn't a really long chain of escapes which should be rare and detectable (we could error in this case).

> Sample at middle of "\r\n" may also be confusing

Reading a bit early would help here too.
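A sketch of the "read a bit early" idea for the escape case, in plain Python. The function name `first_row_start`, the backslash escape character, and the 32-byte default window are illustrative assumptions. Quoted fields are not handled, which is consistent with rejecting partial reads when newlines_in_values is true.

```python
def first_row_start(data: bytes, start: int, lookbehind: int = 32,
                    escape: bytes = b"\\") -> int:
    """Offset of the first row beginning at or after `start`, inspecting up
    to `lookbehind` bytes before the range to detect escaped newlines."""
    if start == 0:
        return 0
    floor = max(0, start - lookbehind)
    pos = start - 1  # a newline at start - 1 means a row begins at `start`
    while True:
        nl = data.find(b"\n", pos)
        if nl == -1:
            return len(data)  # no row starts in the rest of the data
        # Count the escape characters immediately preceding this newline.
        i = nl - 1
        while i >= floor and data[i:i + 1] == escape:
            i -= 1
        if i < floor and floor > 0:
            raise ValueError("escape chain longer than the look-behind window")
        if (nl - 1 - i) % 2 == 0:
            return nl + 1  # newline is not escaped: a row starts after it
        pos = nl + 1  # odd number of escapes: newline is data, keep looking
```

An odd number of escape characters before a newline means the newline belongs to the value, so the search continues. A chain that reaches past the window is reported as an error rather than guessed at.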

asfimport commented 2 years ago

Weston Pace / @westonpace:

> We should reject a partial read if newlines_in_values is false.

Oops, I mean, if newlines_in_values is true :)

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: Also this will not support compressed formats, at least at the start

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: Ideally we update the Dataset Scanner to be able to take in different byte ranges for different fragments. Or is this not required?

A complication would be that fragments right now don't seem to have some sort of "ID", so it might be hard for a user to specify which fragments should read which byte ranges. The way to do this would be to let the user pass in a dict in the ScanOptions that's something like {file_path1: byte_range1, file_path2: byte_range2}. I think this would make sense.

Alternatively if this is not going to be supported, then this option ideally should only make sense for a dataset with one fragment. Perhaps I'll just add a check in the FragmentsToBatches function or something.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Ok, so perhaps byte ranges should actually be provided by the partitioning scheme? Most partitioning schemes would only emit trivial byte ranges (i.e. the entire file), but the Substrait partitioning would emit whatever is in the Substrait plan.

Of course, it's up to the Substrait producer to ensure that these byte ranges actually make sense for the given data format. We shouldn't ask the file format implementations (for example CSV, Parquet or Orc) to accept inaccurate byte ranges.

asfimport commented 2 years ago

Weston Pace / @westonpace: I think the FileFragment would be a good place for this. For example, I could imagine something like...


```python
import pyarrow.dataset as ds
# Discovery happens here.  After this line we will have a
# FileSystemDataset and each FileFragment will be the
# entire file
my_dataset = ds.dataset("/var/data/my_dataset")
# Splits the dataset into 32 partitions.  Each one is
# still a FileSystemDataset with FileFragments but
# now the FileFragments may have slicing information
my_datasets = my_dataset.partition(32)
```

> We shouldn't ask the file format implementations (for example CSV, Parquet or Orc) to accept inaccurate byte ranges.

I think there are (at least) two options here. The partitioned ranges could be byte ranges without any knowledge of the format. This is easy to create but means the file format would need to be able to map a byte range to some readable range. For example, if a user has 10 parquet files, each 1GiB large, with 10 equal sized row groups and we want to divide them into 32 partitions then the partitions would look like:

File 0: Bytes 0 to 335544320
File 0: Bytes 335544320 to 671088640
...

However, the row group boundaries would be 0, 107374182, 214748365, 322122547, 429496730. So, in this case, the parquet file format would adapt bytes 0 - 335544320 to row groups 0, 1, 2, 3 (since their first bytes fall in the requested range) even though this actually represents a slightly larger than requested partition (0 - 429496730).
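The arithmetic can be checked with a short script. It assumes 1 GiB files (the size that reproduces the row-group boundaries 0, 107374182, 214748365, ... listed above) and the rule that a partition owns every row group whose first byte falls inside it. All names are illustrative, not part of any Arrow API.

```python
GIB = 1 << 30
file_size = 1 * GIB      # assumed per-file size
n_files = 10
n_row_groups = 10        # equal-sized row groups per file
n_partitions = 32

# Total bytes divided evenly across the partitions.
partition_size = n_files * file_size // n_partitions

# Byte offset at which each row group starts (plus the end of the file).
row_group_starts = [round(i * file_size / n_row_groups)
                    for i in range(n_row_groups + 1)]


def row_groups_for_range(start: int, end: int) -> list[int]:
    """Row groups whose first byte lies in [start, end)."""
    return [i for i in range(n_row_groups)
            if start <= row_group_starts[i] < end]


first = row_groups_for_range(0, partition_size)
second = row_groups_for_range(partition_size, 2 * partition_size)
```

Under these assumptions `partition_size` is 335544320, the first partition maps to row groups 0 through 3 (bytes 0 to 429496730), and the second picks up at row group 4, so no row group is read twice.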

Approach 2. As an alternative approach we could expect the producer to know the details of the file format. In this case the partitions would probably be best expressed in terms that make sense for the format. A "parquet partitioner" would specify a list of files with a list of row groups for each file. An Orc partitioner would give a list of stripes. A CSV partitioner would still need to use byte ranges.

With this approach I think you end up needing ParquetFileFragment, OrcFileFragment, etc. (although you could maybe get by with just RowGroupFileFragment, accepted by the grouped formats and ByteRangeFileFragment, accepted by the text formats).

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: The intent of datasets has always been that each file format defines its own granularity for reading files. I don't understand why the consumer would specify byte ranges by hand.

@bkietz What is your opinion on this?

asfimport commented 2 years ago

Weston Pace / @westonpace: Would it help to think of these not as byte ranges but as percentages? I'm pretty sure the goal is just to be able to split a scan specification into subtasks. They could then be divided amongst processes, divided amongst servers, or simply run piecemeal so that partial success and retry is simpler (I think this might be @marsupialtail's end goal).

"Repartition the data into smaller files" should always work but I don't know that this is always an acceptable option.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: What I strive to understand is why the Substrait producer would be in a better position to choose the file fragment granularity rather than the query engine implementation (or, in the Arrow C++ case, the dataset subsystem).

asfimport commented 2 years ago

Weston Pace / @westonpace: I don't think I've been explaining myself well. Let's imagine a worst-case (though not unheard of) scenario where a user has a single 10GiB file, stored in S3, that they want to scan using 4 different EC2 containers. Using the current datasets API this would be impossible to do unless that file happens to be parquet (since we do have ParquetFileFormat and split_row_groups). I'd like a solution that I can use regardless of the format.

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Hmm... so this is partitioning on the client side by sharding the queries to different compute servers?

asfimport commented 2 years ago

Weston Pace / @westonpace: Yes. I think the original Substrait use case was based on Spark's implementation (linking to https://github.com/substrait-io/substrait/pull/102) but I didn't ask too many details.

Iceberg has something kind of similar. In their manifest they have a list of data files. Each data file has a list of split offsets. These are byte indices where the file could be split. That sort of approach could be interesting. FileFragment isn't persistable today, but we could easily add split offsets when discovering Parquet, IPC, and ORC files. Plus, there could be a boolean to scan CSV files during discovery to discover line breaks (probably debounced by some block size) and record those as split offsets. That would solve the "need to know the right spot to split" problem for CSV (at the cost of a more expensive discovery).
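A sketch of what recording debounced split offsets for a CSV file might look like, in plain Python. The function name `csv_split_offsets` and the exact debouncing rule (at most one offset per `block_size` bytes) are assumptions, and quoted newlines are ignored.

```python
def csv_split_offsets(data: bytes, block_size: int) -> list[int]:
    """Row-start offsets at which the file may be split, spaced at least
    `block_size` bytes apart."""
    offsets = [0]
    pos = block_size
    while pos < len(data):
        # First line break at or after the target position (searching from
        # pos - 1 so a row starting exactly at `pos` is accepted).
        nl = data.find(b"\n", pos - 1)
        if nl == -1 or nl + 1 >= len(data):
            break  # no further row starts before the end of the file
        offsets.append(nl + 1)
        pos = nl + 1 + block_size
    return offsets
```

Consecutive offsets then delimit byte ranges that are already aligned on line breaks, so a reader handed one of those ranges needs no boundary scanning of its own.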

asfimport commented 2 years ago

Antoine Pitrou / @pitrou: Well, if arbitrary byte ranges need to be supported, then some scanning will be required when starting to read the CSV file. And we need to error out if newlines_in_values is true.

An additional complication: what happens with e.g. a .csv.gz file?

asfimport commented 2 years ago

Ziheng Wang / @marsupialtail: There is no physical way you can do this with a .csv.gz file – you can't decompress starting at a random byte, so the interface can be supported but the implementation won't skip scanning bytes and will be less efficient.
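A small illustration of that cost using Python's standard `gzip` module. It assumes the byte range refers to offsets in the uncompressed data, and the helper name `read_decompressed_range` is hypothetical.

```python
import gzip
import io


def read_decompressed_range(gz_bytes: bytes, start: int, length: int) -> bytes:
    """Return `length` uncompressed bytes beginning at uncompressed offset
    `start`. The stream is still decompressed from the beginning."""
    with gzip.GzipFile(fileobj=io.BytesIO(gz_bytes)) as f:
        # In read mode, a forward seek on GzipFile is emulated by
        # decompressing and discarding everything before `start`.
        f.seek(start)
        return f.read(length)
```

The interface looks like a ranged read, but the work done is proportional to `start + length` rather than `length`.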

I am in favor of adding a file-format-specific option to the FileFragment class to denote how the byte range / row group range is specified. This option could take different values for different types of files, and each file type's reader can interpret it accordingly. This might avoid the need for multiple file classes.

asfimport commented 2 years ago

Weston Pace / @westonpace: That sounds good to me. Even if we end up later unifying everything in a common "byte ranges" or "percentages" API I don't think there is any harm in also having format-specific APIs. Plus, having the format-specific APIs should simplify adoption of a common API if we decide to go that route.

asfimport commented 2 years ago

David Li / @lidavidm: ARROW-17159 is a very similar issue, except motivated by Parquet/Spark

asfimport commented 2 years ago

Weston Pace / @westonpace: There is also ARROW-15589, which I had referenced above, also motivated by Spark (but via Substrait).

asfimport commented 1 year ago

Apache Arrow JIRA Bot: This issue was last updated over 90 days ago, which may be an indication it is no longer being actively worked. To better reflect the current state, the issue is being unassigned per project policy. Please feel free to re-take assignment of the issue if it is being actively worked, or if you plan to start that work soon.