blob: 6751af3f6bd46fba03c9ee7ba43a2070ffdbcced [file] [log] [blame]
Sergey Mashkov0939f232017-12-22 19:57:30 +03001package kotlinx.coroutines.experimental.io.internal
2
3import kotlinx.atomicfu.*
4import kotlinx.coroutines.experimental.*
5import kotlin.coroutines.experimental.*
6import kotlin.coroutines.experimental.intrinsics.*
7
8/**
9 * Semi-cancellable reusable continuation. Unlike regular continuation this implementation has limitations:
10 * - could be resumed only once per swap, undefined behaviour otherwise
11 * - [T] should be neither [Throwable] nor [Continuation]
12 * - value shouldn't be null
13 */
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +030014internal class MutableDelegateContinuation<T : Any> : Continuation<T>, DispatchedTask<T> {
15 private var _delegate: Continuation<T>? = null
Sergey Mashkov0939f232017-12-22 19:57:30 +030016 private val state = atomic<Any?>(null)
17 private val handler = atomic<JobRelation?>(null)
18
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +030019 override val delegate: Continuation<T>
20 get() = _delegate!!
21
22 override fun takeState(): Any? {
23 val value = state.getAndSet(null)
24 _delegate = null
25 return value
26 }
27
Sergey Mashkov0939f232017-12-22 19:57:30 +030028 fun swap(actual: Continuation<T>): Any {
29 loop@while (true) {
30 val before = state.value
31
32 when (before) {
33 null -> {
34 if (!state.compareAndSet(null, actual)) continue@loop
35 parent(actual.context)
36 return COROUTINE_SUSPENDED
37 }
38 else -> {
39 if (!state.compareAndSet(before, null)) continue@loop
40 if (before is Throwable) throw before
41 @Suppress("UNCHECKED_CAST")
42 return before as T
43 }
44 }
45 }
46 }
47
48 fun close() {
49 resumeWithException(Cancellation)
50 handler.getAndSet(null)?.dispose()
51 }
52
53 private fun parent(context: CoroutineContext) {
54 val job = context[Job]
55 if (handler.value?.job === job) return
56
57 if (job == null) {
58 handler.getAndSet(null)?.dispose()
59 } else {
60 val handler = JobRelation(job)
61 val old = this.handler.getAndUpdate { j ->
62 when {
63 j == null -> handler
64 j.job === job -> return
65 else -> handler
66 }
67 }
68 old?.dispose()
69 }
70 }
71
72 override val context: CoroutineContext
73 get() = (state.value as? Continuation<*>)?.context ?: EmptyCoroutineContext
74
75 override fun resume(value: T) {
76 loop@while(true) {
77 val before = state.value
78
79 when (before) {
80 null -> {
81 if (!state.compareAndSet(null, value)) continue@loop
82 return
83 }
84 is Continuation<*> -> {
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +030085 if (!state.compareAndSet(before, value)) continue@loop
Sergey Mashkov0939f232017-12-22 19:57:30 +030086 @Suppress("UNCHECKED_CAST")
87 val cont = before as Continuation<T>
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +030088 _delegate = cont
89 return dispatch(1)
Sergey Mashkov0939f232017-12-22 19:57:30 +030090 }
91 else -> return
92 }
93 }
94 }
95
96 override fun resumeWithException(exception: Throwable) {
97 loop@while(true) {
98 val before = state.value
99
100 when (before) {
101 null -> {
102 if (!state.compareAndSet(null, exception)) continue@loop
103 return
104 }
105 is Continuation<*> -> {
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300106 if (!state.compareAndSet(before, CompletedExceptionally(exception))) continue@loop
Sergey Mashkov0939f232017-12-22 19:57:30 +0300107 @Suppress("UNCHECKED_CAST")
108 val cont = before as Continuation<T>
Sergey Mashkov4d01f7e2018-01-12 18:46:22 +0300109 _delegate = cont
110 return dispatch(1)
Sergey Mashkov0939f232017-12-22 19:57:30 +0300111 }
112 else -> return
113 }
114 }
115 }
116
117 private fun resumeWithExceptionContinuationOnly(job: Job, exception: Throwable) {
118 var c: Continuation<*>? = null
119
120 state.update {
121 if (it !is Continuation<*>) return
122 if (it.context[Job] !== job) return
123 c = it
124 null
125 }
126
127 c!!.resumeWithException(exception)
128 }
129
130 private inner class JobRelation(val job: Job) : CompletionHandler, DisposableHandle {
131 private var handler: DisposableHandle = NonDisposableHandle
132
133 init {
134 val h = job.invokeOnCompletion(onCancelling = true, handler = this)
135 if (job.isActive) {
136 handler = h
137 }
138 }
139
140 override fun invoke(cause: Throwable?) {
141 this@MutableDelegateContinuation.handler.compareAndSet(this, null)
142 dispose()
143
144 if (cause != null) {
145 resumeWithExceptionContinuationOnly(job, cause)
146 }
147 }
148
149 override fun dispose() {
150 handler.dispose()
151 handler = NonDisposableHandle
152 }
153 }
154
155 private companion object {
156 val Cancellation = CancellationException("Continuation terminated")
157 }
158}