blob: fd393f28881fd2f3d400d395be420355b24b58e2 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A test file for the `multiprocessing` package
3#
Benjamin Peterson4469d0c2008-11-30 22:46:23 +00004# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
Benjamin Petersone711caf2008-06-11 16:44:04 +00007
8import time, sys, random
Christian Heimesaae1b702008-11-28 11:23:26 +00009from queue import Empty
Benjamin Petersone711caf2008-06-11 16:44:04 +000010
11import multiprocessing # may get overwritten
12
13
14#### TEST_VALUE
15
16def value_func(running, mutex):
17 random.seed()
18 time.sleep(random.random()*4)
19
20 mutex.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000021 print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
Benjamin Petersone711caf2008-06-11 16:44:04 +000022 running.value -= 1
23 mutex.release()
24
25def test_value():
26 TASKS = 10
27 running = multiprocessing.Value('i', TASKS)
28 mutex = multiprocessing.Lock()
29
30 for i in range(TASKS):
31 p = multiprocessing.Process(target=value_func, args=(running, mutex))
32 p.start()
33
34 while running.value > 0:
35 time.sleep(0.08)
36 mutex.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000037 print(running.value, end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +000038 sys.stdout.flush()
39 mutex.release()
40
Christian Heimesaae1b702008-11-28 11:23:26 +000041 print()
42 print('No more running processes')
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
44
45#### TEST_QUEUE
46
47def queue_func(queue):
48 for i in range(30):
49 time.sleep(0.5 * random.random())
50 queue.put(i*i)
51 queue.put('STOP')
52
53def test_queue():
54 q = multiprocessing.Queue()
55
56 p = multiprocessing.Process(target=queue_func, args=(q,))
57 p.start()
58
59 o = None
60 while o != 'STOP':
61 try:
62 o = q.get(timeout=0.3)
Christian Heimesaae1b702008-11-28 11:23:26 +000063 print(o, end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064 sys.stdout.flush()
65 except Empty:
Christian Heimesaae1b702008-11-28 11:23:26 +000066 print('TIMEOUT')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Christian Heimesaae1b702008-11-28 11:23:26 +000068 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
70
71#### TEST_CONDITION
72
73def condition_func(cond):
74 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000075 print('\t' + str(cond))
Benjamin Petersone711caf2008-06-11 16:44:04 +000076 time.sleep(2)
Christian Heimesaae1b702008-11-28 11:23:26 +000077 print('\tchild is notifying')
78 print('\t' + str(cond))
Benjamin Petersone711caf2008-06-11 16:44:04 +000079 cond.notify()
80 cond.release()
81
82def test_condition():
83 cond = multiprocessing.Condition()
84
85 p = multiprocessing.Process(target=condition_func, args=(cond,))
Christian Heimesaae1b702008-11-28 11:23:26 +000086 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000087
88 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000089 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000090 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000091 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000092
93 p.start()
94
Christian Heimesaae1b702008-11-28 11:23:26 +000095 print('main is waiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +000096 cond.wait()
Christian Heimesaae1b702008-11-28 11:23:26 +000097 print('main has woken up')
Benjamin Petersone711caf2008-06-11 16:44:04 +000098
Christian Heimesaae1b702008-11-28 11:23:26 +000099 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100 cond.release()
Christian Heimesaae1b702008-11-28 11:23:26 +0000101 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102 cond.release()
103
104 p.join()
Christian Heimesaae1b702008-11-28 11:23:26 +0000105 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000106
107
108#### TEST_SEMAPHORE
109
110def semaphore_func(sema, mutex, running):
111 sema.acquire()
112
113 mutex.acquire()
114 running.value += 1
Christian Heimesaae1b702008-11-28 11:23:26 +0000115 print(running.value, 'tasks are running')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000116 mutex.release()
117
118 random.seed()
119 time.sleep(random.random()*2)
120
121 mutex.acquire()
122 running.value -= 1
Christian Heimesaae1b702008-11-28 11:23:26 +0000123 print('%s has finished' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000124 mutex.release()
125
126 sema.release()
127
128def test_semaphore():
129 sema = multiprocessing.Semaphore(3)
130 mutex = multiprocessing.RLock()
131 running = multiprocessing.Value('i', 0)
132
133 processes = [
134 multiprocessing.Process(target=semaphore_func,
135 args=(sema, mutex, running))
136 for i in range(10)
137 ]
138
139 for p in processes:
140 p.start()
141
142 for p in processes:
143 p.join()
144
145
146#### TEST_JOIN_TIMEOUT
147
148def join_timeout_func():
Christian Heimesaae1b702008-11-28 11:23:26 +0000149 print('\tchild sleeping')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000150 time.sleep(5.5)
Christian Heimesaae1b702008-11-28 11:23:26 +0000151 print('\n\tchild terminating')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152
153def test_join_timeout():
154 p = multiprocessing.Process(target=join_timeout_func)
155 p.start()
156
Christian Heimesaae1b702008-11-28 11:23:26 +0000157 print('waiting for process to finish')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000158
159 while 1:
160 p.join(timeout=1)
161 if not p.is_alive():
162 break
Christian Heimesaae1b702008-11-28 11:23:26 +0000163 print('.', end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000164 sys.stdout.flush()
165
166
167#### TEST_EVENT
168
169def event_func(event):
Christian Heimesaae1b702008-11-28 11:23:26 +0000170 print('\t%r is waiting' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171 event.wait()
Christian Heimesaae1b702008-11-28 11:23:26 +0000172 print('\t%r has woken up' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173
174def test_event():
175 event = multiprocessing.Event()
176
177 processes = [multiprocessing.Process(target=event_func, args=(event,))
178 for i in range(5)]
179
180 for p in processes:
181 p.start()
182
Christian Heimesaae1b702008-11-28 11:23:26 +0000183 print('main is sleeping')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000184 time.sleep(2)
185
Christian Heimesaae1b702008-11-28 11:23:26 +0000186 print('main is setting event')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000187 event.set()
188
189 for p in processes:
190 p.join()
191
192
193#### TEST_SHAREDVALUES
194
195def sharedvalues_func(values, arrays, shared_values, shared_arrays):
196 for i in range(len(values)):
197 v = values[i][1]
198 sv = shared_values[i].value
199 assert v == sv
200
201 for i in range(len(values)):
202 a = arrays[i][1]
203 sa = list(shared_arrays[i][:])
204 assert a == sa
205
Christian Heimesaae1b702008-11-28 11:23:26 +0000206 print('Tests passed')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207
208def test_sharedvalues():
209 values = [
210 ('i', 10),
211 ('h', -2),
212 ('d', 1.25)
213 ]
214 arrays = [
Christian Heimesaae1b702008-11-28 11:23:26 +0000215 ('i', list(range(100))),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216 ('d', [0.25 * i for i in range(100)]),
Christian Heimesaae1b702008-11-28 11:23:26 +0000217 ('H', list(range(1000)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218 ]
219
220 shared_values = [multiprocessing.Value(id, v) for id, v in values]
221 shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
222
223 p = multiprocessing.Process(
224 target=sharedvalues_func,
225 args=(values, arrays, shared_values, shared_arrays)
226 )
227 p.start()
228 p.join()
229
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 assert p.exitcode == 0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231
232
233####
234
235def test(namespace=multiprocessing):
236 global multiprocessing
237
238 multiprocessing = namespace
239
240 for func in [ test_value, test_queue, test_condition,
241 test_semaphore, test_join_timeout, test_event,
242 test_sharedvalues ]:
243
Christian Heimesaae1b702008-11-28 11:23:26 +0000244 print('\n\t######## %s\n' % func.__name__)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000245 func()
246
247 ignore = multiprocessing.active_children() # cleanup any old processes
248 if hasattr(multiprocessing, '_debug_info'):
249 info = multiprocessing._debug_info()
250 if info:
Christian Heimesaae1b702008-11-28 11:23:26 +0000251 print(info)
252 raise ValueError('there should be no positive refcounts left')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000253
254
255if __name__ == '__main__':
256 multiprocessing.freeze_support()
257
258 assert len(sys.argv) in (1, 2)
259
260 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
Christian Heimesaae1b702008-11-28 11:23:26 +0000261 print(' Using processes '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262 namespace = multiprocessing
263 elif sys.argv[1] == 'manager':
Christian Heimesaae1b702008-11-28 11:23:26 +0000264 print(' Using processes and a manager '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265 namespace = multiprocessing.Manager()
266 namespace.Process = multiprocessing.Process
267 namespace.current_process = multiprocessing.current_process
268 namespace.active_children = multiprocessing.active_children
269 elif sys.argv[1] == 'threads':
Christian Heimesaae1b702008-11-28 11:23:26 +0000270 print(' Using threads '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271 import multiprocessing.dummy as namespace
272 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000273 print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
274 raise SystemExit(2)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275
276 test(namespace)