blob: 81dbc387fc4a65f3af5d20562fbb7e32abca629c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A test file for the `multiprocessing` package
3#
Benjamin Peterson4469d0c2008-11-30 22:46:23 +00004# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
Benjamin Petersone711caf2008-06-11 16:44:04 +00007
Raymond Hettinger40fc59d2011-04-26 13:55:55 -07008import time
9import sys
10import random
Christian Heimesaae1b702008-11-28 11:23:26 +000011from queue import Empty
Benjamin Petersone711caf2008-06-11 16:44:04 +000012
13import multiprocessing # may get overwritten
14
15
16#### TEST_VALUE
17
18def value_func(running, mutex):
19 random.seed()
20 time.sleep(random.random()*4)
21
22 mutex.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000023 print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
Benjamin Petersone711caf2008-06-11 16:44:04 +000024 running.value -= 1
25 mutex.release()
26
27def test_value():
28 TASKS = 10
29 running = multiprocessing.Value('i', TASKS)
30 mutex = multiprocessing.Lock()
31
32 for i in range(TASKS):
33 p = multiprocessing.Process(target=value_func, args=(running, mutex))
34 p.start()
35
36 while running.value > 0:
37 time.sleep(0.08)
38 mutex.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000039 print(running.value, end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +000040 sys.stdout.flush()
41 mutex.release()
42
Christian Heimesaae1b702008-11-28 11:23:26 +000043 print()
44 print('No more running processes')
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
46
47#### TEST_QUEUE
48
49def queue_func(queue):
50 for i in range(30):
51 time.sleep(0.5 * random.random())
52 queue.put(i*i)
53 queue.put('STOP')
54
55def test_queue():
56 q = multiprocessing.Queue()
57
58 p = multiprocessing.Process(target=queue_func, args=(q,))
59 p.start()
60
61 o = None
62 while o != 'STOP':
63 try:
64 o = q.get(timeout=0.3)
Christian Heimesaae1b702008-11-28 11:23:26 +000065 print(o, end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +000066 sys.stdout.flush()
67 except Empty:
Christian Heimesaae1b702008-11-28 11:23:26 +000068 print('TIMEOUT')
Benjamin Petersone711caf2008-06-11 16:44:04 +000069
Christian Heimesaae1b702008-11-28 11:23:26 +000070 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72
73#### TEST_CONDITION
74
75def condition_func(cond):
76 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000077 print('\t' + str(cond))
Benjamin Petersone711caf2008-06-11 16:44:04 +000078 time.sleep(2)
Christian Heimesaae1b702008-11-28 11:23:26 +000079 print('\tchild is notifying')
80 print('\t' + str(cond))
Benjamin Petersone711caf2008-06-11 16:44:04 +000081 cond.notify()
82 cond.release()
83
84def test_condition():
85 cond = multiprocessing.Condition()
86
87 p = multiprocessing.Process(target=condition_func, args=(cond,))
Christian Heimesaae1b702008-11-28 11:23:26 +000088 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000089
90 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000091 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000092 cond.acquire()
Christian Heimesaae1b702008-11-28 11:23:26 +000093 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +000094
95 p.start()
96
Christian Heimesaae1b702008-11-28 11:23:26 +000097 print('main is waiting')
Benjamin Petersone711caf2008-06-11 16:44:04 +000098 cond.wait()
Christian Heimesaae1b702008-11-28 11:23:26 +000099 print('main has woken up')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100
Christian Heimesaae1b702008-11-28 11:23:26 +0000101 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102 cond.release()
Christian Heimesaae1b702008-11-28 11:23:26 +0000103 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000104 cond.release()
105
106 p.join()
Christian Heimesaae1b702008-11-28 11:23:26 +0000107 print(cond)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108
109
110#### TEST_SEMAPHORE
111
112def semaphore_func(sema, mutex, running):
113 sema.acquire()
114
115 mutex.acquire()
116 running.value += 1
Christian Heimesaae1b702008-11-28 11:23:26 +0000117 print(running.value, 'tasks are running')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000118 mutex.release()
119
120 random.seed()
121 time.sleep(random.random()*2)
122
123 mutex.acquire()
124 running.value -= 1
Christian Heimesaae1b702008-11-28 11:23:26 +0000125 print('%s has finished' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000126 mutex.release()
127
128 sema.release()
129
130def test_semaphore():
131 sema = multiprocessing.Semaphore(3)
132 mutex = multiprocessing.RLock()
133 running = multiprocessing.Value('i', 0)
134
135 processes = [
136 multiprocessing.Process(target=semaphore_func,
137 args=(sema, mutex, running))
138 for i in range(10)
139 ]
140
141 for p in processes:
142 p.start()
143
144 for p in processes:
145 p.join()
146
147
148#### TEST_JOIN_TIMEOUT
149
150def join_timeout_func():
Christian Heimesaae1b702008-11-28 11:23:26 +0000151 print('\tchild sleeping')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000152 time.sleep(5.5)
Christian Heimesaae1b702008-11-28 11:23:26 +0000153 print('\n\tchild terminating')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000154
155def test_join_timeout():
156 p = multiprocessing.Process(target=join_timeout_func)
157 p.start()
158
Christian Heimesaae1b702008-11-28 11:23:26 +0000159 print('waiting for process to finish')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160
161 while 1:
162 p.join(timeout=1)
163 if not p.is_alive():
164 break
Christian Heimesaae1b702008-11-28 11:23:26 +0000165 print('.', end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000166 sys.stdout.flush()
167
168
169#### TEST_EVENT
170
171def event_func(event):
Christian Heimesaae1b702008-11-28 11:23:26 +0000172 print('\t%r is waiting' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173 event.wait()
Christian Heimesaae1b702008-11-28 11:23:26 +0000174 print('\t%r has woken up' % multiprocessing.current_process())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000175
176def test_event():
177 event = multiprocessing.Event()
178
179 processes = [multiprocessing.Process(target=event_func, args=(event,))
180 for i in range(5)]
181
182 for p in processes:
183 p.start()
184
Christian Heimesaae1b702008-11-28 11:23:26 +0000185 print('main is sleeping')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000186 time.sleep(2)
187
Christian Heimesaae1b702008-11-28 11:23:26 +0000188 print('main is setting event')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000189 event.set()
190
191 for p in processes:
192 p.join()
193
194
195#### TEST_SHAREDVALUES
196
197def sharedvalues_func(values, arrays, shared_values, shared_arrays):
198 for i in range(len(values)):
199 v = values[i][1]
200 sv = shared_values[i].value
201 assert v == sv
202
203 for i in range(len(values)):
204 a = arrays[i][1]
205 sa = list(shared_arrays[i][:])
206 assert a == sa
207
Christian Heimesaae1b702008-11-28 11:23:26 +0000208 print('Tests passed')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210def test_sharedvalues():
211 values = [
212 ('i', 10),
213 ('h', -2),
214 ('d', 1.25)
215 ]
216 arrays = [
Christian Heimesaae1b702008-11-28 11:23:26 +0000217 ('i', list(range(100))),
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218 ('d', [0.25 * i for i in range(100)]),
Christian Heimesaae1b702008-11-28 11:23:26 +0000219 ('H', list(range(1000)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000220 ]
221
222 shared_values = [multiprocessing.Value(id, v) for id, v in values]
223 shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
224
225 p = multiprocessing.Process(
226 target=sharedvalues_func,
227 args=(values, arrays, shared_values, shared_arrays)
228 )
229 p.start()
230 p.join()
231
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000232 assert p.exitcode == 0
Benjamin Petersone711caf2008-06-11 16:44:04 +0000233
234
235####
236
237def test(namespace=multiprocessing):
238 global multiprocessing
239
240 multiprocessing = namespace
241
Raymond Hettinger40fc59d2011-04-26 13:55:55 -0700242 for func in [test_value, test_queue, test_condition,
243 test_semaphore, test_join_timeout, test_event,
244 test_sharedvalues]:
Benjamin Petersone711caf2008-06-11 16:44:04 +0000245
Christian Heimesaae1b702008-11-28 11:23:26 +0000246 print('\n\t######## %s\n' % func.__name__)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 func()
248
249 ignore = multiprocessing.active_children() # cleanup any old processes
250 if hasattr(multiprocessing, '_debug_info'):
251 info = multiprocessing._debug_info()
252 if info:
Christian Heimesaae1b702008-11-28 11:23:26 +0000253 print(info)
254 raise ValueError('there should be no positive refcounts left')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255
256
257if __name__ == '__main__':
258 multiprocessing.freeze_support()
259
260 assert len(sys.argv) in (1, 2)
261
262 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
Christian Heimesaae1b702008-11-28 11:23:26 +0000263 print(' Using processes '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264 namespace = multiprocessing
265 elif sys.argv[1] == 'manager':
Christian Heimesaae1b702008-11-28 11:23:26 +0000266 print(' Using processes and a manager '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000267 namespace = multiprocessing.Manager()
268 namespace.Process = multiprocessing.Process
269 namespace.current_process = multiprocessing.current_process
270 namespace.active_children = multiprocessing.active_children
271 elif sys.argv[1] == 'threads':
Christian Heimesaae1b702008-11-28 11:23:26 +0000272 print(' Using threads '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000273 import multiprocessing.dummy as namespace
274 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000275 print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
276 raise SystemExit(2)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 test(namespace)