blob: 954aae1b48556b3499b5e14de290100a74c86620 [file] [log] [blame]
Roman Elizarov331750b2017-02-15 17:59:17 +03001/*
2 * Copyright 2016-2017 JetBrains s.r.o.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package kotlinx.coroutines.experimental.reactive
18
19import kotlinx.coroutines.experimental.CommonPool
20import kotlinx.coroutines.experimental.TestBase
21import kotlinx.coroutines.experimental.launch
22import kotlinx.coroutines.experimental.runBlocking
23import org.hamcrest.core.IsEqual
24import org.junit.Assert.assertThat
25import org.junit.Assert.assertTrue
26import org.junit.Test
27
28/**
29 * Test emitting multiple values with [publish].
30 */
31class PublisherMultiTest : TestBase() {
32 @Test
33 fun testConcurrentStress() = runBlocking<Unit> {
34 val n = 10_000 * stressTestMultiplier
35 val observable = publish<Int>(CommonPool) {
36 // concurrent emitters (many coroutines)
37 val jobs = List(n) {
38 // launch
39 launch(CommonPool) {
40 send(it)
41 }
42 }
43 jobs.forEach { it.join() }
44 }
45 val resultSet = mutableSetOf<Int>()
Roman Elizarov86349be2017-03-17 16:47:37 +030046 observable.consumeEach {
47 assertTrue(resultSet.add(it))
Roman Elizarov331750b2017-02-15 17:59:17 +030048 }
49 assertThat(resultSet.size, IsEqual(n))
50 }
51}