ArrayBroadcastChannel.send should forget values immediately w/o subs
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
index 0e78f9b..3ed8a8b 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannel.kt
@@ -24,7 +24,7 @@
/**
* Broadcast channel with array buffer of a fixed [capacity].
- * Sender suspends only when buffer is fully due to one of the receives not being late and
+ * Sender suspends only when buffer is full due to one of the receives being slow to consume and
* receiver suspends only when buffer is empty.
*
* Note, that elements that are sent to the broadcast channel while there are no [open] subscribers are immediately
@@ -128,11 +128,13 @@
private fun checkSubOffers() {
var updated = false
+ var hasSubs = false
@Suppress("LoopToCallChain") // must invoke `checkOffer` on every sub
for (sub in subs) {
+ hasSubs = true
if (sub.checkOffer()) updated = true
}
- if (updated)
+ if (updated || !hasSubs)
updateHead()
}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
index bc9e533..9beaa74 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ArrayBroadcastChannelTest.kt
@@ -19,7 +19,7 @@
import kotlinx.coroutines.experimental.*
import org.hamcrest.core.IsEqual
import org.hamcrest.core.IsNull
-import org.junit.Assert.*
+import org.junit.Assert.assertThat
import org.junit.Test
class ArrayBroadcastChannelTest : TestBase() {
@@ -112,4 +112,24 @@
assertThat(sub.isClosedForReceive, IsEqual(true))
finish(7)
}
+
+ @Test
+ fun testForgetUnsubscribed() = runBlocking {
+ expect(1)
+ val broadcast = ArrayBroadcastChannel<Int>(1)
+ broadcast.send(1)
+ broadcast.send(2)
+ broadcast.send(3)
+ expect(2) // should not suspend anywhere above
+ val sub = broadcast.open()
+ launch(context, CoroutineStart.UNDISPATCHED) {
+ expect(3)
+ assertThat(sub.receive(), IsEqual(4)) // suspends
+ expect(5)
+ }
+ expect(4)
+ broadcast.send(4) // sends
+ yield()
+ finish(6)
+ }
}
\ No newline at end of file