blob: 86ad9c076550e305796720009e4a24a5decda775 [file] [log] [blame]
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
Alexandre Vassalottif260e442008-05-11 19:59:59 +00003import queue
Mark Hammond3b959db2002-04-19 00:11:32 +00004import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00005import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00006from test import support
Victor Stinner45df8202010-04-28 22:31:17 +00007threading = support.import_module('threading')
Mark Hammond3b959db2002-04-19 00:11:32 +00008
# Capacity given to the bounded queues built by the tests in this file.
QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000010
def qfull(q):
    """Return True if the bounded queue *q* holds at least ``maxsize`` items.

    A queue whose ``maxsize`` is zero or negative is never reported as full.
    The comparison is ``<=`` rather than ``==`` so that a queue whose
    ``maxsize`` attribute was lowered below its current size still counts
    as full.
    """
    return 0 < q.maxsize <= q.qsize()
13
# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000015class _TriggerThread(threading.Thread):
16 def __init__(self, fn, args):
17 self.fn = fn
18 self.args = args
19 self.startedEvent = threading.Event()
20 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000021
Mark Hammond3b959db2002-04-19 00:11:32 +000022 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000023 # The sleep isn't necessary, but is intended to give the blocking
24 # function in the main thread a chance at actually blocking before
25 # we unclog it. But if the sleep is longer than the timeout-based
26 # tests wait in their blocking functions, those tests will fail.
27 # So we give them much longer timeout values compared to the
28 # sleep here (I aimed at 10 seconds for blocking functions --
29 # they should never actually wait that long - they should make
30 # progress as soon as we call self.fn()).
31 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000032 self.startedEvent.set()
33 self.fn(*self.args)
34
Tim Peters8d7626c2004-08-20 03:27:12 +000035
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures, and one of those was rarely seen and
# went undiagnosed for years.  Now block_func must be unexceptional.  If
# block_func is supposed to raise an exception, call
# do_exceptional_blocking_test() instead.
46
class BlockingTestMixin:
    """Helpers that run a blocking call while a second thread unblocks it.

    The mixin expects to be combined with ``unittest.TestCase`` because it
    reports problems through ``self.fail()``.  The trigger thread of the
    most recent call is kept in ``self.t``.
    """

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Call block_func(*block_args) while a thread runs the trigger.

        Returns whatever block_func returned (also stored in ``self.result``).
        Fails the test if block_func returned before the trigger thread
        fired, or if the trigger thread is still alive 10 seconds after
        block_func returned.  Any exception raised by block_func propagates
        to the caller.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            self.result = block_func(*block_args)
            # If block_func returned before our thread made the call, we failed!
            if not self.t.startedEvent.is_set():
                self.fail("blocking function '%r' appeared not to block" %
                          (block_func,))
        finally:
            # Always wait for the trigger thread, even when block_func raised
            # or the check above failed, so no thread outlives this call.
            self.t.join(10)  # make sure the thread terminates
        if self.t.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      (trigger_func,))
        return self.result

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Like do_blocking_test(), for a block_func that must raise.

        The exception raised by block_func is propagated to the caller.  The
        test fails if block_func returns normally, if the trigger thread is
        still alive after a 10 second join, or if the trigger thread never
        set its started event.
        """
        self.t = _TriggerThread(trigger_func, trigger_args)
        self.t.start()
        try:
            block_func(*block_args)
            # Reaching this line means block_func did not raise at all.
            self.fail("expected exception of kind %r" %
                      (expected_exception_class,))
        finally:
            self.t.join(10)  # make sure the thread terminates
            if self.t.is_alive():
                self.fail("trigger function '%r' appeared to not return" %
                          (trigger_func,))
            if not self.t.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
83
84
class BaseQueueTestMixin(BlockingTestMixin):
    """Tests shared by all queue classes.

    Concrete test classes combine this mixin with ``unittest.TestCase`` and
    set the ``type2test`` class attribute to the queue class to exercise.
    """

    def setUp(self):
        # Running total built up by worker(), and the lock that guards it.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Exercise ordering, full/empty detection and blocking on *q*.

        *q* must be empty and is left empty again, so the same instance can
        be passed in repeatedly.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        # Expected retrieval order, keyed by the queue's class name.
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        with self.assertRaises(
                queue.Full, msg="Didn't appear to block with a full queue"):
            q.put(full, block=0)
        with self.assertRaises(
                queue.Full,
                msg="Didn't appear to time-out with a full queue"):
            q.put(full, timeout=0.01)
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        with self.assertRaises(
                queue.Empty,
                msg="Didn't appear to block with an empty queue"):
            q.get(block=0)
        with self.assertRaises(
                queue.Empty,
                msg="Didn't appear to time-out with an empty queue"):
            q.get(timeout=0.01)
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add every item taken from *q* to self.cum until a negative item."""
        while True:
            x = q.get()
            if x < 0:
                # A negative item is the shutdown signal; it still has to be
                # marked done so that q.join() can return.
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers processed all items."""
        self.cum = 0
        workers = []
        for i in (0, 1):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            workers.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0, 1):
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Wait for the workers themselves so that no thread is left running
        # once this helper returns.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        with self.assertRaises(
                ValueError, msg="Did not detect task count going negative"):
            q.task_done()

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        with self.assertRaises(
                ValueError, msg="Did not detect task count going negative"):
            q.task_done()

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000231
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.Queue``."""

    type2test = queue.Queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000234
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.LifoQueue``."""

    type2test = queue.LifoQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000237
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.PriorityQueue``."""

    type2test = queue.PriorityQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000240
241
Mark Hammond3b959db2002-04-19 00:11:32 +0000242
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when a put or get was told to fail."""
246
class FailingQueue(queue.Queue):
    """Queue whose next put or get can be made to raise on request.

    Setting ``fail_next_put`` or ``fail_next_get`` to True makes the next
    ``_put()`` or ``_get()`` raise FailingQueueException.  The flag is
    cleared before raising, so only a single operation fails.
    """

    def __init__(self, *args, **kwargs):
        # The flags are created before the base initialiser runs.  Keyword
        # arguments are forwarded too, so FailingQueue(maxsize=N) works just
        # like queue.Queue(maxsize=N).
        self.fail_next_put = False
        self.fail_next_get = False
        super().__init__(*args, **kwargs)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return super()._put(item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return super()._get()
Mark Hammond3b959db2002-04-19 00:11:32 +0000262
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    """Check that a queue stays usable after its put/get internals raise."""

    def failing_queue_test(self, q):
        """Provoke put and get failures on *q* and verify it is undamaged.

        *q* must be an empty FailingQueue of capacity QUEUE_SIZE and is left
        empty again, so the same instance can be passed in repeatedly.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        didnt_fail = "The queue didn't fail when it should have"
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            q.put("oops", block=0)
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            q.put("oops", timeout=0.1)
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so the
        # exceptional variant is used; it always waits for the trigger
        # thread before letting the exception propagate.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            self.do_exceptional_blocking_test(q.put, ("full", True, 10),
                                              q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            q.get()
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            q.get(timeout=0.1)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=didnt_fail):
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000352
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000353
def test_main():
    """Run all four queue test case classes through support.run_unittest()."""
    test_classes = (
        QueueTest,
        LifoQueueTest,
        PriorityQueueTest,
        FailingQueueTest,
    )
    support.run_unittest(*test_classes)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000357
Christian Heimes679db4a2008-01-18 09:56:22 +0000358
# Run the whole suite when this file is executed directly as a script.
if __name__ == "__main__":
    test_main()