blob: fe958ac63f70c1e1273363372c4a2ce108396cf6 [file] [log] [blame]
Roman Elizarovf106ff32017-05-17 12:22:14 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.guava
18
Roman Elizarov9fe5f462018-02-21 19:05:52 +030019import com.google.common.util.concurrent.*
Roman Elizarovf106ff32017-05-17 12:22:14 +030020import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030021import kotlinx.coroutines.experimental.CancellationException
22import org.hamcrest.core.*
23import org.junit.*
Roman Elizarovf106ff32017-05-17 12:22:14 +030024import org.junit.Assert.*
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +030025import java.io.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030026import java.util.concurrent.*
27import kotlin.coroutines.experimental.*
Roman Elizarovf106ff32017-05-17 12:22:14 +030028
29class ListenableFutureTest : TestBase() {
Roman Elizarov45181042017-07-20 20:37:51 +030030 @Before
31 fun setup() {
32 ignoreLostThreads("ForkJoinPool.commonPool-worker-")
33 }
34
Roman Elizarovf106ff32017-05-17 12:22:14 +030035 @Test
36 fun testSimpleAwait() {
37 val service = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
38 val future = future {
39 service.submit(Callable<String> {
40 "O"
41 }).await() + "K"
42 }
43 assertThat(future.get(), IsEqual("OK"))
44 }
45
46 @Test
Vsevolod Tolstopyatovf2b4e0e2018-05-25 18:58:01 +030047 fun testAwaitWithContext() = runTest {
48 val future = SettableFuture.create<Int>()
49 val deferred = async(coroutineContext) {
50 withContext(CommonPool) {
51 future.await()
52 }
53 }
54
55 future.set(1)
56 assertEquals(1, deferred.await())
57 }
58
59 @Test
60 fun testAwaitWithContextCancellation() = runTest(expected = {it is JobCancellationException}) {
61 val future = SettableFuture.create<Int>()
62 val deferred = async(coroutineContext) {
63 withContext(CommonPool) {
64 future.await()
65 }
66 }
67
68 deferred.cancel(IOException())
69 deferred.await()
70 }
71
72 @Test
Roman Elizarovf106ff32017-05-17 12:22:14 +030073 fun testCompletedFuture() {
74 val toAwait = SettableFuture.create<String>()
75 toAwait.set("O")
76 val future = future {
77 toAwait.await() + "K"
78 }
79 assertThat(future.get(), IsEqual("OK"))
80 }
81
82 @Test
83 fun testWaitForFuture() {
84 val toAwait = SettableFuture.create<String>()
85 val future = future {
86 toAwait.await() + "K"
87 }
88 assertFalse(future.isDone)
89 toAwait.set("O")
90 assertThat(future.get(), IsEqual("OK"))
91 }
92
93 @Test
94 fun testCompletedFutureExceptionally() {
95 val toAwait = SettableFuture.create<String>()
96 toAwait.setException(IllegalArgumentException("O"))
97 val future = future<String> {
98 try {
99 toAwait.await()
100 } catch (e: RuntimeException) {
101 assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
102 e.message!!
103 } + "K"
104 }
105 assertThat(future.get(), IsEqual("OK"))
106 }
107
108 @Test
109 fun testWaitForFutureWithException() {
110 val toAwait = SettableFuture.create<String>()
111 val future = future<String> {
112 try {
113 toAwait.await()
114 } catch (e: RuntimeException) {
115 assertThat(e, IsInstanceOf(IllegalArgumentException::class.java))
116 e.message!!
117 } + "K"
118 }
119 assertFalse(future.isDone)
120 toAwait.setException(IllegalArgumentException("O"))
121 assertThat(future.get(), IsEqual("OK"))
122 }
123
124 @Test
125 fun testExceptionInsideCoroutine() {
126 val service = MoreExecutors.listeningDecorator(ForkJoinPool.commonPool())
127 val future = future {
128 if (service.submit(Callable<Boolean> { true }).await()) {
129 throw IllegalStateException("OK")
130 }
131 "fail"
132 }
133 try {
134 future.get()
135 fail("'get' should've throw an exception")
136 } catch (e: ExecutionException) {
137 assertThat(e.cause, IsInstanceOf(IllegalStateException::class.java))
138 assertThat(e.cause!!.message, IsEqual("OK"))
139 }
140 }
141
142 @Test
143 fun testCompletedDeferredAsListenableFuture() = runBlocking {
144 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300145 val deferred = async(coroutineContext, CoroutineStart.UNDISPATCHED) {
Roman Elizarovf106ff32017-05-17 12:22:14 +0300146 expect(2) // completed right away
147 "OK"
148 }
149 expect(3)
150 val future = deferred.asListenableFuture()
151 assertThat(future.await(), IsEqual("OK"))
152 finish(4)
153 }
154
155 @Test
156 fun testWaitForDeferredAsListenableFuture() = runBlocking {
157 expect(1)
Roman Elizarov43e3af72017-07-21 16:01:31 +0300158 val deferred = async(coroutineContext) {
Roman Elizarovf106ff32017-05-17 12:22:14 +0300159 expect(3) // will complete later
160 "OK"
161 }
162 expect(2)
163 val future = deferred.asListenableFuture()
164 assertThat(future.await(), IsEqual("OK")) // await yields main thread to deferred coroutine
165 finish(4)
166 }
167
168 @Test
169 fun testCancellableAwait() = runBlocking {
170 expect(1)
171 val toAwait = SettableFuture.create<String>()
Roman Elizarov43e3af72017-07-21 16:01:31 +0300172 val job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
Roman Elizarovf106ff32017-05-17 12:22:14 +0300173 expect(2)
174 try {
175 toAwait.await() // suspends
176 } catch (e: CancellationException) {
177 expect(5) // should throw cancellation exception
178 throw e
179 }
180 }
181 expect(3)
182 job.cancel() // cancel the job
183 toAwait.set("fail") // too late, the waiting job was already cancelled
184 expect(4) // job processing of cancellation was scheduled, not executed yet
185 yield() // yield main thread to job
186 finish(6)
187 }
188}