blob: 0fe649799a3b6a2e60219edbc0bae375dfd6effe [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00008import Queue
9import time
10import sys
11import os
12import gc
13import signal
14import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000015import socket
16import random
17import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020018import errno
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
latin = str

#
# Constants
#

# Log level constant used by this test module.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short interval, in seconds, that tests sleep to let other
# processes/threads make progress.
DELTA = 0.1

# Making CHECK_TIMINGS true makes tests take a lot longer and can sometimes
# cause some non-serious failures because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless _multiprocessing defines a truthy HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = sys.platform == "win32"
82
# Upper bound on file descriptor numbers, as reported by the OS.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform (AttributeError), the name is
    # not recognised (ValueError) or the lookup itself failed (OSError):
    # fall back to a fixed default.  A bare ``except:`` would also have
    # swallowed KeyboardInterrupt and SystemExit.
    MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Skip the running test when the OS offers too few semaphores.

    Returns None when the limit cannot be queried, when it is reported
    as -1, or when it is at least 256 (the POSIX minimum).  Otherwise
    raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped call took.

    After each call, ``elapsed`` holds the wall-clock duration in seconds
    of the most recent invocation (also when the call raised).  It is
    None until the wrapper has been called once.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Record the duration whether or not func raised.
            self.elapsed = time.time() - started
        return result
130
131#
132# Base class for test cases
133#
134
class BaseTestCase(object):
    """Mixin with helper assertions shared by the test case classes."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func signals that it is
        # not implemented, in which case the check is silently dropped.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
def get_value(self):
    """Return the current counter of the semaphore-like object ``self``.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.
    Raises NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
173
174#
175# Testcases
176#
177
class _TestProcess(BaseTestCase):
    """Tests for Process objects: attributes, start/join, terminate,
    active_children() bookkeeping and nested process creation."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Nothing to check for the 'threads' flavour.
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report the received arguments and
        # the child's identity back through the queue.
        me = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if cls.TYPE != 'threads':
            q.put(bytes(me.authkey))
            q.put(me.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        child_args = (q, 1, 2)
        child_kwargs = {'hello':23, 'bye':2.54}
        child_name = 'SomeProcess'
        p = self.Process(
            target=self._test,
            args=child_args,
            kwargs=child_kwargs,
            name=child_name,
        )
        p.daemon = True
        parent = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, parent.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported by the child, in the order _test() puts them
        self.assertEqual(q.get(), child_args[1:])
        self.assertEqual(q.get(), child_kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), parent.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that only terminate() can end this child.
        time.sleep(1000)

    def test_terminate(self):
        # Nothing to check for the 'threads' flavour.
        if self.TYPE == 'threads':
            return

        victim = self.Process(target=self._test_terminate)
        victim.daemon = True
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertIn(victim, self.active_children())
        self.assertEqual(victim.exitcode, None)

        victim.terminate()

        timed_join = TimingWrapper(victim.join)
        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertNotIn(victim, self.active_children())

        victim.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (down to depth 2) run two children one
        # after the other, each extending the id with its index.
        # NOTE(review): 'forking' is imported but not used here; kept in
        # case the import itself is intentional -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for index in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id+[index])
                )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        # ids arrive in depth-first order because each child is joined
        # before its sibling is started
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(received, expected)
327
328#
329#
330#
331
332class _UpperCaser(multiprocessing.Process):
333
334 def __init__(self):
335 multiprocessing.Process.__init__(self)
336 self.child_conn, self.parent_conn = multiprocessing.Pipe()
337
338 def run(self):
339 self.parent_conn.close()
340 for s in iter(self.child_conn.recv, None):
341 self.child_conn.send(s.upper())
342 self.child_conn.close()
343
344 def submit(self, s):
345 assert type(s) is str
346 self.parent_conn.send(s)
347 return self.parent_conn.recv()
348
349 def stop(self):
350 self.parent_conn.send(None)
351 self.parent_conn.close()
352 self.child_conn.close()
353
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a Process subclass (_UpperCaser) works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for sent, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(sent), expected)
        worker.stop()
        worker.join()
366
367#
368#
369#
370
def queue_empty(q):
    """Return whether q is empty.

    Uses q.empty() when the queue has that method and falls back to
    comparing q.qsize() with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
376
def queue_full(q, maxsize):
    """Return whether q is full.

    Uses q.full() when the queue has that method and falls back to
    comparing q.qsize() with maxsize otherwise.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
382
383
class _TestQueue(BaseTestCase):
    """Tests for Queue/JoinableQueue: put(), get(), use across a forked
    child, qsize() and task_done()/join()."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items and
        # let the parent carry on.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must raise Queue.Full; only the blocking
        # variants are expected to take their timeout to do so
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected_elapsed in attempts:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected_elapsed)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue 2..5 and let the
        # parent carry on.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must raise Queue.Empty; only the blocking
        # variants are expected to take their timeout to do so
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for call, args, kwds, expected_elapsed in attempts:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected_elapsed)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        # Child side of test_fork: append 10..19 to the queue.
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.daemon = True
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        # (operation, arguments, qsize() expected afterwards)
        steps = [
            (q.put, (1,), 1),
            (q.put, (5,), 2),
            (q.get, (), 1),
            (q.get, (), 0),
            ]
        for operation, args, size_after in steps:
            operation(*args)
            self.assertEqual(q.qsize(), size_after)

    @classmethod
    def _test_task_done(cls, q):
        # Worker: acknowledge every item until the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = []
        for _ in xrange(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,)))

        for worker in workers:
            worker.daemon = True
            worker.start()

        for item in xrange(10):
            queue.put(item)

        # returns once every item has been acknowledged with task_done()
        queue.join()

        # one sentinel per worker so that each of them leaves its loop
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
593
594#
595#
596#
597
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail while the lock is held
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing a lock that is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        depth = 3
        # acquire recursively, then release the same number of times
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # a Lock can be used as a context manager
        with self.Lock():
            pass
