blob: 22cdcad1efeab835a9396ea553221e1ceb2dd251 [file] [log] [blame]
Alexandre Vassalottif260e442008-05-11 19:59:59 +00001# Some simple queue module tests, plus some failure conditions
Tim Petersafe52972004-08-20 02:37:25 +00002# to ensure the Queue locks remain stable.
Alexandre Vassalottif260e442008-05-11 19:59:59 +00003import queue
Mark Hammond3b959db2002-04-19 00:11:32 +00004import sys
5import threading
6import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00007import unittest
Benjamin Petersonee8712c2008-05-20 21:35:26 +00008from test import support
Mark Hammond3b959db2002-04-19 00:11:32 +00009
Tim Petersafe52972004-08-20 02:37:25 +000010QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000011
Raymond Hettingerda3caed2008-01-14 21:39:24 +000012def qfull(q):
13 return q.maxsize > 0 and q.qsize() == q.maxsize
14
Tim Petersafe52972004-08-20 02:37:25 +000015# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000016class _TriggerThread(threading.Thread):
17 def __init__(self, fn, args):
18 self.fn = fn
19 self.args = args
20 self.startedEvent = threading.Event()
21 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000022
Mark Hammond3b959db2002-04-19 00:11:32 +000023 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000024 # The sleep isn't necessary, but is intended to give the blocking
25 # function in the main thread a chance at actually blocking before
26 # we unclog it. But if the sleep is longer than the timeout-based
27 # tests wait in their blocking functions, those tests will fail.
28 # So we give them much longer timeout values compared to the
29 # sleep here (I aimed at 10 seconds for blocking functions --
30 # they should never actually wait that long - they should make
31 # progress as soon as we call self.fn()).
32 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000033 self.startedEvent.set()
34 self.fn(*self.args)
35
Tim Peters8d7626c2004-08-20 03:27:12 +000036
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000037# Execute a function that blocks, and in a separate thread, a function that
38# triggers the release. Returns the result of the blocking function. Caution:
39# block_func must guarantee to block until trigger_func is called, and
40# trigger_func must guarantee to change queue state so that block_func can make
41# enough progress to return. In particular, a block_func that just raises an
42# exception regardless of whether trigger_func is called will lead to
43# timing-dependent sporadic failures, and one of those went rarely seen but
44# undiagnosed for years. Now block_func must be unexceptional. If block_func
45# is supposed to raise an exception, call do_exceptional_blocking_test()
46# instead.
47
48class BlockingTestMixin:
49
50 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
51 self.t = _TriggerThread(trigger_func, trigger_args)
52 self.t.start()
53 self.result = block_func(*block_args)
54 # If block_func returned before our thread made the call, we failed!
Benjamin Peterson672b8032008-06-11 19:14:14 +000055 if not self.t.startedEvent.is_set():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000056 self.fail("blocking function '%r' appeared not to block" %
57 block_func)
58 self.t.join(10) # make sure the thread terminates
Benjamin Peterson672b8032008-06-11 19:14:14 +000059 if self.t.is_alive():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000060 self.fail("trigger function '%r' appeared to not return" %
61 trigger_func)
62 return self.result
63
64 # Call this instead if block_func is supposed to raise an exception.
65 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
66 trigger_args, expected_exception_class):
67 self.t = _TriggerThread(trigger_func, trigger_args)
68 self.t.start()
Tim Peters8d7626c2004-08-20 03:27:12 +000069 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000070 try:
71 block_func(*block_args)
72 except expected_exception_class:
73 raise
74 else:
75 self.fail("expected exception of kind %r" %
76 expected_exception_class)
77 finally:
78 self.t.join(10) # make sure the thread terminates
Benjamin Peterson672b8032008-06-11 19:14:14 +000079 if self.t.is_alive():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000080 self.fail("trigger function '%r' appeared to not return" %
81 trigger_func)
Benjamin Peterson672b8032008-06-11 19:14:14 +000082 if not self.t.startedEvent.is_set():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000083 self.fail("trigger thread ended but event never set")
84
85
86class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
87 def setUp(self):
88 self.cum = 0
89 self.cumlock = threading.Lock()
90
91 def simple_queue_test(self, q):
92 if q.qsize():
93 raise RuntimeError("Call this function with an empty queue")
94 # I guess we better check things actually queue correctly a little :)
95 q.put(111)
96 q.put(333)
97 q.put(222)
98 target_order = dict(Queue = [111, 333, 222],
99 LifoQueue = [222, 333, 111],
100 PriorityQueue = [111, 222, 333])
101 actual_order = [q.get(), q.get(), q.get()]
102 self.assertEquals(actual_order, target_order[q.__class__.__name__],
103 "Didn't seem to queue the correct data!")
104 for i in range(QUEUE_SIZE-1):
105 q.put(i)
106 self.assert_(q.qsize(), "Queue should not be empty")
107 self.assert_(not qfull(q), "Queue should not be full")
108 last = 2 * QUEUE_SIZE
109 full = 3 * 2 * QUEUE_SIZE
110 q.put(last)
111 self.assert_(qfull(q), "Queue should be full")
112 try:
113 q.put(full, block=0)
114 self.fail("Didn't appear to block with a full queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000115 except queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000116 pass
117 try:
118 q.put(full, timeout=0.01)
119 self.fail("Didn't appear to time-out with a full queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000120 except queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000121 pass
122 # Test a blocking put
123 self.do_blocking_test(q.put, (full,), q.get, ())
124 self.do_blocking_test(q.put, (full, True, 10), q.get, ())
125 # Empty it
126 for i in range(QUEUE_SIZE):
127 q.get()
128 self.assert_(not q.qsize(), "Queue should be empty")
129 try:
130 q.get(block=0)
131 self.fail("Didn't appear to block with an empty queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000132 except queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000133 pass
134 try:
135 q.get(timeout=0.01)
136 self.fail("Didn't appear to time-out with an empty queue")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000137 except queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000138 pass
139 # Test a blocking get
140 self.do_blocking_test(q.get, (), q.put, ('empty',))
141 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
142
143
144 def worker(self, q):
145 while True:
146 x = q.get()
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000147 if x < 0:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000148 q.task_done()
149 return
150 with self.cumlock:
151 self.cum += x
152 q.task_done()
153
154 def queue_join_test(self, q):
155 self.cum = 0
156 for i in (0,1):
157 threading.Thread(target=self.worker, args=(q,)).start()
158 for i in range(100):
159 q.put(i)
160 q.join()
161 self.assertEquals(self.cum, sum(range(100)),
162 "q.join() did not block until all tasks were done")
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000163 for i in (0,1):
164 q.put(-1) # instruct the threads to close
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000165 q.join() # verify that you can join twice
166
167 def test_queue_task_done(self):
168 # Test to make sure a queue task completed successfully.
169 q = self.type2test()
170 try:
171 q.task_done()
172 except ValueError:
173 pass
Tim Peters8d7626c2004-08-20 03:27:12 +0000174 else:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000175 self.fail("Did not detect task count going negative")
176
177 def test_queue_join(self):
178 # Test that a queue join()s successfully, and before anything else
179 # (done twice for insurance).
180 q = self.type2test()
181 self.queue_join_test(q)
182 self.queue_join_test(q)
183 try:
184 q.task_done()
185 except ValueError:
186 pass
187 else:
188 self.fail("Did not detect task count going negative")
189
190 def test_simple_queue(self):
191 # Do it a couple of times on the same queue.
192 # Done twice to make sure works with same instance reused.
193 q = self.type2test(QUEUE_SIZE)
194 self.simple_queue_test(q)
195 self.simple_queue_test(q)
196
197
198class QueueTest(BaseQueueTest):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000199 type2test = queue.Queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000200
201class LifoQueueTest(BaseQueueTest):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000202 type2test = queue.LifoQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000203
204class PriorityQueueTest(BaseQueueTest):
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000205 type2test = queue.PriorityQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000206
207
Mark Hammond3b959db2002-04-19 00:11:32 +0000208
209# A Queue subclass that can provoke failure at a moment's notice :)
210class FailingQueueException(Exception):
211 pass
212
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000213class FailingQueue(queue.Queue):
Mark Hammond3b959db2002-04-19 00:11:32 +0000214 def __init__(self, *args):
215 self.fail_next_put = False
216 self.fail_next_get = False
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000217 queue.Queue.__init__(self, *args)
Mark Hammond3b959db2002-04-19 00:11:32 +0000218 def _put(self, item):
219 if self.fail_next_put:
220 self.fail_next_put = False
Collin Winter3add4d72007-08-29 23:37:32 +0000221 raise FailingQueueException("You Lose")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000222 return queue.Queue._put(self, item)
Mark Hammond3b959db2002-04-19 00:11:32 +0000223 def _get(self):
224 if self.fail_next_get:
225 self.fail_next_get = False
Collin Winter3add4d72007-08-29 23:37:32 +0000226 raise FailingQueueException("You Lose")
Alexandre Vassalottif260e442008-05-11 19:59:59 +0000227 return queue.Queue._get(self)
Mark Hammond3b959db2002-04-19 00:11:32 +0000228
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000229class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
Mark Hammond3b959db2002-04-19 00:11:32 +0000230
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000231 def failing_queue_test(self, q):
232 if q.qsize():
233 raise RuntimeError("Call this function with an empty queue")
234 for i in range(QUEUE_SIZE-1):
235 q.put(i)
236 # Test a failing non-blocking put.
237 q.fail_next_put = True
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000238 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000239 q.put("oops", block=0)
240 self.fail("The queue didn't fail when it should have")
241 except FailingQueueException:
242 pass
243 q.fail_next_put = True
244 try:
245 q.put("oops", timeout=0.1)
246 self.fail("The queue didn't fail when it should have")
247 except FailingQueueException:
248 pass
249 q.put("last")
250 self.assert_(qfull(q), "Queue should be full")
251 # Test a failing blocking put
252 q.fail_next_put = True
253 try:
254 self.do_blocking_test(q.put, ("full",), q.get, ())
255 self.fail("The queue didn't fail when it should have")
256 except FailingQueueException:
257 pass
258 # Check the Queue isn't damaged.
259 # put failed, but get succeeded - re-add
260 q.put("last")
261 # Test a failing timeout put
262 q.fail_next_put = True
263 try:
264 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
265 FailingQueueException)
266 self.fail("The queue didn't fail when it should have")
267 except FailingQueueException:
268 pass
269 # Check the Queue isn't damaged.
270 # put failed, but get succeeded - re-add
271 q.put("last")
272 self.assert_(qfull(q), "Queue should be full")
273 q.get()
274 self.assert_(not qfull(q), "Queue should not be full")
275 q.put("last")
276 self.assert_(qfull(q), "Queue should be full")
277 # Test a blocking put
278 self.do_blocking_test(q.put, ("full",), q.get, ())
279 # Empty it
280 for i in range(QUEUE_SIZE):
281 q.get()
282 self.assert_(not q.qsize(), "Queue should be empty")
283 q.put("first")
284 q.fail_next_get = True
285 try:
286 q.get()
287 self.fail("The queue didn't fail when it should have")
288 except FailingQueueException:
289 pass
290 self.assert_(q.qsize(), "Queue should not be empty")
291 q.fail_next_get = True
292 try:
293 q.get(timeout=0.1)
294 self.fail("The queue didn't fail when it should have")
295 except FailingQueueException:
296 pass
297 self.assert_(q.qsize(), "Queue should not be empty")
298 q.get()
299 self.assert_(not q.qsize(), "Queue should be empty")
300 q.fail_next_get = True
301 try:
302 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
303 FailingQueueException)
304 self.fail("The queue didn't fail when it should have")
305 except FailingQueueException:
306 pass
307 # put succeeded, but get failed.
308 self.assert_(q.qsize(), "Queue should not be empty")
309 q.get()
310 self.assert_(not q.qsize(), "Queue should be empty")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000311
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000312 def test_failing_queue(self):
313 # Test to make sure a queue is functioning correctly.
314 # Done twice to the same instance.
315 q = FailingQueue(QUEUE_SIZE)
316 self.failing_queue_test(q)
317 self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000318
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000319
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000320def test_main():
Benjamin Petersonee8712c2008-05-20 21:35:26 +0000321 support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000322 FailingQueueTest)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000323
Christian Heimes679db4a2008-01-18 09:56:22 +0000324
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000325if __name__ == "__main__":
326 test_main()