Wrap all Incomplete final state into box (and unbox it where necessary) because otherwise job machinery treats such state as intermediate

Fixes #835
diff --git a/common/kotlinx-coroutines-core-common/src/Builders.common.kt b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
index 6b34cad..8b659e0 100644
--- a/common/kotlinx-coroutines-core-common/src/Builders.common.kt
+++ b/common/kotlinx-coroutines-core-common/src/Builders.common.kt
@@ -228,7 +228,7 @@
     fun getResult(): Any? {
         if (trySuspend()) return COROUTINE_SUSPENDED
         // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
-        val state = this.state
+        val state = this.state.unboxState()
         if (state is CompletedExceptionally) throw state.cause
         @Suppress("UNCHECKED_CAST")
         return state as T
diff --git a/common/kotlinx-coroutines-core-common/src/JobSupport.kt b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
index a804484..38139aa 100644
--- a/common/kotlinx-coroutines-core-common/src/JobSupport.kt
+++ b/common/kotlinx-coroutines-core-common/src/JobSupport.kt
@@ -154,10 +154,10 @@
     }
 
     // ------------ state query ------------
-
     /**
      * Returns current state of this job.
-     * @suppress **This is unstable API and it is subject to change.**
+     * If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
+     * and should be [unboxed][unboxState] before returning to user code.
      */
     internal val state: Any? get() {
         _state.loop { state -> // helper loop on state (complete in-progress atomic operations)
@@ -192,7 +192,12 @@
     // Finalizes Finishing -> Completed (terminal state) transition.
     // ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
     private fun tryFinalizeFinishingState(state: Finishing, proposedUpdate: Any?, mode: Int): Boolean {
-        require(proposedUpdate !is Incomplete) // only incomplete -> completed transition is allowed
+        /*
+         * Note: proposed state can be Incompleted, e.g.
+         * async {
+         *   smth.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
+         * }
+         */
         require(this.state === state) // consistency check -- it cannot change
         require(!state.isSealed) // consistency check -- cannot be sealed yet
         require(state.isCompleting) // consistency check -- must be marked as completing
@@ -220,7 +225,7 @@
             handleJobException(finalException)
         }
         // Then CAS to completed state -> it must succeed
-        require(_state.compareAndSet(state, finalState)) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
+        require(_state.compareAndSet(state, finalState.boxIncomplete())) { "Unexpected state: ${_state.value}, expected: $state, update: $finalState" }
         // And process all post-completion actions
         completeStateFinalization(state, finalState, mode, suppressed)
         return true
@@ -254,7 +259,7 @@
     private fun tryFinalizeSimpleState(state: Incomplete, update: Any?, mode: Int): Boolean {
         check(state is Empty || state is JobNode<*>) // only simple state without lists where children can concurrently add
         check(update !is CompletedExceptionally) // only for normal completion
-        if (!_state.compareAndSet(state, update)) return false
+        if (!_state.compareAndSet(state, update.boxIncomplete())) return false
         completeStateFinalization(state, update, mode, false)
         return true
     }
@@ -1029,7 +1034,7 @@
         private val job: JobSupport
     ) : CancellableContinuationImpl<T>(delegate, MODE_CANCELLABLE) {
         override fun getContinuationCancellationCause(parent: Job): Throwable {
-            val state = job.state
+            val state = job.state.unboxState()
             /*
              * When the job we are waiting for had already completely completed exceptionally or
              * is failing, we shall use its root/completion cause for await's result.
@@ -1054,7 +1059,7 @@
     public val isCompletedExceptionally: Boolean get() = state is CompletedExceptionally
 
     public fun getCompletionExceptionOrNull(): Throwable? {
-        val state = this.state
+        val state = this.state.unboxState()
         check(state !is Incomplete) { "This job has not completed yet" }
         return state.exceptionOrNull
     }
@@ -1063,7 +1068,7 @@
      * @suppress **This is unstable API and it is subject to change.**
      */
     internal fun getCompletedInternal(): Any? {
-        val state = this.state
+        val state = this.state.unboxState()
         check(state !is Incomplete) { "This job has not completed yet" }
         if (state is CompletedExceptionally) throw state.cause
         return state
@@ -1075,7 +1080,7 @@
     internal suspend fun awaitInternal(): Any? {
         // fast-path -- check state (avoid extra object creation)
         while (true) { // lock-free loop on state
-            val state = this.state
+            val state = this.state.unboxState()
             if (state !is Incomplete) {
                 // already complete -- just return result
                 if (state is CompletedExceptionally) throw state.cause
@@ -1131,7 +1136,7 @@
      */
     @Suppress("UNCHECKED_CAST")
     internal fun <T, R> selectAwaitCompletion(select: SelectInstance<R>, block: suspend (T) -> R) {
-        val state = this.state
+        val state = this.state.unboxState()
         // Note: await is non-atomic (can be cancelled while dispatched)
         if (state is CompletedExceptionally)
             select.resumeSelectCancellableWithException(state.cause)
@@ -1140,6 +1145,13 @@
     }
 }
 
+/*
+ * Class to represent object as the final state of the Job
+ */
+private class IncompleteStateBox(@JvmField val state: Incomplete)
+private fun Any?.boxIncomplete(): Any? = if (this is Incomplete) IncompleteStateBox(this) else this
+internal fun Any?.unboxState(): Any? = (this as? IncompleteStateBox)?.state ?: this
+
 // --------------- helper classes & constants for job implementation
 
 private const val COMPLETING_ALREADY_COMPLETING = 0
@@ -1232,8 +1244,7 @@
     private val continuation: AbstractContinuation<T>
 ) : JobNode<JobSupport>(job) {
     override fun invoke(cause: Throwable?) {
-        val state = job.state
-        check(state !is Incomplete)
+        val state = job.state.unboxState()
         if (state is CompletedExceptionally) {
             // Resume with exception in atomic way to preserve exception
             continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
diff --git a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
index a46fe4a..b840fc2 100644
--- a/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
+++ b/common/kotlinx-coroutines-core-common/src/intrinsics/Undispatched.kt
@@ -123,7 +123,7 @@
     return when {
         result === COROUTINE_SUSPENDED -> COROUTINE_SUSPENDED
         makeCompletingOnce(result, MODE_IGNORE) -> {
-            val state = state
+            val state = state.unboxState()
             if (state is CompletedExceptionally) {
                 when {
                     shouldThrow(state.cause) -> throw state.cause
diff --git a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
index cdde2e3..44f6dda 100644
--- a/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/AsyncTest.kt
@@ -239,4 +239,16 @@
             finish(3)
         }
     }
+
+    @Test
+    fun testIncompleteAsyncState() = runTest {
+        val job = async {
+            coroutineContext[Job]!!.invokeOnCompletion {  }
+        }
+
+        job.await().dispose()
+        assertTrue(job.isCompleted)
+        assertFalse(job.isActive)
+        assertFalse(job.isCancelled)
+    }
 }
diff --git a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
index 915a724..4339ae0 100644
--- a/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/CoroutineScopeTest.kt
@@ -256,6 +256,20 @@
         assertSame(Dispatchers.Unconfined, scopePlusContext(Dispatchers.Unconfined, Dispatchers.Unconfined))
     }
 
+    @Test
+    fun testIncompleteScopeState() = runTest {
+        lateinit var scopeJob: Job
+        coroutineScope {
+            scopeJob = coroutineContext[Job]!!
+            scopeJob.invokeOnCompletion { }
+        }
+
+        scopeJob.join()
+        assertTrue(scopeJob.isCompleted)
+        assertFalse(scopeJob.isActive)
+        assertFalse(scopeJob.isCancelled)
+    }
+
     private fun scopePlusContext(c1: CoroutineContext, c2: CoroutineContext) =
         (ContextScope(c1) + c2).coroutineContext
 }
diff --git a/common/kotlinx-coroutines-core-common/test/JobTest.kt b/common/kotlinx-coroutines-core-common/test/JobTest.kt
index be9760c..d6fadbe 100644
--- a/common/kotlinx-coroutines-core-common/test/JobTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/JobTest.kt
@@ -2,6 +2,8 @@
  * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
  */
 
+@file:Suppress("DEPRECATION")
+
 package kotlinx.coroutines
 
 import kotlin.test.*
@@ -205,4 +207,30 @@
         assertTrue(job.isCancelled)
         assertTrue(parent.isCancelled)
     }
+
+    @Test
+    fun testIncompleteJobState() = runTest {
+        val job = launch {
+            coroutineContext[Job]!!.invokeOnCompletion {  }
+        }
+
+        job.join()
+        assertTrue(job.isCompleted)
+        assertFalse(job.isActive)
+        assertFalse(job.isCancelled)
+    }
+
+    @Test
+    fun testChildrenWithIncompleteState() = runTest {
+        val job = async { Wrapper() }
+        job.join()
+        assertTrue(job.children.toList().isEmpty())
+    }
+
+    private class Wrapper : Incomplete {
+        override val isActive: Boolean
+            get() =  error("")
+        override val list: NodeList?
+            get() = error("")
+    }
 }
diff --git a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
index 80d3d39..b13d9b7 100644
--- a/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithContextTest.kt
@@ -52,8 +52,8 @@
         expect(2)
         val result = withContext(coroutineContext) { // same context!
             expect(3) // still here
-            "OK"
-        }
+            "OK".wrap()
+        }.unwrap()
         assertEquals("OK", result)
         expect(4)
         // will wait for the first coroutine
@@ -70,8 +70,8 @@
             expect(3) // still here
             yield() // now yields to launch!
             expect(5)
-            "OK"
-        }
+            "OK".wrap()
+        }.unwrap()
         assertEquals("OK", result)
         finish(6)
     }
@@ -95,7 +95,7 @@
                 } catch (e: CancellationException) {
                     expect(4)
                 }
