blob: e5258bb9d8ba7ba174a3fe3e78242c549deaef96 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
178class _TestProcess(BaseTestCase):
179
180 ALLOWED_TYPES = ('processes', 'threads')
181
182 def test_current(self):
183 if self.TYPE == 'threads':
184 return
185
186 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188
189 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000190 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000191 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEqual(current.ident, os.getpid())
194 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000195
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000199 q.put(args)
200 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000201 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000203 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000215 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000223 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225
226 p.start()
227
Ezio Melotti2623a372010-11-21 13:34:58 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
Ezio Melotti2623a372010-11-21 13:34:58 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000235 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.join()
240
Ezio Melotti2623a372010-11-21 13:34:58 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
251 return
252
253 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000254 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000258 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000269
270 p.join()
271
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000288
Jesus Cea6f6016b2011-09-09 20:26:57 +0200289 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000290 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000291 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000292
293 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000294 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000295
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000296 @classmethod
297 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298 from multiprocessing import forking
299 wconn.send(id)
300 if len(id) < 2:
301 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 p = cls.Process(
303 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 )
305 p.start()
306 p.join()
307
308 def test_recursion(self):
309 rconn, wconn = self.Pipe(duplex=False)
310 self._test_recursion(wconn, [])
311
312 time.sleep(DELTA)
313 result = []
314 while rconn.poll():
315 result.append(rconn.recv())
316
317 expected = [
318 [],
319 [0],
320 [0, 0],
321 [0, 1],
322 [1],
323 [1, 0],
324 [1, 1]
325 ]
326 self.assertEqual(result, expected)
327
328#
329#
330#
331
332class _UpperCaser(multiprocessing.Process):
333
334 def __init__(self):
335 multiprocessing.Process.__init__(self)
336 self.child_conn, self.parent_conn = multiprocessing.Pipe()
337
338 def run(self):
339 self.parent_conn.close()
340 for s in iter(self.child_conn.recv, None):
341 self.child_conn.send(s.upper())
342 self.child_conn.close()
343
344 def submit(self, s):
345 assert type(s) is str
346 self.parent_conn.send(s)
347 return self.parent_conn.recv()
348
349 def stop(self):
350 self.parent_conn.send(None)
351 self.parent_conn.close()
352 self.child_conn.close()
353
354class _TestSubclassingProcess(BaseTestCase):
355
356 ALLOWED_TYPES = ('processes',)
357
358 def test_subclassing(self):
359 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200360 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000361 uppercaser.start()
362 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
363 self.assertEqual(uppercaser.submit('world'), 'WORLD')
364 uppercaser.stop()
365 uppercaser.join()
366
367#
368#
369#
370
371def queue_empty(q):
372 if hasattr(q, 'empty'):
373 return q.empty()
374 else:
375 return q.qsize() == 0
376
377def queue_full(q, maxsize):
378 if hasattr(q, 'full'):
379 return q.full()
380 else:
381 return q.qsize() == maxsize
382
383
384class _TestQueue(BaseTestCase):
385
386
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000387 @classmethod
388 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000389 child_can_start.wait()
390 for i in range(6):
391 queue.get()
392 parent_can_continue.set()
393
394 def test_put(self):
395 MAXSIZE = 6
396 queue = self.Queue(maxsize=MAXSIZE)
397 child_can_start = self.Event()
398 parent_can_continue = self.Event()
399
400 proc = self.Process(
401 target=self._test_put,
402 args=(queue, child_can_start, parent_can_continue)
403 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000404 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000405 proc.start()
406
407 self.assertEqual(queue_empty(queue), True)
408 self.assertEqual(queue_full(queue, MAXSIZE), False)
409
410 queue.put(1)
411 queue.put(2, True)
412 queue.put(3, True, None)
413 queue.put(4, False)
414 queue.put(5, False, None)
415 queue.put_nowait(6)
416
417 # the values may be in buffer but not yet in pipe so sleep a bit
418 time.sleep(DELTA)
419
420 self.assertEqual(queue_empty(queue), False)
421 self.assertEqual(queue_full(queue, MAXSIZE), True)
422
423 put = TimingWrapper(queue.put)
424 put_nowait = TimingWrapper(queue.put_nowait)
425
426 self.assertRaises(Queue.Full, put, 7, False)
427 self.assertTimingAlmostEqual(put.elapsed, 0)
428
429 self.assertRaises(Queue.Full, put, 7, False, None)
430 self.assertTimingAlmostEqual(put.elapsed, 0)
431
432 self.assertRaises(Queue.Full, put_nowait, 7)
433 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
434
435 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
436 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
437
438 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
439 self.assertTimingAlmostEqual(put.elapsed, 0)
440
441 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
442 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
443
444 child_can_start.set()
445 parent_can_continue.wait()
446
447 self.assertEqual(queue_empty(queue), True)
448 self.assertEqual(queue_full(queue, MAXSIZE), False)
449
450 proc.join()
451
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000452 @classmethod
453 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000454 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000455 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000456 queue.put(2)
457 queue.put(3)
458 queue.put(4)
459 queue.put(5)
460 parent_can_continue.set()
461
462 def test_get(self):
463 queue = self.Queue()
464 child_can_start = self.Event()
465 parent_can_continue = self.Event()
466
467 proc = self.Process(
468 target=self._test_get,
469 args=(queue, child_can_start, parent_can_continue)
470 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000471 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000472 proc.start()
473
474 self.assertEqual(queue_empty(queue), True)
475
476 child_can_start.set()
477 parent_can_continue.wait()
478
479 time.sleep(DELTA)
480 self.assertEqual(queue_empty(queue), False)
481
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000482 # Hangs unexpectedly, remove for now
483 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000484 self.assertEqual(queue.get(True, None), 2)
485 self.assertEqual(queue.get(True), 3)
486 self.assertEqual(queue.get(timeout=1), 4)
487 self.assertEqual(queue.get_nowait(), 5)
488
489 self.assertEqual(queue_empty(queue), True)
490
491 get = TimingWrapper(queue.get)
492 get_nowait = TimingWrapper(queue.get_nowait)
493
494 self.assertRaises(Queue.Empty, get, False)
495 self.assertTimingAlmostEqual(get.elapsed, 0)
496
497 self.assertRaises(Queue.Empty, get, False, None)
498 self.assertTimingAlmostEqual(get.elapsed, 0)
499
500 self.assertRaises(Queue.Empty, get_nowait)
501 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
502
503 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
504 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
505
506 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
507 self.assertTimingAlmostEqual(get.elapsed, 0)
508
509 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
510 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
511
512 proc.join()
513
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000514 @classmethod
515 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000516 for i in range(10, 20):
517 queue.put(i)
518 # note that at this point the items may only be buffered, so the
519 # process cannot shutdown until the feeder thread has finished
520 # pushing items onto the pipe.
521
522 def test_fork(self):
523 # Old versions of Queue would fail to create a new feeder
524 # thread for a forked process if the original process had its
525 # own feeder thread. This test checks that this no longer
526 # happens.
527
528 queue = self.Queue()
529
530 # put items on queue so that main process starts a feeder thread
531 for i in range(10):
532 queue.put(i)
533
534 # wait to make sure thread starts before we fork a new process
535 time.sleep(DELTA)
536
537 # fork process
538 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200539 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000540 p.start()
541
542 # check that all expected items are in the queue
543 for i in range(20):
544 self.assertEqual(queue.get(), i)
545 self.assertRaises(Queue.Empty, queue.get, False)
546
547 p.join()
548
549 def test_qsize(self):
550 q = self.Queue()
551 try:
552 self.assertEqual(q.qsize(), 0)
553 except NotImplementedError:
554 return
555 q.put(1)
556 self.assertEqual(q.qsize(), 1)
557 q.put(5)
558 self.assertEqual(q.qsize(), 2)
559 q.get()
560 self.assertEqual(q.qsize(), 1)
561 q.get()
562 self.assertEqual(q.qsize(), 0)
563
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000564 @classmethod
565 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000566 for obj in iter(q.get, None):
567 time.sleep(DELTA)
568 q.task_done()
569
570 def test_task_done(self):
571 queue = self.JoinableQueue()
572
573 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000574 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000575
576 workers = [self.Process(target=self._test_task_done, args=(queue,))
577 for i in xrange(4)]
578
579 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200580 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000581 p.start()
582
583 for i in xrange(10):
584 queue.put(i)
585
586 queue.join()
587
588 for p in workers:
589 queue.put(None)
590
591 for p in workers:
592 p.join()
593
594#
595#
596#
597
598class _TestLock(BaseTestCase):
599
600 def test_lock(self):
601 lock = self.Lock()
602 self.assertEqual(lock.acquire(), True)
603 self.assertEqual(lock.acquire(False), False)
604 self.assertEqual(lock.release(), None)
605 self.assertRaises((ValueError, threading.ThreadError), lock.release)
606
607 def test_rlock(self):
608 lock = self.RLock()
609 self.assertEqual(lock.acquire(), True)
610 self.assertEqual(lock.acquire(), True)
611 self.assertEqual(lock.acquire(), True)
612 self.assertEqual(lock.release(), None)
613 self.assertEqual(lock.release(), None)
614 self.assertEqual(lock.release(), None)
615 self.assertRaises((AssertionError, RuntimeError), lock.release)
616
Jesse Noller82eb5902009-03-30 23:29:31 +0000617 def test_lock_context(self):
618 with self.Lock():
619 pass
620
Benjamin Petersondfd79492008-06-13 19:13:39 +0000621
622class _TestSemaphore(BaseTestCase):
623
624 def _test_semaphore(self, sem):
625 self.assertReturnsIfImplemented(2, get_value, sem)
626 self.assertEqual(sem.acquire(), True)
627 self.assertReturnsIfImplemented(1, get_value, sem)
628 self.assertEqual(sem.acquire(), True)
629 self.assertReturnsIfImplemented(0, get_value, sem)
630 self.assertEqual(sem.acquire(False), False)
631 self.assertReturnsIfImplemented(0, get_value, sem)
632 self.assertEqual(sem.release(), None)
633 self.assertReturnsIfImplemented(1, get_value, sem)
634 self.assertEqual(sem.release(), None)
635 self.assertReturnsIfImplemented(2, get_value, sem)
636
637 def test_semaphore(self):
638 sem = self.Semaphore(2)
639 self._test_semaphore(sem)
640 self.assertEqual(sem.release(), None)
641 self.assertReturnsIfImplemented(3, get_value, sem)
642 self.assertEqual(sem.release(), None)
643 self.assertReturnsIfImplemented(4, get_value, sem)
644
645 def test_bounded_semaphore(self):
646 sem = self.BoundedSemaphore(2)
647 self._test_semaphore(sem)
648 # Currently fails on OS/X
649 #if HAVE_GETVALUE:
650 # self.assertRaises(ValueError, sem.release)
651 # self.assertReturnsIfImplemented(2, get_value, sem)
652
653 def test_timeout(self):
654 if self.TYPE != 'processes':
655 return
656
657 sem = self.Semaphore(0)
658 acquire = TimingWrapper(sem.acquire)
659
660 self.assertEqual(acquire(False), False)
661 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
662
663 self.assertEqual(acquire(False, None), False)
664 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
665
666 self.assertEqual(acquire(False, TIMEOUT1), False)
667 self.assertTimingAlmostEqual(acquire.elapsed, 0)
668
669 self.assertEqual(acquire(True, TIMEOUT2), False)
670 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
671
672 self.assertEqual(acquire(timeout=TIMEOUT3), False)
673 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
674
675
676class _TestCondition(BaseTestCase):
677
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000678 @classmethod
679 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000680 cond.acquire()
681 sleeping.release()
682 cond.wait(timeout)
683 woken.release()
684 cond.release()
685
686 def check_invariant(self, cond):
687 # this is only supposed to succeed when there are no sleepers
688 if self.TYPE == 'processes':
689 try:
690 sleepers = (cond._sleeping_count.get_value() -
691 cond._woken_count.get_value())
692 self.assertEqual(sleepers, 0)
693 self.assertEqual(cond._wait_semaphore.get_value(), 0)
694 except NotImplementedError:
695 pass
696
697 def test_notify(self):
698 cond = self.Condition()
699 sleeping = self.Semaphore(0)
700 woken = self.Semaphore(0)
701
702 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000703 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704 p.start()
705
706 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000707 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000708 p.start()
709
710 # wait for both children to start sleeping
711 sleeping.acquire()
712 sleeping.acquire()
713
714 # check no process/thread has woken up
715 time.sleep(DELTA)
716 self.assertReturnsIfImplemented(0, get_value, woken)
717
718 # wake up one process/thread
719 cond.acquire()
720 cond.notify()
721 cond.release()
722
723 # check one process/thread has woken up
724 time.sleep(DELTA)
725 self.assertReturnsIfImplemented(1, get_value, woken)
726
727 # wake up another
728 cond.acquire()
729 cond.notify()
730 cond.release()
731
732 # check other has woken up
733 time.sleep(DELTA)
734 self.assertReturnsIfImplemented(2, get_value, woken)
735
736 # check state is not mucked up
737 self.check_invariant(cond)
738 p.join()
739
740 def test_notify_all(self):
741 cond = self.Condition()
742 sleeping = self.Semaphore(0)
743 woken = self.Semaphore(0)
744
745 # start some threads/processes which will timeout
746 for i in range(3):
747 p = self.Process(target=self.f,
748 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000749 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000750 p.start()
751
752 t = threading.Thread(target=self.f,
753 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000754 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000755 t.start()
756
757 # wait for them all to sleep
758 for i in xrange(6):
759 sleeping.acquire()
760
761 # check they have all timed out
762 for i in xrange(6):
763 woken.acquire()
764 self.assertReturnsIfImplemented(0, get_value, woken)
765
766 # check state is not mucked up
767 self.check_invariant(cond)
768
769 # start some more threads/processes
770 for i in range(3):
771 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000772 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000773 p.start()
774
775 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000776 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000777 t.start()
778
779 # wait for them to all sleep
780 for i in xrange(6):
781 sleeping.acquire()
782
783 # check no process/thread has woken up
784 time.sleep(DELTA)
785 self.assertReturnsIfImplemented(0, get_value, woken)
786
787 # wake them all up
788 cond.acquire()
789 cond.notify_all()
790 cond.release()
791
792 # check they have all woken
793 time.sleep(DELTA)
794 self.assertReturnsIfImplemented(6, get_value, woken)
795
796 # check state is not mucked up
797 self.check_invariant(cond)
798
799 def test_timeout(self):
800 cond = self.Condition()
801 wait = TimingWrapper(cond.wait)
802 cond.acquire()
803 res = wait(TIMEOUT1)
804 cond.release()
805 self.assertEqual(res, None)
806 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
807
808
809class _TestEvent(BaseTestCase):
810
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000811 @classmethod
812 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000813 time.sleep(TIMEOUT2)
814 event.set()
815
816 def test_event(self):
817 event = self.Event()
818 wait = TimingWrapper(event.wait)
819
Ezio Melottic2077b02011-03-16 12:34:31 +0200820 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000821 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000822 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000823
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000824 # Removed, threading.Event.wait() will return the value of the __flag
825 # instead of None. API Shear with the semaphore backed mp.Event
826 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000827 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000828 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000829 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
830
831 event.set()
832
833 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000834 self.assertEqual(event.is_set(), True)
835 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000836 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000837 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000838 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
839 # self.assertEqual(event.is_set(), True)
840
841 event.clear()
842
843 #self.assertEqual(event.is_set(), False)
844
Jesus Cea6f6016b2011-09-09 20:26:57 +0200845 p = self.Process(target=self._test_event, args=(event,))
846 p.daemon = True
847 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000848 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
850#
851#
852#
853
854class _TestValue(BaseTestCase):
855
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000856 ALLOWED_TYPES = ('processes',)
857
Benjamin Petersondfd79492008-06-13 19:13:39 +0000858 codes_values = [
859 ('i', 4343, 24234),
860 ('d', 3.625, -4.25),
861 ('h', -232, 234),
862 ('c', latin('x'), latin('y'))
863 ]
864
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000865 def setUp(self):
866 if not HAS_SHAREDCTYPES:
867 self.skipTest("requires multiprocessing.sharedctypes")
868
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000869 @classmethod
870 def _test(cls, values):
871 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000872 sv.value = cv[2]
873
874
875 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000876 if raw:
877 values = [self.RawValue(code, value)
878 for code, value, _ in self.codes_values]
879 else:
880 values = [self.Value(code, value)
881 for code, value, _ in self.codes_values]
882
883 for sv, cv in zip(values, self.codes_values):
884 self.assertEqual(sv.value, cv[1])
885
886 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200887 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000888 proc.start()
889 proc.join()
890
891 for sv, cv in zip(values, self.codes_values):
892 self.assertEqual(sv.value, cv[2])
893
894 def test_rawvalue(self):
895 self.test_value(raw=True)
896
897 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898 val1 = self.Value('i', 5)
899 lock1 = val1.get_lock()
900 obj1 = val1.get_obj()
901
902 val2 = self.Value('i', 5, lock=None)
903 lock2 = val2.get_lock()
904 obj2 = val2.get_obj()
905
906 lock = self.Lock()
907 val3 = self.Value('i', 5, lock=lock)
908 lock3 = val3.get_lock()
909 obj3 = val3.get_obj()
910 self.assertEqual(lock, lock3)
911
Jesse Noller6ab22152009-01-18 02:45:38 +0000912 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000913 self.assertFalse(hasattr(arr4, 'get_lock'))
914 self.assertFalse(hasattr(arr4, 'get_obj'))
915
Jesse Noller6ab22152009-01-18 02:45:38 +0000916 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
917
918 arr5 = self.RawValue('i', 5)
919 self.assertFalse(hasattr(arr5, 'get_lock'))
920 self.assertFalse(hasattr(arr5, 'get_obj'))
921
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922
923class _TestArray(BaseTestCase):
924
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000925 ALLOWED_TYPES = ('processes',)
926
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000927 @classmethod
928 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 for i in range(1, len(seq)):
930 seq[i] += seq[i-1]
931
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000932 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000933 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000934 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
935 if raw:
936 arr = self.RawArray('i', seq)
937 else:
938 arr = self.Array('i', seq)
939
940 self.assertEqual(len(arr), len(seq))
941 self.assertEqual(arr[3], seq[3])
942 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
943
944 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
945
946 self.assertEqual(list(arr[:]), seq)
947
948 self.f(seq)
949
950 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200951 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000952 p.start()
953 p.join()
954
955 self.assertEqual(list(arr[:]), seq)
956
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000957 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +0000958 def test_array_from_size(self):
959 size = 10
960 # Test for zeroing (see issue #11675).
961 # The repetition below strengthens the test by increasing the chances
962 # of previously allocated non-zero memory being used for the new array
963 # on the 2nd and 3rd loops.
964 for _ in range(3):
965 arr = self.Array('i', size)
966 self.assertEqual(len(arr), size)
967 self.assertEqual(list(arr), [0] * size)
968 arr[:] = range(10)
969 self.assertEqual(list(arr), range(10))
970 del arr
971
972 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000973 def test_rawarray(self):
974 self.test_array(raw=True)
975
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000976 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +0000977 def test_array_accepts_long(self):
978 arr = self.Array('i', 10L)
979 self.assertEqual(len(arr), 10)
980 raw_arr = self.RawArray('i', 10L)
981 self.assertEqual(len(raw_arr), 10)
982
983 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000984 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000985 arr1 = self.Array('i', range(10))
986 lock1 = arr1.get_lock()
987 obj1 = arr1.get_obj()
988
989 arr2 = self.Array('i', range(10), lock=None)
990 lock2 = arr2.get_lock()
991 obj2 = arr2.get_obj()
992
993 lock = self.Lock()
994 arr3 = self.Array('i', range(10), lock=lock)
995 lock3 = arr3.get_lock()
996 obj3 = arr3.get_obj()
997 self.assertEqual(lock, lock3)
998
Jesse Noller6ab22152009-01-18 02:45:38 +0000999 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001000 self.assertFalse(hasattr(arr4, 'get_lock'))
1001 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001002 self.assertRaises(AttributeError,
1003 self.Array, 'i', range(10), lock='notalock')
1004
1005 arr5 = self.RawArray('i', range(10))
1006 self.assertFalse(hasattr(arr5, 'get_lock'))
1007 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001008
1009#
1010#
1011#
1012
1013class _TestContainers(BaseTestCase):
1014
1015 ALLOWED_TYPES = ('manager',)
1016
1017 def test_list(self):
1018 a = self.list(range(10))
1019 self.assertEqual(a[:], range(10))
1020
1021 b = self.list()
1022 self.assertEqual(b[:], [])
1023
1024 b.extend(range(5))
1025 self.assertEqual(b[:], range(5))
1026
1027 self.assertEqual(b[2], 2)
1028 self.assertEqual(b[2:10], [2,3,4])
1029
1030 b *= 2
1031 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1032
1033 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1034
1035 self.assertEqual(a[:], range(10))
1036
1037 d = [a, b]
1038 e = self.list(d)
1039 self.assertEqual(
1040 e[:],
1041 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1042 )
1043
1044 f = self.list([a])
1045 a.append('hello')
1046 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1047
1048 def test_dict(self):
1049 d = self.dict()
1050 indices = range(65, 70)
1051 for i in indices:
1052 d[i] = chr(i)
1053 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1054 self.assertEqual(sorted(d.keys()), indices)
1055 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1056 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1057
1058 def test_namespace(self):
1059 n = self.Namespace()
1060 n.name = 'Bob'
1061 n.job = 'Builder'
1062 n._hidden = 'hidden'
1063 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1064 del n.job
1065 self.assertEqual(str(n), "Namespace(name='Bob')")
1066 self.assertTrue(hasattr(n, 'name'))
1067 self.assertTrue(not hasattr(n, 'job'))
1068
1069#
1070#
1071#
1072
1073def sqr(x, wait=0.0):
1074 time.sleep(wait)
1075 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +00001076class _TestPool(BaseTestCase):
1077
1078 def test_apply(self):
1079 papply = self.pool.apply
1080 self.assertEqual(papply(sqr, (5,)), sqr(5))
1081 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1082
1083 def test_map(self):
1084 pmap = self.pool.map
1085 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1086 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1087 map(sqr, range(100)))
1088
Jesse Noller7530e472009-07-16 14:23:04 +00001089 def test_map_chunksize(self):
1090 try:
1091 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1092 except multiprocessing.TimeoutError:
1093 self.fail("pool.map_async with chunksize stalled on null list")
1094
Benjamin Petersondfd79492008-06-13 19:13:39 +00001095 def test_async(self):
1096 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1097 get = TimingWrapper(res.get)
1098 self.assertEqual(get(), 49)
1099 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1100
1101 def test_async_timeout(self):
1102 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1103 get = TimingWrapper(res.get)
1104 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1105 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1106
1107 def test_imap(self):
1108 it = self.pool.imap(sqr, range(10))
1109 self.assertEqual(list(it), map(sqr, range(10)))
1110
1111 it = self.pool.imap(sqr, range(10))
1112 for i in range(10):
1113 self.assertEqual(it.next(), i*i)
1114 self.assertRaises(StopIteration, it.next)
1115
1116 it = self.pool.imap(sqr, range(1000), chunksize=100)
1117 for i in range(1000):
1118 self.assertEqual(it.next(), i*i)
1119 self.assertRaises(StopIteration, it.next)
1120
1121 def test_imap_unordered(self):
1122 it = self.pool.imap_unordered(sqr, range(1000))
1123 self.assertEqual(sorted(it), map(sqr, range(1000)))
1124
1125 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1126 self.assertEqual(sorted(it), map(sqr, range(1000)))
1127
1128 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001129 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1130 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1131
Benjamin Petersondfd79492008-06-13 19:13:39 +00001132 p = multiprocessing.Pool(3)
1133 self.assertEqual(3, len(p._pool))
1134 p.close()
1135 p.join()
1136
1137 def test_terminate(self):
1138 if self.TYPE == 'manager':
1139 # On Unix a forked process increfs each shared object to
1140 # which its parent process held a reference. If the
1141 # forked process gets terminated then there is likely to
1142 # be a reference leak. So to prevent
1143 # _TestZZZNumberOfObjects from failing we skip this test
1144 # when using a manager.
1145 return
1146
1147 result = self.pool.map_async(
1148 time.sleep, [0.1 for i in range(10000)], chunksize=1
1149 )
1150 self.pool.terminate()
1151 join = TimingWrapper(self.pool.join)
1152 join()
1153 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001154
1155class _TestPoolWorkerLifetime(BaseTestCase):
1156
1157 ALLOWED_TYPES = ('processes', )
1158 def test_pool_worker_lifetime(self):
1159 p = multiprocessing.Pool(3, maxtasksperchild=10)
1160 self.assertEqual(3, len(p._pool))
1161 origworkerpids = [w.pid for w in p._pool]
1162 # Run many tasks so each worker gets replaced (hopefully)
1163 results = []
1164 for i in range(100):
1165 results.append(p.apply_async(sqr, (i, )))
1166 # Fetch the results and verify we got the right answers,
1167 # also ensuring all the tasks have completed.
1168 for (j, res) in enumerate(results):
1169 self.assertEqual(res.get(), sqr(j))
1170 # Refill the pool
1171 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001172 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001173 # (countdown * DELTA = 5 seconds max startup process time)
1174 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001175 while countdown and not all(w.is_alive() for w in p._pool):
1176 countdown -= 1
1177 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001178 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001179 # All pids should be assigned. See issue #7805.
1180 self.assertNotIn(None, origworkerpids)
1181 self.assertNotIn(None, finalworkerpids)
1182 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001183 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1184 p.close()
1185 p.join()
1186
Charles-François Natali46f990e2011-10-24 18:43:51 +02001187 def test_pool_worker_lifetime_early_close(self):
1188 # Issue #10332: closing a pool whose workers have limited lifetimes
1189 # before all the tasks completed would make join() hang.
1190 p = multiprocessing.Pool(3, maxtasksperchild=1)
1191 results = []
1192 for i in range(6):
1193 results.append(p.apply_async(sqr, (i, 0.3)))
1194 p.close()
1195 p.join()
1196 # check the results
1197 for (j, res) in enumerate(results):
1198 self.assertEqual(res.get(), sqr(j))
1199
1200
Benjamin Petersondfd79492008-06-13 19:13:39 +00001201#
1202# Test that manager has expected number of shared objects left
1203#
1204
1205class _TestZZZNumberOfObjects(BaseTestCase):
1206 # Because test cases are sorted alphabetically, this one will get
1207 # run after all the other tests for the manager. It tests that
1208 # there have been no "reference leaks" for the manager's shared
1209 # objects. Note the comment in _TestPool.test_terminate().
1210 ALLOWED_TYPES = ('manager',)
1211
1212 def test_number_of_objects(self):
1213 EXPECTED_NUMBER = 1 # the pool object is still alive
1214 multiprocessing.active_children() # discard dead process objs
1215 gc.collect() # do garbage collection
1216 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001217 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001218 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001219 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001220 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001221
1222 self.assertEqual(refs, EXPECTED_NUMBER)
1223
1224#
1225# Test of creating a customized manager class
1226#
1227
1228from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1229
1230class FooBar(object):
1231 def f(self):
1232 return 'f()'
1233 def g(self):
1234 raise ValueError
1235 def _h(self):
1236 return '_h()'
1237
1238def baz():
1239 for i in xrange(10):
1240 yield i*i
1241
1242class IteratorProxy(BaseProxy):
1243 _exposed_ = ('next', '__next__')
1244 def __iter__(self):
1245 return self
1246 def next(self):
1247 return self._callmethod('next')
1248 def __next__(self):
1249 return self._callmethod('__next__')
1250
1251class MyManager(BaseManager):
1252 pass
1253
1254MyManager.register('Foo', callable=FooBar)
1255MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1256MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1257
1258
1259class _TestMyManager(BaseTestCase):
1260
1261 ALLOWED_TYPES = ('manager',)
1262
1263 def test_mymanager(self):
1264 manager = MyManager()
1265 manager.start()
1266
1267 foo = manager.Foo()
1268 bar = manager.Bar()
1269 baz = manager.baz()
1270
1271 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1272 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1273
1274 self.assertEqual(foo_methods, ['f', 'g'])
1275 self.assertEqual(bar_methods, ['f', '_h'])
1276
1277 self.assertEqual(foo.f(), 'f()')
1278 self.assertRaises(ValueError, foo.g)
1279 self.assertEqual(foo._callmethod('f'), 'f()')
1280 self.assertRaises(RemoteError, foo._callmethod, '_h')
1281
1282 self.assertEqual(bar.f(), 'f()')
1283 self.assertEqual(bar._h(), '_h()')
1284 self.assertEqual(bar._callmethod('f'), 'f()')
1285 self.assertEqual(bar._callmethod('_h'), '_h()')
1286
1287 self.assertEqual(list(baz), [i*i for i in range(10)])
1288
1289 manager.shutdown()
1290
1291#
1292# Test of connecting to a remote server and using xmlrpclib for serialization
1293#
1294
1295_queue = Queue.Queue()
1296def get_queue():
1297 return _queue
1298
1299class QueueManager(BaseManager):
1300 '''manager class used by server process'''
1301QueueManager.register('get_queue', callable=get_queue)
1302
1303class QueueManager2(BaseManager):
1304 '''manager class which specifies the same interface as QueueManager'''
1305QueueManager2.register('get_queue')
1306
1307
1308SERIALIZER = 'xmlrpclib'
1309
1310class _TestRemoteManager(BaseTestCase):
1311
1312 ALLOWED_TYPES = ('manager',)
1313
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001314 @classmethod
1315 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001316 manager = QueueManager2(
1317 address=address, authkey=authkey, serializer=SERIALIZER
1318 )
1319 manager.connect()
1320 queue = manager.get_queue()
1321 queue.put(('hello world', None, True, 2.25))
1322
1323 def test_remote(self):
1324 authkey = os.urandom(32)
1325
1326 manager = QueueManager(
1327 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1328 )
1329 manager.start()
1330
1331 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001332 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001333 p.start()
1334
1335 manager2 = QueueManager2(
1336 address=manager.address, authkey=authkey, serializer=SERIALIZER
1337 )
1338 manager2.connect()
1339 queue = manager2.get_queue()
1340
1341 # Note that xmlrpclib will deserialize object as a list not a tuple
1342 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1343
1344 # Because we are using xmlrpclib for serialization instead of
1345 # pickle this will cause a serialization error.
1346 self.assertRaises(Exception, queue.put, time.sleep)
1347
1348 # Make queue finalizer run before the server is stopped
1349 del queue
1350 manager.shutdown()
1351
Jesse Noller459a6482009-03-30 15:50:42 +00001352class _TestManagerRestart(BaseTestCase):
1353
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001354 @classmethod
1355 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001356 manager = QueueManager(
1357 address=address, authkey=authkey, serializer=SERIALIZER)
1358 manager.connect()
1359 queue = manager.get_queue()
1360 queue.put('hello world')
1361
1362 def test_rapid_restart(self):
1363 authkey = os.urandom(32)
1364 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001365 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001366 srvr = manager.get_server()
1367 addr = srvr.address
1368 # Close the connection.Listener socket which gets opened as a part
1369 # of manager.get_server(). It's not needed for the test.
1370 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001371 manager.start()
1372
1373 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001374 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001375 p.start()
1376 queue = manager.get_queue()
1377 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001378 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001379 manager.shutdown()
1380 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001381 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001382 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001383 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001384
Benjamin Petersondfd79492008-06-13 19:13:39 +00001385#
1386#
1387#
1388
1389SENTINEL = latin('')
1390
1391class _TestConnection(BaseTestCase):
1392
1393 ALLOWED_TYPES = ('processes', 'threads')
1394
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001395 @classmethod
1396 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001397 for msg in iter(conn.recv_bytes, SENTINEL):
1398 conn.send_bytes(msg)
1399 conn.close()
1400
1401 def test_connection(self):
1402 conn, child_conn = self.Pipe()
1403
1404 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001405 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001406 p.start()
1407
1408 seq = [1, 2.25, None]
1409 msg = latin('hello world')
1410 longmsg = msg * 10
1411 arr = array.array('i', range(4))
1412
1413 if self.TYPE == 'processes':
1414 self.assertEqual(type(conn.fileno()), int)
1415
1416 self.assertEqual(conn.send(seq), None)
1417 self.assertEqual(conn.recv(), seq)
1418
1419 self.assertEqual(conn.send_bytes(msg), None)
1420 self.assertEqual(conn.recv_bytes(), msg)
1421
1422 if self.TYPE == 'processes':
1423 buffer = array.array('i', [0]*10)
1424 expected = list(arr) + [0] * (10 - len(arr))
1425 self.assertEqual(conn.send_bytes(arr), None)
1426 self.assertEqual(conn.recv_bytes_into(buffer),
1427 len(arr) * buffer.itemsize)
1428 self.assertEqual(list(buffer), expected)
1429
1430 buffer = array.array('i', [0]*10)
1431 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1432 self.assertEqual(conn.send_bytes(arr), None)
1433 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1434 len(arr) * buffer.itemsize)
1435 self.assertEqual(list(buffer), expected)
1436
1437 buffer = bytearray(latin(' ' * 40))
1438 self.assertEqual(conn.send_bytes(longmsg), None)
1439 try:
1440 res = conn.recv_bytes_into(buffer)
1441 except multiprocessing.BufferTooShort, e:
1442 self.assertEqual(e.args, (longmsg,))
1443 else:
1444 self.fail('expected BufferTooShort, got %s' % res)
1445
1446 poll = TimingWrapper(conn.poll)
1447
1448 self.assertEqual(poll(), False)
1449 self.assertTimingAlmostEqual(poll.elapsed, 0)
1450
1451 self.assertEqual(poll(TIMEOUT1), False)
1452 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1453
1454 conn.send(None)
1455
1456 self.assertEqual(poll(TIMEOUT1), True)
1457 self.assertTimingAlmostEqual(poll.elapsed, 0)
1458
1459 self.assertEqual(conn.recv(), None)
1460
1461 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1462 conn.send_bytes(really_big_msg)
1463 self.assertEqual(conn.recv_bytes(), really_big_msg)
1464
1465 conn.send_bytes(SENTINEL) # tell child to quit
1466 child_conn.close()
1467
1468 if self.TYPE == 'processes':
1469 self.assertEqual(conn.readable, True)
1470 self.assertEqual(conn.writable, True)
1471 self.assertRaises(EOFError, conn.recv)
1472 self.assertRaises(EOFError, conn.recv_bytes)
1473
1474 p.join()
1475
1476 def test_duplex_false(self):
1477 reader, writer = self.Pipe(duplex=False)
1478 self.assertEqual(writer.send(1), None)
1479 self.assertEqual(reader.recv(), 1)
1480 if self.TYPE == 'processes':
1481 self.assertEqual(reader.readable, True)
1482 self.assertEqual(reader.writable, False)
1483 self.assertEqual(writer.readable, False)
1484 self.assertEqual(writer.writable, True)
1485 self.assertRaises(IOError, reader.send, 2)
1486 self.assertRaises(IOError, writer.recv)
1487 self.assertRaises(IOError, writer.poll)
1488
1489 def test_spawn_close(self):
1490 # We test that a pipe connection can be closed by parent
1491 # process immediately after child is spawned. On Windows this
1492 # would have sometimes failed on old versions because
1493 # child_conn would be closed before the child got a chance to
1494 # duplicate it.
1495 conn, child_conn = self.Pipe()
1496
1497 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001498 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001499 p.start()
1500 child_conn.close() # this might complete before child initializes
1501
1502 msg = latin('hello')
1503 conn.send_bytes(msg)
1504 self.assertEqual(conn.recv_bytes(), msg)
1505
1506 conn.send_bytes(SENTINEL)
1507 conn.close()
1508 p.join()
1509
1510 def test_sendbytes(self):
1511 if self.TYPE != 'processes':
1512 return
1513
1514 msg = latin('abcdefghijklmnopqrstuvwxyz')
1515 a, b = self.Pipe()
1516
1517 a.send_bytes(msg)
1518 self.assertEqual(b.recv_bytes(), msg)
1519
1520 a.send_bytes(msg, 5)
1521 self.assertEqual(b.recv_bytes(), msg[5:])
1522
1523 a.send_bytes(msg, 7, 8)
1524 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1525
1526 a.send_bytes(msg, 26)
1527 self.assertEqual(b.recv_bytes(), latin(''))
1528
1529 a.send_bytes(msg, 26, 0)
1530 self.assertEqual(b.recv_bytes(), latin(''))
1531
1532 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1533
1534 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1535
1536 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1537
1538 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1539
1540 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1541
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001542 @classmethod
1543 def _is_fd_assigned(cls, fd):
1544 try:
1545 os.fstat(fd)
1546 except OSError as e:
1547 if e.errno == errno.EBADF:
1548 return False
1549 raise
1550 else:
1551 return True
1552
1553 @classmethod
1554 def _writefd(cls, conn, data, create_dummy_fds=False):
1555 if create_dummy_fds:
1556 for i in range(0, 256):
1557 if not cls._is_fd_assigned(i):
1558 os.dup2(conn.fileno(), i)
1559 fd = reduction.recv_handle(conn)
1560 if msvcrt:
1561 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1562 os.write(fd, data)
1563 os.close(fd)
1564
Charles-François Natalif8413b22011-09-21 18:44:49 +02001565 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001566 def test_fd_transfer(self):
1567 if self.TYPE != 'processes':
1568 self.skipTest("only makes sense with processes")
1569 conn, child_conn = self.Pipe(duplex=True)
1570
1571 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001572 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001573 p.start()
1574 with open(test_support.TESTFN, "wb") as f:
1575 fd = f.fileno()
1576 if msvcrt:
1577 fd = msvcrt.get_osfhandle(fd)
1578 reduction.send_handle(conn, fd, p.pid)
1579 p.join()
1580 with open(test_support.TESTFN, "rb") as f:
1581 self.assertEqual(f.read(), b"foo")
1582
Charles-François Natalif8413b22011-09-21 18:44:49 +02001583 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001584 @unittest.skipIf(sys.platform == "win32",
1585 "test semantics don't make sense on Windows")
1586 @unittest.skipIf(MAXFD <= 256,
1587 "largest assignable fd number is too small")
1588 @unittest.skipUnless(hasattr(os, "dup2"),
1589 "test needs os.dup2()")
1590 def test_large_fd_transfer(self):
1591 # With fd > 256 (issue #11657)
1592 if self.TYPE != 'processes':
1593 self.skipTest("only makes sense with processes")
1594 conn, child_conn = self.Pipe(duplex=True)
1595
1596 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001597 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001598 p.start()
1599 with open(test_support.TESTFN, "wb") as f:
1600 fd = f.fileno()
1601 for newfd in range(256, MAXFD):
1602 if not self._is_fd_assigned(newfd):
1603 break
1604 else:
1605 self.fail("could not find an unassigned large file descriptor")
1606 os.dup2(fd, newfd)
1607 try:
1608 reduction.send_handle(conn, newfd, p.pid)
1609 finally:
1610 os.close(newfd)
1611 p.join()
1612 with open(test_support.TESTFN, "rb") as f:
1613 self.assertEqual(f.read(), b"bar")
1614
Jesus Ceac23484b2011-09-21 03:47:39 +02001615 @classmethod
1616 def _send_data_without_fd(self, conn):
1617 os.write(conn.fileno(), b"\0")
1618
Charles-François Natalif8413b22011-09-21 18:44:49 +02001619 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001620 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1621 def test_missing_fd_transfer(self):
1622 # Check that exception is raised when received data is not
1623 # accompanied by a file descriptor in ancillary data.
1624 if self.TYPE != 'processes':
1625 self.skipTest("only makes sense with processes")
1626 conn, child_conn = self.Pipe(duplex=True)
1627
1628 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1629 p.daemon = True
1630 p.start()
1631 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1632 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001633
Benjamin Petersondfd79492008-06-13 19:13:39 +00001634class _TestListenerClient(BaseTestCase):
1635
1636 ALLOWED_TYPES = ('processes', 'threads')
1637
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001638 @classmethod
1639 def _test(cls, address):
1640 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001641 conn.send('hello')
1642 conn.close()
1643
1644 def test_listener_client(self):
1645 for family in self.connection.families:
1646 l = self.connection.Listener(family=family)
1647 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001648 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001649 p.start()
1650 conn = l.accept()
1651 self.assertEqual(conn.recv(), 'hello')
1652 p.join()
1653 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001654#
1655# Test of sending connection and socket objects between processes
1656#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001657"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001658class _TestPicklingConnections(BaseTestCase):
1659
1660 ALLOWED_TYPES = ('processes',)
1661
1662 def _listener(self, conn, families):
1663 for fam in families:
1664 l = self.connection.Listener(family=fam)
1665 conn.send(l.address)
1666 new_conn = l.accept()
1667 conn.send(new_conn)
1668
1669 if self.TYPE == 'processes':
1670 l = socket.socket()
1671 l.bind(('localhost', 0))
1672 conn.send(l.getsockname())
1673 l.listen(1)
1674 new_conn, addr = l.accept()
1675 conn.send(new_conn)
1676
1677 conn.recv()
1678
1679 def _remote(self, conn):
1680 for (address, msg) in iter(conn.recv, None):
1681 client = self.connection.Client(address)
1682 client.send(msg.upper())
1683 client.close()
1684
1685 if self.TYPE == 'processes':
1686 address, msg = conn.recv()
1687 client = socket.socket()
1688 client.connect(address)
1689 client.sendall(msg.upper())
1690 client.close()
1691
1692 conn.close()
1693
1694 def test_pickling(self):
1695 try:
1696 multiprocessing.allow_connection_pickling()
1697 except ImportError:
1698 return
1699
1700 families = self.connection.families
1701
1702 lconn, lconn0 = self.Pipe()
1703 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001704 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001705 lp.start()
1706 lconn0.close()
1707
1708 rconn, rconn0 = self.Pipe()
1709 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001710 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001711 rp.start()
1712 rconn0.close()
1713
1714 for fam in families:
1715 msg = ('This connection uses family %s' % fam).encode('ascii')
1716 address = lconn.recv()
1717 rconn.send((address, msg))
1718 new_conn = lconn.recv()
1719 self.assertEqual(new_conn.recv(), msg.upper())
1720
1721 rconn.send(None)
1722
1723 if self.TYPE == 'processes':
1724 msg = latin('This connection uses a normal socket')
1725 address = lconn.recv()
1726 rconn.send((address, msg))
1727 if hasattr(socket, 'fromfd'):
1728 new_conn = lconn.recv()
1729 self.assertEqual(new_conn.recv(100), msg.upper())
1730 else:
1731 # XXX On Windows with Py2.6 need to backport fromfd()
1732 discard = lconn.recv_bytes()
1733
1734 lconn.send(None)
1735
1736 rconn.close()
1737 lconn.close()
1738
1739 lp.join()
1740 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001741"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001742#
1743#
1744#
1745
1746class _TestHeap(BaseTestCase):
1747
1748 ALLOWED_TYPES = ('processes',)
1749
1750 def test_heap(self):
1751 iterations = 5000
1752 maxblocks = 50
1753 blocks = []
1754
1755 # create and destroy lots of blocks of different sizes
1756 for i in xrange(iterations):
1757 size = int(random.lognormvariate(0, 1) * 1000)
1758 b = multiprocessing.heap.BufferWrapper(size)
1759 blocks.append(b)
1760 if len(blocks) > maxblocks:
1761 i = random.randrange(maxblocks)
1762 del blocks[i]
1763
1764 # get the heap object
1765 heap = multiprocessing.heap.BufferWrapper._heap
1766
1767 # verify the state of the heap
1768 all = []
1769 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001770 heap._lock.acquire()
1771 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001772 for L in heap._len_to_seq.values():
1773 for arena, start, stop in L:
1774 all.append((heap._arenas.index(arena), start, stop,
1775 stop-start, 'free'))
1776 for arena, start, stop in heap._allocated_blocks:
1777 all.append((heap._arenas.index(arena), start, stop,
1778 stop-start, 'occupied'))
1779 occupied += (stop-start)
1780
1781 all.sort()
1782
1783 for i in range(len(all)-1):
1784 (arena, start, stop) = all[i][:3]
1785 (narena, nstart, nstop) = all[i+1][:3]
1786 self.assertTrue((arena != narena and nstart == 0) or
1787 (stop == nstart))
1788
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001789 def test_free_from_gc(self):
1790 # Check that freeing of blocks by the garbage collector doesn't deadlock
1791 # (issue #12352).
1792 # Make sure the GC is enabled, and set lower collection thresholds to
1793 # make collections more frequent (and increase the probability of
1794 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001795 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001796 gc.enable()
1797 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02001798 thresholds = gc.get_threshold()
1799 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001800 gc.set_threshold(10)
1801
1802 # perform numerous block allocations, with cyclic references to make
1803 # sure objects are collected asynchronously by the gc
1804 for i in range(5000):
1805 a = multiprocessing.heap.BufferWrapper(1)
1806 b = multiprocessing.heap.BufferWrapper(1)
1807 # circular references
1808 a.buddy = b
1809 b.buddy = a
1810
Benjamin Petersondfd79492008-06-13 19:13:39 +00001811#
1812#
1813#
1814
Benjamin Petersondfd79492008-06-13 19:13:39 +00001815class _Foo(Structure):
1816 _fields_ = [
1817 ('x', c_int),
1818 ('y', c_double)
1819 ]
1820
1821class _TestSharedCTypes(BaseTestCase):
1822
1823 ALLOWED_TYPES = ('processes',)
1824
Antoine Pitrou55d935a2010-11-22 16:35:57 +00001825 def setUp(self):
1826 if not HAS_SHAREDCTYPES:
1827 self.skipTest("requires multiprocessing.sharedctypes")
1828
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001829 @classmethod
1830 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001831 x.value *= 2
1832 y.value *= 2
1833 foo.x *= 2
1834 foo.y *= 2
1835 string.value *= 2
1836 for i in range(len(arr)):
1837 arr[i] *= 2
1838
1839 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001840 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001841 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001842 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00001843 arr = self.Array('d', range(10), lock=lock)
1844 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00001845 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001846
1847 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001848 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001849 p.start()
1850 p.join()
1851
1852 self.assertEqual(x.value, 14)
1853 self.assertAlmostEqual(y.value, 2.0/3.0)
1854 self.assertEqual(foo.x, 6)
1855 self.assertAlmostEqual(foo.y, 4.0)
1856 for i in range(10):
1857 self.assertAlmostEqual(arr[i], i*2)
1858 self.assertEqual(string.value, latin('hellohello'))
1859
1860 def test_synchronize(self):
1861 self.test_sharedctypes(lock=True)
1862
1863 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001864 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00001865 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001866 foo.x = 0
1867 foo.y = 0
1868 self.assertEqual(bar.x, 2)
1869 self.assertAlmostEqual(bar.y, 5.0)
1870
1871#
1872#
1873#
1874
1875class _TestFinalize(BaseTestCase):
1876
1877 ALLOWED_TYPES = ('processes',)
1878
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001879 @classmethod
1880 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001881 class Foo(object):
1882 pass
1883
1884 a = Foo()
1885 util.Finalize(a, conn.send, args=('a',))
1886 del a # triggers callback for a
1887
1888 b = Foo()
1889 close_b = util.Finalize(b, conn.send, args=('b',))
1890 close_b() # triggers callback for b
1891 close_b() # does nothing because callback has already been called
1892 del b # does nothing because callback has already been called
1893
1894 c = Foo()
1895 util.Finalize(c, conn.send, args=('c',))
1896
1897 d10 = Foo()
1898 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1899
1900 d01 = Foo()
1901 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1902 d02 = Foo()
1903 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1904 d03 = Foo()
1905 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1906
1907 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1908
1909 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1910
Ezio Melottic2077b02011-03-16 12:34:31 +02001911 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00001912 # garbage collecting locals
1913 util._exit_function()
1914 conn.close()
1915 os._exit(0)
1916
1917 def test_finalize(self):
1918 conn, child_conn = self.Pipe()
1919
1920 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001921 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001922 p.start()
1923 p.join()
1924
1925 result = [obj for obj in iter(conn.recv, 'STOP')]
1926 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1927
1928#
1929# Test that from ... import * works for each module
1930#
1931
1932class _TestImportStar(BaseTestCase):
1933
1934 ALLOWED_TYPES = ('processes',)
1935
1936 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001937 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00001938 'multiprocessing', 'multiprocessing.connection',
1939 'multiprocessing.heap', 'multiprocessing.managers',
1940 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00001941 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001942 ]
1943
Charles-François Natalif8413b22011-09-21 18:44:49 +02001944 if HAS_REDUCTION:
1945 modules.append('multiprocessing.reduction')
1946
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001947 if c_int is not None:
1948 # This module requires _ctypes
1949 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00001950
1951 for name in modules:
1952 __import__(name)
1953 mod = sys.modules[name]
1954
1955 for attr in getattr(mod, '__all__', ()):
1956 self.assertTrue(
1957 hasattr(mod, attr),
1958 '%r does not have attribute %r' % (mod, attr)
1959 )
1960
1961#
1962# Quick test that logging works -- does not test logging output
1963#
1964
1965class _TestLogging(BaseTestCase):
1966
1967 ALLOWED_TYPES = ('processes',)
1968
1969 def test_enable_logging(self):
1970 logger = multiprocessing.get_logger()
1971 logger.setLevel(util.SUBWARNING)
1972 self.assertTrue(logger is not None)
1973 logger.debug('this will not be printed')
1974 logger.info('nor will this')
1975 logger.setLevel(LOG_LEVEL)
1976
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001977 @classmethod
1978 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001979 logger = multiprocessing.get_logger()
1980 conn.send(logger.getEffectiveLevel())
1981
1982 def test_level(self):
1983 LEVEL1 = 32
1984 LEVEL2 = 37
1985
1986 logger = multiprocessing.get_logger()
1987 root_logger = logging.getLogger()
1988 root_level = root_logger.level
1989
1990 reader, writer = multiprocessing.Pipe(duplex=False)
1991
1992 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02001993 p = self.Process(target=self._test_level, args=(writer,))
1994 p.daemon = True
1995 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001996 self.assertEqual(LEVEL1, reader.recv())
1997
1998 logger.setLevel(logging.NOTSET)
1999 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002000 p = self.Process(target=self._test_level, args=(writer,))
2001 p.daemon = True
2002 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002003 self.assertEqual(LEVEL2, reader.recv())
2004
2005 root_logger.setLevel(root_level)
2006 logger.setLevel(level=LOG_LEVEL)
2007
Jesse Noller814d02d2009-11-21 14:38:23 +00002008
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002009# class _TestLoggingProcessName(BaseTestCase):
2010#
2011# def handle(self, record):
2012# assert record.processName == multiprocessing.current_process().name
2013# self.__handled = True
2014#
2015# def test_logging(self):
2016# handler = logging.Handler()
2017# handler.handle = self.handle
2018# self.__handled = False
2019# # Bypass getLogger() and side-effects
2020# logger = logging.getLoggerClass()(
2021# 'multiprocessing.test.TestLoggingProcessName')
2022# logger.addHandler(handler)
2023# logger.propagate = False
2024#
2025# logger.warn('foo')
2026# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002027
Benjamin Petersondfd79492008-06-13 19:13:39 +00002028#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002029# Test to verify handle verification, see issue 3321
2030#
2031
2032class TestInvalidHandle(unittest.TestCase):
2033
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002034 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002035 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002036 conn = _multiprocessing.Connection(44977608)
2037 self.assertRaises(IOError, conn.poll)
2038 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002039
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002040#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002041# Functions used to create test cases from the base ones in this module
2042#
2043
2044def get_attributes(Source, names):
2045 d = {}
2046 for name in names:
2047 obj = getattr(Source, name)
2048 if type(obj) == type(get_attributes):
2049 obj = staticmethod(obj)
2050 d[name] = obj
2051 return d
2052
2053def create_test_cases(Mixin, type):
2054 result = {}
2055 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002056 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002057
2058 for name in glob.keys():
2059 if name.startswith('_Test'):
2060 base = glob[name]
2061 if type in base.ALLOWED_TYPES:
2062 newname = 'With' + Type + name[1:]
2063 class Temp(base, unittest.TestCase, Mixin):
2064 pass
2065 result[newname] = Temp
2066 Temp.__name__ = newname
2067 Temp.__module__ = Mixin.__module__
2068 return result
2069
2070#
2071# Create test cases
2072#
2073
2074class ProcessesMixin(object):
2075 TYPE = 'processes'
2076 Process = multiprocessing.Process
2077 locals().update(get_attributes(multiprocessing, (
2078 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2079 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2080 'RawArray', 'current_process', 'active_children', 'Pipe',
2081 'connection', 'JoinableQueue'
2082 )))
2083
2084testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2085globals().update(testcases_processes)
2086
2087
2088class ManagerMixin(object):
2089 TYPE = 'manager'
2090 Process = multiprocessing.Process
2091 manager = object.__new__(multiprocessing.managers.SyncManager)
2092 locals().update(get_attributes(manager, (
2093 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2094 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2095 'Namespace', 'JoinableQueue'
2096 )))
2097
2098testcases_manager = create_test_cases(ManagerMixin, type='manager')
2099globals().update(testcases_manager)
2100
2101
2102class ThreadsMixin(object):
2103 TYPE = 'threads'
2104 Process = multiprocessing.dummy.Process
2105 locals().update(get_attributes(multiprocessing.dummy, (
2106 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2107 'Condition', 'Event', 'Value', 'Array', 'current_process',
2108 'active_children', 'Pipe', 'connection', 'dict', 'list',
2109 'Namespace', 'JoinableQueue'
2110 )))
2111
2112testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2113globals().update(testcases_threads)
2114
Neal Norwitz0c519b32008-08-25 01:50:24 +00002115class OtherTest(unittest.TestCase):
2116 # TODO: add more tests for deliver/answer challenge.
2117 def test_deliver_challenge_auth_failure(self):
2118 class _FakeConnection(object):
2119 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002120 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002121 def send_bytes(self, data):
2122 pass
2123 self.assertRaises(multiprocessing.AuthenticationError,
2124 multiprocessing.connection.deliver_challenge,
2125 _FakeConnection(), b'abc')
2126
2127 def test_answer_challenge_auth_failure(self):
2128 class _FakeConnection(object):
2129 def __init__(self):
2130 self.count = 0
2131 def recv_bytes(self, size):
2132 self.count += 1
2133 if self.count == 1:
2134 return multiprocessing.connection.CHALLENGE
2135 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002136 return b'something bogus'
2137 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002138 def send_bytes(self, data):
2139 pass
2140 self.assertRaises(multiprocessing.AuthenticationError,
2141 multiprocessing.connection.answer_challenge,
2142 _FakeConnection(), b'abc')
2143
Jesse Noller7152f6d2009-04-02 05:17:26 +00002144#
2145# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2146#
2147
2148def initializer(ns):
2149 ns.test += 1
2150
2151class TestInitializers(unittest.TestCase):
2152 def setUp(self):
2153 self.mgr = multiprocessing.Manager()
2154 self.ns = self.mgr.Namespace()
2155 self.ns.test = 0
2156
2157 def tearDown(self):
2158 self.mgr.shutdown()
2159
2160 def test_manager_initializer(self):
2161 m = multiprocessing.managers.SyncManager()
2162 self.assertRaises(TypeError, m.start, 1)
2163 m.start(initializer, (self.ns,))
2164 self.assertEqual(self.ns.test, 1)
2165 m.shutdown()
2166
2167 def test_pool_initializer(self):
2168 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2169 p = multiprocessing.Pool(1, initializer, (self.ns,))
2170 p.close()
2171 p.join()
2172 self.assertEqual(self.ns.test, 1)
2173
Jesse Noller1b90efb2009-06-30 17:11:52 +00002174#
2175# Issue 5155, 5313, 5331: Test process in processes
2176# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2177#
2178
2179def _ThisSubProcess(q):
2180 try:
2181 item = q.get(block=False)
2182 except Queue.Empty:
2183 pass
2184
2185def _TestProcess(q):
2186 queue = multiprocessing.Queue()
2187 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002188 subProc.daemon = True
Jesse Noller1b90efb2009-06-30 17:11:52 +00002189 subProc.start()
2190 subProc.join()
2191
2192def _afunc(x):
2193 return x*x
2194
2195def pool_in_process():
2196 pool = multiprocessing.Pool(processes=4)
2197 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2198
2199class _file_like(object):
2200 def __init__(self, delegate):
2201 self._delegate = delegate
2202 self._pid = None
2203
2204 @property
2205 def cache(self):
2206 pid = os.getpid()
2207 # There are no race conditions since fork keeps only the running thread
2208 if pid != self._pid:
2209 self._pid = pid
2210 self._cache = []
2211 return self._cache
2212
2213 def write(self, data):
2214 self.cache.append(data)
2215
2216 def flush(self):
2217 self._delegate.write(''.join(self.cache))
2218 self._cache = []
2219
2220class TestStdinBadfiledescriptor(unittest.TestCase):
2221
2222 def test_queue_in_process(self):
2223 queue = multiprocessing.Queue()
2224 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2225 proc.start()
2226 proc.join()
2227
2228 def test_pool_in_process(self):
2229 p = multiprocessing.Process(target=pool_in_process)
2230 p.start()
2231 p.join()
2232
2233 def test_flushing(self):
2234 sio = StringIO()
2235 flike = _file_like(sio)
2236 flike.write('foo')
2237 proc = multiprocessing.Process(target=lambda: flike.flush())
2238 flike.flush()
2239 assert sio.getvalue() == 'foo'
2240
2241testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
2242 TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002243
Benjamin Petersondfd79492008-06-13 19:13:39 +00002244#
2245#
2246#
2247
2248def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002249 if sys.platform.startswith("linux"):
2250 try:
2251 lock = multiprocessing.RLock()
2252 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002253 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002254
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002255 check_enough_semaphores()
2256
Benjamin Petersondfd79492008-06-13 19:13:39 +00002257 if run is None:
2258 from test.test_support import run_unittest as run
2259
2260 util.get_temp_dir() # creates temp directory for use by all processes
2261
2262 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2263
Jesse Noller146b7ab2008-07-02 16:44:09 +00002264 ProcessesMixin.pool = multiprocessing.Pool(4)
2265 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2266 ManagerMixin.manager.__init__()
2267 ManagerMixin.manager.start()
2268 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002269
2270 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002271 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2272 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002273 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2274 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002275 )
2276
2277 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2278 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002279 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2280 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002281 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002282 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002283 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002284 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2285 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2286 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002287 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002288
Jesse Noller146b7ab2008-07-02 16:44:09 +00002289 ThreadsMixin.pool.terminate()
2290 ProcessesMixin.pool.terminate()
2291 ManagerMixin.pool.terminate()
2292 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002293
Jesse Noller146b7ab2008-07-02 16:44:09 +00002294 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002295
2296def main():
2297 test_main(unittest.TextTestRunner(verbosity=2).run)
2298
2299if __name__ == '__main__':
2300 main()