blob: c8528f97741dccb3e29e4554796775832eeb46f7 [file] [log] [blame]
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
Antoine Pitrou94e16962018-01-16 00:27:16 +01003import itertools
Alexandre Vassalottif260e442008-05-11 19:59:59 +00004import queue
Antoine Pitrou94e16962018-01-16 00:27:16 +01005import random
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02006import threading
Mark Hammond3b959db2002-04-19 00:11:32 +00007import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00008import unittest
Antoine Pitrou94e16962018-01-16 00:27:16 +01009import weakref
Benjamin Petersonee8712c2008-05-20 21:35:26 +000010from test import support
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011
Mark Hammond3b959db2002-04-19 00:11:32 +000012
Antoine Pitrou94e16962018-01-16 00:27:16 +010013try:
14 import _queue
15except ImportError:
16 _queue = None
17
Tim Petersafe52972004-08-20 02:37:25 +000018QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000019
def qfull(q):
    """Return True when *q* is bounded and holds exactly ``maxsize`` items."""
    if q.maxsize <= 0:
        # A non-positive maxsize is never reported as full.
        return False
    return q.qsize() == q.maxsize
22
# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000024class _TriggerThread(threading.Thread):
25 def __init__(self, fn, args):
26 self.fn = fn
27 self.args = args
28 self.startedEvent = threading.Event()
29 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000030
Mark Hammond3b959db2002-04-19 00:11:32 +000031 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000032 # The sleep isn't necessary, but is intended to give the blocking
33 # function in the main thread a chance at actually blocking before
34 # we unclog it. But if the sleep is longer than the timeout-based
35 # tests wait in their blocking functions, those tests will fail.
36 # So we give them much longer timeout values compared to the
37 # sleep here (I aimed at 10 seconds for blocking functions --
38 # they should never actually wait that long - they should make
39 # progress as soon as we call self.fn()).
40 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000041 self.startedEvent.set()
42 self.fn(*self.args)
43
Tim Peters8d7626c2004-08-20 03:27:12 +000044
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release.  Returns the result of the blocking function.  Caution:
# block_func must guarantee to block until trigger_func is called, and
# trigger_func must guarantee to change queue state so that block_func can make
# enough progress to return.  In particular, a block_func that just raises an
# exception regardless of whether trigger_func is called will lead to
# timing-dependent sporadic failures; one of those was rarely seen and went
# undiagnosed for years.  Now block_func must be unexceptional.  If block_func
# is supposed to raise an exception, call do_exceptional_blocking_test()
# instead.
55
class BlockingTestMixin:
    """Helpers that run a blocking call while a trigger thread unblocks it.

    Meant to be mixed into a ``unittest.TestCase`` (``self.fail`` is used).
    """

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run ``block_func(*block_args)`` and return its result.

        ``trigger_func(*trigger_args)`` is called from a _TriggerThread.  The
        test fails if block_func returned before the trigger thread reached
        its call.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        try:
            outcome = block_func(*block_args)
            self.result = outcome
            # If block_func returned before our thread made the call, we failed!
            if not trigger.startedEvent.is_set():
                self.fail("blocking function %r appeared not to block" %
                          block_func)
            return outcome
        finally:
            support.join_thread(trigger, 10) # make sure the thread terminates

    # Call this instead if block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Run a blocking call that is expected to raise.

        Any exception raised by block_func propagates to the caller; the
        test fails if block_func returns normally, or if the trigger thread
        finished without ever setting its started event.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        try:
            # Exceptions (expected or not) simply propagate from here.
            block_func(*block_args)
            self.fail("expected exception of kind %r" %
                      expected_exception_class)
        finally:
            support.join_thread(trigger, 10) # make sure the thread terminates
            if not trigger.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
