blob: 9bb5181377698c7ac3d90af37ab11ec8ce6363fc [file] [log] [blame]
# Some simple queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
Antoine Pitrou94e16962018-01-16 00:27:16 +01003import itertools
Antoine Pitrou94e16962018-01-16 00:27:16 +01004import random
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02005import threading
Mark Hammond3b959db2002-04-19 00:11:32 +00006import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00007import unittest
Antoine Pitrou94e16962018-01-16 00:27:16 +01008import weakref
Serhiy Storchaka462c1f02021-09-08 18:08:57 +03009from test.support import gc_collect
Hai Shi46605972020-08-04 00:49:18 +080010from test.support import import_helper
Hai Shie80697d2020-05-28 06:10:27 +080011from test.support import threading_helper
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020012
Hai Shi46605972020-08-04 00:49:18 +080013
# Two copies of the queue module are exercised by every test family below:
# one imported with the _queue accelerator blocked, and one imported with a
# fresh _queue.
py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
# Class decorator: skip the decorated tests unless c_queue is truthy.
need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")

# Capacity given to the bounded queues built by the tests.
QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000019
def qfull(q):
    """Return True when the bounded queue *q* holds exactly maxsize items.

    A queue whose maxsize is not positive is never reported as full.
    """
    limit = q.maxsize
    if limit <= 0:
        return False
    return q.qsize() == limit
22
Tim Petersafe52972004-08-20 02:37:25 +000023# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000024class _TriggerThread(threading.Thread):
25 def __init__(self, fn, args):
26 self.fn = fn
27 self.args = args
28 self.startedEvent = threading.Event()
29 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000030
Mark Hammond3b959db2002-04-19 00:11:32 +000031 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000032 # The sleep isn't necessary, but is intended to give the blocking
33 # function in the main thread a chance at actually blocking before
34 # we unclog it. But if the sleep is longer than the timeout-based
35 # tests wait in their blocking functions, those tests will fail.
36 # So we give them much longer timeout values compared to the
37 # sleep here (I aimed at 10 seconds for blocking functions --
38 # they should never actually wait that long - they should make
39 # progress as soon as we call self.fn()).
40 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000041 self.startedEvent.set()
42 self.fn(*self.args)
43
Tim Peters8d7626c2004-08-20 03:27:12 +000044
# Run a function that blocks while, on a separate thread, a second function
# triggers its release; the blocking function's result is handed back.
# Caution: block_func has to stay blocked until trigger_func is called, and
# trigger_func has to change the queue state enough for block_func to
# return.  In particular, a block_func that raises regardless of whether
# trigger_func ran leads to timing-dependent sporadic failures (one of those
# went undiagnosed for years), so block_func must be unexceptional here.
# If block_func is supposed to raise, call do_exceptional_blocking_test()
# instead.

