microsoft / hyperspace

An open source indexing subsystem that brings index-based query acceleration to Apache Spark™ and big data workloads.
https://aka.ms/hyperspace
Apache License 2.0

Refactor hashing and fingerprint functions #404

Open clee704 opened 3 years ago

clee704 commented 3 years ago

Fixes #271.

With the newly added fingerprinting classes, it is now easy to build a fingerprint from various types of data.

FingerprintBuilder can be used to build a fingerprint from data. FileBasedRelation.signature now takes it as an argument, so the implementations don't have to do the hashing themselves. They can just use the provided builder.
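The idea of passing a builder into the signature method can be illustrated with a minimal sketch. The class and method names below (SketchFingerprintBuilder, SketchFile, SketchRelation) are hypothetical and much simpler than the classes in this PR; they only show the shape of the design, where the relation feeds its data to a builder it receives instead of hashing by itself.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

// Hypothetical, simplified builder; NOT Hyperspace's actual FingerprintBuilder.
final class SketchFingerprintBuilder(algorithm: String = "MD5") {
  private val digest = MessageDigest.getInstance(algorithm)

  // Adds a string, prefixed with its byte length so that
  // ("ab", "c") and ("a", "bc") produce different fingerprints.
  def add(s: String): this.type = {
    val bytes = s.getBytes(StandardCharsets.UTF_8)
    digest.update(ByteBuffer.allocate(4).putInt(bytes.length).array())
    digest.update(bytes)
    this
  }

  // Adds a long as 8 big-endian bytes.
  def add(l: Long): this.type = {
    digest.update(ByteBuffer.allocate(8).putLong(l).array())
    this
  }

  // Finishes the digest and returns it as a lowercase hex string.
  def build(): String = digest.digest().map("%02x".format(_)).mkString
}

// Hypothetical stand-in for a file's metadata.
final case class SketchFile(path: String, size: Long, modifiedTime: Long)

object SketchRelation {
  // The relation only feeds data to the builder it is given.
  def signature(files: Seq[SketchFile], fb: SketchFingerprintBuilder): Option[String] =
    if (files.isEmpty) None
    else {
      files.foreach(f => fb.add(f.path).add(f.size).add(f.modifiedTime))
      Some(fb.build())
    }
}
```

Note that this sketch feeds files in sequence order, so the result depends on that order.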

For unordered combining, one can use bitwise XOR to combine multiple fingerprints. Because XOR is commutative and associative, the order of the inputs becomes irrelevant, so this approach can be used for unordered collections like Set.
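The XOR technique can be sketched in a few lines of plain Scala. This is an illustration using java.security.MessageDigest directly, not the code from this PR; the object and method names (UnorderedFingerprint, combineUnordered) are made up for the example.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object UnorderedFingerprint {
  // MD5 digest of a single string (always 16 bytes).
  def md5(s: String): Array[Byte] =
    MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8))

  // Bitwise XOR of two equal-length fingerprints.
  def xor(a: Array[Byte], b: Array[Byte]): Array[Byte] = {
    require(a.length == b.length, "fingerprints must have the same length")
    a.zip(b).map { case (x, y) => (x ^ y).toByte }
  }

  // Order-independent fingerprint: XOR of the per-element digests,
  // starting from the all-zero fingerprint (the identity for XOR).
  def combineUnordered(items: Iterable[String]): Array[Byte] =
    items.iterator.map(md5).foldLeft(new Array[Byte](16))(xor)

  // Lowercase hex rendering of a fingerprint.
  def toHex(bytes: Array[Byte]): String = bytes.map(b => "%02x".format(b)).mkString
}
```

One caveat of plain XOR: an element that appears twice cancels itself out (x XOR x is zero), so this combination suits sets of distinct elements rather than collections with duplicates.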

What is the context for this pull request?

What changes were proposed in this pull request?

Does this PR introduce any user-facing change?

The JSON format and the use of MD5 remain unchanged. However, the way signatures are computed has changed to allow more efficient computation. Therefore, existing signatures won't match the new ones. This is a breaking change, as #268 was.

For library authors and developers who want to extend Hyperspace, the API is incompatible and must be adapted.

How was this patch tested?

Unit test

andrei-ionescu commented 3 years ago

@clee704, @sezruby

If this is a breaking change that will make the already created indexes useless, I would like to throw in some common-sense questions that should be addressed in one way or another:

  1. Any migration tool to migrate the existing indexes to the new fingerprinting method?
  2. If the answer is NO to the question above, is there a plan to have such a tool?
  3. If the answer is NO do we at least know how many are using OSS Hyperspace right now and let them know before about this fact?
  4. In the case of a user trying to use an old index with this new version how would he know that the problem is this breaking change? Is there an exception thrown or a message or something?
  5. Is there a process defined somewhere for cases when breaking changes are introduced?
  6. How will this breaking change be documented for the users?

I know these are a lot of questions and as good engineers we need to have a bit of "client-centric" mind to create a useful and easy-to-use product.

@rapoth, @apoorvedave1, @pirz, @imback82 WDYT?

rapoth commented 3 years ago

Thanks @andrei-ionescu for raising these questions! Here's my take on this:

  1. Any migration tool to migrate the existing indexes to the new fingerprinting method?
  2. If the answer is NO to the question above, is there a plan to have such a tool?

My thinking is that we should not block on a migration utility for this upcoming release. However, I have asked @clee704 to write up a design proposal for how such a migration utility would be designed, along with all the dimensions we would have to cover. @clee704 should be able to get to that soon. Once it passes a vote, it will be open for one of the folks to take up in the coming weeks.

That being said, if you are affected by this breaking change or one of your users/customers is, please let us know. Currently, I have already provided a heads-up to the customers I know who are using Hyperspace in production and they were fine with it.

  4. In the case of a user trying to use an old index with this new version how would he know that the problem is this breaking change? Is there an exception thrown or a message or something?

@clee704 @sezruby Could we verify what happens and add the appropriate error message + steps they would have to take to overcome the problem?

  5. Is there a process defined somewhere for cases when breaking changes are introduced?

No, but now is probably a good time to discuss and document this. In general, as long as we have not announced a v1.0, we do not guarantee backward/forward compatibility and do not recommend using the code in production. This has less to do with the quality of the code than with its stability (as customers provide us with more and more feature requests, we are trying to establish a good foundation so we can avoid major breaking changes in the future).

As we get closer to v1.0, we will follow semantic versioning guidelines (i.e., everything within a major version will be forward/backward compatible but across major versions, we may break things but we will provide a migration utility by then).

  6. How will this breaking change be documented for the users?

It will be documented in the release notes of this specific version and the frequently-asked-questions section. Suggestions are welcome!

andrei-ionescu commented 3 years ago

@rapoth

Thanks for putting up your thoughts.

My thinking is that we should not block on a migration utility for this upcoming release.

I agree that we should not block this PR by not having the migration tool - this is acceptable now as Hyperspace is at the beginning. It would be great though to have at least a ticket/issue/proposal/feature to describe the gap and put together a plan to fill the gap. I’m glad we are on the same page.

if you are affected by this breaking change or one of your users/customers is, please let us know

Currently no major impact. I do have a few indexes created on big datasets (15 TB and more), which I will be forced to recreate when upgrading.

rapoth commented 3 years ago

My thinking is that we should not block on a migration utility for this upcoming release.

I agree that we should not block this PR by not having the migration tool - this is acceptable now as Hyperspace is at the beginning. It would be great though to have at least a ticket/issue/proposal/feature to describe the gap and put together a plan to fill the gap. I’m glad we are on the same page.

Thanks @andrei-ionescu! I've requested @clee704 to do this. Once he wraps up his current work, he will write up a proposal. Here's the feature request I just opened: #420. Please feel free to add anything else I may have missed directly in that issue.

clee704 commented 3 years ago

If this signature change is the only breaking change of v0.5, then it should be easy to update the signatures of indices from old systems as we have a version string in the serialized index log entries. This will be addressed as part of #420. Perhaps a simple migration utility to address this could be part of v0.5.
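A minimal sketch of how the version string could drive such an update. It assumes the version in a log entry is a dotted numeric string such as "0.4.0" and that 0.5.0 is the first version with the new signatures; both are assumptions for illustration, and the names below (SignatureVersionCheck, needsSignatureUpdate) are hypothetical rather than part of Hyperspace.

```scala
object SignatureVersionCheck {
  // Assumption: first version whose signatures use the new fingerprinting scheme.
  val FirstNewSignatureVersion: Seq[Int] = Seq(0, 5, 0)

  // Parses "0.4.0" into Seq(0, 4, 0). Returns None if any part is not a
  // short run of decimal digits.
  def parse(version: String): Option[Seq[Int]] = {
    val parts = version.trim.split('.').toSeq
    val valid = parts.nonEmpty &&
      parts.forall(p => p.nonEmpty && p.length <= 9 && p.forall(_.isDigit))
    if (valid) Some(parts.map(_.toInt)) else None
  }

  // Component-wise comparison; missing components count as zero.
  def lessThan(a: Seq[Int], b: Seq[Int]): Boolean = {
    val n = math.max(a.length, b.length)
    a.padTo(n, 0).zip(b.padTo(n, 0))
      .find { case (x, y) => x != y }
      .exists { case (x, y) => x < y }
  }

  // True if an entry written by `entryVersion` carries an old-style signature.
  // Unparseable versions are conservatively treated as needing an update.
  def needsSignatureUpdate(entryVersion: String): Boolean =
    parse(entryVersion).forall(v => lessThan(v, FirstNewSignatureVersion))
}
```

The same check could also back a clearer error message when an old index is loaded, telling the user that the signature format changed and that the index must be migrated or recreated.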

clee704 commented 3 years ago

Spark was too slow to create that many files, so I measured the signature method of DefaultFileBasedRelation using fake files instead.

Below are the elapsed times of the signature method with varying number of files (averages of 10 repetitions).

# Files   | Before  | After   | Speedup (time reduction)
10,000    | 65.7 ms | 32.4 ms | 51%
100,000   | 581 ms  | 249 ms  | 57%
1,000,000 | 7.59 s  | 2.32 s  | 69%

As you can see, the running time of signature grew superlinearly with the number of files before the PR, most likely O(n log n) due to the sorting. It is now linear.
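As a quick sanity check of the scaling claim, the reported figures can be plugged into a few lines of Scala. This is only arithmetic on the numbers in the table above (with 7.59 s and 2.32 s converted to milliseconds); the object and method names are made up for illustration.

```scala
object ScalingCheck {
  // (number of files, before in ms, after in ms), copied from the table.
  val rows: Seq[(Int, Double, Double)] = Seq(
    (10000, 65.7, 32.4),
    (100000, 581.0, 249.0),
    (1000000, 7590.0, 2320.0))

  // Percentage of elapsed time saved by the change.
  def reduction(before: Double, after: Double): Double =
    (before - after) / before * 100

  // Growth factor between consecutive measurements (each step is 10x more files).
  def growth(xs: Seq[Double]): Seq[Double] =
    xs.zip(xs.tail).map { case (a, b) => b / a }

  def main(args: Array[String]): Unit = {
    rows.foreach { case (n, before, after) =>
      println(f"$n%9d files: ${reduction(before, after)}%.1f%% less time, ${before / after}%.2fx faster")
    }
    val beforeGrowth = growth(rows.map(_._2)).map(g => f"$g%.2f").mkString(", ")
    val afterGrowth = growth(rows.map(_._3)).map(g => f"$g%.2f").mkString(", ")
    println(s"growth per 10x files, before: $beforeGrowth")
    println(s"growth per 10x files, after:  $afterGrowth")
  }
}
```

For each tenfold increase in files, the "before" times grow by roughly 8.8x and then 13.1x, while the "after" times grow by roughly 7.7x and 9.3x. That is consistent with superlinear growth before the change and close to linear growth after it. The percentages in the last column of the table are the share of elapsed time saved, which corresponds to roughly 2.0x, 2.3x and 3.3x faster.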

The results can vary depending on many other factors, such as file path lengths and the format of file names. In this measurement I assumed a single root directory whose path length is 64 and random file names of length 67 (which is the default length of Parquet file names in Spark).

Here's the source for the test:

import scala.util.Random

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningAwareFileIndex}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.StructType

import com.microsoft.hyperspace.index.sources.default.DefaultFileBasedRelation
import com.microsoft.hyperspace.util.fingerprint.MD5FingerprintBuilderFactory

class PerformanceTest extends SparkFunSuite {
  test("DefaultFileBasedRelation.signature performance") {
    for (numFiles <- Seq(10000, 100000, 1000000)) {
      val repetition = 10
      val measurements = (1 to repetition).map(_ => runTest(numFiles))
      println(s"numFiles = $numFiles")
      println(s"max = ${measurements.max / 1000000.0} ms")
      println(s"min = ${measurements.min / 1000000.0} ms")
      println(s"avg = ${measurements.sum / measurements.length / 1000000.0} ms")
    }
  }

  def runTest(numFiles: Int): Long = {
    val fileBasedRel = getDefaultFileBasedRelation(numFiles)
    val start = System.nanoTime()
    val sig = fileBasedRel.signature(fb)
    // val sig = fileBasedRel.signature
    val end = System.nanoTime()
    assert(sig.isDefined)
    // assert(sig.nonEmpty)
    end - start
  }

  def getDefaultFileBasedRelation(numFiles: Int): DefaultFileBasedRelation = {
    val files = (1 to numFiles).map(_ => getRandomFileStatus())
    val location = FakeFileIndex(files)(spark)
    val parquet = new ParquetFileFormat()
    val st = new StructType()
    val rel = HadoopFsRelation(location, st, st, None, parquet, Map())(spark)
    val logicalRel = LogicalRelation(rel)
    new DefaultFileBasedRelation(spark, logicalRel)
  }

  def getRandomFileStatus(): FileStatus = {
    val length = rand.nextLong()
    val modificationTime = rand.nextLong()
    val name = getRandomAlphanumericString(67)
    val path = new Path(s"/$dir", name)
    new FileStatus(length, false, 3, 128 * 1024 * 1024, modificationTime, path)
  }

  def getRandomAlphanumericString(length: Int): String = {
    val chars = "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
    val s = new Array[Char](length)
    for (i <- 0 until length) s(i) = chars(rand.nextInt(chars.length))
    new String(s)
  }

  Logger.getLogger("org").setLevel(Level.OFF)
  lazy val spark = SparkSession.builder().master("local").getOrCreate()
  lazy val rand = new Random(42)
  lazy val dir = getRandomAlphanumericString(64)
  lazy val fb = new MD5FingerprintBuilderFactory().create

  case class FakeFileIndex(files: Seq[FileStatus])(sparkSession: SparkSession)
    extends PartitioningAwareFileIndex(sparkSession, Map(), None) {
    override def allFiles(): Seq[FileStatus] = files
    override def partitionSpec() = ???
    override def leafFiles = ???
    override def leafDirToChildrenFiles = ???
    override def rootPaths = ???
    override def refresh() = ???
  }
}