blob: 6ee906c4d2665a763e7d3ba9abe31c28017fe0b1 [file] [log] [blame]
Alexandre Vassalottif260e442008-05-11 19:59:59 +00001# Some simple queue module tests, plus some failure conditions
Tim Petersafe52972004-08-20 02:37:25 +00002# to ensure the Queue locks remain stable.
Alexandre Vassalottif260e442008-05-11 19:59:59 +00003import queue
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02004import threading
Mark Hammond3b959db2002-04-19 00:11:32 +00005import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00006import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00007from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02008
Mark Hammond3b959db2002-04-19 00:11:32 +00009
Tim Petersafe52972004-08-20 02:37:25 +000010QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000011
Raymond Hettingerda3caed2008-01-14 21:39:24 +000012def qfull(q):
13 return q.maxsize > 0 and q.qsize() == q.maxsize
14
Tim Petersafe52972004-08-20 02:37:25 +000015# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000016class _TriggerThread(threading.Thread):
17 def __init__(self, fn, args):
18 self.fn = fn
19 self.args = args
20 self.startedEvent = threading.Event()
21 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000022
Mark Hammond3b959db2002-04-19 00:11:32 +000023 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000024 # The sleep isn't necessary, but is intended to give the blocking
25 # function in the main thread a chance at actually blocking before
26 # we unclog it. But if the sleep is longer than the timeout-based
27 # tests wait in their blocking functions, those tests will fail.
28 # So we give them much longer timeout values compared to the
29 # sleep here (I aimed at 10 seconds for blocking functions --
30 # they should never actually wait that long - they should make
31 # progress as soon as we call self.fn()).
32 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000033 self.startedEvent.set()
34 self.fn(*self.args)
35
Tim Peters8d7626c2004-08-20 03:27:12 +000036
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000037# Execute a function that blocks, and in a separate thread, a function that
38# triggers the release. Returns the result of the blocking function. Caution:
39# block_func must guarantee to block until trigger_func is called, and
40# trigger_func must guarantee to change queue state so that block_func can make
41# enough progress to return. In particular, a block_func that just raises an
42# exception regardless of whether trigger_func is called will lead to
43# timing-dependent sporadic failures, and one of those went rarely seen but
44# undiagnosed for years. Now block_func must be unexceptional. If block_func
45# is supposed to raise an exception, call do_exceptional_blocking_test()
46# instead.
47
48class BlockingTestMixin:
49
50 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
Victor Stinner167cbde2017-09-14 14:04:56 -070051 thread = _TriggerThread(trigger_func, trigger_args)
52 thread.start()
53 try:
54 self.result = block_func(*block_args)
55 # If block_func returned before our thread made the call, we failed!
56 if not thread.startedEvent.is_set():
Serhiy Storchakaa4a30202017-11-28 22:54:42 +020057 self.fail("blocking function %r appeared not to block" %
Victor Stinner167cbde2017-09-14 14:04:56 -070058 block_func)
59 return self.result
60 finally:
Victor Stinnerb9b69002017-09-14 14:40:56 -070061 support.join_thread(thread, 10) # make sure the thread terminates
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000062
63 # Call this instead if block_func is supposed to raise an exception.
64 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
65 trigger_args, expected_exception_class):
Victor Stinner167cbde2017-09-14 14:04:56 -070066 thread = _TriggerThread(trigger_func, trigger_args)
67 thread.start()
Tim Peters8d7626c2004-08-20 03:27:12 +000068 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000069 try:
70 block_func(*block_args)
71 except expected_exception_class:
72 raise
73 else:
74 self.fail("expected exception of kind %r" %
75 expected_exception_class)
76 finally:
Victor Stinnerb9b69002017-09-14 14:40:56 -070077 support.join_thread(thread, 10) # make sure the thread terminates
Victor Stinner167cbde2017-09-14 14:04:56 -070078 if not thread.startedEvent.is_set():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000079 self.fail("trigger thread ended but event never set")
80
81
R David Murrayc6bfce92012-03-17 16:38:39 -040082class BaseQueueTestMixin(BlockingTestMixin):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000083 def setUp(self):
84 self.cum = 0
85 self.cumlock = threading.Lock()
86
87 def simple_queue_test(self, q):
88 if q.qsize():
89 raise RuntimeError("Call this function with an empty queue")
Brett Cannon671153d2010-07-23 16:56:21 +000090 self.assertTrue(q.empty())
91 self.assertFalse(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000092 # I guess we better check things actually queue correctly a little :)
93 q.put(111)
94 q.put(333)
95 q.put(222)
96 target_order = dict(Queue = [111, 333, 222],
97 LifoQueue = [222, 333, 111],
98 PriorityQueue = [111, 222, 333])
99 actual_order = [q.get(), q.get(), q.get()]
Ezio Melottib3aedd42010-11-20 19:04:17 +0000100 self.assertEqual(actual_order, target_order[q.__class__.__name__],
101 "Didn't seem to queue the correct data!")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000102 for i in range(QUEUE_SIZE-1):
103 q.put(i)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000104 self.assertTrue(q.qsize(), "Queue should not be empty")
105 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000106 last = 2 * QUEUE_SIZE
107 full = 3 * 2 * QUEUE_SIZE
108 q.put(last)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000109 self.assertTrue(qfull(q), "Queue should be full")
Brett Cannon671153d2010-07-23 16:56:21 +0000110 self.assertFalse(q.empty())
111 self.assertTrue(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000112 try:
113 q.put(full, block=0)
114 self.fail("Didn't appear to block with a full queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000115 except queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000116 pass
117 try:
118 q.put(full, timeout=0.01)
119 self.fail("Didn't appear to time-out with a full queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000120 except queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000121 pass
122 # Test a blocking put
123 self.do_blocking_test(q.put, (full,), q.get, ())
124 self.do_blocking_test(q.put, (full, True, 10), q.get, ())
125 # Empty it
126 for i in range(QUEUE_SIZE):
127 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000128 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000129 try:
130 q.get(block=0)
131 self.fail("Didn't appear to block with an empty queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000132 except queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000133 pass
134 try:
135 q.get(timeout=0.01)
136 self.fail("Didn't appear to time-out with an empty queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000137 except queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000138 pass
139 # Test a blocking get
140 self.do_blocking_test(q.get, (), q.put, ('empty',))
141 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
142
143
144 def worker(self, q):
145 while True:
146 x = q.get()
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000147 if x < 0:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000148 q.task_done()
149 return
150 with self.cumlock:
151 self.cum += x
152 q.task_done()
153
154 def queue_join_test(self, q):
155 self.cum = 0
Victor Stinner167cbde2017-09-14 14:04:56 -0700156 threads = []
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000157 for i in (0,1):
Victor Stinner167cbde2017-09-14 14:04:56 -0700158 thread = threading.Thread(target=self.worker, args=(q,))
159 thread.start()
160 threads.append(thread)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000161 for i in range(100):
162 q.put(i)
163 q.join()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000164 self.assertEqual(self.cum, sum(range(100)),
165 "q.join() did not block until all tasks were done")
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000166 for i in (0,1):
167 q.put(-1) # instruct the threads to close
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000168 q.join() # verify that you can join twice
Victor Stinner167cbde2017-09-14 14:04:56 -0700169 for thread in threads:
170 thread.join()
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000171
172 def test_queue_task_done(self):
173 # Test to make sure a queue task completed successfully.
174 q = self.type2test()
175 try:
176 q.task_done()
177 except ValueError:
178 pass
Tim Peters8d7626c2004-08-20 03:27:12 +0000179 else:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000180 self.fail("Did not detect task count going negative")
181
182 def test_queue_join(self):
183 # Test that a queue join()s successfully, and before anything else
184 # (done twice for insurance).
185 q = self.type2test()
186 self.queue_join_test(q)
187 self.queue_join_test(q)
188 try:
189 q.task_done()
190 except ValueError:
191 pass
192 else:
193 self.fail("Did not detect task count going negative")
194
195 def test_simple_queue(self):
196 # Do it a couple of times on the same queue.
197 # Done twice to make sure works with same instance reused.
198 q = self.type2test(QUEUE_SIZE)
199 self.simple_queue_test(q)
200 self.simple_queue_test(q)
201
Brett Cannon671153d2010-07-23 16:56:21 +0000202 def test_negative_timeout_raises_exception(self):
203 q = self.type2test(QUEUE_SIZE)
204 with self.assertRaises(ValueError):
205 q.put(1, timeout=-1)
206 with self.assertRaises(ValueError):
207 q.get(1, timeout=-1)
208
209 def test_nowait(self):
210 q = self.type2test(QUEUE_SIZE)
211 for i in range(QUEUE_SIZE):
212 q.put_nowait(1)
213 with self.assertRaises(queue.Full):
214 q.put_nowait(1)
215
216 for i in range(QUEUE_SIZE):
217 q.get_nowait()
218 with self.assertRaises(queue.Empty):
219 q.get_nowait()
220
Raymond Hettinger189316a2010-10-31 17:57:52 +0000221 def test_shrinking_queue(self):
222 # issue 10110
223 q = self.type2test(3)
224 q.put(1)
225 q.put(2)
226 q.put(3)
227 with self.assertRaises(queue.Full):
228 q.put_nowait(4)
229 self.assertEqual(q.qsize(), 3)
230 q.maxsize = 2 # shrink the queue
231 with self.assertRaises(queue.Full):
232 q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000233
R David Murrayc6bfce92012-03-17 16:38:39 -0400234class QueueTest(BaseQueueTestMixin, unittest.TestCase):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000235 type2test = queue.Queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000236
R David Murrayc6bfce92012-03-17 16:38:39 -0400237class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000238 type2test = queue.LifoQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000239
R David Murrayc6bfce92012-03-17 16:38:39 -0400240class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000241 type2test = queue.PriorityQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000242
243
Mark Hammond3b959db2002-04-19 00:11:32 +0000244
245# A Queue subclass that can provoke failure at a moment's notice :)
246class FailingQueueException(Exception):
247 pass
248
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000249class FailingQueue(queue.Queue):
Mark Hammond3b959db2002-04-19 00:11:32 +0000250 def __init__(self, *args):
251 self.fail_next_put = False
252 self.fail_next_get = False
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000253 queue.Queue.__init__(self, *args)
Mark Hammond3b959db2002-04-19 00:11:32 +0000254 def _put(self, item):
255 if self.fail_next_put:
256 self.fail_next_put = False
Collin Winter3add4d72007-08-29 23:37:32 +0000257 raise FailingQueueException("You Lose")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000258 return queue.Queue._put(self, item)
Mark Hammond3b959db2002-04-19 00:11:32 +0000259 def _get(self):
260 if self.fail_next_get:
261 self.fail_next_get = False
Collin Winter3add4d72007-08-29 23:37:32 +0000262 raise FailingQueueException("You Lose")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000263 return queue.Queue._get(self)
Mark Hammond3b959db2002-04-19 00:11:32 +0000264
Ezio Melotti656c8082013-03-23 23:35:06 +0200265class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
Mark Hammond3b959db2002-04-19 00:11:32 +0000266
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000267 def failing_queue_test(self, q):
268 if q.qsize():
269 raise RuntimeError("Call this function with an empty queue")
270 for i in range(QUEUE_SIZE-1):
271 q.put(i)
272 # Test a failing non-blocking put.
273 q.fail_next_put = True
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000274 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000275 q.put("oops", block=0)
276 self.fail("The queue didn't fail when it should have")
277 except FailingQueueException:
278 pass
279 q.fail_next_put = True
280 try:
281 q.put("oops", timeout=0.1)
282 self.fail("The queue didn't fail when it should have")
283 except FailingQueueException:
284 pass
285 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000286 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000287 # Test a failing blocking put
288 q.fail_next_put = True
289 try:
290 self.do_blocking_test(q.put, ("full",), q.get, ())
291 self.fail("The queue didn't fail when it should have")
292 except FailingQueueException:
293 pass
294 # Check the Queue isn't damaged.
295 # put failed, but get succeeded - re-add
296 q.put("last")
297 # Test a failing timeout put
298 q.fail_next_put = True
299 try:
300 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
301 FailingQueueException)
302 self.fail("The queue didn't fail when it should have")
303 except FailingQueueException:
304 pass
305 # Check the Queue isn't damaged.
306 # put failed, but get succeeded - re-add
307 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000308 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000309 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000310 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000311 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000312 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000313 # Test a blocking put
314 self.do_blocking_test(q.put, ("full",), q.get, ())
315 # Empty it
316 for i in range(QUEUE_SIZE):
317 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000318 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000319 q.put("first")
320 q.fail_next_get = True
321 try:
322 q.get()
323 self.fail("The queue didn't fail when it should have")
324 except FailingQueueException:
325 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000326 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000327 q.fail_next_get = True
328 try:
329 q.get(timeout=0.1)
330 self.fail("The queue didn't fail when it should have")
331 except FailingQueueException:
332 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000333 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000334 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000335 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000336 q.fail_next_get = True
337 try:
338 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
339 FailingQueueException)
340 self.fail("The queue didn't fail when it should have")
341 except FailingQueueException:
342 pass
343 # put succeeded, but get failed.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000344 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000345 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000346 self.assertTrue(not q.qsize(), "Queue should be empty")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000347
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000348 def test_failing_queue(self):
349 # Test to make sure a queue is functioning correctly.
350 # Done twice to the same instance.
351 q = FailingQueue(QUEUE_SIZE)
352 self.failing_queue_test(q)
353 self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000354
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000355
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000356if __name__ == "__main__":
Zachary Ware38c707e2015-04-13 15:00:43 -0500357 unittest.main()