blob: bd5db8e46baeaba393207ba1a3d2464056348297 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.reactive
6
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import kotlinx.coroutines.experimental.*
8import org.hamcrest.core.*
9import org.junit.*
10import org.junit.Assert.*
11import org.reactivestreams.*
12import kotlin.coroutines.experimental.*
Roman Elizarov331750b2017-02-15 17:59:17 +030013
/**
 * Tests for the `publish` coroutine builder, observed through hand-written
 * Reactive Streams [Subscriber] implementations.
 *
 * The numbered `expect(n)` / `finish(n)` calls spell out the order in which each test
 * anticipates its steps to run. The helpers themselves are inherited from [TestBase],
 * which is not shown here; presumably they fail the test when a step runs out of order
 * and `expectUnreached()` fails when it is reached at all.
 */
class PublishTest : TestBase() {
    /**
     * A publisher whose block sends nothing: the subscriber should see `onSubscribe`
     * followed by `onComplete`, and never `onNext` or `onError`.
     */
    @Test
    fun testBasicEmpty() = runBlocking<Unit> {
        expect(1)
        val publisher = publish<Int>(coroutineContext) {
            expect(5)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            // No demand is requested here; completion is still expected as step 6.
            override fun onSubscribe(s: Subscription) { expect(3) }
            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expect(6) }
            override fun onError(t: Throwable) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }

    /**
     * A publisher that sends a single value: the subscriber requests one element,
     * should receive `42` in `onNext`, and then `onComplete`.
     */
    @Test
    fun testBasicSingle() = runBlocking<Unit> {
        expect(1)
        val publisher = publish<Int>(coroutineContext) {
            expect(5)
            send(42)
            expect(7)
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1) // demand for exactly the one element the block sends
            }

            override fun onNext(t: Int) {
                expect(6)
                assertThat(t, IsEqual(42))
            }

            override fun onComplete() { expect(8) }
            override fun onError(t: Throwable) { expectUnreached() }
        })
        expect(4)
        yield() // to publish coroutine
        finish(9)
    }

    /**
     * A publisher whose block throws: the subscriber should receive that exception
     * through `onError`, and never `onNext` or `onComplete`.
     */
    @Test
    fun testBasicError() = runBlocking<Unit> {
        expect(1)
        val publisher = publish<Int>(coroutineContext) {
            expect(5)
            throw RuntimeException("OK")
        }
        expect(2)
        publisher.subscribe(object : Subscriber<Int> {
            override fun onSubscribe(s: Subscription) {
                expect(3)
                s.request(1)
            }

            override fun onNext(t: Int) { expectUnreached() }
            override fun onComplete() { expectUnreached() }

            override fun onError(t: Throwable) {
                expect(6)
                // The failure must be the very exception thrown inside the publish block.
                assertThat(t, IsInstanceOf(RuntimeException::class.java))
                assertThat(t.message, IsEqual("OK"))
            }
        })
        expect(4)
        yield() // to publish coroutine
        finish(7)
    }
}
84}