class BlockingTestMixin:
    """Helpers that pair a blocking call with a thread that unblocks it."""

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Call block_func(*block_args) while a thread runs the trigger.

        The result is stored on self.result and returned.  The test fails
        if block_func returns before the trigger thread has flagged that
        it started.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        try:
            self.result = block_func(*block_args)
            if trigger.startedEvent.is_set():
                return self.result
            # block_func came back before the trigger thread made its call.
            self.fail("blocking function %r appeared not to block" %
                      block_func)
        finally:
            # make sure the thread terminates
            threading_helper.join_thread(trigger)

    # Use this variant when block_func is supposed to raise an exception.
    def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
                                   trigger_args, expected_exception_class):
        """Call block_func(*block_args), which is expected to raise.

        The expected exception propagates to the caller; a normal return
        fails the test.  After the trigger thread is joined, the test also
        fails if the thread never flagged that it started.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        trigger.start()
        try:
            try:
                block_func(*block_args)
            except expected_exception_class:
                raise
            self.fail("expected exception of kind %r" %
                      expected_exception_class)
        finally:
            # make sure the thread terminates
            threading_helper.join_thread(trigger)
            if not trigger.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
88
89
class BaseQueueTestMixin(BlockingTestMixin):
    """Tests shared by the Queue, LifoQueue and PriorityQueue test classes.

    The tests read two attributes that this class does not define itself:
    ``self.queue`` (the queue module under test) and ``self.type2test``
    (the queue class to instantiate).
    """

    def setUp(self):
        # Running total built by worker() threads, and the lock guarding it.
        self.cum = 0
        self.cumlock = threading.Lock()

    def basic_queue_test(self, q):
        """Exercise put/get ordering, full/empty detection and blocking.

        *q* must be empty on entry; every item added here is removed again
        before returning.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        self.assertTrue(q.empty())
        self.assertFalse(q.full())
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        # Expected retrieval order, keyed by the queue's class name.
        target_order = dict(Queue = [111, 333, 222],
                            LifoQueue = [222, 333, 111],
                            PriorityQueue = [111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        # Fill the queue up to one slot short of QUEUE_SIZE.
        for i in range(QUEUE_SIZE-1):
            q.put(i)
            self.assertTrue(q.qsize(), "Queue should not be empty")
        self.assertTrue(not qfull(q), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(qfull(q), "Queue should be full")
        self.assertFalse(q.empty())
        self.assertTrue(q.full())
        # A non-blocking put on the full queue must raise Full.
        try:
            q.put(full, block=0)
            self.fail("Didn't appear to block with a full queue")
        except self.queue.Full:
            pass
        # So must a put with a short timeout.
        try:
            q.put(full, timeout=0.01)
            self.fail("Didn't appear to time-out with a full queue")
        except self.queue.Full:
            pass
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        # A non-blocking get on the empty queue must raise Empty.
        try:
            q.get(block=0)
            self.fail("Didn't appear to block with an empty queue")
        except self.queue.Empty:
            pass
        # So must a get with a short timeout.
        try:
            q.get(timeout=0.01)
            self.fail("Didn't appear to time-out with an empty queue")
        except self.queue.Empty:
            pass
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))


    def worker(self, q):
        """Thread body: add each non-negative item to self.cum.

        A negative item is acknowledged with task_done() and ends the loop.
        """
        while True:
            x = q.get()
            if x < 0:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() waits until two workers processed 100 items."""
        self.cum = 0
        threads = []
        for i in (0,1):
            thread = threading.Thread(target=self.worker, args=(q,))
            thread.start()
            threads.append(thread)
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for i in (0,1):
            q.put(-1)         # instruct the threads to close
        q.join()                # verify that you can join twice
        for thread in threads:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        # task_done() on a queue that never held an item must raise
        # ValueError.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        # Every task was acknowledged, so one more task_done() must raise.
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_basic(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.basic_queue_test(q)
        self.basic_queue_test(q)

    def test_negative_timeout_raises_exception(self):
        # A negative timeout is rejected by both put() and get().
        q = self.type2test(QUEUE_SIZE)
        with self.assertRaises(ValueError):
            q.put(1, timeout=-1)
        with self.assertRaises(ValueError):
            q.get(1, timeout=-1)

    def test_nowait(self):
        # put_nowait() raises Full once QUEUE_SIZE items are queued;
        # get_nowait() raises Empty once they have all been removed.
        q = self.type2test(QUEUE_SIZE)
        for i in range(QUEUE_SIZE):
            q.put_nowait(1)
        with self.assertRaises(self.queue.Full):
            q.put_nowait(1)

        for i in range(QUEUE_SIZE):
            q.get_nowait()
        with self.assertRaises(self.queue.Empty):
            q.get_nowait()

    def test_shrinking_queue(self):
        # issue 10110
        # The queue must still report Full after maxsize is lowered below
        # the number of items it already holds.
        q = self.type2test(3)
        q.put(1)
        q.put(2)
        q.put(3)
        with self.assertRaises(self.queue.Full):
            q.put_nowait(4)
        self.assertEqual(q.qsize(), 3)
        q.maxsize = 2                       # shrink the queue
        with self.assertRaises(self.queue.Full):
            q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000241
class QueueTest(BaseQueueTestMixin):
    """Run the shared queue tests against the module's ``Queue`` class."""

    def setUp(self):
        # Base fixtures first, then pick the class under test from whichever
        # queue module the concrete subclass bound to ``queue``.
        super().setUp()
        self.type2test = self.queue.Queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000247
class PyQueueTest(QueueTest, unittest.TestCase):
    """Queue tests bound to py_queue."""
    queue = py_queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000250
251
@need_c_queue
class CQueueTest(QueueTest, unittest.TestCase):
    """Queue tests bound to c_queue; skipped when c_queue is unavailable."""
    queue = c_queue
255
256
class LifoQueueTest(BaseQueueTestMixin):
    """Run the shared queue tests against the module's ``LifoQueue`` class."""

    def setUp(self):
        # Base fixtures first, then pick the class under test from whichever
        # queue module the concrete subclass bound to ``queue``.
        super().setUp()
        self.type2test = self.queue.LifoQueue
262
263
class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
    """LifoQueue tests bound to py_queue."""
    queue = py_queue
266
267
@need_c_queue
class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
    """LifoQueue tests bound to c_queue; skipped when c_queue is unavailable."""
    queue = c_queue
271
272
class PriorityQueueTest(BaseQueueTestMixin):
    """Run the shared queue tests against the module's ``PriorityQueue``."""

    def setUp(self):
        # Base fixtures first, then pick the class under test from whichever
        # queue module the concrete subclass bound to ``queue``.
        super().setUp()
        self.type2test = self.queue.PriorityQueue
278
279
class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
    """PriorityQueue tests bound to py_queue."""
    queue = py_queue
282
283
@need_c_queue
class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
    """PriorityQueue tests bound to c_queue; skipped when c_queue is unavailable."""
    queue = c_queue
287
Mark Hammond3b959db2002-04-19 00:11:32 +0000288
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Error raised on demand by the failing Queue subclass."""
Mark Hammond3b959db2002-04-19 00:11:32 +0000291
class FailingQueueTest(BlockingTestMixin):
    """Check that a Queue stays usable after its _put()/_get() hooks raise.

    Reads ``self.queue`` (the queue module under test), which this class
    does not define itself.
    """

    def setUp(self):

        Queue = self.queue.Queue

        class FailingQueue(Queue):
            # Queue whose next _put() or _get() raises FailingQueueException
            # when the matching fail_next_* flag is set; the flag is cleared
            # before raising, so only one call fails per arming.
            def __init__(self, *args):
                self.fail_next_put = False
                self.fail_next_get = False
                Queue.__init__(self, *args)
            def _put(self, item):
                if self.fail_next_put:
                    self.fail_next_put = False
                    raise FailingQueueException("You Lose")
                return Queue._put(self, item)
            def _get(self):
                if self.fail_next_get:
                    self.fail_next_get = False
                    raise FailingQueueException("You Lose")
                return Queue._get(self)

        self.FailingQueue = FailingQueue

        super().setUp()

    def failing_queue_test(self, q):
        """Arm failures on put and get in turn and check *q* stays consistent.

        *q* must be empty on entry and is empty again on return.
        """
        if q.qsize():
            raise RuntimeError("Call this function with an empty queue")
        # Leave exactly one free slot.
        for i in range(QUEUE_SIZE-1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        try:
            q.put("oops", block=0)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # Same again, this time for a put with a timeout.
        q.fail_next_put = True
        try:
            q.put("oops", timeout=0.1)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # The failed puts stored nothing, so this fills the last slot.
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a failing blocking put
        q.fail_next_put = True
        try:
            self.do_blocking_test(q.put, ("full",), q.get, ())
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        q.get()
        self.assertTrue(not qfull(q), "Queue should not be full")
        q.put("last")
        self.assertTrue(qfull(q), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        # Now the get side: a failing get must leave the item in the queue.
        q.put("first")
        q.fail_next_get = True
        try:
            q.get()
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.fail_next_get = True
        try:
            q.get(timeout=0.1)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")
        # Failing blocking get: the trigger thread's put supplies the item.
        q.fail_next_get = True
        try:
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
            self.fail("The queue didn't fail when it should have")
        except FailingQueueException:
            pass
        # put succeeded, but get failed.
        self.assertTrue(q.qsize(), "Queue should not be empty")
        q.get()
        self.assertTrue(not q.qsize(), "Queue should be empty")

    def test_failing_queue(self):

        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = self.FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000406
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000407
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100408
class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
    """Failing-queue tests bound to py_queue."""
    queue = py_queue
411
412
@need_c_queue
class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
    """Failing-queue tests bound to c_queue; skipped when c_queue is unavailable."""
    queue = c_queue
416
417
class BaseSimpleQueueTest:
    """Tests shared by the SimpleQueue test classes.

    Reads ``self.type2test`` (the queue class to instantiate) and
    ``self.queue`` (the module providing the Empty exception); neither is
    defined by this class.
    """

    def setUp(self):
        # A fresh queue for every test.
        self.q = self.type2test()

    def feed(self, q, seq, rnd):
        """Producer body: pop values off *seq* and put them on *q*.

        Returns once *seq* is exhausted.  After roughly half of the puts it
        sleeps for a random interval below one millisecond, drawn from
        *rnd*.
        """
        while True:
            try:
                val = seq.pop()
            except IndexError:
                return
            q.put(val)
            if rnd.random() > 0.5:
                time.sleep(rnd.random() * 1e-3)

    def consume(self, q, results, sentinel):
        """Consumer body: blocking get() until *sentinel* is received."""
        while True:
            val = q.get()
            if val == sentinel:
                return
            results.append(val)

    def consume_nonblock(self, q, results, sentinel):
        """Consumer body that polls with get(block=False).

        Sleeps briefly whenever the queue is empty, then retries.
        """
        while True:
            while True:
                try:
                    val = q.get(block=False)
                except self.queue.Empty:
                    time.sleep(1e-5)
                else:
                    break
            if val == sentinel:
                return
            results.append(val)

    def consume_timeout(self, q, results, sentinel):
        """Consumer body that retries get() with a very short timeout."""
        while True:
            while True:
                try:
                    val = q.get(timeout=1e-5)
                except self.queue.Empty:
                    pass
                else:
                    break
            if val == sentinel:
                return
            results.append(val)

    def run_threads(self, n_feeders, n_consumers, q, inputs,
                    feed_func, consume_func):
        """Run feeder and consumer threads over *inputs*; return the results.

        One sentinel (None) per consumer is added to the inputs so that
        every consumer terminates.  Any exception raised inside a thread is
        recorded and fails the test afterwards.
        """
        results = []
        sentinel = None
        seq = inputs + [sentinel] * n_consumers
        # feed() pops from the end, so reverse to hand items out in order.
        seq.reverse()
        rnd = random.Random(42)

        exceptions = []
        def log_exceptions(f):
            # Wrap a thread target so that its exceptions are collected.
            def wrapper(*args, **kwargs):
                try:
                    f(*args, **kwargs)
                except BaseException as e:
                    exceptions.append(e)
            return wrapper

        feeders = [threading.Thread(target=log_exceptions(feed_func),
                                    args=(q, seq, rnd))
                   for i in range(n_feeders)]
        consumers = [threading.Thread(target=log_exceptions(consume_func),
                                      args=(q, results, sentinel))
                     for i in range(n_consumers)]

        # Nothing to do in the body: the context manager runs the threads.
        with threading_helper.start_threads(feeders + consumers):
            pass

        self.assertFalse(exceptions)
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        return results

    def test_basic(self):
        # Basic tests for get(), put() etc.
        q = self.q
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)
        q.put(1)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 1)
        q.put(2)
        q.put_nowait(3)
        q.put(4)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 4)

        # Items come back in insertion order whichever get variant is used.
        self.assertEqual(q.get(), 1)
        self.assertEqual(q.qsize(), 3)

        self.assertEqual(q.get_nowait(), 2)
        self.assertEqual(q.qsize(), 2)

        self.assertEqual(q.get(block=False), 3)
        self.assertFalse(q.empty())
        self.assertEqual(q.qsize(), 1)

        self.assertEqual(q.get(timeout=0.1), 4)
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

        # Every non-waiting or timed get on the empty queue raises Empty.
        with self.assertRaises(self.queue.Empty):
            q.get(block=False)
        with self.assertRaises(self.queue.Empty):
            q.get(timeout=1e-3)
        with self.assertRaises(self.queue.Empty):
            q.get_nowait()
        self.assertTrue(q.empty())
        self.assertEqual(q.qsize(), 0)

    def test_negative_timeout_raises_exception(self):
        # A negative timeout is rejected even though an item is available.
        q = self.q
        q.put(1)
        with self.assertRaises(ValueError):
            q.get(timeout=-1)

    def test_order(self):
        # Test a pair of concurrent put() and get()
        q = self.q
        inputs = list(range(100))
        results = self.run_threads(1, 1, q, inputs, self.feed, self.consume)

        # One producer, one consumer => results appended in well-defined order
        self.assertEqual(results, inputs)

    def test_many_threads(self):
        # Test multiple concurrent put() and get()
        N = 50
        q = self.q
        inputs = list(range(10000))
        results = self.run_threads(N, N, q, inputs, self.feed, self.consume)

        # Multiple consumers without synchronization append the
        # results in random order
        self.assertEqual(sorted(results), inputs)

    def test_many_threads_nonblock(self):
        # Test multiple concurrent put() and get(block=False)
        N = 50
        q = self.q
        inputs = list(range(10000))
        results = self.run_threads(N, N, q, inputs,
                                   self.feed, self.consume_nonblock)

        self.assertEqual(sorted(results), inputs)

    def test_many_threads_timeout(self):
        # Test multiple concurrent put() and get(timeout=...)
        N = 50
        q = self.q
        inputs = list(range(1000))
        results = self.run_threads(N, N, q, inputs,
                                   self.feed, self.consume_timeout)

        self.assertEqual(sorted(results), inputs)

    def test_references(self):
        # The queue should lose references to each item as soon as
        # it leaves the queue.
        class C:
            pass

        N = 20
        q = self.q
        for i in range(N):
            q.put(C())
        for i in range(N):
            # Only a weak reference is kept, so the item must be gone once
            # it has left the queue.
            wr = weakref.ref(q.get())
            gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
596
597
class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
    """SimpleQueue tests run against py_queue's _PySimpleQueue class."""

    queue = py_queue
    def setUp(self):
        # type2test has to be assigned before the base setUp(), which
        # instantiates it.
        self.type2test = self.queue._PySimpleQueue
        super().setUp()
Antoine Pitrou94e16962018-01-16 00:27:16 +0100604
605
@need_c_queue
class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
    """SimpleQueue tests run against c_queue's SimpleQueue class.

    Skipped when c_queue is unavailable.
    """

    queue = c_queue

    def setUp(self):
        # type2test has to be assigned before the base setUp(), which
        # instantiates it.
        self.type2test = self.queue.SimpleQueue
        super().setUp()

    def test_is_default(self):
        # The class under test is the module's public SimpleQueue.  (The
        # identical assertion used to be repeated on the next line.)
        self.assertIs(self.type2test, self.queue.SimpleQueue)

    def test_reentrancy(self):
        # bpo-14976: put() may be called reentrantly in an asynchronous
        # callback.
        q = self.q
        gen = itertools.count()
        N = 10000
        results = []

        # This test exploits the fact that __del__ in a reference cycle
        # can be called any time the GC may run.

        class Circular(object):
            def __init__(self):
                self.circular = self

            def __del__(self):
                q.put(next(gen))

        while True:
            o = Circular()
            q.put(next(gen))
            del o
            results.append(q.get())
            if results[-1] >= N:
                break

        # All values must come out in order: none lost, duplicated or
        # reordered by the reentrant put() calls.
        self.assertEqual(results, list(range(N + 1)))
646
647
# Run the whole test suite when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()