blob: e360703bd1e78178015d2d97bc350ac849277903 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# A test of `multiprocessing.Pool` class
3#
Benjamin Peterson4469d0c2008-11-30 22:46:23 +00004# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
Benjamin Petersone711caf2008-06-11 16:44:04 +00007
8import multiprocessing
9import time
10import random
11import sys
12
13#
14# Functions used by test code
15#
16
17def calculate(func, args):
18 result = func(*args)
19 return '%s says that %s%s = %s' % (
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +000020 multiprocessing.current_process().name,
Benjamin Petersone711caf2008-06-11 16:44:04 +000021 func.__name__, args, result
22 )
23
24def calculatestar(args):
25 return calculate(*args)
26
27def mul(a, b):
28 time.sleep(0.5*random.random())
29 return a * b
30
31def plus(a, b):
32 time.sleep(0.5*random.random())
33 return a + b
34
35def f(x):
36 return 1.0 / (x-5.0)
37
38def pow3(x):
39 return x**3
40
41def noop(x):
42 pass
43
44#
45# Test code
46#
47
48def test():
Christian Heimesaae1b702008-11-28 11:23:26 +000049 print('cpu_count() = %d\n' % multiprocessing.cpu_count())
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
51 #
52 # Create pool
53 #
54
55 PROCESSES = 4
Christian Heimesaae1b702008-11-28 11:23:26 +000056 print('Creating pool with %d processes\n' % PROCESSES)
Benjamin Petersone711caf2008-06-11 16:44:04 +000057 pool = multiprocessing.Pool(PROCESSES)
Christian Heimesaae1b702008-11-28 11:23:26 +000058 print('pool = %s' % pool)
59 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000060
61 #
62 # Tests
63 #
64
65 TASKS = [(mul, (i, 7)) for i in range(10)] + \
66 [(plus, (i, 8)) for i in range(10)]
67
68 results = [pool.apply_async(calculate, t) for t in TASKS]
69 imap_it = pool.imap(calculatestar, TASKS)
70 imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
71
Christian Heimesaae1b702008-11-28 11:23:26 +000072 print('Ordered results using pool.apply_async():')
Benjamin Petersone711caf2008-06-11 16:44:04 +000073 for r in results:
Christian Heimesaae1b702008-11-28 11:23:26 +000074 print('\t', r.get())
75 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000076
Christian Heimesaae1b702008-11-28 11:23:26 +000077 print('Ordered results using pool.imap():')
Benjamin Petersone711caf2008-06-11 16:44:04 +000078 for x in imap_it:
Christian Heimesaae1b702008-11-28 11:23:26 +000079 print('\t', x)
80 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000081
Christian Heimesaae1b702008-11-28 11:23:26 +000082 print('Unordered results using pool.imap_unordered():')
Benjamin Petersone711caf2008-06-11 16:44:04 +000083 for x in imap_unordered_it:
Christian Heimesaae1b702008-11-28 11:23:26 +000084 print('\t', x)
85 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000086
Christian Heimesaae1b702008-11-28 11:23:26 +000087 print('Ordered results using pool.map() --- will block till complete:')
Benjamin Petersone711caf2008-06-11 16:44:04 +000088 for x in pool.map(calculatestar, TASKS):
Christian Heimesaae1b702008-11-28 11:23:26 +000089 print('\t', x)
90 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +000091
92 #
93 # Simple benchmarks
94 #
95
96 N = 100000
Christian Heimesaae1b702008-11-28 11:23:26 +000097 print('def pow3(x): return x**3')
Benjamin Petersone711caf2008-06-11 16:44:04 +000098
99 t = time.time()
Christian Heimesaae1b702008-11-28 11:23:26 +0000100 A = list(map(pow3, range(N)))
Georg Brandlc9a5a0e2009-09-01 07:34:27 +0000101 print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
Christian Heimesaae1b702008-11-28 11:23:26 +0000102 (N, time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103
104 t = time.time()
Christian Heimesaae1b702008-11-28 11:23:26 +0000105 B = pool.map(pow3, range(N))
Georg Brandlc9a5a0e2009-09-01 07:34:27 +0000106 print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
Christian Heimesaae1b702008-11-28 11:23:26 +0000107 (N, time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000108
109 t = time.time()
Christian Heimesaae1b702008-11-28 11:23:26 +0000110 C = list(pool.imap(pow3, range(N), chunksize=N//8))
Georg Brandlc9a5a0e2009-09-01 07:34:27 +0000111 print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
Christian Heimesaae1b702008-11-28 11:23:26 +0000112 ' seconds' % (N, N//8, time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000113
114 assert A == B == C, (len(A), len(B), len(C))
Christian Heimesaae1b702008-11-28 11:23:26 +0000115 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000116
117 L = [None] * 1000000
Christian Heimesaae1b702008-11-28 11:23:26 +0000118 print('def noop(x): pass')
119 print('L = [None] * 1000000')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000120
121 t = time.time()
Christian Heimesaae1b702008-11-28 11:23:26 +0000122 A = list(map(noop, L))
123 print('\tmap(noop, L):\n\t\t%s seconds' % \
124 (time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000125
126 t = time.time()
127 B = pool.map(noop, L)
Christian Heimesaae1b702008-11-28 11:23:26 +0000128 print('\tpool.map(noop, L):\n\t\t%s seconds' % \
129 (time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000130
131 t = time.time()
132 C = list(pool.imap(noop, L, chunksize=len(L)//8))
Christian Heimesaae1b702008-11-28 11:23:26 +0000133 print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
134 (len(L)//8, time.time() - t))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000135
136 assert A == B == C, (len(A), len(B), len(C))
Christian Heimesaae1b702008-11-28 11:23:26 +0000137 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000138
139 del A, B, C, L
140
141 #
142 # Test error handling
143 #
144
Christian Heimesaae1b702008-11-28 11:23:26 +0000145 print('Testing error handling:')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000146
147 try:
Christian Heimesaae1b702008-11-28 11:23:26 +0000148 print(pool.apply(f, (5,)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149 except ZeroDivisionError:
Christian Heimesaae1b702008-11-28 11:23:26 +0000150 print('\tGot ZeroDivisionError as expected from pool.apply()')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000151 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000152 raise AssertionError('expected ZeroDivisionError')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000153
154 try:
Christian Heimesaae1b702008-11-28 11:23:26 +0000155 print(pool.map(f, list(range(10))))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000156 except ZeroDivisionError:
Christian Heimesaae1b702008-11-28 11:23:26 +0000157 print('\tGot ZeroDivisionError as expected from pool.map()')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000158 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000159 raise AssertionError('expected ZeroDivisionError')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000160
161 try:
Christian Heimesaae1b702008-11-28 11:23:26 +0000162 print(list(pool.imap(f, list(range(10)))))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163 except ZeroDivisionError:
Christian Heimesaae1b702008-11-28 11:23:26 +0000164 print('\tGot ZeroDivisionError as expected from list(pool.imap())')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000165 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000166 raise AssertionError('expected ZeroDivisionError')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000167
Christian Heimesaae1b702008-11-28 11:23:26 +0000168 it = pool.imap(f, list(range(10)))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000169 for i in range(10):
170 try:
Christian Heimesaae1b702008-11-28 11:23:26 +0000171 x = next(it)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172 except ZeroDivisionError:
173 if i == 5:
174 pass
175 except StopIteration:
176 break
177 else:
178 if i == 5:
Christian Heimesaae1b702008-11-28 11:23:26 +0000179 raise AssertionError('expected ZeroDivisionError')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000180
181 assert i == 9
Christian Heimesaae1b702008-11-28 11:23:26 +0000182 print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
183 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000184
185 #
186 # Testing timeouts
187 #
188
Christian Heimesaae1b702008-11-28 11:23:26 +0000189 print('Testing ApplyResult.get() with timeout:', end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000190 res = pool.apply_async(calculate, TASKS[0])
191 while 1:
192 sys.stdout.flush()
193 try:
194 sys.stdout.write('\n\t%s' % res.get(0.02))
195 break
196 except multiprocessing.TimeoutError:
197 sys.stdout.write('.')
Christian Heimesaae1b702008-11-28 11:23:26 +0000198 print()
199 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000200
Christian Heimesaae1b702008-11-28 11:23:26 +0000201 print('Testing IMapIterator.next() with timeout:', end=' ')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202 it = pool.imap(calculatestar, TASKS)
203 while 1:
204 sys.stdout.flush()
205 try:
206 sys.stdout.write('\n\t%s' % it.next(0.02))
207 except StopIteration:
208 break
209 except multiprocessing.TimeoutError:
210 sys.stdout.write('.')
Christian Heimesaae1b702008-11-28 11:23:26 +0000211 print()
212 print()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213
214 #
215 # Testing callback
216 #
217
Christian Heimesaae1b702008-11-28 11:23:26 +0000218 print('Testing callback:')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000219
220 A = []
221 B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
222
223 r = pool.apply_async(mul, (7, 8), callback=A.append)
224 r.wait()
225
Christian Heimesaae1b702008-11-28 11:23:26 +0000226 r = pool.map_async(pow3, list(range(10)), callback=A.extend)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000227 r.wait()
228
229 if A == B:
Christian Heimesaae1b702008-11-28 11:23:26 +0000230 print('\tcallbacks succeeded\n')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000232 print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000233
234 #
235 # Check there are no outstanding tasks
236 #
237
238 assert not pool._cache, 'cache = %r' % pool._cache
239
240 #
241 # Check close() methods
242 #
243
Christian Heimesaae1b702008-11-28 11:23:26 +0000244 print('Testing close():')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000245
246 for worker in pool._pool:
247 assert worker.is_alive()
248
249 result = pool.apply_async(time.sleep, [0.5])
250 pool.close()
251 pool.join()
252
253 assert result.get() is None
254
255 for worker in pool._pool:
256 assert not worker.is_alive()
257
Christian Heimesaae1b702008-11-28 11:23:26 +0000258 print('\tclose() succeeded\n')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259
260 #
261 # Check terminate() method
262 #
263
Christian Heimesaae1b702008-11-28 11:23:26 +0000264 print('Testing terminate():')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265
266 pool = multiprocessing.Pool(2)
267 DELTA = 0.1
268 ignore = pool.apply(pow3, [2])
269 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
270 pool.terminate()
271 pool.join()
272
273 for worker in pool._pool:
274 assert not worker.is_alive()
275
Christian Heimesaae1b702008-11-28 11:23:26 +0000276 print('\tterminate() succeeded\n')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 #
279 # Check garbage collection
280 #
281
Christian Heimesaae1b702008-11-28 11:23:26 +0000282 print('Testing garbage collection:')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000283
284 pool = multiprocessing.Pool(2)
285 DELTA = 0.1
286 processes = pool._pool
287 ignore = pool.apply(pow3, [2])
288 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
289
290 results = pool = None
291
292 time.sleep(DELTA * 2)
293
294 for worker in processes:
295 assert not worker.is_alive()
296
Christian Heimesaae1b702008-11-28 11:23:26 +0000297 print('\tgarbage collection succeeded\n')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000298
299
300if __name__ == '__main__':
301 multiprocessing.freeze_support()
302
303 assert len(sys.argv) in (1, 2)
304
305 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
Christian Heimesaae1b702008-11-28 11:23:26 +0000306 print(' Using processes '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 elif sys.argv[1] == 'threads':
Christian Heimesaae1b702008-11-28 11:23:26 +0000308 print(' Using threads '.center(79, '-'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309 import multiprocessing.dummy as multiprocessing
310 else:
Christian Heimesaae1b702008-11-28 11:23:26 +0000311 print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000312 raise SystemExit(2)
313
314 test()