Various refactorings related to resource management and timeouts

 * `Job.Registration` is renamed to `DisposableHandle`
 * `EmptyRegistration` is renamed to `NonDisposableHandle`
 * `Job.unregisterOnCompletion` is renamed to `Job.disposeOnCompletion`
 * `Delay.invokeOnTimeout` is introduced
* `withTimeout` uses `Delay.invokeOnTimeout` when available
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
index 91dad91..2e69343 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Deferred.kt
@@ -155,7 +155,7 @@
 
     @Suppress("UNCHECKED_CAST")
     private suspend fun awaitSuspend(): T = suspendCancellableCoroutine { cont ->
-        cont.unregisterOnCompletion(invokeOnCompletion {
+        cont.disposeOnCompletion(invokeOnCompletion {
             val state = this.state
             check(state !is Incomplete)
             if (state is CompletedExceptionally)
@@ -183,7 +183,7 @@
             }
             if (startInternal(state) == 0) {
                 // slow-path -- register waiter for completion
-                select.unregisterOnCompletion(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
+                select.disposeOnSelect(invokeOnCompletion(SelectAwaitOnCompletion(this, select, block)))
                 return
             }
         }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index 24c0aa3..e6daa03 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -16,12 +16,16 @@
 
 package kotlinx.coroutines.experimental
 
+import java.util.concurrent.Future
 import java.util.concurrent.TimeUnit
 import kotlin.coroutines.experimental.ContinuationInterceptor
 
 /**
  * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that natively support
- * non-blocking [delay] function.
+ * scheduled execution of tasks.
+ *
+ * Implementation of this interface affects operation of
+ * [delay][kotlinx.coroutines.experimental.delay] and [withTimeout] functions.
  */
 public interface Delay {
     /**
@@ -52,6 +56,16 @@
      * ```
      */
     fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>)
+
+    /**
+     * Schedules invocation of a specified [block] after a specified delay [time].
+     * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
+     * request if it is not needed anymore.
+     *
+     * This implementation uses a built-in single-threaded scheduled executor service.
+     */
+    fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
+        DisposableFutureHandle(scheduledExecutor.schedule(block, time, unit))
 }
 
 /**
@@ -60,18 +74,26 @@
  * If the [Job] of the current coroutine is completed while this suspending function is suspended, this function
  * immediately resumes with [CancellationException].
  *
- * This function delegates to [Delay] implementation of the context [CoroutineDispatcher] if possible,
- * otherwise it resumes using a built-in single-threaded scheduled executor service.
+ * This function delegates to [Delay.scheduleResumeAfterDelay] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it resumes using a built-in single-threaded scheduled executor service.
  */
 suspend fun delay(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS) {
     require(time >= 0) { "Delay time $time cannot be negative" }
     if (time <= 0) return // don't delay
     return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
-        (cont.context[ContinuationInterceptor] as? Delay)?.apply {
-            scheduleResumeAfterDelay(time, unit, cont)
-            return@sc
-        }
-        val timeout = scheduledExecutor.schedule(ResumeRunnable(cont), time, unit)
-        cont.cancelFutureOnCompletion(timeout)
+        val delay = cont.context[ContinuationInterceptor] as? Delay
+        if (delay != null)
+            delay.scheduleResumeAfterDelay(time, unit, cont) else
+            cont.cancelFutureOnCompletion(scheduledExecutor.schedule(ResumeRunnable(cont), time, unit))
     }
 }
+
+/**
+ * An implementation of [DisposableHandle] that cancels the specified future on dispose.
+ */
+public class DisposableFutureHandle(private val future: Future<*>) : DisposableHandle {
+    override fun dispose() {
+        future.cancel(false)
+    }
+    override fun toString(): String = "DisposableFutureHandle[$future]"
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
index f0be651..dde9cac 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -28,7 +28,9 @@
 public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
     ExecutorCoroutineDispatcher(this)
 
-internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
+internal open class ExecutorCoroutineDispatcher(
+    private val executor: Executor
+) : CoroutineDispatcher(), Delay {
     override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
@@ -37,20 +39,29 @@
             scheduledExecutor.schedule(ResumeRunnable(continuation), time, unit)
         continuation.cancelFutureOnCompletion(timeout)
     }
+
+    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
+        val timeout = if (executor is ScheduledExecutorService)
+            executor.schedule(block, time, unit) else
+            scheduledExecutor.schedule(block, time, unit)
+        return DisposableFutureHandle(timeout)
+    }
 }
 
 // --- reusing these classes in other places ---
 
 internal class ResumeUndispatchedRunnable(
-    val dispatcher: CoroutineDispatcher,
-    val continuation: CancellableContinuation<Unit>
+    private val dispatcher: CoroutineDispatcher,
+    private val continuation: CancellableContinuation<Unit>
 ) : Runnable {
     override fun run() {
         with(continuation) { dispatcher.resumeUndispatched(Unit) }
     }
 }
 
-internal class ResumeRunnable(val continuation: Continuation<Unit>) : Runnable {
+internal class ResumeRunnable(
+    private val continuation: Continuation<Unit>
+) : Runnable {
     override fun run() {
         continuation.resume(Unit)
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
index 18cba3a..add94a3 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Job.kt
@@ -102,18 +102,14 @@
      * Registers handler that is **synchronously** invoked on completion of this job.
      * When job is already complete, then the handler is immediately invoked
      * with a cancellation cause or `null`. Otherwise, handler will be invoked once when this
-     * job is complete. Note, that [cancellation][cancel] is also a form of completion).
+     * job is complete. Note, that [cancellation][cancel] is also a form of completion.
      *
-     * The resulting [Registration] can be used to [Registration.unregister] if this
-     * registration is no longer needed. There is no need to unregister after completion.
+     * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the
+     * registration of this handler and release its memory if its invocation is no longer needed.
+     * There is no need to dispose the handler after completion of this job. The reference to
+     * all the handlers are released when this job completes.
      */
-    public fun invokeOnCompletion(handler: CompletionHandler): Registration
-
-    /**
-     * @suppress **Deprecated**: Renamed to `invokeOnCompletion`
-     */
-    @Deprecated(message = "Renamed to `invokeOnCompletion`", replaceWith = ReplaceWith("invokeOnCompletion(handler)"))
-    public fun onCompletion(handler: CompletionHandler): Registration
+    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle
 
     /**
      * Suspends coroutine until this job is complete. This invocation resumes normally (without exception)
@@ -161,16 +157,41 @@
     /**
      * Registration object for [invokeOnCompletion]. It can be used to [unregister] if needed.
      * There is no need to unregister after completion.
+     * @suppress **Deprecated**: Replace with `DisposableHandle`
      */
+    @Deprecated(message = "Replace with `DisposableHandle`",
+        replaceWith = ReplaceWith("DisposableHandle"))
     public interface Registration {
         /**
          * Unregisters completion handler.
+         * @suppress **Deprecated**: Replace with `dispose`
          */
+        @Deprecated(message = "Replace with `dispose`",
+            replaceWith = ReplaceWith("dispose()"))
         public fun unregister()
     }
 }
 
 /**
+ * A handle to an allocated object that can be disposed to make it eligible for garbage collection.
+ */
+public interface DisposableHandle : Job.Registration {
+    /**
+     * Disposes the corresponding object, making it eligible for garbage collection.
+     * Repeated invocation of this function has no effect.
+     */
+    public fun dispose()
+
+    /**
+     * Unregisters completion handler.
+     * @suppress **Deprecated**: Replace with `dispose`
+     */
+    @Deprecated(message = "Replace with `dispose`",
+        replaceWith = ReplaceWith("dispose()"))
+    public override fun unregister() = dispose()
+}
+
+/**
  * Handler for [Job.invokeOnCompletion].
  */
 public typealias CompletionHandler = (Throwable?) -> Unit
@@ -187,9 +208,23 @@
  * ```
  * invokeOnCompletion { registration.unregister() }
  * ```
+ * @suppress: **Deprecated**: Renamed to `disposeOnCompletion`.
  */
-public fun Job.unregisterOnCompletion(registration: Job.Registration): Job.Registration =
-    invokeOnCompletion(UnregisterOnCompletion(this, registration))
+@Deprecated(message = "Renamed to `disposeOnCompletion`",
+    replaceWith = ReplaceWith("disposeOnCompletion(registration)"))
+public fun Job.unregisterOnCompletion(registration: DisposableHandle): DisposableHandle =
+    invokeOnCompletion(DisposeOnCompletion(this, registration))
+
+/**
+ * Disposes a specified [handle] when this job is complete.
+ *
+ * This is a shortcut for the following code with slightly more efficient implementation (one fewer object created).
+ * ```
+ * invokeOnCompletion { handle.dispose() }
+ * ```
+ */
+public fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle =
+    invokeOnCompletion(DisposeOnCompletion(this, handle))
 
 /**
  * Cancels a specified [future] when this job is complete.
@@ -199,7 +234,7 @@
  * invokeOnCompletion { future.cancel(false) }
  * ```
  */
-public fun Job.cancelFutureOnCompletion(future: Future<*>): Job.Registration =
+public fun Job.cancelFutureOnCompletion(future: Future<*>): DisposableHandle =
     invokeOnCompletion(CancelFutureOnCompletion(this, future))
 
 /**
@@ -212,12 +247,19 @@
 /**
  * No-op implementation of [Job.Registration].
  */
-public object EmptyRegistration : Job.Registration {
-    /** Does not do anything. */
-    override fun unregister() {}
+@Deprecated(message = "Replace with `NonDisposableHandle`",
+    replaceWith = ReplaceWith("NonDisposableHandle"))
+typealias EmptyRegistration = NonDisposableHandle
 
-    /** Returns "EmptyRegistration" string. */
-    override fun toString(): String = "EmptyRegistration"
+/**
+ * No-op implementation of [DisposableHandle].
+ */
+public object NonDisposableHandle : DisposableHandle {
+    /** Does not do anything. */
+    override fun dispose() {}
+
+    /** Returns "NonDisposableHandle" string. */
+    override fun toString(): String = "NonDisposableHandle"
 }
 
 // --------------- utility classes to simplify job implementation
@@ -280,7 +322,7 @@
     private var _state: Any? = if (active) EmptyActive else EmptyNew // shared objects while we have no listeners
 
     @Volatile
-    private var registration: Job.Registration? = null
+    private var parentHandle: DisposableHandle? = null
 
     protected companion object {
         @JvmStatic
@@ -298,16 +340,16 @@
      * It shall be invoked at most once after construction after all other initialization.
      */
     public fun initParentJob(parent: Job?) {
-        check(registration == null)
+        check(parentHandle == null)
         if (parent == null) {
-            registration = EmptyRegistration
+            parentHandle = NonDisposableHandle
             return
         }
         // directly pass HandlerNode to parent scope to optimize one closure object (see makeNode)
         val newRegistration = parent.invokeOnCompletion(ParentOnCompletion(parent, this))
-        registration = newRegistration
+        parentHandle = newRegistration
         // now check our state _after_ registering (see updateState order of actions)
-        if (isCompleted) newRegistration.unregister()
+        if (isCompleted) newRegistration.dispose()
     }
 
     internal open fun onParentCompletion(cause: Throwable?) {
@@ -338,7 +380,7 @@
         require(expect is Incomplete && update !is Incomplete) // only incomplete -> completed transition is allowed
         if (!STATE.compareAndSet(this, expect, update)) return false
         // Unregister from parent job
-        registration?.unregister() // volatile read registration _after_ state was updated
+        parentHandle?.dispose() // volatile read parentHandle _after_ state was updated
         return true // continues in completeUpdateState
     }
 
@@ -515,9 +557,7 @@
         }
     }
 
-    override fun onCompletion(handler: CompletionHandler): Job.Registration = invokeOnCompletion(handler)
-
-    final override fun invokeOnCompletion(handler: CompletionHandler): Job.Registration {
+    final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
         var nodeCache: JobNode<*>? = null
         while (true) { // lock-free loop on state
             val state = this.state
@@ -545,7 +585,7 @@
                 }
                 else -> { // is inactive
                     handler((state as? CompletedExceptionally)?.exception)
-                    return EmptyRegistration
+                    return NonDisposableHandle
                 }
             }
         }
@@ -560,7 +600,7 @@
     }
 
     private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
