blob: 7b23699a00f1d0d771a1476e6b2e548011ed5521 [file] [log] [blame]
Alexandre Vassalottif260e442008-05-11 19:59:59 +00001# Some simple queue module tests, plus some failure conditions
Tim Petersafe52972004-08-20 02:37:25 +00002# to ensure the Queue locks remain stable.
Antoine Pitrou94e16962018-01-16 00:27:16 +01003import itertools
Antoine Pitrou94e16962018-01-16 00:27:16 +01004import random
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02005import threading
Mark Hammond3b959db2002-04-19 00:11:32 +00006import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00007import unittest
Antoine Pitrou94e16962018-01-16 00:27:16 +01008import weakref
Benjamin Petersonee8712c2008-05-20 21:35:26 +00009from test import support
Hai Shie80697d2020-05-28 06:10:27 +080010from test.support import threading_helper
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011
Pablo Galindo3f5b9082019-06-25 02:53:30 +010012py_queue = support.import_fresh_module('queue', blocked=['_queue'])
13c_queue = support.import_fresh_module('queue', fresh=['_queue'])
14need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
Antoine Pitrou94e16962018-01-16 00:27:16 +010015
Tim Petersafe52972004-08-20 02:37:25 +000016QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000017
Raymond Hettingerda3caed2008-01-14 21:39:24 +000018def qfull(q):
19 return q.maxsize > 0 and q.qsize() == q.maxsize
20
Tim Petersafe52972004-08-20 02:37:25 +000021# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000022class _TriggerThread(threading.Thread):
23 def __init__(self, fn, args):
24 self.fn = fn
25 self.args = args
26 self.startedEvent = threading.Event()
27 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000028
Mark Hammond3b959db2002-04-19 00:11:32 +000029 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000030 # The sleep isn't necessary, but is intended to give the blocking
31 # function in the main thread a chance at actually blocking before
32 # we unclog it. But if the sleep is longer than the timeout-based
33 # tests wait in their blocking functions, those tests will fail.
34 # So we give them much longer timeout values compared to the
35 # sleep here (I aimed at 10 seconds for blocking functions --
36 # they should never actually wait that long - they should make
37 # progress as soon as we call self.fn()).
38 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000039 self.startedEvent.set()
40 self.fn(*self.args)
41
Tim Peters8d7626c2004-08-20 03:27:12 +000042
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000043# Execute a function that blocks, and in a separate thread, a function that
44# triggers the release. Returns the result of the blocking function. Caution:
45# block_func must guarantee to block until trigger_func is called, and
46# trigger_func must guarantee to change queue state so that block_func can make
47# enough progress to return. In particular, a block_func that just raises an
48# exception regardless of whether trigger_func is called will lead to
49# timing-dependent sporadic failures, and one of those went rarely seen but
50# undiagnosed for years. Now block_func must be unexceptional. If block_func
51# is supposed to raise an exception, call do_exceptional_blocking_test()
52# instead.
53
54class BlockingTestMixin:
55
56 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
Victor Stinner167cbde2017-09-14 14:04:56 -070057 thread = _TriggerThread(trigger_func, trigger_args)
58 thread.start()
59 try:
60 self.result = block_func(*block_args)
61 # If block_func returned before our thread made the call, we failed!
62 if not thread.startedEvent.is_set():
Serhiy Storchakaa4a30202017-11-28 22:54:42 +020063 self.fail("blocking function %r appeared not to block" %
Victor Stinner167cbde2017-09-14 14:04:56 -070064 block_func)
65 return self.result
66 finally:
Hai Shie80697d2020-05-28 06:10:27 +080067 threading_helper.join_thread(thread) # make sure the thread terminates
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000068
69 # Call this instead if block_func is supposed to raise an exception.
70 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
71 trigger_args, expected_exception_class):
Victor Stinner167cbde2017-09-14 14:04:56 -070072 thread = _TriggerThread(trigger_func, trigger_args)
73 thread.start()
Tim Peters8d7626c2004-08-20 03:27:12 +000074 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000075 try:
76 block_func(*block_args)
77 except expected_exception_class:
78 raise
79 else:
80 self.fail("expected exception of kind %r" %
81 expected_exception_class)
82 finally:
Hai Shie80697d2020-05-28 06:10:27 +080083 threading_helper.join_thread(thread) # make sure the thread terminates
Victor Stinner167cbde2017-09-14 14:04:56 -070084 if not thread.startedEvent.is_set():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000085 self.fail("trigger thread ended but event never set")
86
87
R David Murrayc6bfce92012-03-17 16:38:39 -040088class BaseQueueTestMixin(BlockingTestMixin):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000089 def setUp(self):
90 self.cum = 0
91 self.cumlock = threading.Lock()
92
Antoine Pitrou94e16962018-01-16 00:27:16 +010093 def basic_queue_test(self, q):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000094 if q.qsize():
95 raise RuntimeError("Call this function with an empty queue")
Brett Cannon671153d2010-07-23 16:56:21 +000096 self.assertTrue(q.empty())
97 self.assertFalse(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000098 # I guess we better check things actually queue correctly a little :)
99 q.put(111)
100 q.put(333)
101 q.put(222)
102 target_order = dict(Queue = [111, 333, 222],
103 LifoQueue = [222, 333, 111],
104 PriorityQueue = [111, 222, 333])
105 actual_order = [q.get(), q.get(), q.get()]
Ezio Melottib3aedd42010-11-20 19:04:17 +0000106 self.assertEqual(actual_order, target_order[q.__class__.__name__],
107 "Didn't seem to queue the correct data!")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000108 for i in range(QUEUE_SIZE-1):
109 q.put(i)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000110 self.assertTrue(q.qsize(), "Queue should not be empty")
111 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000112 last = 2 * QUEUE_SIZE
113 full = 3 * 2 * QUEUE_SIZE
114 q.put(last)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000115 self.assertTrue(qfull(q), "Queue should be full")
Brett Cannon671153d2010-07-23 16:56:21 +0000116 self.assertFalse(q.empty())
117 self.assertTrue(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000118 try:
119 q.put(full, block=0)
120 self.fail("Didn't appear to block with a full queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100121 except self.queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000122 pass
123 try:
124 q.put(full, timeout=0.01)
125 self.fail("Didn't appear to time-out with a full queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100126 except self.queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000127 pass
128 # Test a blocking put
129 self.do_blocking_test(q.put, (full,), q.get, ())
130 self.do_blocking_test(q.put, (full, True, 10), q.get, ())
131 # Empty it
132 for i in range(QUEUE_SIZE):
133 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000134 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000135 try:
136 q.get(block=0)
137 self.fail("Didn't appear to block with an empty queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100138 except self.queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000139 pass
140 try:
141 q.get(timeout=0.01)
142 self.fail("Didn't appear to time-out with an empty queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100143 except self.queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000144 pass
145 # Test a blocking get
146 self.do_blocking_test(q.get, (), q.put, ('empty',))
147 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
148
149
150 def worker(self, q):
151 while True:
152 x = q.get()
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000153 if x < 0:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000154 q.task_done()
155 return
156 with self.cumlock:
157 self.cum += x
158 q.task_done()
159
160 def queue_join_test(self, q):
161 self.cum = 0
Victor Stinner167cbde2017-09-14 14:04:56 -0700162 threads = []
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000163 for i in (0,1):
Victor Stinner167cbde2017-09-14 14:04:56 -0700164 thread = threading.Thread(target=self.worker, args=(q,))
165 thread.start()
166 threads.append(thread)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000167 for i in range(100):
168 q.put(i)
169 q.join()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000170 self.assertEqual(self.cum, sum(range(100)),
171 "q.join() did not block until all tasks were done")
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000172 for i in (0,1):
173 q.put(-1) # instruct the threads to close
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000174 q.join() # verify that you can join twice
Victor Stinner167cbde2017-09-14 14:04:56 -0700175 for thread in threads:
176 thread.join()
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000177
178 def test_queue_task_done(self):
179 # Test to make sure a queue task completed successfully.
180 q = self.type2test()
181 try:
182 q.task_done()
183 except ValueError:
184 pass
Tim Peters8d7626c2004-08-20 03:27:12 +0000185 else:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000186 self.fail("Did not detect task count going negative")
187
188 def test_queue_join(self):
189 # Test that a queue join()s successfully, and before anything else
190 # (done twice for insurance).
191 q = self.type2test()
192 self.queue_join_test(q)
193 self.queue_join_test(q)
194 try:
195 q.task_done()
196 except ValueError:
197 pass
198 else:
199 self.fail("Did not detect task count going negative")
200
Antoine Pitrou94e16962018-01-16 00:27:16 +0100201 def test_basic(self):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000202 # Do it a couple of times on the same queue.
203 # Done twice to make sure works with same instance reused.
204 q = self.type2test(QUEUE_SIZE)
Antoine Pitrou94e16962018-01-16 00:27:16 +0100205 self.basic_queue_test(q)
206 self.basic_queue_test(q)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000207
Brett Cannon671153d2010-07-23 16:56:21 +0000208 def test_negative_timeout_raises_exception(self):
209 q = self.type2test(QUEUE_SIZE)
210 with self.assertRaises(ValueError):
211 q.put(1, timeout=-1)
212 with self.assertRaises(ValueError):
213 q.get(1, timeout=-1)
214
215 def test_nowait(self):
216 q = self.type2test(QUEUE_SIZE)
217 for i in range(QUEUE_SIZE):
218 q.put_nowait(1)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100219 with self.assertRaises(self.queue.Full):
Brett Cannon671153d2010-07-23 16:56:21 +0000220 q.put_nowait(1)
221
222 for i in range(QUEUE_SIZE):
223 q.get_nowait()
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100224 with self.assertRaises(self.queue.Empty):
Brett Cannon671153d2010-07-23 16:56:21 +0000225 q.get_nowait()
226
Raymond Hettinger189316a2010-10-31 17:57:52 +0000227 def test_shrinking_queue(self):
228 # issue 10110
229 q = self.type2test(3)
230 q.put(1)
231 q.put(2)
232 q.put(3)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100233 with self.assertRaises(self.queue.Full):
Raymond Hettinger189316a2010-10-31 17:57:52 +0000234 q.put_nowait(4)
235 self.assertEqual(q.qsize(), 3)
236 q.maxsize = 2 # shrink the queue
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100237 with self.assertRaises(self.queue.Full):
Raymond Hettinger189316a2010-10-31 17:57:52 +0000238 q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000239
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100240class QueueTest(BaseQueueTestMixin):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000241
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100242 def setUp(self):
243 self.type2test = self.queue.Queue
244 super().setUp()
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000245
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100246class PyQueueTest(QueueTest, unittest.TestCase):
247 queue = py_queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000248
249
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100250@need_c_queue
251class CQueueTest(QueueTest, unittest.TestCase):
252 queue = c_queue
253
254
255class LifoQueueTest(BaseQueueTestMixin):
256
257 def setUp(self):
258 self.type2test = self.queue.LifoQueue
259 super().setUp()
260
261
262class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
263 queue = py_queue
264
265
266@need_c_queue
267class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
268 queue = c_queue
269
270
271class PriorityQueueTest(BaseQueueTestMixin):
272
273 def setUp(self):
274 self.type2test = self.queue.PriorityQueue
275 super().setUp()
276
277
278class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
279 queue = py_queue
280
281
282@need_c_queue
283class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
284 queue = c_queue
285
Mark Hammond3b959db2002-04-19 00:11:32 +0000286
287# A Queue subclass that can provoke failure at a moment's notice :)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100288class FailingQueueException(Exception): pass
Mark Hammond3b959db2002-04-19 00:11:32 +0000289
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100290class FailingQueueTest(BlockingTestMixin):
Mark Hammond3b959db2002-04-19 00:11:32 +0000291
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100292 def setUp(self):
293
294 Queue = self.queue.Queue
295
296 class FailingQueue(Queue):
297 def __init__(self, *args):
298 self.fail_next_put = False
299 self.fail_next_get = False
300 Queue.__init__(self, *args)
301 def _put(self, item):
302 if self.fail_next_put:
303 self.fail_next_put = False
304 raise FailingQueueException("You Lose")
305 return Queue._put(self, item)
306 def _get(self):
307 if self.fail_next_get:
308 self.fail_next_get = False
309 raise FailingQueueException("You Lose")
310 return Queue._get(self)
311
312 self.FailingQueue = FailingQueue
313
314 super().setUp()
Mark Hammond3b959db2002-04-19 00:11:32 +0000315
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000316 def failing_queue_test(self, q):
317 if q.qsize():
318 raise RuntimeError("Call this function with an empty queue")
319 for i in range(QUEUE_SIZE-1):
320 q.put(i)
321 # Test a failing non-blocking put.
322 q.fail_next_put = True
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000323 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000324 q.put("oops", block=0)
325 self.fail("The queue didn't fail when it should have")
326 except FailingQueueException:
327 pass
328 q.fail_next_put = True
329 try:
330 q.put("oops", timeout=0.1)
331 self.fail("The queue didn't fail when it should have")
332 except FailingQueueException:
333 pass
334 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000335 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000336 # Test a failing blocking put
337 q.fail_next_put = True
338 try:
339 self.do_blocking_test(q.put, ("full",), q.get, ())
340 self.fail("The queue didn't fail when it should have")
341 except FailingQueueException:
342 pass
343 # Check the Queue isn't damaged.
344 # put failed, but get succeeded - re-add
345 q.put("last")
346 # Test a failing timeout put
347 q.fail_next_put = True
348 try:
349 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
350 FailingQueueException)
351 self.fail("The queue didn't fail when it should have")
352 except FailingQueueException:
353 pass
354 # Check the Queue isn't damaged.
355 # put failed, but get succeeded - re-add
356 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000357 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000358 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000359 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000360 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000361 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000362 # Test a blocking put
363 self.do_blocking_test(q.put, ("full",), q.get, ())
364 # Empty it
365 for i in range(QUEUE_SIZE):
366 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000367 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000368 q.put("first")
369 q.fail_next_get = True
370 try:
371 q.get()
372 self.fail("The queue didn't fail when it should have")
373 except FailingQueueException:
374 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000375 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000376 q.fail_next_get = True
377 try:
378 q.get(timeout=0.1)
379 self.fail("The queue didn't fail when it should have")
380 except FailingQueueException:
381 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000382 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000383 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000384 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000385 q.fail_next_get = True
386 try:
387 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
388 FailingQueueException)
389 self.fail("The queue didn't fail when it should have")
390 except FailingQueueException:
391 pass
392 # put succeeded, but get failed.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000393 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000394 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000395 self.assertTrue(not q.qsize(), "Queue should be empty")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000396
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000397 def test_failing_queue(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100398
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000399 # Test to make sure a queue is functioning correctly.
400 # Done twice to the same instance.
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100401 q = self.FailingQueue(QUEUE_SIZE)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000402 self.failing_queue_test(q)
403 self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000404
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000405
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100406
407class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
408 queue = py_queue
409
410
411@need_c_queue
412class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
413 queue = c_queue
414
415
Antoine Pitrou94e16962018-01-16 00:27:16 +0100416class BaseSimpleQueueTest:
417
418 def setUp(self):
419 self.q = self.type2test()
420
421 def feed(self, q, seq, rnd):
422 while True:
423 try:
424 val = seq.pop()
425 except IndexError:
426 return
427 q.put(val)
428 if rnd.random() > 0.5:
429 time.sleep(rnd.random() * 1e-3)
430
431 def consume(self, q, results, sentinel):
432 while True:
433 val = q.get()
434 if val == sentinel:
435 return
436 results.append(val)
437
438 def consume_nonblock(self, q, results, sentinel):
439 while True:
440 while True:
441 try:
442 val = q.get(block=False)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100443 except self.queue.Empty:
Antoine Pitrou94e16962018-01-16 00:27:16 +0100444 time.sleep(1e-5)
445 else:
446 break
447 if val == sentinel:
448 return
449 results.append(val)
450
451 def consume_timeout(self, q, results, sentinel):
452 while True:
453 while True:
454 try:
455 val = q.get(timeout=1e-5)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100456 except self.queue.Empty:
Antoine Pitrou94e16962018-01-16 00:27:16 +0100457 pass
458 else:
459 break
460 if val == sentinel:
461 return
462 results.append(val)
463
464 def run_threads(self, n_feeders, n_consumers, q, inputs,
465 feed_func, consume_func):
466 results = []
467 sentinel = None
468 seq = inputs + [sentinel] * n_consumers
469 seq.reverse()
470 rnd = random.Random(42)
471
472 exceptions = []
473 def log_exceptions(f):
474 def wrapper(*args, **kwargs):
475 try:
476 f(*args, **kwargs)
477 except BaseException as e:
478 exceptions.append(e)
479 return wrapper
480
481 feeders = [threading.Thread(target=log_exceptions(feed_func),
482 args=(q, seq, rnd))
483 for i in range(n_feeders)]
484 consumers = [threading.Thread(target=log_exceptions(consume_func),
485 args=(q, results, sentinel))
486 for i in range(n_consumers)]
487
Hai Shie80697d2020-05-28 06:10:27 +0800488 with threading_helper.start_threads(feeders + consumers):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100489 pass
490
491 self.assertFalse(exceptions)
492 self.assertTrue(q.empty())
493 self.assertEqual(q.qsize(), 0)
494
495 return results
496
497 def test_basic(self):
498 # Basic tests for get(), put() etc.
499 q = self.q
500 self.assertTrue(q.empty())
501 self.assertEqual(q.qsize(), 0)
502 q.put(1)
503 self.assertFalse(q.empty())
504 self.assertEqual(q.qsize(), 1)
505 q.put(2)
506 q.put_nowait(3)
507 q.put(4)
508 self.assertFalse(q.empty())
509 self.assertEqual(q.qsize(), 4)
510
511 self.assertEqual(q.get(), 1)
512 self.assertEqual(q.qsize(), 3)
513
514 self.assertEqual(q.get_nowait(), 2)
515 self.assertEqual(q.qsize(), 2)
516
517 self.assertEqual(q.get(block=False), 3)
518 self.assertFalse(q.empty())
519 self.assertEqual(q.qsize(), 1)
520
521 self.assertEqual(q.get(timeout=0.1), 4)
522 self.assertTrue(q.empty())
523 self.assertEqual(q.qsize(), 0)
524
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100525 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100526 q.get(block=False)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100527 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100528 q.get(timeout=1e-3)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100529 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100530 q.get_nowait()
531 self.assertTrue(q.empty())
532 self.assertEqual(q.qsize(), 0)
533
534 def test_negative_timeout_raises_exception(self):
535 q = self.q
536 q.put(1)
537 with self.assertRaises(ValueError):
538 q.get(timeout=-1)
539
540 def test_order(self):
541 # Test a pair of concurrent put() and get()
542 q = self.q
543 inputs = list(range(100))
544 results = self.run_threads(1, 1, q, inputs, self.feed, self.consume)
545
546 # One producer, one consumer => results appended in well-defined order
547 self.assertEqual(results, inputs)
548
549 def test_many_threads(self):
550 # Test multiple concurrent put() and get()
551 N = 50
552 q = self.q
553 inputs = list(range(10000))
554 results = self.run_threads(N, N, q, inputs, self.feed, self.consume)
555
556 # Multiple consumers without synchronization append the
557 # results in random order
558 self.assertEqual(sorted(results), inputs)
559
560 def test_many_threads_nonblock(self):
561 # Test multiple concurrent put() and get(block=False)
562 N = 50
563 q = self.q
564 inputs = list(range(10000))
565 results = self.run_threads(N, N, q, inputs,
566 self.feed, self.consume_nonblock)
567
568 self.assertEqual(sorted(results), inputs)
569
570 def test_many_threads_timeout(self):
571 # Test multiple concurrent put() and get(timeout=...)
572 N = 50
573 q = self.q
574 inputs = list(range(1000))
575 results = self.run_threads(N, N, q, inputs,
576 self.feed, self.consume_timeout)
577
578 self.assertEqual(sorted(results), inputs)
579
580 def test_references(self):
581 # The queue should lose references to each item as soon as
582 # it leaves the queue.
583 class C:
584 pass
585
586 N = 20
587 q = self.q
588 for i in range(N):
589 q.put(C())
590 for i in range(N):
591 wr = weakref.ref(q.get())
592 self.assertIsNone(wr())
593
594
595class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100596
597 queue = py_queue
598 def setUp(self):
599 self.type2test = self.queue._PySimpleQueue
600 super().setUp()
Antoine Pitrou94e16962018-01-16 00:27:16 +0100601
602
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100603@need_c_queue
Antoine Pitrou94e16962018-01-16 00:27:16 +0100604class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
605
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100606 queue = c_queue
607
Antoine Pitrou94e16962018-01-16 00:27:16 +0100608 def setUp(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100609 self.type2test = self.queue.SimpleQueue
Antoine Pitrou94e16962018-01-16 00:27:16 +0100610 super().setUp()
611
612 def test_is_default(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100613 self.assertIs(self.type2test, self.queue.SimpleQueue)
614 self.assertIs(self.type2test, self.queue.SimpleQueue)
Antoine Pitrou94e16962018-01-16 00:27:16 +0100615
616 def test_reentrancy(self):
617 # bpo-14976: put() may be called reentrantly in an asynchronous
618 # callback.
619 q = self.q
620 gen = itertools.count()
621 N = 10000
622 results = []
623
624 # This test exploits the fact that __del__ in a reference cycle
625 # can be called any time the GC may run.
626
627 class Circular(object):
628 def __init__(self):
629 self.circular = self
630
631 def __del__(self):
632 q.put(next(gen))
633
634 while True:
635 o = Circular()
636 q.put(next(gen))
637 del o
638 results.append(q.get())
639 if results[-1] >= N:
640 break
641
642 self.assertEqual(results, list(range(N + 1)))
643
644
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000645if __name__ == "__main__":
Zachary Ware38c707e2015-04-13 15:00:43 -0500646 unittest.main()