620
Benjamin Petersondfd79492008-06-13 19:13:39 +0000621
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        # Walk a semaphore created with value 2 down to 0 and back up,
        # checking the counter (where it can be read) after each step.
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # a plain semaphore may be released beyond its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Only run for the 'processes' flavour.
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); the
        # semaphore is at 0, so every attempt must return False
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, expected_elapsed in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
674
675
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts.

    Each sleeper is started both as a self.Process and as a
    threading.Thread, running the classmethod f() below.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Sleeper body: announce that we are about to wait (sleeping),
        # wait on the condition, then announce that we woke up (woken).
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep the process and the thread under separate names so that
        # both can be joined at the end; reusing one name would leave
        # the process unjoined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f,
                                  args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both sleepers have been notified, so they finish promptly
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying must give up after the timeout
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
807
808
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait(), set() and clear(), including
    an event that is set from a child process."""

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a new event starts out cleared
        self.assertEqual(event.is_set(), False)

        # wait() is expected to return the state of the flag, so a wait
        # that times out on a cleared event returns False
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once the event is set, wait() returns True without blocking
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # an untimed wait() must return once the child sets the event
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)

        # The child's last action is event.set(), so it is finishing now:
        # join it rather than leaving the process behind.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
850#
851#
852#
853
class _TestValue(BaseTestCase):
    """Tests for shared Value/RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side of test_value: overwrite each shared value with the
        # third entry of the matching codes_values row.
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]


    def test_value(self, raw=False):
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        # values start out with their initial contents
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # the child's writes are visible in the parent
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default locking: get_lock() and get_obj() must be callable
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None: get_lock() and get_obj() must be callable as well
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False gives an object without get_lock()/get_obj()
        arr4 = self.Value('i', 5, lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, attr))

        # anything that is not a lock is rejected
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no get_lock()/get_obj() either
        arr5 = self.RawValue('i', 5)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, attr))
