apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0
6.17k stars 1.17k forks source link

Parallel NDSON file reading #8502

Closed alamb closed 9 months ago

alamb commented 10 months ago

Is your feature request related to a problem or challenge?

DataFusion can now automatically read CSV and parquet files in parallel (see https://github.com/apache/arrow-datafusion/issues/6325 for CSV)

It would be great to do the same for "NDJSON" files -- namely files that have multiple JSON objects placed one after the other.

Describe the solution you'd like

Basically implement what is described in https://github.com/apache/arrow-datafusion/issues/6325 for JSON -- and read a single large ND json file (new line delimited file) in parallel

Describe alternatives you've considered

Some research may be required -- I am not sure if finding record boundaries is feasible

Additional context

I found this while writing tests for https://github.com/apache/arrow-datafusion/issues/8451

tustvold commented 10 months ago

This should be simpler than CSV, as NDJSON does not typically permit unescaped newline characters, so it should just be a case of finding the next newline

alamb commented 10 months ago

I think this is a medium difficulty task for a new contributor as the pattern exists and there are tests (e.g. see https://github.com/apache/arrow-datafusion/pull/8505)

JacobOgle commented 10 months ago

@alamb I wouldn't mind digging into this one if its still open

alamb commented 10 months ago

@alamb I wouldn't mind digging into this one if its still open

I just filed it and I don't know of anyone else working on it.

Thanks @JacobOgle

kassoudkt commented 10 months ago

Is it still available ? i wloud love to take it :)

JacobOgle commented 10 months ago

@kassoudkt feel free! I've been a bit tied up lately so if you're free go for it!

marvinlanhenke commented 10 months ago

This should be simpler than CSV, as NDJSON does not typically permit unescaped newline characters, so it should just be a case of finding the next newline

@tustvold @alamb
...out of curiosity, I was digging into this as well. From my understanding (looking at the CSV impl) the FileGroupPartitioner and its method repartition_file_groups are used to create the partitions. However, in this case evenly divided by size.

In order for NDJSON to be split "correctly" (and not in the middle of a JSON Object) the FileGroupPartitioner needs a new method to split on newline? Would this be a reasonable approach? Then only fn repartitioned of trait ExecutionPlan and fn open of trait FileOpener need to be implemented.

Thanks for helping out.

tustvold commented 10 months ago

The way it typically works is the split is based on filesize but the reader is setup such that one of the bounds includes the current partial row, and the other does not. For example the reader starts at the NEXT newline (with special case for first row) and stops when it reaches the end of a line AND the byte position now exceeds the end limit. CSV (and parquet) behave similarly.

This all avoids the planner needing to perform IO, which is pretty important

marvinlanhenke commented 10 months ago

I just realized that I forgot the IO part. Now, I understand the approach better - thanks for the explanation.

alamb commented 10 months ago

In order for NDJSON to be split "correctly" (and not in the middle of a JSON Object) the FileGroupPartitioner needs a new method to split on newline? Would this be a reasonable approach?

Indeed -- I think the relevant code that finds the next bounds is https://github.com/apache/arrow-datafusion/blob/a1e959d87a66da7060bd005b1993b824c0683a63/datafusion/core/src/datasource/physical_plan/csv.rs#L411-L450

alamb commented 10 months ago

BTW this comment might help: https://github.com/apache/arrow-datafusion/blob/6b433a839948c406a41128186e81572ec1fff689/datafusion/core/src/datasource/physical_plan/file_groups.rs#L35-L79

marvinlanhenke commented 10 months ago

@alamb thanks for the pointers.

I already implemented a working solution, however I need to do some refactoring (if my kids let me :P ). I'd also like to extract the common functionality since the NdJson and the CSV implementation are nearly the same; any suggestions where to put those utility functions, like find_first_newline? I think mod.rs would be fine.

alamb commented 10 months ago

arrow-datafusion/datafusion/core/src/datasource/physical_plan/mod.rs sounds like a good idea to me

marvinlanhenke commented 10 months ago

@alamb ...found some time to clean up the changes.

However, I am not sure about properly benchmarking the solution (as stated in the PR) and perhaps some more tests are needed? I am looking forward to your feedback.

alamb commented 10 months ago

However, I am not sure about properly benchmarking the solution (as stated in the PR) and perhaps some more tests are needed? I am looking forward to your feedback.

I think a good test would be to find a largish JSON input file and show some benchmark reading numbers

I don't know of any existing benchmarks we have for reading large JSON files. Maybe we could add a benchmark for reading from large JSON (any CSV?) files in https://github.com/apache/arrow-datafusion/tree/main/benchmarks#datafusion-benchmarks

Something like

bench.sh run parse

That would measure the speed of parsing a large JSON file

marvinlanhenke commented 10 months ago

@alamb

I did some basic benchmarking.

Methodology:

  1. Generated a 60mil rows NDJSON file (~3.7G)
  2. Run tests with datafusion-cli (before / after changes)
  3. create external table json_test stored as json location '/home/ml/data_60m.json';
  4. select * from json_test; & select * from json_test where a > 5;

Results:

query before after
select * from json_test; ~24s ~24s
select * from json_test where a > 5; ~26s ~11s

When applying a filter and explain select * from json_test where a > 5; we can see the repartitioning happening (file_groups: 12).

However, when simply running select * from json_test. File_groups remain at 1 and we get no parallel reading.

I think this issue relates to: #6983 Haven't tested it with a dataframe; however the issue seems to remain, at least for the datafusion-cli (tested with JSON and CSV)

image

alamb commented 10 months ago

select from json_test; & select from json_test where a > 5;

A good test of just the speed of the parsing might be something like

select count(*) from json_test; 
select count(*) from json_test where a > 5;

That will minimize most of the actual query work other than parsing and won't need to try and format / carry through the other columns

marvinlanhenke commented 10 months ago

...the updated result with different queries:

query before after
select count(*) from json_test; ~19s ~6s
select count(*) from json_test where a > 5; ~18s ~8s