blob: 508b739019593d6084a19ebf88a631afc36aaa7a [file] [log] [blame]
Alexandre Vassalottif260e442008-05-11 19:59:59 +00001# Some simple queue module tests, plus some failure conditions
Tim Petersafe52972004-08-20 02:37:25 +00002# to ensure the Queue locks remain stable.
Antoine Pitrou94e16962018-01-16 00:27:16 +01003import itertools
Antoine Pitrou94e16962018-01-16 00:27:16 +01004import random
Antoine Pitroua6a4dc82017-09-07 18:56:24 +02005import threading
Mark Hammond3b959db2002-04-19 00:11:32 +00006import time
Georg Brandl0e3b0ec2008-02-05 18:48:51 +00007import unittest
Antoine Pitrou94e16962018-01-16 00:27:16 +01008import weakref
Hai Shi46605972020-08-04 00:49:18 +08009from test.support import import_helper
Hai Shie80697d2020-05-28 06:10:27 +080010from test.support import threading_helper
Antoine Pitroua6a4dc82017-09-07 18:56:24 +020011
Hai Shi46605972020-08-04 00:49:18 +080012
13py_queue = import_helper.import_fresh_module('queue', blocked=['_queue'])
14c_queue = import_helper.import_fresh_module('queue', fresh=['_queue'])
Pablo Galindo3f5b9082019-06-25 02:53:30 +010015need_c_queue = unittest.skipUnless(c_queue, "No _queue module found")
Antoine Pitrou94e16962018-01-16 00:27:16 +010016
Tim Petersafe52972004-08-20 02:37:25 +000017QUEUE_SIZE = 5
Mark Hammond3b959db2002-04-19 00:11:32 +000018
Raymond Hettingerda3caed2008-01-14 21:39:24 +000019def qfull(q):
20 return q.maxsize > 0 and q.qsize() == q.maxsize
21
Tim Petersafe52972004-08-20 02:37:25 +000022# A thread to run a function that unclogs a blocked Queue.
Mark Hammond3b959db2002-04-19 00:11:32 +000023class _TriggerThread(threading.Thread):
24 def __init__(self, fn, args):
25 self.fn = fn
26 self.args = args
27 self.startedEvent = threading.Event()
28 threading.Thread.__init__(self)
Tim Petersafe52972004-08-20 02:37:25 +000029
Mark Hammond3b959db2002-04-19 00:11:32 +000030 def run(self):
Tim Peters8d7626c2004-08-20 03:27:12 +000031 # The sleep isn't necessary, but is intended to give the blocking
32 # function in the main thread a chance at actually blocking before
33 # we unclog it. But if the sleep is longer than the timeout-based
34 # tests wait in their blocking functions, those tests will fail.
35 # So we give them much longer timeout values compared to the
36 # sleep here (I aimed at 10 seconds for blocking functions --
37 # they should never actually wait that long - they should make
38 # progress as soon as we call self.fn()).
39 time.sleep(0.1)
Mark Hammond3b959db2002-04-19 00:11:32 +000040 self.startedEvent.set()
41 self.fn(*self.args)
42
Tim Peters8d7626c2004-08-20 03:27:12 +000043
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000044# Execute a function that blocks, and in a separate thread, a function that
45# triggers the release. Returns the result of the blocking function. Caution:
46# block_func must guarantee to block until trigger_func is called, and
47# trigger_func must guarantee to change queue state so that block_func can make
48# enough progress to return. In particular, a block_func that just raises an
49# exception regardless of whether trigger_func is called will lead to
50# timing-dependent sporadic failures, and one of those went rarely seen but
51# undiagnosed for years. Now block_func must be unexceptional. If block_func
52# is supposed to raise an exception, call do_exceptional_blocking_test()
53# instead.
54
55class BlockingTestMixin:
56
57 def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
Victor Stinner167cbde2017-09-14 14:04:56 -070058 thread = _TriggerThread(trigger_func, trigger_args)
59 thread.start()
60 try:
61 self.result = block_func(*block_args)
62 # If block_func returned before our thread made the call, we failed!
63 if not thread.startedEvent.is_set():
Serhiy Storchakaa4a30202017-11-28 22:54:42 +020064 self.fail("blocking function %r appeared not to block" %
Victor Stinner167cbde2017-09-14 14:04:56 -070065 block_func)
66 return self.result
67 finally:
Hai Shie80697d2020-05-28 06:10:27 +080068 threading_helper.join_thread(thread) # make sure the thread terminates
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000069
70 # Call this instead if block_func is supposed to raise an exception.
71 def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
72 trigger_args, expected_exception_class):
Victor Stinner167cbde2017-09-14 14:04:56 -070073 thread = _TriggerThread(trigger_func, trigger_args)
74 thread.start()
Tim Peters8d7626c2004-08-20 03:27:12 +000075 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000076 try:
77 block_func(*block_args)
78 except expected_exception_class:
79 raise
80 else:
81 self.fail("expected exception of kind %r" %
82 expected_exception_class)
83 finally:
Hai Shie80697d2020-05-28 06:10:27 +080084 threading_helper.join_thread(thread) # make sure the thread terminates
Victor Stinner167cbde2017-09-14 14:04:56 -070085 if not thread.startedEvent.is_set():
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000086 self.fail("trigger thread ended but event never set")
87
88
R David Murrayc6bfce92012-03-17 16:38:39 -040089class BaseQueueTestMixin(BlockingTestMixin):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000090 def setUp(self):
91 self.cum = 0
92 self.cumlock = threading.Lock()
93
Antoine Pitrou94e16962018-01-16 00:27:16 +010094 def basic_queue_test(self, q):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000095 if q.qsize():
96 raise RuntimeError("Call this function with an empty queue")
Brett Cannon671153d2010-07-23 16:56:21 +000097 self.assertTrue(q.empty())
98 self.assertFalse(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +000099 # I guess we better check things actually queue correctly a little :)
100 q.put(111)
101 q.put(333)
102 q.put(222)
103 target_order = dict(Queue = [111, 333, 222],
104 LifoQueue = [222, 333, 111],
105 PriorityQueue = [111, 222, 333])
106 actual_order = [q.get(), q.get(), q.get()]
Ezio Melottib3aedd42010-11-20 19:04:17 +0000107 self.assertEqual(actual_order, target_order[q.__class__.__name__],
108 "Didn't seem to queue the correct data!")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000109 for i in range(QUEUE_SIZE-1):
110 q.put(i)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000111 self.assertTrue(q.qsize(), "Queue should not be empty")
112 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000113 last = 2 * QUEUE_SIZE
114 full = 3 * 2 * QUEUE_SIZE
115 q.put(last)
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000116 self.assertTrue(qfull(q), "Queue should be full")
Brett Cannon671153d2010-07-23 16:56:21 +0000117 self.assertFalse(q.empty())
118 self.assertTrue(q.full())
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000119 try:
120 q.put(full, block=0)
121 self.fail("Didn't appear to block with a full queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100122 except self.queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000123 pass
124 try:
125 q.put(full, timeout=0.01)
126 self.fail("Didn't appear to time-out with a full queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100127 except self.queue.Full:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000128 pass
129 # Test a blocking put
130 self.do_blocking_test(q.put, (full,), q.get, ())
131 self.do_blocking_test(q.put, (full, True, 10), q.get, ())
132 # Empty it
133 for i in range(QUEUE_SIZE):
134 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000135 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000136 try:
137 q.get(block=0)
138 self.fail("Didn't appear to block with an empty queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100139 except self.queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000140 pass
141 try:
142 q.get(timeout=0.01)
143 self.fail("Didn't appear to time-out with an empty queue")
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100144 except self.queue.Empty:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000145 pass
146 # Test a blocking get
147 self.do_blocking_test(q.get, (), q.put, ('empty',))
148 self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))
149
150
151 def worker(self, q):
152 while True:
153 x = q.get()
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000154 if x < 0:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000155 q.task_done()
156 return
157 with self.cumlock:
158 self.cum += x
159 q.task_done()
160
161 def queue_join_test(self, q):
162 self.cum = 0
Victor Stinner167cbde2017-09-14 14:04:56 -0700163 threads = []
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000164 for i in (0,1):
Victor Stinner167cbde2017-09-14 14:04:56 -0700165 thread = threading.Thread(target=self.worker, args=(q,))
166 thread.start()
167 threads.append(thread)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000168 for i in range(100):
169 q.put(i)
170 q.join()
Ezio Melottib3aedd42010-11-20 19:04:17 +0000171 self.assertEqual(self.cum, sum(range(100)),
172 "q.join() did not block until all tasks were done")
Amaury Forgeot d'Arcb4febc72008-04-01 21:23:34 +0000173 for i in (0,1):
174 q.put(-1) # instruct the threads to close
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000175 q.join() # verify that you can join twice
Victor Stinner167cbde2017-09-14 14:04:56 -0700176 for thread in threads:
177 thread.join()
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000178
179 def test_queue_task_done(self):
180 # Test to make sure a queue task completed successfully.
181 q = self.type2test()
182 try:
183 q.task_done()
184 except ValueError:
185 pass
Tim Peters8d7626c2004-08-20 03:27:12 +0000186 else:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000187 self.fail("Did not detect task count going negative")
188
189 def test_queue_join(self):
190 # Test that a queue join()s successfully, and before anything else
191 # (done twice for insurance).
192 q = self.type2test()
193 self.queue_join_test(q)
194 self.queue_join_test(q)
195 try:
196 q.task_done()
197 except ValueError:
198 pass
199 else:
200 self.fail("Did not detect task count going negative")
201
Antoine Pitrou94e16962018-01-16 00:27:16 +0100202 def test_basic(self):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000203 # Do it a couple of times on the same queue.
204 # Done twice to make sure works with same instance reused.
205 q = self.type2test(QUEUE_SIZE)
Antoine Pitrou94e16962018-01-16 00:27:16 +0100206 self.basic_queue_test(q)
207 self.basic_queue_test(q)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000208
Brett Cannon671153d2010-07-23 16:56:21 +0000209 def test_negative_timeout_raises_exception(self):
210 q = self.type2test(QUEUE_SIZE)
211 with self.assertRaises(ValueError):
212 q.put(1, timeout=-1)
213 with self.assertRaises(ValueError):
214 q.get(1, timeout=-1)
215
216 def test_nowait(self):
217 q = self.type2test(QUEUE_SIZE)
218 for i in range(QUEUE_SIZE):
219 q.put_nowait(1)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100220 with self.assertRaises(self.queue.Full):
Brett Cannon671153d2010-07-23 16:56:21 +0000221 q.put_nowait(1)
222
223 for i in range(QUEUE_SIZE):
224 q.get_nowait()
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100225 with self.assertRaises(self.queue.Empty):
Brett Cannon671153d2010-07-23 16:56:21 +0000226 q.get_nowait()
227
Raymond Hettinger189316a2010-10-31 17:57:52 +0000228 def test_shrinking_queue(self):
229 # issue 10110
230 q = self.type2test(3)
231 q.put(1)
232 q.put(2)
233 q.put(3)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100234 with self.assertRaises(self.queue.Full):
Raymond Hettinger189316a2010-10-31 17:57:52 +0000235 q.put_nowait(4)
236 self.assertEqual(q.qsize(), 3)
237 q.maxsize = 2 # shrink the queue
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100238 with self.assertRaises(self.queue.Full):
Raymond Hettinger189316a2010-10-31 17:57:52 +0000239 q.put_nowait(4)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000240
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100241class QueueTest(BaseQueueTestMixin):
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000242
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100243 def setUp(self):
244 self.type2test = self.queue.Queue
245 super().setUp()
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000246
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100247class PyQueueTest(QueueTest, unittest.TestCase):
248 queue = py_queue
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000249
250
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100251@need_c_queue
252class CQueueTest(QueueTest, unittest.TestCase):
253 queue = c_queue
254
255
256class LifoQueueTest(BaseQueueTestMixin):
257
258 def setUp(self):
259 self.type2test = self.queue.LifoQueue
260 super().setUp()
261
262
263class PyLifoQueueTest(LifoQueueTest, unittest.TestCase):
264 queue = py_queue
265
266
267@need_c_queue
268class CLifoQueueTest(LifoQueueTest, unittest.TestCase):
269 queue = c_queue
270
271
272class PriorityQueueTest(BaseQueueTestMixin):
273
274 def setUp(self):
275 self.type2test = self.queue.PriorityQueue
276 super().setUp()
277
278
279class PyPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
280 queue = py_queue
281
282
283@need_c_queue
284class CPriorityQueueTest(PriorityQueueTest, unittest.TestCase):
285 queue = c_queue
286
Mark Hammond3b959db2002-04-19 00:11:32 +0000287
288# A Queue subclass that can provoke failure at a moment's notice :)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100289class FailingQueueException(Exception): pass
Mark Hammond3b959db2002-04-19 00:11:32 +0000290
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100291class FailingQueueTest(BlockingTestMixin):
Mark Hammond3b959db2002-04-19 00:11:32 +0000292
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100293 def setUp(self):
294
295 Queue = self.queue.Queue
296
297 class FailingQueue(Queue):
298 def __init__(self, *args):
299 self.fail_next_put = False
300 self.fail_next_get = False
301 Queue.__init__(self, *args)
302 def _put(self, item):
303 if self.fail_next_put:
304 self.fail_next_put = False
305 raise FailingQueueException("You Lose")
306 return Queue._put(self, item)
307 def _get(self):
308 if self.fail_next_get:
309 self.fail_next_get = False
310 raise FailingQueueException("You Lose")
311 return Queue._get(self)
312
313 self.FailingQueue = FailingQueue
314
315 super().setUp()
Mark Hammond3b959db2002-04-19 00:11:32 +0000316
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000317 def failing_queue_test(self, q):
318 if q.qsize():
319 raise RuntimeError("Call this function with an empty queue")
320 for i in range(QUEUE_SIZE-1):
321 q.put(i)
322 # Test a failing non-blocking put.
323 q.fail_next_put = True
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000324 try:
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000325 q.put("oops", block=0)
326 self.fail("The queue didn't fail when it should have")
327 except FailingQueueException:
328 pass
329 q.fail_next_put = True
330 try:
331 q.put("oops", timeout=0.1)
332 self.fail("The queue didn't fail when it should have")
333 except FailingQueueException:
334 pass
335 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000336 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000337 # Test a failing blocking put
338 q.fail_next_put = True
339 try:
340 self.do_blocking_test(q.put, ("full",), q.get, ())
341 self.fail("The queue didn't fail when it should have")
342 except FailingQueueException:
343 pass
344 # Check the Queue isn't damaged.
345 # put failed, but get succeeded - re-add
346 q.put("last")
347 # Test a failing timeout put
348 q.fail_next_put = True
349 try:
350 self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
351 FailingQueueException)
352 self.fail("The queue didn't fail when it should have")
353 except FailingQueueException:
354 pass
355 # Check the Queue isn't damaged.
356 # put failed, but get succeeded - re-add
357 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000358 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000359 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000360 self.assertTrue(not qfull(q), "Queue should not be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000361 q.put("last")
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000362 self.assertTrue(qfull(q), "Queue should be full")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000363 # Test a blocking put
364 self.do_blocking_test(q.put, ("full",), q.get, ())
365 # Empty it
366 for i in range(QUEUE_SIZE):
367 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000368 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000369 q.put("first")
370 q.fail_next_get = True
371 try:
372 q.get()
373 self.fail("The queue didn't fail when it should have")
374 except FailingQueueException:
375 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000376 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000377 q.fail_next_get = True
378 try:
379 q.get(timeout=0.1)
380 self.fail("The queue didn't fail when it should have")
381 except FailingQueueException:
382 pass
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000383 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000384 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000385 self.assertTrue(not q.qsize(), "Queue should be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000386 q.fail_next_get = True
387 try:
388 self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
389 FailingQueueException)
390 self.fail("The queue didn't fail when it should have")
391 except FailingQueueException:
392 pass
393 # put succeeded, but get failed.
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000394 self.assertTrue(q.qsize(), "Queue should not be empty")
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000395 q.get()
Benjamin Petersonc9c0f202009-06-30 23:06:06 +0000396 self.assertTrue(not q.qsize(), "Queue should be empty")
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000397
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000398 def test_failing_queue(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100399
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000400 # Test to make sure a queue is functioning correctly.
401 # Done twice to the same instance.
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100402 q = self.FailingQueue(QUEUE_SIZE)
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000403 self.failing_queue_test(q)
404 self.failing_queue_test(q)
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000405
Thomas Wouters49fd7fa2006-04-21 10:40:58 +0000406
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100407
408class PyFailingQueueTest(FailingQueueTest, unittest.TestCase):
409 queue = py_queue
410
411
412@need_c_queue
413class CFailingQueueTest(FailingQueueTest, unittest.TestCase):
414 queue = c_queue
415
416
Antoine Pitrou94e16962018-01-16 00:27:16 +0100417class BaseSimpleQueueTest:
418
419 def setUp(self):
420 self.q = self.type2test()
421
422 def feed(self, q, seq, rnd):
423 while True:
424 try:
425 val = seq.pop()
426 except IndexError:
427 return
428 q.put(val)
429 if rnd.random() > 0.5:
430 time.sleep(rnd.random() * 1e-3)
431
432 def consume(self, q, results, sentinel):
433 while True:
434 val = q.get()
435 if val == sentinel:
436 return
437 results.append(val)
438
439 def consume_nonblock(self, q, results, sentinel):
440 while True:
441 while True:
442 try:
443 val = q.get(block=False)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100444 except self.queue.Empty:
Antoine Pitrou94e16962018-01-16 00:27:16 +0100445 time.sleep(1e-5)
446 else:
447 break
448 if val == sentinel:
449 return
450 results.append(val)
451
452 def consume_timeout(self, q, results, sentinel):
453 while True:
454 while True:
455 try:
456 val = q.get(timeout=1e-5)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100457 except self.queue.Empty:
Antoine Pitrou94e16962018-01-16 00:27:16 +0100458 pass
459 else:
460 break
461 if val == sentinel:
462 return
463 results.append(val)
464
465 def run_threads(self, n_feeders, n_consumers, q, inputs,
466 feed_func, consume_func):
467 results = []
468 sentinel = None
469 seq = inputs + [sentinel] * n_consumers
470 seq.reverse()
471 rnd = random.Random(42)
472
473 exceptions = []
474 def log_exceptions(f):
475 def wrapper(*args, **kwargs):
476 try:
477 f(*args, **kwargs)
478 except BaseException as e:
479 exceptions.append(e)
480 return wrapper
481
482 feeders = [threading.Thread(target=log_exceptions(feed_func),
483 args=(q, seq, rnd))
484 for i in range(n_feeders)]
485 consumers = [threading.Thread(target=log_exceptions(consume_func),
486 args=(q, results, sentinel))
487 for i in range(n_consumers)]
488
Hai Shie80697d2020-05-28 06:10:27 +0800489 with threading_helper.start_threads(feeders + consumers):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100490 pass
491
492 self.assertFalse(exceptions)
493 self.assertTrue(q.empty())
494 self.assertEqual(q.qsize(), 0)
495
496 return results
497
498 def test_basic(self):
499 # Basic tests for get(), put() etc.
500 q = self.q
501 self.assertTrue(q.empty())
502 self.assertEqual(q.qsize(), 0)
503 q.put(1)
504 self.assertFalse(q.empty())
505 self.assertEqual(q.qsize(), 1)
506 q.put(2)
507 q.put_nowait(3)
508 q.put(4)
509 self.assertFalse(q.empty())
510 self.assertEqual(q.qsize(), 4)
511
512 self.assertEqual(q.get(), 1)
513 self.assertEqual(q.qsize(), 3)
514
515 self.assertEqual(q.get_nowait(), 2)
516 self.assertEqual(q.qsize(), 2)
517
518 self.assertEqual(q.get(block=False), 3)
519 self.assertFalse(q.empty())
520 self.assertEqual(q.qsize(), 1)
521
522 self.assertEqual(q.get(timeout=0.1), 4)
523 self.assertTrue(q.empty())
524 self.assertEqual(q.qsize(), 0)
525
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100526 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100527 q.get(block=False)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100528 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100529 q.get(timeout=1e-3)
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100530 with self.assertRaises(self.queue.Empty):
Antoine Pitrou94e16962018-01-16 00:27:16 +0100531 q.get_nowait()
532 self.assertTrue(q.empty())
533 self.assertEqual(q.qsize(), 0)
534
535 def test_negative_timeout_raises_exception(self):
536 q = self.q
537 q.put(1)
538 with self.assertRaises(ValueError):
539 q.get(timeout=-1)
540
541 def test_order(self):
542 # Test a pair of concurrent put() and get()
543 q = self.q
544 inputs = list(range(100))
545 results = self.run_threads(1, 1, q, inputs, self.feed, self.consume)
546
547 # One producer, one consumer => results appended in well-defined order
548 self.assertEqual(results, inputs)
549
550 def test_many_threads(self):
551 # Test multiple concurrent put() and get()
552 N = 50
553 q = self.q
554 inputs = list(range(10000))
555 results = self.run_threads(N, N, q, inputs, self.feed, self.consume)
556
557 # Multiple consumers without synchronization append the
558 # results in random order
559 self.assertEqual(sorted(results), inputs)
560
561 def test_many_threads_nonblock(self):
562 # Test multiple concurrent put() and get(block=False)
563 N = 50
564 q = self.q
565 inputs = list(range(10000))
566 results = self.run_threads(N, N, q, inputs,
567 self.feed, self.consume_nonblock)
568
569 self.assertEqual(sorted(results), inputs)
570
571 def test_many_threads_timeout(self):
572 # Test multiple concurrent put() and get(timeout=...)
573 N = 50
574 q = self.q
575 inputs = list(range(1000))
576 results = self.run_threads(N, N, q, inputs,
577 self.feed, self.consume_timeout)
578
579 self.assertEqual(sorted(results), inputs)
580
581 def test_references(self):
582 # The queue should lose references to each item as soon as
583 # it leaves the queue.
584 class C:
585 pass
586
587 N = 20
588 q = self.q
589 for i in range(N):
590 q.put(C())
591 for i in range(N):
592 wr = weakref.ref(q.get())
593 self.assertIsNone(wr())
594
595
596class PySimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100597
598 queue = py_queue
599 def setUp(self):
600 self.type2test = self.queue._PySimpleQueue
601 super().setUp()
Antoine Pitrou94e16962018-01-16 00:27:16 +0100602
603
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100604@need_c_queue
Antoine Pitrou94e16962018-01-16 00:27:16 +0100605class CSimpleQueueTest(BaseSimpleQueueTest, unittest.TestCase):
606
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100607 queue = c_queue
608
Antoine Pitrou94e16962018-01-16 00:27:16 +0100609 def setUp(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100610 self.type2test = self.queue.SimpleQueue
Antoine Pitrou94e16962018-01-16 00:27:16 +0100611 super().setUp()
612
613 def test_is_default(self):
Pablo Galindo3f5b9082019-06-25 02:53:30 +0100614 self.assertIs(self.type2test, self.queue.SimpleQueue)
615 self.assertIs(self.type2test, self.queue.SimpleQueue)
Antoine Pitrou94e16962018-01-16 00:27:16 +0100616
617 def test_reentrancy(self):
618 # bpo-14976: put() may be called reentrantly in an asynchronous
619 # callback.
620 q = self.q
621 gen = itertools.count()
622 N = 10000
623 results = []
624
625 # This test exploits the fact that __del__ in a reference cycle
626 # can be called any time the GC may run.
627
628 class Circular(object):
629 def __init__(self):
630 self.circular = self
631
632 def __del__(self):
633 q.put(next(gen))
634
635 while True:
636 o = Circular()
637 q.put(next(gen))
638 del o
639 results.append(q.get())
640 if results[-1] >= N:
641 break
642
643 self.assertEqual(results, list(range(N + 1)))
644
645
Georg Brandl0e3b0ec2008-02-05 18:48:51 +0000646if __name__ == "__main__":
Zachary Ware38c707e2015-04-13 15:00:43 -0500647 unittest.main()