921
Benjamin Petersondfd79492008-06-13 19:13:39 +0000922
class _TestArray(BaseTestCase):
    """Tests for the shared ``Array`` and ``RawArray`` objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace ``seq`` with its running sum, in place."""
        for pos in range(1, len(seq)):
            seq[pos] = seq[pos] + seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Compare a shared array against a plain list mirror.

        ``raw`` selects ``RawArray`` instead of ``Array``.
        """
        values = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        shared = make_array('i', values)

        self.assertEqual(len(shared), len(values))
        self.assertEqual(shared[3], values[3])
        self.assertEqual(list(shared[2:7]), list(values[2:7]))

        # Overwrite the same slice in both, the shared array first.
        patch = array.array('i', [1, 2, 3, 4])
        shared[4:8] = patch
        values[4:8] = patch

        self.assertEqual(list(shared[:]), values)

        # Run f() on the list here and on the shared array in a child; both
        # must end up with the same contents.
        self.f(values)

        child = self.Process(target=self.f, args=(shared,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(shared[:]), values)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An array created from a size must start out zero-filled."""
        size = 10
        expected = list(range(size))
        # Test for zeroing (see issue #11675).
        # Repeating the allocation makes it more likely that the 2nd and 3rd
        # arrays reuse memory that previously held non-zero data.
        for _ in range(3):
            shared = self.Array('i', size)
            self.assertEqual(len(shared), size)
            self.assertEqual(list(shared), [0] * size)
            shared[:] = expected
            self.assertEqual(list(shared), expected)
            del shared

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Re-run ``test_array`` against ``RawArray``."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        """A ``long`` size must be accepted by both array constructors."""
        size = long(10)
        self.assertEqual(len(self.Array('i', size)), 10)
        self.assertEqual(len(self.RawArray('i', size)), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check which kinds of array expose ``get_lock`` and ``get_obj``."""
        accessors = ('get_lock', 'get_obj')

        # Default lock and lock=None: the accessors must be callable.
        default_locked = self.Array('i', range(10))
        default_locked.get_lock()
        default_locked.get_obj()

        none_locked = self.Array('i', range(10), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # A caller-supplied lock must be the one handed back.
        supplied = self.Lock()
        custom_locked = self.Array('i', range(10), lock=supplied)
        returned = custom_locked.get_lock()
        custom_locked.get_obj()
        self.assertEqual(supplied, returned)

        # lock=False gives an object without either accessor.
        unlocked = self.Array('i', range(10), lock=False)
        for name in accessors:
            self.assertFalse(hasattr(unlocked, name))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for name in accessors:
            self.assertFalse(hasattr(raw, name))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001008
1009#
1010#
1011#
1012
class _TestContainers(BaseTestCase):
    """Tests for the manager-provided list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Managed lists must support the usual list operations."""
        digits = list(range(10))
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

        first = self.list(digits)
        self.assertEqual(first[:], digits)

        second = self.list()
        self.assertEqual(second[:], [])

        second.extend(range(5))
        self.assertEqual(second[:], list(range(5)))

        self.assertEqual(second[2], 2)
        self.assertEqual(second[2:10], [2, 3, 4])

        second *= 2
        self.assertEqual(second[:], doubled)

        self.assertEqual(second + [5, 6], doubled + [5, 6])

        # None of the operations on ``second`` may have touched ``first``.
        self.assertEqual(first[:], digits)

        nested = self.list([first, second])
        self.assertEqual(nested[:], [digits, doubled])

        # A later change to ``first`` must show up through ``wrapper``.
        wrapper = self.list([first])
        first.append('hello')
        self.assertEqual(wrapper[:], [digits + ['hello']])

    def test_dict(self):
        """Managed dicts must agree with an equivalent local mapping."""
        shared = self.dict()
        codes = list(range(65, 70))
        letters = [chr(code) for code in codes]
        for code, letter in zip(codes, letters):
            shared[code] = letter
        self.assertEqual(shared.copy(), dict(zip(codes, letters)))
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(codes, letters)))

    def test_namespace(self):
        """Attributes can be set and deleted; ``_hidden`` is left out of str()."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1068
1069#
1070#
1071#
1072
def sqr(x, wait=0.0):
    """Return ``x`` squared after sleeping for ``wait`` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the Pool API: apply, map, imap and their async variants.

    ``self.pool`` is not created in this class; it has to be supplied by
    whatever turns this class into a concrete test case.
    """

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() matches the builtin map(), with or without a chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        """map_async() on an empty list must complete, not stall."""
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """get() blocks until the task, which sleeps TIMEOUT1, is done."""
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError if the task is too slow."""
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for i in range(10):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for i in range(1000):
            self.assertEqual(it.next(), i*i)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), map(sqr, range(1000)))

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), map(sqr, range(1000)))

    def test_make_pool(self):
        """Pool() rejects non-positive sizes and honours a valid one."""
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        try:
            self.assertEqual(3, len(p._pool))
        finally:
            # Release the workers even if the assertion above fails.
            p.close()
            p.join()

    def test_terminate(self):
        """terminate() must not wait for the queued work to finish."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            self.skipTest("terminating a managed pool may leak references")

        # 10000 sleeps of 0.1s each could not finish within the 0.2s allowed
        # below, so a fast join() shows that the work was abandoned.
        self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001154
def unpickleable_result():
    """Return a freshly created lambda that itself returns 42."""
    answer = lambda: 42
    return answer
1157
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for errors raised while a worker sends back its result."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        """A result that cannot be pickled surfaces as MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        workers = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors:
        # 20 failing tasks are pushed through a pool of only 2 workers.
        attempts = 20
        for _ in range(attempts):
            pending = workers.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        workers.close()
        workers.join()
