blob: 8cf11bd4d01df0dfe8c31eb4e6acb27f9e1bb939 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A test file for the `multiprocessing` package
3#
4
5import time, sys, random
6from Queue import Empty
7
8import multiprocessing # may get overwritten
9
10
11#### TEST_VALUE
12
13def value_func(running, mutex):
14 random.seed()
15 time.sleep(random.random()*4)
16
17 mutex.acquire()
18 print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
19 running.value -= 1
20 mutex.release()
21
22def test_value():
23 TASKS = 10
24 running = multiprocessing.Value('i', TASKS)
25 mutex = multiprocessing.Lock()
26
27 for i in range(TASKS):
28 p = multiprocessing.Process(target=value_func, args=(running, mutex))
29 p.start()
30
31 while running.value > 0:
32 time.sleep(0.08)
33 mutex.acquire()
34 print running.value,
35 sys.stdout.flush()
36 mutex.release()
37
38 print
39 print 'No more running processes'
40
41
42#### TEST_QUEUE
43
def queue_func(queue):
    """Put the squares of 0..29 on `queue`, then the sentinel 'STOP'.

    Before each square is queued the function sleeps for a random time of
    up to half a second.
    """
    n = 0
    while n < 30:
        time.sleep(0.5 * random.random())
        queue.put(n * n)
        n += 1
    queue.put('STOP')
49
50def test_queue():
51 q = multiprocessing.Queue()
52
53 p = multiprocessing.Process(target=queue_func, args=(q,))
54 p.start()
55
56 o = None
57 while o != 'STOP':
58 try:
59 o = q.get(timeout=0.3)
60 print o,
61 sys.stdout.flush()
62 except Empty:
63 print 'TIMEOUT'
64
65 print
66
67
68#### TEST_CONDITION
69
70def condition_func(cond):
71 cond.acquire()
72 print '\t' + str(cond)
73 time.sleep(2)
74 print '\tchild is notifying'
75 print '\t' + str(cond)
76 cond.notify()
77 cond.release()
78
79def test_condition():
80 cond = multiprocessing.Condition()
81
82 p = multiprocessing.Process(target=condition_func, args=(cond,))
83 print cond
84
85 cond.acquire()
86 print cond
87 cond.acquire()
88 print cond
89
90 p.start()
91
92 print 'main is waiting'
93 cond.wait()
94 print 'main has woken up'
95
96 print cond
97 cond.release()
98 print cond
99 cond.release()
100
101 p.join()
102 print cond
103
104
105#### TEST_SEMAPHORE
106
107def semaphore_func(sema, mutex, running):
108 sema.acquire()
109
110 mutex.acquire()
111 running.value += 1
112 print running.value, 'tasks are running'
113 mutex.release()
114
115 random.seed()
116 time.sleep(random.random()*2)
117
118 mutex.acquire()
119 running.value -= 1
120 print '%s has finished' % multiprocessing.current_process()
121 mutex.release()
122
123 sema.release()
124
def test_semaphore():
    """Run ten `semaphore_func` tasks sharing a semaphore of initial value 3."""
    sema = multiprocessing.Semaphore(3)
    mutex = multiprocessing.RLock()
    running = multiprocessing.Value('i', 0)

    shared = (sema, mutex, running)
    workers = []
    for _ in range(10):
        worker = multiprocessing.Process(target=semaphore_func, args=shared)
        workers.append(worker)

    # Start everything first, then wait for all of them.
    for worker in workers:
        worker.start()

    for worker in workers:
        worker.join()
