Open matrix-2022 opened 5 years ago
thanks for your comment. In SeqOp, the weight isn't summed; it's the value returned from each atomic computation. I don't think we should divide the total weights by the number of data partitions. pls refer to https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L257
:)
As you say, "In SeqOp, weight wasn't summed, it's a returned value from each atomic computation." For example, if miniBatchSize is 1000 and the partition count is 10, then each SeqOp handles 100 samples and returns weights (not summed); combOp then sums the weights from the 10 partitions, so wSum should be divided by 10.
I ran this model on the data you provided, and after training the final weights are very small (because they are divided by the sample count rather than the partition count).
Your implementation is not the same as https://github.com/apache/spark/blob/5264164a67df498b73facae207eda12ee133be7d/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala#L257
where the doc comment says: "cumGradient the computed gradient will be added to this vector"
In your code, def computeFFM(label: Double, data: Array[(Int, Int, Double)], weights: Vector, r: Double = 1.0, eta: Double, regParam: (Double, Double), do_update: Boolean, iter: Int, solver: Boolean = true): (BDV[Double], Double) only returns the computed weights, not a summed gradient (or summed weights).
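To illustrate the MLlib contract being referenced, here is a minimal plain-Scala sketch (no Spark dependency; the gradient values and names are illustrative, not the actual MLlib or FFM code). Each per-sample gradient is added into an accumulator, so the aggregate is a sum over all miniBatchSize samples and is correctly averaged by miniBatchSize:

```scala
// Hypothetical sketch of the MLlib Gradient.compute contract: each sample's
// gradient is ADDED into cumGradient, so the aggregate after combOp is a
// per-sample sum over the whole mini-batch.
object CumGradientContract {
  def main(args: Array[String]): Unit = {
    val sampleGradients = Seq(1.0, 2.0, 3.0) // illustrative per-sample gradients
    var cumGradient = 0.0                    // accumulator the gradients are added into
    for (g <- sampleGradients) cumGradient += g
    // Dividing by the number of samples gives the mean gradient.
    println(cumGradient / sampleGradients.size) // prints 2.0
  }
}
```

With a returned-weights contract like computeFFM's, no such per-sample sum exists, so the same division no longer computes an average.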
val (wSum, lSum, miniBatchSize) = data.treeAggregate((BDV(bcWeights.value.toArray), 0.0, 0L))(
  seqOp = (c, v) => {
    val r = gradient.asInstanceOf[FFMGradient].computeFFM(
      v._1, v._2, Vectors.fromBreeze(c._1), 1.0, eta, regParam, true, i, solver)
    (r._1, r._2 + c._2, c._3 + 1)
  },
  combOp = (c1, c2) => {
    (c1._1 + c2._1, c1._2 + c2._2, c1._3 + c2._3)
  }) // TODO: add depth level
bcWeights.destroy(blocking = false)
Averaging with weights = Vectors.dense(wSum.toArray.map(_ / miniBatchSize)) is not correct: the weights from each partition are already added up in combOp, so dividing that sum by miniBatchSize makes all the weights far too small.
The right way is "weights = Vectors.dense(wSum.toArray.map(_ / data.getNumPartitions))"
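To make the arithmetic concrete, here is a plain-Scala sketch (no Spark dependency; the 0.5 weight value is an illustrative stand-in for the BDV returned by computeFFM) using this thread's own numbers of miniBatchSize = 1000 across 10 partitions:

```scala
// Each of 10 partitions returns a full weight value, and combOp sums them,
// mimicking the treeAggregate above. The two divisions show why the divisor
// matters.
object WSumAveraging {
  def main(args: Array[String]): Unit = {
    val numPartitions = 10
    val samplesPerPartition = 100              // miniBatchSize = 1000 in total
    val miniBatchSize = numPartitions * samplesPerPartition

    // Assume every partition's local computation ends at the same weight 0.5.
    val perPartitionWeights = Array.fill(numPartitions)(0.5)
    val wSum = perPartitionWeights.sum         // combOp adds whole vectors: 5.0

    println(wSum / miniBatchSize)  // prints 0.005 -- weights collapse toward zero
    println(wSum / numPartitions)  // prints 0.5   -- recovers the per-partition weight
  }
}
```

Dividing by miniBatchSize shrinks the weights by an extra factor of samplesPerPartition, which matches the "very small final weights" observed after training.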