No Job in newSingleThreadContext and newFixedThreadPoolContext anymore

* This resolves the common issue of using `run(ctx)` where `ctx` is created
by either `newSingleThreadContext` or `newFixedThreadPoolContext`.
Both functions used to return a combination of dispatcher + job, and this
job overrode the parent job, thus preventing propagation of cancellation.
They now return only the dispatcher, so this no longer happens.
* The `ThreadPoolDispatcher` class is now public and is the result type of
both functions. Its `close` method releases the thread pool.

Fixes #149
Fixes #151
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
index 142002a..a55dd31 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/Executors.kt
@@ -39,9 +39,15 @@
 
 private class ExecutorCoroutineDispatcher(override val executor: Executor) : ExecutorCoroutineDispatcherBase()
 
-internal abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
-    abstract val executor: Executor
-
+/**
+ * @suppress **This is unstable API and it is subject to change.**
+ */
+public abstract class ExecutorCoroutineDispatcherBase : CoroutineDispatcher(), Delay {
+    /**
+     * @suppress **This is unstable API and it is subject to change.**
+     */
+    internal abstract val executor: Executor
+    
     override fun dispatch(context: CoroutineContext, block: Runnable) =
         try { executor.execute(timeSource.trackTask(block)) }
         catch (e: RejectedExecutionException) {
diff --git a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
index c088b62..d9e1006 100644
--- a/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
+++ b/core/kotlinx-coroutines-core/src/main/kotlin/kotlinx/coroutines/experimental/ThreadPoolDispatcher.kt
@@ -16,6 +16,7 @@
 
 package kotlinx.coroutines.experimental
 
+import java.io.Closeable
 import java.util.concurrent.Executors
 import java.util.concurrent.ScheduledExecutorService
 import java.util.concurrent.atomic.AtomicInteger
@@ -23,27 +24,43 @@
 
 /**
  * Creates new coroutine execution context with the a single thread and built-in [yield] and [delay] support.
- * All continuations are dispatched immediately when invoked inside the thread of this context.
- * Resources of this pool (its thread) are reclaimed when job of this context is cancelled.
- * The specified [name] defines the name of the new thread.
- * An optional [parent] job may be specified upon creation.
+ * **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its thread).
+ * Resources are reclaimed by [ThreadPoolDispatcher.close].**
+ *
+ * @param name the base name of the created thread.
  */
+fun newSingleThreadContext(name: String): ThreadPoolDispatcher =
+    newFixedThreadPoolContext(1, name)
+
+/**
+ * @suppress **Deprecated**: Parent job is no longer supported.
+ */
+@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
+    level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newSingleThreadContext(name)"))
 fun newSingleThreadContext(name: String, parent: Job? = null): CoroutineContext =
-    newFixedThreadPoolContext(1, name, parent)
+    newFixedThreadPoolContext(1, name)
 
 /**
  * Creates new coroutine execution context with the fixed-size thread-pool and built-in [yield] and [delay] support.
- * All continuations are dispatched immediately when invoked inside the threads of this context.
- * Resources of this pool (its threads) are reclaimed when job of this context is cancelled.
- * The specified [name] defines the names of the threads.
- * An optional [parent] job may be specified upon creation.
+ * **NOTE: The resulting [ThreadPoolDispatcher] owns native resources (its threads).
+ * Resources are reclaimed by [ThreadPoolDispatcher.close].**
+ *
+ * @param nThreads the number of threads.
+ * @param name the base name of the created threads.
  */
-fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext {
+fun newFixedThreadPoolContext(nThreads: Int, name: String): ThreadPoolDispatcher {
     require(nThreads >= 1) { "Expected at least one thread, but $nThreads specified" }
-    val job = Job(parent)
-    return job + ThreadPoolDispatcher(nThreads, name, job)
+    return ThreadPoolDispatcher(nThreads, name)
 }
 
+/**
+ * @suppress **Deprecated**: Parent job is no longer supported.
+ */
+@Deprecated(message = "Parent job is no longer supported, `close` the resulting ThreadPoolDispatcher to release resources",
+    level = DeprecationLevel.WARNING, replaceWith = ReplaceWith("newFixedThreadPoolContext(nThreads, name)"))
+fun newFixedThreadPoolContext(nThreads: Int, name: String, parent: Job? = null): CoroutineContext =
+    newFixedThreadPoolContext(nThreads, name)
+
 internal class PoolThread(
     @JvmField val dispatcher: ThreadPoolDispatcher, // for debugging & tests
     target: Runnable, name: String
@@ -51,19 +68,25 @@
     init { isDaemon = true }
 }
 
-internal class ThreadPoolDispatcher(
+/**
+ * Dispatches coroutine execution to a thread pool of a fixed size. Instances of this dispatcher are
+ * created with [newSingleThreadContext] and [newFixedThreadPoolContext].
+ */
+public class ThreadPoolDispatcher internal constructor(
     private val nThreads: Int,
-    private val name: String,
-    job: Job
-) : ExecutorCoroutineDispatcherBase() {
+    private val name: String
+) : ExecutorCoroutineDispatcherBase(), Closeable {
     private val threadNo = AtomicInteger()
 
-    override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
+    internal override val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(nThreads) { target ->
         PoolThread(this, target, if (nThreads == 1) name else name + "-" + threadNo.incrementAndGet())
     }
 
-    init {
-        job.invokeOnCompletion { executor.shutdown() }
+    /**
+     * Closes this dispatcher -- shuts down all threads in this pool and releases resources.
+     */
+    public override fun close() {
+        executor.shutdown()
     }
 
     override fun toString(): String = "ThreadPoolDispatcher[$nThreads, $name]"
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
index 228b4fc..43caf8b 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/guide/example-context-04.kt
@@ -22,13 +22,15 @@
 fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
 
 fun main(args: Array<String>) {
-    val ctx1 = newSingleThreadContext("Ctx1")
-    val ctx2 = newSingleThreadContext("Ctx2")
-    runBlocking(ctx1) {
-        log("Started in ctx1")
-        run(ctx2) {
-            log("Working in ctx2")
+    newSingleThreadContext("Ctx1").use { ctx1 ->
+        newSingleThreadContext("Ctx2").use { ctx2 ->
+            runBlocking(ctx1) {
+                log("Started in ctx1")
+                run(ctx2) {
+                    log("Working in ctx2")
+                }
+                log("Back to ctx1")
+            }
         }
-        log("Back to ctx1")
     }
 }
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
index 71b3f35..0ae885d 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/ExecutorsTest.kt
@@ -31,7 +31,7 @@
         runBlocking(context) {
             checkThreadName("TestThread")
         }
-        context[Job]!!.cancel()
+        context.close()
     }
 
     @Test
@@ -40,7 +40,7 @@
         runBlocking(context) {
             checkThreadName("TestPool")
         }
-        context[Job]!!.cancel()
+        context.close()
     }
 
     @Test
@@ -63,7 +63,7 @@
             }
             checkThreadName("Ctx1")
         }
-        ctx1[Job]!!.cancel()
-        ctx2[Job]!!.cancel()
+        ctx1.close()
+        ctx2.close()
     }
 }
\ No newline at end of file
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
index 8b087ca..220afbe 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/BroadcastChannelMultiReceiveStressTest.kt
@@ -52,7 +52,7 @@
 
     @After
     fun tearDown() {
-        pool.cancel()
+        pool.close()
     }
 
     @Test
diff --git a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
index 0025df7..fbc55bc 100644
--- a/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
+++ b/core/kotlinx-coroutines-core/src/test/kotlin/kotlinx/coroutines/experimental/channels/ConflatedChannelCloseStressTest.kt
@@ -35,7 +35,9 @@
     val pool = newFixedThreadPoolContext(nSenders + 2, "TestStressClose")
 
     @After
-    fun tearDown() { pool[Job]!!.cancel() }
+    fun tearDown() {
+        pool.close()
+    }
 
     @Test
     fun testStressClose() = runBlocking<Unit> {