blob: eeb768f7d7a509b48936796b66c1d980dfe695cc [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
# Logging level for the run; swap in the commented line below for
# full debug output.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (in seconds) that tests sleep for to let another
# process/thread make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (in seconds) handed to blocking calls by the tests.  Distinct
# values are only used when elapsed times are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when _multiprocessing defines a true HAVE_BROKEN_SEM_GETVALUE
# attribute; True otherwise (including when the attribute is absent).
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
82
# Upper bound on file descriptor numbers, taken from sysconf when available.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform (AttributeError), the
    # configuration name is unknown (ValueError) or the lookup itself failed
    # (OSError).  Unlike a bare "except:", this does not swallow
    # KeyboardInterrupt or SystemExit.
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None without raising when os.sysconf is unavailable, when the
    SC_SEM_NSEMS_MAX setting is not recognised, when the reported value is
    -1, or when it is at least 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable proxy that remembers how long its last call took.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration (in seconds, measured with time.time()) of the most
    recent call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # recorded even when the wrapped callable raises
            self.elapsed = time.time() - started
        return result
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test case classes.

    It relies on assertEqual/assertAlmostEqual being supplied by whatever
    class it is eventually combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Elapsed times are only compared when timing checks are enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) against value, unless func reports that it
        # is not implemented, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries self.get_value() first, then the attributes ``_Semaphore__value``
    and ``_value`` in that order.  Raises NotImplementedError if none of
    them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    """Tests for Process objects and the process-level helper functions.

    self.TYPE, self.Process, self.current_process, self.active_children,
    self.Queue, self.Event and self.Pipe are not defined here; they are
    expected to be provided by the concrete test class this mixin is
    combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            # Report a skip instead of silently passing.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child entry point: report what the child sees through queue *q*.

        Puts, in order: the positional args, the keyword args, the current
        process name and, unless running as threads, the authkey and pid.
        """
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before the process is started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while the process is running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values must arrive in the order _test() put them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the process has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child entry point: sleep long enough that it must be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            # Report a skip instead of silently passing.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* over *wconn*, then recurse through two children.

        While *id* has fewer than two elements, two child processes are
        started one after the other (each joined before the next starts),
        each receiving *id* extended by its index.
        """
        # NOTE(review): this import looks unused here -- confirm whether it
        # is only kept for its import-time effect before removing it.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        # give the last messages time to arrive before draining the pipe
        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # depth-first order, because every child is joined before its
        # sibling is started
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
327
328#
329#
330#
331
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child serves requests on
    ``child_conn`` until it receives None.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # the child only needs its own end of the pipe
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        """Send the str *s* to the child and return its reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Tell the child to finish, then close both pipe ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
353
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a user-defined Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        # each request must come back upper-cased, in order
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
366
367#
368#
369#
370
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses q.empty() when the queue has that method and falls back to
    comparing q.qsize() with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
376
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses q.full() when the queue has that method and falls back to
    comparing q.qsize() with *maxsize* otherwise.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
382
383
class _TestQueue(BaseTestCase):
    # Tests for queue objects.  self.Queue, self.JoinableQueue, self.Event
    # and self.Process are not defined in this class; presumably they are
    # supplied by the concrete test class this mixin is combined with.


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child entry point for test_put: once allowed to start, drain six
        # items from the queue and then signal the parent.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
                target=self._test_put,
                args=(queue, child_can_start, parent_can_continue)
                )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue to capacity using every calling convention of put
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must raise Queue.Full; the elapsed time is only
        # compared when CHECK_TIMINGS is enabled
        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue and wait until it has finished
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child entry point for test_get: once allowed to start, put the
        # values 2..5 on the queue and then signal the parent.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
                target=self._test_get,
                args=(queue, child_can_start, parent_can_continue)
                )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        # sleep a bit before checking that the child's items are visible
        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # fetch the items using every calling convention of get
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must raise Queue.Empty; the elapsed time is
        # only compared when CHECK_TIMINGS is enabled
        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        # Child entry point for test_fork: put the values 10..19 on the
        # queue.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented for this queue: nothing to check
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker entry point for test_task_done: acknowledge each item after
        # a short sleep; stop when None is received (that final None is not
        # acknowledged with task_done()).
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        # blocks until the workers have acknowledged all ten items
        queue.join()

        # one None per worker tells each of them to stop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
