Roman Elizarov | 8a4a8e1 | 2017-03-09 19:52:58 +0300 | [diff] [blame] | 1 | /* |
Roman Elizarov | 1f74a2d | 2018-06-29 19:19:45 +0300 | [diff] [blame] | 2 | * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license. |
Roman Elizarov | 8a4a8e1 | 2017-03-09 19:52:58 +0300 | [diff] [blame] | 3 | */ |
| 4 | |
| 5 | // This file was automatically generated from coroutines-guide-reactive.md by Knit tool. Do not edit. |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame^] | 6 | package kotlinx.coroutines.rx2.guide.operators04 |
Roman Elizarov | 8a4a8e1 | 2017-03-09 19:52:58 +0300 | [diff] [blame] | 7 | |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame^] | 8 | import kotlinx.coroutines.* |
| 9 | import kotlinx.coroutines.reactive.* |
Roman Elizarov | 9fe5f46 | 2018-02-21 19:05:52 +0300 | [diff] [blame] | 10 | import org.reactivestreams.* |
Roman Elizarov | 0950dfa | 2018-07-13 10:33:25 +0300 | [diff] [blame^] | 11 | import kotlin.coroutines.* |
Roman Elizarov | 8a4a8e1 | 2017-03-09 19:52:58 +0300 | [diff] [blame] | 12 | |
/**
 * Merges a publisher of publishers into a single publisher.
 *
 * The result is built with `GlobalScope.publish` using the given [context]. Each inner
 * publisher received from the source is consumed in its own coroutine launched from the
 * `publish` block, and every element it produces is re-sent to the resulting publisher.
 *
 * @param context coroutine context handed to the `publish` builder.
 */
fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext): Publisher<T> =
    GlobalScope.publish<T>(context) {
        // iterate over the publishers emitted by the source (the receiver of merge)
        this@merge.consumeEach { inner ->
            // one coroutine per inner publisher, launched in the scope of the publish block
            launch {
                inner.consumeEach { element -> send(element) } // resend all elements from this publisher
            }
        }
    }
| 20 | |
/**
 * Creates a publisher that emits the integers from [start] (inclusive) up to
 * `start + count` (exclusive), calling `delay(time)` before each one is sent.
 *
 * @param time value passed to `delay` before every emission.
 * @param start first number to emit.
 * @param count how many consecutive numbers to emit.
 */
fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int): Publisher<Int> =
    publish<Int> {
        (start until start + count).forEach { number ->
            delay(time) // pause before emitting the next number
            send(number)
        }
    }
| 27 | |
/**
 * Test publisher that emits two inner publishers created with [rangeWithInterval],
 * 100 ms apart, and then stays open for a further 1.1 s before completing.
 */
fun CoroutineScope.testPub(): Publisher<Publisher<Int>> =
    publish<Publisher<Int>> {
        // first inner publisher: number 1 at 250ms, 2 at 500ms, 3 at 750ms, 4 at 1000ms
        send(rangeWithInterval(time = 250, start = 1, count = 4))
        delay(100) // 100 ms gap before the second inner publisher is sent
        // second inner publisher: number 11 at 600ms, 12 at 1100ms, 13 at 1600ms
        send(rangeWithInterval(time = 500, start = 11, count = 3))
        delay(1100) // wait another 1.1s, so this block is done 1.2 sec after start
    }
| 34 | |
/**
 * Entry point: merges the publishers produced by [testPub] in this coroutine's
 * context and prints every element of the merged stream.
 */
fun main(args: Array<String>) = runBlocking<Unit> {
    val merged = testPub().merge(coroutineContext)
    merged.consumeEach { value -> println(value) } // print the whole stream
}