-                "OK"
+                "OK".wrap()
             }
 
             expectUnreached()
@@ -126,7 +126,7 @@
             } catch (e: CancellationException) {
                 finish(6)
             }
-            "OK"
+            "OK".wrap()
         }
         // still fails, because parent job was cancelled
         expectUnreached()
@@ -240,7 +240,9 @@
                     job!!.cancel() // cancel itself
                     require(job!!.cancel(AssertionError()))
                     require(!isActive)
+                    "OK".wrap()
                 }
+                expectUnreached()
             } catch (e: Throwable) {
                 expect(7)
                 // make sure JCE is thrown
@@ -269,7 +271,9 @@
                     throw TestException()
                 }
                 expect(3)
+                "OK".wrap()
             }
+            expectUnreached()
         } catch (e: TestException) {
             // ensure that we can catch exception outside of the scope
             expect(5)
@@ -287,7 +291,8 @@
                 expect(4) // waits before return
             }
             expect(3)
-        }
+            "OK".wrap()
+        }.unwrap()
         finish(5)
     }
 
@@ -301,7 +306,32 @@
                 expect(4) // waits before return
             }
             expect(3)
-        }
+            "OK".wrap()
+        }.unwrap()
         finish(5)
     }
+
+    @Test
+    fun testIncompleteWithContextState() = runTest {
+        lateinit var ctxJob: Job
+        withContext(wrapperDispatcher(coroutineContext)) {
+            ctxJob = coroutineContext[Job]!!
+            ctxJob.invokeOnCompletion { }
+        }
+
+        ctxJob.join()
+        assertTrue(ctxJob.isCompleted)
+        assertFalse(ctxJob.isActive)
+        assertFalse(ctxJob.isCancelled)
+    }
+
+    private class Wrapper(val value: String) : Incomplete {
+        override val isActive: Boolean
+            get() =  error("")
+        override val list: NodeList?
+            get() = error("")
+    }
+
+    private fun String.wrap() = Wrapper(this)
+    private fun Wrapper.unwrap() = value
 }
