blob: 53df47e5364cc0c9d4db3624a396b0ffd2471848 [file] [log] [blame]
/*
 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5package kotlinx.coroutines.experimental.reactive
6
7import kotlinx.coroutines.experimental.CommonPool
8import kotlinx.coroutines.experimental.TestBase
9import kotlinx.coroutines.experimental.launch
10import kotlinx.coroutines.experimental.runBlocking
11import org.hamcrest.core.IsEqual
12import org.junit.Assert.assertThat
13import org.junit.Assert.assertTrue
14import org.junit.Test
15
/**
 * Tests that a publisher built with [publish] can emit multiple values.
 */
class PublisherMultiTest : TestBase() {
    /**
     * Stress test with many concurrent emitters feeding one publisher.
     *
     * Launches `10_000 * stressTestMultiplier` coroutines on [CommonPool], each sending its own
     * index through the enclosing [publish] block. The consumer then checks that no value is
     * received twice and that the number of distinct values equals the number of emitters.
     */
    @Test
    fun testConcurrentStress() = runBlocking<Unit> {
        val emitterCount = 10_000 * stressTestMultiplier
        val publisher = publish<Int>(CommonPool) {
            // One coroutine per value, so that sends are issued concurrently.
            val emitters = List(emitterCount) { index ->
                launch(CommonPool) {
                    send(index)
                }
            }
            // Wait for every emitter to finish before the publish block returns.
            for (emitter in emitters) {
                emitter.join()
            }
        }
        val received: MutableSet<Int> = mutableSetOf()
        publisher.consumeEach { value ->
            // `add` returns false for a value already in the set, i.e. a duplicate delivery.
            assertTrue(received.add(value))
        }
        // Every emitted index must have been received.
        assertThat(received.size, IsEqual(emitterCount))
    }
}