Hi!
Thank you for submitting the issue. We do not plan to prioritize the approximate bounds improvements in the near future, but you should be able to use the current implementation from a UDAF. For this, you will have to serialize and merge partial ApproximateBounds states by calling getSerializableSummary and mergeWith.
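For example, outside of Spark the serialize-and-merge pattern looks roughly like this (untested sketch; the builder parameters are just placeholder values):

```scala
import com.google.privacy.differentialprivacy.ApproximateBounds

// Two partial ApproximateBounds states, e.g. built on different workers.
// epsilon and maxContributions are placeholder values, not recommendations.
def newBounds(): ApproximateBounds =
  ApproximateBounds.builder()
    .inputType(ApproximateBounds.Params.InputType.DOUBLE)
    .epsilon(1.0)
    .maxContributions(1)
    .build()

val partial1 = newBounds()
val partial2 = newBounds()
partial1.addEntry(3.5)
partial2.addEntry(42.0)

// Serialize one partial state and merge it into the other.
val summary: Array[Byte] = partial2.getSerializableSummary
partial1.mergeWith(summary)
// partial1 now holds the merged state; computeResult() would be called on the
// final merged instance once all partial states have been combined.
```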
I am not super-familiar with Spark UDAFs, but my understanding after reading https://spark.apache.org/docs/latest/sql-ref-functions-udf-aggregate.html is that:
- The input (IN) type is Double, the output (OUT) type is ApproximateBounds.Result, while the type of the intermediate value of the reduction (BUF) is byte[], which encodes the serialized representation of ApproximateBounds.
- zero() should create an empty instance of ApproximateBounds, serialize it by calling getSerializableSummary(), and return the result (the result will have type byte[]).
- reduce(otherApproxBounds: byte[], value: Double) should do something like this:
ApproximateBounds approxBounds = ... // create an instance of ApproximateBounds
approxBounds.addEntry(value) // add the passed value to the approx bounds
approxBounds.mergeWith(otherApproxBounds) // merge with the input partial bounds
return approxBounds.getSerializableSummary() // serialize the current merged bounds and return the result
- merge(approxBounds1: byte[], approxBounds2: byte[]) should:
ApproximateBounds approxBounds = ... // create an empty instance of ApproximateBounds
approxBounds.mergeWith(approxBounds1) // merge it with one of the input partial approx bounds
approxBounds.mergeWith(approxBounds2) // merge it with the other input partial approx bounds
return approxBounds.getSerializableSummary() // serialize the current merged bounds and return the result
- finish(approxBoundsSerialized: byte[]) should:
ApproximateBounds approxBounds = ... // create an empty instance of ApproximateBounds
approxBounds.mergeWith(approxBoundsSerialized) // merge the final aggregated bounds into it
return approxBounds.computeResult() // compute the result and return it
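Putting the four methods together, an (untested) Scala sketch of such an Aggregator could look like the following; the names (SerializedApproxBoundsUDAF, BoundsResult, emptyBounds) and the epsilon/maxContributions values are just placeholders:

```scala
import com.google.privacy.differentialprivacy.ApproximateBounds
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

// IN = Double, BUF = Array[Byte] (a serialized ApproximateBounds summary), OUT = (lower, upper).
case class BoundsResult(lowerBound: Double, upperBound: Double)

object SerializedApproxBoundsUDAF extends Aggregator[Double, Array[Byte], BoundsResult] {

  // epsilon and maxContributions are placeholder values, not recommendations.
  private def emptyBounds(): ApproximateBounds =
    ApproximateBounds.builder()
      .inputType(ApproximateBounds.Params.InputType.DOUBLE)
      .epsilon(1.0)
      .maxContributions(1)
      .build()

  // zero(): an empty instance, serialized.
  def zero: Array[Byte] = emptyBounds().getSerializableSummary

  // reduce(): add the value, merge in the partial state carried by the buffer, re-serialize.
  def reduce(buffer: Array[Byte], value: Double): Array[Byte] = {
    val bounds = emptyBounds()
    bounds.addEntry(value)
    bounds.mergeWith(buffer)
    bounds.getSerializableSummary
  }

  // merge(): merge both partial states into a fresh instance, re-serialize.
  def merge(b1: Array[Byte], b2: Array[Byte]): Array[Byte] = {
    val bounds = emptyBounds()
    bounds.mergeWith(b1)
    bounds.mergeWith(b2)
    bounds.getSerializableSummary
  }

  // finish(): "deserialize" the final state by merging it into an empty instance, then compute.
  def finish(reduction: Array[Byte]): BoundsResult = {
    val bounds = emptyBounds()
    bounds.mergeWith(reduction)
    val result = bounds.computeResult()
    BoundsResult(result.lowerBound, result.upperBound)
  }

  def bufferEncoder: Encoder[Array[Byte]] = Encoders.BINARY
  def outputEncoder: Encoder[BoundsResult] = Encoders.product[BoundsResult]
}
```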
The usage is not super-intuitive, because the deserialize method is currently missing from ApproximateBounds; hence, in order to deserialize a byte[] approxBounds, we have to create an empty instance of ApproximateBounds and merge the serialized byte array into it via mergeWith.
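In code, that workaround is just a couple of lines (the helper name is made up for the example; the builder parameters are placeholders):

```scala
// Hypothetical helper: "deserialize" a serialized summary by merging it into a fresh, empty instance.
def deserializeBounds(summary: Array[Byte]): ApproximateBounds = {
  val bounds = ApproximateBounds.builder()
    .inputType(ApproximateBounds.Params.InputType.DOUBLE)
    .epsilon(1.0)          // placeholder value
    .maxContributions(1)   // placeholder value
    .build()
  bounds.mergeWith(summary)
  bounds
}
```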
@monsieurmuffin thank you so much for the reply. This is exactly the path I was following, and it really helped. The byte array serialization and the repeated ApproximateBounds initializations throughout seem to be causing significant delays: running it over the New York taxi data (~20 million records) takes 2 hours, versus using Kryo to serialize the ApproximateBounds directly.
```scala
import com.google.privacy.differentialprivacy.ApproximateBounds
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders}

case class ApproximateBoundsResult(lowerBound: Double, upperBound: Double)

object ApproximateBoundsUDF extends Aggregator[Double, ApproximateBounds, ApproximateBoundsResult] {

  // A zero value for this aggregation. Should satisfy the property that any b + zero = b
  def zero: ApproximateBounds = getApproxBound

  // Combine two values to produce a new value. For performance, the function may modify `buffer`
  // and return it instead of constructing a new object
  def reduce(buffer: ApproximateBounds, data: Double): ApproximateBounds = {
    buffer.addEntry(data)
    buffer
  }

  private def getApproxBound: ApproximateBounds = {
    ApproximateBounds.builder()
      .inputType(ApproximateBounds.Params.InputType.DOUBLE)
      .epsilon(100)
      .maxContributions(1000)
      .build()
  }

  // Merge two intermediate values
  def merge(b1: ApproximateBounds, b2: ApproximateBounds): ApproximateBounds = {
    b1.mergeWith(b2.getSerializableSummary)
    b1
  }

  // Transform the output of the reduction
  def finish(reduction: ApproximateBounds): ApproximateBoundsResult = {
    val result = reduction.computeResult()
    ApproximateBoundsResult(result.lowerBound, result.upperBound)
  }

  // Specifies the Encoder for the intermediate value type
  def bufferEncoder: Encoder[ApproximateBounds] = Encoders.kryo(classOf[ApproximateBounds])

  // Specifies the Encoder for the final output value type
  def outputEncoder: Encoder[ApproximateBoundsResult] = Encoders.product[ApproximateBoundsResult]
}
```
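For reference, this is roughly how the Aggregator can be wired into a query (assuming Spark 3.x; the input path and the `fare_amount` column name are just placeholders):

```scala
import org.apache.spark.sql.{Encoders, SparkSession, functions}

val spark = SparkSession.builder().appName("approx-bounds").getOrCreate()

// Register the Aggregator as an untyped UDAF so it can be applied to a DataFrame column.
val approxBoundsUdf = functions.udaf(ApproximateBoundsUDF, Encoders.scalaDouble)

// Hypothetical input: a DataFrame with a Double column named "fare_amount".
val taxi = spark.read.parquet("/path/to/taxi") // placeholder path
taxi.agg(approxBoundsUdf(taxi("fare_amount")).as("approx_bounds")).show()
```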
I'm trying to use the approximate bounds algorithm provided in the Java part of the library in a Spark job. This is because the data volumes we are looking at are considerably large (~50bn). I believe this would require a UDAF, as otherwise it would just run on the driver node. Is there a plan to add support for a distributed version of the approximate bounds algorithm?