spark-root / laurelin

Allows reading ROOT TTrees into Apache Spark as DataFrames
BSD 3-Clause "New" or "Revised" License
8 stars 4 forks source link

Support for LIMIT pushdown #84

Open ghislainfourny opened 4 years ago

ghislainfourny commented 4 years ago

It seems that when trying to read the first N rows, Laurelin materializes the entire table before applying the LIMIT, e.g.:

SELECT * FROM the-name-i-give-to-my-df LIMIT 10

Are there plans to push down the LIMIT to read less from disk? I would expect this query to be very fast even with a very large file.

Thanks!

PerilousApricot commented 4 years ago

Hi Ghislain,

I do intend to add support for predicate pushdown, but I'm surprised by Spark's behavior in this case -- I would think it would only materialize a single partition since 10 is less than the number of events in a single partition. That being said, a quick glance at the filter API* and the DataSource API, I don't see any place where Spark would inform me of the # of rows requested in the DataSourceV2 API...

I presume the builtin parquet/json/csv data sources are doing what you expect and only loading a single partition? If that's the case, I'll need to try and step through how they're handling it and see what I'm missing. One wrinkle is that previously the builtin file data sources didn't use the published DataSourceV1 API and instead accessed internal Spark classes directly, so they had different capabilities/performance than external plugins using the API. Spark >= 2.4.X defaults to using the DataSourceV2 implementations, but there are bits of fallback logic that appear to let spark fall back to the older implementations. So, it might be possible that DSv2 is simply missing the functionality needed for Spark to tell a DataSource how many rows it wants back.

Thanks for the report! I'll look into what's happening... Andrew

* https://github.com/apache/spark/blob/9f42be25eba462cca8148ce636d6d3d20123d8fb/sql/catalyst/src/main/scala/org/apache/spark/sql/sources/filters.scala

PerilousApricot commented 4 years ago

I actually realized another option -- the Parquet reader supports both row-based and vectorized reads, and down at the very bottom, the API has a get() call which is called by spark when it wants more data. In the row-based API, the return value is InternalRow (which is a single row), but in the vectorized API, the return value is ColumnarBatch (which is currently all 200k rows in the partition).

If I can confirm what's happening, I'll try to get an explanation from the Spark devs and see if there's a way to extend the API to better support LIMIT (and COUNT for that matter). Fortunately the DSv2 API is still considered experimental, so they should hopefully be amenable to including small fixes like that

ghislainfourny commented 4 years ago

Thanks, Andrew!

PerilousApricot commented 4 years ago

I think that if implement org.apache.spark.sql.connector.read.SupportsReportStatistics, which provides the # of rows and # of bytes, spark will have enough to know upfront the number of rows in a scan, which will let it limit the partitions read

PerilousApricot commented 4 years ago

I asked the mailing list yesterday -- http://apache-spark-user-list.1001560.n3.nabble.com/Optimizing-LIMIT-in-DSv2-td37166.html . I'll report back if there's any reply, but longer term, I'm planning on implementing the various DataSourcev2 mixins, so that functionality should come up soon

ghislainfourny commented 4 years ago

Thanks a lot, Andrew. I am looking forward to their answer. It would be wonderful if Spark supported this (for all possible inputs -- except, that is, when the schema has to be inferred).