593
594#
595#
596#
597
class _TestLock(BaseTestCase):
    """Tests for the objects returned by self.Lock() and self.RLock()."""

    def test_lock(self):
        lock = self.Lock()
        first_attempt = lock.acquire()
        self.assertEqual(first_attempt, True)
        # a second, non-blocking attempt on a held lock must fail
        second_attempt = lock.acquire(False)
        self.assertEqual(second_attempt, False)
        self.assertEqual(lock.release(), None)
        # releasing a lock that is not held is an error
        release_errors = (ValueError, threading.ThreadError)
        self.assertRaises(release_errors, lock.release)

    def test_rlock(self):
        depth = 3
        lock = self.RLock()
        # the owner may acquire repeatedly ...
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        # ... and has to release the same number of times
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        release_errors = (AssertionError, RuntimeError)
        self.assertRaises(release_errors, lock.release)

    def test_lock_context(self):
        # the lock must be usable as a context manager
        lock = self.Lock()
        with lock:
            pass
620
Benjamin Petersondfd79492008-06-13 19:13:39 +0000621
class _TestSemaphore(BaseTestCase):
    """Tests for self.Semaphore() and self.BoundedSemaphore() objects.

    Counter values are checked through get_value() and are ignored when it
    raises NotImplementedError for the object under test.
    """

    def _test_semaphore(self, sem):
        # Shared checks for a semaphore created with an initial value of 2:
        # take it down to zero, verify a non-blocking acquire fails, then
        # release back up to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            # Report a skip instead of silently passing.
            self.skipTest('test not appropriate for %s' % self.TYPE)

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Every attempt on the zero-valued semaphore must return False; the
        # elapsed time is only compared when CHECK_TIMINGS is enabled.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
674
675
class _TestCondition(BaseTestCase):
    # Tests for condition variables.  self.Condition, self.Semaphore and
    # self.Process are not defined in this class; presumably they are
    # supplied by the concrete test class this mixin is combined with.

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter entry point: take the condition, announce via `sleeping`
        # that we are about to wait, wait (optionally with a timeout), then
        # announce via `woken` that the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                # get_value() is not implemented: nothing to check
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # one waiter created through self.Process ...
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # ... and one that is always a real thread
        # NOTE(review): `p` is rebound here, so only the thread is joined at
        # the end of this test -- confirm whether the first waiter should be
        # joined as well.
        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and nobody notifying must return None
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
807
808
class _TestEvent(BaseTestCase):
    """Tests for the event objects returned by self.Event()."""

    @classmethod
    def _test_event(cls, event):
        """Child entry point: set *event* after sleeping TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a new event starts out unset
        self.assertEqual(event.is_set(), False)

        # wait() on an unset event must return False, both for a zero
        # timeout and after a real timeout has expired
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once set, is_set() is True and wait() returns True
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # a wait() with no timeout must return once the child sets the event
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # join the child so it does not outlive this test
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
850#
851#
852#
853
class _TestValue(BaseTestCase):
    """Tests for shared values created with self.Value()/self.RawValue()."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child entry point: store the third column into each value."""
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # every value starts out with its initial content
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # the child's writes must be visible in the parent
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock: get_lock()/get_obj() only need to be callable
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        custom_locked = self.Value('i', 5, lock=lock)
        returned_lock = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessor methods
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # anything else is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # raw values never have the accessor methods
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
921
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922
class _TestArray(BaseTestCase):
    # Tests for the Array / RawArray factories.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running totals.  Plain index access
        # is used so that any indexable, mutable sequence is accepted.
        for pos in range(len(seq) - 1):
            seq[pos + 1] += seq[pos]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice-assign the same array.array to both the array under test
        # and the reference list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f to the list here and to arr in a child process; the
        # contents must still agree afterwards.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        expected = range(size)
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = expected
            self.assertEqual(list(arr), expected)
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        # Same scenario as test_array, with raw=True.
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        # The size argument may be a long as well as an int.
        ten = long(10)
        for factory in (self.Array, self.RawArray):
            self.assertEqual(len(factory('i', ten)), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        default_arr = self.Array('i', range(10))
        default_arr.get_lock()
        default_arr.get_obj()

        # lock=None: both accessors must be callable as well.
        none_arr = self.Array('i', range(10), lock=None)
        none_arr.get_lock()
        none_arr.get_obj()

        # A caller-supplied lock is the one get_lock() hands back.
        supplied = self.Lock()
        locked_arr = self.Array('i', range(10), lock=supplied)
        returned = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(supplied, returned)

        # lock=False yields an object without the two accessors.
        unlocked_arr = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked_arr, accessor))

        # Anything that is not a lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never has the accessors.
        raw_arr = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_arr, accessor))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001008