1171 p.join()
1172
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers must be replaced once they hit their task limit."""
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 polls * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        """close() before the tasks finish must not make join() hang."""
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (n, 0.3)) for n in range(6)]
        pool.close()
        pool.join()
        # check the results
        for n, result in enumerate(pending):
            self.assertEqual(result.get(), sqr(n))
1217
1218
Benjamin Petersondfd79492008-06-13 19:13:39 +00001219#
1220# Test that manager has expected number of shared objects left
1221#
1222
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """The manager must be left holding exactly one shared object."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Taken right after the count so that it describes the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; a second _debug_info() call would only
            # repeat the output and cost another round trip to the manager.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1241
1242#
1243# Test of creating a customized manager class
1244#
1245
1246from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1247
class FooBar(object):
    """Plain class registered with ``MyManager`` under two type ids."""

    def f(self):
        """Return the literal string ``'f()'``."""
        return 'f()'

    def g(self):
        """Always raise ``ValueError``."""
        raise ValueError

    def _h(self):
        """Return the literal string ``'_h()'``."""
        return '_h()'
1255
def baz():
    """Yield the squares of 0 through 9, in ascending order."""
    number = 0
    while number < 10:
        yield number * number
        number += 1
1259
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to the referent's next methods."""

    # Only these two methods of the referent may be called via the proxy.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        """Return the proxy itself so it can be used directly in a loop."""
        return self

    def next(self):
        """Forward a ``next`` call to the referent."""
        method = 'next'
        return self._callmethod(method)

    def __next__(self):
        """Forward a ``__next__`` call to the referent."""
        method = '__next__'
        return self._callmethod(method)
1268
class MyManager(BaseManager):
    """Manager subclass that carries the custom registrations below."""


