blob: 59c503c051237801103e81d2a20bdb799074cdfd [file] [log] [blame]
/*
* Copyright 2016-2017 JetBrains s.r.o.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kotlinx.coroutines.experimental.channels
import kotlinx.coroutines.experimental.CancellationException
import kotlinx.coroutines.experimental.CoroutineScope
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.yield
/**
* Sender's interface to [Channel].
*
* Declares the sending half of a channel: the state queries [isClosedForSend] and [isFull],
* the suspending [send], the non-suspending [offer], and [close].
*/
public interface SendChannel<in E> {
/**
* Returns `true` if this channel was closed by invocation of [close], in which case
* a [send] attempt throws [ClosedSendChannelException]. A channel that was closed because of an exception
* is considered closed, too, but it is called a _failed_ channel. All suspending attempts to send
* an element to a failed channel throw the original [close] cause exception.
*/
public val isClosedForSend: Boolean
/**
* Returns `true` if the channel is full (out of capacity), in which case a [send] attempt will suspend.
* This property returns `false` for a channel that [isClosedForSend].
*/
public val isFull: Boolean
/**
* Adds [element] into this channel, suspending the caller while this channel [isFull],
* or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
* It throws the original [close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
* Cancellation of a suspended send is *atomic* -- when this function
* throws [CancellationException] it means that the [element] was not sent to this channel.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* @param element the element to send.
* @throws ClosedSendChannelException if the channel was closed _normally_.
* @throws CancellationException if the [Job] of the current coroutine is completed while this function is suspended.
*/
public suspend fun send(element: E)
/**
* Adds [element] into this channel if it is possible to do so immediately without violating capacity restrictions
* and returns `true`. Otherwise, it returns `false` immediately
* or throws [ClosedSendChannelException] if the channel [isClosedForSend] _normally_.
* It throws the original [close] cause exception if the channel has _failed_.
*
* Unlike [send], this function is not suspending.
*
* @param element the element to add.
* @return `true` if the element was added, `false` if it could not be added immediately.
* @throws ClosedSendChannelException if the channel was closed _normally_.
*/
public fun offer(element: E): Boolean
/**
* Closes this channel with an optional exceptional [cause].
* This is an idempotent operation -- repeated invocations of this function have no effect and return `false`.
* Conceptually, it sends a special close token over this channel. Immediately after invocation of this function
* [isClosedForSend] starts returning `true`. However, [isClosedForReceive][ReceiveChannel.isClosedForReceive]
* on the side of [ReceiveChannel] starts returning `true` only after all previously sent elements
* are received.
*
* A channel that was closed without a [cause] is considered to be _closed normally_.
* A channel that was closed with a non-null [cause] is called a _failed channel_. Attempts to send or
* receive on a failed channel throw this cause exception.
*
* @param cause the exception that caused the close, or `null` (the default) for a normal close.
* @return `false` when this invocation had no effect because the channel was already closed.
*/
public fun close(cause: Throwable? = null): Boolean
}
/**
* Receiver's interface to [Channel].
*
* Declares the receiving half of a channel: the state queries [isClosedForReceive] and [isEmpty],
* the suspending [receive] and [receiveOrNull], the non-suspending [poll], and [iterator] for `for` loops.
*/
public interface ReceiveChannel<out E> {
/**
* Returns `true` if this channel was closed by invocation of [close][SendChannel.close] on the [SendChannel]
* side and all previously sent items were already received, so that a [receive] attempt
* throws [ClosedReceiveChannelException]. A channel that was closed because of an exception
* is considered closed, too, but it is called a _failed_ channel. All suspending attempts to receive
* an element from a failed channel throw the original [close][SendChannel.close] cause exception.
*/
public val isClosedForReceive: Boolean
/**
* Returns `true` if the channel is empty (contains no elements), in which case a [receive] attempt will suspend.
* This property returns `false` for a channel that [isClosedForReceive].
*/
public val isEmpty: Boolean
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel [isEmpty],
* or throws [ClosedReceiveChannelException] if the channel [isClosedForReceive].
* If the channel was closed because of an exception, it is called a _failed_ channel and this function
* throws the original [close][SendChannel.close] cause exception.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
* Cancellation of a suspended receive is *atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* @return the element retrieved from this channel.
* @throws ClosedReceiveChannelException if the channel [isClosedForReceive].
* @throws CancellationException if the [Job] of the current coroutine is completed while this function is suspended.
*/
public suspend fun receive(): E
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel [isEmpty],
* or returns `null` if the channel is [closed][isClosedForReceive] _normally_,
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
* Cancellation of a suspended receive is *atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* @return the element retrieved from this channel, or `null` if the channel was closed _normally_.
* @throws CancellationException if the [Job] of the current coroutine is completed while this function is suspended.
*/
public suspend fun receiveOrNull(): E?
/**
* Retrieves and removes the head of this channel, or returns `null` if this channel [isEmpty]
* or is [closed][isClosedForReceive] _normally_,
* or throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* Unlike [receive], this function is not suspending.
*
* @return the head element, or `null` if the channel is empty or was closed _normally_.
*/
public fun poll(): E?
/**
* Returns a new iterator to receive elements from this channel using a `for` loop.
* Iteration completes normally when the channel is [closed][isClosedForReceive] _normally_ and
* throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* @return a new [ChannelIterator] over the elements of this channel.
*/
public operator fun iterator(): ChannelIterator<E>
}
/**
* Iterator for [ReceiveChannel]. Instances of this interface are *not thread-safe* and shall not be used
* from concurrent coroutines.
*
* Both [hasNext] and [next] are suspending operator functions.
*/
public interface ChannelIterator<out E> {
/**
* Returns `true` if the channel has more elements, suspending the caller while this channel
* [isEmpty][ReceiveChannel.isEmpty], or returns `false` if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] _normally_.
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This function retrieves and removes the element from this channel for the subsequent invocation
* of [next].
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
* Cancellation of a suspended receive is *atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* @return `true` if an element is available for [next], `false` if the channel was closed _normally_.
* @throws CancellationException if the [Job] of the current coroutine is completed while this function is suspended.
*/
public suspend operator fun hasNext(): Boolean
/**
* Retrieves and removes the element from this channel, suspending the caller while this channel
* [isEmpty][ReceiveChannel.isEmpty], or throws [ClosedReceiveChannelException] if the channel
* [isClosedForReceive][ReceiveChannel.isClosedForReceive].
* It throws the original [close][SendChannel.close] cause exception if the channel has _failed_.
*
* This suspending function is cancellable. If the [Job] of the current coroutine is completed while this
* function is suspended, this function immediately resumes with [CancellationException].
* Cancellation of a suspended receive is *atomic* -- when this function
* throws [CancellationException] it means that the element was not retrieved from this channel.
*
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*
* @return the element retrieved from the channel.
* @throws ClosedReceiveChannelException if the channel [isClosedForReceive][ReceiveChannel.isClosedForReceive].
* @throws CancellationException if the [Job] of the current coroutine is completed while this function is suspended.
*/
public suspend operator fun next(): E
}
/**
 * A non-blocking communication primitive that connects a sender, working through [SendChannel],
 * with a receiver, working through [ReceiveChannel].
 * Conceptually, a channel is similar to [BlockingQueue][java.util.concurrent.BlockingQueue],
 * except that its operations suspend instead of blocking and that it can be closed.
 */
