Release intercepted SafeCollector when onCompletion block is done (#2323)

* Do not use invokeSafely in onCompletion

Co-authored-by: Roman Elizarov <elizarov@gmail.com>
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index 3ffe5fe..8be19f0 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -158,7 +158,12 @@
         throw e
     }
     // Normal completion
-    SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null)
+    val sc = SafeCollector(this, currentCoroutineContext())
+    try {
+        sc.action(null)
+    } finally {
+        sc.releaseIntercepted()
+    }
 }
 
 /**
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
index b275a48..ab42b63 100644
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
@@ -55,7 +55,6 @@
      */
     override suspend fun emit(value: T) {
         return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
-            // Update information about caller for stackwalking
             try {
                 emit(uCont, value)
             } catch (e: Throwable) {
diff --git a/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
new file mode 100644
index 0000000..a6268b5
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class OnCompletionInterceptedReleaseTest : TestBase() {
+    @Test
+    fun testLeak() = runTest {
+        expect(1)
+        var cont: Continuation<Unit>? = null
+        val interceptor = CountingInterceptor()
+        val job = launch(interceptor, start = CoroutineStart.UNDISPATCHED) {
+            emptyFlow<Int>()
+                .onCompletion { emit(1) }
+                .collect { value ->
+                    expect(2)
+                    assertEquals(1, value)
+                    suspendCoroutine { cont = it }
+                }
+        }
+        cont!!.resume(Unit)
+        assertTrue(job.isCompleted)
+        assertEquals(interceptor.intercepted, interceptor.released)
+        finish(3)
+    }
+
+    class CountingInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
+        var intercepted = 0
+        var released = 0
+        override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
+            intercepted++
+            return Continuation(continuation.context) { continuation.resumeWith(it) }
+        }
+
+        override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
+            released++
+        }
+    }
+}
\ No newline at end of file