/*
 * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
package benchmarks
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import java.lang.Integer.max
import java.util.concurrent.ForkJoinPool
import java.util.concurrent.Phaser
import java.util.concurrent.ThreadLocalRandom
import java.util.concurrent.TimeUnit
/**
 * Benchmark to measure channel algorithm performance in terms of average time per `send-receive` pair;
 * more precisely, it measures the time for a batch of such operations split across the specified number of consumers/producers.
 * It uses different channels (rendezvous, buffered, unlimited; see [ChannelCreator]) and different dispatchers
 * (see [DispatcherCreator]). If the [_3_withSelect] property is set, it invokes `send` and
 * `receive` via [select], waiting on a local dummy channel simultaneously, simulating a "cancellation" channel.
 *
 * Please be patient: this benchmark takes quite a lot of time to complete.
 */
@Warmup(iterations = 3, time = 500, timeUnit = TimeUnit.MICROSECONDS) // 3 warmup iterations, 500 microseconds each
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MICROSECONDS) // 10 measured iterations, 500 microseconds each
@Fork(value = 3) // the whole run is repeated in 3 forks
open class ChannelProducerConsumerBenchmark {
// Benchmark configuration. The numeric prefixes (_0_ .. _4_) presumably fix the order in which
// the parameters are listed in reports -- TODO confirm.

// Factory for the dispatcher that run() launches every producer/consumer coroutine on; applied in setup().
// NOTE(review): no @Param annotation is visible on this property in this view.
private var _0_dispatcher: DispatcherCreator = DispatcherCreator.FORK_JOIN
// Factory for the channel under test; run() also uses it to create the per-coroutine dummy
// channels when _3_withSelect is set.
// NOTE(review): the default, ChannelCreator.RENDEZVOUS, is not among the enum entries visible in this view.
private var _1_channel: ChannelCreator = ChannelCreator.RENDEZVOUS
// Total number of coroutines used by mpmc() (half producers, half consumers).
// 0 means "derive the counts from _4_parallelism"; spmc() only does work when this is 0.
@Param("0", "1000")
private var _2_coroutines: Int = 0
// When true, produce()/consume() perform their channel operation inside `select`, together with
// a receive clause on a dummy channel.
@Param("false", "true")
private var _3_withSelect: Boolean = false
// Parallelism handed to the dispatcher factory in setup(); it also sizes the producer/consumer
// groups when _2_coroutines == 0. The commented-out @Param lines are alternative value sets for other machines.
@Param("1", "2", "4") // local machine
// @Param("1", "2", "4", "8", "12") // local machine
// @Param("1", "2", "4", "8", "16", "32", "64", "128", "144") // dasquad
// @Param("1", "2", "4", "8", "16", "32", "64", "96") // Google Cloud
private var _4_parallelism: Int = 0
// Both are assigned in setup(); dispatcher is read by run(), channel by produce()/consume().
private lateinit var dispatcher: CoroutineDispatcher
private lateinit var channel: Channel<Int>
/**
 * Builds the benchmark state: creates [dispatcher] from the selected [_0_dispatcher] factory,
 * passing [_4_parallelism] as the parallelism, and creates [channel] from the selected
 * [_1_channel] factory.
 *
 * NOTE(review): no JMH lifecycle annotation is visible on this function in this view.
 */
fun setup() {
dispatcher = _0_dispatcher.create(_4_parallelism)
channel = _1_channel.create()
/**
 * Runs one batch with `max(1, _4_parallelism - 1)` producers and exactly one consumer.
 *
 * Returns immediately without doing any work when [_2_coroutines] is non-zero, so only the
 * parallelism-derived configuration is exercised here.
 *
 * NOTE(review): despite the name, the visible code sets up several producers and a single consumer.
 */
fun spmc() {
// This scenario is defined only for the "derive counts from parallelism" setting.
if (_2_coroutines != 0) return
// Leave room for the single consumer, but always keep at least one producer.
val producers = max(1, _4_parallelism - 1)
val consumers = 1
run(producers, consumers)
/**
 * Runs one batch with the same number of producers and consumers.
 *
 * When [_2_coroutines] is 0 the group size is `(_4_parallelism + 1) / 2` (half of the
 * parallelism, rounded up); otherwise it is `_2_coroutines / 2`.
 */
fun mpmc() {
val producers = if (_2_coroutines == 0) (_4_parallelism + 1) / 2 else _2_coroutines / 2
val consumers = producers
run(producers, consumers)
/**
 * Launches [producers] producer coroutines and [consumers] consumer coroutines on [dispatcher]
 * to move one batch of elements through the benchmark channel.
 *
 * @param producers number of producer coroutines; must be positive because it is used as a divisor
 * @param consumers number of consumer coroutines; must be positive because it is used as a divisor
 */
private fun run(producers: Int, consumers: Int) {
// Batch size: APPROX_BATCH_SIZE rounded down to a multiple of producers, so that every
// producer handles exactly n / producers elements.
val n = APPROX_BATCH_SIZE / producers * producers
// One party per launched coroutine plus one extra (producers + consumers + 1).
// NOTE(review): the arrive/await calls on this phaser are not visible in this view.
val phaser = Phaser(producers + consumers + 1)
// Run producers
repeat(producers) {
GlobalScope.launch(dispatcher) {
// Extra channel for the select clause; only created when select mode is on.
val dummy = if (_3_withSelect) _1_channel.create() else null
repeat(n / producers) {
// `it` is the index of the inner repeat and is what gets passed as the element.
produce(it, dummy)
// Run consumers
repeat(consumers) {
GlobalScope.launch(dispatcher) {
val dummy = if (_3_withSelect) _1_channel.create() else null
// The visible callers pass consumers == 1 or consumers == producers, so n / consumers divides n evenly.
// NOTE(review): the body of this loop is not visible in this view.
repeat(n / consumers) {
// Wait until work is done
/**
 * Producer-side step for a single [element].
 *
 * In select mode ([_3_withSelect] is true) the element is offered to the benchmark channel through
 * an `onSend` clause of `select`, alongside a receive clause on [dummy].
 *
 * NOTE(review): the non-select branch is not visible in this view.
 *
 * @param element value to send
 * @param dummy extra channel for the select clause; must be non-null when [_3_withSelect] is true,
 * because it is dereferenced with `!!`
 */
private suspend fun produce(element: Int, dummy: Channel<Int>?) {
if (_3_withSelect) {
select<Unit> {
channel.onSend(element) {}
// run() creates dummy whenever _3_withSelect is true, so `!!` holds for that caller.
dummy!!.onReceive {}
} else {
/**
 * Consumer-side step for a single element.
 *
 * In select mode ([_3_withSelect] is true) the element is taken from the benchmark channel through
 * an `onReceive` clause of `select`, alongside a receive clause on [dummy]; the received value
 * is discarded by the empty clause body.
 *
 * NOTE(review): the non-select branch is not visible in this view.
 *
 * @param dummy extra channel for the select clause; must be non-null when [_3_withSelect] is true,
 * because it is dereferenced with `!!`
 */
private suspend fun consume(dummy: Channel<Int>?) {
if (_3_withSelect) {
select<Unit> {
channel.onReceive {}
// run() creates dummy whenever _3_withSelect is true, so `!!` holds for that caller.
dummy!!.onReceive {}
} else {
/**
 * Dispatcher factories the benchmark can choose from.
 *
 * @property create builds a [CoroutineDispatcher] for the requested parallelism
 */
enum class DispatcherCreator(val create: (parallelism: Int) -> CoroutineDispatcher) {
// Wraps a new ForkJoinPool of the given parallelism as a coroutine dispatcher.
// NOTE(review): every call allocates a new pool; no shutdown of that pool is visible in this view.
FORK_JOIN({ parallelism -> ForkJoinPool(parallelism).asCoroutineDispatcher() })
/**
 * Channel factories the benchmark can choose from; each entry carries the capacity that
 * [create] passes to `Channel(capacity)`.
 *
 * NOTE(review): only commented-out entries are visible in this view.
 */
enum class ChannelCreator(private val capacity: Int) {
// BUFFERED_1(1),
// BUFFERED_4(4),
// Creates a new Int channel with this entry's capacity.
fun create(): Channel<Int> = Channel(capacity)
/**
 * Burns a small, randomized amount of CPU: draws a token count from [WORK_MIN] (inclusive)
 * up to [WORK_MAX] (exclusive) on the current thread's random generator and passes it to
 * `Blackhole.consumeCPU`.
 */
private fun doWork() {
    val tokens = ThreadLocalRandom.current().nextLong(WORK_MIN, WORK_MAX)
    Blackhole.consumeCPU(tokens)
}
/** Inclusive lower bound of the random token count that [doWork] passes to `Blackhole.consumeCPU`. */
private const val WORK_MIN: Long = 50L

/** Exclusive upper bound of the random token count that [doWork] passes to `Blackhole.consumeCPU`. */
private const val WORK_MAX: Long = 100L

/** Target number of elements per batch; `run` rounds it down to a multiple of the producer count. */
private const val APPROX_BATCH_SIZE: Int = 100_000