blob: e7aaaacaf30667286a51596eea693da0af4768a1 [file] [log] [blame]
#
# A test of the `multiprocessing.Pool` class
#
4
5import multiprocessing
6import time
7import random
8import sys
9
#
# Functions used by test code
#
13
def calculate(func, args):
    """Call ``func(*args)`` and describe the outcome as a sentence.

    Returns a string of the form
    ``'<name> says that <func name><args> = <result>'`` where ``<name>`` is
    the ``name`` attribute of ``multiprocessing.current_process()``.
    """
    outcome = func(*args)
    worker_name = multiprocessing.current_process().name
    template = '%s says that %s%s = %s'
    return template % (worker_name, func.__name__, args, outcome)
20
def calculatestar(args):
    """Adapter for map-style APIs: unpack ``args`` into a calculate() call.

    ``args`` is a single ``(func, func_args)`` task as stored in TASKS; the
    formatted string produced by calculate() is returned unchanged.
    """
    report = calculate(*args)
    return report
23
def mul(a, b):
    """Return ``a * b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    product = a * b
    return product
27
def plus(a, b):
    """Return ``a + b`` after sleeping for a random time below 0.5 seconds."""
    delay = 0.5 * random.random()
    time.sleep(delay)
    total = a + b
    return total
31
def f(x):
    """Return ``1.0 / (x - 5.0)``.

    Raises ZeroDivisionError when ``x`` equals 5; the error-handling tests
    rely on that to check how exceptions travel back from the workers.
    """
    denominator = x - 5.0
    return 1.0 / denominator
34
def pow3(x):
    """Return ``x`` raised to the third power."""
    cube = pow(x, 3)
    return cube
37
def noop(x):
    """Ignore ``x`` and return None; used to measure pure dispatch overhead."""
    return None
40
#
# Test code
#
44
45def test():
46 print 'cpu_count() = %d\n' % multiprocessing.cpu_count()
47
48 #
49 # Create pool
50 #
51
52 PROCESSES = 4
53 print 'Creating pool with %d processes\n' % PROCESSES
54 pool = multiprocessing.Pool(PROCESSES)
55 print 'pool = %s' % pool
56 print
57
58 #
59 # Tests
60 #
61
62 TASKS = [(mul, (i, 7)) for i in range(10)] + \
63 [(plus, (i, 8)) for i in range(10)]
64
65 results = [pool.apply_async(calculate, t) for t in TASKS]
66 imap_it = pool.imap(calculatestar, TASKS)
67 imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
68
69 print 'Ordered results using pool.apply_async():'
70 for r in results:
71 print '\t', r.get()
72 print
73
74 print 'Ordered results using pool.imap():'
75 for x in imap_it:
76 print '\t', x
77 print
78
79 print 'Unordered results using pool.imap_unordered():'
80 for x in imap_unordered_it:
81 print '\t', x
82 print
83
84 print 'Ordered results using pool.map() --- will block till complete:'
85 for x in pool.map(calculatestar, TASKS):
86 print '\t', x
87 print
88
89 #
90 # Simple benchmarks
91 #
92
93 N = 100000
94 print 'def pow3(x): return x**3'
95
96 t = time.time()
97 A = map(pow3, xrange(N))
98 print '\tmap(pow3, xrange(%d)):\n\t\t%s seconds' % \
99 (N, time.time() - t)
100
101 t = time.time()
102 B = pool.map(pow3, xrange(N))
103 print '\tpool.map(pow3, xrange(%d)):\n\t\t%s seconds' % \
104 (N, time.time() - t)
105
106 t = time.time()
107 C = list(pool.imap(pow3, xrange(N), chunksize=N//8))
108 print '\tlist(pool.imap(pow3, xrange(%d), chunksize=%d)):\n\t\t%s' \
109 ' seconds' % (N, N//8, time.time() - t)
110
111 assert A == B == C, (len(A), len(B), len(C))
112 print
113
114 L = [None] * 1000000
115 print 'def noop(x): pass'
116 print 'L = [None] * 1000000'
117
118 t = time.time()
119 A = map(noop, L)
120 print '\tmap(noop, L):\n\t\t%s seconds' % \
121 (time.time() - t)
122
123 t = time.time()
124 B = pool.map(noop, L)
125 print '\tpool.map(noop, L):\n\t\t%s seconds' % \
126 (time.time() - t)
127
128 t = time.time()
129 C = list(pool.imap(noop, L, chunksize=len(L)//8))
130 print '\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
131 (len(L)//8, time.time() - t)
132
133 assert A == B == C, (len(A), len(B), len(C))
134 print
135
136 del A, B, C, L
137
138 #
139 # Test error handling
140 #
141
142 print 'Testing error handling:'
143
144 try:
145 print pool.apply(f, (5,))
146 except ZeroDivisionError:
147 print '\tGot ZeroDivisionError as expected from pool.apply()'
148 else:
149 raise AssertionError, 'expected ZeroDivisionError'
150
151 try:
152 print pool.map(f, range(10))
153 except ZeroDivisionError:
154 print '\tGot ZeroDivisionError as expected from pool.map()'
155 else:
156 raise AssertionError, 'expected ZeroDivisionError'
157
158 try:
159 print list(pool.imap(f, range(10)))
160 except ZeroDivisionError:
161 print '\tGot ZeroDivisionError as expected from list(pool.imap())'
162 else:
163 raise AssertionError, 'expected ZeroDivisionError'
164
165 it = pool.imap(f, range(10))
166 for i in range(10):
167 try:
168 x = it.next()
169 except ZeroDivisionError:
170 if i == 5:
171 pass
172 except StopIteration:
173 break
174 else:
175 if i == 5:
176 raise AssertionError, 'expected ZeroDivisionError'
177
178 assert i == 9
179 print '\tGot ZeroDivisionError as expected from IMapIterator.next()'
180 print
181
182 #
183 # Testing timeouts
184 #
185
186 print 'Testing ApplyResult.get() with timeout:',
187 res = pool.apply_async(calculate, TASKS[0])
188 while 1:
189 sys.stdout.flush()
190 try:
191 sys.stdout.write('\n\t%s' % res.get(0.02))
192 break
193 except multiprocessing.TimeoutError:
194 sys.stdout.write('.')
195 print
196 print
197
198 print 'Testing IMapIterator.next() with timeout:',
199 it = pool.imap(calculatestar, TASKS)
200 while 1:
201 sys.stdout.flush()
202 try:
203 sys.stdout.write('\n\t%s' % it.next(0.02))
204 except StopIteration:
205 break
206 except multiprocessing.TimeoutError:
207 sys.stdout.write('.')
208 print
209 print
210
211 #
212 # Testing callback
213 #
214
215 print 'Testing callback:'
216
217 A = []
218 B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
219
220 r = pool.apply_async(mul, (7, 8), callback=A.append)
221 r.wait()
222
223 r = pool.map_async(pow3, range(10), callback=A.extend)
224 r.wait()
225
226 if A == B:
227 print '\tcallbacks succeeded\n'
228 else:
229 print '\t*** callbacks failed\n\t\t%s != %s\n' % (A, B)
230
231 #
232 # Check there are no outstanding tasks
233 #
234
235 assert not pool._cache, 'cache = %r' % pool._cache
236
237 #
238 # Check close() methods
239 #
240
241 print 'Testing close():'
242
243 for worker in pool._pool:
244 assert worker.is_alive()
245
246 result = pool.apply_async(time.sleep, [0.5])
247 pool.close()
248 pool.join()
249
250 assert result.get() is None
251
252 for worker in pool._pool:
253 assert not worker.is_alive()
254
255 print '\tclose() succeeded\n'
256
257 #
258 # Check terminate() method
259 #
260
261 print 'Testing terminate():'
262
263 pool = multiprocessing.Pool(2)
264 DELTA = 0.1
265 ignore = pool.apply(pow3, [2])
266 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
267 pool.terminate()
268 pool.join()
269
270 for worker in pool._pool:
271 assert not worker.is_alive()
272
273 print '\tterminate() succeeded\n'
274
275 #
276 # Check garbage collection
277 #
278
279 print 'Testing garbage collection:'
280
281 pool = multiprocessing.Pool(2)
282 DELTA = 0.1
283 processes = pool._pool
284 ignore = pool.apply(pow3, [2])
285 results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
286
287 results = pool = None
288
289 time.sleep(DELTA * 2)
290
291 for worker in processes:
292 assert not worker.is_alive()
293
294 print '\tgarbage collection succeeded\n'
295
296
297if __name__ == '__main__':
298 multiprocessing.freeze_support()
299
300 assert len(sys.argv) in (1, 2)
301
302 if len(sys.argv) == 1 or sys.argv[1] == 'processes':
303 print ' Using processes '.center(79, '-')
304 elif sys.argv[1] == 'threads':
305 print ' Using threads '.center(79, '-')
306 import multiprocessing.dummy as multiprocessing
307 else:
308 print 'Usage:\n\t%s [processes | threads]' % sys.argv[0]
309 raise SystemExit(2)
310
311 test()