/*
 * Copyright 2016-2017 JetBrains s.r.o.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package kotlinx.coroutines.experimental

import kotlinx.atomicfu.*
import kotlinx.coroutines.experimental.internalAnnotations.*
import kotlin.coroutines.experimental.*
import kotlin.coroutines.experimental.intrinsics.*

private const val UNDECIDED = 0
private const val SUSPENDED = 1
private const val RESUMED = 2

/**
 * @suppress **This is unstable API and it is subject to change.**
 */
internal abstract class AbstractContinuation<in T>(
    public final override val delegate: Continuation<T>,
    public final override val resumeMode: Int
) : Continuation<T>, DispatchedTask<T> {

    /*
     * Implementation notes
     *
     * AbstractContinuation is a subset of Job with following limitations:
     * 1) It can have only cancellation listeners
     * 2) It always invokes cancellation listener if it's cancelled (no 'invokeImmediately')
     * 3) It can have at most one cancellation listener
     * 4) Its cancellation listeners cannot be deregistered
     * As a consequence it has much simpler state machine, more lightweight machinery and
     * less dependencies.
     *
     * Cancelling state
     * If useCancellingState is true, then this continuation can have additional cancelling state,
     * which is transition from Active to Cancelled. This is specific state to support withContext(ctx)
     * construction: block in withContext can be cancelled from withing or even before stepping into withContext,
     * but we still want to properly run it (e.g. when it has atomic cancellation mode) and run its completion listener
     * after.
     * During cancellation all pending exceptions are aggregated and thrown during transition to final state
     */

    /* decision state machine

        +-----------+   trySuspend   +-----------+
        | UNDECIDED | -------------> | SUSPENDED |
        +-----------+                +-----------+
              |
              | tryResume
              V
        +-----------+
        |  RESUMED  |
        +-----------+

        Note: both tryResume and trySuspend can be invoked at most once, first invocation wins
     */
    private val _decision = atomic(UNDECIDED)

    /*
       === Internal states ===
       name        state class          public state    description
       ------      ------------         ------------    -----------
       ACTIVE      Active               : Active        active, no listeners
       SINGLE_A    CancellationHandler  : Active        active, one cancellation listener
       CANCELLING  Cancelling           : Active        in the process of cancellation due to cancellation of parent job
       CANCELLED   Cancelled            : Cancelled     cancelled (final state)
       COMPLETED   any                  : Completed     produced some result or threw an exception (final state)
     */
    private val _state = atomic<Any?>(ACTIVE)

    @Volatile
    private var parentHandle: DisposableHandle? = null

    internal val state: Any? get() = _state.value

    public val isActive: Boolean get() = state is NotCompleted

    public val isCompleted: Boolean get() = state !is NotCompleted

    public val isCancelled: Boolean get() = state is CancelledContinuation

    protected open val useCancellingState: Boolean get() = false

    internal fun initParentJobInternal(parent: Job?) {
        check(parentHandle == null)
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        val handle = parent.invokeOnCompletion(onCancelling = true, handler = ChildContinuation(parent, this).asHandler)

        parentHandle = handle
        // now check our state _after_ registering (see updateStateToFinal order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }

    override fun takeState(): Any? = state

    public fun cancel(cause: Throwable?): Boolean {
        loopOnState { state ->
            if (state !is NotCompleted) return false // quit if already complete
            if (state is Cancelling) return false // someone else succeeded
            if (tryCancel(state, cause)) return true
        }
    }

    private fun trySuspend(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
                RESUMED -> return false
                else -> error("Already suspended")
            }
        }
    }

    private fun tryResume(): Boolean {
        _decision.loop { decision ->
            when (decision) {
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, RESUMED)) return true
                SUSPENDED -> return false
                else -> error("Already resumed")
            }
        }
    }

    @PublishedApi
    internal fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state
        if (state is CompletedExceptionally) throw state.exception
        return getSuccessfulResult(state)
    }

    override fun resume(value: T) =
        resumeImpl(value, resumeMode)

    override fun resumeWithException(exception: Throwable) =
        resumeImpl(CompletedExceptionally(exception), resumeMode)

    public fun invokeOnCancellation(handler: CompletionHandler) {
        var handleCache: CancellationHandler? = null
        loopOnState { state ->
            when (state) {
                is Active -> {
                    val node = handleCache ?: makeHandler(handler).also { handleCache = it }
                    if (_state.compareAndSet(state, node)) {
                        return
                    }
                }
                is CancellationHandler -> error("It's prohibited to register multiple handlers, tried to register $handler, already has $state")
                is CancelledContinuation -> {
                    /*
                     * Continuation is complete, invoke directly.
                     * NOTE: multiple invokeOnCancellation calls with different handlers are allowed on cancelled continuation.
                     * It's inconsistent with running continuation, but currently, we have no mechanism to check
                     * whether any handler was registered during continuation lifecycle without additional overhead.
                     * This may be changed in the future.
                     *
                     * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension,
                     * because we play type tricks on Kotlin/JS and handler is not necessarily a function there
                     */
                    handler.invokeIt((state as? CompletedExceptionally)?.cause)
                    return
                }
                is Cancelling -> error("Cancellation handlers for continuations with 'Cancelling' state are not supported")
                else -> return
            }
        }
    }

    private fun makeHandler(handler: CompletionHandler): CancellationHandlerImpl<*> {
        if (handler is CancellationHandlerImpl<*>) {
            require(handler.continuation === this) { "Handler has non-matching continuation ${handler.continuation}, current: $this" }
            return handler
        }

        return InvokeOnCancel(this, handler)
    }

    private fun tryCancel(state: NotCompleted, cause: Throwable?): Boolean {
        if (useCancellingState) {
            require(state !is CancellationHandler) { "Invariant: 'Cancelling' state and cancellation handlers cannot be used together" }
            return _state.compareAndSet(state, Cancelling(CancelledContinuation(this, cause)))
        }

        return updateStateToFinal(state, CancelledContinuation(this, cause), mode = MODE_ATOMIC_DEFAULT)
    }

    private fun onCompletionInternal(mode: Int) {
        if (tryResume()) return // completed before getResult invocation -- bail out
        // otherwise, getResult has already commenced, i.e. completed later or in other thread
        dispatch(mode)
    }

    protected inline fun loopOnState(block: (Any?) -> Unit): Nothing {
        while (true) {
            block(state)
        }
    }

    protected fun resumeImpl(proposedUpdate: Any?, resumeMode: Int) {
        loopOnState { state ->
            when (state) {
                is Cancelling -> { // withContext() support
                    /*
                     * If already cancelled block is resumed with non-exception,
                     * resume it with cancellation exception.
                     * E.g.
                     * ```
                     * val value = withContext(ctx) {
                     *   outerJob.cancel() // -> cancelling
                     *   42 // -> cancelled
                     * }
                     * ```
                     * should throw cancellation exception instead of returning 42
                     */
                    if (proposedUpdate !is CompletedExceptionally) {
                        val update = state.cancel
                        if (updateStateToFinal(state, update, resumeMode)) return
                    } else {
                        /*
                         * If already cancelled block is resumed with an exception,
                         * then we should properly merge them to avoid information loss
                         */
                        val update: CompletedExceptionally

                        /*
                         * Proposed update is another CancellationException.
                         * e.g.
                         * ```
                         * T1: ctxJob.cancel(e1) // -> cancelling
                         * T2:
                         * withContext(ctx) {
                         *   // -> resumed with cancellation exception
                         * }
                         * ```
                         */
                        if (proposedUpdate.exception is CancellationException) {
                            // Keep original cancellation cause and try add to suppressed exception from proposed cancel
                            update = state.cancel
                            coerceWithCancellation(state, proposedUpdate, update)
                        } else {
                            /*
                             * Proposed update is exception => transition to terminal state
                             * E.g.
                             * ```
                             * withContext(ctx) {
                             *   outerJob.cancel() // -> cancelling
                             *   throw Exception() // -> completed exceptionally
                             * }
                             * ```
                             */
                            val exception = proposedUpdate.exception
                            // TODO clashes with await all
                            val currentException = state.cancel.cause
                            // Add to suppressed if original cancellation differs from proposed exception
                            if (currentException != null && (currentException !is CancellationException || currentException.cause !== exception)) {
                                exception.addSuppressedThrowable(currentException)
                            }

                            update = CompletedExceptionally(exception)
                        }

                        if (updateStateToFinal(state, update, resumeMode)) {
                            return
                        }
                    }
                }

                is NotCompleted -> {
                    if (updateStateToFinal(state, proposedUpdate, resumeMode)) return
                }
                is CancelledContinuation -> {
                    if (proposedUpdate is NotCompleted || proposedUpdate is CompletedExceptionally) {
                        error("Unexpected update, state: $state, update: $proposedUpdate")
                    }
                    // Coroutine is dispatched normally (e.g.via `delay()`) after cancellation
                    return
                }
                else -> error("Already resumed, but proposed with update $proposedUpdate")
            }
        }
    }

    // Coerce current cancelling state with proposed cancellation
    private fun coerceWithCancellation(state: Cancelling, proposedUpdate: CompletedExceptionally, update: CompletedExceptionally) {
        val originalCancellation = state.cancel
        val originalException = originalCancellation.exception
        val updateCause = proposedUpdate.cause

        // Cause of proposed update is present and differs from one in current state TODO clashes with await all
        val isSameCancellation = originalCancellation.exception is CancellationException
                && originalException.cause === updateCause?.cause

        if (!isSameCancellation && updateCause !== null && originalException.cause !== updateCause) {
            update.exception.addSuppressedThrowable(updateCause)
        }
    }

    /**
     * Tries to make transition from active to cancelled or completed state and invokes cancellation handler if necessary
     */
    private fun updateStateToFinal(expect: NotCompleted, proposedUpdate: Any?, mode: Int): Boolean {
        if (!tryUpdateStateToFinal(expect, proposedUpdate)) {
            return false
        }

        completeStateUpdate(expect, proposedUpdate, mode)
        return true
    }

    protected fun tryUpdateStateToFinal(expect: NotCompleted, update: Any?): Boolean {
        require(update !is NotCompleted) // only NotCompleted -> completed transition is allowed
        if (!_state.compareAndSet(expect, update)) return false
        // Unregister from parent job
        parentHandle?.let {
            it.dispose() // volatile read parentHandle _after_ state was updated
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
        return true // continues in completeStateUpdate
    }

    protected fun completeStateUpdate(expect: NotCompleted, update: Any?, mode: Int) {
        val exceptionally = update as? CompletedExceptionally
        onCompletionInternal(mode)

        // Invoke cancellation handlers only if necessary
        if (update is CancelledContinuation && expect is CancellationHandler) {
            try {
                expect.invoke(exceptionally?.cause)
            } catch (ex: Throwable) {
                handleException(CompletionHandlerException("Exception in completion handler $expect for $this", ex))
            }
        }
    }

    private fun handleException(exception: Throwable) {
        handleCoroutineException(context, exception)
    }

    // For nicer debugging
    public override fun toString(): String =
        "${nameString()}{${stateString()}}@$hexAddress"

    protected open fun nameString(): String = classSimpleName

    private fun stateString(): String {
        val state = this.state
        return when (state) {
            is NotCompleted ->"Active"
            is CancelledContinuation -> "Cancelled"
            is CompletedExceptionally -> "CompletedExceptionally"
            else -> "Completed"
        }
    }

}

// Marker for active continuation
internal interface NotCompleted

private class Active : NotCompleted
private val ACTIVE: Active = Active()

// In progress of cancellation
internal class Cancelling(@JvmField val cancel: CancelledContinuation) : NotCompleted

internal abstract class CancellationHandlerImpl<out C : AbstractContinuation<*>>(@JvmField val continuation: C) :
    CancellationHandler(), NotCompleted

// Wrapper for lambdas, for the performance sake CancellationHandler can be subclassed directly
private class InvokeOnCancel( // Clashes with InvokeOnCancellation
    continuation: AbstractContinuation<*>,
    private val handler: CompletionHandler
) : CancellationHandlerImpl<AbstractContinuation<*>>(continuation) {

    override fun invoke(cause: Throwable?) {
        handler.invoke(cause)
    }

    override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]"
}
