blob: 0a3d92ad544dc3b028ab517d9b0d45ff0da53471 [file] [log] [blame]
Benjamin Peterson190d56e2008-06-11 02:40:25 +00001#
2# A test of `multiprocessing.Pool` class
3#
Christian Heimeseac80712008-11-28 19:33:33 +00004# Copyright (c) 2006-2008, R Oudkerk
5# All rights reserved.
6#
Benjamin Peterson190d56e2008-06-11 02:40:25 +00007
8import multiprocessing
9import time
10import random
11import sys
12
13#
14# Functions used by test code
15#
16
17def calculate(func, args):
18 result = func(*args)
19 return '%s says that %s%s = %s' % (
Jesse Noller5bc9f4c2008-08-19 19:06:19 +000020 multiprocessing.current_process().name,
Benjamin Peterson190d56e2008-06-11 02:40:25 +000021 func.__name__, args, result
22 )
23
24def calculatestar(args):
25 return calculate(*args)
26
27def mul(a, b):
28 time.sleep(0.5*random.random())
29 return a * b
30
31def plus(a, b):
32 time.sleep(0.5*random.random())
33 return a + b
34
35def f(x):
36 return 1.0 / (x-5.0)
37
38def pow3(x):
39 return x**3
40
41def noop(x):
42 pass
43
44#
45# Test code
46#
47
48def test():
49 print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
50
51 #
52 # Create pool
53 #
54
55 PROCESSES = 4
56 print 'Creating pool with %d processes\n' % PROCESSES
57 pool = multiprocessing.Pool(PROCESSES)
58 print 'pool = %s' % pool
59 print
60
61 #
62 # Tests
63 #
64
65 TASKS = [(mul, (i, 7)) for i in range(10)] + \
66 [(plus, (i, 8)) for i in range(10)]
67
68 results = [pool.apply_async(calculate, t) for t in TASKS]
69 imap_it = pool.imap(calculatestar, TASKS)
70 imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
71
72 print 'Ordered results using pool.apply_async():'
73 for r in results:
74 print '\t', r.get()
75 print
76
77 print 'Ordered results using pool.imap():'
78 for x in imap_it:
79 print '\t', x
80 print
81
82 print 'Unordered results using pool.imap_unordered():'
83 for x in imap_unordered_it:
84 print '\t', x
85 print
86
87 print 'Ordered results using pool.map() --- will block till complete:'
88 for x in pool.map(calculatestar, TASKS):
89 print '\t', x
90 print
91
92 #
93 # Simple benchmarks
94 #
95
96 N = 100000
97 print 'def pow3(x): return x**3'
98
99 t = time.time()
100 A = map(pow3, xrange(N))
101 print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
102 (N, time.time() - t)
103
104 t = time.time()
105 B = pool.map(pow3, xrange(N))
106 print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
107 (N, time.time() - t)
108
109 t = time.time()
110 C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
111 print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
112 ' seconds' % (N, N//8, time.time() - t)
113
114 assert A == B == C, (len(A), len(B), len(C))
115 print
116
117 L = [None] * 1000000
118 print 'def noop(x): pass'
119 print 'L = [None] * 1000000'
120
121 t = time.time()
122 A = map(noop, L)
123 print '\tmap(noop, L):\n\t\t%s seconds' % \
124 (time.time() - t)
125
126 t = time.time()
127 B = pool.map(noop, L)
128 print '\tpool.map(noop, L):\n\t\t%s seconds' % \
129 (time.time() - t)
130
131 t = time.time()
132 C = list(pool.imap(noop, L, chunksize=len(L)//8))
133 print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
134 (len(L)//8, time.time() - t)
135
136 assert A == B == C, (len(A), len(B), len(C))
137 print
138
139 del A, B, C, L
140
141 #
142 # Test error handling
143 #
144
145 print 'Testing error handling:'
146
147 try:
148 print pool.apply(f, (5,))
149 except ZeroDivisionError:
150 print '\tGot ZeroDivisionError as expected from pool.apply()'
151 else:
Georg Brandlc1edec32009-06-03 07:25:35 +0000152 raise AssertionError('expected ZeroDivisionError')
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000153
154 try:
155 print pool.map(f, range(10))
156 except ZeroDivisionError:
157 print '\tGot ZeroDivisionError as expected from pool.map()'
158 else:
Georg Brandlc1edec32009-06-03 07:25:35 +0000159 raise AssertionError('expected ZeroDivisionError')
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000160
161 try:
162 print list(pool.imap(f, range(10)))
163 except ZeroDivisionError:
164 print '\tGot ZeroDivisionError as expected from list(pool.imap())'
165 else:
Georg Brandlc1edec32009-06-03 07:25:35 +0000166 raise AssertionError('expected ZeroDivisionError')
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000167
168 it = pool.imap(f, range(10))
169 for i in range(10):
170 try:
171 x = it.next()
172 except ZeroDivisionError:
173 if i == 5:
174 pass
175 except StopIteration:
176 break
177 else:
178 if i == 5:
Georg Brandlc1edec32009-06-03 07:25:35 +0000179 raise AssertionError('expected ZeroDivisionError')
Benjamin Peterson190d56e2008-06-11 02:40:25 +0000180
181 assert i == 9
182 print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
183 print
184
185 #
186 # Testing timeouts
187 #
188
189 print 'Testing ApplyResult.get() with timeout:',
190 res = pool.apply_async(calculate, TASKS[0])
191 while 1:
192 sys.stdout.flush()
193 try:
194 sys.stdout.write('\n\t%s' % res.get(0.02))
195 break
196 except multiprocessing.TimeoutError:
197 sys.stdout.write('.')
198 print
199 print
200
201 print 'Testing IMapIterator.next() with timeout:',
202 it = pool.imap(calculatestar, TASKS)
203 while 1:
204 sys.stdout.flush()
205 try:
206 sys.stdout.write('\n\t%s' % it.next(0.02))
207 except StopIteration:
208 break
209 except multiprocessing.TimeoutError:
210 sys.stdout.write('.')
211 print
212 print
213
214 #
215 # Testing callback
216 #
217
218 print 'Testing callback:'
219
220 A = []
221 B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
222
223 r = pool.apply_async(mul, (7, 8), callback=A.append)
224 r.wait()
225
226 r = pool.map_async(pow3, range(10), callback=A.extend)
227 r.wait()
228
229 if A == B:
230 print '\tcallbacks succeeded\n'
231 else:
232 print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
233
234 #
235 # Check there are no outstanding tasks
236 #
237
238 assert not pool._cache, 'cache = %r' % pool._cache
239
240 #
241 # Check close() methods
242 #
243
244 print 'Testing close():'
245
246 for worker in pool._pool:
247 assert worker.is_alive()
248
249 result = pool.apply_async(time.sleep, [0.5])
250 pool.close()
251 pool.join()
252
253 assert result.get() is None
254
255 for worker in pool._pool:
256 assert not worker.is_alive()
257
258 print '\tclose() succeeded\n'
259
260 #
261 # Check terminate() method
262 #
263
264 print 'Testing terminate():'
265
266 pool = multiprocessing.Pool(2)
267 DELTA = 0.1
268 ignore = pool.apply(pow3, [2])
269 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
270 pool.terminate()
271 pool.join()
272
273 for worker in pool._pool:
274 assert not worker.is_alive()
275
276 print '\tterminate() succeeded\n'
277
278 #
279 # Check garbage collection
280 #
281
282 print 'Testing garbage collection:'
283
284 pool = multiprocessing.Pool(2)
285 DELTA = 0.1
286 processes = pool._pool
287 ignore = pool.apply(pow3, [2])
288 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
289
290 results = pool = None
291
292 time.sleep(DELTA * 2)
293
294 for worker in processes:
295 assert not worker.is_alive()
296
297 print '\tgarbage collection succeeded\n'
298
299
300if __name__ == '__main__':
301 multiprocessing.freeze_support()
302
303 assert len(sys.argv) in (1, 2)
304
305 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
306 print ' Using processes '.center(79, '-')
307 elif sys.argv[1] == 'threads':
308 print ' Using threads '.center(79, '-')
309 import multiprocessing.dummy as multiprocessing
310 else:
311 print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
312 raise SystemExit(2)
313
314 test()