blob: 7d20709f4e6317a2dd222f99dee02e5bab732d3b [file] [log] [blame]
Roman Elizarovf16fd272017-02-07 11:26:00 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Roman Elizarovaa461cf2018-04-11 13:20:29 +030017@file:JvmMultifileClass
18@file:JvmName("BuildersKt")
Roman Elizarovf161c9f2018-02-07 23:27:10 +030019
Roman Elizarov3754f952017-01-18 20:47:54 +030020package kotlinx.coroutines.experimental
21
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030022import java.util.concurrent.locks.*
Roman Elizarovea4a51b2017-01-31 12:01:25 +030023import kotlin.coroutines.experimental.*
Roman Elizarov3754f952017-01-18 20:47:54 +030024
25/**
Roman Elizarov32d95322017-02-09 15:57:31 +030026 * Runs new coroutine and **blocks** current thread _interruptibly_ until its completion.
Roman Elizarovd528e3e2017-01-23 15:40:05 +030027 * This function should not be used from coroutine. It is designed to bridge regular blocking code
28 * to libraries that are written in suspending style, to be used in `main` functions and in tests.
29 *
30 * The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
31 * in this blocked thread until the completion of this coroutine.
32 * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
Roman Elizarov3754f952017-01-18 20:47:54 +030033 *
Roman Elizarovcb787872018-01-29 18:07:21 +030034 * When [CoroutineDispatcher] is explicitly specified in the [context], then the new coroutine runs in the context of
35 * the specified dispatcher while the current thread is blocked. If the specified dispatcher implements [EventLoop]
36 * interface and this `runBlocking` invocation is performed from inside of the this event loop's thread, then
37 * this event loop is processed using its [processNextEvent][EventLoop.processNextEvent] method until coroutine completes.
38 *
Roman Elizarov3754f952017-01-18 20:47:54 +030039 * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
40 * this `runBlocking` invocation throws [InterruptedException].
41 *
42 * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
Roman Elizarovc0d559b2017-09-28 14:27:05 +030043 *
44 * @param context context of the coroutine. The default value is an implementation of [EventLoop].
45 * @param block the coroutine code.
Roman Elizarov3754f952017-01-18 20:47:54 +030046 */
47@Throws(InterruptedException::class)
Roman Elizarovbddb1d72017-12-25 17:16:08 +030048public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
Roman Elizarovd528e3e2017-01-23 15:40:05 +030049 val currentThread = Thread.currentThread()
Roman Elizarovcb787872018-01-29 18:07:21 +030050 val contextInterceptor = context[ContinuationInterceptor]
51 val privateEventLoop = contextInterceptor == null // create private event loop if no dispatcher is specified
52 val eventLoop = if (privateEventLoop) BlockingEventLoop(currentThread) else contextInterceptor as? EventLoop
53 val newContext = newCoroutineContext(
54 if (privateEventLoop) context + (eventLoop as ContinuationInterceptor) else context
55 )
56 val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop, privateEventLoop)
Roman Elizarov2adf8bc2018-01-24 20:09:57 +030057 coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
Roman Elizarovd528e3e2017-01-23 15:40:05 +030058 return coroutine.joinBlocking()
59}
Roman Elizarov3754f952017-01-18 20:47:54 +030060
Roman Elizarovd528e3e2017-01-23 15:40:05 +030061private class BlockingCoroutine<T>(
Roman Elizarov2b12d582017-06-22 20:12:19 +030062 parentContext: CoroutineContext,
Roman Elizarov4518e052017-03-02 22:48:27 +030063 private val blockedThread: Thread,
Roman Elizarovcb787872018-01-29 18:07:21 +030064 private val eventLoop: EventLoop?,
Roman Elizarov4518e052017-03-02 22:48:27 +030065 private val privateEventLoop: Boolean
Roman Elizarov2b12d582017-06-22 20:12:19 +030066) : AbstractCoroutine<T>(parentContext, true) {
Roman Elizarov4518e052017-03-02 22:48:27 +030067 init {
Roman Elizarov8b38fa22017-09-27 17:44:31 +030068 if (privateEventLoop) require(eventLoop is BlockingEventLoop)
Roman Elizarov4518e052017-03-02 22:48:27 +030069 }
70
Roman Elizarovebc88662018-01-24 23:58:56 +030071 override fun onCancellationInternal(exceptionally: CompletedExceptionally?) {
Roman Elizarov8b38fa22017-09-27 17:44:31 +030072 // wake up blocked thread
Roman Elizarova4becc92017-01-24 11:45:50 +030073 if (Thread.currentThread() != blockedThread)
74 LockSupport.unpark(blockedThread)
Roman Elizarov3754f952017-01-18 20:47:54 +030075 }
76
77 @Suppress("UNCHECKED_CAST")
78 fun joinBlocking(): T {
Roman Elizarov35d2c342017-07-20 14:54:39 +030079 timeSource.registerTimeLoopThread()
Roman Elizarov4518e052017-03-02 22:48:27 +030080 while (true) {
Roman Elizarov3754f952017-01-18 20:47:54 +030081 if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
Roman Elizarov4518e052017-03-02 22:48:27 +030082 val parkNanos = eventLoop?.processNextEvent() ?: Long.MAX_VALUE
Roman Elizarov35d2c342017-07-20 14:54:39 +030083 // note: process next even may loose unpark flag, so check if completed before parking
Roman Elizarov7400ff02017-07-10 17:56:41 +030084 if (isCompleted) break
Roman Elizarov35d2c342017-07-20 14:54:39 +030085 timeSource.parkNanos(this, parkNanos)
Roman Elizarov3754f952017-01-18 20:47:54 +030086 }
Roman Elizarov4518e052017-03-02 22:48:27 +030087 // process queued events (that could have been added after last processNextEvent and before cancel
Roman Elizarov0d35c852017-10-07 23:19:22 +030088 if (privateEventLoop) (eventLoop as BlockingEventLoop).apply {
89 // We exit the "while" loop above when this coroutine's state "isCompleted",
90 // Here we should signal that BlockingEventLoop should not accept any more tasks
91 isCompleted = true
92 shutdown()
93 }
Roman Elizarov35d2c342017-07-20 14:54:39 +030094 timeSource.unregisterTimeLoopThread()
Roman Elizarovd528e3e2017-01-23 15:40:05 +030095 // now return result
Roman Elizarovee7c0eb2017-02-16 15:29:28 +030096 val state = this.state
Roman Elizarov3754f952017-01-18 20:47:54 +030097 (state as? CompletedExceptionally)?.let { throw it.exception }
98 return state as T
99 }
Roman Elizarov3754f952017-01-18 20:47:54 +0300100}