With the current setup, if the workset collection of an iteration is accessed more than once in the loop body, it will still be cached by the cache call insertion optimization.
Example
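```scala
for (_ <- 0 until 5) {
  // For every edge, propose min(label(src), label(dst)) as the new label of dst.
  val cands = for {
    x <- comps
    e <- edges
    y <- comps
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))
  // Keep the smallest proposed label per vertex.
  comps = for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
}
```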
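To make the semantics of the example concrete, here is a local sketch of the same label-propagation loop on plain Scala collections. The `LVertex` and `LEdge` case classes, `LocalComponents.run`, and the use of `Seq.groupBy` in place of the `Group(id, cs)` pattern are assumptions for illustration; the sketch runs without Flink or Emma.

```scala
// Hypothetical record types standing in for the ones used in the issue.
case class LVertex(id: Long, label: Long)
case class LEdge(src: Long, dst: Long)

object LocalComponents {
  // Runs the loop body `iterations` times on in-memory collections.
  def run(initial: Seq[LVertex], edges: Seq[LEdge], iterations: Int): Seq[LVertex] = {
    var comps = initial
    for (_ <- 0 until iterations) {
      // Note that `comps` is read twice here, as in the original example.
      val cands = for {
        x <- comps
        e <- edges
        y <- comps
        if x.id == e.src
        if y.id == e.dst
      } yield LVertex(y.id, Math.min(x.label, y.label))
      // Keep the minimum proposed label per vertex id.
      comps = cands.groupBy(_.id).map { case (id, cs) => LVertex(id, cs.map(_.label).min) }.toSeq
    }
    comps
  }
}
```

As in the original snippet, a vertex without incoming edges receives no candidate and drops out of `comps`, so a test graph should list every edge in both directions.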
This is translated to (approximately):
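```scala
FlinkNtv.iterate(comps)(comps => {
  // Here `comps` is the lambda parameter, i.e. the workset of the current iteration.
  val cands = for {
    x <- comps // first access of `comps`
    e <- edges
    y <- comps // second access of `comps`
    if x.id == e.src
    if y.id == e.dst
  } yield LVertex(y.id, Math.min(x.label, y.label))
  for {
    Group(id, cs) <- cands.groupBy(_.id)
  } yield LVertex(id, cs.map(_.label).min)
})
```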
In the rewritten version, the lambda passed to `FlinkNtv.iterate` has a parameter `comps` which is accessed twice in the lambda body. Because of that, a subsequent `addCacheCalls` transformation will insert a `FlinkOps.cache(comps)` call at the beginning of the lambda body.
Suggested Solution
As a first approximation, we should exclude the parameters of lambdas passed to a `FlinkNtv.iterate` operator from cache call insertion.
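The current rule and the proposed exclusion can be modelled on a toy expression tree. This is a minimal sketch, assuming a hypothetical `Expr` ADT and a hypothetical `addCacheCalls(e, skipIterate)` function; it is not Emma's actual IR or transformation. A lambda whose parameter occurs more than once gets a `cache(param)` binding at the start of its body; with `skipIterate = true`, the step function passed directly to `iterate` is left alone while nested lambdas are still processed.

```scala
// A toy expression language, purely illustrative (not Emma's IR).
sealed trait Expr
case class Ref(name: String) extends Expr
case class Lambda(param: String, body: Expr) extends Expr
case class Call(fn: String, args: List[Expr]) extends Expr
case class Let(name: String, value: Expr, body: Expr) extends Expr

object CacheCalls {
  // Counts occurrences of `name` in `e` (shadowing is ignored for brevity).
  def uses(name: String, e: Expr): Int = e match {
    case Ref(n)          => if (n == name) 1 else 0
    case Lambda(_, body) => uses(name, body)
    case Call(_, args)   => args.map(uses(name, _)).sum
    case Let(_, v, body) => uses(name, v) + uses(name, body)
  }

  // Rebinds a lambda parameter to `cache(param)` when it is used more than once.
  // With `skipIterate`, the step lambda of `iterate` itself is never cached.
  def addCacheCalls(e: Expr, skipIterate: Boolean): Expr = e match {
    case Call("iterate", List(init, Lambda(p, body))) if skipIterate =>
      Call("iterate", List(addCacheCalls(init, skipIterate), Lambda(p, addCacheCalls(body, skipIterate))))
    case Lambda(p, body) =>
      val b = addCacheCalls(body, skipIterate)
      if (uses(p, b) > 1) Lambda(p, Let(p, Call("cache", List(Ref(p))), b)) else Lambda(p, b)
    case Call(f, args)   => Call(f, args.map(addCacheCalls(_, skipIterate)))
    case Let(n, v, body) => Let(n, addCacheCalls(v, skipIterate), addCacheCalls(body, skipIterate))
    case r: Ref          => r
  }
}
```

Applied to a step function that reads `comps` twice, `skipIterate = false` reproduces the behavior described in the issue, while `skipIterate = true` leaves the iterate lambda unchanged; lambdas outside `iterate` are cached as before.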