blob: a2c744fe59f0c77435e31dc7d7232a7b326890af [file] [log] [blame]
# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable
import Queue
import sys
import threading
import time

from test.test_support import verify, TestFailed, verbose

# Capacity given to every bounded queue that test() creates.
queue_size = 5
11
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
14class _TriggerThread(threading.Thread):
15 def __init__(self, fn, args):
16 self.fn = fn
17 self.args = args
18 self.startedEvent = threading.Event()
19 threading.Thread.__init__(self)
20 def run(self):
21 time.sleep(.1)
22 self.startedEvent.set()
23 self.fn(*self.args)
24
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
    """Call block_func while a helper thread calls trigger_func.

    A _TriggerThread is started with trigger_func/trigger_args, then
    block_func(*block_args) is called in the current thread and its result
    is returned.

    Raises TestFailed if block_func finished before the helper thread
    signalled that it had started, or if the helper thread is still alive
    after a one second join.  These checks live in a ``finally`` clause, so
    they also run when block_func raises; a TestFailed raised there replaces
    the exception from block_func.
    """
    t = _TriggerThread(trigger_func, trigger_args)
    t.start()
    try:
        return block_func(*block_args)
    finally:
        # If we unblocked before our thread made the call, we failed!
        if not t.startedEvent.isSet():
            raise TestFailed("blocking function '%r' appeared not to block"
                             % (block_func,))
        t.join(1)  # make sure the thread terminates
        if t.isAlive():
            raise TestFailed("trigger function '%r' appeared to not return"
                             % (trigger_func,))
37
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
    """Raised by FailingQueue when a put or get has been told to fail."""
41
class FailingQueue(Queue.Queue):
    """Queue whose next _put or _get can be made to raise on demand.

    Set ``fail_next_put`` or ``fail_next_get`` to a true value and the next
    call to the matching hook raises FailingQueueException.  The flag is
    cleared before raising, so only that one call fails.
    """

    def __init__(self, *args):
        # The flags are assigned before the base initialiser runs.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            # Reset first so only this single call fails.
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            # Reset first so only this single call fails.
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._get(self)
57
def FailingQueueTest(q):
    """Check that q stays usable after forced put/get failures.

    q must be empty and must expose the ``fail_next_put`` and
    ``fail_next_get`` flags (see FailingQueue).  The queue is left empty
    again on success.

    Raises RuntimeError if q is not empty on entry, and TestFailed when a
    forced failure does not happen.  State checks go through verify().
    """
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")
    for i in range(queue_size-1):
        q.put(i)
    q.fail_next_put = True
    # Test a failing non-blocking put.
    try:
        q.put("oops", block=0)
    except FailingQueueException:
        pass
    else:
        raise TestFailed("The queue didn't fail when it should have")
    q.put("last")
    verify(q.full(), "Queue should be full")
    q.fail_next_put = True
    # Test a failing blocking put
    try:
        _doBlockingTest(q.put, ("full",), q.get, ())
    except FailingQueueException:
        pass
    else:
        raise TestFailed("The queue didn't fail when it should have")
    # Check the Queue isn't damaged.
    # put failed, but get succeeded - re-add
    q.put("last")
    verify(q.full(), "Queue should be full")
    q.get()
    verify(not q.full(), "Queue should not be full")
    q.put("last")
    verify(q.full(), "Queue should be full")
    # Test a blocking put
    _doBlockingTest(q.put, ("full",), q.get, ())
    # Empty it
    for i in range(queue_size):
        q.get()
    verify(q.empty(), "Queue should be empty")
    q.put("first")
    q.fail_next_get = True
    # Test a failing get on a non-empty queue.
    try:
        q.get()
    except FailingQueueException:
        pass
    else:
        raise TestFailed("The queue didn't fail when it should have")
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
    q.fail_next_get = True
    # Test a failing blocking get
    try:
        _doBlockingTest(q.get, (), q.put, ('empty',))
    except FailingQueueException:
        pass
    else:
        raise TestFailed("The queue didn't fail when it should have")
    # put succeeded, but get failed.
    verify(not q.empty(), "Queue should not be empty")
    q.get()
    verify(q.empty(), "Queue should be empty")
113
def SimpleQueueTest(q):
    """Exercise ordering and the full/empty behaviour of q.

    q must be empty; the code fills it with queue_size items and expects
    it to report full at that point.  The final blocking-get check consumes
    the item its trigger puts, so the queue ends up empty again.

    Raises RuntimeError if q is not empty on entry, and TestFailed when a
    non-blocking put/get does not raise Queue.Full/Queue.Empty.  State
    checks go through verify().
    """
    if not q.empty():
        raise RuntimeError("Call this function with an empty queue")
    # I guess we better check things actually queue correctly a little :)
    q.put(111)
    q.put(222)
    verify(q.get() == 111 and q.get() == 222,
           "Didn't seem to queue the correct data!")
    for i in range(queue_size-1):
        q.put(i)
    verify(not q.full(), "Queue should not be full")
    q.put("last")
    verify(q.full(), "Queue should be full")
    # A non-blocking put on a full queue must raise Queue.Full.
    try:
        q.put("full", block=0)
    except Queue.Full:
        pass
    else:
        raise TestFailed("Didn't appear to block with a full queue")
    # Test a blocking put
    _doBlockingTest(q.put, ("full",), q.get, ())
    # Empty it
    for i in range(queue_size):
        q.get()
    verify(q.empty(), "Queue should be empty")
    # A non-blocking get on an empty queue must raise Queue.Empty.
    try:
        q.get(block=0)
    except Queue.Empty:
        pass
    else:
        raise TestFailed("Didn't appear to block with an empty queue")
    # Test a blocking get
    _doBlockingTest(q.get, (), q.put, ('empty',))
144
def test():
    """Run the simple and the failing queue tests, each twice per queue."""
    q = Queue.Queue(queue_size)
    # Do it a couple of times on the same queue
    SimpleQueueTest(q)
    SimpleQueueTest(q)
    if verbose:
        # Parenthesised single argument: prints the same text whether
        # print is a statement or a function.
        print("Simple Queue tests seemed to work")
    q = FailingQueue(queue_size)
    FailingQueueTest(q)
    FailingQueueTest(q)
    if verbose:
        print("Failing Queue tests seemed to work")

# The tests run as a side effect of importing/executing this module.
test()
158test()