blob: 2f8ce9ac42f726262f177c286c42cbf0e3dd7897 [file] [log] [blame]
SokolovaMaria1dcfd972019-08-09 17:35:14 +03001package kotlinx.coroutines.reactor
2
3import kotlinx.coroutines.flow.*
4import kotlinx.coroutines.reactive.*
5import kotlinx.coroutines.runBlocking
6import org.junit.Test
7import reactor.core.publisher.Mono
8import reactor.util.context.Context
9import kotlin.test.assertEquals
10
/**
 * Tests for converting a flow to a Reactor flux with [asFlux], focused on how the
 * Reactor subscriber [Context] is seen from coroutines started inside the flow.
 */
class FlowAsFluxTest {
    /**
     * Builds a flow that emits, for each key in `1..4`, the value a `mono` builder reads
     * from the Reactor context, converts it with [asFlux], attaches the context entries
     * `1 -> "1"` and `2 -> "2", 3 -> "3", 4 -> "4"` through two `subscriberContext` calls,
     * and checks that the flux produces exactly `"1", "2", "3", "4"`.
     */
    @Test
    fun testFlowToFluxContextPropagation() = runBlocking<Unit> {
        val flux = flow<String> {
            (1..4).forEach { key -> emit(monoFromContext(key).awaitFirst()) }
        }
            .asFlux()
            .subscriberContext(Context.of(1, "1"))
            .subscriberContext(Context.of(2, "2", 3, "3", 4, "4"))

        // Await the whole stream inside the test coroutine instead of asserting from a
        // `subscribe` callback: this way an upstream error or a mismatch is thrown here
        // and fails the test, and the number of emitted elements is verified as well.
        val received = flux.collectList().awaitFirst()
        assertEquals(listOf("1", "2", "3", "4"), received)
    }

    /**
     * Returns a [Mono] that looks up [key] in the Reactor context found under
     * [ReactorContext] in the coroutine context, falling back to `"noValue"` when the
     * key is not present.
     *
     * @throws IllegalStateException (inside the mono) if the coroutine context has no [ReactorContext].
     */
    private fun monoFromContext(key: Int): Mono<String> = mono {
        val reactorContext = checkNotNull(coroutineContext[ReactorContext]) {
            "ReactorContext is missing from the coroutine context"
        }
        reactorContext.context.getOrDefault(key, "noValue")
    }
}