# 'Foo' uses the default exposure; 'Bar' names its exposed methods
# explicitly, including the underscore-prefixed '_h'.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# The generator returned by baz() is reached through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1275
1276
class _TestMyManager(BaseTestCase):
    """Tests for a BaseManager subclass with custom registered types."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Exposed methods are callable via proxies; others are refused."""
        manager = MyManager()
        manager.start()
        # Make sure the manager process is stopped even when an assertion
        # below fails.  shutdown() may safely be called more than once, so
        # the explicit call at the end of the test is kept.
        self.addCleanup(manager.shutdown)

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # 'Foo' was registered without ``exposed``, 'Bar' with ('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so calling it by name must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()
1308
1309#
1310# Test of connecting to a remote server and using xmlrpclib for serialization
1311#
1312
1313_queue = Queue.Queue()
1314def get_queue():
1315 return _queue
1316
1317class QueueManager(BaseManager):
1318 '''manager class used by server process'''
1319QueueManager.register('get_queue', callable=get_queue)
1320
1321class QueueManager2(BaseManager):
1322 '''manager class which specifies the same interface as QueueManager'''
1323QueueManager2.register('get_queue')
1324
1325
1326SERIALIZER = 'xmlrpclib'
1327
class _TestRemoteManager(BaseTestCase):
    """Tests for reaching a manager server from a second process."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the server and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """A value put by a child must arrive through a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Make sure the server process is stopped even when an assertion
        # below fails.  shutdown() may safely be called more than once, so
        # the explicit call at the end of the test is kept.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # The child's only job was that put(); reap it so that it does not
        # outlive the test.
        p.join()

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1369
class _TestManagerRestart(BaseTestCase):
    """Tests for restarting a manager on an address used just before."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child side: connect to the server and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """A new manager must be able to start right after one shut down."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        # Stop the server process even if an assertion below fails;
        # shutdown() may safely be called more than once.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The child's only job was that put(); reap it before the server
        # it was connected to is shut down.
        p.join()
        del queue
        manager.shutdown()
        # Start a second manager on the address saved from get_server().
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        self.addCleanup(manager.shutdown)
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001402
Benjamin Petersondfd79492008-06-13 19:13:39 +00001403#
1404#
1405#
1406
# Empty message that tells the _echo() child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send every received byte message back until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects and bytes through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buf = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # Receive at an offset of three items.
            buf = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                             len(arr) * buf.itemsize)
            self.assertEqual(list(buf), expected)

            # A 40 byte buffer is too small for the 110 byte message; the
            # exception must carry the complete message.
            buf = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buf)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """The parent may close the child's end right after start()."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset/size arguments of send_bytes() and their limits."""
        if self.TYPE != 'processes':
            # Report a skip, as the fd-transfer tests in this class do,
            # instead of silently counting as a pass.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length sends an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes that fall outside the message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if ``fd`` can be fstat()ed, False on EBADF.

        Any other ``OSError`` is re-raised.
        """
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child side: receive a handle over ``conn`` and write ``data`` to it.

        With ``create_dummy_fds`` every unassigned descriptor below 256 is
        first filled with a duplicate of the connection's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """A file handle sent to a child must be writable there."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        # Do not leave the scratch file behind for later tests.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        """Same as test_fd_transfer, with a descriptor number of 256 or more."""
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        # Do not leave the scratch file behind for later tests.
        self.addCleanup(test_support.unlink, test_support.TESTFN)
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        with open(test_support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Pick the first unassigned descriptor number from 256 upwards.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test_support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child side: write one raw byte to the connection's descriptor."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        """recv_handle() must fail when no descriptor accompanies the data."""
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001651
class _TestListenerClient(BaseTestCase):
    """Tests for a child ``Client`` talking to a parent ``Listener``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child side: connect to ``address``, send 'hello' and disconnect."""
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def _start_client(self, listener):
        """Start and return a daemonic child running ``_test`` on ``listener``."""
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        return child

    def test_listener_client(self):
        """The greeting must arrive for every supported address family."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self._start_client(listener)
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        """accept() must still work after the client has come and gone."""
        listener = self.connection.Listener()
        child = self._start_client(listener)
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 'hello')
        accepted.close()
        child.join()
        listener.close()