1009#
1010#
1011#
1012
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects obtained from self.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2,3,4])

        grown *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(grown[:], doubled)

        self.assertEqual(grown + [5, 6], doubled + [5, 6])

        # None of the above may have touched the first list.
        self.assertEqual(digits[:], range(10))

        # A list built from the two lists above.
        nested = self.list([digits, grown])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append made after the wrapper was built is visible through it.
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        pairs = [(code, chr(code)) for code in indices]
        self.assertEqual(d.copy(), dict((code, chr(code)) for code in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The expected str() shows only 'name'.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1068
1069#
1070#
1071#
1072
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return x multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Tests for the pool API, run against self.pool; test_make_pool also
    # builds a private multiprocessing.Pool.

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # map_async() over an empty list with an explicit chunksize must
        # complete rather than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The task sleeps for TIMEOUT1, so get() should take about that long.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        # Step through by hand, both without and with an explicit chunksize.
        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, next, it)

    def test_imap_unordered(self):
        # Results are sorted before being compared.
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Close and join the pool even when the assertion fails, so
            # its workers are not left behind.
            p.close()
            p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.  skipTest() makes the run report it
            # as skipped instead of as a pass.
            self.skipTest("terminating a manager's pool is likely to "
                          "leak shared object references")

        # Queue far more work than can finish, then terminate at once.
        # (The returned result object is not needed.)
        self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001154
def unpickleable_result():
    """Return a freshly created lambda whose call result is 42."""
    answer = lambda: 42
    return answer
1157
class _TestPoolWorkerErrors(BaseTestCase):
    # Checks how a Pool reports results that cannot be sent back.

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        # 20 failing tasks are pushed through the 2 workers.
        attempts = 20
        for _ in range(attempts):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        pool.close()
        pool.join()
1172
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools created with maxtasksperchild.

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1217
1218
Benjamin Petersondfd79492008-06-13 19:13:39 +00001219#
1220# Test that manager has expected number of shared objects left
1221#
1222
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Captured right after the count so that the report printed below
        # is as close as possible to the state that produced `refs`.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured report once; asking the manager again
            # here would only repeat the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1241
1242#
1243# Test of creating a customized manager class
1244#
1245
1246from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1247
class FooBar(object):
    """Small class with two public methods and one underscore-prefixed one."""

    def f(self):
        """Return the string 'f()'."""
        label = 'f()'
        return label

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        label = '_h()'
        return label
1255
def baz():
    """Generate the squares of 0 through 9, in order."""
    for base in range(10):
        square = base * base
        yield square
1259
class IteratorProxy(BaseProxy):
    """Proxy that iterates by calling 'next' / '__next__' via _callmethod."""

    # Method names made available through this proxy.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy serves as its own iterator.
        return self

    def next(self):
        # Python 2 iterator protocol.
        item = self._callmethod('next')
        return item

    def __next__(self):
        # Python 3 iterator protocol.
        item = self._callmethod('__next__')
        return item
1268
class MyManager(BaseManager):
    '''customized manager class on which Foo, Bar and baz are registered'''


# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly, and 'baz' is served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1275
1276
class _TestMyManager(BaseTestCase):
    # Tests the customized MyManager class and its three registered names.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = [attr for attr in candidates if hasattr(foo, attr)]
        bar_methods = [attr for attr in candidates if hasattr(bar, attr)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # 'Foo': '_h' is absent from the proxy, and asking for it through
        # _callmethod raises RemoteError.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar': both exposed names work directly and through _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1308
1309#
1310# Test of connecting to a remote server and using xmlrpclib for serialization
1311#
1312
# The one queue instance handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1316
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Registered together with the callable that supplies the queue.
QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only, without a callable.
QueueManager2.register('get_queue')
1324
1325
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    # Starts a QueueManager, then talks to it from a child process and
    # from a second manager object, using SERIALIZER for serialization.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the manager at `address` and put one
        # tuple on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Wait for the child to exit so that it is not left running past
        # the end of this test; its only job is the single put() above.
        p.join()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1369
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be started again on an address that was
    # in use by another manager a moment earlier.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the manager at `address` and put one
        # string on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Wait for the child to exit so that it is not left running past
        # the end of this test; its only job is the single put() above.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Start a second manager on the address recorded above.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001402
Benjamin Petersondfd79492008-06-13 19:13:39 +00001403#
1404#
1405#
1406
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    # Tests for the connection objects returned by self.Pipe(), including
    # passing file descriptors / handles with multiprocessing.reduction.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child side: send back every message received, until a message
        # equal to SENTINEL arrives; then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a larger buffer, starting at offset 0.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive into a larger buffer, starting 3 items in.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A buffer that is too small: the exception carries the message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip, as the other processes-only tests in this
            # class do, rather than returning silently.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # With an offset only.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # With an offset and a size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the message length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets/sizes outside the message, or negative, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat(fd) succeeds and False if it fails with
        # EBADF; any other OSError is re-raised.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn, write `data` to it and
        # close it.  With create_dummy_fds, every unassigned descriptor
        # below 256 is first filled with a duplicate of conn's descriptor.
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        # Remove the scratch file when the test ends, pass or fail.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number >= 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write one raw byte straight to conn's descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001651
class _TestListenerClient(BaseTestCase):
    # Listener / Client round trip for each family in
    # self.connection.families.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect to `address`, send one message and close.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001672#
1673# Test of sending connection and socket objects between processes
1674#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001675"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001676class _TestPicklingConnections(BaseTestCase):
1677
1678 ALLOWED_TYPES = ('processes',)
1679
1680 def _listener(self, conn, families):
1681 for fam in families:
1682 l = self.connection.Listener(family=fam)
1683 conn.send(l.address)
1684 new_conn = l.accept()
1685 conn.send(new_conn)
1686
1687 if self.TYPE == 'processes':
1688 l = socket.socket()
1689 l.bind(('localhost', 0))
1690 conn.send(l.getsockname())
1691 l.listen(1)
1692 new_conn, addr = l.accept()
1693 conn.send(new_conn)
1694
1695 conn.recv()
1696
1697 def _remote(self, conn):
1698 for (address, msg) in iter(conn.recv, None):
1699 client = self.connection.Client(address)
1700 client.send(msg.upper())
1701 client.close()
1702
1703 if self.TYPE == 'processes':
1704 address, msg = conn.recv()
1705 client = socket.socket()
1706 client.connect(address)
1707 client.sendall(msg.upper())
1708 client.close()
1709
1710 conn.close()
1711
1712 def test_pickling(self):
1713 try:
1714 multiprocessing.allow_connection_pickling()
1715 except ImportError:
1716 return
1717
1718 families = self.connection.families
1719
1720 lconn, lconn0 = self.Pipe()
1721 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001722 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001723 lp.start()
1724 lconn0.close()
1725
1726 rconn, rconn0 = self.Pipe()
1727 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001728 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001729 rp.start()
1730 rconn0.close()
1731
1732 for fam in families:
1733 msg = ('This connection uses family %s' % fam).encode('ascii')
1734 address = lconn.recv()
1735 rconn.send((address, msg))
1736 new_conn = lconn.recv()
1737 self.assertEqual(new_conn.recv(), msg.upper())
1738
1739 rconn.send(None)
1740
1741 if self.TYPE == 'processes':
1742 msg = latin('This connection uses a normal socket')
1743 address = lconn.recv()
1744 rconn.send((address, msg))
1745 if hasattr(socket, 'fromfd'):
1746 new_conn = lconn.recv()
1747 self.assertEqual(new_conn.recv(100), msg.upper())
1748 else:
1749 # XXX On Windows with Py2.6 need to backport fromfd()
1750 discard = lconn.recv_bytes()
1751
1752 lconn.send(None)
1753
1754 rconn.close()
1755 lconn.close()
1756
1757 lp.join()
1758 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001759"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001760#
1761#
1762#
1763
class _TestHeap(BaseTestCase):
    # Tests for multiprocessing.heap, driven through BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; the heap's lock is held from here
        # on and released by the cleanup registered below
        layout = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop-start, 'occupied'))

        layout.sort()

        # Each entry must either end where the next one starts, or be
        # followed by an entry at offset 0 of a different arena.
        for current, following in zip(layout, layout[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            starts_new_arena = (arena != narena and nstart == 0)
            is_contiguous = (stop == nstart)
            self.assertTrue(starts_new_arena or is_contiguous)

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
1828
Benjamin Petersondfd79492008-06-13 19:13:39 +00001829#
1830#
1831#
1832
class _Foo(Structure):
    # Structure with an int field 'x' followed by a double field 'y'.
    _fields_ = [('x', c_int), ('y', c_double)]
1838
class _TestSharedCTypes(BaseTestCase):
    # Tests that need multiprocessing.sharedctypes.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every argument in place.
        for holder in (x, y):
            holder.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] = arr[idx] * 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        # Let a child process double everything, then check the results
        # from this process.
        child = self.Process(target=self._double,
                             args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1888
1889#
1890#
1891#
1892
class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child process.  Each finalizer reports its tag through
        # the connection, so the parent can check the order in which the
        # callbacks ran.
        class Foo(object):
            pass

        finalize = util.Finalize
        send = conn.send

        obj_a = Foo()
        finalize(obj_a, send, args=('a',))
        del obj_a           # triggers callback for a

        obj_b = Foo()
        close_b = finalize(obj_b, send, args=('b',))
        close_b()           # triggers callback for b
        close_b()           # does nothing because callback has already been called
        del obj_b           # does nothing because callback has already been called

        # No exit priority is given for this one; the parent expects no
        # 'c' message.
        obj_c = Foo()
        finalize(obj_c, send, args=('c',))

        obj_d10 = Foo()
        finalize(obj_d10, send, args=('d10',), exitpriority=1)

        obj_d01 = Foo()
        finalize(obj_d01, send, args=('d01',), exitpriority=0)
        obj_d02 = Foo()
        finalize(obj_d02, send, args=('d02',), exitpriority=0)
        obj_d03 = Foo()
        finalize(obj_d03, send, args=('d03',), exitpriority=0)

        finalize(None, send, args=('e',), exitpriority=-10)

        finalize(None, send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Drain the pipe up to (and excluding) the 'STOP' sentinel.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1945
1946#
1947# Test that from ... import * works for each module
1948#
1949
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Every name a module advertises in __all__ must really exist on the
        # module, otherwise "from module import *" would fail.
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')

        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1978
1979#
1980# Quick test that logging works -- does not test logging output
1981#
1982
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back so a failure here cannot change the
            # logging behaviour of the tests that run afterwards.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger as seen from that process.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child running _test_level() on `writer`, return the level
        # it sends back on `reader`, and reap the child before returning.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            # With the logger's own level unset, the child falls back to the
            # level of the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Restore global logging state and release the pipe even when an
            # assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2025
Jesse Noller814d02d2009-11-21 14:38:23 +00002026
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002027# class _TestLoggingProcessName(BaseTestCase):
2028#
2029# def handle(self, record):
2030# assert record.processName == multiprocessing.current_process().name
2031# self.__handled = True
2032#
2033# def test_logging(self):
2034# handler = logging.Handler()
2035# handler.handle = self.handle
2036# self.__handled = False
2037# # Bypass getLogger() and side-effects
2038# logger = logging.getLoggerClass()(
2039# 'multiprocessing.test.TestLoggingProcessName')
2040# logger.addHandler(handler)
2041# logger.propagate = False
2042#
2043# logger.warn('foo')
2044# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002045
Benjamin Petersondfd79492008-06-13 19:13:39 +00002046#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002047# Test to verify handle verification, see issue 3321
2048#
2049
class TestInvalidHandle(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # See issue 3321: operations on a connection wrapping an invalid
        # handle must raise IOError.
        bogus_handle = 44977608
        conn = _multiprocessing.Connection(bogus_handle)
        with self.assertRaises(IOError):
            conn.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002057
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002058#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002059# Functions used to create test cases from the base ones in this module
2060#
2061
def get_attributes(Source, names):
    # Return a dict mapping each entry of `names` to the attribute of that
    # name on `Source`.  Plain Python functions are wrapped in staticmethod()
    # so that they can be stored as class attributes without getting bound to
    # the instance; every other kind of object is stored unchanged.
    function_type = type(get_attributes)
    attributes = {}
    for attr_name in names:
        value = getattr(Source, attr_name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[attr_name] = value
    return attributes
2070
def create_test_cases(Mixin, type):
    # Build concrete unittest classes from the module-level "_Test*" bases.
    # For each base whose ALLOWED_TYPES contains `type`, a new class is
    # derived from (base, unittest.TestCase, Mixin) and named
    # 'With<Type><base name without the leading underscore>'.
    # Returns a dict mapping those new names to the new classes.
    prefix = 'With' + type.capitalize()
    namespace = globals()
    cases = {}

    candidates = [name for name in namespace.keys()
                  if name.startswith('_Test')]
    for base_name in candidates:
        base = namespace[base_name]
        if type not in base.ALLOWED_TYPES:
            continue
        case_name = prefix + base_name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        # Give the generated class the identity of a real test case defined
        # next to the mixin.
        Temp.__name__ = case_name
        Temp.__module__ = Mixin.__module__
        cases[case_name] = Temp
    return cases
2087
2088#
2089# Create test cases
2090#
2091
class ProcessesMixin(object):
    # Binds the generic test cases to real processes and to the
    # multiprocessing module's own primitives.
    TYPE = 'processes'
    Process = multiprocessing.Process

# Copy the multiprocessing API used by the tests onto the mixin.  This is
# done with setattr() after the class exists rather than by updating locals()
# inside the class body, because the dictionary returned by locals() is
# documented as something that should not be modified.
for _attr_name, _attr_value in get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )).items():
    setattr(ProcessesMixin, _attr_name, _attr_value)
# Do not leave the loop variables behind as module globals.
del _attr_name, _attr_value

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2104
2105
class ManagerMixin(object):
    # Binds the generic test cases to proxies created by a SyncManager.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__(); test_main() initialises and
    # starts this manager before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)

# Copy the manager's factory methods onto the mixin.  This is done with
# setattr() after the class exists rather than by updating locals() inside
# the class body, because the dictionary returned by locals() is documented
# as something that should not be modified.
for _attr_name, _attr_value in get_attributes(ManagerMixin.manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )).items():
    setattr(ManagerMixin, _attr_name, _attr_value)
# Do not leave the loop variables behind as module globals.
del _attr_name, _attr_value

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2118
2119
class ThreadsMixin(object):
    # Binds the generic test cases to the thread-based multiprocessing.dummy
    # implementation.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process

# Copy the multiprocessing.dummy API used by the tests onto the mixin.  This
# is done with setattr() after the class exists rather than by updating
# locals() inside the class body, because the dictionary returned by locals()
# is documented as something that should not be modified.
for _attr_name, _attr_value in get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )).items():
    setattr(ThreadsMixin, _attr_name, _attr_value)
