Antoine Pitrou | b43c4ca | 2017-09-18 22:04:20 +0200 | [diff] [blame] | 1 | import _dummy_thread as _thread |
| 2 | import time |
| 3 | import queue |
| 4 | import random |
| 5 | import unittest |
| 6 | from test import support |
| 7 | from unittest import mock |
| 8 | |
# Seconds of artificial delay used by the timing-sensitive tests: the helper
# thread in LockTests.test_uncond_acquire_blocking sleeps this long before
# releasing the lock, and ThreadTests.test_multi_thread_creation sleeps this
# long after starting its threads (a nonzero value also enables random
# per-thread delays there).
DELAY = 0
| 10 | |
| 11 | |
class LockTests(unittest.TestCase):
    """Test lock objects.

    Every test method starts from a freshly allocated lock stored on
    ``self.lock`` (see ``setUp``).
    """

    def setUp(self):
        # Create a fresh lock for every test.
        self.lock = _thread.allocate_lock()

    def test_initlock(self):
        # Make sure locks start out unlocked.
        self.assertFalse(self.lock.locked(),
                         "Lock object is not initialized unlocked.")

    def test_release(self):
        # Test self.lock.release(): acquire followed by release must leave
        # the lock unlocked.
        self.lock.acquire()
        self.lock.release()
        self.assertFalse(self.lock.locked(),
                         "Lock object did not release properly.")

    def test_LockType_context_manager(self):
        # A directly instantiated LockType must work as a context manager.
        # The assertions are made on the very lock that was entered, so a
        # broken __enter__/__exit__ is actually detected.
        lock = _thread.LockType()
        with lock:
            self.assertTrue(lock.locked(),
                            "Lock was not acquired on entering the "
                            "with block")
        self.assertFalse(lock.locked(),
                         "Acquired Lock was not released")

    def test_improper_release(self):
        # Make sure releasing an unlocked lock raises RuntimeError.
        self.assertRaises(RuntimeError, self.lock.release)

    def test_cond_acquire_success(self):
        # Make sure the conditional (non-blocking) acquiring of a free lock
        # works.
        self.assertTrue(self.lock.acquire(0),
                        "Conditional acquiring of the lock failed.")

    def test_cond_acquire_fail(self):
        # Conditionally acquiring an already locked lock returns False.
        self.lock.acquire(0)
        self.assertFalse(self.lock.acquire(0),
                         "Conditional acquiring of a locked lock incorrectly "
                         "succeeded.")

    def test_uncond_acquire_success(self):
        # Make sure unconditional acquiring of a lock works.
        self.lock.acquire()
        self.assertTrue(self.lock.locked(),
                        "Uncondional locking failed.")

    def test_uncond_acquire_return_val(self):
        # Make sure that an unconditional locking returns True, both with an
        # explicit truthy waitflag and with no argument at all.
        self.assertIs(self.lock.acquire(1), True,
                      "Unconditional locking did not return True.")
        self.assertIs(self.lock.acquire(), True)

    def test_uncond_acquire_blocking(self):
        # Make sure that unconditional acquiring of a locked lock blocks
        # until another thread releases it.
        def delay_unlock(to_unlock, delay):
            """Hold on to lock for a set amount of time before unlocking."""
            time.sleep(delay)
            to_unlock.release()

        self.lock.acquire()
        start_time = int(time.monotonic())
        _thread.start_new_thread(delay_unlock, (self.lock, DELAY))
        if support.verbose:
            print()
            print("*** Waiting for thread to release the lock "
                  "(approx. %s sec.) ***" % DELAY)
        self.lock.acquire()
        end_time = int(time.monotonic())
        if support.verbose:
            print("done")
        # At least DELAY seconds must have elapsed between the two acquires.
        self.assertGreaterEqual(end_time - start_time, DELAY,
                                "Blocking by unconditional acquiring failed.")

    @mock.patch('time.sleep')
    def test_acquire_timeout(self, mock_sleep):
        """Test invoking acquire() with a positive timeout when the lock is
        already acquired. Ensure that time.sleep() is invoked with the given
        timeout and that False is returned."""

        self.lock.acquire()
        retval = self.lock.acquire(waitflag=0, timeout=1)
        self.assertTrue(mock_sleep.called)
        mock_sleep.assert_called_once_with(1)
        # Identity check, matching test_uncond_acquire_return_val: a falsy
        # non-bool such as 0 must not be accepted.
        self.assertIs(retval, False)

    def test_lock_representation(self):
        # repr() must reflect the current state of the lock.
        self.lock.acquire()
        locked_repr = repr(self.lock)
        self.assertIn("locked", locked_repr)
        # "locked" is a substring of "unlocked", so also rule out the
        # unlocked wording while the lock is held.
        self.assertNotIn("unlocked", locked_repr)
        self.lock.release()
        self.assertIn("unlocked", repr(self.lock))
