/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package kotlinx.coroutines.experimental
18
19import org.hamcrest.core.IsEqual
20import org.junit.After
21import org.junit.Assert
22import org.junit.Test
23import java.util.concurrent.ExecutorService
24import java.util.concurrent.Executors
25import java.util.concurrent.ThreadFactory
Roman Elizarovca9d5be2017-04-20 19:23:18 +030026import java.util.concurrent.atomic.AtomicInteger
Roman Elizarov9d61b3e2017-04-19 18:32:11 +030027import kotlin.coroutines.experimental.CoroutineContext
28
/**
 * Checks that a coroutine whose body is cancelled by `withTimeout` keeps executing on the
 * thread created for its dispatcher: the thread identity is asserted before the timeout,
 * inside the cancelled block's exception handler, and after the timeout has propagated.
 *
 * Each test supplies a different dispatcher built on top of a single worker thread.
 */
class WithTimeoutThreadDispatchTest : TestBase() {
    /** Executor created by the test that is currently running; shut down in [tearDown]. */
    var executor: ExecutorService? = null

    @After
    fun tearDown() {
        // Nothing to release when a test never got as far as creating its executor.
        val pool = executor ?: return
        pool.shutdown()
    }

    /** Dispatcher backed by a one-thread scheduled thread pool. */
    @Test
    fun testCancellationDispatchScheduled() {
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newScheduledThreadPool(1, threadFactory)
            executor = pool
            pool.asCoroutineDispatcher()
        }
    }

    /** Dispatcher backed by a plain single-thread executor. */
    @Test
    fun testCancellationDispatchNonScheduled() {
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newSingleThreadExecutor(threadFactory)
            executor = pool
            pool.asCoroutineDispatcher()
        }
    }

    /**
     * Hand-written dispatcher that only overrides `dispatch` and forwards blocks to a
     * single-thread executor.
     *
     * Additionally verifies that at most one dispatched block is pending at any moment
     * (no spurious concurrency): a violation is recorded while dispatching and reported
     * once the scenario has finished.
     */
    @Test
    fun testCancellationDispatchCustomNoDelay() {
        var failure: String? = null
        checkCancellationDispatch { threadFactory ->
            val pool: ExecutorService = Executors.newSingleThreadExecutor(threadFactory)
            executor = pool
            // Number of blocks handed to the executor that have not started running yet.
            val inFlight = AtomicInteger(0)
            object : CoroutineDispatcher() {
                override fun dispatch(context: CoroutineContext, block: Runnable) {
                    val pending = inFlight.incrementAndGet()
                    if (pending > 1) failure = "Two requests are scheduled concurrently"
                    pool.execute(Runnable {
                        inFlight.decrementAndGet()
                        block.run()
                    })
                }
            }
        }
        failure?.let { message -> error(message) }
    }

    /**
     * Runs the shared timeout scenario on the dispatcher produced by [factory].
     *
     * [factory] receives a [ThreadFactory] that remembers the thread it creates; the scenario
     * then asserts that every step executed under the dispatcher runs on that thread.
     */
    private fun checkCancellationDispatch(factory: (ThreadFactory) -> CoroutineDispatcher) = runBlocking {
        expect(1)
        var workerThread: Thread? = null
        val recordingFactory = ThreadFactory { task ->
            val created = Thread(task)
            workerThread = created
            created
        }

        // Compares the calling thread with the one handed out by recordingFactory.
        fun assertOnWorkerThread() {
            Assert.assertThat(Thread.currentThread(), IsEqual(workerThread))
        }

        val dispatcher = factory(recordingFactory)
        run(dispatcher) {
            expect(2)
            assertOnWorkerThread()
            try {
                withTimeout(100) {
                    try {
                        expect(3)
                        delay(1000)
                        expectUnreached()
                    } catch (e: CancellationException) {
                        expect(4)
                        assertOnWorkerThread()
                        throw e // let the cancellation reach the outer handler
                    }
                }
            } catch (e: CancellationException) {
                expect(5)
                assertOnWorkerThread()
            }
            expect(6)
        }
        finish(7)
    }
}