blob: 0208febc48bfdfda5df7dbde5ad050c163184531 [file] [log] [blame]
/*
 * Copyright 2017-2018 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
 */
4
5@file:Suppress("RedundantVisibilityModifier")
6
7package kotlinx.atomicfu
8
Roman Elizarovc201b132017-08-16 14:40:16 +03009import java.util.*
Roman Elizarov746a5d82019-08-23 14:56:16 +030010import java.util.concurrent.atomic.*
11import java.util.concurrent.locks.*
12import kotlin.coroutines.*
13import kotlin.coroutines.intrinsics.*
Roman Elizarova3497ee2017-08-10 20:25:36 +030014
// A test thread pauses itself when `random.nextInt(PAUSE_EVERY_N_STEPS) == 0` on an intercepted step
private const val PAUSE_EVERY_N_STEPS = 1000
// A thread whose last completed operation is older than this is reported as stalled;
// the same value bounds how long shutdown waits for threads to terminate
private const val STALL_LIMIT_MS = 15_000L // 15s
// Sleep between successive checks of the threads while shutting down
private const val SHUTDOWN_CHECK_MS = 10L // 10ms

// Value of the status word after the test is done
private const val STATUS_DONE = Int.MAX_VALUE

private const val MAX_PARK_NANOS = 1_000_000L // park for at most 1ms just in case of losing unpark signal
Roman Elizarov46f988a2017-08-16 19:33:13 +030022
/**
 * Environment for performing lock-freedom tests for lock-free data structures
 * that are written with [atomic] variables.
 *
 * Test threads are registered with [testThread] before the test is started with [performTest].
 * While the test runs, the installed interceptor calls [step] from its `beforeUpdate`, `afterSet` and
 * `afterRMW` callbacks. A step occasionally pauses the calling test thread; the paused thread is resumed
 * once every other test thread has been counted as making progress for that pause, i.e. it finished an
 * operation that it began while the pause was in effect, has terminated, or is waiting in a permitted
 * suspension.
 *
 * @param name name of this environment; it is printed in the progress output and used as a prefix
 *        of test thread names.
 * @param allowSuspendedThreads the number of suspended test threads that are still counted as making
 *        progress; [performTest] requires at least `2 + allowSuspendedThreads` test threads.
 */
