blob: 5d5d2f87a6eda0405aadcc317ccb23485c1ed661 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov331750b2017-02-15 17:59:17 +03003 */
4
5package kotlinx.coroutines.experimental.reactive
6
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import kotlinx.coroutines.experimental.*
8import org.junit.*
9import org.reactivestreams.*
10import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030011
12class PublisherBackpressureTest : TestBase() {
13 @Test
14 fun testCancelWhileBPSuspended() = runBlocking<Unit> {
15 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +030016 val observable = publish(coroutineContext) {
Roman Elizarov331750b2017-02-15 17:59:17 +030017 expect(5)
18 send("A") // will not suspend, because an item was requested
19 expect(7)
20 send("B") // second requested item
21 expect(9)
22 try {
23 send("C") // will suspend (no more requested)
24 } finally {
Roman Elizarov58246ca2017-07-12 12:58:44 +030025 expect(13)
Roman Elizarov331750b2017-02-15 17:59:17 +030026 }
27 expectUnreached()
28 }
29 expect(2)
30 var sub: Subscription? = null
31 observable.subscribe(object : Subscriber<String> {
32 override fun onSubscribe(s: Subscription) {
33 sub = s
34 expect(3)
35 s.request(2) // request two items
36 }
37
38 override fun onNext(t: String) {
39 when (t) {
40 "A" -> expect(6)
41 "B" -> expect(8)
42 else -> error("Should not happen")
43 }
44 }
45
46 override fun onComplete() {
Roman Elizarov58246ca2017-07-12 12:58:44 +030047 expect(11)
Roman Elizarov331750b2017-02-15 17:59:17 +030048 }
49
50 override fun onError(e: Throwable) {
51 expectUnreached()
52 }
53 })
54 expect(4)
55 yield() // yield to observable coroutine
56 expect(10)
Roman Elizarov58246ca2017-07-12 12:58:44 +030057 sub!!.cancel() // now unsubscribe -- shall cancel coroutine & immediately signal onComplete
58 expect(12)
Roman Elizarov7a7b0092017-06-26 14:25:56 +030059 yield() // shall perform finally in coroutine & report onComplete
Roman Elizarov331750b2017-02-15 17:59:17 +030060 finish(14)
61 }
62}