-        cont.unregisterOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
+        cont.disposeOnCompletion(invokeOnCompletion(ResumeOnCompletion(this, cont)))
     }
 
     override fun <R> registerSelectJoin(select: SelectInstance<R>, block: suspend () -> R) {
@@ -576,7 +616,7 @@
             }
             if (startInternal(state) == 0) {
                 // slow-path -- register waiter for completion
-                select.unregisterOnCompletion(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
+                select.disposeOnSelect(invokeOnCompletion(SelectJoinOnCompletion(this, select, block)))
                 return
             }
         }
@@ -731,12 +771,12 @@
 
 internal abstract class JobNode<out J : Job>(
     @JvmField val job: J
-) : LockFreeLinkedListNode(), Job.Registration, CompletionHandler, JobSupport.Incomplete {
+) : LockFreeLinkedListNode(), DisposableHandle, CompletionHandler, JobSupport.Incomplete {
     final override val isActive: Boolean get() = true
     final override val idempotentStart: Any? get() = null
     // if unregister is called on this instance, then Job was an instance of JobSupport that added this node it itself
     // directly without wrapping
-    final override fun unregister() = (job as JobSupport).removeNode(this)
+    final override fun dispose() = (job as JobSupport).removeNode(this)
     override abstract fun invoke(reason: Throwable?)
 }
 