# Do not leave the loop variables behind as module globals.
del _attr_name, _attr_value

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2132
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the challenge marker, then garbage where
        # the verdict on our answer is expected, then empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2161
Jesse Noller7152f6d2009-04-02 05:17:26 +00002162#
2163# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2164#
2165
def initializer(ns):
    # Start-up hook handed to Manager.start() and Pool() by TestInitializers:
    # bump the counter kept on the namespace so the test can tell it ran.
    ns.test += 1
2168
class TestInitializers(unittest.TestCase):
    def setUp(self):
        # A manager-backed namespace lets the initializer, which runs in
        # another process, report back to the test.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the extra manager down even when the assertion fails, so
            # its server process is not left running.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2191
Jesse Noller1b90efb2009-06-30 17:11:52 +00002192#
2193# Issue 5155, 5313, 5331: Test process in processes
2194# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2195#
2196
def _ThisSubProcess(q):
    # Body of the innermost process: poll the queue once without blocking.
    # Finding it empty is tolerated and silently ignored.
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2202
def _TestProcess(q):
    # Body of the intermediate process: create a fresh queue, hand it to a
    # daemonic child running _ThisSubProcess and wait for that child to end.
    # The `q` argument itself is not used.
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2209
def _afunc(x):
    # Trivial work item for Pool.map(): multiply the argument by itself.
    squared = x * x
    return squared
