I think I have a handle on why my attempt to measure messaging throughput with a single contended echo actor (rather than one per client) results in deadlock. I suspect the problem is actually very serious.
The changes I made:
use a different mailboxSpec for 'Echo', set to clientCount producers
create just one echo actor instance
to make testing easier I just use a single dispatcher throughput setting of 900 and 1 iteration
On my system I have 8 cores, so there are 160 clients (8 * 20), and together they try to send 64,000 messages (160 clients * 400 messages per batch) in their 'repeat' loops before any of them has to wait for a reply. This results in the JVM's CPU usage dropping to 0.
If I increase the echo mailbox capacity to 30,000 it still fails. With 40,000 it works, but I think I'm just lucky and the echo actor happens to get some CPU before the mailbox fills up.
I believe the problem is that the CPU threads (the dispatcher's workers) are all waiting on the echo actor's mailbox, because send is effectively a blocking operation when the target mailbox is bounded and full. Once every worker thread is blocked inside a send, the echo actor can never be scheduled to drain its mailbox, so nothing makes progress.
I think that's rather bad in an actor runtime.
Throughput is fine when it's working, but it seems dangerous to use fixed-size queues that can block a sender (see also the small offer() sketch after the code listing below).
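To convince myself this is a general hazard and not something specific to the framework, here is a minimal sketch using only java.util.concurrent (the pool and queue sizes are made up for the illustration): a few producers block on put() into a small bounded queue while sharing a fixed thread pool with the single consumer. Once every pool thread is parked inside put(), the consumer can never run and the program hangs at ~0% CPU, which mirrors the symptom above.

import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.Executors

fun main() {
    val pool = Executors.newFixedThreadPool(4)   // stands in for the dispatcher threads
    val mailbox = ArrayBlockingQueue<Int>(10)    // stands in for the bounded echo mailbox

    // Four producers grab all four pool threads and block as soon as the queue is full.
    repeat(4) { id ->
        pool.execute {
            var i = 0
            while (true) mailbox.put(id * 1_000_000 + i++) // blocks when the queue is full
        }
    }

    // The consumer is queued behind the producers and never gets a thread,
    // so the queue is never drained and the whole program stops making progress.
    pool.execute {
        while (true) println("echo ${mailbox.take()}")
    }
}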
package actor.proto.examples.inprocessbenchmark

import actor.proto.*
import actor.proto.mailbox.DefaultDispatcher
import actor.proto.mailbox.newMpscArrayMailbox
import actor.proto.mailbox.newSpecifiedMailbox
import org.jctools.queues.spec.ConcurrentQueueSpec
import org.jctools.queues.spec.Ordering
import org.jctools.queues.spec.Preference
import java.lang.Runtime.getRuntime
import java.lang.System.nanoTime
import java.util.concurrent.CountDownLatch

fun main(args: Array<String>) {
    repeat(1 /*0*/) {
        run()
        readLine()
    }
}

fun run() {
    val clientCount = getRuntime().availableProcessors() * 20 // 8 cores * 20 = 160 clients on my machine
    val mailboxSpecClient = ConcurrentQueueSpec(1, 1, 5000, Ordering.PRODUCER_FIFO, Preference.NONE)
    // single shared echo actor: bounded mailbox with clientCount producers and one consumer
    val mailboxSpecEcho = ConcurrentQueueSpec(clientCount, 1, 40000, Ordering.PRODUCER_FIFO, Preference.NONE)
    val messageCount = 1_000_000
    val batchSize = 400
    println("Dispatcher\t\tElapsed\t\tMsg/sec")
    val tps = arrayOf(/*1, 2, 5, 10, 20, 50, 100, 150, 200, 300, 400, 500, 600, 700, 800,*/ 900)
    for (t in tps) {
        val d = DefaultDispatcher(throughput = t)
        val echoProps =
            fromProducer { EchoActor() }
                .withDispatcher(d)
                .withMailbox { newSpecifiedMailbox(mailboxSpecEcho) }
        val latch = CountDownLatch(clientCount)
        val clientProps =
            fromProducer { PingActor(latch, messageCount, batchSize) }
                .withDispatcher(d)
                .withMailbox { newSpecifiedMailbox(mailboxSpecClient) }
        val echoActor = spawn(echoProps)
        val pairs = (0 until clientCount)
            .map { Pair(spawn(clientProps), echoActor) }
            .toTypedArray()
        val sw = nanoTime()
        for ((ping, pong) in pairs) {
            send(ping, Start(Msg(ping, pong)))
        }
        latch.await()
        val elapsedNanos = (nanoTime() - sw).toDouble()
        val elapsedMillis = (elapsedNanos / 1_000_000).toInt()
        val totalMessages = messageCount * 2 * clientCount
        val x = (totalMessages.toDouble() / elapsedNanos * 1_000_000_000.0).toInt()
        println("$t\t\t\t\t$elapsedMillis\t\t\t$x")
        for ((client, echo) in pairs) {
            stop(client)
            stop(echo)
        }
        Thread.sleep(500)
    }
}
data class Msg(val ping: PID, val pong: PID)
data class Start(val msg: Msg)

class EchoActor : Actor {
    override suspend fun Context.receive(msg: Any) {
        //print('.')
        when (msg) {
            is Msg -> send(msg.ping, msg)
        }
    }
}
class PingActor(private val latch: CountDownLatch, private var messageCount: Int, private val batchSize: Int, private var batch: Int = 0) : Actor {
    override suspend fun Context.receive(msg: Any) {
        when (msg) {
            is Start -> sendBatch(msg.msg)
            is Msg -> {
                batch--
                if (batch > 0) return
                // the current batch has been echoed back in full; send the next one
                if (!sendBatch(msg)) {
                    latch.countDown()
                }
            }
        }
    }

    private fun Context.sendBatch(msg: Msg): Boolean = when (messageCount) {
        0 -> false
        else -> {
            val n = minOf(batchSize, messageCount)
            repeat(n) { send(msg.pong, msg) }
            messageCount -= n
            batch = n
            true
        }
    }
}
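As an aside on the blocking-sender point above: with a plain bounded queue, the non-blocking alternative is for the send to report failure instead of parking the calling thread. This is only an illustration with java.util.concurrent, not the framework's API; the point is that the caller gets to choose a policy (drop, retry, back off) rather than silently tying up a dispatcher thread.

import java.util.concurrent.ArrayBlockingQueue

fun main() {
    val mailbox = ArrayBlockingQueue<String>(2)
    repeat(4) { i ->
        val accepted = mailbox.offer("msg-$i") // non-blocking: returns false when the queue is full
        if (!accepted) println("mailbox full, dropping msg-$i instead of blocking the sender")
    }
    println("delivered: ${mailbox.size} of 4 messages") // 2 of 4
}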