rapidsai / cudf

cuDF - GPU DataFrame Library
https://docs.rapids.ai/api/cudf/stable/
Apache License 2.0

[FEA] `read_csv` context-passing interface for distributed/segmented parsing #11728

Open upsj opened 1 year ago

upsj commented 1 year ago

**Is your feature request related to a problem? Please describe.** To parse a file of a regular language like CSV, we can only safely parse data inside a byte range if we know the parsing context from before that range. Without this information, we may accidentally interpret a record delimiter (newline) inside a quoted field as an actual delimiter.

More formally, when starting inside a byte range, we don't know which state the DFA of the token language is in, so we need to store the transition vector starting from every possible state and combine the vectors by function composition in an associative scan operation. This is pretty much identical to what happens in the finite state transducer.

Instead of just running the scan on a full file, we can run it only on a byte range, and combine the resulting transition vectors in an exclusive scan over all byte ranges to establish local parsing context.
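
To make the composition idea concrete, here is a toy Python sketch (not the libcudf implementation), assuming a two-state quoting DFA; the real parser tracks more states, but the associative structure is the same:

```python
# Toy model: 0 = outside quotes, 1 = inside quotes. A chunk's "transition
# vector" records, for every possible starting state, the state reached after
# scanning the chunk (libcudf packs the real version into packed_rowctx_t).
OUTSIDE, INSIDE = 0, 1
IDENTITY = (OUTSIDE, INSIDE)  # transition vector of an empty chunk

def transition_vector(chunk: bytes, quotechar: int = ord('"')) -> tuple:
    states = []
    for start in (OUTSIDE, INSIDE):
        state = start
        for byte in chunk:
            if byte == quotechar:
                state ^= 1  # toggle in/out of a quoted field
        states.append(state)
    return tuple(states)

def compose(first: tuple, second: tuple) -> tuple:
    """Apply `first`, then `second`; associative, so usable in a scan."""
    return tuple(second[s] for s in first)

def finalize(vector: tuple, initial_state: int = OUTSIDE) -> int:
    return vector[initial_state]

chunks = [b'x,"unterminated\n', b'still quoted",y\n']
prefix = IDENTITY  # exclusive scan over the per-chunk vectors
for chunk in chunks:
    print(chunk, "starts in state", finalize(prefix))
    prefix = compose(prefix, transition_vector(chunk))
```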

**Describe the solution you'd like** I want to propose a slight extension to the `read_csv` interface to handle this case (a rough usage sketch follows the list):

  1. add a class `csv_parse_context` that opaquely wraps `packed_rowctx_t`, exposing only the `merge_row_contexts` functionality to combine the parsing contexts of adjacent byte ranges, and a `finalize` operation that turns the transition vector into its value starting from the initial DFA state.
  2. add a `read_csv_context` function that only scans the local byte range to compute its `csv_parse_context` transition vector. It can probably take the same parameters as `read_csv`.
  3. add a `csv_parse_context initial_parsing_state` parameter to `csv_reader_options` that defaults to the initial state. The `read_csv` function can then use this initial state to determine record offsets and do the actual parsing.
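
As a rough usage sketch (Python pseudocode: `read_csv_context`, `merge_row_contexts`, `finalize`, and `initial_parsing_state` are the proposed names and do not exist in cudf today; the file name and byte ranges are placeholders):

```python
import cudf

byte_ranges = [(0, 64 << 20), (64 << 20, 64 << 20), (128 << 20, 64 << 20)]  # (offset, size)

# Scan each byte range independently for its context (proposed read_csv_context).
contexts = [read_csv_context("data.csv", byte_range=r) for r in byte_ranges]

# Exclusive scan with the proposed merge_row_contexts to get each range's start
# state; the first range keeps the default initial state (None here).
initial_states, running = [], None
for ctx in contexts:
    initial_states.append(running.finalize() if running is not None else None)
    running = ctx if running is None else running.merge_row_contexts(ctx)

# Parse each byte range with its known initial parsing state (proposed option).
parts = [
    cudf.read_csv("data.csv", byte_range=r, initial_parsing_state=s)
    for r, s in zip(byte_ranges, initial_states)
]
```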

**Describe alternatives you've considered** Alternatively, we could implement backtracking by reading chunks before the byte range until we figure out an unambiguous parser state (one that is not the error state). In the worst case, this could lead to reading the entire prefix up to the byte range.

**Additional context** This is relevant if we want `dask.read_csv` to be able to handle quoted record delimiters (i.e. newlines) where the opening quote occurs before the byte range.

The interface has the advantage that it can be tested in isolation on a single node, without having to rely on dask.

The same kind of pattern could also apply to read_json, where on top of the regular parsing state, we also need to pass the stack transition operations from the beginning to the end of the byte range.

GregoryKimball commented 1 year ago

@rjzamora This issue serves as our proposal for the new parsing-context aware CSV reader. We would love to discuss with you (or another Dask stakeholder) in more detail.

rjzamora commented 1 year ago

Nice - I can't say I understand how the transition-vector logic works, but the general plan seems reasonable to me. My current understanding is that the corresponding dask graph would look something like this:

[possible-graph: diagram of the proposed Dask task graph]

We start by selecting overlapping byte ranges from the dataset (or perhaps they don't need to overlap?). We then map the read_csv_context logic onto each of these local byte ranges, do an overlapped mapping of the merge_row_contexts logic to update the context for each byte range, and finally do the read_csv mapping to generate our DataFrame partitions.
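
As an illustration only, such a graph might be spelled with `dask.delayed` roughly like this (using a simple gather of preceding contexts rather than a true map-overlap; the stubs stand in for the proposed cudf APIs, which do not exist yet):

```python
import dask

@dask.delayed
def local_context(path, byte_range):
    ...  # would call the proposed read_csv_context on this byte range

@dask.delayed
def combine_contexts(*contexts):
    ...  # would fold merge_row_contexts over the contexts of preceding ranges

@dask.delayed
def read_partition(path, byte_range, initial_state):
    ...  # would call cudf.read_csv with the proposed initial_parsing_state

def build_graph(path, byte_ranges):
    contexts = [local_context(path, r) for r in byte_ranges]
    partitions = []
    for i, r in enumerate(byte_ranges):
        # exclusive combine: the context of everything before byte range i
        state = combine_contexts(*contexts[:i]) if i > 0 else None
        partitions.append(read_partition(path, r, state))
    return partitions  # one delayed DataFrame partition per byte range
```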

Is this the general algorithm you have in mind?

upsj commented 1 year ago

A couple of points for completeness:

  1. they must be non-overlapping but adjacent for this to be correct
  2. "read local context" reads exactly the specified byte range (plus/minus one character to detect whether the byte range starts after a newline)
  3. "combine contexts" is really lightweight; we are basically passing around an integer, so it could happen via "allgather + local exclusive prefix-scan" as well
  4. read_csv may read past the end of the byte range to find the end of the last row inside the range; other than that, everything is non-overlapping

rjzamora commented 1 year ago
>   1. they must be non-overlapping but adjacent for this to be correct
>   2. "read local context" reads exactly the specified byte range (plus/minus one character to detect whether the byte range starts after a newline)

Thanks for clarifying. We will likely want to use a distinct IO task to physically pull each byte range from a (possibly remote) dataset. This means a given "read local context" task will only have access to the specific bytes that were read in the IO task it depends on. It seems like we would want to assign non-overlapping byte ranges to the "read local context" tasks, but that the IO tasks they depend on would need a bit of overlap (to deal with the "plus/minus one character" issue).
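
For example (a toy helper, not Dask or cudf code), non-overlapping context ranges and their slightly padded IO ranges could be derived together like this:

```python
def context_and_io_ranges(file_size: int, blocksize: int):
    """Pair each non-overlapping context byte range with an IO range padded by one byte."""
    ranges = []
    for start in range(0, file_size, blocksize):
        size = min(blocksize, file_size - start)
        io_start = max(start - 1, 0)
        io_end = min(start + size + 1, file_size)
        ranges.append(((start, size), (io_start, io_end - io_start)))
    return ranges

# e.g. context_and_io_ranges(100, 40) ->
# [((0, 40), (0, 41)), ((40, 40), (39, 42)), ((80, 20), (79, 21))]
```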

  1. "combine contexts" is really lightweight, we are basically passing around an integer, so it could happen via "allgather + local exclusive prefix-scan" as well

Lightweight sounds good. A map-overlap pattern may still be preferred (if possible), since allgather operations are not particularly performant in Dask.

upsj commented 1 year ago

> This means a given "read local context" task will only have access to the specific bytes that were read in the IO task it depends on.

This sounds like a problematic limitation - if a byte range ends shortly after the start of a long row, your IO task may provide too little data. Are you worried about reading the same data twice here? Maybe we could also buffer that at the libcudf data_source level? To expand some more, read_csv works in two steps (see the toy sketch after the list):

  1. finding row offsets: this means scanning the byte range for non-quoted/commented-out newline characters, as well as finding the next newline after the end of the byte range. The IO here happens in small chunks and will most likely overrun the byte range.
  2. loading everything between the first and last row offset computed in step 1 into device memory and actually parsing the rows.
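
A toy pure-Python version of those two steps (ignoring quoting and comments, which is exactly what the parsing context above would fix) looks roughly like this:

```python
def rows_for_byte_range(data: bytes, start: int, size: int) -> bytes:
    """Return the bytes of all rows whose first byte lies in [start, start + size)."""
    # Step 1: find the first row offset inside the byte range ...
    if start == 0 or data[start - 1:start] == b"\n":
        first = start
    else:
        newline = data.find(b"\n", start)
        first = newline + 1 if newline != -1 else len(data)
    if first >= start + size or first >= len(data):
        return b""  # the byte range is contained entirely within one row
    # ... and the newline terminating the last row that starts inside it.
    # This lookup may run past the end of the byte range, as noted above.
    end = data.find(b"\n", start + size - 1)
    end = len(data) if end == -1 else end + 1
    # Step 2: load and parse only data[first:end].
    return data[first:end]

csv = b"a,1\nb,2\nc,3\n"
print([rows_for_byte_range(csv, s, 4) for s in (0, 4, 8)])
# [b'a,1\n', b'b,2\n', b'c,3\n']
```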

The very worst case I could imagine would be a single dask task's byte range that is completely contained within a row, so the corresponding output is empty, but its predecessor would need to read past that task's byte range.

rjzamora commented 1 year ago

> if a byte range ends shortly after the start of a long row, your IO task may provide too little data. Are you worried about reading the same data twice here?

You are correct that it will be possible to "break" the Dask logic by assigning byte ranges that are too small (or by assigning too little overlap). As far as I understand, this is already a potential problem in dask's read_csv. However, in the past the only alternative was to read the entire remote file on every worker.

Now that I think about it, this is no longer the only alternative, because we are now able to perform partial IO on a file opened with fsspec (as a pyarrow NativeFile). Therefore, as long as we can handle a pyarrow NativeFile source, we can allow those tasks to access any byte of the file.
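
For reference, a minimal sketch of that kind of partial IO, wrapping an fsspec file object as a pyarrow NativeFile (the URL, offset, and length are placeholders):

```python
import fsspec
import pyarrow as pa

# Open a (possibly remote) file and read only the bytes a given task needs.
with fsspec.open("s3://bucket/data.csv", mode="rb") as f:   # placeholder URL
    native_file = pa.PythonFile(f, mode="r")  # expose it as a pyarrow NativeFile
    native_file.seek(64 << 20)                # jump to the start of a byte range
    chunk = native_file.read(64 << 20)        # partial read instead of the whole file
```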

upsj commented 1 year ago

If we wanted to do this 100% cleanly, we could also extract row offsets in the parse-context stage and exchange them between neighboring tasks/workers. Then the initial IO could be limited to the byte range + 1, and the subsequent parsing IO would know exactly which bytes to load a priori.

Coming back to the communication patterns:

> A map-overlap pattern may still be preferred (if possible), since allgather operations are not particularly performant in Dask.

Overlaps could potentially work somewhat heuristically in most cases, but I'd be more comfortable with an always safe solution, since the error-proneness across byte range boundaries was the original motivation for this discussion.

rjzamora commented 1 year ago

> I'd be more comfortable with an always safe solution

That makes sense. It would be fine for the first pass at the Dask algorithm to just execute a distinct task graph to calculate the correct byte range for each partition, followed by a conventional read_csv graph. The map-overlap pattern would allow us to avoid any eager execution, but my opinion is that eager execution is fine when it is well motivated.
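
A rough sketch of that two-pass shape, with an eagerly executed context pass followed by a conventional lazy graph (again, `read_csv_context`, `merge_row_contexts`, `finalize`, and `initial_parsing_state` are only the proposed names, not existing APIs):

```python
import dask
from dask import delayed
import cudf

def read_csv_two_pass(path, byte_ranges):
    # Pass 1 (eager): compute each byte range's local parsing context right away.
    contexts = dask.compute(
        *[delayed(read_csv_context)(path, byte_range=r) for r in byte_ranges]
    )
    # Exclusive prefix combine on the client; the first range keeps the default state.
    states, running = [], None
    for ctx in contexts:
        states.append(running.finalize() if running is not None else None)
        running = ctx if running is None else running.merge_row_contexts(ctx)
    # Pass 2 (lazy): a conventional per-partition read_csv graph.
    return [
        delayed(cudf.read_csv)(path, byte_range=r, initial_parsing_state=s)
        for r, s in zip(byte_ranges, states)
    ]
```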