2212
def pool_in_process():
    # Create a worker pool from inside a child process and push a small
    # batch of work through it.  The results are not needed; only the fact
    # that the pool works in this context matters.
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Stop and reap the workers instead of leaving them for interpreter
        # shutdown to clean up.
        pool.close()
        pool.join()
2216
class _file_like(object):
    # Write buffer in front of another file-like object.  Data handed to
    # write() is held in a per-process list and only reaches the delegate
    # when flush() is called.

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # The buffer belongs to the process that created it: when the pid
        # has changed, start over with an empty one.
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current_pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2237
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # A process must be able to create a queue and start a process of
        # its own.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A process must be able to create and use a worker pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is constructed but never started, so
        # only the flush() call made in the current process is exercised --
        # confirm whether start()/join() were intended here.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion rather than a bare assert, which is
        # removed under -O and gives no diagnostic on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2258
# Stand-alone test cases that are not generated from the "_Test*" bases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002261
Benjamin Petersondfd79492008-06-13 19:13:39 +00002262#
2263#
2264#
2265
def test_main(run=None):
    # Entry point of the test module: set up the shared pools and manager
    # used by the mixins, run every test case through `run`, and tear the
    # shared resources down again.
    #
    # `run` is a callable taking a unittest suite; when it is None,
    # test.test_support.run_unittest is used.
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared pools and the manager down even when run() raises,
        # so that no worker or server process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002313
def main():
    # Run the whole module with a verbose text runner.
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()