Qbeast-io / qbeast-spark

Qbeast-spark: DataSource enabling multi-dimensional indexing and efficient data sampling. Big Data, free from the unnecessary!
https://qbeast.io/qbeast-our-tech/
Apache License 2.0

Support for DatasourceV2: Sampling Pushdown and Limit Pushdown [Spark] #175

Open osopardo1 opened 1 year ago

osopardo1 commented 1 year ago

Related to #166 .

Qbeast-Spark should be compatible with the latest versions of Delta Lake and Apache Spark, to benefit from new features and major upgrades. The move to Delta 2.1.0 and Spark 3.3.0 reveals a set of interesting pushdown operations that could be powered by the Qbeast metadata.

We should:

Thoughts on #68

osopardo1 commented 1 year ago

In DatasourceV2 there's also the possibility to build your own scan of the table, with more options than the Datasource V1 (which we are currently using).

Maybe it's worth exploring the DataSource V2 API.

alexeiakimov commented 1 year ago

IMHO we need to explore the DataSource V2 API; possibly we will end up dropping V1. Supporting both could require too much conditional logic.

osopardo1 commented 1 year ago

Yes, I agree. Do you think this can be done in the same PR #167, or is it best to do a workaround first for Sampling and Limit Pushdown and migrate everything to V2 in a separate issue?

alexeiakimov commented 1 year ago

Well, I prefer to separate the migration to the new versions of Spark/Delta from reworking the QbeastTable on top of DataSource V2. Migration would mean that everything compiles and runs without new problems. The rework is a complex task, because they changed the DataSource SPI a lot although it is still called V2. A good overview of the Spark 3.0 SPI can be found here: https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/

alexeiakimov commented 1 year ago

I would like to share some thoughts on the Spark 3.x.x DataSource API V2.

  1. Surprisingly, DataSource API V2 in Spark 2.x and in Spark 3.x are different. A good general overview can be found at https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/
  2. The Read API seems straightforward: the mixins for filter, sampling and limit push down should be implemented on the ScanBuilder, which passes the filters, sampling and limit to the Scan and Batch. Batch can compute the necessary files and pass them to the PartitionReaderFactory. The latter creates a PartitionReader for each partition (we have just one). The PartitionReader returns the table rows one by one, like an iterator.
  3. The Write API is much more challenging. A possible implementation does not have direct access to the original DataFrame; instead, the rows are written by the DataWriter one by one. In other words, DataWriter is a callback, so there is no explicit notification when the writing starts. It means that we cannot assign weights by transforming the original DataFrame as is done now. It also means that we have to start the transaction lazily, when we read the current index.
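
To make the read flow in point 2 concrete, here is a minimal sketch in plain Java. It is not qbeast-spark code. The two `SupportsPushDown...` interfaces are local stand-ins that only mirror the shape of Spark 3.3's mixins so the example runs without Spark on the classpath. `IndexedFile`, its `minWeight` field, the `Sketch*` classes, the file names and the rule "read a file only if its `minWeight` is within the sample fraction" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-ins shaped like Spark 3.3's pushdown mixins (NOT the real interfaces).
interface SupportsPushDownTableSample {
    boolean pushTableSample(double lowerBound, double upperBound, boolean withReplacement, long seed);
}

interface SupportsPushDownLimit {
    boolean pushLimit(int limit);
}

// Hypothetical file entry: minWeight is an assumed per-file statistic in [0, 1].
final class IndexedFile {
    final String path;
    final double minWeight;
    final List<String> rows;

    IndexedFile(String path, double minWeight, String... rows) {
        this.path = path;
        this.minWeight = minWeight;
        this.rows = Arrays.asList(rows);
    }
}

// Plays the role of Scan and Batch: decides which files must be read.
final class SketchScan {
    private final List<IndexedFile> files;
    private final double sampleUpperBound;
    private final int limit;

    SketchScan(List<IndexedFile> files, double sampleUpperBound, int limit) {
        this.files = files;
        this.sampleUpperBound = sampleUpperBound;
        this.limit = limit;
    }

    // Assumed rule: a file is needed only if its minWeight is within the sample fraction.
    List<IndexedFile> planFiles() {
        List<IndexedFile> selected = new ArrayList<>();
        for (IndexedFile file : files) {
            if (file.minWeight <= sampleUpperBound) {
                selected.add(file);
            }
        }
        return selected;
    }

    // Plays the role of PartitionReaderFactory and PartitionReader for a single
    // partition: collects rows file by file and stops once the limit is reached.
    List<String> readRows() {
        List<String> out = new ArrayList<>();
        for (IndexedFile file : planFiles()) {
            for (String row : file.rows) {
                if (out.size() >= limit) {
                    return out;
                }
                out.add(row);
            }
        }
        return out;
    }
}

// Plays the role of ScanBuilder: records what was pushed down, then builds the scan.
final class SketchScanBuilder implements SupportsPushDownTableSample, SupportsPushDownLimit {
    private final List<IndexedFile> files;
    private double sampleUpperBound = 1.0; // no sampling by default
    private int limit = Integer.MAX_VALUE; // no limit by default

    SketchScanBuilder(List<IndexedFile> files) {
        this.files = files;
    }

    @Override
    public boolean pushTableSample(double lowerBound, double upperBound, boolean withReplacement, long seed) {
        if (withReplacement || lowerBound != 0.0) {
            return false; // decline the pushdown; the caller has to sample on its own
        }
        this.sampleUpperBound = upperBound;
        return true;
    }

    @Override
    public boolean pushLimit(int limit) {
        this.limit = limit;
        return true;
    }

    SketchScan build() {
        return new SketchScan(files, sampleUpperBound, limit);
    }
}

final class ReadSketchDemo {
    static List<IndexedFile> demoFiles() {
        return Arrays.asList(
            new IndexedFile("a.parquet", 0.0, "r1", "r2"),
            new IndexedFile("b.parquet", 0.5, "r3"),
            new IndexedFile("c.parquet", 0.9, "r4"));
    }

    public static void main(String[] args) {
        SketchScanBuilder builder = new SketchScanBuilder(demoFiles());
        builder.pushTableSample(0.0, 0.6, false, 42L);
        builder.pushLimit(2);
        System.out.println(builder.build().readRows());
    }
}
```

With a sample upper bound of 0.6 and a limit of 2, the sketch plans only `a.parquet` and `b.parquet` and returns the rows `r1` and `r2`. The point is that both decisions are taken before any row is read, which is what makes the pushdown worthwhile.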

@osopardo1, @cugni, @Jiaweihu08 Could it make sense to create a temporary DataFrame to copy the data being written, and then apply the algorithm we use now?

osopardo1 commented 1 year ago

Thank you for the overview!

  1. Very nice, a lot of code can be reused from OTreeIndex once the filters and everything else are pushed down.

  2. One solution for the Write API is to keep a fallback to V1. That is what we have implemented for the moment: the WriteBuilder returns a V1Write, which creates an InsertableRelation that calls our methods in IndexedTable for indexing and writing the DataFrame. I think we can migrate just the Read features for now and consider moving everything else in the future.
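
The fallback chain described in point 2 can be sketched as follows. This is plain Java with local stand-ins, not the project's implementation. `InsertableRelation` and `V1Write` here only mirror the shape of Spark's types, a `List<String>` stands in for a DataFrame, and `SketchIndexedTable`, its `save` method and `SketchWriteBuilder` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-ins shaped like Spark's V1 fallback types (NOT the real interfaces).
// A List<String> stands in for a DataFrame.
interface InsertableRelation {
    void insert(List<String> data, boolean overwrite);
}

interface V1Write {
    InsertableRelation toInsertableRelation();
}

// Hypothetical stand-in for IndexedTable: it receives the whole dataset at once,
// so a whole-dataset transformation (such as weight assignment) stays possible.
final class SketchIndexedTable {
    final List<String> stored = new ArrayList<>();
    int saveCalls = 0;

    void save(List<String> data, boolean append) {
        if (!append) {
            stored.clear();
        }
        // Indexing of the full dataset would happen here, before writing.
        stored.addAll(data);
        saveCalls++;
    }
}

// Plays the role of the write builder: instead of a row-by-row writer,
// it hands back a V1Write that delegates to the table.
final class SketchWriteBuilder {
    private final SketchIndexedTable table;

    SketchWriteBuilder(SketchIndexedTable table) {
        this.table = table;
    }

    V1Write build() {
        InsertableRelation relation = (data, overwrite) -> table.save(data, !overwrite);
        return () -> relation;
    }
}

final class WriteSketchDemo {
    public static void main(String[] args) {
        SketchIndexedTable table = new SketchIndexedTable();
        InsertableRelation relation = new SketchWriteBuilder(table).build().toInsertableRelation();
        relation.insert(Arrays.asList("r1", "r2"), false); // append
        relation.insert(Arrays.asList("r3"), true);        // overwrite
        System.out.println(table.stored);
    }
}
```

Each `insert` call reaches the table exactly once with the complete dataset, in contrast to the row-by-row callback style discussed earlier in the thread.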

osopardo1 commented 1 year ago

> Well, I prefer to separate migration to the new versions of Spark/Delta and reworking the QbeastTable on the top of DataSource V2. So migration would mean that everything is compiling and running without new problems. And rework is a complex task, because they changed DataSource SPI a lot although it still has version V2. A good overview of the Spark 3.0 SPI could be found here https://blog.madhukaraphatak.com/categories/datasource-v2-spark-three/

Noted. We are going to merge #167 first and then migrate to V2. We can also split the development of migration in two:

  1. Migrate Read Features (plus add Sampling Pushdown and Limit)
  2. Migrate Writer (if needed)

alexeiakimov commented 1 year ago

Technically I prefer 4 steps:

  1. Implement Read API V2 to have a working pipeline.
  2. Add sampling push down
  3. Add limit push down
  4. Implement Write API V2 falling back to V1Write.

Small changes will probably be easier to review and to demonstrate.

osopardo1 commented 1 year ago

Plan looks good to me. 👍

osopardo1 commented 10 months ago

I'm keeping this issue open for future development plans. We need to rethink the design, the utility, and the properties involved.