88
89
class BaseQueueTestMixin(BlockingTestMixin):
    """Tests shared by the Queue, LifoQueue and PriorityQueue test cases.

    Concrete subclasses set ``type2test`` to the queue class under test.
    """

    def setUp(self):
        self.cum = 0
        self.cumlock = threading.Lock()

    def basic_queue_test(self, q):
        """Exercise put/get, full/empty detection and blocking on *q*.

        *q* must be empty on entry and is left holding one item per
        blocking-get check performed at the end.

        Raises:
            RuntimeError: if *q* is not empty when called.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        target_order = {
            "Queue": [111, 333, 222],
            "LifoQueue": [222, 333, 111],
            "PriorityQueue": [111, 222, 333],
        }
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertFalse(qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        with self.assertRaises(
                queue.Full, msg="Didn't appear to block with a full queue"):
            q.put(full, block=0)
        with self.assertRaises(
                queue.Full, msg="Didn't appear to time-out with a full queue"):
            q.put(full, timeout=0.01)
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        with self.assertRaises(
                queue.Empty, msg="Didn't appear to block with an empty queue"):
            q.get(block=0)
        with self.assertRaises(
                queue.Empty,
                msg="Didn't appear to time-out with an empty queue"):
            q.get(timeout=0.01)
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Consume items from *q*, adding them to ``self.cum``.

        A negative item tells the worker to stop.  ``task_done()`` is called
        once for every item taken, including the stop marker.
        """
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that ``q.join()`` blocks until two workers drained *q*."""
        self.cum = 0
        threads = []
        for _ in range(2):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            threads.append(thread)
        try:
            for i in range(100):
                q.put(i)
            q.join()
            self.assertEqual(self.cum, sum(range(100)),
                             "q.join() did not block until all tasks were done")
        finally:
            # Always release the workers, even when the assertion above
            # fails: otherwise the non-daemon threads stay blocked in
            # q.get() forever and the interpreter cannot exit.
            for _ in threads:
                q.put(-1)       # instruct the threads to close
            q.join()            # verify that you can join twice
            for thread in threads:
                thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        with self.assertRaises(
                ValueError, msg="Did not detect task count going negative"):
            q.task_done()

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        with self.assertRaises(
                ValueError, msg="Did not detect task count going negative"):
            q.task_done()

    def test_basic(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.basic_queue_test(q)
        self.basic_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(queue.Full):
            q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000241
class QueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.Queue``."""

    type2test = queue.Queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000244
class LifoQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.LifoQueue``."""

    type2test = queue.LifoQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000247
class PriorityQueueTest(BaseQueueTestMixin, unittest.TestCase):
    """Run the shared queue tests against ``queue.PriorityQueue``."""

    type2test = queue.PriorityQueue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000250
251
Mark Hammond3b959db2002-04-19 00:11:32 +0000252
253# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when a failure was requested."""


class FailingQueue(queue.Queue):
    """Queue whose next ``_put`` or ``_get`` can be made to raise.

    Setting ``fail_next_put`` / ``fail_next_get`` to True makes the next
    corresponding operation raise FailingQueueException; the flag is reset
    before raising, so only one operation fails per request.
    """

    def __init__(self, *args):
        self.fail_next_put = False
        self.fail_next_get = False
        super().__init__(*args)

    def _put(self, item):
        if not self.fail_next_put:
            return super()._put(item)
        self.fail_next_put = False
        raise FailingQueueException("You Lose")

    def _get(self):
        if not self.fail_next_get:
            return super()._get()
        self.fail_next_get = False
        raise FailingQueueException("You Lose")
