blob: 00348a8ad445dcb42cdb619403d15b6581f6d951 [file] [log] [blame]
Mark Hammond3b959db2002-04-19 00:11:32 +00001# Some simple Queue module tests, plus some failure conditions
Tim Petersafe52972004-08-20 02:37:25 +00002# to ensure the Queue locks remain stable.
Mark Hammond3b959db2002-04-19 00:11:32 +00003import Queue
4import sys
5import threading
6import time
Georg Brandld22b4662008-02-02 11:39:29 +00007import unittest
8from test import test_support
Mark Hammond3b959db2002-04-19 00:11:32 +00009
Tim Petersafe52972004-08-20 02:37:25 +000010QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000011
Tim Petersafe52972004-08-20 02:37:25 +000012# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000013class _TriggerThread(threading.Thread):
14 def __init__(self, fn, args):
15 self.fn = fn
16 self.args = args
17 self.startedEvent = threading.Event()
18 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000019
Mark Hammond3b959db2002-04-19 00:11:32 +000020 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000021 # The sleep isn't necessary, but is intended to give the blocking
22 # function in the main thread a chance at actually blocking before
23 # we unclog it. But if the sleep is longer than the timeout-based
24 # tests wait in their blocking functions, those tests will fail.
25 # So we give them much longer timeout values compared to the
26 # sleep here (I aimed at 10 seconds for blocking functions --
27 # they should never actually wait that long - they should make
28 # progress as soon as we call self.fn()).
29 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000030 self.startedEvent.set()
31 self.fn(*self.args)
32
Tim Peters8d7626c2004-08-20 03:27:12 +000033
Georg Brandld22b4662008-02-02 11:39:29 +000034# Execute a function that blocks, and in a separate thread, a function that
35# triggers the release. Returns the result of the blocking function. Caution:
36# block_func must guarantee to block until trigger_func is called, and
37# trigger_func must guarantee to change queue state so that block_func can make
38# enough progress to return. In particular, a block_func that just raises an
39# exception regardless of whether trigger_func is called will lead to
40# timing-dependent sporadic failures, and one of those went rarely seen but
41# undiagnosed for years. Now block_func must be unexceptional. If block_func
42# is supposed to raise an exception, call do_exceptional_blocking_test()
43# instead.
44
45class BlockingTestMixin:
46
47 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
48 self.t = _TriggerThread(trigger_func, trigger_args)
49 self.t.start()
50 self.result = block_func(*block_args)
51 # If block_func returned before our thread made the call, we failed!
52 if not self.t.startedEvent.isSet():
53 self.fail("blocking function '%r' appeared not to block" %
54 block_func)
55 self.t.join(10) # make sure the thread terminates
56 if self.t.isAlive():
57 self.fail("trigger function '%r' appeared to not return" %
58 trigger_func)
59 return self.result
60
61 # Call this instead if block_func is supposed to raise an exception.
62 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
63 trigger_args, expected_exception_class):
64 self.t = _TriggerThread(trigger_func, trigger_args)
65 self.t.start()
Tim Peters8d7626c2004-08-20 03:27:12 +000066 try:
Georg Brandld22b4662008-02-02 11:39:29 +000067 try:
68 block_func(*block_args)
69 except expected_exception_class:
70 raise
71 else:
72 self.fail("expected exception of kind %r" %
73 expected_exception_class)
74 finally:
75 self.t.join(10) # make sure the thread terminates
76 if self.t.isAlive():
77 self.fail("trigger function '%r' appeared to not return" %
78 trigger_func)
79 if not self.t.startedEvent.isSet():
80 self.fail("trigger thread ended but event never set")
81
82
83class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
84 def setUp(self):
85 self.cum = 0
86 self.cumlock = threading.Lock()
87
88 def simple_queue_test(self, q):
89 if not q.empty():
90 raise RuntimeError, "Call this function with an empty queue"
91 # I guess we better check things actually queue correctly a little :)
92 q.put(111)
93 q.put(333)
94 q.put(222)
95 target_order = dict(Queue = [111, 333, 222],
96 LifoQueue = [222, 333, 111],
97 PriorityQueue = [111, 222, 333])
98 actual_order = [q.get(), q.get(), q.get()]
99 self.assertEquals(actual_order, target_order[q.__class__.__name__],
100 "Didn't seem to queue the correct data!")
101 for i in range(QUEUE_SIZE-1):
102 q.put(i)
103 self.assert_(not q.empty(), "Queue should not be empty")
104 self.assert_(not q.full(), "Queue should not be full")
105 q.put("last")
106 self.assert_(q.full(), "Queue should be full")
107 try:
108 q.put("full", block=0)
109 self.fail("Didn't appear to block with a full queue")
110 except Queue.Full:
111 pass
112 try:
113 q.put("full", timeout=0.01)
114 self.fail("Didn't appear to time-out with a full queue")
115 except Queue.Full:
116 pass
117 # Test a blocking put
118 self.do_blocking_test(q.put, ("full",), q.get, ())
119 self.do_blocking_test(q.put, ("full", True, 10), q.get, ())
120 # Empty it
121 for i in range(QUEUE_SIZE):
122 q.get()
123 self.assert_(q.empty(), "Queue should be empty")
124 try:
125 q.get(block=0)
126 self.fail("Didn't appear to block with an empty queue")
127 except Queue.Empty:
128 pass
129 try:
130 q.get(timeout=0.01)
131 self.fail("Didn't appear to time-out with an empty queue")
132 except Queue.Empty:
133 pass
134 # Test a blocking get
135 self.do_blocking_test(q.get, (), q.put, ('empty',))
136 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
137
138
139 def worker(self, q):
140 while True:
Georg Brandlcafb7102008-02-02 23:59:21 +0000141 x = q.get()
142 if x is None:
Georg Brandld22b4662008-02-02 11:39:29 +0000143 q.task_done()
144 return
145 self.cumlock.acquire()
146 try:
Georg Brandlcafb7102008-02-02 23:59:21 +0000147 self.cum += x
Georg Brandld22b4662008-02-02 11:39:29 +0000148 finally:
149 self.cumlock.release()
150 q.task_done()
151
152 def queue_join_test(self, q):
153 self.cum = 0
154 for i in (0,1):
155 threading.Thread(target=self.worker, args=(q,)).start()
156 for i in xrange(100):
157 q.put(i)
158 q.join()
159 self.assertEquals(self.cum, sum(range(100)),
Georg Brandlcafb7102008-02-02 23:59:21 +0000160 "q.join() did not block until all tasks were done")
Georg Brandld22b4662008-02-02 11:39:29 +0000161 for i in (0,1):
162 q.put(None) # instruct the threads to close
163 q.join() # verify that you can join twice
164
165 def test_queue_task_done(self):
166 # Test to make sure a queue task completed successfully.
167 q = self.type2test()
168 try:
169 q.task_done()
170 except ValueError:
171 pass
Tim Peters8d7626c2004-08-20 03:27:12 +0000172 else:
Georg Brandld22b4662008-02-02 11:39:29 +0000173 self.fail("Did not detect task count going negative")
174
175 def test_queue_join(self):
176 # Test that a queue join()s successfully, and before anything else
177 # (done twice for insurance).
178 q = self.type2test()
179 self.queue_join_test(q)
180 self.queue_join_test(q)
181 try:
182 q.task_done()
183 except ValueError:
184 pass
185 else:
186 self.fail("Did not detect task count going negative")
187
188 def test_simple_queue(self):
189 # Do it a couple of times on the same queue.
190 # Done twice to make sure works with same instance reused.
191 q = self.type2test(QUEUE_SIZE)
192 self.simple_queue_test(q)
193 self.simple_queue_test(q)
194
195
196class QueueTest(BaseQueueTest):
197 type2test = Queue.Queue
198
199class LifoQueueTest(BaseQueueTest):
200 type2test = Queue.LifoQueue
201
202class PriorityQueueTest(BaseQueueTest):
203 type2test = Queue.PriorityQueue
204
205
Mark Hammond3b959db2002-04-19 00:11:32 +0000206
207# A Queue subclass that can provoke failure at a moment's notice :)
208class FailingQueueException(Exception):
209 pass
210
211class FailingQueue(Queue.Queue):
212 def __init__(self, *args):
213 self.fail_next_put = False
214 self.fail_next_get = False
215 Queue.Queue.__init__(self, *args)
216 def _put(self, item):
217 if self.fail_next_put:
218 self.fail_next_put = False
219 raise FailingQueueException, "You Lose"
220 return Queue.Queue._put(self, item)
221 def _get(self):
222 if self.fail_next_get:
223 self.fail_next_get = False
224 raise FailingQueueException, "You Lose"
225 return Queue.Queue._get(self)
226
Georg Brandld22b4662008-02-02 11:39:29 +0000227class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
Mark Hammond3b959db2002-04-19 00:11:32 +0000228
Georg Brandld22b4662008-02-02 11:39:29 +0000229 def failing_queue_test(self, q):
230 if not q.empty():
231 raise RuntimeError, "Call this function with an empty queue"
232 for i in range(QUEUE_SIZE-1):
233 q.put(i)
234 # Test a failing non-blocking put.
235 q.fail_next_put = True
Raymond Hettingerfd3fcf02006-03-24 20:43:29 +0000236 try:
Georg Brandld22b4662008-02-02 11:39:29 +0000237 q.put("oops", block=0)
238 self.fail("The queue didn't fail when it should have")
239 except FailingQueueException:
240 pass
241 q.fail_next_put = True
242 try:
243 q.put("oops", timeout=0.1)
244 self.fail("The queue didn't fail when it should have")
245 except FailingQueueException:
246 pass
247 q.put("last")
248 self.assert_(q.full(), "Queue should be full")
249 # Test a failing blocking put
250 q.fail_next_put = True
251 try:
252 self.do_blocking_test(q.put, ("full",), q.get, ())
253 self.fail("The queue didn't fail when it should have")
254 except FailingQueueException:
255 pass
256 # Check the Queue isn't damaged.
257 # put failed, but get succeeded - re-add
258 q.put("last")
259 # Test a failing timeout put
260 q.fail_next_put = True
261 try:
262 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
263 FailingQueueException)
264 self.fail("The queue didn't fail when it should have")
265 except FailingQueueException:
266 pass
267 # Check the Queue isn't damaged.
268 # put failed, but get succeeded - re-add
269 q.put("last")
270 self.assert_(q.full(), "Queue should be full")
271 q.get()
272 self.assert_(not q.full(), "Queue should not be full")
273 q.put("last")
274 self.assert_(q.full(), "Queue should be full")
275 # Test a blocking put
276 self.do_blocking_test(q.put, ("full",), q.get, ())
277 # Empty it
278 for i in range(QUEUE_SIZE):
279 q.get()
280 self.assert_(q.empty(), "Queue should be empty")
281 q.put("first")
282 q.fail_next_get = True
283 try:
284 q.get()
285 self.fail("The queue didn't fail when it should have")
286 except FailingQueueException:
287 pass
288 self.assert_(not q.empty(), "Queue should not be empty")
289 q.fail_next_get = True
290 try:
291 q.get(timeout=0.1)
292 self.fail("The queue didn't fail when it should have")
293 except FailingQueueException:
294 pass
295 self.assert_(not q.empty(), "Queue should not be empty")
296 q.get()
297 self.assert_(q.empty(), "Queue should be empty")
298 q.fail_next_get = True
299 try:
300 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
301 FailingQueueException)
302 self.fail("The queue didn't fail when it should have")
303 except FailingQueueException:
304 pass
305 # put succeeded, but get failed.
306 self.assert_(not q.empty(), "Queue should not be empty")
307 q.get()
308 self.assert_(q.empty(), "Queue should be empty")
Tim Peterse33901e2006-03-25 01:50:43 +0000309
Georg Brandld22b4662008-02-02 11:39:29 +0000310 def test_failing_queue(self):
311 # Test to make sure a queue is functioning correctly.
312 # Done twice to the same instance.
313 q = FailingQueue(QUEUE_SIZE)
314 self.failing_queue_test(q)
315 self.failing_queue_test(q)
Raymond Hettingerc4e94b92006-03-25 12:15:04 +0000316
Raymond Hettingerfd3fcf02006-03-24 20:43:29 +0000317
Georg Brandld22b4662008-02-02 11:39:29 +0000318def test_main():
319 test_support.run_unittest(QueueTest, LifoQueueTest, PriorityQueueTest,
320 FailingQueueTest)
Raymond Hettingerfd3fcf02006-03-24 20:43:29 +0000321
Raymond Hettinger9e1bc982008-01-16 23:40:45 +0000322
Georg Brandld22b4662008-02-02 11:39:29 +0000323if __name__ == "__main__":
324 test_main()