blob: ca0b2ff6bea3c4a104dcffbf48d0e83bc6d122b8 [file] [log] [blame]
Roman Elizarov769d7dc2018-04-26 12:13:40 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030017package kotlinx.coroutines.experimental
18
19import kotlinx.atomicfu.atomic
Roman Elizarove0b6db02018-04-28 19:22:29 +030020import kotlinx.coroutines.experimental.internalAnnotations.Volatile
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030021
22/**
Roman Elizarov189e9952018-04-26 11:06:37 +030023 * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
24 * when all deferred computations are complete or resumes with the first thrown exception if any of computations
25 * complete exceptionally including cancellation.
26 *
27 * This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
28 * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
29 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030030 * This suspending function is cancellable.
31 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
32 * this function immediately resumes with [CancellationException].
33 */
Roman Elizarov189e9952018-04-26 11:06:37 +030034public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
35 if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030036
37/**
Roman Elizarov189e9952018-04-26 11:06:37 +030038 * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
39 * when all deferred computations are complete or resumes with the first thrown exception if any of computations
40 * complete exceptionally including cancellation.
41 *
42 * This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
43 * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
44 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030045 * This suspending function is cancellable.
Roman Elizarov203abb02018-04-12 12:34:14 +030046 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030047 * this function immediately resumes with [CancellationException].
48 */
Roman Elizarov189e9952018-04-26 11:06:37 +030049public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
50 if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030051
52/**
Roman Elizarov189e9952018-04-26 11:06:37 +030053 * Suspends current coroutine until all given jobs are complete.
54 * This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
55 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030056 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
57 * this function immediately resumes with [CancellationException].
58 */
Vsevolod Tolstopyatov9c692792018-04-11 18:22:35 +030059public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030060
61/**
Roman Elizarov189e9952018-04-26 11:06:37 +030062 * Suspends current coroutine until all given jobs are complete.
63 * This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
64 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030065 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
66 * this function immediately resumes with [CancellationException].
67 */
Roman Elizarov203abb02018-04-12 12:34:14 +030068public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030069
Roman Elizarov189e9952018-04-26 11:06:37 +030070private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
71 private val notCompletedCount = atomic(deferreds.size)
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030072
Roman Elizarov189e9952018-04-26 11:06:37 +030073 suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
Roman Elizarove0b6db02018-04-28 19:22:29 +030074 // Intricate dance here
75 // Step 1: Create nodes and install them as completion handlers, they may fire!
76 val nodes = Array<AwaitAllNode>(deferreds.size) { i ->
77 val deferred = deferreds[i]
78 deferred.start() // To properly await lazily started deferreds
79 AwaitAllNode(cont, deferred).apply {
80 handle = deferred.invokeOnCompletion(asHandler)
81 }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +030082 }
Roman Elizarove0b6db02018-04-28 19:22:29 +030083 val disposer = DisposeHandlersOnCancel(nodes)
84 // Step 2: Set disposer to each node
85 nodes.forEach { it.disposer = disposer }
86 // Here we know that if any code the nodes complete, it will dipsose the rest
87 // Step 3: Now we can check if continuation is complete
88 if (cont.isCompleted) {
89 // it is already complete while handlers were being installed -- dispose them all
90 disposer.disposeAll()
91 } else {
92 cont.invokeOnCancellation(handler = disposer.asHandler)
93 }
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +030094 }
95
Roman Elizarove0b6db02018-04-28 19:22:29 +030096 private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
97 fun disposeAll() {
98 nodes.forEach { it.handle.dispose() }
Roman Elizarov3e9f2442018-04-28 17:38:22 +030099 }
Roman Elizarove0b6db02018-04-28 19:22:29 +0300100
101 override fun invoke(cause: Throwable?) { disposeAll() }
102 override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
Roman Elizarov3e9f2442018-04-28 17:38:22 +0300103 }
104
105 private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
Roman Elizarove0b6db02018-04-28 19:22:29 +0300106 lateinit var handle: DisposableHandle
107
108 @Volatile
109 var disposer: DisposeHandlersOnCancel? = null
110
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300111 override fun invoke(cause: Throwable?) {
112 if (cause != null) {
113 val token = continuation.tryResumeWithException(cause)
114 if (token != null) {
115 continuation.completeResume(token)
Roman Elizarove0b6db02018-04-28 19:22:29 +0300116 // volatile read of disposer AFTER continuation is complete
117 val disposer = this.disposer
118 // and if disposer was already set (all handlers where already installed, then dispose them all)
119 if (disposer != null) disposer.disposeAll()
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300120 }
121 } else if (notCompletedCount.decrementAndGet() == 0) {
Roman Elizarov189e9952018-04-26 11:06:37 +0300122 continuation.resume(deferreds.map { it.getCompleted() })
Roman Elizarove0b6db02018-04-28 19:22:29 +0300123 // Note, that all deferreds are complete here, so we don't need to dispose their nodes
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +0300124 }
125 }
126 }
127}