@@ -756,12 +796,12 @@
     override fun toString() = "ResumeOnCompletion[$continuation]"
 }
 
-internal class UnregisterOnCompletion(
+internal class DisposeOnCompletion(
     job: Job,
-    @JvmField val registration: Job.Registration
+    @JvmField val handle: DisposableHandle
 ) : JobNode<Job>(job) {
-    override fun invoke(reason: Throwable?) = registration.unregister()
-    override fun toString(): String = "UnregisterOnCompletion[$registration]"
+    override fun invoke(reason: Throwable?) = handle.dispose()
+    override fun toString(): String = "DisposeOnCompletion[$handle]"
 }
 
 private class ParentOnCompletion(
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
index 69f68db..801d741 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/NonCancellable.kt
@@ -57,10 +57,8 @@
     /** Always throws [IllegalStateException]. */
     override fun getCompletionException(): CancellationException = throw IllegalStateException("This job is always active")
 
-    override fun onCompletion(handler: CompletionHandler): Job.Registration = invokeOnCompletion(handler)
-
-    /** Always returns [EmptyRegistration]. */
-    override fun invokeOnCompletion(handler: CompletionHandler): Job.Registration = EmptyRegistration
+    /** Always returns [NonDisposableHandle]. */
+    override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = NonDisposableHandle
 
     /** Always returns `false`. */
     override fun cancel(cause: Throwable?): Boolean = false
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
index d447ad5..7163160 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Scheduled.kt
@@ -19,6 +19,7 @@
 import java.util.concurrent.ScheduledExecutorService
 import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.concurrent.TimeUnit
+import kotlin.coroutines.experimental.ContinuationInterceptor
 import kotlin.coroutines.experimental.startCoroutine
 
 private val KEEP_ALIVE = java.lang.Long.getLong("kotlinx.coroutines.ScheduledExecutor.keepAlive", 1000L)
@@ -62,18 +63,32 @@
 /**
  * Runs a given suspending block of code inside a coroutine with a specified timeout and throws
  * [CancellationException] if timeout was exceeded.
+ *
+ * This function delegates to [Delay.invokeOnTimeout] if the context [CoroutineDispatcher]
+ * implements [Delay] interface, otherwise it tracks time using a built-in single-threaded scheduled executor service.
  */
-suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
+public suspend fun <T> withTimeout(time: Long, unit: TimeUnit = TimeUnit.MILLISECONDS, block: suspend () -> T): T {
     require(time >= 0) { "Timeout time $time cannot be negative" }
     if (time <= 0L) throw CancellationException("Timed out immediately")
-    return suspendCancellableCoroutine { cont: CancellableContinuation<T> ->
+    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<T> ->
         // schedule cancellation of this continuation on time
-        val timeout = scheduledExecutor.schedule({
-            // create an exception with a specific text
-            cont.cancel(CancellationException("Timed out waiting for $time $unit"))
-        }, time, unit)
-        cont.cancelFutureOnCompletion(timeout)
+        val runnable = CancelTimedOutContinuationRunnable(time, unit, cont)
+        val delay = cont.context[ContinuationInterceptor] as? Delay
+        if (delay != null)
+            cont.disposeOnCompletion(delay.invokeOnTimeout(time, unit, runnable)) else
+            cont.cancelFutureOnCompletion(scheduledExecutor.schedule(runnable, time, unit))
         // restart block in a separate coroutine using cancellable context of this continuation,
         block.startCoroutine(cont)
     }
 }
+
+private class CancelTimedOutContinuationRunnable(
+    private val time: Long,
+    private val unit: TimeUnit,
+    private val cont: CancellableContinuation<*>
+): Runnable {
+    override fun run() {
+        cont.cancel(CancellationException("Timed out waiting for $time $unit"))
+    }
+    override fun toString(): String = "CancelTimedOutContinuationRunnable[$time,$unit,$cont]"
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index 4cb12a7..c3df491 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -72,4 +72,7 @@
         val timeout = executor.schedule(ResumeUndispatchedRunnable(this, continuation), time, unit)
         continuation.cancelFutureOnCompletion(timeout)
     }
+
+    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle =
+        DisposableFutureHandle(executor.schedule(block, time, unit))
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
index 7fe9e6b..e9b8b56 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/channels/AbstractChannel.kt
@@ -148,7 +148,7 @@
      */
     protected fun describeSendBuffered(element: E): AddLastDesc<*> = SendBufferedDesc(queue, element)
 
-    private class SendBufferedDesc<E>(
+    private class SendBufferedDesc<out E>(
         queue: LockFreeLinkedListHead,
         element: E
     ) : AddLastDesc<SendBuffered<E>>(queue, SendBuffered(element)) {
@@ -289,7 +289,7 @@
         override fun finishOnSuccess(affected: LockFreeLinkedListNode, next: LockFreeLinkedListNode) {
             super.finishOnSuccess(affected, next)
             // we can actually remove on select start, but this is also Ok (it'll get removed if discovered there)
-            node.removeOnSelectCompletion()
+            node.disposeOnSelect()
         }
     }
 
@@ -694,7 +694,7 @@
         override val pollResult: Any?,
         @JvmField val select: SelectInstance<R>,
         @JvmField val block: suspend () -> R
-    ) : LockFreeLinkedListNode(), Send, CompletionHandler {
+    ) : LockFreeLinkedListNode(), Send, DisposableHandle {
         override fun tryResumeSend(idempotent: Any?): Any? =
             if (select.trySelect(idempotent)) SELECT_STARTED else null
 
@@ -703,11 +703,11 @@
             block.startCoroutine(select.completion)
         }
 
-        fun removeOnSelectCompletion() {
-            select.invokeOnCompletion(this)
+        fun disposeOnSelect() {
+            select.disposeOnSelect(this)
         }
 
-        override fun invoke(cause: Throwable?) {
+        override fun dispose() {
             remove()
         }
 
@@ -820,7 +820,7 @@
         @JvmField val select: SelectInstance<R>,
         @JvmField val block: suspend (E?) -> R,
         @JvmField val nullOnClose: Boolean
-    ) : Receive<E>(), CompletionHandler {
+    ) : Receive<E>(), DisposableHandle {
         override fun tryResumeReceive(value: E, idempotent: Any?): Any?  =
             if (select.trySelect(idempotent)) (value ?: NULL_VALUE) else null
 
@@ -839,9 +839,11 @@
             }
         }
 
-        fun removeOnSelectCompletion() { select.invokeOnCompletion(this) }
+        fun removeOnSelectCompletion() {
+            select.disposeOnSelect(this)
+        }
 
-        override fun invoke(cause: Throwable?) { // invoked on select completion
+        override fun dispose() { // invoked on select completion
             if (remove())
                 onCancelledReceive() // notify cancellation of receive
         }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
index 1c7041f..815d7d9 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/selects/Select.kt
@@ -114,11 +114,10 @@
 
     public fun resumeSelectWithException(exception: Throwable, mode: Int)
 
-    public fun invokeOnCompletion(handler: CompletionHandler): Job.Registration
-
-    public fun unregisterOnCompletion(registration: Job.Registration)
-
-    public fun removeOnCompletion(node: LockFreeLinkedListNode)
+    // This function is actually implemented to dispose the handle only when the whole
+    // select expression complete. It is later than it could be, but if resource will get released anyway
+    // :todo: Invoke this function on select really
+    public fun disposeOnSelect(handle: DisposableHandle)
 }
 
 /**
@@ -241,19 +240,7 @@
         registerSelectLock(this@SelectBuilderImpl, owner, block)
     }
 
-    override fun unregisterOnCompletion(registration: Job.Registration) {
-        invokeOnCompletion(UnregisterOnCompletion(this, registration))
+    override fun disposeOnSelect(handle: DisposableHandle) {
+        invokeOnCompletion(DisposeOnCompletion(this, handle))
     }
-
-    override fun removeOnCompletion(node: LockFreeLinkedListNode) {
-        invokeOnCompletion(RemoveOnCompletion(this, node))
-    }
-}
-
-private class RemoveOnCompletion(
-    select: SelectBuilderImpl<*>,
-    @JvmField val node: LockFreeLinkedListNode
-) : JobNode<SelectBuilderImpl<*>>(select) {
-    override fun invoke(reason: Throwable?) { node.remove() }
-    override fun toString(): String = "RemoveOnCompletion[$node]"
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
index 26c1a5b..66d32b1 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/sync/Mutex.kt
@@ -232,7 +232,7 @@
                     val failure = select.performAtomicIfNotSelected(enqueueOp)
                     when {
                         failure == null -> { // successfully enqueued
-                            select.removeOnCompletion(enqueueOp.node)
+                            select.disposeOnSelect(enqueueOp.node)
                             return
                         }
                         failure === ALREADY_SELECTED -> return // already selected -- bail out
@@ -345,7 +345,8 @@
 
     private abstract class LockWaiter(
         @JvmField val owner: Any?
-    ) : LockFreeLinkedListNode() {
+    ) : LockFreeLinkedListNode(), DisposableHandle {
+        final override fun dispose() { remove() }
         abstract fun tryResumeLockWaiter(): Any?
         abstract fun completeResumeLockWaiter(token: Any)
     }
diff --git a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
index ce21513..b1587c5 100644
--- a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
+++ b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
@@ -50,14 +50,26 @@
     }
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
-        val handler = EventHandler<ActionEvent> {
+        val timeline = schedule(time, unit, EventHandler<ActionEvent> {
             with(continuation) { resumeUndispatched(Unit) }
-        }
-        val timeline = Timeline(KeyFrame(Duration.millis(unit.toMillis(time).toDouble()), handler))
-        timeline.play()
+        })
         continuation.invokeOnCompletion { timeline.stop() }
     }
 
+    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
+        val timeline = schedule(time, unit, EventHandler<ActionEvent> {
+            block.run()
+        })
+        return object : DisposableHandle {
+            override fun dispose() {
+                timeline.stop()
+            }
+        }
+    }
+
+    private fun schedule(time: Long, unit: TimeUnit, handler: EventHandler<ActionEvent>): Timeline =
+        Timeline(KeyFrame(Duration.millis(unit.toMillis(time).toDouble()), handler)).apply { play() }
+
     private class PulseTimer : AnimationTimer() {
         val next = CopyOnWriteArrayList<CancellableContinuation<Long>>()
 
diff --git a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
index 08cb0da..44695c6 100644
--- a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
+++ b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
@@ -19,6 +19,7 @@
 import kotlinx.coroutines.experimental.CancellableContinuation
 import kotlinx.coroutines.experimental.CoroutineDispatcher
 import kotlinx.coroutines.experimental.Delay
+import kotlinx.coroutines.experimental.DisposableHandle
 import kotlinx.coroutines.experimental.swing.Swing.delay
 import java.awt.event.ActionListener
 import java.util.concurrent.TimeUnit
@@ -33,16 +34,28 @@
     override fun dispatch(context: CoroutineContext, block: Runnable) = SwingUtilities.invokeLater(block)
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
-        val timerTime = unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt()
-        val action = ActionListener {
+        val timer = schedule(time, unit, ActionListener {
             with(continuation) { resumeUndispatched(Unit) }
+        })
+        continuation.invokeOnCompletion { timer.stop() }
+    }
+
+    override fun invokeOnTimeout(time: Long, unit: TimeUnit, block: Runnable): DisposableHandle {
+        val timer = schedule(time, unit, ActionListener {
+            block.run()
+        })
+        return object : DisposableHandle {
+            override fun dispose() {
+                timer.stop()
+            }
         }
-        val timer = Timer(timerTime, action).apply {
+    }
+
+    private fun schedule(time: Long, unit: TimeUnit, action: ActionListener): Timer =
+        Timer(unit.toMillis(time).coerceAtMost(Int.MAX_VALUE.toLong()).toInt(), action).apply {
             isRepeats = false
             start()
         }
-        continuation.invokeOnCompletion { timer.stop() }
-    }
 
     override fun toString() = "Swing"
 }