public open class LockFreedomTestEnvironment(
    private val name: String,
    private val allowSuspendedThreads: Int = 0
) {
    private val interceptor = Interceptor()
    private val threads = mutableListOf<TestThread>()
    private val performedOps = LongAdder()
    private val uncaughtException = AtomicReference<Throwable?>()
    private var started = false

    // Written by whichever thread performs a resume and read by the thread running performTest,
    // so it must be volatile for the progress output to see up-to-date values
    @Volatile
    private var performedResumes = 0

    @Volatile
    private var completed = false
    private val onCompletion = mutableListOf<() -> Unit>()

    // Prints the failure and remembers the first uncaught exception so that it can be rethrown on shutdown
    private val ueh = Thread.UncaughtExceptionHandler { t, e ->
        synchronized(System.out) {
            println("Uncaught exception in thread $t")
            e.printStackTrace(System.out)
            uncaughtException.compareAndSet(null, e)
        }
    }

    // status < 0 - inv() of the index of the paused thread
    // status >= 0 - no. of performed resumes so far (==last epoch)
    // status == STATUS_DONE - done working
    private val status = AtomicInteger()
    // number of threads that were counted as making progress for the current pause
    private val globalPauseProgress = AtomicInteger()
    // guarded by the monitor of this environment, see addSuspended/removeSuspended
    private val suspendedThreads = ArrayList<TestThread>()

    @Volatile
    private var isActive = true

    // ---------- API ----------

    /**
     * Starts lock-freedom test for a given duration in seconds, invoking [progress] every second
     * while the test is running (`seconds + 1` times unless a test thread fails earlier)
     * and one more time after the test is done.
     *
     * @throws IllegalStateException if the test was already performed on this instance, if there are less
     *         than `2 + allowSuspendedThreads` test threads, if progress has stalled in some thread, or if
     *         some thread fails to shut down in time.
     * @throws Throwable the first uncaught exception of a test thread, if any.
     */
    public fun performTest(seconds: Int, progress: () -> Unit = {}) {
        check(isActive) { "Can perform test at most once on this instance" }
        println("=== $name")
        val minThreads = 2 + allowSuspendedThreads
        check(threads.size >= minThreads) { "Must define at least $minThreads test threads" }
        lockAndSetInterceptor(interceptor)
        started = true
        var nextTime = System.currentTimeMillis()
        try {
            // Threads are started inside of try, so that a failure to start one of them still
            // shuts down the ones that are already running and releases the interceptor
            threads.forEach { thread ->
                thread.setUncaughtExceptionHandler(ueh)
                thread.lastOpTime = nextTime
                thread.start()
            }
            var second = 0
            while (uncaughtException.get() == null) {
                waitUntil(nextTime)
                println("--- $second: Performed ${performedOps.sum()} operations${resumeStr()}")
                progress()
                checkStalled()
                if (++second > seconds) break
                nextTime += 1000L
            }
        } finally {
            complete()
        }
        println("------ Done with ${performedOps.sum()} operations${resumeStr()}")
        progress()
    }

    /**
     * Completes the test: marks it as completed, runs [onCompletion] blocks, signals all threads to stop,
     * waits (for at most [STALL_LIMIT_MS]) until there are no alive non-paused threads, and shuts down.
     * Fails if some threads were still alive and not paused when the wait was over.
     */
    private fun complete() {
        val activeNonPausedThreads: MutableMap<TestThread, Array<StackTraceElement>> = mutableMapOf()
        val shutdownDeadline = System.currentTimeMillis() + STALL_LIMIT_MS
        try {
            completed = true
            // perform custom completion blocks. For testing of things like channels, these custom completion
            // blocks close all the channels, so that all suspended coroutines shall get resumed.
            onCompletion.forEach { it() }
            // signal shutdown to all threads (non-paused threads will terminate)
            isActive = false
            // wait for threads to terminate
            while (System.currentTimeMillis() < shutdownDeadline) {
                // Check all threads while shutting down:
                // All terminated threads are considered to make progress for the purpose of resuming stalled ones
                activeNonPausedThreads.clear()
                for (t in threads) {
                    when {
                        !t.isAlive -> t.makeProgress(getPausedEpoch()) // not alive - makes progress
                        t.index.inv() == status.get() -> {} // active, paused -- skip
                        else -> {
                            // take the stack trace first and recheck liveness afterwards,
                            // so that a thread that has just terminated is not reported
                            val stackTrace = t.stackTrace
                            if (t.isAlive) activeNonPausedThreads[t] = stackTrace
                        }
                    }
                }
                if (activeNonPausedThreads.isEmpty()) break
                checkStalled()
                Thread.sleep(SHUTDOWN_CHECK_MS)
            }
            activeNonPausedThreads.forEach { (t, stackTrace) ->
                println("=== $t had failed to shutdown in time")
                stackTrace.forEach { println("\tat $it") }
            }
        } finally {
            shutdown(shutdownDeadline)
        }
        // if no other exception was thrown & we had threads that did not shut down -- still fails
        if (activeNonPausedThreads.isNotEmpty()) error("Some threads had failed to shutdown in time")
    }

    /**
     * Sets the status to [STATUS_DONE], unparks the paused thread (if any), joins all threads until
     * [shutdownDeadline], aborts threads that still wait for a result, and resets the interceptor.
     * Rethrows the recorded uncaught exception (if any) and fails if some thread is still alive.
     */
    private fun shutdown(shutdownDeadline: Long) {
        // forcefully unpause paused threads to shut them down (if any left)
        val curStatus = status.getAndSet(STATUS_DONE)
        if (curStatus < 0) LockSupport.unpark(threads[curStatus.inv()])
        threads.forEach {
            val remaining = shutdownDeadline - System.currentTimeMillis()
            if (remaining > 0) it.join(remaining)
        }
        // abort waiting threads (if still any left)
        threads.forEach { it.abortWait() }
        // cleanup & be done
        unlockAndResetInterceptor(interceptor)
        uncaughtException.get()?.let { throw it }
        threads.find { it.isAlive }?.let { dumpThreadsError("A thread is still alive: $it") }
    }

    /**
     * Fails with a dump of thread stack traces if some thread has not recorded
     * a completed operation during the last [STALL_LIMIT_MS] milliseconds.
     */
    private fun checkStalled() {
        val stallLimit = System.currentTimeMillis() - STALL_LIMIT_MS
        val stalled = threads.filter { it.lastOpTime < stallLimit }
        if (stalled.isNotEmpty()) dumpThreadsError("Progress stalled in threads ${stalled.map { it.name }}")
    }

    /** Returns a suffix for progress messages with the number of pause/resumes, empty if there were none. */
    private fun resumeStr(): String {
        val resumes = performedResumes
        return if (resumes == 0) "" else " (pause/resumes $resumes)"
    }

    /** Sleeps until `System.currentTimeMillis()` reaches [nextTime]. */
    private fun waitUntil(nextTime: Long) {
        while (true) {
            val curTime = System.currentTimeMillis()
            if (curTime >= nextTime) break
            Thread.sleep(nextTime - curTime)
        }
    }

    /**
     * Prints [message] followed by the stack traces of all test threads that have a non-empty
     * stack trace and then fails with [IllegalStateException] with the same [message].
     */
    private fun dumpThreadsError(message: String): Nothing {
        val traces = threads.associate { it to it.stackTrace }
        println("!!! $message")
        println("=== Dumping live thread stack traces")
        for ((thread, trace) in traces) {
            if (trace.isEmpty()) continue
            println("Thread \"${thread.name}\" ${thread.state}")
            for (t in trace) println("\tat ${t.className}.${t.methodName}(${t.fileName}:${t.lineNumber})")
            println()
        }
        println("===")
        error(message)
    }

    /**
     * Returns true when test was completed.
     * Sets to true before calling [onCompletion] blocks.
     */
    public val isCompleted: Boolean get() = completed

    /**
     * Performs a given block of code on test's completion
     */
    public fun onCompletion(block: () -> Unit) {
        onCompletion += block
    }

    /**
     * Creates a new test thread in this environment that executes a given lock-free [operation]
     * in a loop while this environment [isActive].
     *
     * @param name optional name of the thread; the name of the environment is used as its prefix.
     */
    public fun testThread(name: String? = null, operation: suspend TestThread.() -> Unit): TestThread =
        TestThread(name, operation)

    /**
     * Test thread.
     *
     * The thread registers itself in the list of threads of its environment on construction,
     * which is only possible before the test is started.
     */
    @Suppress("LeakingThis")
    public inner class TestThread internal constructor(
        name: String?,
        private val operation: suspend TestThread.() -> Unit
    ) : Thread(composeThreadName(name)) {
        // index of this thread in the list of threads of the environment
        internal val index: Int

        // the time when the last operation was completed, it is checked by checkStalled
        @Volatile
        internal var lastOpTime = 0L

        // the epoch that the status is going to get when this thread is resumed from its pause
        @Volatile
        internal var pausedEpoch = -1

        private val random = Random()

        // thread-local stuff
        private var operationEpoch = -1
        private var progressEpoch = -1
        private var sink = 0

        init {
            check(!started)
            index = threads.size
            threads += this
        }

        /**
         * Calls the operation in a loop while the environment is active.
         */
        public override fun run() {
            while (isActive) {
                callOperation()
            }
        }

        /**
         * Use it to insert an arbitrary intermission between lock-free operations.
         *
         * The operation that is in progress is recorded as completed before [block] is invoked,
         * and a new operation begins after [block] completes (either normally or with an exception).
         */
        public inline fun <T> intermission(block: () -> T): T {
            afterLockFreeOperation()
            return try { block() }
            finally { beforeLockFreeOperation() }
        }

        /** Remembers the epoch of the pause (if any) that is in effect when an operation begins. */
        @PublishedApi
        internal fun beforeLockFreeOperation() {
            operationEpoch = getPausedEpoch()
        }

        /** Records the completed operation: reports progress, updates [lastOpTime] and counts the operation. */
        @PublishedApi
        internal fun afterLockFreeOperation() {
            makeProgress(operationEpoch)
            lastOpTime = System.currentTimeMillis()
            performedOps.add(1)
        }

        /**
         * Counts this thread as making progress for the pause with the given [epoch] (at most once per
         * epoch) and resumes the paused thread when all the threads but one were counted.
         */
        internal fun makeProgress(epoch: Int) {
            if (epoch <= progressEpoch) return
            progressEpoch = epoch
            val total = globalPauseProgress.incrementAndGet()
            if (total >= threads.size - 1) {
                check(total == threads.size - 1)
                check(globalPauseProgress.compareAndSet(threads.size - 1, 0))
                resumeImpl()
            }
        }

        /**
         * Inserts random spin wait between multiple lock-free operations in [operation].
         */
        public fun randomSpinWaitIntermission() {
            intermission {
                if (random.nextInt(100) < 95) return // be quick, no wait 95% of time
                do {
                    val x = random.nextInt(100)
                    repeat(x) { sink += it }
                } while (x >= 90)
            }
        }

        /** Pauses this thread with the probability of one in [PAUSE_EVERY_N_STEPS]. */
        internal fun stepImpl() {
            if (random.nextInt(PAUSE_EVERY_N_STEPS) == 0) pauseImpl()
        }

        /**
         * Pauses this thread until it is resumed or the test is done.
         * Does nothing when some other thread is already paused or the test is done.
         */
        internal fun pauseImpl() {
            while (true) {
                val curStatus = status.get()
                if (curStatus < 0 || curStatus == STATUS_DONE) return // some other thread paused or done
                // publish the epoch before the status, so that readers of the paused status can see it
                pausedEpoch = curStatus + 1
                val newStatus = index.inv()
                if (status.compareAndSet(curStatus, newStatus)) {
                    while (status.get() == newStatus) LockSupport.parkNanos(MAX_PARK_NANOS) // wait
                    return
                }
            }
        }

        // ----- Lightweight support for suspending operations -----

        /**
         * Starts the operation as a coroutine on this thread and waits for its completion
         * if it had suspended.
         */
        private fun callOperation() {
            beforeLockFreeOperation()
            beginRunningOperation()
            val result = operation.startCoroutineUninterceptedOrReturn(this, completion)
            when {
                result === Unit -> afterLockFreeOperation() // operation completed w/o suspension -- done
                result === COROUTINE_SUSPENDED -> waitUntilCompletion() // operation had suspended
                else -> error("Unexpected result of operation: $result")
            }
            try {
                doneRunningOperation()
            } catch(e: IllegalStateException) {
                throw IllegalStateException("${e.message}; original start result=$result", e)
            }
        }

        // the state of the running operation: result & continuation are set by resumeWith
        // and are guarded by the monitor of this thread
        private var runningOperation = false
        private var result: Result<Any?>? = null
        private var continuation: Continuation<Any?>? = null

        /**
         * Resumes intercepted continuations of the suspended operation on this thread until
         * the operation completes, that is until a result arrives without a continuation.
         */
        private fun waitUntilCompletion() {
            try {
                while (true) {
                    afterLockFreeOperation()
                    val result: Result<Any?> = waitForResult()
                    val continuation = takeContinuation()
                    if (continuation == null) { // done
                        check(result.getOrThrow() === Unit)
                        return
                    }
                    removeSuspended(this)
                    beforeLockFreeOperation()
                    continuation.resumeWith(result)
                }
            } finally {
                removeSuspended(this)
            }
        }

        /** Resets the state before the operation is started. */
        private fun beginRunningOperation() {
            runningOperation = true
            result = null
            continuation = null
        }

        /** Checks that there is no pending result or continuation and marks the operation as not running. */
        @Synchronized
        private fun doneRunningOperation() {
            check(runningOperation) { "Should be running operation" }
            check(result == null && continuation == null) {
                "Callback invoked with result=$result, continuation=$continuation"
            }
            runningOperation = false
        }

        /**
         * Stores the [result] with the [continuation] to resume with it (`null` when the operation
         * has completed) and notifies the threads that wait on the monitor of this thread.
         */
        @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
        @Synchronized
        private fun resumeWith(result: Result<Any?>, continuation: Continuation<Any?>?) {
            check(runningOperation) { "Should be running operation" }
            check(this.result == null && this.continuation == null) {
                "Resumed again with result=$result, continuation=$continuation, when this: result=${this.result}, continuation=${this.continuation}"
            }
            this.result = result
            this.continuation = continuation
            (this as Object).notifyAll()
        }

        /**
         * Waits until the result is set. While waiting, this thread is registered as suspended,
         * and it is counted as making progress when its suspension is permitted.
         */
        @Suppress("RESULT_CLASS_IN_RETURN_TYPE", "PLATFORM_CLASS_MAPPED_TO_KOTLIN")
        @Synchronized
        private fun waitForResult(): Result<Any?> {
            while (true) {
                val result = this.result
                if (result != null) return result
                val index = addSuspended(this)
                if (index < allowSuspendedThreads) {
                    // This suspension was permitted, so assume progress is happening while it is suspended
                    makeProgress(getPausedEpoch())
                }
                (this as Object).wait(10) // at most 10 ms
            }
        }

        /** Returns the stored continuation (if any) and clears both the stored result and continuation. */
        @Synchronized
        private fun takeContinuation(): Continuation<Any?>? =
            continuation.also {
                this.result = null
                this.continuation = null
            }

        /**
         * Sets the result to a failure with [IllegalStateException] and notifies
         * the threads that wait on the monitor of this thread.
         */
        @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN")
        @Synchronized
        fun abortWait() {
            this.result = Result.failure(IllegalStateException("Aborted at the end of test"))
            (this as Object).notifyAll()
        }

        // Wraps continuations, so that a resumption is only recorded via resumeWith
        // and is actually performed on this thread by waitUntilCompletion.
        // It must be declared before completion, which uses it as its context.
        private val interceptor: CoroutineContext = object : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
            override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
                Continuation<T>(this) {
                    @Suppress("UNCHECKED_CAST")
                    resumeWith(it, continuation as Continuation<Any?>)
                }
        }

        // Completion of the operation: records the final result without a continuation
        private val completion = Continuation<Unit>(interceptor) {
            resumeWith(it, null)
        }
    }

    // ---------- Implementation ----------

    /**
     * Adds the [thread] to the list of suspended threads unless it is already there.
     * Returns the index of the thread in that list.
     */
    @Synchronized
    private fun addSuspended(thread: TestThread): Int {
        val index = suspendedThreads.indexOf(thread)
        if (index >= 0) return index
        suspendedThreads.add(thread)
        return suspendedThreads.size - 1
    }

    /** Removes the [thread] from the list of suspended threads if it is there. */
    @Synchronized
    private fun removeSuspended(thread: TestThread) {
        suspendedThreads.remove(thread)
    }

    /**
     * Returns the epoch of the pause that is currently in effect or `-1` when the status is non-negative,
     * i.e. no thread is paused (or the test is done).
     */
    private fun getPausedEpoch(): Int {
        while (true) {
            val curStatus = status.get()
            if (curStatus >= 0) return -1 // not paused
            val thread = threads[curStatus.inv()]
            val pausedEpoch = thread.pausedEpoch
            // recheck the status to make sure that the epoch was read while that thread was still paused
            if (curStatus == status.get()) return pausedEpoch
        }
    }

    /** Performs a step of the current thread; does nothing when it is not a test thread. */
    internal fun step() {
        val thread = Thread.currentThread() as? TestThread ?: return
        thread.stepImpl()
    }

    /**
     * Resumes the paused thread: sets the status to the epoch of its pause and unparks it.
     * Does nothing when the test is done.
     */
    private fun resumeImpl() {
        while (true) {
            val curStatus = status.get()
            if (curStatus == STATUS_DONE) return // done
            check(curStatus < 0)
            val thread = threads[curStatus.inv()]
            // read the volatile epoch once, so that the reported number of resumes and the new status agree
            val epoch = thread.pausedEpoch
            performedResumes = epoch
            if (status.compareAndSet(curStatus, epoch)) {
                LockSupport.unpark(thread)
                return
            }
        }
    }

    /**
     * Returns the name for a new test thread: the name of the environment followed by the given
     * [threadName] or, when it is `null`, by the 1-based number of the thread.
     */
    private fun composeThreadName(threadName: String?): String {
        if (threadName != null) return "$name-$threadName"
        return name + "-${threads.size + 1}"
    }

    // Performs a step of the current thread on every intercepted callback
    private inner class Interceptor : AtomicOperationInterceptor() {
        override fun <T> beforeUpdate(ref: AtomicRef<T>) = step()
        override fun beforeUpdate(ref: AtomicInt) = step()
        override fun beforeUpdate(ref: AtomicLong) = step()
        override fun <T> afterSet(ref: AtomicRef<T>, newValue: T) = step()
        override fun afterSet(ref: AtomicInt, newValue: Int) = step()
        override fun afterSet(ref: AtomicLong, newValue: Long) = step()
        override fun <T> afterRMW(ref: AtomicRef<T>, oldValue: T, newValue: T) = step()
        override fun afterRMW(ref: AtomicInt, oldValue: Int, newValue: Int) = step()
        override fun afterRMW(ref: AtomicLong, oldValue: Long, newValue: Long) = step()
        override fun toString(): String = "LockFreedomTestEnvironment($name)"
    }
}
471
/**
 * Manually pauses the lock-free operation that is in progress at the place of invocation.
 * It is intended for targeted debugging of specific places in code and has no effect
 * when the current thread is not a test thread.
 *
 * **Don't use it in production code.**
 */
public fun pauseLockFreeOp() {
    (Thread.currentThread() as? LockFreedomTestEnvironment.TestThread)?.pauseImpl()
}