EventLoop is integrated as runBlocking default and is used for tests, coroutine builders provide CoroutineScope with context
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
index 0a0409a..b112462 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -1,10 +1,7 @@
package kotlinx.coroutines.experimental
import java.util.concurrent.locks.LockSupport
-import kotlin.coroutines.Continuation
-import kotlin.coroutines.CoroutineContext
-import kotlin.coroutines.startCoroutine
-import kotlin.coroutines.suspendCoroutine
+import kotlin.coroutines.*
// --------------- basic coroutine builders ---------------
@@ -21,8 +18,8 @@
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*/
-fun launch(context: CoroutineContext, block: suspend () -> Unit): Job =
- StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it) }
+fun launch(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Job =
+ StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
@@ -30,20 +27,21 @@
* different thread inside the block, and back when it completes.
* The specified [context] is merged onto the current coroutine context.
*/
-public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
+public suspend fun <T> run(context: CoroutineContext, block: suspend CoroutineScope.() -> T): T =
suspendCoroutine { cont ->
- block.startCoroutine(object : Continuation<T> by cont {
- override val context: CoroutineContext = cont.context + context
- })
+ // new don't invoke `newCoroutineContext`, but consider this being the same coroutine in the new context
+ InnerCoroutine(cont.context + context, cont).also { block.startCoroutine(it, it) }
}
/**
* Runs new coroutine and *blocks* current thread *interruptibly* until its completion.
- * This function should not be used from coroutine. It is designed to bridge regular code blocking code
- * to libraries that are written in suspending style.
- * The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
- * See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
- * The specified context is added to the context of the parent running coroutine (if any) inside which this function
+ * This function should not be used from coroutine. It is designed to bridge regular blocking code
+ * to libraries that are written in suspending style, to be used in `main` functions and in tests.
+ *
+ * The default [CoroutineDispatcher] for this builder in an implementation of [EventLoop] that processes continuations
+ * in this blocked thread until the completion of this coroutine.
+ * See [CoroutineDispatcher] for the other implementations that are provided by `kotlinx.coroutines`.
+ * The specified [context] is added to the context of the parent running coroutine (if any) inside which this function
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
*
* If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
@@ -52,26 +50,45 @@
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*/
@Throws(InterruptedException::class)
-public fun <T> runBlocking(context: CoroutineContext, block: suspend () -> T): T =
- BlockingCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) }.joinBlocking()
+public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
+ val currentThread = Thread.currentThread()
+ val privateEventLoop = if (context[ContinuationInterceptor] as? CoroutineDispatcher == null)
+ EventLoopImpl(currentThread) else null
+ val newContext = newCoroutineContext(context + (privateEventLoop ?: EmptyCoroutineContext))
+ val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop != null)
+ privateEventLoop?.initParentJob(coroutine)
+ block.startCoroutine(coroutine, coroutine)
+ return coroutine.joinBlocking()
+}
// --------------- implementation ---------------
private class StandaloneCoroutine(
- val parentContext: CoroutineContext
-) : AbstractCoroutine<Unit>(parentContext) {
- init { initParentJob(parentContext[Job]) }
+ val newContext: CoroutineContext
+) : AbstractCoroutine<Unit>(newContext) {
+ init { initParentJob(newContext[Job]) }
override fun afterCompletion(state: Any?) {
- // note the use of the parent context below!
- if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.cancelReason)
+ // note the use of the parent's job context below!
+ if (state is CompletedExceptionally) handleCoroutineException(newContext, state.cancelReason)
}
}
-private class BlockingCoroutine<T>(parentContext: CoroutineContext) : AbstractCoroutine<T>(parentContext) {
- val blockedThread: Thread = Thread.currentThread()
+private class InnerCoroutine<T>(
+ override val context: CoroutineContext,
+ continuation: Continuation<T>
+) : Continuation<T> by continuation, CoroutineScope {
+ override val isActive: Boolean = context[Job]?.isActive ?: true
+}
- init { initParentJob(parentContext[Job]) }
+private class BlockingCoroutine<T>(
+ newContext: CoroutineContext,
+ val blockedThread: Thread,
+ val hasPrivateEventLoop: Boolean
+) : AbstractCoroutine<T>(newContext) {
+ val eventLoop: EventLoop? = newContext[ContinuationInterceptor] as? EventLoop
+
+ init { initParentJob(newContext[Job]) }
override fun afterCompletion(state: Any?) {
LockSupport.unpark(blockedThread)
@@ -81,8 +98,14 @@
fun joinBlocking(): T {
while (isActive) {
if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
- LockSupport.park(this)
+ if (eventLoop == null || !eventLoop.processNextEvent())
+ LockSupport.park(this)
}
+ // process remaining events (that could have been added after last processNextEvent and before cancel
+ if (hasPrivateEventLoop) {
+ while (eventLoop!!.processNextEvent()) { /* just spin */ }
+ }
+ // now return result
val state = getState()
(state as? CompletedExceptionally)?.let { throw it.exception }
return state as T
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
index 3089212..36d3cbf 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
@@ -11,7 +11,7 @@
* coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
* When available, it wraps [ForkJoinPool.commonPool] and provides a similar shared pool where not.
*/
-object CommonPool : CoroutineDispatcher() {
+object CommonPool : CoroutineDispatcher(), Yield {
private val pool: Executor = findPool()
private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
@@ -37,4 +37,8 @@
override fun isDispatchNeeded(): Boolean = true
override fun dispatch(block: Runnable) = pool.execute(block)
+
+ override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
+ pool.execute { continuation.resume(Unit) }
+ }
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
similarity index 70%
rename from kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
rename to kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
index 631c06a..3629f2d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/AbstractCoroutine.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineScope.kt
@@ -4,13 +4,29 @@
import kotlin.coroutines.CoroutineContext
/**
+ * Receiver interface for generic coroutine builders, so that the code inside coroutine has a convenient access
+ * to its [context] and cancellation status via [isActive].
+ */
+public interface CoroutineScope {
+ /**
+ * Returns `true` when this coroutine is still active (was not cancelled).
+ */
+ public val isActive: Boolean
+
+ /**
+ * Returns the context of this coroutine.
+ */
+ public val context: CoroutineContext
+}
+
+/**
* Abstract class to simplify writing of coroutine completion objects that
* implements [Continuation] and [Job] interfaces.
* It stores the result of continuation in the state of the job.
*/
@Suppress("LeakingThis")
-public abstract class AbstractCoroutine<in T>(parentContext: CoroutineContext) : JobSupport(), Continuation<T> {
- override val context: CoroutineContext = parentContext + this // merges this job into this context
+public abstract class AbstractCoroutine<in T>(newContext: CoroutineContext) : JobSupport(), Continuation<T>, CoroutineScope {
+ override val context: CoroutineContext = newContext + this // merges this job into this context
final override fun resume(value: T) {
while (true) { // lock-free loop on state
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
index 165e104..0e89f94 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
@@ -3,7 +3,6 @@
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.ContinuationInterceptor
-import kotlin.coroutines.ContinuationInterceptor.Key
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
@@ -35,7 +34,6 @@
public val currentCoroutineContext: CoroutineContext
get() = CURRENT_CONTEXT.get() ?: throw IllegalStateException("Not inside a coroutine")
-
/**
* Returns the context of the coroutine that this function is invoked in or a specified [default]
* if not invoked inside a coroutine. A [default] must be a singleton [CoroutineDispatcher] element.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 59f36cc..d1727aa 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -33,13 +33,13 @@
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
*/
-public fun <T> defer(context: CoroutineContext, block: suspend () -> T) : Deferred<T> =
- DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) }
+public fun <T> defer(context: CoroutineContext, block: suspend CoroutineScope.() -> T) : Deferred<T> =
+ DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it, it) }
private class DeferredCoroutine<T>(
- parentContext: CoroutineContext
-) : AbstractCoroutine<T>(parentContext), Deferred<T> {
- init { initParentJob(parentContext[Job]) }
+ newContext: CoroutineContext
+) : AbstractCoroutine<T>(newContext), Deferred<T> {
+ init { initParentJob(newContext[Job]) }
@Suppress("UNCHECKED_CAST")
suspend override fun await(): T {
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index 8fc9e1e..bd0a1f4 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -4,7 +4,8 @@
import kotlin.coroutines.ContinuationInterceptor
/**
- * Implemented by [CoroutineDispatcher] implementations that natively support non-blocking [delay] function.
+ * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
+ * non-blocking [delay] function.
*/
public interface Delay {
/**
@@ -16,13 +17,13 @@
suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
require(time >= 0) { "Delay time $time cannot be negative" }
if (time <= 0) return // don't delay
- return suspendCancellableCoroutine { resumeAfterDelay(time, unit, it) }
+ return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, unit, it) }
}
/**
- * Resumes a specified continuation after a specified delay.
+ * Schedules resume of a specified [continuation] after a specified delay [time].
*/
- fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
+ fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
}
/**
@@ -39,7 +40,7 @@
if (time <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
(cont.context[ContinuationInterceptor] as? Delay)?.apply {
- resumeAfterDelay(time, unit, cont)
+ scheduleResumeAfterDelay(time, unit, cont)
return@sc
}
val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 847725d..66b4cba 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -4,65 +4,91 @@
import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
import java.util.concurrent.locks.LockSupport
import kotlin.coroutines.Continuation
-import kotlin.coroutines.startCoroutine
+/**
+ * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
+ * be asked to process next event from their event queue. It is used by [runBlocking] to
+ * continue processing events when invoked from the event dispatch thread.
+ */
public interface EventLoop {
- public val thisEventLoop: CoroutineDispatcher
- public suspend fun yield()
+ /**
+ * Processes next event in this event loop and returns `true` or returns `false` if there are
+ * no events to process or when invoked from the wrong thread.
+ */
+ public fun processNextEvent(): Boolean
+
+ public companion object Factory {
+ /**
+ * Creates a new event loop that is bound the specified [thread] (current thread by default) and
+ * stops accepting new events when [parentJob] completes. Every continuation that is scheduled
+ * onto this event loop unparks the specified thread via [LockSupport.unpark].
+ *
+ * The main event-processing loop using the resulting `eventLoop` object should look like this:
+ * ```
+ * while (needsToBeRunning) {
+ * if (Thread.interrupted()) break // or handle somehow
+ * if (!eventLoop.processNextEvent()) LockSupport.park() // event loop will unpark
+ * }
+ * ```
+ */
+ public operator fun invoke(thread: Thread = Thread.currentThread(), parentJob: Job? = null): CoroutineDispatcher =
+ EventLoopImpl(thread).apply {
+ if (parentJob != null) initParentJob(parentJob)
+ }
+ }
}
-@Throws(InterruptedException::class)
-public fun <T> runEventLoop(block: suspend EventLoop.() -> T): T =
- EventLoopImpl<T>().also { block.startCoroutine(it, it.coroutine) }.coroutine.joinBlocking()
-
-private class EventLoopImpl<T> : CoroutineDispatcher(), EventLoop {
- val thread: Thread = Thread.currentThread()
+internal class EventLoopImpl(
+ val thread: Thread
+) : CoroutineDispatcher(), EventLoop, Yield {
val queue = LockFreeLinkedListHead()
- val coroutine = Coroutine()
+ var parentJob: Job? = null
- public override val thisEventLoop: CoroutineDispatcher = this
-
- public override suspend fun yield(): Unit = suspendCancellableCoroutine { cont ->
- val node = Resume(cont)
- schedule(node)
- cont.removeOnCompletion(node)
+ fun initParentJob(coroutine: Job) {
+ require(this.parentJob == null)
+ this.parentJob = coroutine
}
override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
override fun dispatch(block: Runnable) {
schedule(Dispatch(block))
- queue.addLast(Dispatch(block))
}
- fun schedule(node: LockFreeLinkedListNode) {
- check(queue.addLastIf(node) { coroutine.isActive }) {
- "EventLoop is already complete... cannot schedule any tasks"
- }
- LockSupport.unpark(thread)
+ override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
+ val node = Resume(continuation)
+ if (schedule(node))
+ continuation.removeOnCompletion(node)
}
- inner class Coroutine : AbstractCoroutine<T>(this@EventLoopImpl) {
- override fun afterCompletion(state: Any?) {
+ fun schedule(node: Node): Boolean {
+ val added = if (parentJob == null) {
+ queue.addLast(node)
+ true
+ } else
+ queue.addLastIf(node) { parentJob!!.isActive }
+ if (added) {
LockSupport.unpark(thread)
+ } else {
+ node.run()
}
-
- @Suppress("UNCHECKED_CAST")
- fun joinBlocking(): T {
- while (isActive) {
- if (Thread.interrupted()) throw InterruptedException().also { cancel(it) }
- (queue.removeFirstOrNull() as? Runnable)?.run() ?: LockSupport.park(this)
- }
- check(queue.isEmpty) { "There are still tasks in event loop queue... Stray coroutines?"}
- val state = getState()
- (state as? CompletedExceptionally)?.let { throw it.exception }
- return state as T
- }
+ return added
}
- class Dispatch(block: Runnable) : LockFreeLinkedListNode(), Runnable by block
+ override fun processNextEvent(): Boolean {
+ if (Thread.currentThread() != thread) return false
+ (queue.removeFirstOrNull() as? Runnable)?.apply {
+ run()
+ return true
+ }
+ return false
+ }
- class Resume(val cont: Continuation<Unit>) : LockFreeLinkedListNode(), Runnable {
+ abstract class Node : LockFreeLinkedListNode(), Runnable
+
+ class Dispatch(block: Runnable) : Node(), Runnable by block
+
+ class Resume(val cont: Continuation<Unit>) : Node() {
override fun run() = cont.resume(Unit)
}
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index acfc634..80b4a4f 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -23,6 +23,9 @@
* All functions on this interface are thread-safe.
*/
public interface Job : CoroutineContext.Element {
+ /**
+ * Key for [Job] instance in the coroutine context.
+ */
public companion object Key : CoroutineContext.Key<Job> {
/**
* Creates new job object. It is optionally a child of a [parent] job.
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index cf6d430..b456c12 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -37,7 +37,7 @@
nThreads: Int,
name: String,
val job: Job
-) : CoroutineDispatcher(), ContinuationInterceptor, Delay {
+) : CoroutineDispatcher(), ContinuationInterceptor, Yield, Delay {
val threadNo = AtomicInteger()
val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
thread(start = false, isDaemon = true,
@@ -55,7 +55,11 @@
override fun dispatch(block: Runnable) = executor.execute(block)
- override fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+ override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
+ executor.execute { continuation.resume(Unit) }
+ }
+
+ override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeout = executor.schedule({ continuation.resume(Unit) }, time, unit)
continuation.cancelFutureOnCompletion(timeout)
}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
new file mode 100644
index 0000000..9623de3
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -0,0 +1,39 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.ContinuationInterceptor
+
+/**
+ * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that
+ * natively support [yield] function.
+ */
+public interface Yield {
+ /**
+ * Yields a thread (or thread pool) of this dispatcher to other coroutines to run.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
+ * resumes with [CancellationException].
+ */
+ suspend fun yield(): Unit = suspendCancellableCoroutine { cont -> scheduleResume(cont) }
+
+ /**
+ * Schedules resume of a specified [continuation] in this dispatcher's thread (or pool of threads).
+ */
+ fun scheduleResume(continuation: CancellableContinuation<Unit>)
+}
+
+/**
+ * Yields a thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
+ * If the coroutine dispatcher does not have its own thread pool (like [Here] dispatcher) and does not implement
+ * [Yield], then the [Yield] implementation is taken from the context, if there is none, then this
+ * function does nothing, but checks if the coroutine [Job] was cancelled.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
+ * resumes with [CancellationException].
+ */
+suspend fun yield(): Unit = suspendCancellableCoroutine sc@ { cont ->
+ (cont.context[ContinuationInterceptor] as? Yield)?.apply {
+ scheduleResume(cont)
+ return@sc
+ }
+ cont.resume(Unit)
+}
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
index d7b5e62..d5333d5 100644
--- a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/CoroutinesTest.kt
@@ -33,22 +33,22 @@
}
@Test
- fun testSimple() = runEventLoop {
+ fun testSimple() = runBlocking {
expect(1)
finish(2)
}
@Test
- fun testYield() = runEventLoop {
+ fun testYield() = runBlocking {
expect(1)
yield() // effectively does nothing, as we don't have other coroutines
finish(2)
}
@Test
- fun testLaunchAndYieldJoin() = runEventLoop {
+ fun testLaunchAndYieldJoin() = runBlocking {
expect(1)
- val job = launch(Here) {
+ val job = launch(context) {
expect(2)
yield()
expect(4)
@@ -59,11 +59,11 @@
}
@Test
- fun testNested() = runEventLoop {
+ fun testNested() = runBlocking {
expect(1)
- launch(Here) {
+ launch(context) {
expect(2)
- launch(Here) {
+ launch(context) {
expect(3)
}
expect(4)
@@ -72,11 +72,11 @@
}
@Test
- fun testNestedAndYieldJoin() = runEventLoop {
+ fun testNestedAndYieldJoin() = runBlocking {
expect(1)
- val j1 = launch(Here) {
+ val j1 = launch(context) {
expect(2)
- val j2 = launch(Here) {
+ val j2 = launch(context) {
expect(3)
yield()
expect(6)
@@ -91,9 +91,9 @@
}
@Test
- fun testCancelChildImplicit() = runEventLoop {
+ fun testCancelChildImplicit() = runBlocking {
expect(1)
- launch(Here) {
+ launch(context) {
expect(2)
yield() // parent finishes earlier, does not wait for us
expectUnreached()
@@ -102,9 +102,9 @@
}
@Test
- fun testCancelChildExplicit() = runEventLoop {
+ fun testCancelChildExplicit() = runBlocking {
expect(1)
- val job = launch(Here) {
+ val job = launch(context) {
expect(2)
yield()
expectUnreached()
@@ -115,11 +115,11 @@
}
@Test
- fun testCancelNestedImplicit() = runEventLoop {
+ fun testCancelNestedImplicit() = runBlocking {
expect(1)
- val j1 = launch(Here) {
+ val j1 = launch(context) {
expect(2)
- val j2 = launch(Here) {
+ val j2 = launch(context) {
expect(3)
yield() // parent finishes earlier, does not wait for us
expectUnreached()
@@ -130,11 +130,11 @@
}
@Test
- fun testCancelNestedImplicit2() = runEventLoop {
+ fun testCancelNestedImplicit2() = runBlocking {
expect(1)
- val j1 = launch(Here) {
+ val j1 = launch(context) {
expect(2)
- val j2 = launch(Here) {
+ val j2 = launch(context) {
expect(3)
yield() // parent finishes earlier, does not wait for us
expectUnreached()
@@ -147,15 +147,15 @@
}
@Test(expected = IOException::class)
- fun testExceptionPropagation(): Unit = runEventLoop {
+ fun testExceptionPropagation(): Unit = runBlocking {
finish(1)
throw IOException()
}
@Test(expected = CancellationException::class)
- fun testCancelParentOnChildException(): Unit = runEventLoop {
+ fun testCancelParentOnChildException(): Unit = runBlocking {
expect(1)
- launch(Here) {
+ launch(context) {
expect(2)
throw IOException() // does not propagate exception to launch, but cancels parent (!)
}
@@ -163,9 +163,9 @@
}
@Test(expected = CancellationException::class)
- fun testCancelParentOnChildException2(): Unit = runEventLoop {
+ fun testCancelParentOnChildException2(): Unit = runBlocking {
expect(1)
- launch(Here) {
+ launch(context) {
expect(2)
throw IOException()
}
@@ -175,11 +175,11 @@
}
@Test(expected = CancellationException::class)
- fun testCancelParentOnNestedException(): Unit = runEventLoop {
+ fun testCancelParentOnNestedException(): Unit = runBlocking {
expect(1)
- launch(Here) {
+ launch(context) {
expect(2)
- launch(Here) {
+ launch(context) {
expect(3)
throw IOException() // unhandled exception kills all parents
}
@@ -187,5 +187,4 @@
}
finish(5)
}
-
}
diff --git a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
index 6a74a65..ce26153 100644
--- a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
+++ b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
@@ -16,7 +16,7 @@
/**
* Dispatches execution onto JavaFx application thread and provides native [delay] support.
*/
-object JavaFx : CoroutineDispatcher(), Delay {
+object JavaFx : CoroutineDispatcher(), Yield, Delay {
private val pulseTimer by lazy {
PulseTimer().apply { start() }
}
@@ -33,7 +33,11 @@
pulseTimer.onNext(cont)
}
- override fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+ override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
+ Platform.runLater { continuation.resume(Unit) }
+ }
+
+ override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timeline = Timeline(KeyFrame(Duration.millis(unit.toMillis(time).toDouble()),
EventHandler<ActionEvent> { continuation.resume(Unit) }))
timeline.play()
diff --git a/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt b/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
index f16dd2e..f02b0eb 100644
--- a/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
+++ b/kotlinx-coroutines-jdk8/src/main/kotlin/kotlinx/coroutines/experimental/future/Future.kt
@@ -10,8 +10,8 @@
* Starts new coroutine and returns its results an an implementation of [CompletableFuture].
* This coroutine builder uses [CommonPool] context by default and is conceptually similar to [CompletableFuture.supplyAsync].
*
- * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed
- * The [context] for the new coroutine must include [CoroutineDispatcher] element.
+ * The running coroutine is cancelled when the resulting future is cancelled or otherwise completed.
+ * If the [context] for the new coroutine is explicitly specified, then it must include [CoroutineDispatcher] element.
* See [CoroutineDispatcher] for the standard [context] implementations that are provided by `kotlinx.coroutines`.
* The specified context is added to the context of the parent running coroutine (if any) inside which this function
* is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
diff --git a/kotlinx-coroutines-nio/src/test/kotlin/kotlinx/coroutines/experimental/nio/AsyncIOTest.kt b/kotlinx-coroutines-nio/src/test/kotlin/kotlinx/coroutines/experimental/nio/AsyncIOTest.kt
index 409b041..c78ea09 100644
--- a/kotlinx-coroutines-nio/src/test/kotlin/kotlinx/coroutines/experimental/nio/AsyncIOTest.kt
+++ b/kotlinx-coroutines-nio/src/test/kotlin/kotlinx/coroutines/experimental/nio/AsyncIOTest.kt
@@ -1,6 +1,8 @@
package kotlinx.coroutines.experimental.nio
-import kotlinx.coroutines.experimental.*
+import kotlinx.coroutines.experimental.join
+import kotlinx.coroutines.experimental.launch
+import kotlinx.coroutines.experimental.runBlocking
import org.apache.commons.io.FileUtils
import org.junit.Rule
import org.junit.Test
@@ -11,7 +13,6 @@
import java.nio.channels.AsynchronousServerSocketChannel
import java.nio.channels.AsynchronousSocketChannel
import java.nio.file.StandardOpenOption
-import java.util.*
import kotlin.test.assertEquals
import kotlin.test.assertTrue
@@ -36,7 +37,7 @@
StandardOpenOption.CREATE, StandardOpenOption.WRITE)
val buf = ByteBuffer.allocate(1024)
- runBlocking(Here) {
+ runBlocking {
var totalBytesRead = 0L
var totalBytesWritten = 0L
while (totalBytesRead < input.size()) {
@@ -60,7 +61,7 @@
}
@Test
- fun testNetworkChannels() = runBlocking<Unit>(Here) {
+ fun testNetworkChannels() = runBlocking {
val serverChannel =
AsynchronousServerSocketChannel
.open()
@@ -68,7 +69,7 @@
val serverPort = (serverChannel.localAddress as InetSocketAddress).port
- val c1 = launch(CommonPool) {
+ val c1 = launch(context) {
val client = serverChannel.aAccept()
val buffer = ByteBuffer.allocate(2)
client.aRead(buffer)
@@ -79,7 +80,7 @@
client.close()
}
- val c2 = launch(CommonPool) {
+ val c2 = launch(context) {
val connection =
AsynchronousSocketChannel.open()
// async calls
diff --git a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
index a5e3881..8bcea9d 100644
--- a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
+++ b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
@@ -3,6 +3,7 @@
import kotlinx.coroutines.experimental.CancellableContinuation
import kotlinx.coroutines.experimental.CoroutineDispatcher
import kotlinx.coroutines.experimental.Delay
+import kotlinx.coroutines.experimental.Yield
import kotlinx.coroutines.experimental.swing.Swing.delay
import java.awt.event.ActionListener
import java.util.concurrent.TimeUnit
@@ -12,11 +13,15 @@
/**
* Dispatches execution onto Swing event dispatching thread and provides native [delay] support.
*/
-object Swing : CoroutineDispatcher(), Delay {
+object Swing : CoroutineDispatcher(), Yield, Delay {
override fun isDispatchNeeded(): Boolean = !SwingUtilities.isEventDispatchThread()
override fun dispatch(block: Runnable) = SwingUtilities.invokeLater(block)
- override fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+ override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
+ SwingUtilities.invokeLater { continuation.resume(Unit) }
+ }
+
+ override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
val timerTime = unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
val timer = Timer(timerTime, ActionListener { continuation.resume(Unit) }).apply {
isRepeats = false