blob: da512167834f7f0cb8c337160acc8485885fa58f [file] [log] [blame]
Antoine Pitroub43c4ca2017-09-18 22:04:20 +02001import _dummy_thread as _thread
2import time
3import queue
4import random
5import unittest
6from test import support
7from unittest import mock
8
9DELAY = 0
10
11
12class LockTests(unittest.TestCase):
13 """Test lock objects."""
14
15 def setUp(self):
16 # Create a lock
17 self.lock = _thread.allocate_lock()
18
19 def test_initlock(self):
20 #Make sure locks start locked
21 self.assertFalse(self.lock.locked(),
22 "Lock object is not initialized unlocked.")
23
24 def test_release(self):
25 # Test self.lock.release()
26 self.lock.acquire()
27 self.lock.release()
28 self.assertFalse(self.lock.locked(),
29 "Lock object did not release properly.")
30
31 def test_LockType_context_manager(self):
32 with _thread.LockType():
33 pass
34 self.assertFalse(self.lock.locked(),
35 "Acquired Lock was not released")
36
37 def test_improper_release(self):
38 #Make sure release of an unlocked thread raises RuntimeError
39 self.assertRaises(RuntimeError, self.lock.release)
40
41 def test_cond_acquire_success(self):
42 #Make sure the conditional acquiring of the lock works.
43 self.assertTrue(self.lock.acquire(0),
44 "Conditional acquiring of the lock failed.")
45
46 def test_cond_acquire_fail(self):
47 #Test acquiring locked lock returns False
48 self.lock.acquire(0)
49 self.assertFalse(self.lock.acquire(0),
50 "Conditional acquiring of a locked lock incorrectly "
51 "succeeded.")
52
53 def test_uncond_acquire_success(self):
54 #Make sure unconditional acquiring of a lock works.
55 self.lock.acquire()
56 self.assertTrue(self.lock.locked(),
57 "Uncondional locking failed.")
58
59 def test_uncond_acquire_return_val(self):
60 #Make sure that an unconditional locking returns True.
61 self.assertIs(self.lock.acquire(1), True,
62 "Unconditional locking did not return True.")
63 self.assertIs(self.lock.acquire(), True)
64
65 def test_uncond_acquire_blocking(self):
66 #Make sure that unconditional acquiring of a locked lock blocks.
67 def delay_unlock(to_unlock, delay):
68 """Hold on to lock for a set amount of time before unlocking."""
69 time.sleep(delay)
70 to_unlock.release()
71
72 self.lock.acquire()
Victor Stinner2cf4c202018-12-17 09:36:36 +010073 start_time = int(time.monotonic())
Antoine Pitroub43c4ca2017-09-18 22:04:20 +020074 _thread.start_new_thread(delay_unlock,(self.lock, DELAY))
75 if support.verbose:
76 print()
77 print("*** Waiting for thread to release the lock "\
78 "(approx. %s sec.) ***" % DELAY)
79 self.lock.acquire()
Victor Stinner2cf4c202018-12-17 09:36:36 +010080 end_time = int(time.monotonic())
Antoine Pitroub43c4ca2017-09-18 22:04:20 +020081 if support.verbose:
82 print("done")
83 self.assertGreaterEqual(end_time - start_time, DELAY,
84 "Blocking by unconditional acquiring failed.")
85
86 @mock.patch('time.sleep')
87 def test_acquire_timeout(self, mock_sleep):
88 """Test invoking acquire() with a positive timeout when the lock is
89 already acquired. Ensure that time.sleep() is invoked with the given
90 timeout and that False is returned."""
91
92 self.lock.acquire()
93 retval = self.lock.acquire(waitflag=0, timeout=1)
94 self.assertTrue(mock_sleep.called)
95 mock_sleep.assert_called_once_with(1)
96 self.assertEqual(retval, False)
97
98 def test_lock_representation(self):
99 self.lock.acquire()
100 self.assertIn("locked", repr(self.lock))
101 self.lock.release()
102 self.assertIn("unlocked", repr(self.lock))
103
104
105class MiscTests(unittest.TestCase):
106 """Miscellaneous tests."""
107
108 def test_exit(self):
109 self.assertRaises(SystemExit, _thread.exit)
110
111 def test_ident(self):
112 self.assertIsInstance(_thread.get_ident(), int,
113 "_thread.get_ident() returned a non-integer")
114 self.assertGreater(_thread.get_ident(), 0)
115
116 def test_LockType(self):
117 self.assertIsInstance(_thread.allocate_lock(), _thread.LockType,
118 "_thread.LockType is not an instance of what "
119 "is returned by _thread.allocate_lock()")
120
121 def test_set_sentinel(self):
122 self.assertIsInstance(_thread._set_sentinel(), _thread.LockType,
123 "_thread._set_sentinel() did not return a "
124 "LockType instance.")
125
126 def test_interrupt_main(self):
127 #Calling start_new_thread with a function that executes interrupt_main
128 # should raise KeyboardInterrupt upon completion.
129 def call_interrupt():
130 _thread.interrupt_main()
131
132 self.assertRaises(KeyboardInterrupt,
133 _thread.start_new_thread,
134 call_interrupt,
135 tuple())
136
137 def test_interrupt_in_main(self):
138 self.assertRaises(KeyboardInterrupt, _thread.interrupt_main)
139
140 def test_stack_size_None(self):
141 retval = _thread.stack_size(None)
142 self.assertEqual(retval, 0)
143
144 def test_stack_size_not_None(self):
145 with self.assertRaises(_thread.error) as cm:
146 _thread.stack_size("")
147 self.assertEqual(cm.exception.args[0],
148 "setting thread stack size not supported")
149
150
151class ThreadTests(unittest.TestCase):
152 """Test thread creation."""
153
154 def test_arg_passing(self):
155 #Make sure that parameter passing works.
156 def arg_tester(queue, arg1=False, arg2=False):
157 """Use to test _thread.start_new_thread() passes args properly."""
158 queue.put((arg1, arg2))
159
160 testing_queue = queue.Queue(1)
161 _thread.start_new_thread(arg_tester, (testing_queue, True, True))
162 result = testing_queue.get()
163 self.assertTrue(result[0] and result[1],
164 "Argument passing for thread creation "
165 "using tuple failed")
166
167 _thread.start_new_thread(
168 arg_tester,
169 tuple(),
170 {'queue':testing_queue, 'arg1':True, 'arg2':True})
171
172 result = testing_queue.get()
173 self.assertTrue(result[0] and result[1],
174 "Argument passing for thread creation "
175 "using kwargs failed")
176
177 _thread.start_new_thread(
178 arg_tester,
179 (testing_queue, True),
180 {'arg2':True})
181
182 result = testing_queue.get()
183 self.assertTrue(result[0] and result[1],
184 "Argument passing for thread creation using both tuple"
185 " and kwargs failed")
186
187 def test_multi_thread_creation(self):
188 def queue_mark(queue, delay):
189 time.sleep(delay)
190 queue.put(_thread.get_ident())
191
192 thread_count = 5
193 testing_queue = queue.Queue(thread_count)
194
195 if support.verbose:
196 print()
197 print("*** Testing multiple thread creation "
198 "(will take approx. %s to %s sec.) ***" % (
199 DELAY, thread_count))
200
201 for count in range(thread_count):
202 if DELAY:
203 local_delay = round(random.random(), 1)
204 else:
205 local_delay = 0
206 _thread.start_new_thread(queue_mark,
207 (testing_queue, local_delay))
208 time.sleep(DELAY)
209 if support.verbose:
210 print('done')
211 self.assertEqual(testing_queue.qsize(), thread_count,
212 "Not all %s threads executed properly "
213 "after %s sec." % (thread_count, DELAY))
214
215 def test_args_not_tuple(self):
216 """
217 Test invoking start_new_thread() with a non-tuple value for "args".
218 Expect TypeError with a meaningful error message to be raised.
219 """
220 with self.assertRaises(TypeError) as cm:
221 _thread.start_new_thread(mock.Mock(), [])
222 self.assertEqual(cm.exception.args[0], "2nd arg must be a tuple")
223
224 def test_kwargs_not_dict(self):
225 """
226 Test invoking start_new_thread() with a non-dict value for "kwargs".
227 Expect TypeError with a meaningful error message to be raised.
228 """
229 with self.assertRaises(TypeError) as cm:
230 _thread.start_new_thread(mock.Mock(), tuple(), kwargs=[])
231 self.assertEqual(cm.exception.args[0], "3rd arg must be a dict")
232
233 def test_SystemExit(self):
234 """
235 Test invoking start_new_thread() with a function that raises
236 SystemExit.
237 The exception should be discarded.
238 """
239 func = mock.Mock(side_effect=SystemExit())
240 try:
241 _thread.start_new_thread(func, tuple())
242 except SystemExit:
243 self.fail("start_new_thread raised SystemExit.")
244
245 @mock.patch('traceback.print_exc')
246 def test_RaiseException(self, mock_print_exc):
247 """
248 Test invoking start_new_thread() with a function that raises exception.
249
250 The exception should be discarded and the traceback should be printed
251 via traceback.print_exc()
252 """
253 func = mock.Mock(side_effect=Exception)
254 _thread.start_new_thread(func, tuple())
255 self.assertTrue(mock_print_exc.called)