| 103 | |
| 104 | |
class MiscTests(unittest.TestCase):
    """Checks for the assorted module-level helpers of ``_thread``."""

    def test_exit(self):
        # exit() signals termination by raising SystemExit.
        with self.assertRaises(SystemExit):
            _thread.exit()

    def test_ident(self):
        # The thread identifier is a strictly positive integer.
        not_int_msg = "_thread.get_ident() returned a non-integer"
        self.assertIsInstance(_thread.get_ident(), int, not_int_msg)
        self.assertGreater(_thread.get_ident(), 0)

    def test_LockType(self):
        # allocate_lock() hands back LockType instances.
        allocated = _thread.allocate_lock()
        failure_msg = ("_thread.LockType is not an instance of what "
                       "is returned by _thread.allocate_lock()")
        self.assertIsInstance(allocated, _thread.LockType, failure_msg)

    def test_set_sentinel(self):
        # _set_sentinel() hands back a LockType instance as well.
        sentinel = _thread._set_sentinel()
        failure_msg = ("_thread._set_sentinel() did not return a "
                       "LockType instance.")
        self.assertIsInstance(sentinel, _thread.LockType, failure_msg)

    def test_interrupt_main(self):
        # Starting a thread whose function calls interrupt_main() should
        # raise KeyboardInterrupt once start_new_thread() completes.
        def trigger_interrupt():
            _thread.interrupt_main()

        with self.assertRaises(KeyboardInterrupt):
            _thread.start_new_thread(trigger_interrupt, ())

    def test_interrupt_in_main(self):
        # Calling interrupt_main() directly raises KeyboardInterrupt.
        with self.assertRaises(KeyboardInterrupt):
            _thread.interrupt_main()

    def test_stack_size_None(self):
        # Passing None is accepted and yields 0.
        self.assertEqual(_thread.stack_size(None), 0)

    def test_stack_size_not_None(self):
        # Any other argument is rejected with _thread.error and a fixed
        # message.
        expected_msg = "setting thread stack size not supported"
        with self.assertRaises(_thread.error) as cm:
            _thread.stack_size("")
        raised = cm.exception
        self.assertEqual(raised.args[0], expected_msg)
| 149 | |
| 150 | |
class ThreadTests(unittest.TestCase):
    """Exercise thread creation through start_new_thread()."""

    def test_arg_passing(self):
        # Positional, keyword and mixed arguments must all reach the target.
        def arg_tester(queue, arg1=False, arg2=False):
            """Use to test _thread.start_new_thread() passes args properly."""
            queue.put((arg1, arg2))

        testing_queue = queue.Queue(1)

        # 1) everything passed positionally through the args tuple
        _thread.start_new_thread(arg_tester, (testing_queue, True, True))
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation "
                        "using tuple failed")

        # 2) everything passed by keyword through the kwargs dict
        keyword_only = {'queue': testing_queue, 'arg1': True, 'arg2': True}
        _thread.start_new_thread(arg_tester, tuple(), keyword_only)
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation "
                        "using kwargs failed")

        # 3) a mix of positional and keyword arguments
        _thread.start_new_thread(arg_tester,
                                 (testing_queue, True),
                                 {'arg2': True})
        first, second = testing_queue.get()
        self.assertTrue(first and second,
                        "Argument passing for thread creation using both tuple"
                        " and kwargs failed")

    def test_multi_thread_creation(self):
        # Start several threads; each one must leave its mark in the queue.
        def queue_mark(marks, pause):
            time.sleep(pause)
            marks.put(_thread.get_ident())

        thread_count = 5
        testing_queue = queue.Queue(thread_count)

        if support.verbose:
            print()
            banner = ("*** Testing multiple thread creation "
                      "(will take approx. %s to %s sec.) ***")
            print(banner % (DELAY, thread_count))

        for _ in range(thread_count):
            # Random per-thread delays are only used when DELAY is nonzero.
            local_delay = round(random.random(), 1) if DELAY else 0
            _thread.start_new_thread(queue_mark,
                                     (testing_queue, local_delay))
        time.sleep(DELAY)
        if support.verbose:
            print('done')
        failure_msg = ("Not all %s threads executed properly "
                       "after %s sec." % (thread_count, DELAY))
        self.assertEqual(testing_queue.qsize(), thread_count, failure_msg)

    def test_args_not_tuple(self):
        """
        Call start_new_thread() with an "args" value that is not a tuple.
        A TypeError carrying a meaningful message is expected.
        """
        with self.assertRaises(TypeError) as cm:
            _thread.start_new_thread(mock.Mock(), [])
        raised = cm.exception
        self.assertEqual(raised.args[0], "2nd arg must be a tuple")

    def test_kwargs_not_dict(self):
        """
        Call start_new_thread() with a "kwargs" value that is not a dict.
        A TypeError carrying a meaningful message is expected.
        """
        with self.assertRaises(TypeError) as cm:
            _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
        raised = cm.exception
        self.assertEqual(raised.args[0], "3rd arg must be a dict")

    def test_SystemExit(self):
        """
        Call start_new_thread() with a target that raises SystemExit.

        The exception must be discarded instead of reaching the caller.
        """
        exiting_target = mock.Mock(side_effect=SystemExit())
        try:
            _thread.start_new_thread(exiting_target, tuple())
        except SystemExit:
            self.fail("start_new_thread raised SystemExit.")

    @mock.patch('traceback.print_exc')
    def test_RaiseException(self, mock_print_exc):
        """
        Call start_new_thread() with a target that raises an exception.

        The exception must be discarded and its traceback reported through
        traceback.print_exc().
        """
        failing_target = mock.Mock(side_effect=Exception)
        _thread.start_new_thread(failing_target, tuple())
        self.assertTrue(mock_print_exc.called)