Eagerly create coroutine for lazily-started coroutine builders in order to trigger DebugProbes.probeCoroutineCreated
Fixes #1544
diff --git a/kotlinx-coroutines-core/common/src/Builders.common.kt b/kotlinx-coroutines-core/common/src/Builders.common.kt
index 4c42e5e..f79cfc7 100644
--- a/kotlinx-coroutines-core/common/src/Builders.common.kt
+++ b/kotlinx-coroutines-core/common/src/Builders.common.kt
@@ -106,12 +106,10 @@
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> T
) : DeferredCoroutine<T>(parentContext, active = false) {
- private var block: (suspend CoroutineScope.() -> T)? = block
+ private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
- val block = checkNotNull(this.block) { "Already started" }
- this.block = null
- block.startCoroutineCancellable(this, this)
+ continuation.startCoroutineCancellable(this)
}
}
@@ -190,12 +188,10 @@
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
- private var block: (suspend CoroutineScope.() -> Unit)? = block
+ private val continuation = block.createCoroutineUnintercepted(this, this)
override fun onStart() {
- val block = checkNotNull(this.block) { "Already started" }
- this.block = null
- block.startCoroutineCancellable(this, this)
+ continuation.startCoroutineCancellable(this)
}
}
diff --git a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
index 4ad6b8c..d230326 100644
--- a/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
+++ b/kotlinx-coroutines-core/common/src/channels/Broadcast.kt
@@ -9,6 +9,7 @@
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.intrinsics.*
import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
/**
* Broadcasts all elements of the channel.
@@ -124,7 +125,7 @@
channel: BroadcastChannel<E>,
block: suspend ProducerScope<E>.() -> Unit
) : BroadcastCoroutine<E>(parentContext, channel, active = false) {
- private var block: (suspend ProducerScope<E>.() -> Unit)? = block
+ private val continuation = block.createCoroutineUnintercepted(this, this)
override fun openSubscription(): ReceiveChannel<E> {
// open subscription _first_
@@ -135,8 +136,6 @@
}
override fun onStart() {
- val block = checkNotNull(this.block) { "Already started" }
- this.block = null
- block.startCoroutineCancellable(this, this)
+ continuation.startCoroutineCancellable(this)
}
}
diff --git a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
index 246ae2c..ca0ab18 100644
--- a/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
+++ b/kotlinx-coroutines-core/common/src/intrinsics/Cancellable.kt
@@ -27,6 +27,15 @@
}
/**
+ * Similar to [startCoroutineCancellable], but for already created coroutine.
+ * [fatalCompletion] is used only when interception machinery throws an exception
+ */
+internal fun Continuation<Unit>.startCoroutineCancellable(fatalCompletion: Continuation<*>) =
+ runSafely(fatalCompletion) {
+ intercepted().resumeCancellable(Unit)
+ }
+
+/**
* Runs given block and completes completion with its exception if it occurs.
* Rationale: [startCoroutineCancellable] is invoked when we are about to run coroutine asynchronously in its own dispatcher.
* Thus if dispatcher throws an exception during coroutine start, coroutine never completes, so we should treat dispatcher exception
diff --git a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
index ffabb99..c1c1fb3 100644
--- a/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
+++ b/kotlinx-coroutines-core/jvm/src/channels/Actor.kt
@@ -8,6 +8,7 @@
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
+import kotlin.coroutines.intrinsics.*
/**
* Scope for [actor][GlobalScope.actor] coroutine builder.
@@ -143,11 +144,14 @@
private class LazyActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
- private val block: suspend ActorScope<E>.() -> Unit
+ block: suspend ActorScope<E>.() -> Unit
) : ActorCoroutine<E>(parentContext, channel, active = false),
SelectClause2<E, SendChannel<E>> {
+
+ private var continuation = block.createCoroutineUnintercepted(this, this)
+
override fun onStart() {
- block.startCoroutineCancellable(this, this)
+ continuation.startCoroutineCancellable(this)
}
override suspend fun send(element: E) {
diff --git a/kotlinx-coroutines-debug/test/StartModeProbesTest.kt b/kotlinx-coroutines-debug/test/StartModeProbesTest.kt
index a0297da..c2656d3 100644
--- a/kotlinx-coroutines-debug/test/StartModeProbesTest.kt
+++ b/kotlinx-coroutines-debug/test/StartModeProbesTest.kt
@@ -5,7 +5,7 @@
package kotlinx.coroutines.debug
import kotlinx.coroutines.*
-import org.junit.*
+import kotlinx.coroutines.channels.*
import org.junit.Test
import kotlin.test.*
@@ -138,4 +138,16 @@
delay(Long.MAX_VALUE)
}
}
+
+ @Test
+ fun testLazy() = runTest({ it is CancellationException }) {
+ launch(start = CoroutineStart.LAZY) { }
+ actor<Int>(start = CoroutineStart.LAZY) { }
+ broadcast<Int>(start = CoroutineStart.LAZY) { }
+ async(start = CoroutineStart.LAZY) { 1 }
+ verifyPartialDump(5, "BlockingCoroutine",
+ "LazyStandaloneCoroutine", "LazyActorCoroutine",
+ "LazyBroadcastCoroutine", "LazyDeferredCoroutine")
+ cancel()
+ }
}