blob: 148c28064cc70a4c08980beabe1ad92a9affecda [file] [log] [blame]
Roman Elizarova5e653f2017-02-13 13:49:55 +03001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarova5e653f2017-02-13 13:49:55 +03003 */
4
Roman Elizarov0950dfa2018-07-13 10:33:25 +03005@file:UseExperimental(ExperimentalTypeInference::class)
Roman Elizarova5e653f2017-02-13 13:49:55 +03006
Roman Elizarov0950dfa2018-07-13 10:33:25 +03007package kotlinx.coroutines.channels
8
9import kotlinx.coroutines.*
10import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
11import kotlin.coroutines.*
12import kotlin.experimental.*
Roman Elizarova5e653f2017-02-13 13:49:55 +030013
/**
 * Receiver scope of the block passed to the [produce][CoroutineScope.produce] coroutine builder.
 *
 * It is both a [CoroutineScope] and a [SendChannel], so code running in the producer
 * can send elements without naming the channel explicitly.
 *
 * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 */
@ExperimentalCoroutinesApi
public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
    /**
     * The channel that this coroutine [sends][send] elements to.
     * It exists for convenience only, letting the code in the coroutine write
     * `channel` as opposed to `this`.
     * Every [SendChannel] function on this interface delegates to the
     * channel instance returned by this property.
     */
    public val channel: SendChannel<E>
}
31
/**
 * Launches a new coroutine that produces a stream of values by sending them to a channel,
 * and returns a reference to that coroutine as a [ReceiveChannel]. The returned object
 * is used to [receive][ReceiveChannel.receive] the elements produced by the coroutine.
 *
 * The scope of the coroutine is a [ProducerScope], which implements both [CoroutineScope]
 * and [SendChannel], so the coroutine can call [send][SendChannel.send] directly.
 * The channel is [closed][SendChannel.close] when the coroutine completes.
 * The running coroutine is cancelled when its receive channel is [cancelled][ReceiveChannel.cancel].
 *
 * The coroutine context is inherited from the [CoroutineScope]; additional context elements
 * can be supplied through the [context] argument.
 * If the context has neither a dispatcher nor any other [ContinuationInterceptor], [Dispatchers.Default] is used.
 * The parent job is inherited from the [CoroutineScope] as well, but it can also be overridden
 * with a corresponding [context] element.
 *
 * An uncaught exception in this coroutine closes the channel with that exception as the cause, and
 * the resulting channel becomes _failed_, so that any attempt to receive from it throws the exception.
 *
 * The kind of the resulting channel depends on the specified [capacity] parameter.
 * See the [Channel] interface documentation for details.
 *
 * See [newCoroutineContext] for a description of the debugging facilities available for a newly created coroutine.
 *
 * **Note: This is an experimental api.** Behaviour of producers that work as children in a parent scope with respect
 * to cancellation and error handling may change in the future.
 *
 * @param context context of the coroutine, additional to [CoroutineScope.coroutineContext].
 * @param capacity capacity of the channel's buffer (no buffer by default).
 * @param block the coroutine code.
 */
@ExperimentalCoroutinesApi
@BuilderInference
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is created before the coroutine context, exactly as in the step-by-step form.
    val target = Channel<E>(capacity)
    return ProducerCoroutine(newCoroutineContext(context), target).also { producer ->
        // The coroutine serves as the receiver of its own block.
        producer.start(CoroutineStart.DEFAULT, producer, block)
    }
}
76
/**
 * Variant of [produce] that also installs [onCompletion], when it is not `null`,
 * as a completion handler of the producer coroutine before the coroutine is started.
 *
 * @suppress **This is an internal API and should not be used from general code.**
 *           The onCompletion parameter will be redesigned.
 */
@BuilderInference
@InternalCoroutinesApi
public fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    onCompletion: CompletionHandler? = null,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    // The channel is created before the coroutine context, exactly as in the step-by-step form.
    val target = Channel<E>(capacity)
    return ProducerCoroutine(newCoroutineContext(context), target).also { producer ->
        // Register the optional handler first, then start the coroutine.
        onCompletion?.let { handler -> producer.invokeOnCompletion(handler = handler) }
        // The coroutine serves as the receiver of its own block.
        producer.start(CoroutineStart.DEFAULT, producer, block)
    }
}
96
/**
 * Coroutine created by [produce]: a [ChannelCoroutine] over the given channel that is
 * also handed to the producing block as its [ProducerScope].
 */
private class ProducerCoroutine<E>(
    parentContext: CoroutineContext,
    channel: Channel<E>
) : ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E> {
    // Explicit override that simply forwards to the superclass implementation.
    override val isActive: Boolean get() = super.isActive

    /**
     * Closes the underlying channel when the coroutine completes, passing the failure cause
     * if [state] is a [CompletedExceptionally] and `null` otherwise.
     * When there is a cause, `close` returned `false` and [suppressed] is set,
     * the cause is forwarded to [handleExceptionViaHandler].
     */
    override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
        val failure = (state as? CompletedExceptionally)?.cause
        // close() is invoked unconditionally; its result presumably tells whether this call closed the channel.
        val closedHere = _channel.close(failure)
        if (failure == null || closedHere || !suppressed) return
        handleExceptionViaHandler(context, failure)
    }
}