Release intercepted SafeCollector when onCompletion block is done (#2323)
* Do not use invokeSafely in onCompletion
Co-authored-by: Roman Elizarov <elizarov@gmail.com>
diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
index 3ffe5fe..8be19f0 100644
--- a/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
+++ b/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt
@@ -158,7 +158,12 @@
throw e
}
// Normal completion
- SafeCollector(this, currentCoroutineContext()).invokeSafely(action, null)
+ val sc = SafeCollector(this, currentCoroutineContext())
+ try {
+ sc.action(null)
+ } finally {
+ sc.releaseIntercepted()
+ }
}
/**
diff --git a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
index b275a48..ab42b63 100644
--- a/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
+++ b/kotlinx-coroutines-core/jvm/src/flow/internal/SafeCollector.kt
@@ -55,7 +55,6 @@
*/
override suspend fun emit(value: T) {
return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
- // Update information about caller for stackwalking
try {
emit(uCont, value)
} catch (e: Throwable) {
diff --git a/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
new file mode 100644
index 0000000..a6268b5
--- /dev/null
+++ b/kotlinx-coroutines-core/jvm/test/flow/OnCompletionInterceptedReleaseTest.kt
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
+ */
+
+package kotlinx.coroutines.flow
+
+import kotlinx.coroutines.*
+import org.junit.Test
+import kotlin.coroutines.*
+import kotlin.test.*
+
+class OnCompletionInterceptedReleaseTest : TestBase() {
+ @Test
+ fun testLeak() = runTest {
+ expect(1)
+ var cont: Continuation<Unit>? = null
+ val interceptor = CountingInterceptor()
+ val job = launch(interceptor, start = CoroutineStart.UNDISPATCHED) {
+ emptyFlow<Int>()
+ .onCompletion { emit(1) }
+ .collect { value ->
+ expect(2)
+ assertEquals(1, value)
+ suspendCoroutine { cont = it }
+ }
+ }
+ cont!!.resume(Unit)
+ assertTrue(job.isCompleted)
+ assertEquals(interceptor.intercepted, interceptor.released)
+ finish(3)
+ }
+
+ class CountingInterceptor : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
+ var intercepted = 0
+ var released = 0
+ override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
+ intercepted++
+ return Continuation(continuation.context) { continuation.resumeWith(it) }
+ }
+
+ override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
+ released++
+ }
+ }
+}
\ No newline at end of file