blob: 5186933fba2020505fb3314e39d470edae0315b0 [file] [log] [blame]
Roman Elizarov769d7dc2018-04-26 12:13:40 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov769d7dc2018-04-26 12:13:40 +03003 */
4
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +03005package kotlinx.coroutines.experimental
6
7import kotlinx.atomicfu.atomic
Roman Elizarove0b6db02018-04-28 19:22:29 +03008import kotlinx.coroutines.experimental.internalAnnotations.Volatile
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +03009
10/**
Roman Elizarov189e9952018-04-26 11:06:37 +030011 * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
12 * when all deferred computations are complete or resumes with the first thrown exception if any of computations
13 * complete exceptionally including cancellation.
14 *
15 * This function is **not** equivalent to `deferreds.map { it.await() }` which fails only when when it sequentially
16 * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
17 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030018 * This suspending function is cancellable.
19 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
20 * this function immediately resumes with [CancellationException].
21 */
Roman Elizarov189e9952018-04-26 11:06:37 +030022public suspend fun <T> awaitAll(vararg deferreds: Deferred<T>): List<T> =
23 if (deferreds.isEmpty()) emptyList() else AwaitAll(deferreds).await()
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030024
25/**
Roman Elizarov189e9952018-04-26 11:06:37 +030026 * Awaits for completion of given deferred values without blocking a thread and resumes normally with the list of values
27 * when all deferred computations are complete or resumes with the first thrown exception if any of computations
28 * complete exceptionally including cancellation.
29 *
30 * This function is **not** equivalent to `this.map { it.await() }` which fails only when when it sequentially
31 * gets to wait the failing deferred, while this `awaitAll` fails immediately as soon as any of the deferreds fail.
32 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030033 * This suspending function is cancellable.
Roman Elizarov203abb02018-04-12 12:34:14 +030034 * If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030035 * this function immediately resumes with [CancellationException].
36 */
Roman Elizarov189e9952018-04-26 11:06:37 +030037public suspend fun <T> Collection<Deferred<T>>.awaitAll(): List<T> =
38 if (isEmpty()) emptyList() else AwaitAll(toTypedArray()).await()
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030039
40/**
Roman Elizarov189e9952018-04-26 11:06:37 +030041 * Suspends current coroutine until all given jobs are complete.
42 * This method is semantically equivalent to joining all given jobs one by one with `jobs.forEach { it.join() }`.
43 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030044 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
45 * this function immediately resumes with [CancellationException].
46 */
Vsevolod Tolstopyatov9c692792018-04-11 18:22:35 +030047public suspend fun joinAll(vararg jobs: Job): Unit = jobs.forEach { it.join() }
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030048
49/**
Roman Elizarov189e9952018-04-26 11:06:37 +030050 * Suspends current coroutine until all given jobs are complete.
51 * This method is semantically equivalent to joining all given jobs one by one with `forEach { it.join() }`.
52 *
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030053 * This suspending function is cancellable. If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting,
54 * this function immediately resumes with [CancellationException].
55 */
Roman Elizarov203abb02018-04-12 12:34:14 +030056public suspend fun Collection<Job>.joinAll(): Unit = forEach { it.join() }
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030057
Roman Elizarov189e9952018-04-26 11:06:37 +030058private class AwaitAll<T>(private val deferreds: Array<out Deferred<T>>) {
59 private val notCompletedCount = atomic(deferreds.size)
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +030060
Roman Elizarov189e9952018-04-26 11:06:37 +030061 suspend fun await(): List<T> = suspendCancellableCoroutine { cont ->
Roman Elizarove0b6db02018-04-28 19:22:29 +030062 // Intricate dance here
63 // Step 1: Create nodes and install them as completion handlers, they may fire!
64 val nodes = Array<AwaitAllNode>(deferreds.size) { i ->
65 val deferred = deferreds[i]
66 deferred.start() // To properly await lazily started deferreds
67 AwaitAllNode(cont, deferred).apply {
68 handle = deferred.invokeOnCompletion(asHandler)
69 }
Roman Elizarov6d9f40f2018-04-28 14:44:02 +030070 }
Roman Elizarove0b6db02018-04-28 19:22:29 +030071 val disposer = DisposeHandlersOnCancel(nodes)
72 // Step 2: Set disposer to each node
73 nodes.forEach { it.disposer = disposer }
74 // Here we know that if any code the nodes complete, it will dipsose the rest
75 // Step 3: Now we can check if continuation is complete
76 if (cont.isCompleted) {
77 // it is already complete while handlers were being installed -- dispose them all
78 disposer.disposeAll()
79 } else {
80 cont.invokeOnCancellation(handler = disposer.asHandler)
81 }
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +030082 }
83
Roman Elizarove0b6db02018-04-28 19:22:29 +030084 private inner class DisposeHandlersOnCancel(private val nodes: Array<AwaitAllNode>) : CancelHandler() {
85 fun disposeAll() {
86 nodes.forEach { it.handle.dispose() }
Roman Elizarov3e9f2442018-04-28 17:38:22 +030087 }
Roman Elizarove0b6db02018-04-28 19:22:29 +030088
89 override fun invoke(cause: Throwable?) { disposeAll() }
90 override fun toString(): String = "DisposeHandlersOnCancel[$nodes]"
Roman Elizarov3e9f2442018-04-28 17:38:22 +030091 }
92
93 private inner class AwaitAllNode(private val continuation: CancellableContinuation<List<T>>, job: Job) : JobNode<Job>(job) {
Roman Elizarove0b6db02018-04-28 19:22:29 +030094 lateinit var handle: DisposableHandle
95
96 @Volatile
97 var disposer: DisposeHandlersOnCancel? = null
98
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +030099 override fun invoke(cause: Throwable?) {
100 if (cause != null) {
101 val token = continuation.tryResumeWithException(cause)
102 if (token != null) {
103 continuation.completeResume(token)
Roman Elizarove0b6db02018-04-28 19:22:29 +0300104 // volatile read of disposer AFTER continuation is complete
105 val disposer = this.disposer
106 // and if disposer was already set (all handlers where already installed, then dispose them all)
107 if (disposer != null) disposer.disposeAll()
Vsevolod Tolstopyatov05d38232018-04-12 20:16:37 +0300108 }
109 } else if (notCompletedCount.decrementAndGet() == 0) {
Roman Elizarov189e9952018-04-26 11:06:37 +0300110 continuation.resume(deferreds.map { it.getCompleted() })
Roman Elizarove0b6db02018-04-28 19:22:29 +0300111 // Note, that all deferreds are complete here, so we don't need to dispose their nodes
Vsevolod Tolstopyatov4f0d48b2018-04-11 13:51:33 +0300112 }
113 }
114 }
115}