Closed — connec closed this 1 month ago
> I haven't dug into the implementation, but I imagine it becomes harder to find the right split point for multi-threaded reading
That is correct. DataFusion's parallel CSV scan splits the file into byte ranges and lets `arrow-rs` handle reading a valid file byte range. So I think the first step will be to add support in arrow.
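To illustrate why byte-range splitting trips over quoted newlines, here's a minimal sketch (plain-std Rust, illustrative only; the naive splitter is invented for the example and is not DataFusion's actual code):

```rust
// Sketch (illustrative only): why naive byte-range splitting breaks CSVs
// that contain quoted newlines. A parallel reader that starts each chunk
// at the newline nearest a byte offset can land inside a quoted field,
// so the next chunk begins mid-record and sees the wrong column count.

/// Naively split `csv` at the first '\n' at or after the midpoint and
/// return the second chunk (deliberately ignoring quote state).
fn naive_second_chunk(csv: &str) -> &str {
    let mid = csv.len() / 2;
    let split = mid + csv[mid..].find('\n').expect("no newline after midpoint") + 1;
    &csv[split..]
}

fn main() {
    // Record 2's second field contains an embedded (quoted) newline.
    let csv = "id,notes\n1,plain\n2,\"line one\nline two\"\n3,also plain\n";
    let chunk = naive_second_chunk(csv);
    let first_line = chunk.lines().next().unwrap();
    // The chunk starts inside the quoted value: 1 field instead of 2,
    // matching the "wrong number of columns" errors seen in practice.
    println!(
        "chunk starts with {first_line:?} -> {} field(s)",
        first_line.split(',').count()
    );
}
```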
I see, so an approach could be to:

1. Add `newlines_in_values: bool` to `arrow_csv::reader::Format`. The implementation could use this to consume terminators within quotes. (It turns out `arrow-csv` operates in this way by default.)
2. Add `newlines_in_values: bool` to `datafusion::config::CsvOptions` (and `datafusion::datasource::file_format::csv::CsvFormat`). The implementation could use this to pass the `newlines_in_values` flag on to `arrow-csv`.

Turns out `arrow-csv` does correctly parse newlines in quoted values, so the issue comes from reading CSVs in parallel. Limiting the target number of partitions to 1 solves the issue.
For example, if I add this to my code before running `ctx.sql("SELECT * FROM <csv including linebreaks>")`, then the query executes successfully (without it there are errors about wrong numbers of columns):

```rust
ctx.state_weak_ref()
    .upgrade()
    .unwrap()
    .write_arc()
    .config_mut()
    .options_mut()
    .execution
    .target_partitions = 1;
```
From a UX perspective, this setting is quite disconnected from the intent. It also requires changing the session-level `target_partitions` setting, which (I assume, untested) would adversely affect reads of CSVs that aren't expected to contain linebreaks. A per-CSV `newlines_in_values` setting would help to signpost this situation and allow more efficient plans when only some of the scanned CSVs need it.
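To illustrate the intent, a per-CSV setting might be used along these lines (purely hypothetical: `newlines_in_values` on `CsvReadOptions` does not exist at this point, the name and surface are invented here to match the proposal):

```rust
// Hypothetical sketch of the proposed per-CSV option.
let opts = CsvReadOptions::new().newlines_in_values(true);
ctx.register_csv("messy", "with_linebreaks.csv", opts).await?;

// Other CSVs keep the default and can still be scanned in parallel.
ctx.register_csv("clean", "no_linebreaks.csv", CsvReadOptions::new()).await?;
```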
> Turns out `arrow-csv` does correctly parse newlines in quoted values, so the issue comes from reading CSVs in parallel. Limiting the target number of partitions to 1 solves the issue.
Thank you for the investigation @connec. If that's the case, setting `datafusion.optimizer.repartition_file_scans` to `false` might help: https://datafusion.apache.org/user-guide/configs.html

IIRC this option only limits the file scan to one thread; other parts of the plan will still be parallelized using `target_partitions`.
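For reference, the same session-config pattern shown earlier in the thread could flip this flag instead of `target_partitions` (a sketch, untested; it assumes the same `SessionContext` setup as the earlier snippet):

```rust
// Sketch, untested: disable file-scan repartitioning for the session.
// Other parts of the plan keep using `target_partitions`.
ctx.state_weak_ref()
    .upgrade()
    .unwrap()
    .write_arc()
    .config_mut()
    .options_mut()
    .optimizer
    .repartition_file_scans = false;
```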
Thanks for the pointer! That would certainly be better than disabling parallelization throughout.
Do you think it would still be desirable to be able to turn off file-level partitioning for CSVs only (and/or to vary it on a per-type basis)? I'm not familiar enough with Parquet to know whether there would be a functional reason to disable it for that format.
Furthermore, I think there could be value in controlling this on a per-file basis, e.g. if you're working with multiple CSVs, one of which you know contains newlines in values while the others do not (and so can benefit from parallelisation).
> Do you think it would still be desirable to be able to turn off file-level partitioning for CSVs only (and/or to vary it on a per-type basis)? I'm not familiar enough with Parquet to understand if there would be a functional reason to disable it for that format?
Yes, using the global option is only a quick fix. I think it's a good idea to add an extra option field for individual files; later the planner can make better decisions based on each file's `newlines_in_values` property (for now, e.g., disabling parallel execution). And it can be CSV-specific; Parquet seems not relevant here.
I've opened a PR with the most straightforward implementation of this I could think of. I'd be glad to receive any feedback on that approach.
**Is your feature request related to a problem or challenge?**

I'm trying to read CSVs that include newlines in (quoted) values.

**Describe the solution you'd like**
Some googling revealed that this isn't currently supported by the `arrow-csv` crate, whereas the functionality does exist in the C++ (`ParseOptions::newlines_in_values`) and Python (`ParseOptions.newlines_in_values`) implementations.

Ideally, a `newlines_in_values` field could be added to `datafusion::common::config::CsvOptions` to support this functionality.

Note that the Python docs call out the performance implications of this.
I haven't dug into the implementation, but I imagine it becomes harder to find the right split point for multi-threaded reading (though, it seems not dissimilar to finding the prev/next linebreak, so perhaps not insurmountable...).
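The "perhaps not insurmountable" intuition can be sketched in plain-std Rust (illustrative only, not DataFusion's actual code): a quote-aware scan finds true record boundaries, but the quote state depends on everything before the split point, which is what makes dropping a reader at an arbitrary byte offset hard.

```rust
// Sketch (illustrative only): finding the next *real* record boundary by
// tracking quote state, so newlines inside quoted fields are skipped.

/// Return the byte index just past the terminator of the record starting
/// at `start`. Assumes `start` is a true record boundary, `"` quoting,
/// and `""` escapes (which toggle the state twice: a net no-op).
fn next_record_boundary(data: &[u8], start: usize) -> usize {
    let mut in_quotes = false;
    let mut i = start;
    while i < data.len() {
        match data[i] {
            b'"' => in_quotes = !in_quotes,
            b'\n' if !in_quotes => return i + 1,
            _ => {}
        }
        i += 1;
    }
    data.len()
}

fn main() {
    let csv = b"1,\"a\nb\"\n2,c\n";
    // The first record ends after the quoted newline (index 8), not at
    // the embedded '\n' at index 4.
    assert_eq!(next_record_boundary(csv, 0), 8);
    assert_eq!(next_record_boundary(csv, 8), 12);
    println!("record boundaries found correctly");
}
```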
**Describe alternatives you've considered**
The only alternative I can see would be to preprocess the CSV before feeding it into DF. I haven't explored this option as I imagine it would take a lot of DF plumbing, and it seems valuable to have parity with other arrow CSV packages (C++ and Python, at least).
**Additional context**
I was originally planning to report this against the `arrow-rs` repository, but since my use case is with `datafusion` I decided to report it here. Let me know if this issue would be more appropriate there and I can move/copy it.