public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    /**
     * Factory for channels; its [invoke] operator allows creating a channel with `Channel<E>(capacity)`.
     */
    public companion object Factory {
        /**
         * Creates a channel with the given buffer [capacity].
         *
         * A [capacity] of `0` (the default) produces a [RendezvousChannel], i.e. a channel without a buffer;
         * any positive value produces an [ArrayChannel] constructed with that capacity.
         *
         * @param capacity the requested buffer capacity; must not be negative.
         * @return the newly created channel.
         * @throws IllegalStateException if [capacity] is negative.
         */
        public operator fun <E> invoke(capacity: Int = 0): Channel<E> {
            // Reject negative capacities up front, before choosing an implementation.
            check(capacity >= 0) { "Channel capacity cannot be negative, but $capacity was specified" }
            return when (capacity) {
                0 -> RendezvousChannel()
                else -> ArrayChannel(capacity)
            }
        }
    }
}
/**
* Indicates an attempt to [send][SendChannel.send] on an [isClosedForSend][SendChannel.isClosedForSend] channel
* that was closed _normally_. A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on send attempts.
*
* This exception is a subclass of [IllegalStateException].
*
* @param message the detail message passed to [IllegalStateException], may be `null`.
*/
public class ClosedSendChannelException(message: String?) : IllegalStateException(message)
/**
* Indicates an attempt to [receive][ReceiveChannel.receive] on an
* [isClosedForReceive][ReceiveChannel.isClosedForReceive] channel that was closed _normally_.
* A _failed_ channel rethrows the original [close][SendChannel.close] cause
* exception on receive attempts.
*
* This exception is a subclass of [NoSuchElementException].
*
* @param message the detail message passed to [NoSuchElementException], may be `null`.
*/
public class ClosedReceiveChannelException(message: String?) : NoSuchElementException(message)