Wrap all Incomplete final state into box (and unbox it where necessary) because otherwise job machinery treats such state as intermediate
Fixes #835
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index 6b34cad..8b659e0 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -228,7 +228,7 @@
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
- val state = this.state
+ val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index a804484..38139aa 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -154,10 +154,10 @@
}
// ------------ state query ------------
-
/**
* Returns current state of this job.
- * @suppress **This is unstable API and it is subject to change.**
+ * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
+ * and should be [unboxed][unboxState] before returning to user code.
*/
internal val state: Any? get() {
_state.loop { state -> // helper loop on state (complete in-progress atomic operations)
@@ -192,7 +192,12 @@
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
- require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
+ /*
+ * Note: proposed state can be Incompleted, e.g.
+ * async {
+ * smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
+ * }
+ */
require(this.state === state) // consistency check -- it cannot change
require(!state.isSealed) // consistency check -- cannot be sealed yet
require(state.isCompleting) // consistency check -- must be marked as completing
@@ -220,7 +225,7 @@
handleJobException(finalException)
}
// Then CAS to completed state -> it must succeed
- require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
+ require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
// And process all post-completion actions
completeStateFinalization(state, finalState, mode, suppressed)
return true
@@ -254,7 +259,7 @@
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
check(update !is CompletedExceptionally) // only for normal completion
- if (!_state.compareAndSet(state, update)) return false
+ if (!_state.compareAndSet(state, update.boxIncomplete())) return false
completeStateFinalization(state, update, mode, false)
return true
}
@@ -1029,7 +1034,7 @@
private val job: JobSupport
) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
override fun getContinuationCancellationCause(parent: Job): Throwable {
- val state = job.state
+ val state = job.state.unboxState()
/*
* When the job we are waiting for had already completely completed exceptionally or
* is failing, we shall use its root/completion cause for await's result.
@@ -1054,7 +1059,7 @@
public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
public fun getCompletionExceptionOrNull(): Throwable? {
- val state = this.state
+ val state = this.state.unboxState()
check(state !is Incomplete) { "This job has not completed yet" }
return state.exceptionOrNull
}
@@ -1063,7 +1068,7 @@
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun getCompletedInternal(): Any? {
- val state = this.state
+ val state = this.state.unboxState()
check(state !is Incomplete) { "This job has not completed yet" }
if (state is CompletedExceptionally) throw state.cause
return state
@@ -1075,7 +1080,7 @@
internal suspend fun awaitInternal(): Any? {
// fast-path -- check state (avoid extra object creation)
while (true) { // lock-free loop on state
- val state = this.state
+ val state = this.state.unboxState()
if (state !is Incomplete) {
// already complete -- just return result
if (state is CompletedExceptionally) throw state.cause
@@ -1131,7 +1136,7 @@
*/
@Suppress("UNCHECKED_CAST")
internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
- val state = this.state
+ val state = this.state.unboxState()
// Note: await is non-atomic (can be cancelled while dispatched)
if (state is CompletedExceptionally)
select.resumeSelectCancellableWithException(state.cause)
@@ -1140,6 +1145,13 @@
}
}
+/*
+ * Class to represent object as the final state of the Job
+ */
+private class IncompleteStateBox(@JvmField val state: Incomplete)
+private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
+internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
+
// --------------- helper classes & constants for job implementation
private const val COMPLETING_ALREADY_COMPLETING = 0
@@ -1232,8 +1244,7 @@
private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
- val state = job.state
- check(state !is Incomplete)
+ val state = job.state.unboxState()
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
diff --git a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
index a46fe4a..b840fc2 100644
--- a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
@@ -123,7 +123,7 @@
return when {
result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
makeCompletingOnce(result, MODE_IGNORE) -> {
- val state = state
+ val state = state.unboxState()
if (state is CompletedExceptionally) {
when {
shouldThrow(state.cause) -> throw state.cause
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
index cdde2e3..44f6dda 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
@@ -239,4 +239,16 @@
finish(3)
}
}
+
+ @Test
+ fun testIncompleteAsyncState() = runTest {
+ val job = async {
+ coroutineContext[Job]!!.invokeOnCompletion { }
+ }
+
+ job.await().dispose()
+ assertTrue(job.isCompleted)
+ assertFalse(job.isActive)
+ assertFalse(job.isCancelled)
+ }
}
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
index 915a724..4339ae0 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
@@ -256,6 +256,20 @@
assertSame(Dispatchers.Unconfined, scopePlusContext(Dispatchers.Unconfined, Dispatchers.Unconfined))
}
+ @Test
+ fun testIncompleteScopeState() = runTest {
+ lateinit var scopeJob: Job
+ coroutineScope {
+ scopeJob = coroutineContext[Job]!!
+ scopeJob.invokeOnCompletion { }
+ }
+
+ scopeJob.join()
+ assertTrue(scopeJob.isCompleted)
+ assertFalse(scopeJob.isActive)
+ assertFalse(scopeJob.isCancelled)
+ }
+
private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) =
(ContextScope(c1) + c2).coroutineContext
}
diff --git a/common/kotlinx-coroutines-core-common/test/JobTest.kt b/common/kotlinx-coroutines-core-common/test/JobTest.kt
index be9760c..d6fadbe 100644
--- a/common/kotlinx-coroutines-core-common/test/JobTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/JobTest.kt
@@ -2,6 +2,8 @@
* Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
+@file:Suppress("DEPRECATION")
+
package kotlinx.coroutines
import kotlin.test.*
@@ -205,4 +207,30 @@
assertTrue(job.isCancelled)
assertTrue(parent.isCancelled)
}
+
+ @Test
+ fun testIncompleteJobState() = runTest {
+ val job = launch {
+ coroutineContext[Job]!!.invokeOnCompletion { }
+ }
+
+ job.join()
+ assertTrue(job.isCompleted)
+ assertFalse(job.isActive)
+ assertFalse(job.isCancelled)
+ }
+
+ @Test
+ fun testChildrenWithIncompleteState() = runTest {
+ val job = async { Wrapper() }
+ job.join()
+ assertTrue(job.children.toList().isEmpty())
+ }
+
+ private class Wrapper : Incomplete {
+ override val isActive: Boolean
+ get() = error("")
+ override val list: NodeList?
+ get() = error("")
+ }
}
diff --git a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
index 80d3d39..b13d9b7 100644
--- a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
@@ -52,8 +52,8 @@
expect(2)
val result = withContext(coroutineContext) { // same context!
expect(3) // still here
- "OK"
- }
+ "OK".wrap()
+ }.unwrap()
assertEquals("OK", result)
expect(4)
// will wait for the first coroutine
@@ -70,8 +70,8 @@
expect(3) // still here
yield() // now yields to launch!
expect(5)
- "OK"
- }
+ "OK".wrap()
+ }.unwrap()
assertEquals("OK", result)
finish(6)
}
@@ -95,7 +95,7 @@
} catch (e: CancellationException) {
expect(4)
}
- "OK"
+ "OK".wrap()
}
expectUnreached()
@@ -126,7 +126,7 @@
} catch (e: CancellationException) {
finish(6)
}
- "OK"
+ "OK".wrap()
}
// still fails, because parent job was cancelled
expectUnreached()
@@ -240,7 +240,9 @@
job!!.cancel() // cancel itself
require(job!!.cancel(AssertionError()))
require(!isActive)
+ "OK".wrap()
}
+ expectUnreached()
} catch (e: Throwable) {
expect(7)
// make sure JCE is thrown
@@ -269,7 +271,9 @@
throw TestException()
}
expect(3)
+ "OK".wrap()
}
+ expectUnreached()
} catch (e: TestException) {
// ensure that we can catch exception outside of the scope
expect(5)
@@ -287,7 +291,8 @@
expect(4) // waits before return
}
expect(3)
- }
+ "OK".wrap()
+ }.unwrap()
finish(5)
}
@@ -301,7 +306,32 @@
expect(4) // waits before return
}
expect(3)
- }
+ "OK".wrap()
+ }.unwrap()
finish(5)
}
+
+ @Test
+ fun testIncompleteWithContextState() = runTest {
+ lateinit var ctxJob: Job
+ withContext(wrapperDispatcher(coroutineContext)) {
+ ctxJob = coroutineContext[Job]!!
+ ctxJob.invokeOnCompletion { }
+ }
+
+ ctxJob.join()
+ assertTrue(ctxJob.isCompleted)
+ assertFalse(ctxJob.isActive)
+ assertFalse(ctxJob.isCancelled)
+ }
+
+ private class Wrapper(val value: String) : Incomplete {
+ override val isActive: Boolean
+ get() = error("")
+ override val list: NodeList?
+ get() = error("")
+ }
+
+ private fun String.wrap() = Wrapper(this)
+ private fun Wrapper.unwrap() = value
}
diff --git a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
index 3899278..2f6c37f 100644
--- a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
@@ -195,5 +195,19 @@
finish(4)
}
}
-}
+ @Test
+ fun testIncompleteWithTimeoutState() = runTest {
+ lateinit var timeoutJob: Job
+ val handle = withTimeout(Long.MAX_VALUE) {
+ timeoutJob = coroutineContext[Job]!!
+ timeoutJob.invokeOnCompletion { }
+ }
+
+ handle.dispose()
+ timeoutJob.join()
+ assertTrue(timeoutJob.isCompleted)
+ assertFalse(timeoutJob.isActive)
+ assertFalse(timeoutJob.isCancelled)
+ }
+}
diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt
index 01c6c98..e3a7562 100644
--- a/core/kotlinx-coroutines-core/src/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/Builders.kt
@@ -81,7 +81,7 @@
}
timeSource.unregisterTimeLoopThread()
// now return result
- val state = this.state
+ val state = this.state.unboxState()
(state as? CompletedExceptionally)?.let { throw it.cause }
return state as T
}
diff --git a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
index 55e6e40..d21a9f8 100644
--- a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
+++ b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
@@ -152,4 +152,14 @@
assertEquals(1, value)
}
+
+ @Test
+ fun testIncompleteState() {
+ val handle = runBlocking {
+ // See #835
+ coroutineContext[Job]!!.invokeOnCompletion { }
+ }
+
+ handle.dispose()
+ }
}