apache / datafusion

Apache DataFusion SQL Query Engine
https://datafusion.apache.org/
Apache License 2.0

Improve Parallel Reading (CSV, JSON) / Help Wanted #8723

Open marvinlanhenke opened 10 months ago

marvinlanhenke commented 10 months ago

Is your feature request related to a problem or challenge?

As originally stated in #6922 (I'm not sure why the issue was closed) and discussed in #6801, the current FileOpener implementations for both CSV and JSON use multiple GetRequests to adjust the byte range before parsing / reading the file itself.

This is suboptimal and can be improved by reducing the latency caused by multiple remote network requests.

Describe the solution you'd like

I would like to reduce the number of GetRequests from 3 to 1.

This can be done by "overfetching" the original partition byte range and then adjusting the range by finding the newline delimiter, similar to the solution already implemented.

The approach is outlined here: https://github.com/apache/arrow-datafusion/pull/6801#discussion_r1257465786 by @alamb

There are some edge cases that need consideration, like "heterogeneous object sizes" within a CSV row or JSON object, which lead to partition ranges overlapping on the same line and can result in reading the same line twice. Error handling / retry when no newline can be found (the "overfetching" range was too small) has to be handled as well.
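To make the boundary adjustment concrete, here is a minimal std-only sketch. It is not the POC and not DataFusion's implementation. It assumes one ownership rule: a partition owns every line whose first byte falls inside `[start, end)`. It also assumes the single request fetches from one byte before `start` up to `end` plus some extra. The names `Trimmed`, `trim_to_lines`, and `read_partition` are hypothetical, and an in-memory slice stands in for the object store.

```rust
/// Outcome of trimming an overfetched buffer to whole lines.
#[derive(Debug, PartialEq)]
enum Trimmed<'a> {
    /// The complete lines owned by the partition (possibly empty).
    Lines(&'a [u8]),
    /// No terminating newline was buffered: the overfetch was too small.
    NeedMoreData,
}

/// `buf` holds file bytes `[buf_offset, buf_offset + buf.len())`.
/// Assumption: `buf_offset == start.saturating_sub(1)`, so the byte just
/// before `start` is available to decide whether a line begins at `start`.
fn trim_to_lines(
    buf: &[u8],
    buf_offset: usize,
    start: usize,
    end: usize,
    reached_eof: bool,
) -> Trimmed<'_> {
    // File position of the first b'\n' at or after `from`, if it is buffered.
    let find_newline = |from: usize| -> Option<usize> {
        let rel = from.checked_sub(buf_offset)?;
        buf.get(rel..)?.iter().position(|&b| b == b'\n').map(|i| from + i)
    };
    let buf_end = buf_offset + buf.len();

    // First line start at or after `start`.
    let line_start = if start == 0 {
        0
    } else {
        match find_newline(start - 1) {
            Some(pos) => pos + 1,
            None if reached_eof => buf_end,
            None => return Trimmed::NeedMoreData,
        }
    };
    // No line begins inside this partition: a neighbour owns the bytes.
    if line_start >= end {
        return Trimmed::Lines(&buf[..0]);
    }
    // Read through the end of the last line that starts before `end`.
    let line_end = match find_newline(end - 1) {
        Some(pos) => pos + 1,
        None if reached_eof => buf_end,
        None => return Trimmed::NeedMoreData,
    };
    Trimmed::Lines(&buf[line_start - buf_offset..line_end - buf_offset])
}

/// Simulates the single overfetching request against an in-memory "file".
fn read_partition(file: &[u8], start: usize, end: usize, extra: usize) -> Trimmed<'_> {
    let from = start.saturating_sub(1).min(file.len());
    let to = end.saturating_add(extra).min(file.len()).max(from);
    trim_to_lines(&file[from..to], from, start, end, to == file.len())
}
```

Under this rule, adjacent partitions never emit the same line twice, because each line start belongs to exactly one range. A caller that receives `NeedMoreData` would retry with a larger `extra`; that retry policy is left out here.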

POC:

I already went ahead and implemented a POC which works and can handle some edge-cases like overlapping partition ranges; appropriate error handling / retry is still missing.

However, I definitely need help to improve upon this: https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381

The solution is inefficient due to line-by-line operations and buffer cloning / copying. I tried different ways to handle the GetResultPayload::Stream by using BytesMut::new() & buffer.extend_from_slice; but I was not able to handle all the edge-cases correctly.

I'd greatly appreciate it if someone could give some pointers or take it from here to improve upon the POC.
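For the streamed payload case, one possible direction is to append whole chunks to a single buffer and scan only the newly arrived bytes for the terminating newline, rather than working line by line. The sketch below is an assumption-laden illustration, not the POC code: a plain `Vec<u8>` stands in for `BytesMut`, an iterator of `Vec<u8>` stands in for the byte stream, and `buffer_until_newline` is a hypothetical name.

```rust
/// Appends chunks to one buffer until a b'\n' at buffer index >= `boundary`
/// has been seen. Returns the buffer and, if found, the index just past
/// that newline (the point where this partition's data ends).
fn buffer_until_newline<I>(chunks: I, boundary: usize) -> (Vec<u8>, Option<usize>)
where
    I: IntoIterator<Item = Vec<u8>>,
{
    let mut buf = Vec::new();
    for chunk in chunks {
        // Bytes before the old length were scanned on earlier iterations,
        // and bytes before `boundary` are irrelevant, so skip both.
        let scan_from = buf.len().max(boundary);
        buf.extend_from_slice(&chunk);
        let hit = buf
            .get(scan_from..)
            .and_then(|tail| tail.iter().position(|&b| b == b'\n'));
        if let Some(i) = hit {
            // Stop consuming the stream as soon as the newline is found.
            return (buf, Some(scan_from + i + 1));
        }
    }
    // Stream exhausted without a newline past the boundary.
    (buf, None)
}
```

Each byte is copied once and scanned at most once. The `None` case corresponds to either end of file or an overfetch that was too small, which the caller has to tell apart.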

Describe alternatives you've considered

Leave as is.

Additional context

None.

marvinlanhenke commented 10 months ago

cc @alamb : as discussed in #6801

...happy if you have any pointers on how to improve the POC.

alamb commented 10 months ago

Thanks for the heads up @marvinlanhenke -- I'll check this out over the next day or two. Much appreciated

cc @devinjdangelo and @tustvold if you are interested

alamb commented 10 months ago

I didn't have a chance to review https://github.com/marvinlanhenke/arrow-datafusion/blob/poc_optimize_get_req/datafusion/core/src/datasource/physical_plan/json.rs#L232-L381 in fine detail, but in general I think that code is looking very hard to navigate / test as it is so deeply nested in futures / closures.

My suggestion is to try and extract the logic somehow into a structure that is easier to test and reason about.

For example, maybe you could create a struct like

struct StreamingJsonReaderBuilder {
 ...
}

impl StreamingJsonReaderBuilder {
  fn new(..) {}
  fn with_range(mut self, range: ..) ...
  fn build(self) -> SendableRecordBatchStream
}

And then you could start writing tests like

let object_store = ...;
let input = StreamingJsonReaderBuilder::new(object_store.read())
  .with_range(Range::new(100, 200))
  .build();

let batches = collect(input);
assert_batches_eq(..)

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like
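As a rough illustration of that encapsulation idea, here is a std-only sketch that runs without DataFusion. Everything in it is hypothetical: `LineRangeReaderBuilder` is a stand-in for the suggested builder, it reads from an in-memory byte slice instead of an object store, and `build` returns raw lines rather than a `SendableRecordBatchStream`. It assumes a line belongs to the range that contains its first byte.

```rust
use std::ops::Range;

/// Hypothetical builder that isolates the "which lines belong to this
/// byte range" logic so it can be unit-tested on its own.
struct LineRangeReaderBuilder<'a> {
    data: &'a [u8],
    range: Option<Range<usize>>,
}

impl<'a> LineRangeReaderBuilder<'a> {
    fn new(data: &'a [u8]) -> Self {
        Self { data, range: None }
    }

    fn with_range(mut self, range: Range<usize>) -> Self {
        self.range = Some(range);
        self
    }

    /// Returns every line (newline included) whose first byte lies in the
    /// configured range; without a range, all lines are returned.
    fn build(self) -> Vec<&'a [u8]> {
        let range = self.range.unwrap_or(0..self.data.len());
        let mut lines = Vec::new();
        let mut offset = 0;
        for line in self.data.split_inclusive(|&b| b == b'\n') {
            if range.contains(&offset) {
                lines.push(line);
            }
            offset += line.len();
        }
        lines
    }
}
```

Because this version scans the whole input, it is only useful as a simple reference to compare against when testing a faster range-based reader, for example by checking that the lines from all partitions concatenate back to the original file with no duplicates.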

marvinlanhenke commented 10 months ago

That might not be the right structure, but I am trying to give you the flavor of what encapsulating the complexity might look like

@alamb ...thanks, this might already help by guiding me onto the right track. I was trying not "to put too much effort into the POC"; however, this might have been the wrong decision, since I kinda hit a roadblock due to not being able to effectively reason about the code.

I will look into your suggestion and try to do some refactoring in order to understand what is and what should be happening at this point.

thanks again.

alamb commented 9 months ago

I think this was accidentally closed so I am reopening it. I am happy to close it again if I missed something