1688
Benjamin Petersondfd79492008-06-13 19:13:39 +00001689#
1690# Test of sending connection and socket objects between processes
1691#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001692"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001693class _TestPicklingConnections(BaseTestCase):
1694
1695 ALLOWED_TYPES = ('processes',)
1696
1697 def _listener(self, conn, families):
1698 for fam in families:
1699 l = self.connection.Listener(family=fam)
1700 conn.send(l.address)
1701 new_conn = l.accept()
1702 conn.send(new_conn)
1703
1704 if self.TYPE == 'processes':
1705 l = socket.socket()
1706 l.bind(('localhost', 0))
1707 conn.send(l.getsockname())
1708 l.listen(1)
1709 new_conn, addr = l.accept()
1710 conn.send(new_conn)
1711
1712 conn.recv()
1713
1714 def _remote(self, conn):
1715 for (address, msg) in iter(conn.recv, None):
1716 client = self.connection.Client(address)
1717 client.send(msg.upper())
1718 client.close()
1719
1720 if self.TYPE == 'processes':
1721 address, msg = conn.recv()
1722 client = socket.socket()
1723 client.connect(address)
1724 client.sendall(msg.upper())
1725 client.close()
1726
1727 conn.close()
1728
1729 def test_pickling(self):
1730 try:
1731 multiprocessing.allow_connection_pickling()
1732 except ImportError:
1733 return
1734
1735 families = self.connection.families
1736
1737 lconn, lconn0 = self.Pipe()
1738 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001739 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001740 lp.start()
1741 lconn0.close()
1742
1743 rconn, rconn0 = self.Pipe()
1744 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001745 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001746 rp.start()
1747 rconn0.close()
1748
1749 for fam in families:
1750 msg = ('This connection uses family %s' % fam).encode('ascii')
1751 address = lconn.recv()
1752 rconn.send((address, msg))
1753 new_conn = lconn.recv()
1754 self.assertEqual(new_conn.recv(), msg.upper())
1755
1756 rconn.send(None)
1757
1758 if self.TYPE == 'processes':
1759 msg = latin('This connection uses a normal socket')
1760 address = lconn.recv()
1761 rconn.send((address, msg))
1762 if hasattr(socket, 'fromfd'):
1763 new_conn = lconn.recv()
1764 self.assertEqual(new_conn.recv(100), msg.upper())
1765 else:
1766 # XXX On Windows with Py2.6 need to backport fromfd()
1767 discard = lconn.recv_bytes()
1768
1769 lconn.send(None)
1770
1771 rconn.close()
1772 lconn.close()
1773
1774 lp.join()
1775 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001776"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001777#
1778#
1779#
1780
class _TestHeap(BaseTestCase):
    """Tests for the heap that backs ``multiprocessing.heap.BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Churn many blocks, then check that each arena has no gaps."""
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(nbytes))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap, holding its lock until cleanup
        layout = []
        occupied = 0
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in heap._len_to_seq.values():
            layout.extend(
                (heap._arenas.index(arena), start, stop, stop - start, 'free')
                for arena, start, stop in free_list)
        for arena, start, stop in heap._allocated_blocks:
            length = stop - start
            layout.append((heap._arenas.index(arena), start, stop,
                           length, 'occupied'))
            occupied += length

        layout.sort()

        # Each block must begin where the previous one ended, unless it is
        # the first block of a different arena and begins at offset 0.
        for here, there in zip(layout, layout[1:]):
            arena, start, stop = here[:3]
            narena, nstart, _ = there[:3]
            starts_new_arena = (arena != narena and nstart == 0)
            contiguous = (stop == nstart)
            self.assertTrue(starts_new_arena or contiguous)

    def test_free_from_gc(self):
        """Blocks freed by the cyclic garbage collector must not deadlock."""
        # Check that freeing of blocks by the garbage collector doesn't
        # deadlock (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        was_enabled = gc.isenabled()
        if not was_enabled:
            gc.enable()
            self.addCleanup(gc.disable)
        previous = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *previous)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            left = multiprocessing.heap.BufferWrapper(1)
            right = multiprocessing.heap.BufferWrapper(1)
            # circular references
            left.buddy = right
            right.buddy = left
