JanKoelzer / reactive-async

Expressive deterministic concurrency in Scala
BSD 2-Clause "Simplified" License
0 stars 1 forks source link

SPP for pull #26

Open JanKoelzer opened 6 years ago

JanKoelzer commented 6 years ago

SPP could be improved.

JanKoelzer commented 6 years ago

There is a case that shows a massive performance improvement if SPP is used.

package com.phaller.rasync.test

import org.scalatest.FunSuite
import java.util.concurrent.CountDownLatch

import com.phaller.rasync.{FinalOutcome, HandlerPool, NextOutcome, NoOutcome}

import scala.util.Try
import com.phaller.rasync.lattice._

/**
 * Performance scenario for update propagation between cells.
 *
 * The single test builds a `center` cell that depends on 10000 source cells
 * and a `dest` cell that depends on `center` through a deliberately slow
 * callback. It then measures the wall-clock time between the start of the
 * test and the invocation of the callback registered via `pool.onQuiescent`.
 *
 * The test makes no assertions; it only prints the elapsed time.
 */
class PerfSuite extends FunSuite {
  test("SPP") {
    // Wall-clock start, in milliseconds.
    val start = System.currentTimeMillis()
    implicit val pool: HandlerPool = new HandlerPool()
    val theKey = new DefaultKey[Set[Int]]

    // Lattice on sets of ints: join is set union, bottom is the empty set.
    implicit object IntSetLattice extends Lattice[Set[Int]] {
      def join(left: Set[Int], right: Set[Int]): Set[Int] = left ++ right
      val bottom: Set[Int] = Set[Int]()
    }

    // `center` has many outgoing deps that fire concurrently.
    // In turn, `center` updates very often, while it should be clear
    // that more changes are in sight. So `center` should not inform
    // `dest` about every update, but only when no more updates are in sight.
    val center = pool.mkCell[theKey.type, Set[Int]](theKey, c => {
      for (i <- 1 to 10000) {
        // Each source cell's init function yields the singleton set {i} as its final outcome.
        val src = pool.mkCell[theKey.type, Set[Int]](theKey, _ => FinalOutcome(Set(i)))
        // Forward the source's value unchanged from the whenNext callback ...
        c.whenNext(src, NextOutcome(_))
        // ... and the element-wise negated value from the whenComplete callback.
        c.whenComplete(src, x =>  NextOutcome(x.map(- _)))
      }
      NoOutcome
    })
    val dest = pool.mkCell[theKey.type, Set[Int]](theKey, c => {
      // `dest` depends on `center` using a slow callback function.
      // One should avoid unnecessary propagations from `center`
      // to `dest`.

      c.whenNext(center, x => {
        // This is slow! The sleep is wrapped in Try so that an exception
        // thrown by it (e.g. on interruption) is captured and discarded.
        Try(Thread.sleep(20))
        NextOutcome(x)
      })

      NoOutcome
    })

    dest.trigger()

    // Blocks the test thread until the onQuiescent callback below has run.
    val latch = new CountDownLatch(1)
    pool.onQuiescent(() => {
      val end = System.currentTimeMillis()
      // Difference of two currentTimeMillis readings, i.e. milliseconds.
      val duration = end - start
      println(s"Calculation took $duration ms")
      pool.shutdown()
      latch.countDown()
    })
    latch.await()
  }

}