blob: ea9aeca94dfaf378d4b9fb8c830dab8a8f3a7ead [file] [log] [blame]
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
package benchmarks.akka
import akka.actor.*
import com.typesafe.config.*
import org.openjdk.jmh.annotations.*
import scala.concurrent.*
import scala.concurrent.duration.*
import java.util.concurrent.*
const val N_MESSAGES = 100_000
data class Ball(val count: Int)
class Start
class Stop
/*
* Benchmark (dispatcher) Mode Cnt Score Error Units
* PingPongAkkaBenchmark.coresCountPingPongs default-dispatcher avgt 10 277.501 ± 38.583 ms/op
* PingPongAkkaBenchmark.coresCountPingPongs single-thread-dispatcher avgt 10 196.192 ± 9.889 ms/op
*
* PingPongAkkaBenchmark.singlePingPong default-dispatcher avgt 10 173.742 ± 41.984 ms/op
* PingPongAkkaBenchmark.singlePingPong single-thread-dispatcher avgt 10 24.181 ± 0.730 ms/op
*/
//@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
//@Fork(value = 2)
//@BenchmarkMode(Mode.AverageTime)
//@OutputTimeUnit(TimeUnit.MILLISECONDS)
//@State(Scope.Benchmark)
open class PingPongAkkaBenchmark {
lateinit var system: ActorSystem
@Param("default-dispatcher", "single-thread-dispatcher")
var dispatcher: String = "akka.actor.default-dispatcher"
@Setup
fun setup() {
system = ActorSystem.create("PingPong", ConfigFactory.parseString("""
akka.actor.single-thread-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 1
}
throughput = 1
}
""".trimIndent()
))
}
@TearDown
fun tearDown() {
Await.ready(system.terminate(), Duration.Inf())
}
// @Benchmark
fun singlePingPong() {
runPingPongs(1)
}
// @Benchmark
fun coresCountPingPongs() {
runPingPongs(Runtime.getRuntime().availableProcessors())
}
private fun runPingPongs(pairsCount: Int) {
val latch = CountDownLatch(pairsCount)
repeat(pairsCount) {
val pongRef = system.actorOf(Props.create(PongActorAkka::class.java)
.withDispatcher("akka.actor.$dispatcher"))
val pingRef = system.actorOf(Props.create(PingActorAkka::class.java, pongRef, latch)
.withDispatcher("akka.actor.$dispatcher"))
pingRef.tell(Start(), ActorRef.noSender())
}
latch.await()
}
class PingActorAkka(val pongRef: ActorRef, val stopLatch: CountDownLatch) : UntypedAbstractActor() {
override fun onReceive(msg: Any?) {
when (msg) {
is Start -> {
pongRef.tell(Ball(0), self)
}
is Ball -> {
pongRef.tell(Ball(count = msg.count + 1), self)
}
is Stop -> {
stopLatch.countDown()
context.stop(self)
}
else -> unhandled(msg)
}
}
}
class PongActorAkka : UntypedAbstractActor() {
override fun onReceive(msg: Any?) {
when (msg) {
is Ball -> {
if (msg.count >= N_MESSAGES) {
sender.tell(Stop(), self)
context.stop(self)
} else {
sender.tell(Ball(msg.count + 1), self)
}
}
else -> unhandled(msg)
}
}
}
}