Added context parameter to CoroutineDispatcher methods, implemented Executor.toCoroutineDispatcher
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
index 36d3cbf..d91ac74 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CommonPool.kt
@@ -4,6 +4,7 @@
 import java.util.concurrent.Executors
 import java.util.concurrent.ForkJoinPool
 import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.CoroutineContext
 
 /**
  * Represents common pool of shared threads as coroutine dispatcher for compute-intensive tasks.
@@ -11,7 +12,7 @@
  * coroutine resumption is dispatched as a separate task even when it already executes inside the pool.
  * When available, it wraps [ForkJoinPool.commonPool] and provides a similar shared pool where not.
  */
-object CommonPool : CoroutineDispatcher(), Yield {
+object CommonPool : CoroutineDispatcher() {
     private val pool: Executor = findPool()
 
     private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }
@@ -35,10 +36,6 @@
 
     private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)
 
-    override fun isDispatchNeeded(): Boolean = true
-    override fun dispatch(block: Runnable) = pool.execute(block)
-
-    override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
-        pool.execute { continuation.resume(Unit) }
-    }
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+    override fun dispatch(context: CoroutineContext, block: Runnable) = pool.execute(block)
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
index 055f6f3..78d810c 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CoroutineDispatcher.kt
@@ -3,6 +3,7 @@
 import kotlin.coroutines.AbstractCoroutineContextElement
 import kotlin.coroutines.Continuation
 import kotlin.coroutines.ContinuationInterceptor
+import kotlin.coroutines.CoroutineContext
 
 /**
  * Base class that shall be extended by all coroutine dispatcher implementations.
@@ -30,12 +31,12 @@
     /**
      * Return `true` if execution shall be dispatched onto another thread.
      */
-    public abstract fun isDispatchNeeded(): Boolean
+    public abstract fun isDispatchNeeded(context: CoroutineContext): Boolean
 
     /**
-     * Dispatches execution of a runnable [block] onto another thread.
+     * Dispatches execution of a runnable [block] onto another thread in the given [context].
      */
-    public abstract fun dispatch(block: Runnable)
+    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
 
     override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
             DispatchedContinuation<T>(this, continuation)
