blob: 4561a103593ce772fb864f6df3b036424220b159 [file] [log] [blame]
Roman Elizarov44e3ba52017-08-01 22:01:31 -07001/*
Roman Elizarov1f74a2d2018-06-29 19:19:45 +03002 * Copyright 2016-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
Roman Elizarov44e3ba52017-08-01 22:01:31 -07003 */
4
5package kotlinx.coroutines.experimental.quasar
6
Roman Elizarov9fe5f462018-02-21 19:05:52 +03007import co.paralleluniverse.fibers.*
8import co.paralleluniverse.strands.*
9import co.paralleluniverse.strands.dataflow.*
Roman Elizarovffc61ae2017-10-26 19:29:52 +030010import kotlinx.coroutines.experimental.*
Roman Elizarov9fe5f462018-02-21 19:05:52 +030011import org.junit.*
12import java.util.concurrent.*
13import kotlin.coroutines.experimental.*
Roman Elizarov44e3ba52017-08-01 22:01:31 -070014
15class QuasarTest : TestBase() {
16 @Before
17 fun setup() {
18 ignoreLostThreads(
19 "FiberTimedScheduler-default-fiber-pool",
20 "ForkJoinPool-default-fiber-pool-worker-",
21 "Timer-")
22 }
23
24 @Test
25 fun testRunSuspendable() = runBlocking<Unit> {
26 expect(1)
27 val started = CompletableDeferred<Unit>() // Kotlin's event
28 val x = Val<String>() // Quasar's data flow
29 launch(coroutineContext) {
30 started.await() // await Quasar's scheduler
31 expect(3) // will get scheduled when runSuspendable suspends
32 x.set("OK")
33 }
34 val result = runSuspendable(SuspendableCallable {
35 expect(2)
36 started.complete(Unit) // signal that we've started
37 x.get(10, TimeUnit.SECONDS) // will get suspended
38 })
39 finish(4)
40 check(result == "OK")
41 }
42
43 @Test
44 fun testRunFiberBlocking() = runBlocking {
45 expect(1)
46 val started = CompletableDeferred<Unit>() // Kotlin's event
47 val result = CompletableDeferred<String>() // result goes here
48 val fiber = object : Fiber<String>() {
49 @Throws(SuspendExecution::class)
50 override fun run(): String {
51 expect(3)
52 started.complete(Unit) // signal that fiber is started
53 // block fiber on suspendable await
54 val value = runFiberBlocking {
55 result.await()
56 }
57 expect(5)
58 return value
59 }
60 }
61 fiber.start()
62 expect(2)
63 started.await() // wait fiber to start
64 expect(4)
65 result.complete("OK") // send Ok to fiber
66 val answer = runSuspendable(SuspendableCallable {
67 fiber.get()
68 })
69 finish(6)
70 check(answer == "OK")
71 }
72}