| /* |
| * Copyright 2016-2017 JetBrains s.r.o. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| // This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit. |
| package kotlinx.coroutines.experimental.guide.select05 |
| |
| import kotlinx.coroutines.experimental.* |
| import kotlinx.coroutines.experimental.channels.* |
| import kotlinx.coroutines.experimental.selects.* |
| import kotlin.coroutines.experimental.* |
| |
| fun switchMapDeferreds(input: ReceiveChannel<Deferred<String>>) = produce<String> { |
| var current = input.receive() // start with first received deferred value |
| while (isActive) { // loop while not cancelled/closed |
| val next = select<Deferred<String>?> { // return next deferred value from this select or null |
| input.onReceiveOrNull { update -> |
| update // replaces next value to wait |
| } |
| current.onAwait { value -> |
| send(value) // send value that current deferred has produced |
| input.receiveOrNull() // and use the next deferred from the input channel |
| } |
| } |
| if (next == null) { |
| println("Channel was closed") |
| break // out of loop |
| } else { |
| current = next |
| } |
| } |
| } |
| |
| fun asyncString(str: String, time: Long) = async { |
| delay(time) |
| str |
| } |
| |
| fun main(args: Array<String>) = runBlocking<Unit> { |
| val chan = Channel<Deferred<String>>() // the channel for test |
| launch(coroutineContext) { // launch printing coroutine |
| for (s in switchMapDeferreds(chan)) |
| println(s) // print each received string |
| } |
| chan.send(asyncString("BEGIN", 100)) |
| delay(200) // enough time for "BEGIN" to be produced |
| chan.send(asyncString("Slow", 500)) |
| delay(100) // not enough time to produce slow |
| chan.send(asyncString("Replace", 100)) |
| delay(500) // give it time before the last one |
| chan.send(asyncString("END", 500)) |
| delay(1000) // give it time to process |
| chan.close() // close the channel ... |
| delay(500) // and wait some time to let it finish |
| } |