@@ -46,27 +47,29 @@
         val continuation: Continuation<T>
 ): Continuation<T> by continuation {
     override fun resume(value: T) {
-        if (dispatcher.isDispatchNeeded())
-            dispatcher.dispatch(Runnable {
-                withDefaultCoroutineContext(continuation.context) {
+        val context = continuation.context
+        if (dispatcher.isDispatchNeeded(context))
+            dispatcher.dispatch(context, Runnable {
+                withDefaultCoroutineContext(context) {
                     continuation.resume(value)
                 }
             })
         else
-            withDefaultCoroutineContext(continuation.context) {
+            withDefaultCoroutineContext(context) {
                 continuation.resume(value)
             }
     }
 
     override fun resumeWithException(exception: Throwable) {
-        if (dispatcher.isDispatchNeeded())
-            dispatcher.dispatch(Runnable {
-                withDefaultCoroutineContext(continuation.context) {
+        val context = continuation.context
+        if (dispatcher.isDispatchNeeded(context))
+            dispatcher.dispatch(context, Runnable {
+                withDefaultCoroutineContext(context) {
                     continuation.resumeWithException(exception)
                 }
             })
         else
-            withDefaultCoroutineContext(continuation.context) {
+            withDefaultCoroutineContext(context) {
                 continuation.resumeWithException(exception)
             }
     }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
index 0e89f94..2de613d 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/CurrentCoroutineContext.kt
@@ -19,8 +19,8 @@
  * mandating any specific threading policy.
  */
 public object Here : CoroutineDispatcher() {
-    override fun isDispatchNeeded(): Boolean = false
-    override fun dispatch(block: Runnable) { throw UnsupportedOperationException() }
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
+    override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
 }
 
 /**
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
index bd0a1f4..653c9bd 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Delay.kt
@@ -43,7 +43,6 @@
             scheduleResumeAfterDelay(time, unit, cont)
             return@sc
         }
-        val timeout = scheduledExecutor.schedule({ cont.resume(Unit) }, time, unit)
-        cont.cancelFutureOnCompletion(timeout)
+        scheduledExecutor.scheduleResumeAfterDelay(time, unit, cont)
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
index 66b4cba..ce35930 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/EventLoop.kt
@@ -4,6 +4,7 @@
 import kotlinx.coroutines.experimental.internal.LockFreeLinkedListNode
 import java.util.concurrent.locks.LockSupport
 import kotlin.coroutines.Continuation
+import kotlin.coroutines.CoroutineContext
 
 /**
  * Implemented by [CoroutineDispatcher] implementations that have event loop inside and can
@@ -49,9 +50,9 @@
         this.parentJob = coroutine
     }
 
-    override fun isDispatchNeeded(): Boolean = Thread.currentThread() != thread
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = Thread.currentThread() != thread
 
-    override fun dispatch(block: Runnable) {
+    override fun dispatch(context: CoroutineContext, block: Runnable) {
         schedule(Dispatch(block))
     }
 
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
new file mode 100644
index 0000000..6aea7d1
--- /dev/null
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -0,0 +1,32 @@
+package kotlinx.coroutines.experimental
+
+import java.util.concurrent.Executor
+import java.util.concurrent.ExecutorService
+import java.util.concurrent.ScheduledExecutorService
+import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
+
+/**
+ * Converts an instance of [Executor] to an implementation of [CoroutineDispatcher].
+ */
+public fun Executor.toCoroutineDispatcher(): CoroutineDispatcher =
+    ExecutorCoroutineDispatcher(this)
+
+internal open class ExecutorCoroutineDispatcher(val executor: Executor) : CoroutineDispatcher(), Delay {
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+    override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
+
+    override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
+        (executor as? ScheduledExecutorService ?: scheduledExecutor).scheduleResumeAfterDelay(time, unit, continuation)
+    }
+}
+
+internal fun ExecutorService.scheduleResume(cont: CancellableContinuation<Unit>) {
+    val future = submit { cont.resume(Unit) }
+    cont.cancelFutureOnCompletion(future)
+}
+
+internal fun ScheduledExecutorService.scheduleResumeAfterDelay(time: Long, unit: TimeUnit, cont: CancellableContinuation<Unit>) {
+    val timeout = schedule({ cont.resume(Unit) }, time, unit)
+    cont.cancelFutureOnCompletion(timeout)
+}
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index b456c12..dbfab01 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -5,11 +5,10 @@
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 import kotlin.concurrent.thread
-import kotlin.coroutines.ContinuationInterceptor
 import kotlin.coroutines.CoroutineContext
 
 /**
- * Creates new coroutine execution context with the a single thread and built-in [delay] support.
+ * Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
  * All continuations are dispatched immediately when invoked inside the thread of this context.
  * Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
  * The specified [name] defines the name of the new thread.
@@ -19,7 +18,7 @@
     newFixedThreadPoolContext(1, name, parent)
 
 /**
- * Creates new coroutine execution context with the fixed-size thread-pool and built-in [delay] support.
+ * Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
  * All continuations are dispatched immediately when invoked inside the threads of this context.
  * Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
  * The specified [name] defines the names of the threads.
@@ -27,8 +26,8 @@
  */
 fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
     require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
-    val lifetime = Job(parent)
-    return lifetime + ThreadPoolDispatcher(nThreads, name, lifetime)
+    val job = Job(parent)
+    return job + ThreadPoolDispatcher(nThreads, name, job)
 }
 
 private val thisThreadContext = ThreadLocal<ThreadPoolDispatcher>()
@@ -37,7 +36,7 @@
         nThreads: Int,
         name: String,
         val job: Job
-) : CoroutineDispatcher(), ContinuationInterceptor, Yield, Delay {
+) : CoroutineDispatcher(), Yield, Delay {
     val threadNo = AtomicInteger()
     val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
         thread(start = false, isDaemon = true,
@@ -51,16 +50,15 @@
         job.onCompletion { executor.shutdown() }
     }
 
-    override fun isDispatchNeeded(): Boolean = thisThreadContext.get() != this
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = thisThreadContext.get() != this
 
-    override fun dispatch(block: Runnable) = executor.execute(block)
+    override fun dispatch(context: CoroutineContext, block: Runnable) = executor.execute(block)
 
     override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
-        executor.execute { continuation.resume(Unit) }
+        executor.scheduleResume(continuation)
     }
 
     override fun scheduleResumeAfterDelay(time: Long, unit: TimeUnit, continuation: CancellableContinuation<Unit>) {
-        val timeout = executor.schedule({ continuation.resume(Unit) }, time, unit)
-        continuation.cancelFutureOnCompletion(timeout)
+        executor.scheduleResumeAfterDelay(time, unit, continuation)
     }
 }
diff --git a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
index 9623de3..803c9c9 100644
--- a/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
+++ b/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Yield.kt
@@ -4,7 +4,8 @@
 
 /**
  * This dispatcher _feature_ is implemented by [CoroutineDispatcher] implementations that
- * natively support [yield] function.
+ * natively support [yield] function. It shall be implemented only by [CoroutineDispatcher]
+ * classes with non-trivial [CoroutineDispatcher.isDispatchNeeded] implementations.
  */
 public interface Yield {
     /**
diff --git a/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
new file mode 100644
index 0000000..37414d6
--- /dev/null
+++ b/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
@@ -0,0 +1,85 @@
+package kotlinx.coroutines.experimental
+
+import org.junit.After
+import org.junit.Before
+import org.junit.Test
+import java.util.concurrent.Executors
+
+class ExecutorsTest {
+    fun threadNames(): Set<String> {
+        val arrayOfThreads = Array<Thread?>(Thread.activeCount()) { null }
+        val n = Thread.enumerate(arrayOfThreads)
+        val names = hashSetOf<String>()
+        for (i in 0 until n)
+            names.add(arrayOfThreads[i]!!.name)
+        return names
+    }
+
+    lateinit var threadNamesBefore: Set<String>
+
+    @Before
+    fun before() {
+        threadNamesBefore = threadNames()
+    }
+
+    @After
+    fun after() {
+        // give threads some time to shutdown
+        val waitTill = System.currentTimeMillis() + 1000L
+        var diff: Set<String>
+        do {
+            val threadNamesAfter = threadNames()
+            diff = threadNamesAfter - threadNamesBefore
+            if (diff.isEmpty()) break
+        } while (System.currentTimeMillis() <= waitTill)
+        diff.forEach { println("Lost thread '$it'") }
+        check(diff.isEmpty()) { "Lost ${diff.size} threads"}
+    }
+
+    fun checkThreadName(prefix: String) {
+        val name = Thread.currentThread().name
+        check(name.startsWith(prefix)) { "Expected thread name to start with '$prefix', found: '$name'" }
+    }
+
+    @Test
+    fun testSingleThread() {
+        val context = newSingleThreadContext("TestThread")
+        runBlocking(context) {
+            checkThreadName("TestThread")
+        }
+        context[Job]!!.cancel()
+    }
+
+    @Test
+    fun testFixedThreadPool() {
+        val context = newFixedThreadPoolContext(2, "TestPool")
+        runBlocking(context) {
+            checkThreadName("TestPool")
+        }
+        context[Job]!!.cancel()
+    }
+
+    @Test
+    fun testToExecutor() {
+        val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "TestExecutor") }
+        runBlocking(executor.toCoroutineDispatcher()) {
+            checkThreadName("TestExecutor")
+        }
+        executor.shutdown()
+    }
+
+    @Test
+    fun testTwoThreads() {
+        val ctx1 = newSingleThreadContext("Ctx1")
+        val ctx2 = newSingleThreadContext("Ctx2")
+        runBlocking(ctx1) {
+            checkThreadName("Ctx1")
+            run(ctx2) {
+                checkThreadName("Ctx2")
+            }
+            checkThreadName("Ctx1")
+        }
+        ctx1[Job]!!.cancel()
+        ctx2[Job]!!.cancel()
+    }
+}
\ No newline at end of file
diff --git a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
index ce26153..f12a5dc 100644
--- a/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
+++ b/kotlinx-coroutines-javafx/src/main/kotlin/kotlinx/coroutines/experimental/javafx/JavaFx.kt
@@ -11,6 +11,7 @@
 import kotlinx.coroutines.experimental.javafx.JavaFx.delay
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.TimeUnit
+import kotlin.coroutines.CoroutineContext
 
 
 /**
@@ -21,8 +22,8 @@
         PulseTimer().apply { start() }
     }
 
-    override fun isDispatchNeeded(): Boolean = !Platform.isFxApplicationThread()
-    override fun dispatch(block: Runnable) = Platform.runLater(block)
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = !Platform.isFxApplicationThread()
+    override fun dispatch(context: CoroutineContext, block: Runnable) = Platform.runLater(block)
 
     /**
      * Suspends coroutine until next JavaFx pulse and returns time of the pulse on resumption.
diff --git a/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt b/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
index f6694ec..30a99ca 100644
--- a/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
+++ b/kotlinx-coroutines-jdk8/src/test/kotlin/kotlinx/coroutines/experimental/future/FutureTest.kt
@@ -7,6 +7,7 @@
 import java.util.concurrent.CompletableFuture
 import java.util.concurrent.ExecutionException
 import java.util.concurrent.atomic.AtomicInteger
+import kotlin.coroutines.CoroutineContext
 import kotlin.test.assertEquals
 import kotlin.test.assertFalse
 import kotlin.test.assertTrue
@@ -123,8 +124,8 @@
     }
 
     private fun wrapContinuation(wrapper: (() -> Unit) -> Unit): CoroutineDispatcher = object : CoroutineDispatcher() {
-        override fun isDispatchNeeded(): Boolean = true
-        override fun dispatch(block: Runnable) {
+        override fun isDispatchNeeded(context: CoroutineContext): Boolean = true
+        override fun dispatch(context: CoroutineContext, block: Runnable) {
             wrapper {
                 block.run()
             }
diff --git a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
index 8bcea9d..42ebe22 100644
--- a/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
+++ b/kotlinx-coroutines-swing/src/main/kotlin/kotlinx/coroutines/experimental/swing/Swing.kt
@@ -9,13 +9,14 @@
 import java.util.concurrent.TimeUnit
 import javax.swing.SwingUtilities
 import javax.swing.Timer
+import kotlin.coroutines.CoroutineContext
 
 /**
  * Dispatches execution onto Swing event dispatching thread and provides native [delay] support.
  */
 object Swing : CoroutineDispatcher(), Yield, Delay {
-    override fun isDispatchNeeded(): Boolean = !SwingUtilities.isEventDispatchThread()
-    override fun dispatch(block: Runnable) = SwingUtilities.invokeLater(block)
+    override fun isDispatchNeeded(context: CoroutineContext): Boolean = !SwingUtilities.isEventDispatchThread()
+    override fun dispatch(context: CoroutineContext, block: Runnable) = SwingUtilities.invokeLater(block)
 
     override fun scheduleResume(continuation: CancellableContinuation<Unit>) {
         SwingUtilities.invokeLater { continuation.resume(Unit) }