Qbeast-io / qbeast-spark

Qbeast-spark: DataSource enabling multi-dimensional indexing and efficient data sampling. Big Data, free from the unnecessary!
https://qbeast.io/qbeast-our-tech/
Apache License 2.0
210 stars 19 forks

Add Quantile Transformer for skewed column datasets #338

Closed: osopardo1 closed this issue 1 week ago

osopardo1 commented 3 months ago

After analyzing the efficiency of distribution functions for indexing (see issue #336), we can start implementing the QuantileTransformation.

The idea is to build it as another type of transformation, and eventually turn it into the default.

The API can be something like:

df.write.format("qbeast").option("columnsToIndex", "id:quantile").save("/tmp/test-quantile")

Or we can use the word approx:

df.write.format("qbeast").option("columnsToIndex", "id:approx").save("/tmp/test-quantile")
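Either option string could be parsed with a small helper along these lines (a hypothetical sketch; `ColumnSpec` and `parseColumnSpec` are illustrative names, not the project's actual parser):

```scala
// Hypothetical sketch of parsing a "column[:transformerType]" entry from
// the proposed columnsToIndex option. Names are illustrative only.
final case class ColumnSpec(column: String, transformerType: Option[String])

def parseColumnSpec(spec: String): ColumnSpec =
  spec.split(":", 2) match {
    case Array(col, tpe) => ColumnSpec(col, Some(tpe))
    case Array(col)      => ColumnSpec(col, None)
  }
```

With this shape, "id:quantile" and "id:approx" both reduce to the column name plus a transformer tag that the writer can dispatch on.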

And under the hood:


case class QuantileTransformation(quantiles: IndexedSeq[Any]) extends Transformation {

  override def transform(value: Any): Double = ???

  /**
   * This method should determine if the new data will cause the creation of a new revision.
   *
   * @param newTransformation
   *   the new transformation created with statistics over the new data
   * @return
   *   true if the domain of the newTransformation is not fully contained in this one.
   */
  override def isSupersededBy(newTransformation: Transformation): Boolean = ???

  /**
   * Merges two transformations. The domain of the resulting transformation is the union of the
   * domains of this and the other transformation.
   *
   * @param other
   *   the transformation to merge with this one
   *   a new Transformation that contains both this and other.
   */
  override def merge(other: Transformation): Transformation = ???
}

object QuantileTransformation {
  def apply(quantiles: IndexedSeq[String]): QuantileTransformation = new QuantileTransformation(quantiles)
}
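The transform method could rank a value against the precomputed quantile cuts. This is only a minimal illustration of the idea, assuming sorted numeric quantiles; it is not the project's implementation:

```scala
// Minimal sketch (not the actual qbeast-spark code): map a value into
// [0, 1] by its position among the sorted quantile cuts.
def quantileTransform(quantiles: IndexedSeq[Double], value: Double): Double =
  if (quantiles.length < 2) 0.0
  else {
    // index of the first cut that is >= value; values beyond the last cut
    // clamp to the upper end of the domain
    val i = quantiles.indexWhere(_ >= value)
    val idx = if (i == -1) quantiles.length - 1 else i
    idx.toDouble / (quantiles.length - 1)
  }
```

Because the cuts are equi-depth, this mapping spreads a skewed column roughly uniformly over [0, 1], which is the property the index needs.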

We would take advantage of the first step of OTreeDataAnalyzer and compute approximate quantiles for the specified columns.

  /**
   * Analyze a specific group of columns of the dataframe and extract valuable statistics
   * @param data
   *   the data to analyze
   * @param columnTransformers
   *   the columns to analyze
   * @return
   */
  private[index] def getDataFrameStats(
      data: DataFrame,
      columnTransformers: IISeq[Transformer]): Row = {
    val columnStats = columnTransformers.map(_.stats)
    val columnsExpr = columnStats.flatMap(_.statsSqlPredicates)
    data.selectExpr(columnsExpr ++ Seq("count(1) AS count"): _*).first()
  }
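For illustration only, equi-depth quantile cuts over an in-memory sorted sample could be computed as below. In the real pipeline these statistics would come from the DataFrame analysis step above; `sampleQuantiles` is a hypothetical helper, not part of the codebase:

```scala
// Hedged sketch: equi-depth cuts from a sorted in-memory sample.
def sampleQuantiles(sample: Seq[Double], numCuts: Int): IndexedSeq[Double] = {
  require(numCuts > 0 && sample.nonEmpty)
  val sorted = sample.sorted.toIndexedSeq
  // pick numCuts + 1 evenly spaced ranks, including both extremes
  (0 to numCuts).map(i => sorted(((sorted.length - 1).toLong * i / numCuts).toInt))
}
```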
osopardo1 commented 3 weeks ago

I am changing the title from Histogram to Approximate Quantile. It is more appropriate to use this method instead of the histogram to infer new statistics from the table and to transform the rows at write time.

osopardo1 commented 3 weeks ago

Before implementing, I will prepare a document regarding the existing algorithms for the Approximate Quantiles.

Spark already has an approxQuantile method, but unfortunately it only works with numerical columns.
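One possible workaround for Strings, sketched here as an assumption rather than a settled design, is to take sorted sample strings as the cuts and map a value to the fraction of cuts not greater than it:

```scala
// Assumption, not a settled design: rank a String against sorted sample
// cuts, since Spark's numeric-only quantile support cannot be reused here.
def stringQuantileTransform(cuts: IndexedSeq[String], value: String): Double =
  if (cuts.isEmpty) 0.0
  else cuts.count(_ <= value).toDouble / cuts.length
```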

osopardo1 commented 2 weeks ago

Leaving this issue On Hold: we will need to review the design document before implementing, and some concerns regarding the algorithms and String columns still need to be addressed.

osopardo1 commented 2 weeks ago

This issue only covers a simple implementation of the API. It does not include any logic for updating the quantiles and so on.

As we did for histograms in #230, we will add a separate transformation type for quantiles.

osopardo1 commented 1 week ago

Since the issue has diverged from the original scope, I will open a new one.

osopardo1 commented 1 week ago

Closing in favor of #416.