Mark Hammond3b959db2002-04-19 00:11:32 +0000272
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    """Check that a Queue stays usable after its _put/_get hooks raise."""

    def failing_queue_test(self, q):
        """Provoke put/get failures on *q* and verify it is not damaged.

        *q* must be an empty FailingQueue of size QUEUE_SIZE and is empty
        again on return.

        Raises:
            RuntimeError: if *q* is not empty when called.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        no_failure = "The queue didn't fail when it should have"
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.put("oops", block=0)
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.put("oops", timeout=0.1)
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put.  The put is expected to raise, so the
        # exceptional variant of the blocking helper is the one to use.
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.put, ("full", True, 10),
                                              q.get, (),
                                              FailingQueueException)
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertFalse(qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.get()
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            q.get(timeout=0.1)
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")
        q.fail_next_get = True
        with self.assertRaises(FailingQueueException, msg=no_failure):
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertFalse(q.qsize(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000362
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000363
class BaseSimpleQueueTest:
    """Tests shared by the SimpleQueue implementations.

    Concrete subclasses provide ``type2test``; ``setUp`` stores a fresh
    instance of it as ``self.q``.
    """

    def setUp(self):
        self.q = self.type2test()

    def feed(self, q, seq, rnd):
        """Pop values off the end of *seq* and put them on *q* until empty.

        After each put, *rnd* decides whether to sleep for up to 1 ms.
        """
        while True:
            try:
                item = seq.pop()
            except IndexError:
                # The (possibly shared) list has been drained.
                return
            q.put(item)
            if rnd.random() <= 0.5:
                continue
            time.sleep(rnd.random() * 1e-3)

    def consume(self, q, results, sentinel):
        """Append blocking ``q.get()`` values to *results* until *sentinel*."""
        for val in iter(q.get, sentinel):
            results.append(val)

    def consume_nonblock(self, q, results, sentinel):
        """Like consume(), but polls with ``get(block=False)``."""
        while True:
            try:
                val = q.get(block=False)
            except queue.Empty:
                # Nothing available yet: back off briefly and poll again.
                time.sleep(1e-5)
                continue
            if val == sentinel:
                return
            results.append(val)

    def consume_timeout(self, q, results, sentinel):
        """Like consume(), but polls with a very short ``get`` timeout."""
        while True:
            try:
                val = q.get(timeout=1e-5)
            except queue.Empty:
                continue
            if val == sentinel:
                return
            results.append(val)

    def run_threads(self, n_feeders, n_consumers, q, inputs,
                    feed_func, consume_func):
        """Push *inputs* through *q* with feeder/consumer threads.

        Returns the list of values collected by the consumers.  Fails the
        test if any thread raised, or if *q* is not empty afterwards.
        """
        results = []
        sentinel = None
        # Feeders pop from the end of the list, so keep the inputs reversed
        # and place one sentinel per consumer where it is popped last.
        seq = [sentinel] * n_consumers + inputs[::-1]
        rnd = random.Random(42)

        exceptions = []

        def log_exceptions(f):
            # Record anything raised inside a thread instead of losing it.
            def wrapper(*args, **kwargs):
                try:
                    f(*args, **kwargs)
                except BaseException as e:
                    exceptions.append(e)
            return wrapper

        guarded_feed = log_exceptions(feed_func)
        guarded_consume = log_exceptions(consume_func)
        feeders = [
            threading.Thread(target=guarded_feed, args=(q, seq, rnd))
            for _ in range(n_feeders)
        ]
        consumers = [
            threading.Thread(target=guarded_consume,
                             args=(q, results, sentinel))
            for _ in range(n_consumers)
        ]

        with support.start_threads(feeders + consumers):
            pass

        self.assertFalse(exceptions)
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        return results

    def test_basic(self):
        # Basic tests for get(), put() etc.
        q = self.q
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        # Fill the queue with four items, checking the size on the way.
        q.put(1)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 1)
        q.put(2)
        q.put_nowait(3)
        q.put(4)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 4)

        # Drain it again, once through each flavour of get().
        self.assertEqual(q.get(), 1)
        self.assertEqual(q.qsize(), 3)

        self.assertEqual(q.get_nowait(), 2)
        self.assertEqual(q.qsize(), 2)

        self.assertEqual(q.get(block=False), 3)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 1)

        self.assertEqual(q.get(timeout=0.1), 4)
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        # Every non-blocking / timed get on the empty queue raises Empty.
        with self.assertRaises(queue.Empty):
            q.get(block=False)
        with self.assertRaises(queue.Empty):
            q.get(timeout=1e-3)
        with self.assertRaises(queue.Empty):
            q.get_nowait()
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

    def test_negative_timeout_raises_exception(self):
        q = self.q
        q.put(1)
        with self.assertRaises(ValueError):
            q.get(timeout=-1)

    def test_order(self):
        # Test a pair of concurrent put() and get()
        inputs = list(range(100))
        results = self.run_threads(1, 1, self.q, inputs,
                                   self.feed, self.consume)

        # One producer, one consumer => results appended in well-defined order
        self.assertEqual(results, inputs)

    def test_many_threads(self):
        # Test multiple concurrent put() and get()
        N = 50
        inputs = list(range(10000))
        results = self.run_threads(N, N, self.q, inputs,
                                   self.feed, self.consume)

        # Multiple consumers without synchronization append the
        # results in random order
        self.assertEqual(sorted(results), inputs)

    def test_many_threads_nonblock(self):
        # Test multiple concurrent put() and get(block=False)
        N = 50
        inputs = list(range(10000))
        results = self.run_threads(N, N, self.q, inputs,
                                   self.feed, self.consume_nonblock)

        self.assertEqual(sorted(results), inputs)

    def test_many_threads_timeout(self):
        # Test multiple concurrent put() and get(timeout=...)
        N = 50
        inputs = list(range(1000))
        results = self.run_threads(N, N, self.q, inputs,
                                   self.feed, self.consume_timeout)

        self.assertEqual(sorted(results), inputs)

    def test_references(self):
        # The queue should lose references to each item as soon as
        # it leaves the queue.
        class C:
            pass

        N = 20
        q = self.q
        for _ in range(N):
            q.put(C())
        for _ in range(N):
            wr = weakref.ref(q.get())
            self.assertIsNone(wr())
541
542
class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
    """Run the SimpleQueue tests against ``queue._PySimpleQueue``."""

    type2test = queue._PySimpleQueue
545
546
@unittest.skipIf(_queue is None, "No _queue module found")
class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
    """Run the SimpleQueue tests against ``_queue.SimpleQueue``."""

    def setUp(self):
        # Resolved here rather than in the class body so that the class can
        # still be defined (and skipped) when _queue is None.
        self.type2test = _queue.SimpleQueue
        super().setUp()

    def test_is_default(self):
        self.assertIs(self.type2test, queue.SimpleQueue)

    def test_reentrancy(self):
        # bpo-14976: put() may be called reentrantly in an asynchronous
        # callback.
        q = self.q
        counter = itertools.count()
        N = 10000
        results = []

        # This test exploits the fact that __del__ in a reference cycle
        # can be called any time the GC may run.

        class Circular:
            def __init__(self):
                self.circular = self

            def __del__(self):
                q.put(next(counter))

        while True:
            cyclic = Circular()
            q.put(next(counter))
            del cyclic
            latest = q.get()
            results.append(latest)
            if latest >= N:
                break

        self.assertEqual(results, list(range(N + 1)))
584
585
# Run the test suite when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()