Reusable continuation leak (#1858)
Detect suspendCancellableCoroutine right after suspendCancellableCoroutineReusable within the same state machine and properly cleanup its child handle when its block completes
Fixes #1855
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt
index 62cc2e5..2573d30 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/FlowPlaysScrabbleOpt.kt
@@ -6,10 +6,11 @@
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.flow.Flow
import org.openjdk.jmh.annotations.*
-import java.lang.Long.*
import java.util.*
import java.util.concurrent.*
+import kotlin.math.*
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
diff --git a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt
index 87d0e61..fa944fa 100644
--- a/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt
+++ b/benchmarks/src/jmh/kotlin/benchmarks/flow/scrabble/SequencePlaysScrabble.kt
@@ -5,10 +5,11 @@
package benchmarks.flow.scrabble
import kotlinx.coroutines.*
+import kotlinx.coroutines.flow.*
import org.openjdk.jmh.annotations.*
import java.lang.Long.*
import java.util.*
-import java.util.concurrent.*
+import java.util.concurrent.TimeUnit
@Warmup(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 7, time = 1, timeUnit = TimeUnit.SECONDS)
diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
index 0cc9b57..1f67dd3 100644
--- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
+++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt
@@ -85,12 +85,13 @@
// This method does nothing. Leftover for binary compatibility with old compiled code
}
- private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable
+ private fun isReusable(): Boolean = delegate is DispatchedContinuation<*> && delegate.isReusable(this)
/**
* Resets cancellability state in order to [suspendAtomicCancellableCoroutineReusable] to work.
* Invariant: used only by [suspendAtomicCancellableCoroutineReusable] in [REUSABLE_CLAIMED] state.
*/
+ @JvmName("resetState") // Prettier stack traces
internal fun resetState(): Boolean {
assert { parentHandle !== NonDisposableHandle }
val state = _state.value
diff --git a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
index acb6c48..5075814 100644
--- a/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
+++ b/kotlinx-coroutines-core/common/src/internal/DispatchedContinuation.kt
@@ -63,8 +63,24 @@
public val reusableCancellableContinuation: CancellableContinuationImpl<*>?
get() = _reusableCancellableContinuation.value as? CancellableContinuationImpl<*>
- public val isReusable: Boolean
- get() = _reusableCancellableContinuation.value != null
+ public fun isReusable(requester: CancellableContinuationImpl<*>): Boolean {
+ /*
+ * Reusability control:
+ * `null` -> no reusability at all, false
+ * If current state is not CCI, then we are within `suspendAtomicCancellableCoroutineReusable`, true
+ * Else, if result is CCI === requester.
+ * Identity check my fail for the following pattern:
+ * ```
+ * loop:
+ * suspendAtomicCancellableCoroutineReusable { } // Reusable, outer coroutine stores the child handle
+ * suspendCancellableCoroutine { } // **Not reusable**, handle should be disposed after {}, otherwise
+ * it will leak because it won't be freed by `releaseInterceptedContinuation`
+ * ```
+ */
+ val value = _reusableCancellableContinuation.value ?: return false
+ if (value is CancellableContinuationImpl<*>) return value === requester
+ return true
+ }
/**
* Claims the continuation for [suspendAtomicCancellableCoroutineReusable] block,
diff --git a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
index 997f746..5f5620c 100644
--- a/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
+++ b/kotlinx-coroutines-core/jvm/test/ReusableCancellableContinuationTest.kt
@@ -192,4 +192,17 @@
FieldWalker.assertReachableCount(0, receiver) { it is CancellableContinuation<*> }
finish(3)
}
+
+ @Test
+ fun testReusableAndRegularSuspendCancellableCoroutineMemoryLeak() = runTest {
+ val channel = produce {
+ repeat(10) {
+ send(Unit)
+ }
+ }
+ for (value in channel) {
+ delay(1)
+ }
+ FieldWalker.assertReachableCount(1, coroutineContext[Job], { it is ChildContinuation })
+ }
}