diff --git a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
index 3899278..2f6c37f 100644
--- a/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
+++ b/common/kotlinx-coroutines-core-common/test/WithTimeoutTest.kt
@@ -195,5 +195,19 @@
             finish(4)
         }
     }
-}
 
+    @Test
+    fun testIncompleteWithTimeoutState() = runTest {
+        lateinit var timeoutJob: Job
+        val handle = withTimeout(Long.MAX_VALUE) {
+            timeoutJob = coroutineContext[Job]!!
+            timeoutJob.invokeOnCompletion { }
+        }
+
+        handle.dispose()
+        timeoutJob.join()
+        assertTrue(timeoutJob.isCompleted)
+        assertFalse(timeoutJob.isActive)
+        assertFalse(timeoutJob.isCancelled)
+    }
+}
diff --git a/core/kotlinx-coroutines-core/src/Builders.kt b/core/kotlinx-coroutines-core/src/Builders.kt
index 01c6c98..e3a7562 100644
--- a/core/kotlinx-coroutines-core/src/Builders.kt
+++ b/core/kotlinx-coroutines-core/src/Builders.kt
@@ -81,7 +81,7 @@
         }
         timeSource.unregisterTimeLoopThread()
         // now return result
-        val state = this.state
+        val state = this.state.unboxState()
         (state as? CompletedExceptionally)?.let { throw it.cause }
         return state as T
     }
diff --git a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
index 55e6e40..d21a9f8 100644
--- a/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
+++ b/core/kotlinx-coroutines-core/test/RunBlockingTest.kt
@@ -152,4 +152,14 @@
 
         assertEquals(1, value)
     }
+
+    @Test
+    fun testIncompleteState() {
+        val handle = runBlocking {
+            // See #835
+            coroutineContext[Job]!!.invokeOnCompletion {  }
+        }
+
+        handle.dispose()
+    }
 }