141
142
143#### TEST_JOIN_TIMEOUT
144
145def join_timeout_func():
146 print '\tchild sleeping'
147 time.sleep(5.5)
148 print '\n\tchild terminating'
149
150def test_join_timeout():
151 p = multiprocessing.Process(target=join_timeout_func)
152 p.start()
153
154 print 'waiting for process to finish'
155
156 while 1:
157 p.join(timeout=1)
158 if not p.is_alive():
159 break
160 print '.',
161 sys.stdout.flush()
162
163
164#### TEST_EVENT
165
166def event_func(event):
167 print '\t%r is waiting' % multiprocessing.current_process()
168 event.wait()
169 print '\t%r has woken up' % multiprocessing.current_process()
170
171def test_event():
172 event = multiprocessing.Event()
173
174 processes = [multiprocessing.Process(target=event_func, args=(event,))
175 for i in range(5)]
176
177 for p in processes:
178 p.start()
179
180 print 'main is sleeping'
181 time.sleep(2)
182
183 print 'main is setting event'
184 event.set()
185
186 for p in processes:
187 p.join()
188
189
190#### TEST_SHAREDVALUES
191
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that shared objects hold the data they were created from.

    values        -- sequence of (typecode, value) pairs
    arrays        -- sequence of (typecode, sequence) pairs
    shared_values -- objects whose `.value` must equal the matching value
    shared_arrays -- sliceable objects whose contents must equal the
                     matching sequence

    Raises AssertionError on the first mismatch and prints 'Tests passed'
    when everything agrees.
    """
    for i, spec in enumerate(values):
        expected = spec[1]
        actual = shared_values[i].value
        # Explicit raise rather than `assert` so the check survives -O.
        if expected != actual:
            raise AssertionError(
                'shared value %d: expected %r, got %r' % (i, expected, actual))

    # Iterate over `arrays` itself; bounding this loop by len(values) would
    # skip or over-run entries whenever the two lists differ in length.
    for i, spec in enumerate(arrays):
        expected = spec[1]
        actual = list(shared_arrays[i][:])
        if expected != actual:
            raise AssertionError('shared array %d does not match' % i)

    # A single parenthesised argument prints the same text as the
    # statement form.
    print('Tests passed')
204
def test_sharedvalues():
    """Create shared values and arrays and have a child verify their contents.

    The child runs `sharedvalues_func`; a non-zero exit code from it is
    reported as an AssertionError.
    """
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25),
    ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000)),
    ]

    # `typecode` rather than `id`, which would shadow the builtin.
    shared_values = [multiprocessing.Value(typecode, v)
                     for typecode, v in values]
    shared_arrays = [multiprocessing.Array(typecode, a)
                     for typecode, a in arrays]

    p = multiprocessing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    p.start()
    p.join()

    # The documented Process API exposes the child's status as the
    # `exitcode` attribute.  Raise explicitly so the check survives -O.
    if p.exitcode != 0:
        raise AssertionError('child exited with code %r' % (p.exitcode,))
228
229
230####
231
232def test(namespace=multiprocessing):
233 global multiprocessing
234
235 multiprocessing = namespace
236
237 for func in [ test_value, test_queue, test_condition,
238 test_semaphore, test_join_timeout, test_event,
239 test_sharedvalues ]:
240
241 print '\n\t######## %s\n' % func.__name__
242 func()
243
244 ignore = multiprocessing.active_children() # cleanup any old processes
245 if hasattr(multiprocessing, '_debug_info'):
246 info = multiprocessing._debug_info()
247 if info:
248 print info
249 raise ValueError, 'there should be no positive refcounts left'
250
251
252if __name__ == '__main__':
253 multiprocessing.freeze_support()
254
255 assert len(sys.argv) in (1, 2)
256
257 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
258 print ' Using processes '.center(79, '-')
259 namespace = multiprocessing
260 elif sys.argv[1] == 'manager':
261 print ' Using processes and a manager '.center(79, '-')
262 namespace = multiprocessing.Manager()
263 namespace.Process = multiprocessing.Process
264 namespace.current_process = multiprocessing.current_process
265 namespace.active_children = multiprocessing.active_children
266 elif sys.argv[1] == 'threads':
267 print ' Using threads '.center(79, '-')
268 import multiprocessing.dummy as namespace
269 else:
270 print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
271 raise SystemExit, 2
272
273 test(namespace)