1845
Benjamin Petersondfd79492008-06-13 19:13:39 +00001846#
1847#
1848#
1849
Benjamin Petersondfd79492008-06-13 19:13:39 +00001850class _Foo(Structure):
1851 _fields_ = [
1852 ('x', c_int),
1853 ('y', c_double)
1854 ]
1855
class _TestSharedCTypes(BaseTestCase):
    """Exercise shared ctypes values and arrays across a process boundary."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing in this class can run without multiprocessing.sharedctypes.
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Process target: double every shared object in place."""
        for shared_value in (x, y):
            shared_value.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Double shared objects in a child process and check the results.

        *lock* is forwarded to every Value/Array constructor.
        """
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_objects = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared_objects)
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        # arr started out as 0..9, so each slot must now hold twice its index.
        for index, expected in enumerate(range(0, 20, 2)):
            self.assertAlmostEqual(arr[index], expected)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock=True passed to the constructors.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A copy must keep its values after the source is modified."""
        source = _Foo(2, 5.0)
        duplicate = copy(source)
        source.x = 0
        source.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
1905
1906#
1907#
1908#
1909
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks fire."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Process target: register finalizers that each send a tag on *conn*.

        The statement order and the lifetime of the locals matter here;
        the parent test asserts on the exact sequence of tags received.
        """
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority and kept alive until exit;
        # 'c' is absent from the sequence test_finalize expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; test_finalize expects
        # them to be reported as d03, d02, d01.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority used here: 'STOP' is the sentinel that ends the
        # parent's receive loop.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run _test_finalize in a child and verify the tags it sends."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel (the sentinel itself is dropped).
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1962
1963#
1964# Test that from ... import * works for each module
1965#
1966
class _TestImportStar(BaseTestCase):
    """Check that every name a module lists in ``__all__`` really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        module_names = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are available.
        if HAS_REDUCTION:
            module_names.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            module_names.append('multiprocessing.sharedctypes')

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]

            # A module without __all__ has nothing to verify.
            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1995
1996#
1997# Quick test that logging works -- does not test logging output
1998#
1999
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger works.

    The logging output itself is not tested.
    """

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """Logging below the configured level must not raise."""
        logger = multiprocessing.get_logger()
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the suite-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Process target: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        """Run _test_level in a child process and return the level it sent.

        The child is always joined so it does not outlive the test.
        """
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        """A child process must see the parent's effective logging level."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1, self._level_seen_by_child(reader, writer))

            # With NOTSET, the effective level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2, self._level_seen_by_child(reader, writer))
        finally:
            # Undo the level changes and release the pipe even when an
            # assertion fails, so later tests are not affected.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2042
