blob: ae9875670a7c1d0edf5e2f564a129be38af0c2be [file] [log] [blame]
Roman Elizarov3aed4ee2017-03-06 12:21:05 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.channels
18
Roman Elizarov932e8602017-06-21 17:21:37 +030019import kotlinx.coroutines.experimental.selects.ALREADY_SELECTED
Roman Elizarov3aed4ee2017-03-06 12:21:05 +030020import kotlinx.coroutines.experimental.selects.SelectInstance
21
22/**
23 * Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
24 * so that the receiver always gets the most recently sent element.
25 * Back-to-back sent elements are _conflated_ -- only the most recently sent element is received,
26 * while previously sent elements **are lost**.
27 * Sender to this channel never suspends and [offer] always returns `true`.
28 *
Roman Elizarovf2a710a2017-07-21 18:33:59 +030029 * This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.
30 *
Roman Elizarov3aed4ee2017-03-06 12:21:05 +030031 * This implementation is fully lock-free.
32 */
public open class ConflatedChannel<E> : AbstractChannel<E>() {
    // Buffer-state flags are fixed for this channel:
    // both "full" flags report `false` and both "empty" flags report `true`.
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false
    protected final override val isBufferAlwaysEmpty: Boolean get() = true
    protected final override val isBufferEmpty: Boolean get() = true

    /**
     * Close hook: conflates the last sent item by handing the [closed] token
     * to `conflatePreviousSendBuffered`.
     * @suppress **This is unstable API and it is subject to change.**
     */
    override fun onClosed(closed: Closed<E>) {
        conflatePreviousSendBuffered(closed)
    }

    /**
     * Offers [element], falling back to a conflated send when the regular offer fails.
     *
     * The loop first delegates to the superclass. On `OFFER_FAILED` it calls `sendConflated`:
     * a `null` outcome is reported as `OFFER_SUCCESS`, a [Closed] outcome is returned as is,
     * and any other outcome restarts the loop with another superclass offer.
     *
     * @return always `OFFER_SUCCESS` or a [Closed] token.
     */
    protected override fun offerInternal(element: E): Any {
        while (true) {
            val result = super.offerInternal(element)
            if (result === OFFER_SUCCESS) return OFFER_SUCCESS
            if (result === OFFER_FAILED) {
                // Regular offer failed -- try to buffer the element, conflating the previous one.
                val conflated = sendConflated(element) ?: return OFFER_SUCCESS
                if (conflated is Closed<*>) return conflated
                // Otherwise there was a receiver in the queue: retry super.offerInternal.
                continue
            }
            if (result is Closed<*>) return result
            error("Invalid offerInternal result $result")
        }
    }

    /**
     * Select-clause counterpart of [offerInternal].
     *
     * While `hasReceiveOrClosed` is `false`, the element is sent through an atomic select
     * operation built by `describeSendConflated`; a `null` outcome of that operation is
     * mapped to `OFFER_SUCCESS`. Otherwise the call is delegated to the superclass.
     * An `OFFER_FAILED` outcome restarts the loop.
     *
     * @return always `ALREADY_SELECTED`, `OFFER_SUCCESS` or a [Closed] token.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        while (true) {
            val result =
                if (!hasReceiveOrClosed) {
                    select.performAtomicTrySelect(describeSendConflated(element)) ?: OFFER_SUCCESS
                } else {
                    super.offerSelectInternal(element, select)
                }
            if (result === ALREADY_SELECTED) return ALREADY_SELECTED
            if (result === OFFER_SUCCESS) return OFFER_SUCCESS
            if (result === OFFER_FAILED) continue // retry
            if (result is Closed<*>) return result
            error("Invalid result $result")
        }
    }
}