blob: 2f43ad8d95712675debe8ac7712974bb1ed81b56 [file] [log] [blame]
Benjamin Peterson190d56e2008-06-11 02:40:25 +00001#
2# A test file for the `multiprocessing` package
3#
Christian Heimeseac80712008-11-28 19:33:33 +00004# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
Benjamin Peterson190d56e2008-06-11 02:40:25 +00007
8import time, sys, random
9from Queue import Empty
10
11import multiprocessing # may get overwritten
12
13
14#### TEST_VALUE
15
16def value_func(running, mutex):
17 random.seed()
18 time.sleep(random.random()*4)
19
20 mutex.acquire()
21 print '\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished'
22 running.value -= 1
23 mutex.release()
24
25def test_value():
26 TASKS = 10
27 running = multiprocessing.Value('i', TASKS)
28 mutex = multiprocessing.Lock()
29
30 for i in range(TASKS):
31 p = multiprocessing.Process(target=value_func, args=(running, mutex))
32 p.start()
33
34 while running.value > 0:
35 time.sleep(0.08)
36 mutex.acquire()
37 print running.value,
38 sys.stdout.flush()
39 mutex.release()
40
41 print
42 print 'No more running processes'
43
44
45#### TEST_QUEUE
46
47def queue_func(queue):
48 for i in range(30):
49 time.sleep(0.5 * random.random())
50 queue.put(i*i)
51 queue.put('STOP')
52
53def test_queue():
54 q = multiprocessing.Queue()
55
56 p = multiprocessing.Process(target=queue_func, args=(q,))
57 p.start()
58
59 o = None
60 while o != 'STOP':
61 try:
62 o = q.get(timeout=0.3)
63 print o,
64 sys.stdout.flush()
65 except Empty:
66 print 'TIMEOUT'
67
68 print
69
70
71#### TEST_CONDITION
72
73def condition_func(cond):
74 cond.acquire()
75 print '\t' + str(cond)
76 time.sleep(2)
77 print '\tchild is notifying'
78 print '\t' + str(cond)
79 cond.notify()
80 cond.release()
81
82def test_condition():
83 cond = multiprocessing.Condition()
84
85 p = multiprocessing.Process(target=condition_func, args=(cond,))
86 print cond
87
88 cond.acquire()
89 print cond
90 cond.acquire()
91 print cond
92
93 p.start()
94
95 print 'main is waiting'
96 cond.wait()
97 print 'main has woken up'
98
99 print cond
100 cond.release()
101 print cond
102 cond.release()
103
104 p.join()
105 print cond
106
107
108#### TEST_SEMAPHORE
109
110def semaphore_func(sema, mutex, running):
111 sema.acquire()
112
113 mutex.acquire()
114 running.value += 1
115 print running.value, 'tasks are running'
116 mutex.release()
117
118 random.seed()
119 time.sleep(random.random()*2)
120
121 mutex.acquire()
122 running.value -= 1
123 print '%s has finished' % multiprocessing.current_process()
124 mutex.release()
125
126 sema.release()
127
128def test_semaphore():
129 sema = multiprocessing.Semaphore(3)
130 mutex = multiprocessing.RLock()
131 running = multiprocessing.Value('i', 0)
132
133 processes = [
134 multiprocessing.Process(target=semaphore_func,
135 args=(sema, mutex, running))
136 for i in range(10)
137 ]
138
139 for p in processes:
140 p.start()
141
142 for p in processes:
143 p.join()
144
145
146#### TEST_JOIN_TIMEOUT
147
148def join_timeout_func():
149 print '\tchild sleeping'
150 time.sleep(5.5)
151 print '\n\tchild terminating'
152
153def test_join_timeout():
154 p = multiprocessing.Process(target=join_timeout_func)
155 p.start()
156
157 print 'waiting for process to finish'
158
159 while 1:
160 p.join(timeout=1)
161 if not p.is_alive():
162 break
163 print '.',
164 sys.stdout.flush()
165
166
167#### TEST_EVENT
168
169def event_func(event):
170 print '\t%r is waiting' % multiprocessing.current_process()
171 event.wait()
172 print '\t%r has woken up' % multiprocessing.current_process()
173
174def test_event():
175 event = multiprocessing.Event()
176
177 processes = [multiprocessing.Process(target=event_func, args=(event,))
178 for i in range(5)]
179
180 for p in processes:
181 p.start()
182
183 print 'main is sleeping'
184 time.sleep(2)
185
186 print 'main is setting event'
187 event.set()
188
189 for p in processes:
190 p.join()
191
192
193#### TEST_SHAREDVALUES
194
195def sharedvalues_func(values, arrays, shared_values, shared_arrays):
196 for i in range(len(values)):
197 v = values[i][1]
198 sv = shared_values[i].value
199 assert v == sv
200
201 for i in range(len(values)):
202 a = arrays[i][1]
203 sa = list(shared_arrays[i][:])
204 assert a == sa
205
206 print 'Tests passed'
207
208def test_sharedvalues():
209 values = [
210 ('i', 10),
211 ('h', -2),
212 ('d', 1.25)
213 ]
214 arrays = [
215 ('i', range(100)),
216 ('d', [0.25 * i for i in range(100)]),
217 ('H', range(1000))
218 ]
219
220 shared_values = [multiprocessing.Value(id, v) for id, v in values]
221 shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
222
223 p = multiprocessing.Process(
224 target=sharedvalues_func,
225 args=(values, arrays, shared_values, shared_arrays)
226 )
227 p.start()
228 p.join()
229
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000230 assert p.exitcode == 0
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000231
232
233####
234
235def test(namespace=multiprocessing):
236 global multiprocessing
237
238 multiprocessing = namespace
239
240 for func in [ test_value, test_queue, test_condition,
241 test_semaphore, test_join_timeout, test_event,
242 test_sharedvalues ]:
243
244 print '\n\t######## %s\n' % func.__name__
245 func()
246
247 ignore = multiprocessing.active_children() # cleanup any old processes
248 if hasattr(multiprocessing, '_debug_info'):
249 info = multiprocessing._debug_info()
250 if info:
251 print info
252 raise ValueError, 'there should be no positive refcounts left'
253
254
255if __name__ == '__main__':
256 multiprocessing.freeze_support()
257
258 assert len(sys.argv) in (1, 2)
259
260 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
261 print ' Using processes '.center(79, '-')
262 namespace = multiprocessing
263 elif sys.argv[1] == 'manager':
264 print ' Using processes and a manager '.center(79, '-')
265 namespace = multiprocessing.Manager()
266 namespace.Process = multiprocessing.Process
267 namespace.current_process = multiprocessing.current_process
268 namespace.active_children = multiprocessing.active_children
269 elif sys.argv[1] == 'threads':
270 print ' Using threads '.center(79, '-')
271 import multiprocessing.dummy as namespace
272 else:
273 print 'Usage:\n\t%s [processes | manager | threads]' % sys.argv[0]
274 raise SystemExit, 2
275
276 test(namespace)