Jesse Noller814d02d2009-11-21 14:38:23 +00002043
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002044# class _TestLoggingProcessName(BaseTestCase):
2045#
2046# def handle(self, record):
2047# assert record.processName == multiprocessing.current_process().name
2048# self.__handled = True
2049#
2050# def test_logging(self):
2051# handler = logging.Handler()
2052# handler.handle = self.handle
2053# self.__handled = False
2054# # Bypass getLogger() and side-effects
2055# logger = logging.getLoggerClass()(
2056# 'multiprocessing.test.TestLoggingProcessName')
2057# logger.addHandler(handler)
2058# logger.propagate = False
2059#
2060# logger.warn('foo')
2061# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002062
Benjamin Petersondfd79492008-06-13 19:13:39 +00002063#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002064# Test to verify handle verification, see issue 3321
2065#
2066
class TestInvalidHandle(unittest.TestCase):
    """Connections built on invalid handles must raise IOError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # An arbitrary handle value: construction succeeds, polling fails.
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        # A negative handle is rejected at construction time.
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002074
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002075#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002076# Functions used to create test cases from the base ones in this module
2077#
2078
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to that attribute of *Source*.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod``; everything else is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        is_function = type(value) == function_type
        collected[name] = staticmethod(value) if is_function else value
    return collected
2087
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ``ALLOWED_TYPES`` contains *type* is combined with ``unittest.TestCase``
    and *Mixin*.  The new class is named ``With<Type><name minus the leading
    underscore>`` and takes *Mixin*'s ``__module__``.  Returns a dict mapping
    the new names to the new classes.
    """
    module_globals = globals()
    prefix = 'With' + type.capitalize()
    generated = {}

    for name in module_globals.keys():
        if not name.startswith('_Test'):
            continue
        base = module_globals[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass
        Temp.__name__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        generated[Temp.__name__] = Temp
    return generated
2104
2105#
2106# Create test cases
2107#
2108
class ProcessesMixin(object):
    """Mixin binding the test API names to the multiprocessing module."""
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing module into the
    # class namespace; get_attributes() wraps plain functions in
    # staticmethod.
    # NOTE(review): relies on locals() being the live class namespace inside
    # a class body -- confirm before porting to another interpreter.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'processes' test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2121
2122
class ManagerMixin(object):
    """Mixin binding the test API names to a SyncManager instance."""
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocate the manager without running __init__; test_main() calls
    # __init__() and start() on it before the suite runs.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of the manager into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'manager' test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2135
2136
class ThreadsMixin(object):
    """Mixin binding the test API names to multiprocessing.dummy."""
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'threads' test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2149
class OtherTest(unittest.TestCase):
    """Authentication failure paths of the connection handshake helpers."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer first sends the challenge marker, then garbage, then
        # nothing but empty messages.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2178
Jesse Noller7152f6d2009-04-02 05:17:26 +00002179#
2180# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2181#
2182
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one.

    Passed as the initializer callable in TestInitializers.
    """
    ns.test += 1
2185
class TestInitializers(unittest.TestCase):
    """Check the initializer hooks of Manager.start() and Pool()."""

    def setUp(self):
        # A shared namespace whose ``test`` counter initializer() increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Shut the manager down even when the assertion fails so that
            # its server process is not leaked.
            m.shutdown()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2208
Jesse Noller1b90efb2009-06-30 17:11:52 +00002209#
2210# Issue 5155, 5313, 5331: Test process in processes
2211# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2212#
2213
2214def _ThisSubProcess(q):
2215 try:
2216 item = q.get(block=False)
2217 except Queue.Empty:
2218 pass
2219
def _TestProcess(q):
    """Start a daemonic child running _ThisSubProcess and wait for it.

    The child gets a freshly created queue; the *q* argument is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
2226
2227def _afunc(x):
2228 return x*x
2229
def pool_in_process():
    """Create a four-worker pool, run one map() call on it and clean up.

    Used as a process target by TestStdinBadfiledescriptor.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Stop the workers and wait for them so they are not left running
        # when this function returns.
        pool.close()
        pool.join()
2233
2234class _file_like(object):
2235 def __init__(self, delegate):
2236 self._delegate = delegate
2237 self._pid = None
2238
2239 @property
2240 def cache(self):
2241 pid = os.getpid()
2242 # There are no race conditions since fork keeps only the running thread
2243 if pid != self._pid:
2244 self._pid = pid
2245 self._cache = []
2246 return self._cache
2247
2248 def write(self, data):
2249 self.cache.append(data)
2250
2251 def flush(self):
2252 self._delegate.write(''.join(self.cache))
2253 self._cache = []
2254
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        # The child itself starts a further process (see _TestProcess).
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool (see pool_in_process).
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is constructed but never started; it is
        # kept as in the original test -- confirm whether it is still needed.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: the check is not stripped
        # under -O and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
2275
# Stand-alone test cases that are not generated by create_test_cases();
# test_main() runs them after the generated ones.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002278
Benjamin Petersondfd79492008-06-13 19:13:39 +00002279#
2280#
2281#
2282
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable taking the assembled ``unittest.TestSuite``; when it
    is None, ``test.test_support.run_unittest`` is used.

    Raises ``unittest.SkipTest`` on Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only the side effect matters: creation fails with OSError on
            # systems affected by issue 3111.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear the shared workers down even if the run raised, so that no
        # pool or manager process is left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002330
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()