New implementation for 1.1-Beta
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
new file mode 100644
index 0000000..f0fe557
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Builders.kt
@@ -0,0 +1,86 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.locks.LockSupport
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.startCoroutine
+import kotlin.coroutines.suspendCoroutine
+
+// --------------- basic coroutine builders ---------------
+
+/**
+ * Launches a new coroutine without blocking the current thread and returns a reference to the coroutine as a [Job].
+ * The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
+ * The specified context is added to the context of the parent running coroutine (if any) inside which this function
+ * is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
+ *
+ * Uncaught exceptions in this coroutine cancel the parent job in the context by default
+ * (unless [CoroutineExceptionHandler] is explicitly specified), which means that when `launch` is used from another
+ * coroutine, any uncaught exception leads to the cancellation of the parent coroutine.
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ */
+fun launch(context: CoroutineContext, block: suspend () -> Unit): Job =
+ StandaloneCoroutine(newCoroutineContext(context)).also { block.startCoroutine(it) } // the coroutine object is both the completion continuation and the returned Job
+
+/**
+ * Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
+ * the result. It immediately applies the dispatcher from the new context, shifting execution of the block into
+ * a different thread, and back when it completes.
+ * The specified [context] is merged onto the current coroutine context.
+ */
+public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T =
+ suspendCoroutine { cont ->
+ block.startCoroutine(object : Continuation<T> by cont { // resume/resumeWithException are delegated to the caller's continuation
+ override val context: CoroutineContext = cont.context + context // NOTE(review): right-hand `context` is presumably the function parameter, not this property -- confirm
+ })
+ }
+
+/**
+ * Runs new coroutine and *blocks* current thread *interruptibly* until its completion.
+ * This function should not be used from a coroutine. It is designed to bridge regular blocking code
+ * to libraries that are written in suspending style.
+ * The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
+ * The specified context is added to the context of the parent running coroutine (if any) inside which this function
+ * is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
+ *
+ * If this blocked thread is interrupted (see [Thread.interrupt]), then the coroutine job is cancelled and
+ * this `runBlocking` invocation throws [InterruptedException].
+ *
+ * See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
+ */
+@Throws(InterruptedException::class)
+public fun <T> runBlocking(context: CoroutineContext, block: suspend () -> T): T =
+ BlockingCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) }.joinBlocking() // start the block, then park this thread until it completes
+
+// --------------- implementation ---------------
+
+private class StandaloneCoroutine( // completion continuation and Job used by launch
+ val parentContext: CoroutineContext // context the coroutine was created with, before this job is mixed into it
+) : JobContinuation<Unit>(parentContext) {
+ override fun afterCompletion(state: Any?, closeException: Throwable?) {
+ if (closeException != null) handleCoroutineException(context, closeException) // exception thrown by a completion handler
+ // note the use of the parent context below: the Job looked up there is the parent's, not this completed one
+ if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
+ }
+}
+
+private class BlockingCoroutine<T>(parentContext: CoroutineContext) : JobContinuation<T>(parentContext) { // used by runBlocking
+ val blockedThread: Thread = Thread.currentThread() // the thread that constructed this coroutine
+
+ override fun afterCompletion(state: Any?, closeException: Throwable?) {
+ if (closeException != null) handleCoroutineException(context, closeException)
+ LockSupport.unpark(blockedThread) // wake up the thread parked in joinBlocking
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ fun joinBlocking(): T { // parks the calling thread until this job is no longer active, then returns or throws its outcome
+ while (isActive) {
+ if (Thread.interrupted()) throw InterruptedException().also { cancel(it) } // cancel the job with the exception, then throw it
+ LockSupport.park(this)
+ }
+ val state = getState()
+ (state as? CompletedExceptionally)?.let { throw it.exception } // rethrow failure or cancellation
+ return state as T // any other final state is the result value itself
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
new file mode 100644
index 0000000..397d590
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CancellableContinuation.kt
@@ -0,0 +1,62 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
+import kotlin.coroutines.intrinsics.suspendCoroutineOrReturn
+import kotlin.coroutines.suspendCoroutine
+
+// --------------- cancellable continuations ---------------
+
+/**
+ * Cancellable continuation. Its job is completed when it is resumed or cancelled.
+ * When the [cancel] function is explicitly invoked, this continuation resumes with [CancellationException].
+ * If the cancel reason was not a [CancellationException], then the original exception is added as the cause of the
+ * [CancellationException] that this continuation resumes with.
+ */
+public interface CancellableContinuation<in T> : Continuation<T>, Job
+
+/**
+ * Suspends the coroutine similarly to [suspendCoroutine], but provides an implementation of
+ * [CancellableContinuation] to the [block].
+ */
+public inline suspend fun <T> suspendCancellableCoroutine(crossinline block: (CancellableContinuation<T>) -> Unit): T =
+ suspendCoroutineOrReturn { c ->
+ val safe = SafeCancellableContinuation(c)
+ block(safe)
+ safe.getResult() // SUSPENDED_MARKER, or the outcome if block already completed the continuation in this thread
+ }
+
+// --------------- implementation details ---------------
+
+@PublishedApi
+internal class SafeCancellableContinuation<in T>( // CancellableContinuation handed to the block of suspendCancellableCoroutine
+ private val delegate: Continuation<T>
+) : JobContinuation<T>(delegate.context), CancellableContinuation<T> {
+ // only updated from the thread that invoked suspendCancellableCoroutine
+ private var suspendedThread: Thread? = Thread.currentThread() // non-null until getResult or an in-thread completion clears it
+
+ fun getResult(): Any? { // called once by suspendCancellableCoroutine after the user block returns
+ if (suspendedThread != null) {
+ suspendedThread = null
+ return SUSPENDED_MARKER // not completed in this thread during the block: suspend
+ }
+ val state = getState() // completed in this thread during the block: return or throw the outcome directly
+ if (state is CompletedExceptionally) throw state.exception
+ return state
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ override fun afterCompletion(state: Any?, closeException: Throwable?) {
+ if (closeException != null) handleCoroutineException(context, closeException)
+ if (suspendedThread === Thread.currentThread()) {
+ // completed (resumed or cancelled) during suspendCancellableCoroutine in its thread; getResult reports the outcome
+ suspendedThread = null
+ } else {
+ // completed later or in other thread: pass the outcome to the wrapped continuation
+ if (state is CompletedExceptionally)
+ delegate.resumeWithException(state.exception)
+ else
+ delegate.resume(state as T)
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
new file mode 100644
index 0000000..db9ee47
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
@@ -0,0 +1,40 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.Executor
+import java.util.concurrent.Executors
+import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * Represents common pool of threads as coroutine dispatcher for compute-intensive tasks.
+ * It uses [ForkJoinPool] when available, which implements efficient work-stealing algorithm for its queues, so every
+ * coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
+ * When available, it wraps `ForkJoinPool.commonPool()`; otherwise it provides a similar shared pool.
+ */
+object CommonPool : CoroutineDispatcher() {
+ private val pool: Executor = findPool()
+
+ private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null } // null on any failure
+
+ private fun findPool(): Executor { // ForkJoinPool is looked up reflectively so that its absence is not a link error
+ val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
+ ?: return createPlainPool()
+ Try { fjpClass.getMethod("commonPool")?.invoke(null) as? Executor } // first choice: static ForkJoinPool.commonPool()
+ ?. let { return it }
+ Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? Executor } // second choice: new ForkJoinPool(parallelism)
+ ?. let { return it }
+ return createPlainPool()
+ }
+
+ private fun createPlainPool(): Executor { // fallback: fixed pool of named daemon threads
+ val threadId = AtomicInteger()
+ return Executors.newFixedThreadPool(defaultParallelism()) {
+ Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
+ }
+ }
+
+ private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1) // one less than the CPU count, but at least 1
+
+ override fun isDispatchNeeded(): Boolean = true // every resumption is submitted to the pool
+ override fun dispatch(block: Runnable) = pool.execute(block)
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
new file mode 100644
index 0000000..5e0e3b6
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -0,0 +1,56 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.AbstractCoroutineContextElement
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.ContinuationInterceptor
+
+/**
+ * Base class that shall be extended by all coroutine dispatcher implementations so that [newCoroutineContext] is
+ * correctly transferred to a new thread.
+ */
+public abstract class CoroutineDispatcher :
+ AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
+ /**
+ * Returns `true` if execution shall be dispatched onto another thread.
+ */
+ public abstract fun isDispatchNeeded(): Boolean
+
+ /**
+ * Dispatches execution of a runnable [block] onto another thread.
+ */
+ public abstract fun dispatch(block: Runnable)
+
+ override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
+ DispatchedContinuation<T>(this, continuation) // wrapper that consults isDispatchNeeded/dispatch on every resume
+}
+
+private class DispatchedContinuation<T>( // wrapper returned by CoroutineDispatcher.interceptContinuation
+ val dispatcher: CoroutineDispatcher,
+ val continuation: Continuation<T>
+): Continuation<T> by continuation {
+ override fun resume(value: T) {
+ if (dispatcher.isDispatchNeeded())
+ dispatcher.dispatch(Runnable {
+ withDefaultCoroutineContext(continuation.context) { // install the coroutine's context as the thread's default while it runs
+ continuation.resume(value)
+ }
+ })
+ else
+ withDefaultCoroutineContext(continuation.context) { // no dispatch: resume right here in the calling thread
+ continuation.resume(value)
+ }
+ }
+
+ override fun resumeWithException(exception: Throwable) { // same dispatch logic as resume, for the exceptional path
+ if (dispatcher.isDispatchNeeded())
+ dispatcher.dispatch(Runnable {
+ withDefaultCoroutineContext(continuation.context) {
+ continuation.resumeWithException(exception)
+ }
+ })
+ else
+ withDefaultCoroutineContext(continuation.context) {
+ continuation.resumeWithException(exception)
+ }
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
new file mode 100644
index 0000000..1e4ee7d
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineExceptionHandler.kt
@@ -0,0 +1,36 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.CoroutineContext
+
+
+/**
+ * Helper function for coroutine builder implementations to handle uncaught exceptions in coroutines.
+ * It tries to handle an uncaught exception in the following way:
+ * * If there is [CoroutineExceptionHandler] in the context, then it is used.
+ * * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
+ * returns `true` (it was still active), then the exception is considered to be handled.
+ * * Otherwise, if exception is [CancellationException] then it is ignored.
+ * * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
+ */
+fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
+ context[CoroutineExceptionHandler]?.let {
+ it.handleException(context, exception)
+ return
+ }
+ // quit if successfully pushed exception as the cancellation reason of the context's job
+ if (context[Job]?.cancel(exception) ?: false) return
+ // ignore CancellationException (they are normal means to terminate a coroutine)
+ if (exception is CancellationException) return
+ // otherwise just use thread's handler
+ val currentThread = Thread.currentThread()
+ currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
+}
+
+/**
+ * An optional element on the coroutine context to handle uncaught exceptions.
+ * See [handleCoroutineException].
+ */
+public interface CoroutineExceptionHandler : CoroutineContext.Element {
+ companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>
+ public fun handleException(context: CoroutineContext, exception: Throwable) // invoked by handleCoroutineException before any other handling
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineName.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineName.kt
new file mode 100644
index 0000000..07ef03d
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineName.kt
@@ -0,0 +1,13 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.AbstractCoroutineContextElement
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * User-specified name of a coroutine. This name is used in debugging mode.
+ * See [newCoroutineContext] for the description of coroutine debugging facilities.
+ */
+public data class CoroutineName(val name: String) : AbstractCoroutineContextElement(CoroutineName) {
+ public companion object Key : CoroutineContext.Key<CoroutineName>
+ override fun toString(): String = "CoroutineName($name)"
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
new file mode 100644
index 0000000..3e0cde7
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
@@ -0,0 +1,100 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.atomic.AtomicLong
+import kotlin.coroutines.AbstractCoroutineContextElement
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.EmptyCoroutineContext
+
+private const val DEBUG_PROPERTY_NAME = "kotlinx.coroutines.debug"
+private val DEBUG = CoroutineId::class.java.desiredAssertionStatus() || System.getProperty(DEBUG_PROPERTY_NAME) != null // on with assertions (-ea) or when the property is set to any value
+private val COROUTINE_ID = AtomicLong() // source of consecutive coroutine ids (debug mode only)
+
+@PublishedApi
+internal val CURRENT_CONTEXT = ThreadLocal<CoroutineContext>() // default coroutine context of the current thread; unset when get() returns null
+
+/**
+ * A coroutine dispatcher that executes initial continuation of the coroutine _right here_ in the current call-frame
+ * and lets the coroutine resume in whatever thread is used by the corresponding suspending function, without
+ * mandating any specific threading policy.
+ */
+public object Here : CoroutineDispatcher() {
+ override fun isDispatchNeeded(): Boolean = false // never dispatch: always resume in the calling thread
+ override fun dispatch(block: Runnable) { throw UnsupportedOperationException() } // must not be called, since isDispatchNeeded is always false
+}
+
+/**
+ * Creates context for the new coroutine with user-specified overrides from [context] parameter.
+ * The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
+ * This function shall be used to start new coroutines.
+ *
+ * **Debugging facilities:** When assertions are enabled or when "kotlinx.coroutines.debug" system property
+ * is set, every coroutine is assigned a unique consecutive identifier. Every thread that executes
+ * a coroutine has its name modified to include the name and identifier of the currently running coroutine.
+ *
+ * When one coroutine is suspended and resumes another coroutine in the same thread and a [CoroutineDispatcher]
+ * is not explicitly specified or the dispatcher executes continuation in the same thread, then the thread name
+ * displays the whole stack of coroutine descriptions that are being executed on this thread.
+ *
+ * Coroutine name can be explicitly assigned using [CoroutineName] context element.
+ * The string "coroutine" is used as a default name.
+ */
+public fun newCoroutineContext(context: CoroutineContext): CoroutineContext {
+ validateContext(context) // throws IllegalStateException when context has no CoroutineDispatcher
+ return ((CURRENT_CONTEXT.get() ?: EmptyCoroutineContext) + context).let { // the thread's current context (if any) plus the overrides
+ if (DEBUG) it + CoroutineId(COROUTINE_ID.incrementAndGet()) else it
+ }
+}
+
+/**
+ * Executes a block using a given default coroutine context.
+ * This context affects all new coroutines that are started within the block.
+ * The previous default context (and thread name, in debug mode) is restored when the block finishes.
+ */
+internal inline fun <T> withDefaultCoroutineContext(context: CoroutineContext, block: () -> T): T {
+ val oldContext = CURRENT_CONTEXT.get()
+ val oldName = updateContext(oldContext, context) // non-null only when the thread was renamed
+ try {
+ return block()
+ } finally {
+ restoreContext(oldContext, oldName) // runs on both normal and exceptional exit
+ }
+}
+
+private fun validateContext(context: CoroutineContext) { // fails with IllegalStateException unless the interceptor is a CoroutineDispatcher
+ check(context[ContinuationInterceptor] is CoroutineDispatcher) {
+ "Context of new coroutine must include CoroutineDispatcher"
+ }
+}
+
+@PublishedApi
+internal fun updateContext(oldContext: CoroutineContext?, newContext: CoroutineContext): String? { // returns the previous thread name if the thread was renamed, else null
+ if (newContext === oldContext) return null // same context already installed: nothing to do
+ CURRENT_CONTEXT.set(newContext)
+ if (!DEBUG) return null
+ val newId = newContext[CoroutineId] ?: return null
+ val oldId = oldContext?.get(CoroutineId)
+ if (newId === oldId) return null // still the same coroutine: keep the current thread name
+ val currentThread = Thread.currentThread()
+ val oldName = currentThread.name
+ val coroutineName = newContext[CoroutineName]?.name ?: "coroutine"
+ currentThread.name = buildString(oldName.length + coroutineName.length + 10) { // appends " @<name>#<id>" to the current thread name
+ append(oldName)
+ append(" @")
+ append(coroutineName)
+ append('#')
+ append(newId.id)
+ }
+ return oldName
+}
+
+@PublishedApi
+internal fun restoreContext(oldContext: CoroutineContext?, oldName: String?) { // undoes updateContext
+ if (oldName != null) Thread.currentThread().name = oldName // only when updateContext renamed the thread
+ CURRENT_CONTEXT.set(oldContext)
+}
+
+private class CoroutineId(val id: Long) : AbstractCoroutineContextElement(CoroutineId) { // debug-mode context element added by newCoroutineContext
+ companion object Key : CoroutineContext.Key<CoroutineId>
+ override fun toString(): String = "CoroutineId($id)"
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
new file mode 100644
index 0000000..4c0c5e3
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -0,0 +1,60 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.CoroutineContext
+import kotlin.coroutines.startCoroutine
+
+/**
+ * Deferred value is conceptually a non-blocking cancellable future.
+ * It is created with [defer] coroutine builder.
+ */
+public interface Deferred<out T> : Job {
+ /**
+ * Awaits completion of this value without blocking a thread and resumes when the deferred computation is complete.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is waiting, this function
+ * immediately resumes with [CancellationException].
+ */
+ public suspend fun await(): T
+}
+
+/**
+ * Starts a new coroutine and returns its result as an implementation of [Deferred].
+ * The running coroutine is cancelled when the resulting job is cancelled.
+ * The [context] for the new coroutine must be explicitly specified and must include [CoroutineDispatcher] element.
+ * The specified context is added to the context of the parent running coroutine (if any) inside which this function
+ * is invoked. The [Job] of the resulting coroutine is a child of the job of the parent coroutine (if any).
+ */
+public fun <T> defer(context: CoroutineContext, block: suspend () -> T) : Deferred<T> =
+ DeferredCoroutine<T>(newCoroutineContext(context)).also { block.startCoroutine(it) } // the coroutine object is both the completion continuation and the returned Deferred
+
+private class DeferredCoroutine<T>( // completion continuation and Deferred used by defer
+ parentContext: CoroutineContext
+) : JobContinuation<T>(parentContext), Deferred<T> {
+ @Suppress("UNCHECKED_CAST")
+ suspend override fun await(): T {
+ // quick check if already complete (avoid extra object creation)
+ val state = getState()
+ if (state !is Active) {
+ if (state is CompletedExceptionally) throw state.exception
+ return state as T // any other final state is the result value itself
+ }
+ // Note: await is cancellable itself!
+ return awaitGetValue()
+ }
+
+ @Suppress("UNCHECKED_CAST")
+ private suspend fun awaitGetValue(): T = suspendCancellableCoroutine { cont ->
+ cont.unregisterOnCompletion(onCompletion { // handler on this job resumes the waiter; it is unregistered when the waiter completes first
+ val state = getState()
+ check(state !is Active) // completion handlers run only after the state has left Active
+ if (state is CompletedExceptionally)
+ cont.resumeWithException(state.exception)
+ else
+ cont.resume(state as T)
+ })
+ }
+
+ override fun afterCompletion(state: Any?, closeException: Throwable?) {
+ if (closeException != null) handleCoroutineException(context, closeException) // only handler failures are reported here; the outcome is left for await
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
new file mode 100644
index 0000000..8fc9e1e
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -0,0 +1,48 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.ContinuationInterceptor
+
+/**
+ * Implemented by [CoroutineDispatcher] implementations that natively support a non-blocking [delay] function.
+ */
+public interface Delay {
+ /**
+ * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
+ * immediately resumes with [CancellationException].
+ */
+ suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+ require(time >= 0) { "Delay time $time cannot be negative" } // negative time is an IllegalArgumentException
+ if (time <= 0) return // don't delay
+ return suspendCancellableCoroutine { resumeAfterDelay(time, unit, it) }
+ }
+
+ /**
+ * Resumes a specified [continuation] after a specified delay of [time] in the given [unit].
+ */
+ fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
+}
+
+/**
+ * Delays coroutine for a given time without blocking a thread and resumes it after a specified time.
+ * This suspending function is cancellable.
+ * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
+ * immediately resumes with [CancellationException].
+ *
+ * This function delegates to the [Delay] implementation of the context [CoroutineDispatcher] if possible,
+ * otherwise it resumes using a built-in scheduled executor service (`scheduledExecutor`).
+ */
+suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+ require(time >= 0) { "Delay time $time cannot be negative" } // negative time is an IllegalArgumentException
+ if (time <= 0) return // don't delay
+ return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
+ (cont.context[ContinuationInterceptor] as? Delay)?.apply { // the interceptor in the context implements Delay: let it schedule the resume
+ resumeAfterDelay(time, unit, cont)
+ return@sc
+ }
+ val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)
+ cont.cancelFutureOnCompletion(timeout) // cancel the scheduled task once the continuation completes
+ }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
new file mode 100644
index 0000000..fc89287
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -0,0 +1,271 @@
+package kotlinx.coroutines.experimental
+
+import kotlinx.coroutines.experimental.util.LockFreeLinkedListHead
+import kotlinx.coroutines.experimental.util.LockFreeLinkedListNode
+import java.util.concurrent.CancellationException
+import java.util.concurrent.Future
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import kotlin.coroutines.AbstractCoroutineContextElement
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+
+// --------------- core job interfaces ---------------
+
+/**
+ * A background job. It has two states: _active_ (initial state) and _completed_ (final state).
+ * It can be _cancelled_ at any time with [cancel] function that forces it to become completed immediately.
+ * A job in the coroutine context represents the coroutine itself.
+ * A job is active while the coroutine is working and job's cancellation aborts the coroutine when
+ * the coroutine is suspended on a _cancellable_ suspension point by throwing [CancellationException]
+ * inside the coroutine.
+ *
+ * Jobs can have a _parent_. A job with a parent is cancelled when its parent completes.
+ *
+ * All functions on this interface are thread-safe.
+ */
+public interface Job : CoroutineContext.Element {
+ public companion object Key : CoroutineContext.Key<Job> {
+ /**
+ * Creates a new job object. It is optionally a child of a [parent] job.
+ */
+ public operator fun invoke(parent: Job? = null): Job = JobSupport(parent)
+ }
+
+ /**
+ * Returns `true` when job is still active.
+ */
+ public val isActive: Boolean
+
+ /**
+ * Registers completion handler. The action depends on the state of this job.
+ * When job is cancelled with [cancel], then the handler is immediately invoked
+ * with a cancellation reason. Otherwise, handler will be invoked once when this
+ * job is complete (cancellation is also a form of completion).
+ * The resulting [Registration] can be used to [Registration.unregister] if this
+ * registration is no longer needed. There is no need to unregister after completion.
+ */
+ public fun onCompletion(handler: CompletionHandler): Registration
+
+ /**
+ * Cancels this activity with an optional cancellation [reason]. The result is `true` if this job was
+ * cancelled as a result of this invocation and `false` otherwise (if it was already cancelled or completed).
+ * When cancellation has a clear reason in the code, an instance of [CancellationException] should be created
+ * at the corresponding original cancellation site and passed into this method to aid in debugging by providing
+ * both the context of cancellation and text description of the reason.
+ */
+ public fun cancel(reason: Throwable? = null): Boolean
+
+ /**
+ * Registration object for [onCompletion]. It can be used to [unregister] if needed.
+ * There is no need to unregister after completion.
+ */
+ public interface Registration {
+ /**
+ * Unregisters completion handler.
+ */
+ public fun unregister()
+ }
+}
+
+typealias CompletionHandler = (Throwable?) -> Unit // receives the cancellation/failure reason, or null
+
+typealias CancellationException = CancellationException // exposes the imported java.util.concurrent.CancellationException under this package
+
+/**
+ * Unregisters a specified [registration] when this job is complete.
+ * This is a shortcut for the following code with a slightly more efficient implementation (one fewer object created).
+ * ```
+ * onCompletion { registration.unregister() }
+ * ```
+ */
+public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Registration =
+ onCompletion(UnregisterOnCompletion(this, registration))
+
+/**
+ * Cancels a specified [future] when this job is complete.
+ * This is a shortcut for the following code with a slightly more efficient implementation (one fewer object created).
+ * ```
+ * onCompletion { future.cancel(true) }
+ * ```
+ */
+public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
+ onCompletion(CancelFutureOnCompletion(this, future))
+
+/**
+ * Suspends the coroutine until this job is complete. This invocation resumes normally (without exception)
+ * when the job is complete for any reason.
+ *
+ * This suspending function is cancellable. If the [Job] of the invoking coroutine is completed while this
+ * suspending function is suspended, this function immediately resumes with [CancellationException].
+ */
+public suspend fun Job.join() {
+ if (!isActive) return // fast path
+ return suspendCancellableCoroutine { cont ->
+ cont.unregisterOnCompletion(onCompletion(ResumeOnCompletion(this, cont))) // resume when this job completes; drop the handler if the waiter completes first
+ }
+}
+
+// --------------- utility classes to simplify job implementation
+
+/**
+ * A concrete implementation of [Job]. It is optionally a child to a parent job.
+ * This job is cancelled when the parent is complete, but not vice versa.
+ *
+ * This is an open class designed for extension by more specific classes that might augment the
+ * state and may store additional state information for completed jobs, like their result values.
+ */
+@Suppress("LeakingThis")
+public open class JobSupport(
+ parent: Job? = null
+) : AbstractCoroutineContextElement(Job), Job {
+ // an Active list of completion handlers while the job is active; any other value is the final (completed) state
+ @Volatile
+ private var state: Any? = Active() // will drop the list on cancel
+
+ // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
+ private val registration: Job.Registration? = parent?.onCompletion(CancelOnCompletion(parent, this))
+
+ protected companion object {
+ @JvmStatic
+ private val STATE: AtomicReferenceFieldUpdater<JobSupport, Any?> =
+ AtomicReferenceFieldUpdater.newUpdater(JobSupport::class.java, Any::class.java, "state")
+ }
+
+ protected fun getState(): Any? = state
+
+ protected fun updateState(expect: Any, update: Any?): Boolean { // CAS Active -> final state; on success runs the handlers and afterCompletion
+ expect as Active // assert type
+ require(update !is Active) // only active -> inactive transition is allowed
+ if (!STATE.compareAndSet(this, expect, update)) return false
+ // #1. Unregister from parent job
+ registration?.unregister()
+ // #2 Invoke completion handlers
+ var closeException: Throwable? = null
+ val reason = when (update) {
+ is Cancelled -> update.cancelReason
+ is CompletedExceptionally -> update.exception
+ else -> null
+ }
+ expect.forEach<JobNode> { node ->
+ try {
+ node.invoke(reason)
+ } catch (ex: Throwable) { // collect handler failures: the first one is kept, later ones are attached as suppressed
+ if (closeException == null) closeException = ex else closeException!!.addSuppressed(ex)
+ }
+ }
+ // #3 Do other (overridable) processing
+ afterCompletion(update, closeException)
+ return true
+ }
+
+ public override val isActive: Boolean get() = state is Active
+
+ public override fun onCompletion(handler: CompletionHandler): Job.Registration {
+ var nodeCache: JobNode? = null
+ while (true) { // lock-free loop on state
+ val state = this.state
+ if (state !is Active) { // already complete: invoke now with the same reason updateState gives to registered handlers
+ handler((state as? CompletedExceptionally)?.cancelReason)
+ return EmptyRegistration
+ }
+ val node = nodeCache ?: makeNode(handler).apply { nodeCache = this }
+ if (state.addLastIf(node) { this.state == state }) return node
+ }
+ }
+
+ public override fun cancel(reason: Throwable?): Boolean {
+ while (true) { // lock-free loop on state
+ val state = this.state as? Active ?: return false // quit if not active anymore
+ if (updateState(state, Cancelled(reason))) return true
+ }
+ }
+
+ protected open fun afterCompletion(state: Any?, closeException: Throwable?) { // default: rethrow handler failures to the caller that completed the job
+ if (closeException != null) throw closeException
+ }
+
+ private fun makeNode(handler: CompletionHandler): JobNode = // reuse the handler itself when it already is a node of this job
+ (handler as? JobNode)?.also { require(it.job === this) }
+ ?: InvokeOnCompletion(this, handler)
+
+ protected class Active : LockFreeLinkedListHead()
+
+ protected abstract class CompletedExceptionally {
+ abstract val cancelReason: Throwable?
+ abstract val exception: Throwable
+ }
+
+ protected class Cancelled(override val cancelReason: Throwable?) : CompletedExceptionally() {
+ @Volatile
+ private var _exception: Throwable? = null // convert reason to CancellationException on first need
+ override val exception: Throwable get() =
+ _exception ?: // atomic read volatile var or else
+ run {
+ val result = cancelReason as? CancellationException ?:
+ CancellationException().apply { if (cancelReason != null) initCause(cancelReason) }
+ _exception = result
+ result
+ }
+ }
+
+ protected class Failed(override val exception: Throwable) : CompletedExceptionally() {
+ override val cancelReason: Throwable
+ get() = exception
+ }
+}
+
+internal abstract class JobNode( // list node that is a completion handler and its own Registration at once
+ val job: Job
+) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler {
+ override fun unregister() {
+ // this is an object-allocation optimization -- do not remove if job is not active anymore
+ if (job.isActive) remove()
+ }
+
+ override abstract fun invoke(reason: Throwable?)
+}
+
+private class InvokeOnCompletion( // node wrapping an arbitrary handler lambda (see JobSupport.makeNode)
+ job: Job,
+ val handler: CompletionHandler
+) : JobNode(job) {
+ override fun invoke(reason: Throwable?) = handler.invoke(reason)
+ override fun toString() = "InvokeOnCompletion[${handler::class.java.name}@${Integer.toHexString(System.identityHashCode(handler))}]"
+}
+
+private class ResumeOnCompletion( // node that resumes a continuation when the job completes (used by Job.join)
+ job: Job,
+ val continuation: Continuation<Unit>
+) : JobNode(job) {
+ override fun invoke(reason: Throwable?) = continuation.resume(Unit) // the reason is ignored: always a normal resume
+ override fun toString() = "ResumeOnCompletion[$continuation]"
+}
+
+private class UnregisterOnCompletion( // node that unregisters another registration when the job completes
+ job: Job,
+ val registration: Job.Registration
+) : JobNode(job) {
+ override fun invoke(reason: Throwable?) = registration.unregister()
+ override fun toString(): String = "UnregisterOnCompletion[$registration]"
+}
+
+private class CancelOnCompletion( // node registered on a parent job that cancels the child when the parent completes
+ parentJob: Job,
+ val subordinateJob: Job
+) : JobNode(parentJob) {
+ override fun invoke(reason: Throwable?) { subordinateJob.cancel(reason) } // the parent's completion reason becomes the child's cancel reason
+ override fun toString(): String = "CancelOnCompletion[$subordinateJob]"
+}
+
+private object EmptyRegistration : Job.Registration { // returned by onCompletion when the job is already complete
+ override fun unregister() {} // nothing was registered, so nothing to undo
+ override fun toString(): String = "EmptyRegistration"
+}
+
+private class CancelFutureOnCompletion( // node that cancels a Future when the job completes
+ job: Job,
+ val future: Future<*>
+) : JobNode(job) {
+ override fun invoke(reason: Throwable?) { future.cancel(true) } // true = mayInterruptIfRunning
+ override fun toString() = "CancelFutureOnCompletion[$future]"
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt
new file mode 100644
index 0000000..497de91
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/JobContinuation.kt
@@ -0,0 +1,38 @@
+package kotlinx.coroutines.experimental
+
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
+
+// Internal helper for various primitives: a single class that is both a Job and a Continuation.
+@Suppress("LeakingThis")
+internal open class JobContinuation<in T>(parentContext: CoroutineContext) : JobSupport(parentContext[Job]), Continuation<T> {
+    override val context: CoroutineContext = parentContext + this // the parent context with this job mixed in
+
+    override fun resume(value: T) {
+        while (true) { // lock-free retry loop on state
+            val current = getState() // atomic read
+            if (current is Active) {
+                if (updateState(current, value)) return
+            } else if (current is Cancelled) {
+                return // a resume on a cancelled continuation is ignored
+            } else {
+                throw IllegalStateException("Already resumed, but got value $value")
+            }
+        }
+    }
+
+    override fun resumeWithException(exception: Throwable) {
+        while (true) { // lock-free retry loop on state
+            val current = getState() // atomic read
+            if (current is Active) {
+                if (updateState(current, Failed(exception))) return
+            } else if (current is Cancelled) {
+                // the resume itself is ignored, but an exception other than the cancellation one is still handled
+                if (exception != current.exception) handleCoroutineException(context, exception)
+                return
+            } else {
+                throw IllegalStateException("Already resumed, but got exception $exception", exception)
+            }
+        }
+    }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
new file mode 100644
index 0000000..73445d3
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -0,0 +1,31 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.startCoroutine
+
+internal val scheduledExecutor: ScheduledExecutorService by lazy { // created on first access
+    Executors.newScheduledThreadPool(1) { task ->
+        Thread(task, "kotlinx.coroutines.ScheduledExecutor").also { it.isDaemon = true } // pool of one daemon thread
+    }
+}
+
+/**
+ * Runs the suspending [block] inside a coroutine with a timeout of [time] in [unit] and returns its result.
+ * Throws [CancellationException] if the timeout is exceeded and [IllegalArgumentException] if [time] is negative.
+ */
+suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
+    require(time >= 0) { "Timeout time $time cannot be negative" }
+    if (time <= 0L) throw CancellationException("Timed out immediately")
+    return suspendCancellableCoroutine { continuation: CancellableContinuation<T> ->
+        // when the time is up, cancel this continuation with an exception that carries a specific text
+        val pendingTimeout = scheduledExecutor.schedule({
+            continuation.cancel(CancellationException("Timed out waiting for $time $unit"))
+        }, time, unit)
+        // the scheduled task is cancelled once the continuation completes
+        continuation.cancelFutureOnCompletion(pendingTimeout)
+        // restart block in a separate coroutine that completes into this cancellable continuation
+        block.startCoroutine(continuation)
+    }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
new file mode 100644
index 0000000..cf6d430
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -0,0 +1,62 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.Executors
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicInteger
+import kotlin.concurrent.thread
+import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Creates a new coroutine execution context backed by a single thread, with built-in [delay] support.
+ * All continuations are dispatched immediately when invoked inside the thread of this context.
+ * The thread is named [name]; the underlying pool is shut down when the job of this context completes.
+ * An optional [parent] job may be specified upon creation.
+ * This is a shorthand for [newFixedThreadPoolContext] with one thread.
+ */
+fun newSingleThreadContext(name: String, parent: Job? = null): CoroutineContext =
+    newFixedThreadPoolContext(nThreads = 1, name = name, parent = parent)
+
+/**
+ * Creates a new coroutine execution context backed by a fixed-size pool of [nThreads] threads,
+ * with built-in [delay] support.
+ * All continuations are dispatched immediately when invoked inside the threads of this context.
+ * The pool is shut down when the job of this context completes. A single thread is named [name];
+ * with more threads each one is named [name] followed by "-" and its number.
+ * An optional [parent] job may be specified. Throws [IllegalArgumentException] if [nThreads] is less than one.
+ */
+fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
+    require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
+    return Job(parent).let { lifetime -> lifetime + ThreadPoolDispatcher(nThreads, name, lifetime) }
+}
+
+private val thisThreadContext = ThreadLocal<ThreadPoolDispatcher>() // set by each pool thread to its owning dispatcher; read by isDispatchNeeded
+
+// Dispatcher backed by a scheduled pool of daemon threads; the pool is shut down when [job] completes.
+private class ThreadPoolDispatcher(nThreads: Int, name: String, val job: Job) :
+    CoroutineDispatcher(), ContinuationInterceptor, Delay {
+    val threadNo = AtomicInteger()
+    val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
+        // a single thread keeps the plain name, several threads get a numeric suffix
+        val threadName = if (nThreads == 1) name else "$name-${threadNo.incrementAndGet()}"
+        thread(start = false, isDaemon = true, name = threadName) {
+            thisThreadContext.set(this@ThreadPoolDispatcher) // marks the thread as belonging to this dispatcher
+            target.run()
+        }
+    }
+
+    init {
+        job.onCompletion { executor.shutdown() }
+    }
+
+    override fun isDispatchNeeded(): Boolean = thisThreadContext.get() != this // false on this dispatcher's own threads
+
+    override fun dispatch(block: Runnable) { executor.execute(block) }
+
+    override fun resumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+        // schedule the resume on the pool; the scheduled task is cancelled once the continuation completes
+        val pendingResume = executor.schedule({ continuation.resume(Unit) }, time, unit)
+        continuation.cancelFutureOnCompletion(pendingResume)
+    }
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Intrinsics.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Intrinsics.kt
new file mode 100644
index 0000000..75405ae
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/intrinsics/Intrinsics.kt
@@ -0,0 +1,14 @@
+package kotlinx.coroutines.experimental.intrinsics
+
+import kotlin.coroutines.Continuation
+import kotlin.coroutines.intrinsics.SUSPENDED_MARKER
+
+/**
+ * Starts this suspending function (no receiver, result type [T]) as a new, fresh computation on every invocation.
+ * If the coroutine never suspends, its result is returned directly; otherwise [SUSPENDED_MARKER] is returned
+ * and the [completion] continuation is invoked when the coroutine completes.
+ */
+@Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN", "UNCHECKED_CAST")
+public fun <T> (suspend () -> T).startCoroutineOrReturn(completion: Continuation<T>): Any? {
+    return (this as kotlin.jvm.functions.Function1<Continuation<T>, Any?>).invoke(completion)
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/util/LockFreeLinkedList.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/util/LockFreeLinkedList.kt
new file mode 100644
index 0000000..fb8e531
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/util/LockFreeLinkedList.kt
@@ -0,0 +1,295 @@
+package kotlinx.coroutines.experimental.util
+
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+
+private typealias Node = LockFreeLinkedListNode // file-local shorthand for the node type
+
+/**
+ * Doubly-linked concurrent list node with remove support.
+ * Based on paper
+ * ["Lock-Free and Practical Doubly Linked List-Based Deques Using Single-Word Compare-and-Swap"](http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.140.4693&rep=rep1&type=pdf)
+ * by Sundell and Tsigas.
+ * The instance of this class serves both as list head/tail sentinel and as the list item.
+ * The sentinel node must never be removed.
+ */
+@Suppress("LeakingThis")
+public open class LockFreeLinkedListNode {
+ @Volatile
+ private var _next: Any = this // Node | Removed | CondAdd
+ @Volatile
+ private var prev: Any = this // Node | Removed
+
+ private companion object { // atomic field updaters for the two link fields
+ @JvmStatic
+ val NEXT: AtomicReferenceFieldUpdater<Node, Any> =
+ AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "_next")
+ @JvmStatic
+ val PREV: AtomicReferenceFieldUpdater<Node, Any> =
+ AtomicReferenceFieldUpdater.newUpdater(Node::class.java, Any::class.java, "prev")
+
+ }
+
+ private class Removed(val ref: Node) { // link marker of a removed node; ref is the node the link pointed to
+ override fun toString(): String = "Removed[$ref]"
+ }
+
+ @PublishedApi
+ internal abstract class CondAdd { // descriptor of a conditional add; stored in a predecessor's _next until resolved
+ internal lateinit var newNode: Node // the node being added
+ internal lateinit var oldNext: Node // the predecessor's next node that the add replaces
+ @Volatile
+ private var consensus: Int = UNDECIDED // status of operation
+ abstract fun isCondition(): Boolean // evaluated to decide whether the add succeeds
+
+ private companion object {
+ @JvmStatic
+ val CONSENSUS: AtomicIntegerFieldUpdater<CondAdd> =
+ AtomicIntegerFieldUpdater.newUpdater(CondAdd::class.java, "consensus")
+
+ const val UNDECIDED = 0
+ const val SUCCESS = 1
+ const val FAILURE = 2
+ }
+
+ fun completeAdd(node: Node): Boolean { // node is the predecessor holding this descriptor; true if the add succeeded
+ // make decision on status: the first successful CAS away from UNDECIDED fixes the outcome
+ var consensus: Int
+ while (true) {
+ consensus = this.consensus
+ if (consensus != UNDECIDED) break
+ val proposal = if (isCondition()) SUCCESS else FAILURE
+ if (CONSENSUS.compareAndSet(this, UNDECIDED, proposal)) {
+ consensus = proposal
+ break
+ }
+ }
+ val success = consensus == SUCCESS
+ if (NEXT.compareAndSet(node, this, if (success) newNode else oldNext)) {
+ // only the thread that makes this update actually finishes the add operation
+ if (success) newNode.finishAdd(oldNext)
+ }
+ return success
+ }
+ }
+
+ public val isRemoved: Boolean get() = _next is Removed // a removed node has a Removed marker in _next
+
+ private val isFresh: Boolean get() = _next === this && prev === this // both links still point to this node itself
+
+ private val next: Any get() { // reads _next, first helping to complete any pending CondAdd found there
+ while (true) { // helper loop on _next
+ val next = this._next
+ if (next !is CondAdd) return next
+ next.completeAdd(this)
+ }
+ }
+
+ @PublishedApi
+ internal fun next(): Node = next.unwrap() // next node; a Removed marker is unwrapped to the node it refers to
+
+ @PublishedApi
+ internal fun addFirstCC(node: Node, condAdd: CondAdd?): Boolean { // inserts node right after this node; false only if condAdd fails
+ require(node.isFresh)
+ condAdd?.newNode = node
+ while (true) { // lock-free loop on next
+ val next = this.next as Node // this sentinel node is never removed
+ PREV.lazySet(node, this)
+ NEXT.lazySet(node, next)
+ condAdd?.oldNext = next
+ if (NEXT.compareAndSet(this, next, condAdd ?: node)) {
+ // added successfully (linearized add) -- fixup the list
+ return condAdd?.completeAdd(this) ?: run { node.finishAdd(next); true }
+ }
+ }
+ }
+
+ @PublishedApi
+ internal fun addLastCC(node: Node, condAdd: CondAdd?): Boolean { // inserts node right before this node; false only if condAdd fails
+ require(node.isFresh)
+ condAdd?.newNode = node
+ while (true) { // lock-free loop on prev.next
+ val prev = this.prev as Node // this sentinel node is never removed
+ if (prev.next !== this) {
+ helpInsert(prev)
+ continue
+ }
+ PREV.lazySet(node, prev)
+ NEXT.lazySet(node, this)
+ condAdd?.oldNext = this
+ if (NEXT.compareAndSet(prev, this, condAdd ?: node)) {
+ // added successfully (linearized add) -- fixup the list
+ return condAdd?.completeAdd(prev) ?: run { node.finishAdd(this); true }
+ }
+ }
+ }
+
+ private fun finishAdd(next: Node) { // links next.prev back to this newly added node
+ while (true) {
+ val nextPrev = next.prev
+ if (nextPrev is Removed || this.next !== next) return // next was removed, remover fixes up links
+ if (PREV.compareAndSet(next, nextPrev, this)) {
+ if (this.next is Removed) {
+ // already removed
+ next.helpInsert(nextPrev as Node)
+ }
+ return
+ }
+ }
+ }
+
+ /**
+ * Removes this node from the list.
+ */
+ public open fun remove() {
+ while (true) { // lock-free loop on next
+ val next = this.next
+ if (next is Removed) return // was already removed -- don't try to help (original thread will take care)
+ if (NEXT.compareAndSet(this, next, Removed(next as Node))) {
+ // was removed successfully (linearized remove) -- fixup the list
+ helpDelete()
+ next.helpInsert(prev.unwrap())
+ return
+ }
+ }
+ }
+
+ private fun markPrev(): Node { // wraps prev in a Removed marker (unless already wrapped) and returns the previous node
+ while (true) { // lock-free loop on prev
+ val prev = this.prev
+ if (prev is Removed) return prev.ref
+ if (PREV.compareAndSet(this, prev, Removed(prev as Node))) return prev
+ }
+ }
+
+ // fixes next links to the left of this node
+ private fun helpDelete() {
+ var last: Node? = null // will set to the node left of prev when found
+ var prev: Node = markPrev()
+ var next: Node = (this._next as Removed).ref
+ while (true) {
+ // move to the right until first non-removed node
+ val nextNext = next.next
+ if (nextNext is Removed) {
+ next.markPrev()
+ next = nextNext.ref
+ continue
+ }
+ // move to the left until first non-removed node
+ val prevNext = prev.next
+ if (prevNext is Removed) {
+ if (last != null) {
+ prev.markPrev()
+ NEXT.compareAndSet(last, prev, prevNext.ref)
+ prev = last
+ last = null
+ } else {
+ prev = prev.prev.unwrap()
+ }
+ continue
+ }
+ if (prevNext !== this) {
+ // skipped over some removed nodes to the left -- setup to fixup the next links
+ last = prev
+ prev = prevNext as Node
+ if (prev === next) return // already done!!!
+ continue
+ }
+ // Now prev & next are Ok
+ if (NEXT.compareAndSet(prev, this, next)) return // success!
+ }
+ }
+
+ // fixes prev links from this node
+ private fun helpInsert(_prev: Node) {
+ var prev: Node = _prev
+ var last: Node? = null // will be set so that last.next === prev
+ while (true) {
+ // move to the left until first non-removed node
+ val prevNext = prev.next
+ if (prevNext is Removed) {
+ if (last !== null) {
+ prev.markPrev()
+ NEXT.compareAndSet(last, prev, prevNext.ref)
+ prev = last
+ last = null
+ } else {
+ prev = prev.prev.unwrap()
+ }
+ continue
+ }
+ val oldPrev = this.prev
+ if (oldPrev is Removed) return // this node was removed, too -- its remover will take care
+ if (prevNext !== this) {
+ // need to fixup next
+ last = prev
+ prev = prevNext as Node
+ continue
+ }
+ if (oldPrev === prev) return // it is already linked as needed
+ if (PREV.compareAndSet(this, oldPrev, prev)) {
+ if (prev.prev !is Removed) return // finish only if prev was not concurrently removed
+ }
+ }
+ }
+
+ private fun Any.unwrap(): Node = if (this is Removed) ref else this as Node // looks through a Removed marker
+
+ internal fun validateNode(prev: Node, next: Node) { // checks that this node's links are exactly prev and next
+ check(prev === this.prev)
+ check(next === this.next)
+ }
+}
+
+/**
+ * Sentinel head of a lock-free list of [LockFreeLinkedListNode] items; the head itself cannot be removed.
+ * Traversal detects the end of the list by reference identity with this head.
+ */
+public open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
+    /**
+     * Iterates over all elements in this list of a specified type [T] and calls [block] for each of them.
+     * Elements of other types are skipped.
+     */
+    public inline fun <reified T : Node> forEach(block: (T) -> Unit) {
+        var cur: Node = next()
+        while (cur !== this) { // identity check: a node that overrides equals() must not be mistaken for the head
+            if (cur is T) block(cur)
+            cur = cur.next()
+        }
+    }
+
+    /** Adds [node] as the first item of this list. */
+    public fun addFirst(node: Node) { addFirstCC(node, null) }
+
+    /** Adds [node] as the first item of this list atomically if the [condition] is true; returns `true` if added. */
+    public inline fun addFirstIf(node: Node, crossinline condition: () -> Boolean): Boolean =
+        addFirstCC(node, object : CondAdd() {
+            override fun isCondition(): Boolean = condition()
+        })
+
+    /** Adds [node] as the last item of this list. */
+    public fun addLast(node: Node) { addLastCC(node, null) }
+
+    /** Adds [node] as the last item of this list atomically if the [condition] is true; returns `true` if added. */
+    public inline fun addLastIf(node: Node, crossinline condition: () -> Boolean): Boolean =
+        addLastCC(node, object : CondAdd() {
+            override fun isCondition(): Boolean = condition()
+        })
+
+    /** The head is a sentinel and is never removed: always throws [UnsupportedOperationException]. */
+    public override fun remove() = throw UnsupportedOperationException()
+
+    // Walks the list from this head and checks, via validateNode, that the prev/next links of every node
+    // (and of the head itself) agree with the order of traversal; the end is detected by identity as in forEach.
+    internal fun validate() {
+        var prev: Node = this
+        var cur: Node = next()
+        while (cur !== this) {
+            val next = cur.next()
+            cur.validateNode(prev, next)
+            prev = cur
+            cur = next
+        }
+        validateNode(prev, next())
+    }
+}