blob: b293e2fbc4ea9afa5bbbf600002604e8413e6d11 [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
Antoine Pitrou12536bd2017-06-28 13:48:38 +0200178class DummyCallable(object):
179 def __call__(self, q, c):
180 assert isinstance(c, DummyCallable)
181 q.put(5)
182
183
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184class _TestProcess(BaseTestCase):
185
186 ALLOWED_TYPES = ('processes', 'threads')
187
188 def test_current(self):
189 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600190 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000191
192 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194
195 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000196 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000197 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000199 self.assertEqual(current.ident, os.getpid())
200 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 @classmethod
203 def _test(cls, q, *args, **kwds):
204 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 q.put(args)
206 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000207 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000208 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000209 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000210 q.put(current.pid)
211
212 def test_process(self):
213 q = self.Queue(1)
214 e = self.Event()
215 args = (q, 1, 2)
216 kwargs = {'hello':23, 'bye':2.54}
217 name = 'SomeProcess'
218 p = self.Process(
219 target=self._test, args=args, kwargs=kwargs, name=name
220 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 current = self.current_process()
223
224 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000225 self.assertEqual(p.authkey, current.authkey)
226 self.assertEqual(p.is_alive(), False)
227 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000228 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000229 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000230 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
232 p.start()
233
Ezio Melotti2623a372010-11-21 13:34:58 +0000234 self.assertEqual(p.exitcode, None)
235 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000236 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000237
Ezio Melotti2623a372010-11-21 13:34:58 +0000238 self.assertEqual(q.get(), args[1:])
239 self.assertEqual(q.get(), kwargs)
240 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000242 self.assertEqual(q.get(), current.authkey)
243 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
245 p.join()
246
Ezio Melotti2623a372010-11-21 13:34:58 +0000247 self.assertEqual(p.exitcode, 0)
248 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000250
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000251 @classmethod
252 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000253 time.sleep(1000)
254
255 def test_terminate(self):
256 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600257 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000260 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261 p.start()
262
263 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000264 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000265 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000266
267 p.terminate()
268
269 join = TimingWrapper(p.join)
270 self.assertEqual(join(), None)
271 self.assertTimingAlmostEqual(join.elapsed, 0.0)
272
273 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000274 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275
276 p.join()
277
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000278 # XXX sometimes get p.exitcode == 0 on Windows ...
279 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000280
281 def test_cpu_count(self):
282 try:
283 cpus = multiprocessing.cpu_count()
284 except NotImplementedError:
285 cpus = 1
286 self.assertTrue(type(cpus) is int)
287 self.assertTrue(cpus >= 1)
288
289 def test_active_children(self):
290 self.assertEqual(type(self.active_children()), list)
291
292 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000294
Jesus Cea6f6016b2011-09-09 20:26:57 +0200295 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000296 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000297 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298
299 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000300 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000301
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 @classmethod
303 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 from multiprocessing import forking
305 wconn.send(id)
306 if len(id) < 2:
307 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000308 p = cls.Process(
309 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000310 )
311 p.start()
312 p.join()
313
314 def test_recursion(self):
315 rconn, wconn = self.Pipe(duplex=False)
316 self._test_recursion(wconn, [])
317
318 time.sleep(DELTA)
319 result = []
320 while rconn.poll():
321 result.append(rconn.recv())
322
323 expected = [
324 [],
325 [0],
326 [0, 0],
327 [0, 1],
328 [1],
329 [1, 0],
330 [1, 1]
331 ]
332 self.assertEqual(result, expected)
333
Richard Oudkerk2182e052012-06-06 19:01:14 +0100334 @classmethod
335 def _test_sys_exit(cls, reason, testfn):
336 sys.stderr = open(testfn, 'w')
337 sys.exit(reason)
338
339 def test_sys_exit(self):
340 # See Issue 13854
341 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600342 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100343
344 testfn = test_support.TESTFN
345 self.addCleanup(test_support.unlink, testfn)
346
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000347 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100348 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
349 p.daemon = True
350 p.start()
351 p.join(5)
352 self.assertEqual(p.exitcode, code)
353
354 with open(testfn, 'r') as f:
355 self.assertEqual(f.read().rstrip(), str(reason))
356
357 for reason in (True, False, 8):
358 p = self.Process(target=sys.exit, args=(reason,))
359 p.daemon = True
360 p.start()
361 p.join(5)
362 self.assertEqual(p.exitcode, reason)
363
Antoine Pitrou12536bd2017-06-28 13:48:38 +0200364 def test_lose_target_ref(self):
365 c = DummyCallable()
366 wr = weakref.ref(c)
367 q = self.Queue()
368 p = self.Process(target=c, args=(q, c))
369 del c
370 p.start()
371 p.join()
372 self.assertIs(wr(), None)
373 self.assertEqual(q.get(), 5)
374
375
Benjamin Petersondfd79492008-06-13 19:13:39 +0000376#
377#
378#
379
380class _UpperCaser(multiprocessing.Process):
381
382 def __init__(self):
383 multiprocessing.Process.__init__(self)
384 self.child_conn, self.parent_conn = multiprocessing.Pipe()
385
386 def run(self):
387 self.parent_conn.close()
388 for s in iter(self.child_conn.recv, None):
389 self.child_conn.send(s.upper())
390 self.child_conn.close()
391
392 def submit(self, s):
393 assert type(s) is str
394 self.parent_conn.send(s)
395 return self.parent_conn.recv()
396
397 def stop(self):
398 self.parent_conn.send(None)
399 self.parent_conn.close()
400 self.child_conn.close()
401
402class _TestSubclassingProcess(BaseTestCase):
403
404 ALLOWED_TYPES = ('processes',)
405
406 def test_subclassing(self):
407 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200408 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000409 uppercaser.start()
410 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
411 self.assertEqual(uppercaser.submit('world'), 'WORLD')
412 uppercaser.stop()
413 uppercaser.join()
414
415#
416#
417#
418
419def queue_empty(q):
420 if hasattr(q, 'empty'):
421 return q.empty()
422 else:
423 return q.qsize() == 0
424
425def queue_full(q, maxsize):
426 if hasattr(q, 'full'):
427 return q.full()
428 else:
429 return q.qsize() == maxsize
430
431
432class _TestQueue(BaseTestCase):
433
434
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000435 @classmethod
436 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 child_can_start.wait()
438 for i in range(6):
439 queue.get()
440 parent_can_continue.set()
441
442 def test_put(self):
443 MAXSIZE = 6
444 queue = self.Queue(maxsize=MAXSIZE)
445 child_can_start = self.Event()
446 parent_can_continue = self.Event()
447
448 proc = self.Process(
449 target=self._test_put,
450 args=(queue, child_can_start, parent_can_continue)
451 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000452 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000453 proc.start()
454
455 self.assertEqual(queue_empty(queue), True)
456 self.assertEqual(queue_full(queue, MAXSIZE), False)
457
458 queue.put(1)
459 queue.put(2, True)
460 queue.put(3, True, None)
461 queue.put(4, False)
462 queue.put(5, False, None)
463 queue.put_nowait(6)
464
465 # the values may be in buffer but not yet in pipe so sleep a bit
466 time.sleep(DELTA)
467
468 self.assertEqual(queue_empty(queue), False)
469 self.assertEqual(queue_full(queue, MAXSIZE), True)
470
471 put = TimingWrapper(queue.put)
472 put_nowait = TimingWrapper(queue.put_nowait)
473
474 self.assertRaises(Queue.Full, put, 7, False)
475 self.assertTimingAlmostEqual(put.elapsed, 0)
476
477 self.assertRaises(Queue.Full, put, 7, False, None)
478 self.assertTimingAlmostEqual(put.elapsed, 0)
479
480 self.assertRaises(Queue.Full, put_nowait, 7)
481 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
482
483 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
484 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
485
486 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
487 self.assertTimingAlmostEqual(put.elapsed, 0)
488
489 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
490 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
491
492 child_can_start.set()
493 parent_can_continue.wait()
494
495 self.assertEqual(queue_empty(queue), True)
496 self.assertEqual(queue_full(queue, MAXSIZE), False)
497
498 proc.join()
499
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000500 @classmethod
501 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000502 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000503 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000504 queue.put(2)
505 queue.put(3)
506 queue.put(4)
507 queue.put(5)
508 parent_can_continue.set()
509
510 def test_get(self):
511 queue = self.Queue()
512 child_can_start = self.Event()
513 parent_can_continue = self.Event()
514
515 proc = self.Process(
516 target=self._test_get,
517 args=(queue, child_can_start, parent_can_continue)
518 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000519 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000520 proc.start()
521
522 self.assertEqual(queue_empty(queue), True)
523
524 child_can_start.set()
525 parent_can_continue.wait()
526
527 time.sleep(DELTA)
528 self.assertEqual(queue_empty(queue), False)
529
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000530 # Hangs unexpectedly, remove for now
531 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000532 self.assertEqual(queue.get(True, None), 2)
533 self.assertEqual(queue.get(True), 3)
534 self.assertEqual(queue.get(timeout=1), 4)
535 self.assertEqual(queue.get_nowait(), 5)
536
537 self.assertEqual(queue_empty(queue), True)
538
539 get = TimingWrapper(queue.get)
540 get_nowait = TimingWrapper(queue.get_nowait)
541
542 self.assertRaises(Queue.Empty, get, False)
543 self.assertTimingAlmostEqual(get.elapsed, 0)
544
545 self.assertRaises(Queue.Empty, get, False, None)
546 self.assertTimingAlmostEqual(get.elapsed, 0)
547
548 self.assertRaises(Queue.Empty, get_nowait)
549 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
550
551 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
552 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
553
554 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
555 self.assertTimingAlmostEqual(get.elapsed, 0)
556
557 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
558 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
559
560 proc.join()
561
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000562 @classmethod
563 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000564 for i in range(10, 20):
565 queue.put(i)
566 # note that at this point the items may only be buffered, so the
567 # process cannot shutdown until the feeder thread has finished
568 # pushing items onto the pipe.
569
570 def test_fork(self):
571 # Old versions of Queue would fail to create a new feeder
572 # thread for a forked process if the original process had its
573 # own feeder thread. This test checks that this no longer
574 # happens.
575
576 queue = self.Queue()
577
578 # put items on queue so that main process starts a feeder thread
579 for i in range(10):
580 queue.put(i)
581
582 # wait to make sure thread starts before we fork a new process
583 time.sleep(DELTA)
584
585 # fork process
586 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200587 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000588 p.start()
589
590 # check that all expected items are in the queue
591 for i in range(20):
592 self.assertEqual(queue.get(), i)
593 self.assertRaises(Queue.Empty, queue.get, False)
594
595 p.join()
596
597 def test_qsize(self):
598 q = self.Queue()
599 try:
600 self.assertEqual(q.qsize(), 0)
601 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600602 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000603 q.put(1)
604 self.assertEqual(q.qsize(), 1)
605 q.put(5)
606 self.assertEqual(q.qsize(), 2)
607 q.get()
608 self.assertEqual(q.qsize(), 1)
609 q.get()
610 self.assertEqual(q.qsize(), 0)
611
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000612 @classmethod
613 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000614 for obj in iter(q.get, None):
615 time.sleep(DELTA)
616 q.task_done()
617
618 def test_task_done(self):
619 queue = self.JoinableQueue()
620
621 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000622 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000623
624 workers = [self.Process(target=self._test_task_done, args=(queue,))
625 for i in xrange(4)]
626
627 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200628 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000629 p.start()
630
631 for i in xrange(10):
632 queue.put(i)
633
634 queue.join()
635
636 for p in workers:
637 queue.put(None)
638
639 for p in workers:
640 p.join()
641
Serhiy Storchaka233e6982015-03-06 22:17:25 +0200642 def test_no_import_lock_contention(self):
643 with test_support.temp_cwd():
644 module_name = 'imported_by_an_imported_module'
645 with open(module_name + '.py', 'w') as f:
646 f.write("""if 1:
647 import multiprocessing
648
649 q = multiprocessing.Queue()
650 q.put('knock knock')
651 q.get(timeout=3)
652 q.close()
653 """)
654
655 with test_support.DirsOnSysPath(os.getcwd()):
656 try:
657 __import__(module_name)
658 except Queue.Empty:
659 self.fail("Probable regression on import lock contention;"
660 " see Issue #22853")
661
Antoine Pitroubdd96472017-05-25 17:53:04 +0200662 def test_queue_feeder_donot_stop_onexc(self):
663 # bpo-30414: verify feeder handles exceptions correctly
664 if self.TYPE != 'processes':
665 self.skipTest('test not appropriate for {}'.format(self.TYPE))
666
667 class NotSerializable(object):
668 def __reduce__(self):
669 raise AttributeError
670 with test.support.captured_stderr():
671 q = self.Queue()
672 q.put(NotSerializable())
673 q.put(True)
Victor Stinnerb60f43a2018-01-29 16:54:29 +0100674 # bpo-30595: use a timeout of 1 second for slow buildbots
675 self.assertTrue(q.get(timeout=1.0))
Antoine Pitroubdd96472017-05-25 17:53:04 +0200676
677
Benjamin Petersondfd79492008-06-13 19:13:39 +0000678#
679#
680#
681
682class _TestLock(BaseTestCase):
683
684 def test_lock(self):
685 lock = self.Lock()
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.acquire(False), False)
688 self.assertEqual(lock.release(), None)
689 self.assertRaises((ValueError, threading.ThreadError), lock.release)
690
691 def test_rlock(self):
692 lock = self.RLock()
693 self.assertEqual(lock.acquire(), True)
694 self.assertEqual(lock.acquire(), True)
695 self.assertEqual(lock.acquire(), True)
696 self.assertEqual(lock.release(), None)
697 self.assertEqual(lock.release(), None)
698 self.assertEqual(lock.release(), None)
699 self.assertRaises((AssertionError, RuntimeError), lock.release)
700
Jesse Noller82eb5902009-03-30 23:29:31 +0000701 def test_lock_context(self):
702 with self.Lock():
703 pass
704
Benjamin Petersondfd79492008-06-13 19:13:39 +0000705
706class _TestSemaphore(BaseTestCase):
707
708 def _test_semaphore(self, sem):
709 self.assertReturnsIfImplemented(2, get_value, sem)
710 self.assertEqual(sem.acquire(), True)
711 self.assertReturnsIfImplemented(1, get_value, sem)
712 self.assertEqual(sem.acquire(), True)
713 self.assertReturnsIfImplemented(0, get_value, sem)
714 self.assertEqual(sem.acquire(False), False)
715 self.assertReturnsIfImplemented(0, get_value, sem)
716 self.assertEqual(sem.release(), None)
717 self.assertReturnsIfImplemented(1, get_value, sem)
718 self.assertEqual(sem.release(), None)
719 self.assertReturnsIfImplemented(2, get_value, sem)
720
721 def test_semaphore(self):
722 sem = self.Semaphore(2)
723 self._test_semaphore(sem)
724 self.assertEqual(sem.release(), None)
725 self.assertReturnsIfImplemented(3, get_value, sem)
726 self.assertEqual(sem.release(), None)
727 self.assertReturnsIfImplemented(4, get_value, sem)
728
729 def test_bounded_semaphore(self):
730 sem = self.BoundedSemaphore(2)
731 self._test_semaphore(sem)
732 # Currently fails on OS/X
733 #if HAVE_GETVALUE:
734 # self.assertRaises(ValueError, sem.release)
735 # self.assertReturnsIfImplemented(2, get_value, sem)
736
737 def test_timeout(self):
738 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600739 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000740
741 sem = self.Semaphore(0)
742 acquire = TimingWrapper(sem.acquire)
743
744 self.assertEqual(acquire(False), False)
745 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
746
747 self.assertEqual(acquire(False, None), False)
748 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
749
750 self.assertEqual(acquire(False, TIMEOUT1), False)
751 self.assertTimingAlmostEqual(acquire.elapsed, 0)
752
753 self.assertEqual(acquire(True, TIMEOUT2), False)
754 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
755
756 self.assertEqual(acquire(timeout=TIMEOUT3), False)
757 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
758
759
760class _TestCondition(BaseTestCase):
761
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000762 @classmethod
763 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000764 cond.acquire()
765 sleeping.release()
766 cond.wait(timeout)
767 woken.release()
768 cond.release()
769
770 def check_invariant(self, cond):
771 # this is only supposed to succeed when there are no sleepers
772 if self.TYPE == 'processes':
773 try:
774 sleepers = (cond._sleeping_count.get_value() -
775 cond._woken_count.get_value())
776 self.assertEqual(sleepers, 0)
777 self.assertEqual(cond._wait_semaphore.get_value(), 0)
778 except NotImplementedError:
779 pass
780
781 def test_notify(self):
782 cond = self.Condition()
783 sleeping = self.Semaphore(0)
784 woken = self.Semaphore(0)
785
786 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000787 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000788 p.start()
789
790 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000791 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000792 p.start()
793
794 # wait for both children to start sleeping
795 sleeping.acquire()
796 sleeping.acquire()
797
798 # check no process/thread has woken up
799 time.sleep(DELTA)
800 self.assertReturnsIfImplemented(0, get_value, woken)
801
802 # wake up one process/thread
803 cond.acquire()
804 cond.notify()
805 cond.release()
806
807 # check one process/thread has woken up
808 time.sleep(DELTA)
809 self.assertReturnsIfImplemented(1, get_value, woken)
810
811 # wake up another
812 cond.acquire()
813 cond.notify()
814 cond.release()
815
816 # check other has woken up
817 time.sleep(DELTA)
818 self.assertReturnsIfImplemented(2, get_value, woken)
819
820 # check state is not mucked up
821 self.check_invariant(cond)
822 p.join()
823
824 def test_notify_all(self):
825 cond = self.Condition()
826 sleeping = self.Semaphore(0)
827 woken = self.Semaphore(0)
828
829 # start some threads/processes which will timeout
830 for i in range(3):
831 p = self.Process(target=self.f,
832 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000833 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000834 p.start()
835
836 t = threading.Thread(target=self.f,
837 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000838 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000839 t.start()
840
841 # wait for them all to sleep
842 for i in xrange(6):
843 sleeping.acquire()
844
845 # check they have all timed out
846 for i in xrange(6):
847 woken.acquire()
848 self.assertReturnsIfImplemented(0, get_value, woken)
849
850 # check state is not mucked up
851 self.check_invariant(cond)
852
853 # start some more threads/processes
854 for i in range(3):
855 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000856 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000857 p.start()
858
859 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000860 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000861 t.start()
862
863 # wait for them to all sleep
864 for i in xrange(6):
865 sleeping.acquire()
866
867 # check no process/thread has woken up
868 time.sleep(DELTA)
869 self.assertReturnsIfImplemented(0, get_value, woken)
870
871 # wake them all up
872 cond.acquire()
873 cond.notify_all()
874 cond.release()
875
876 # check they have all woken
Victor Stinner9d1983b2017-05-15 17:32:14 +0200877 for i in range(10):
878 try:
879 if get_value(woken) == 6:
880 break
881 except NotImplementedError:
882 break
883 time.sleep(DELTA)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000884 self.assertReturnsIfImplemented(6, get_value, woken)
885
886 # check state is not mucked up
887 self.check_invariant(cond)
888
889 def test_timeout(self):
890 cond = self.Condition()
891 wait = TimingWrapper(cond.wait)
892 cond.acquire()
893 res = wait(TIMEOUT1)
894 cond.release()
895 self.assertEqual(res, None)
896 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
897
898
899class _TestEvent(BaseTestCase):
900
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000901 @classmethod
902 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000903 time.sleep(TIMEOUT2)
904 event.set()
905
906 def test_event(self):
907 event = self.Event()
908 wait = TimingWrapper(event.wait)
909
Ezio Melottic2077b02011-03-16 12:34:31 +0200910 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000911 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000912 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000913
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000914 # Removed, threading.Event.wait() will return the value of the __flag
915 # instead of None. API Shear with the semaphore backed mp.Event
916 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000917 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000918 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000919 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
920
921 event.set()
922
923 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000924 self.assertEqual(event.is_set(), True)
925 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000926 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000927 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000928 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
929 # self.assertEqual(event.is_set(), True)
930
931 event.clear()
932
933 #self.assertEqual(event.is_set(), False)
934
Jesus Cea6f6016b2011-09-09 20:26:57 +0200935 p = self.Process(target=self._test_event, args=(event,))
936 p.daemon = True
937 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000938 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000939
940#
941#
942#
943
944class _TestValue(BaseTestCase):
945
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000946 ALLOWED_TYPES = ('processes',)
947
Benjamin Petersondfd79492008-06-13 19:13:39 +0000948 codes_values = [
949 ('i', 4343, 24234),
950 ('d', 3.625, -4.25),
951 ('h', -232, 234),
952 ('c', latin('x'), latin('y'))
953 ]
954
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000955 def setUp(self):
956 if not HAS_SHAREDCTYPES:
957 self.skipTest("requires multiprocessing.sharedctypes")
958
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000959 @classmethod
960 def _test(cls, values):
961 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000962 sv.value = cv[2]
963
964
965 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000966 if raw:
967 values = [self.RawValue(code, value)
968 for code, value, _ in self.codes_values]
969 else:
970 values = [self.Value(code, value)
971 for code, value, _ in self.codes_values]
972
973 for sv, cv in zip(values, self.codes_values):
974 self.assertEqual(sv.value, cv[1])
975
976 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200977 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000978 proc.start()
979 proc.join()
980
981 for sv, cv in zip(values, self.codes_values):
982 self.assertEqual(sv.value, cv[2])
983
984 def test_rawvalue(self):
985 self.test_value(raw=True)
986
987 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000988 val1 = self.Value('i', 5)
989 lock1 = val1.get_lock()
990 obj1 = val1.get_obj()
991
992 val2 = self.Value('i', 5, lock=None)
993 lock2 = val2.get_lock()
994 obj2 = val2.get_obj()
995
996 lock = self.Lock()
997 val3 = self.Value('i', 5, lock=lock)
998 lock3 = val3.get_lock()
999 obj3 = val3.get_obj()
1000 self.assertEqual(lock, lock3)
1001
Jesse Noller6ab22152009-01-18 02:45:38 +00001002 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001003 self.assertFalse(hasattr(arr4, 'get_lock'))
1004 self.assertFalse(hasattr(arr4, 'get_obj'))
1005
Jesse Noller6ab22152009-01-18 02:45:38 +00001006 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1007
1008 arr5 = self.RawValue('i', 5)
1009 self.assertFalse(hasattr(arr5, 'get_lock'))
1010 self.assertFalse(hasattr(arr5, 'get_obj'))
1011
Benjamin Petersondfd79492008-06-13 19:13:39 +00001012
1013class _TestArray(BaseTestCase):
1014
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001015 ALLOWED_TYPES = ('processes',)
1016
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001017 @classmethod
1018 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001019 for i in range(1, len(seq)):
1020 seq[i] += seq[i-1]
1021
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001022 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001023 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001024 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1025 if raw:
1026 arr = self.RawArray('i', seq)
1027 else:
1028 arr = self.Array('i', seq)
1029
1030 self.assertEqual(len(arr), len(seq))
1031 self.assertEqual(arr[3], seq[3])
1032 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1033
1034 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1035
1036 self.assertEqual(list(arr[:]), seq)
1037
1038 self.f(seq)
1039
1040 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001041 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001042 p.start()
1043 p.join()
1044
1045 self.assertEqual(list(arr[:]), seq)
1046
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001047 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001048 def test_array_from_size(self):
1049 size = 10
1050 # Test for zeroing (see issue #11675).
1051 # The repetition below strengthens the test by increasing the chances
1052 # of previously allocated non-zero memory being used for the new array
1053 # on the 2nd and 3rd loops.
1054 for _ in range(3):
1055 arr = self.Array('i', size)
1056 self.assertEqual(len(arr), size)
1057 self.assertEqual(list(arr), [0] * size)
1058 arr[:] = range(10)
1059 self.assertEqual(list(arr), range(10))
1060 del arr
1061
1062 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001063 def test_rawarray(self):
1064 self.test_array(raw=True)
1065
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001066 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001067 def test_array_accepts_long(self):
1068 arr = self.Array('i', 10L)
1069 self.assertEqual(len(arr), 10)
1070 raw_arr = self.RawArray('i', 10L)
1071 self.assertEqual(len(raw_arr), 10)
1072
1073 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001074 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001075 arr1 = self.Array('i', range(10))
1076 lock1 = arr1.get_lock()
1077 obj1 = arr1.get_obj()
1078
1079 arr2 = self.Array('i', range(10), lock=None)
1080 lock2 = arr2.get_lock()
1081 obj2 = arr2.get_obj()
1082
1083 lock = self.Lock()
1084 arr3 = self.Array('i', range(10), lock=lock)
1085 lock3 = arr3.get_lock()
1086 obj3 = arr3.get_obj()
1087 self.assertEqual(lock, lock3)
1088
Jesse Noller6ab22152009-01-18 02:45:38 +00001089 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001090 self.assertFalse(hasattr(arr4, 'get_lock'))
1091 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001092 self.assertRaises(AttributeError,
1093 self.Array, 'i', range(10), lock='notalock')
1094
1095 arr5 = self.RawArray('i', range(10))
1096 self.assertFalse(hasattr(arr5, 'get_lock'))
1097 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001098
1099#
1100#
1101#
1102
1103class _TestContainers(BaseTestCase):
1104
1105 ALLOWED_TYPES = ('manager',)
1106
1107 def test_list(self):
1108 a = self.list(range(10))
1109 self.assertEqual(a[:], range(10))
1110
1111 b = self.list()
1112 self.assertEqual(b[:], [])
1113
1114 b.extend(range(5))
1115 self.assertEqual(b[:], range(5))
1116
1117 self.assertEqual(b[2], 2)
1118 self.assertEqual(b[2:10], [2,3,4])
1119
1120 b *= 2
1121 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1122
1123 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1124
1125 self.assertEqual(a[:], range(10))
1126
1127 d = [a, b]
1128 e = self.list(d)
1129 self.assertEqual(
1130 e[:],
1131 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1132 )
1133
1134 f = self.list([a])
1135 a.append('hello')
1136 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1137
1138 def test_dict(self):
1139 d = self.dict()
1140 indices = range(65, 70)
1141 for i in indices:
1142 d[i] = chr(i)
1143 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1144 self.assertEqual(sorted(d.keys()), indices)
1145 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1146 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1147
1148 def test_namespace(self):
1149 n = self.Namespace()
1150 n.name = 'Bob'
1151 n.job = 'Builder'
1152 n._hidden = 'hidden'
1153 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1154 del n.job
1155 self.assertEqual(str(n), "Namespace(name='Bob')")
1156 self.assertTrue(hasattr(n, 'name'))
1157 self.assertTrue(not hasattr(n, 'job'))
1158
1159#
1160#
1161#
1162
1163def sqr(x, wait=0.0):
1164 time.sleep(wait)
1165 return x*x
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001166
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001167def identity(x):
1168 return x
1169
1170class CountedObject(object):
1171 n_instances = 0
1172
1173 def __new__(cls):
1174 cls.n_instances += 1
1175 return object.__new__(cls)
1176
1177 def __del__(self):
1178 type(self).n_instances -= 1
1179
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001180class SayWhenError(ValueError): pass
1181
1182def exception_throwing_generator(total, when):
1183 for i in range(total):
1184 if i == when:
1185 raise SayWhenError("Somebody said when")
1186 yield i
1187
Benjamin Petersondfd79492008-06-13 19:13:39 +00001188class _TestPool(BaseTestCase):
1189
1190 def test_apply(self):
1191 papply = self.pool.apply
1192 self.assertEqual(papply(sqr, (5,)), sqr(5))
1193 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1194
1195 def test_map(self):
1196 pmap = self.pool.map
1197 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1198 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1199 map(sqr, range(100)))
1200
Richard Oudkerk21aad972013-10-28 23:02:22 +00001201 def test_map_unplicklable(self):
1202 # Issue #19425 -- failure to pickle should not cause a hang
1203 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001204 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001205 class A(object):
1206 def __reduce__(self):
1207 raise RuntimeError('cannot pickle')
1208 with self.assertRaises(RuntimeError):
1209 self.pool.map(sqr, [A()]*10)
1210
Jesse Noller7530e472009-07-16 14:23:04 +00001211 def test_map_chunksize(self):
1212 try:
1213 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1214 except multiprocessing.TimeoutError:
1215 self.fail("pool.map_async with chunksize stalled on null list")
1216
Benjamin Petersondfd79492008-06-13 19:13:39 +00001217 def test_async(self):
1218 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1219 get = TimingWrapper(res.get)
1220 self.assertEqual(get(), 49)
1221 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1222
1223 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001224 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001225 get = TimingWrapper(res.get)
1226 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1227 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1228
1229 def test_imap(self):
1230 it = self.pool.imap(sqr, range(10))
1231 self.assertEqual(list(it), map(sqr, range(10)))
1232
1233 it = self.pool.imap(sqr, range(10))
1234 for i in range(10):
1235 self.assertEqual(it.next(), i*i)
1236 self.assertRaises(StopIteration, it.next)
1237
1238 it = self.pool.imap(sqr, range(1000), chunksize=100)
1239 for i in range(1000):
1240 self.assertEqual(it.next(), i*i)
1241 self.assertRaises(StopIteration, it.next)
1242
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001243 def test_imap_handle_iterable_exception(self):
1244 if self.TYPE == 'manager':
1245 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1246
1247 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1248 for i in range(3):
1249 self.assertEqual(next(it), i*i)
1250 self.assertRaises(SayWhenError, it.next)
1251
1252 # SayWhenError seen at start of problematic chunk's results
1253 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1254 for i in range(6):
1255 self.assertEqual(next(it), i*i)
1256 self.assertRaises(SayWhenError, it.next)
1257 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1258 for i in range(4):
1259 self.assertEqual(next(it), i*i)
1260 self.assertRaises(SayWhenError, it.next)
1261
Benjamin Petersondfd79492008-06-13 19:13:39 +00001262 def test_imap_unordered(self):
1263 it = self.pool.imap_unordered(sqr, range(1000))
1264 self.assertEqual(sorted(it), map(sqr, range(1000)))
1265
1266 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1267 self.assertEqual(sorted(it), map(sqr, range(1000)))
1268
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001269 def test_imap_unordered_handle_iterable_exception(self):
1270 if self.TYPE == 'manager':
1271 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1272
1273 it = self.pool.imap_unordered(sqr,
1274 exception_throwing_generator(10, 3),
1275 1)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001276 expected_values = map(sqr, range(10))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001277 with self.assertRaises(SayWhenError):
1278 # imap_unordered makes it difficult to anticipate the SayWhenError
1279 for i in range(10):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001280 value = next(it)
1281 self.assertIn(value, expected_values)
1282 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001283
1284 it = self.pool.imap_unordered(sqr,
1285 exception_throwing_generator(20, 7),
1286 2)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001287 expected_values = map(sqr, range(20))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001288 with self.assertRaises(SayWhenError):
1289 for i in range(20):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001290 value = next(it)
1291 self.assertIn(value, expected_values)
1292 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001293
Benjamin Petersondfd79492008-06-13 19:13:39 +00001294 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001295 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1296 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1297
Benjamin Petersondfd79492008-06-13 19:13:39 +00001298 p = multiprocessing.Pool(3)
1299 self.assertEqual(3, len(p._pool))
1300 p.close()
1301 p.join()
1302
1303 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001304 p = self.Pool(4)
1305 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001306 time.sleep, [0.1 for i in range(10000)], chunksize=1
1307 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001308 p.terminate()
1309 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001310 join()
1311 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001312
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001313 def test_empty_iterable(self):
1314 # See Issue 12157
1315 p = self.Pool(1)
1316
1317 self.assertEqual(p.map(sqr, []), [])
1318 self.assertEqual(list(p.imap(sqr, [])), [])
1319 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1320 self.assertEqual(p.map_async(sqr, []).get(), [])
1321
1322 p.close()
1323 p.join()
1324
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001325 def test_release_task_refs(self):
1326 # Issue #29861: task arguments and results should not be kept
1327 # alive after we are done with them.
1328 objs = list(CountedObject() for i in range(10))
1329 refs = list(weakref.ref(o) for o in objs)
1330 self.pool.map(identity, objs)
1331
1332 del objs
Victor Stinnerfd6094c2017-05-05 09:47:11 +02001333 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001334 self.assertEqual(set(wr() for wr in refs), {None})
1335 # With a process pool, copies of the objects are returned, check
1336 # they were released too.
1337 self.assertEqual(CountedObject.n_instances, 0)
1338
1339
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001340def unpickleable_result():
1341 return lambda: 42
1342
1343class _TestPoolWorkerErrors(BaseTestCase):
1344 ALLOWED_TYPES = ('processes', )
1345
1346 def test_unpickleable_result(self):
1347 from multiprocessing.pool import MaybeEncodingError
1348 p = multiprocessing.Pool(2)
1349
1350 # Make sure we don't lose pool processes because of encoding errors.
1351 for iteration in range(20):
1352 res = p.apply_async(unpickleable_result)
1353 self.assertRaises(MaybeEncodingError, res.get)
1354
1355 p.close()
1356 p.join()
1357
Jesse Noller654ade32010-01-27 03:05:57 +00001358class _TestPoolWorkerLifetime(BaseTestCase):
1359
1360 ALLOWED_TYPES = ('processes', )
1361 def test_pool_worker_lifetime(self):
1362 p = multiprocessing.Pool(3, maxtasksperchild=10)
1363 self.assertEqual(3, len(p._pool))
1364 origworkerpids = [w.pid for w in p._pool]
1365 # Run many tasks so each worker gets replaced (hopefully)
1366 results = []
1367 for i in range(100):
1368 results.append(p.apply_async(sqr, (i, )))
1369 # Fetch the results and verify we got the right answers,
1370 # also ensuring all the tasks have completed.
1371 for (j, res) in enumerate(results):
1372 self.assertEqual(res.get(), sqr(j))
1373 # Refill the pool
1374 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001375 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001376 # (countdown * DELTA = 5 seconds max startup process time)
1377 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001378 while countdown and not all(w.is_alive() for w in p._pool):
1379 countdown -= 1
1380 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001381 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001382 # All pids should be assigned. See issue #7805.
1383 self.assertNotIn(None, origworkerpids)
1384 self.assertNotIn(None, finalworkerpids)
1385 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001386 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1387 p.close()
1388 p.join()
1389
Charles-François Natali46f990e2011-10-24 18:43:51 +02001390 def test_pool_worker_lifetime_early_close(self):
1391 # Issue #10332: closing a pool whose workers have limited lifetimes
1392 # before all the tasks completed would make join() hang.
1393 p = multiprocessing.Pool(3, maxtasksperchild=1)
1394 results = []
1395 for i in range(6):
1396 results.append(p.apply_async(sqr, (i, 0.3)))
1397 p.close()
1398 p.join()
1399 # check the results
1400 for (j, res) in enumerate(results):
1401 self.assertEqual(res.get(), sqr(j))
1402
1403
Benjamin Petersondfd79492008-06-13 19:13:39 +00001404#
1405# Test that manager has expected number of shared objects left
1406#
1407
1408class _TestZZZNumberOfObjects(BaseTestCase):
1409 # Because test cases are sorted alphabetically, this one will get
1410 # run after all the other tests for the manager. It tests that
1411 # there have been no "reference leaks" for the manager's shared
1412 # objects. Note the comment in _TestPool.test_terminate().
1413 ALLOWED_TYPES = ('manager',)
1414
1415 def test_number_of_objects(self):
1416 EXPECTED_NUMBER = 1 # the pool object is still alive
1417 multiprocessing.active_children() # discard dead process objs
1418 gc.collect() # do garbage collection
1419 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001420 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001421 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001422 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001423 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001424
1425 self.assertEqual(refs, EXPECTED_NUMBER)
1426
1427#
1428# Test of creating a customized manager class
1429#
1430
1431from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1432
1433class FooBar(object):
1434 def f(self):
1435 return 'f()'
1436 def g(self):
1437 raise ValueError
1438 def _h(self):
1439 return '_h()'
1440
1441def baz():
1442 for i in xrange(10):
1443 yield i*i
1444
1445class IteratorProxy(BaseProxy):
1446 _exposed_ = ('next', '__next__')
1447 def __iter__(self):
1448 return self
1449 def next(self):
1450 return self._callmethod('next')
1451 def __next__(self):
1452 return self._callmethod('__next__')
1453
1454class MyManager(BaseManager):
1455 pass
1456
1457MyManager.register('Foo', callable=FooBar)
1458MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1459MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1460
1461
1462class _TestMyManager(BaseTestCase):
1463
1464 ALLOWED_TYPES = ('manager',)
1465
1466 def test_mymanager(self):
1467 manager = MyManager()
1468 manager.start()
1469
1470 foo = manager.Foo()
1471 bar = manager.Bar()
1472 baz = manager.baz()
1473
1474 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1475 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1476
1477 self.assertEqual(foo_methods, ['f', 'g'])
1478 self.assertEqual(bar_methods, ['f', '_h'])
1479
1480 self.assertEqual(foo.f(), 'f()')
1481 self.assertRaises(ValueError, foo.g)
1482 self.assertEqual(foo._callmethod('f'), 'f()')
1483 self.assertRaises(RemoteError, foo._callmethod, '_h')
1484
1485 self.assertEqual(bar.f(), 'f()')
1486 self.assertEqual(bar._h(), '_h()')
1487 self.assertEqual(bar._callmethod('f'), 'f()')
1488 self.assertEqual(bar._callmethod('_h'), '_h()')
1489
1490 self.assertEqual(list(baz), [i*i for i in range(10)])
1491
1492 manager.shutdown()
1493
1494#
1495# Test of connecting to a remote server and using xmlrpclib for serialization
1496#
1497
1498_queue = Queue.Queue()
1499def get_queue():
1500 return _queue
1501
1502class QueueManager(BaseManager):
1503 '''manager class used by server process'''
1504QueueManager.register('get_queue', callable=get_queue)
1505
1506class QueueManager2(BaseManager):
1507 '''manager class which specifies the same interface as QueueManager'''
1508QueueManager2.register('get_queue')
1509
1510
1511SERIALIZER = 'xmlrpclib'
1512
1513class _TestRemoteManager(BaseTestCase):
1514
1515 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001516 values = ['hello world', None, True, 2.25,
1517 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1518 ]
1519 result = values[:]
1520 if test_support.have_unicode:
1521 #result[-1] = u'hall\xe5 v\xe4rlden'
1522 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1523 r'\u0441\u0432\u0456\u0442')
1524 values.append(uvalue)
1525 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001526
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001527 @classmethod
1528 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001529 manager = QueueManager2(
1530 address=address, authkey=authkey, serializer=SERIALIZER
1531 )
1532 manager.connect()
1533 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001534 # Note that xmlrpclib will deserialize object as a list not a tuple
1535 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001536
1537 def test_remote(self):
1538 authkey = os.urandom(32)
1539
1540 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001541 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001542 )
1543 manager.start()
1544
1545 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001546 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001547 p.start()
1548
1549 manager2 = QueueManager2(
1550 address=manager.address, authkey=authkey, serializer=SERIALIZER
1551 )
1552 manager2.connect()
1553 queue = manager2.get_queue()
1554
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001555 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001556
1557 # Because we are using xmlrpclib for serialization instead of
1558 # pickle this will cause a serialization error.
1559 self.assertRaises(Exception, queue.put, time.sleep)
1560
1561 # Make queue finalizer run before the server is stopped
1562 del queue
1563 manager.shutdown()
1564
Jesse Noller459a6482009-03-30 15:50:42 +00001565class _TestManagerRestart(BaseTestCase):
1566
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001567 @classmethod
1568 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001569 manager = QueueManager(
1570 address=address, authkey=authkey, serializer=SERIALIZER)
1571 manager.connect()
1572 queue = manager.get_queue()
1573 queue.put('hello world')
1574
1575 def test_rapid_restart(self):
1576 authkey = os.urandom(32)
1577 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001578 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001579 srvr = manager.get_server()
1580 addr = srvr.address
1581 # Close the connection.Listener socket which gets opened as a part
1582 # of manager.get_server(). It's not needed for the test.
1583 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001584 manager.start()
1585
1586 p = self.Process(target=self._putter, args=(manager.address, authkey))
1587 p.start()
Victor Stinner883520a2017-08-16 13:14:40 +02001588 p.join()
Jesse Noller459a6482009-03-30 15:50:42 +00001589 queue = manager.get_queue()
1590 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001591 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001592 manager.shutdown()
Victor Stinner883520a2017-08-16 13:14:40 +02001593
Jesse Noller459a6482009-03-30 15:50:42 +00001594 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001595 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001596 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001597 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001598
Benjamin Petersondfd79492008-06-13 19:13:39 +00001599#
1600#
1601#
1602
1603SENTINEL = latin('')
1604
1605class _TestConnection(BaseTestCase):
1606
1607 ALLOWED_TYPES = ('processes', 'threads')
1608
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001609 @classmethod
1610 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001611 for msg in iter(conn.recv_bytes, SENTINEL):
1612 conn.send_bytes(msg)
1613 conn.close()
1614
1615 def test_connection(self):
1616 conn, child_conn = self.Pipe()
1617
1618 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001619 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001620 p.start()
1621
1622 seq = [1, 2.25, None]
1623 msg = latin('hello world')
1624 longmsg = msg * 10
1625 arr = array.array('i', range(4))
1626
1627 if self.TYPE == 'processes':
1628 self.assertEqual(type(conn.fileno()), int)
1629
1630 self.assertEqual(conn.send(seq), None)
1631 self.assertEqual(conn.recv(), seq)
1632
1633 self.assertEqual(conn.send_bytes(msg), None)
1634 self.assertEqual(conn.recv_bytes(), msg)
1635
1636 if self.TYPE == 'processes':
1637 buffer = array.array('i', [0]*10)
1638 expected = list(arr) + [0] * (10 - len(arr))
1639 self.assertEqual(conn.send_bytes(arr), None)
1640 self.assertEqual(conn.recv_bytes_into(buffer),
1641 len(arr) * buffer.itemsize)
1642 self.assertEqual(list(buffer), expected)
1643
1644 buffer = array.array('i', [0]*10)
1645 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1646 self.assertEqual(conn.send_bytes(arr), None)
1647 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1648 len(arr) * buffer.itemsize)
1649 self.assertEqual(list(buffer), expected)
1650
1651 buffer = bytearray(latin(' ' * 40))
1652 self.assertEqual(conn.send_bytes(longmsg), None)
1653 try:
1654 res = conn.recv_bytes_into(buffer)
1655 except multiprocessing.BufferTooShort, e:
1656 self.assertEqual(e.args, (longmsg,))
1657 else:
1658 self.fail('expected BufferTooShort, got %s' % res)
1659
1660 poll = TimingWrapper(conn.poll)
1661
1662 self.assertEqual(poll(), False)
1663 self.assertTimingAlmostEqual(poll.elapsed, 0)
1664
1665 self.assertEqual(poll(TIMEOUT1), False)
1666 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1667
1668 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001669 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001670
1671 self.assertEqual(poll(TIMEOUT1), True)
1672 self.assertTimingAlmostEqual(poll.elapsed, 0)
1673
1674 self.assertEqual(conn.recv(), None)
1675
1676 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1677 conn.send_bytes(really_big_msg)
1678 self.assertEqual(conn.recv_bytes(), really_big_msg)
1679
1680 conn.send_bytes(SENTINEL) # tell child to quit
1681 child_conn.close()
1682
1683 if self.TYPE == 'processes':
1684 self.assertEqual(conn.readable, True)
1685 self.assertEqual(conn.writable, True)
1686 self.assertRaises(EOFError, conn.recv)
1687 self.assertRaises(EOFError, conn.recv_bytes)
1688
1689 p.join()
1690
1691 def test_duplex_false(self):
1692 reader, writer = self.Pipe(duplex=False)
1693 self.assertEqual(writer.send(1), None)
1694 self.assertEqual(reader.recv(), 1)
1695 if self.TYPE == 'processes':
1696 self.assertEqual(reader.readable, True)
1697 self.assertEqual(reader.writable, False)
1698 self.assertEqual(writer.readable, False)
1699 self.assertEqual(writer.writable, True)
1700 self.assertRaises(IOError, reader.send, 2)
1701 self.assertRaises(IOError, writer.recv)
1702 self.assertRaises(IOError, writer.poll)
1703
1704 def test_spawn_close(self):
1705 # We test that a pipe connection can be closed by parent
1706 # process immediately after child is spawned. On Windows this
1707 # would have sometimes failed on old versions because
1708 # child_conn would be closed before the child got a chance to
1709 # duplicate it.
1710 conn, child_conn = self.Pipe()
1711
1712 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001713 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001714 p.start()
1715 child_conn.close() # this might complete before child initializes
1716
1717 msg = latin('hello')
1718 conn.send_bytes(msg)
1719 self.assertEqual(conn.recv_bytes(), msg)
1720
1721 conn.send_bytes(SENTINEL)
1722 conn.close()
1723 p.join()
1724
1725 def test_sendbytes(self):
1726 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001727 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001728
1729 msg = latin('abcdefghijklmnopqrstuvwxyz')
1730 a, b = self.Pipe()
1731
1732 a.send_bytes(msg)
1733 self.assertEqual(b.recv_bytes(), msg)
1734
1735 a.send_bytes(msg, 5)
1736 self.assertEqual(b.recv_bytes(), msg[5:])
1737
1738 a.send_bytes(msg, 7, 8)
1739 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1740
1741 a.send_bytes(msg, 26)
1742 self.assertEqual(b.recv_bytes(), latin(''))
1743
1744 a.send_bytes(msg, 26, 0)
1745 self.assertEqual(b.recv_bytes(), latin(''))
1746
1747 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1748
1749 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1750
1751 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1752
1753 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1754
1755 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1756
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001757 @classmethod
1758 def _is_fd_assigned(cls, fd):
1759 try:
1760 os.fstat(fd)
1761 except OSError as e:
1762 if e.errno == errno.EBADF:
1763 return False
1764 raise
1765 else:
1766 return True
1767
1768 @classmethod
1769 def _writefd(cls, conn, data, create_dummy_fds=False):
1770 if create_dummy_fds:
1771 for i in range(0, 256):
1772 if not cls._is_fd_assigned(i):
1773 os.dup2(conn.fileno(), i)
1774 fd = reduction.recv_handle(conn)
1775 if msvcrt:
1776 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1777 os.write(fd, data)
1778 os.close(fd)
1779
Charles-François Natalif8413b22011-09-21 18:44:49 +02001780 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001781 def test_fd_transfer(self):
1782 if self.TYPE != 'processes':
1783 self.skipTest("only makes sense with processes")
1784 conn, child_conn = self.Pipe(duplex=True)
1785
1786 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001787 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001788 p.start()
1789 with open(test_support.TESTFN, "wb") as f:
1790 fd = f.fileno()
1791 if msvcrt:
1792 fd = msvcrt.get_osfhandle(fd)
1793 reduction.send_handle(conn, fd, p.pid)
1794 p.join()
1795 with open(test_support.TESTFN, "rb") as f:
1796 self.assertEqual(f.read(), b"foo")
1797
Charles-François Natalif8413b22011-09-21 18:44:49 +02001798 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001799 @unittest.skipIf(sys.platform == "win32",
1800 "test semantics don't make sense on Windows")
1801 @unittest.skipIf(MAXFD <= 256,
1802 "largest assignable fd number is too small")
1803 @unittest.skipUnless(hasattr(os, "dup2"),
1804 "test needs os.dup2()")
1805 def test_large_fd_transfer(self):
1806 # With fd > 256 (issue #11657)
1807 if self.TYPE != 'processes':
1808 self.skipTest("only makes sense with processes")
1809 conn, child_conn = self.Pipe(duplex=True)
1810
1811 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001812 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001813 p.start()
1814 with open(test_support.TESTFN, "wb") as f:
1815 fd = f.fileno()
1816 for newfd in range(256, MAXFD):
1817 if not self._is_fd_assigned(newfd):
1818 break
1819 else:
1820 self.fail("could not find an unassigned large file descriptor")
1821 os.dup2(fd, newfd)
1822 try:
1823 reduction.send_handle(conn, newfd, p.pid)
1824 finally:
1825 os.close(newfd)
1826 p.join()
1827 with open(test_support.TESTFN, "rb") as f:
1828 self.assertEqual(f.read(), b"bar")
1829
Jesus Ceac23484b2011-09-21 03:47:39 +02001830 @classmethod
1831 def _send_data_without_fd(self, conn):
1832 os.write(conn.fileno(), b"\0")
1833
Charles-François Natalif8413b22011-09-21 18:44:49 +02001834 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001835 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1836 def test_missing_fd_transfer(self):
1837 # Check that exception is raised when received data is not
1838 # accompanied by a file descriptor in ancillary data.
1839 if self.TYPE != 'processes':
1840 self.skipTest("only makes sense with processes")
1841 conn, child_conn = self.Pipe(duplex=True)
1842
1843 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1844 p.daemon = True
1845 p.start()
1846 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1847 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001848
Benjamin Petersondfd79492008-06-13 19:13:39 +00001849class _TestListenerClient(BaseTestCase):
1850
1851 ALLOWED_TYPES = ('processes', 'threads')
1852
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001853 @classmethod
1854 def _test(cls, address):
1855 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001856 conn.send('hello')
1857 conn.close()
1858
1859 def test_listener_client(self):
1860 for family in self.connection.families:
1861 l = self.connection.Listener(family=family)
1862 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001863 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001864 p.start()
1865 conn = l.accept()
1866 self.assertEqual(conn.recv(), 'hello')
1867 p.join()
1868 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001869
1870 def test_issue14725(self):
1871 l = self.connection.Listener()
1872 p = self.Process(target=self._test, args=(l.address,))
1873 p.daemon = True
1874 p.start()
1875 time.sleep(1)
1876 # On Windows the client process should by now have connected,
1877 # written data and closed the pipe handle by now. This causes
1878 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1879 # 14725.
1880 conn = l.accept()
1881 self.assertEqual(conn.recv(), 'hello')
1882 conn.close()
1883 p.join()
1884 l.close()
1885
Benjamin Petersondfd79492008-06-13 19:13:39 +00001886#
1887# Test of sending connection and socket objects between processes
1888#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001889"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001890class _TestPicklingConnections(BaseTestCase):
1891
1892 ALLOWED_TYPES = ('processes',)
1893
1894 def _listener(self, conn, families):
1895 for fam in families:
1896 l = self.connection.Listener(family=fam)
1897 conn.send(l.address)
1898 new_conn = l.accept()
1899 conn.send(new_conn)
1900
1901 if self.TYPE == 'processes':
1902 l = socket.socket()
1903 l.bind(('localhost', 0))
1904 conn.send(l.getsockname())
1905 l.listen(1)
1906 new_conn, addr = l.accept()
1907 conn.send(new_conn)
1908
1909 conn.recv()
1910
1911 def _remote(self, conn):
1912 for (address, msg) in iter(conn.recv, None):
1913 client = self.connection.Client(address)
1914 client.send(msg.upper())
1915 client.close()
1916
1917 if self.TYPE == 'processes':
1918 address, msg = conn.recv()
1919 client = socket.socket()
1920 client.connect(address)
1921 client.sendall(msg.upper())
1922 client.close()
1923
1924 conn.close()
1925
1926 def test_pickling(self):
1927 try:
1928 multiprocessing.allow_connection_pickling()
1929 except ImportError:
1930 return
1931
1932 families = self.connection.families
1933
1934 lconn, lconn0 = self.Pipe()
1935 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001936 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001937 lp.start()
1938 lconn0.close()
1939
1940 rconn, rconn0 = self.Pipe()
1941 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001942 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001943 rp.start()
1944 rconn0.close()
1945
1946 for fam in families:
1947 msg = ('This connection uses family %s' % fam).encode('ascii')
1948 address = lconn.recv()
1949 rconn.send((address, msg))
1950 new_conn = lconn.recv()
1951 self.assertEqual(new_conn.recv(), msg.upper())
1952
1953 rconn.send(None)
1954
1955 if self.TYPE == 'processes':
1956 msg = latin('This connection uses a normal socket')
1957 address = lconn.recv()
1958 rconn.send((address, msg))
1959 if hasattr(socket, 'fromfd'):
1960 new_conn = lconn.recv()
1961 self.assertEqual(new_conn.recv(100), msg.upper())
1962 else:
1963 # XXX On Windows with Py2.6 need to backport fromfd()
1964 discard = lconn.recv_bytes()
1965
1966 lconn.send(None)
1967
1968 rconn.close()
1969 lconn.close()
1970
1971 lp.join()
1972 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001973"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001974#
1975#
1976#
1977
1978class _TestHeap(BaseTestCase):
1979
1980 ALLOWED_TYPES = ('processes',)
1981
1982 def test_heap(self):
1983 iterations = 5000
1984 maxblocks = 50
1985 blocks = []
1986
1987 # create and destroy lots of blocks of different sizes
1988 for i in xrange(iterations):
1989 size = int(random.lognormvariate(0, 1) * 1000)
1990 b = multiprocessing.heap.BufferWrapper(size)
1991 blocks.append(b)
1992 if len(blocks) > maxblocks:
1993 i = random.randrange(maxblocks)
1994 del blocks[i]
1995
1996 # get the heap object
1997 heap = multiprocessing.heap.BufferWrapper._heap
1998
1999 # verify the state of the heap
2000 all = []
2001 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002002 heap._lock.acquire()
2003 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002004 for L in heap._len_to_seq.values():
2005 for arena, start, stop in L:
2006 all.append((heap._arenas.index(arena), start, stop,
2007 stop-start, 'free'))
2008 for arena, start, stop in heap._allocated_blocks:
2009 all.append((heap._arenas.index(arena), start, stop,
2010 stop-start, 'occupied'))
2011 occupied += (stop-start)
2012
2013 all.sort()
2014
2015 for i in range(len(all)-1):
2016 (arena, start, stop) = all[i][:3]
2017 (narena, nstart, nstop) = all[i+1][:3]
2018 self.assertTrue((arena != narena and nstart == 0) or
2019 (stop == nstart))
2020
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002021 def test_free_from_gc(self):
2022 # Check that freeing of blocks by the garbage collector doesn't deadlock
2023 # (issue #12352).
2024 # Make sure the GC is enabled, and set lower collection thresholds to
2025 # make collections more frequent (and increase the probability of
2026 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002027 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002028 gc.enable()
2029 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002030 thresholds = gc.get_threshold()
2031 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002032 gc.set_threshold(10)
2033
2034 # perform numerous block allocations, with cyclic references to make
2035 # sure objects are collected asynchronously by the gc
2036 for i in range(5000):
2037 a = multiprocessing.heap.BufferWrapper(1)
2038 b = multiprocessing.heap.BufferWrapper(1)
2039 # circular references
2040 a.buddy = b
2041 b.buddy = a
2042
Benjamin Petersondfd79492008-06-13 19:13:39 +00002043#
2044#
2045#
2046
Benjamin Petersondfd79492008-06-13 19:13:39 +00002047class _Foo(Structure):
2048 _fields_ = [
2049 ('x', c_int),
2050 ('y', c_double)
2051 ]
2052
2053class _TestSharedCTypes(BaseTestCase):
2054
2055 ALLOWED_TYPES = ('processes',)
2056
Antoine Pitrou55d935a2010-11-22 16:35:57 +00002057 def setUp(self):
2058 if not HAS_SHAREDCTYPES:
2059 self.skipTest("requires multiprocessing.sharedctypes")
2060
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002061 @classmethod
2062 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002063 x.value *= 2
2064 y.value *= 2
2065 foo.x *= 2
2066 foo.y *= 2
2067 string.value *= 2
2068 for i in range(len(arr)):
2069 arr[i] *= 2
2070
2071 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002072 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002073 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002074 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002075 arr = self.Array('d', range(10), lock=lock)
2076 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00002077 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002078
2079 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002080 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002081 p.start()
2082 p.join()
2083
2084 self.assertEqual(x.value, 14)
2085 self.assertAlmostEqual(y.value, 2.0/3.0)
2086 self.assertEqual(foo.x, 6)
2087 self.assertAlmostEqual(foo.y, 4.0)
2088 for i in range(10):
2089 self.assertAlmostEqual(arr[i], i*2)
2090 self.assertEqual(string.value, latin('hellohello'))
2091
2092 def test_synchronize(self):
2093 self.test_sharedctypes(lock=True)
2094
2095 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002096 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00002097 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002098 foo.x = 0
2099 foo.y = 0
2100 self.assertEqual(bar.x, 2)
2101 self.assertAlmostEqual(bar.y, 5.0)
2102
2103#
2104#
2105#
2106
2107class _TestFinalize(BaseTestCase):
2108
2109 ALLOWED_TYPES = ('processes',)
2110
Antoine Pitroud09f1672017-06-13 17:52:29 +02002111 def setUp(self):
2112 self.registry_backup = util._finalizer_registry.copy()
2113 util._finalizer_registry.clear()
2114
2115 def tearDown(self):
2116 self.assertFalse(util._finalizer_registry)
2117 util._finalizer_registry.update(self.registry_backup)
2118
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002119 @classmethod
2120 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002121 class Foo(object):
2122 pass
2123
2124 a = Foo()
2125 util.Finalize(a, conn.send, args=('a',))
2126 del a # triggers callback for a
2127
2128 b = Foo()
2129 close_b = util.Finalize(b, conn.send, args=('b',))
2130 close_b() # triggers callback for b
2131 close_b() # does nothing because callback has already been called
2132 del b # does nothing because callback has already been called
2133
2134 c = Foo()
2135 util.Finalize(c, conn.send, args=('c',))
2136
2137 d10 = Foo()
2138 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2139
2140 d01 = Foo()
2141 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2142 d02 = Foo()
2143 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2144 d03 = Foo()
2145 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2146
2147 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2148
2149 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2150
Ezio Melottic2077b02011-03-16 12:34:31 +02002151 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002152 # garbage collecting locals
2153 util._exit_function()
2154 conn.close()
2155 os._exit(0)
2156
2157 def test_finalize(self):
2158 conn, child_conn = self.Pipe()
2159
2160 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002161 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002162 p.start()
2163 p.join()
2164
2165 result = [obj for obj in iter(conn.recv, 'STOP')]
2166 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2167
Antoine Pitroud09f1672017-06-13 17:52:29 +02002168 def test_thread_safety(self):
2169 # bpo-24484: _run_finalizers() should be thread-safe
2170 def cb():
2171 pass
2172
2173 class Foo(object):
2174 def __init__(self):
2175 self.ref = self # create reference cycle
2176 # insert finalizer at random key
2177 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
2178
2179 finish = False
2180 exc = []
2181
2182 def run_finalizers():
2183 while not finish:
2184 time.sleep(random.random() * 1e-1)
2185 try:
2186 # A GC run will eventually happen during this,
2187 # collecting stale Foo's and mutating the registry
2188 util._run_finalizers()
2189 except Exception as e:
2190 exc.append(e)
2191
2192 def make_finalizers():
2193 d = {}
2194 while not finish:
2195 try:
2196 # Old Foo's get gradually replaced and later
2197 # collected by the GC (because of the cyclic ref)
2198 d[random.getrandbits(5)] = {Foo() for i in range(10)}
2199 except Exception as e:
2200 exc.append(e)
2201 d.clear()
2202
2203 old_interval = sys.getcheckinterval()
2204 old_threshold = gc.get_threshold()
2205 try:
2206 sys.setcheckinterval(10)
2207 gc.set_threshold(5, 5, 5)
2208 threads = [threading.Thread(target=run_finalizers),
2209 threading.Thread(target=make_finalizers)]
2210 with test_support.start_threads(threads):
2211 time.sleep(4.0) # Wait a bit to trigger race condition
2212 finish = True
2213 if exc:
2214 raise exc[0]
2215 finally:
2216 sys.setcheckinterval(old_interval)
2217 gc.set_threshold(*old_threshold)
2218 gc.collect() # Collect remaining Foo's
2219
2220
Benjamin Petersondfd79492008-06-13 19:13:39 +00002221#
2222# Test that from ... import * works for each module
2223#
2224
2225class _TestImportStar(BaseTestCase):
2226
2227 ALLOWED_TYPES = ('processes',)
2228
2229 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002230 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002231 'multiprocessing', 'multiprocessing.connection',
2232 'multiprocessing.heap', 'multiprocessing.managers',
2233 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002234 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002235 ]
2236
Charles-François Natalif8413b22011-09-21 18:44:49 +02002237 if HAS_REDUCTION:
2238 modules.append('multiprocessing.reduction')
2239
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002240 if c_int is not None:
2241 # This module requires _ctypes
2242 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002243
2244 for name in modules:
2245 __import__(name)
2246 mod = sys.modules[name]
2247
2248 for attr in getattr(mod, '__all__', ()):
2249 self.assertTrue(
2250 hasattr(mod, attr),
2251 '%r does not have attribute %r' % (mod, attr)
2252 )
2253
2254#
2255# Quick test that logging works -- does not test logging output
2256#
2257
2258class _TestLogging(BaseTestCase):
2259
2260 ALLOWED_TYPES = ('processes',)
2261
2262 def test_enable_logging(self):
2263 logger = multiprocessing.get_logger()
2264 logger.setLevel(util.SUBWARNING)
2265 self.assertTrue(logger is not None)
2266 logger.debug('this will not be printed')
2267 logger.info('nor will this')
2268 logger.setLevel(LOG_LEVEL)
2269
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002270 @classmethod
2271 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002272 logger = multiprocessing.get_logger()
2273 conn.send(logger.getEffectiveLevel())
2274
2275 def test_level(self):
2276 LEVEL1 = 32
2277 LEVEL2 = 37
2278
2279 logger = multiprocessing.get_logger()
2280 root_logger = logging.getLogger()
2281 root_level = root_logger.level
2282
2283 reader, writer = multiprocessing.Pipe(duplex=False)
2284
2285 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002286 p = self.Process(target=self._test_level, args=(writer,))
2287 p.daemon = True
2288 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002289 self.assertEqual(LEVEL1, reader.recv())
2290
2291 logger.setLevel(logging.NOTSET)
2292 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002293 p = self.Process(target=self._test_level, args=(writer,))
2294 p.daemon = True
2295 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002296 self.assertEqual(LEVEL2, reader.recv())
2297
2298 root_logger.setLevel(root_level)
2299 logger.setLevel(level=LOG_LEVEL)
2300
Jesse Noller814d02d2009-11-21 14:38:23 +00002301
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002302# class _TestLoggingProcessName(BaseTestCase):
2303#
2304# def handle(self, record):
2305# assert record.processName == multiprocessing.current_process().name
2306# self.__handled = True
2307#
2308# def test_logging(self):
2309# handler = logging.Handler()
2310# handler.handle = self.handle
2311# self.__handled = False
2312# # Bypass getLogger() and side-effects
2313# logger = logging.getLoggerClass()(
2314# 'multiprocessing.test.TestLoggingProcessName')
2315# logger.addHandler(handler)
2316# logger.propagate = False
2317#
2318# logger.warn('foo')
2319# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002320
Benjamin Petersondfd79492008-06-13 19:13:39 +00002321#
Richard Oudkerkba482642013-02-26 12:37:07 +00002322# Check that Process.join() retries if os.waitpid() fails with EINTR
2323#
2324
2325class _TestPollEintr(BaseTestCase):
2326
2327 ALLOWED_TYPES = ('processes',)
2328
2329 @classmethod
2330 def _killer(cls, pid):
2331 time.sleep(0.5)
2332 os.kill(pid, signal.SIGUSR1)
2333
2334 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2335 def test_poll_eintr(self):
2336 got_signal = [False]
2337 def record(*args):
2338 got_signal[0] = True
2339 pid = os.getpid()
2340 oldhandler = signal.signal(signal.SIGUSR1, record)
2341 try:
2342 killer = self.Process(target=self._killer, args=(pid,))
2343 killer.start()
2344 p = self.Process(target=time.sleep, args=(1,))
2345 p.start()
2346 p.join()
2347 self.assertTrue(got_signal[0])
2348 self.assertEqual(p.exitcode, 0)
2349 killer.join()
2350 finally:
2351 signal.signal(signal.SIGUSR1, oldhandler)
2352
2353#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002354# Test to verify handle verification, see issue 3321
2355#
2356
2357class TestInvalidHandle(unittest.TestCase):
2358
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002359 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002360 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002361 conn = _multiprocessing.Connection(44977608)
2362 self.assertRaises(IOError, conn.poll)
2363 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002364
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002365#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002366# Functions used to create test cases from the base ones in this module
2367#
2368
2369def get_attributes(Source, names):
2370 d = {}
2371 for name in names:
2372 obj = getattr(Source, name)
2373 if type(obj) == type(get_attributes):
2374 obj = staticmethod(obj)
2375 d[name] = obj
2376 return d
2377
2378def create_test_cases(Mixin, type):
2379 result = {}
2380 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002381 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002382
2383 for name in glob.keys():
2384 if name.startswith('_Test'):
2385 base = glob[name]
2386 if type in base.ALLOWED_TYPES:
2387 newname = 'With' + Type + name[1:]
2388 class Temp(base, unittest.TestCase, Mixin):
2389 pass
2390 result[newname] = Temp
2391 Temp.__name__ = newname
2392 Temp.__module__ = Mixin.__module__
2393 return result
2394
2395#
2396# Create test cases
2397#
2398
2399class ProcessesMixin(object):
2400 TYPE = 'processes'
2401 Process = multiprocessing.Process
2402 locals().update(get_attributes(multiprocessing, (
2403 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2404 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2405 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002406 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002407 )))
2408
2409testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2410globals().update(testcases_processes)
2411
2412
2413class ManagerMixin(object):
2414 TYPE = 'manager'
2415 Process = multiprocessing.Process
2416 manager = object.__new__(multiprocessing.managers.SyncManager)
2417 locals().update(get_attributes(manager, (
2418 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2419 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002420 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002421 )))
2422
2423testcases_manager = create_test_cases(ManagerMixin, type='manager')
2424globals().update(testcases_manager)
2425
2426
2427class ThreadsMixin(object):
2428 TYPE = 'threads'
2429 Process = multiprocessing.dummy.Process
2430 locals().update(get_attributes(multiprocessing.dummy, (
2431 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2432 'Condition', 'Event', 'Value', 'Array', 'current_process',
2433 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002434 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002435 )))
2436
2437testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2438globals().update(testcases_threads)
2439
Neal Norwitz0c519b32008-08-25 01:50:24 +00002440class OtherTest(unittest.TestCase):
2441 # TODO: add more tests for deliver/answer challenge.
2442 def test_deliver_challenge_auth_failure(self):
2443 class _FakeConnection(object):
2444 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002445 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002446 def send_bytes(self, data):
2447 pass
2448 self.assertRaises(multiprocessing.AuthenticationError,
2449 multiprocessing.connection.deliver_challenge,
2450 _FakeConnection(), b'abc')
2451
2452 def test_answer_challenge_auth_failure(self):
2453 class _FakeConnection(object):
2454 def __init__(self):
2455 self.count = 0
2456 def recv_bytes(self, size):
2457 self.count += 1
2458 if self.count == 1:
2459 return multiprocessing.connection.CHALLENGE
2460 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002461 return b'something bogus'
2462 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002463 def send_bytes(self, data):
2464 pass
2465 self.assertRaises(multiprocessing.AuthenticationError,
2466 multiprocessing.connection.answer_challenge,
2467 _FakeConnection(), b'abc')
2468
Jesse Noller7152f6d2009-04-02 05:17:26 +00002469#
2470# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2471#
2472
2473def initializer(ns):
2474 ns.test += 1
2475
2476class TestInitializers(unittest.TestCase):
2477 def setUp(self):
2478 self.mgr = multiprocessing.Manager()
2479 self.ns = self.mgr.Namespace()
2480 self.ns.test = 0
2481
2482 def tearDown(self):
2483 self.mgr.shutdown()
2484
2485 def test_manager_initializer(self):
2486 m = multiprocessing.managers.SyncManager()
2487 self.assertRaises(TypeError, m.start, 1)
2488 m.start(initializer, (self.ns,))
2489 self.assertEqual(self.ns.test, 1)
2490 m.shutdown()
2491
2492 def test_pool_initializer(self):
2493 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2494 p = multiprocessing.Pool(1, initializer, (self.ns,))
2495 p.close()
2496 p.join()
2497 self.assertEqual(self.ns.test, 1)
2498
Jesse Noller1b90efb2009-06-30 17:11:52 +00002499#
2500# Issue 5155, 5313, 5331: Test process in processes
2501# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2502#
2503
Richard Oudkerkc5496072013-09-29 17:10:40 +01002504def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002505 try:
2506 item = q.get(block=False)
2507 except Queue.Empty:
2508 pass
2509
Richard Oudkerkc5496072013-09-29 17:10:40 +01002510def _test_process(q):
2511 queue = multiprocessing.Queue()
2512 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2513 subProc.daemon = True
2514 subProc.start()
2515 subProc.join()
2516
Jesse Noller1b90efb2009-06-30 17:11:52 +00002517def _afunc(x):
2518 return x*x
2519
2520def pool_in_process():
2521 pool = multiprocessing.Pool(processes=4)
2522 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2523
2524class _file_like(object):
2525 def __init__(self, delegate):
2526 self._delegate = delegate
2527 self._pid = None
2528
2529 @property
2530 def cache(self):
2531 pid = os.getpid()
2532 # There are no race conditions since fork keeps only the running thread
2533 if pid != self._pid:
2534 self._pid = pid
2535 self._cache = []
2536 return self._cache
2537
2538 def write(self, data):
2539 self.cache.append(data)
2540
2541 def flush(self):
2542 self._delegate.write(''.join(self.cache))
2543 self._cache = []
2544
2545class TestStdinBadfiledescriptor(unittest.TestCase):
2546
2547 def test_queue_in_process(self):
2548 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002549 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002550 proc.start()
2551 proc.join()
2552
2553 def test_pool_in_process(self):
2554 p = multiprocessing.Process(target=pool_in_process)
2555 p.start()
2556 p.join()
2557
2558 def test_flushing(self):
2559 sio = StringIO()
2560 flike = _file_like(sio)
2561 flike.write('foo')
2562 proc = multiprocessing.Process(target=lambda: flike.flush())
2563 flike.flush()
2564 assert sio.getvalue() == 'foo'
2565
Richard Oudkerke4b99382012-07-27 14:05:46 +01002566#
2567# Test interaction with socket timeouts - see Issue #6056
2568#
2569
2570class TestTimeouts(unittest.TestCase):
2571 @classmethod
2572 def _test_timeout(cls, child, address):
2573 time.sleep(1)
2574 child.send(123)
2575 child.close()
2576 conn = multiprocessing.connection.Client(address)
2577 conn.send(456)
2578 conn.close()
2579
2580 def test_timeout(self):
2581 old_timeout = socket.getdefaulttimeout()
2582 try:
2583 socket.setdefaulttimeout(0.1)
2584 parent, child = multiprocessing.Pipe(duplex=True)
2585 l = multiprocessing.connection.Listener(family='AF_INET')
2586 p = multiprocessing.Process(target=self._test_timeout,
2587 args=(child, l.address))
2588 p.start()
2589 child.close()
2590 self.assertEqual(parent.recv(), 123)
2591 parent.close()
2592 conn = l.accept()
2593 self.assertEqual(conn.recv(), 456)
2594 conn.close()
2595 l.close()
2596 p.join(10)
2597 finally:
2598 socket.setdefaulttimeout(old_timeout)
2599
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002600#
2601# Test what happens with no "if __name__ == '__main__'"
2602#
2603
2604class TestNoForkBomb(unittest.TestCase):
2605 def test_noforkbomb(self):
2606 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2607 if WIN32:
2608 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002609 self.assertEqual(out, '')
2610 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002611 else:
2612 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002613 self.assertEqual(out.rstrip(), '123')
2614 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002615
2616#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002617# Issue 12098: check sys.flags of child matches that for parent
2618#
2619
2620class TestFlags(unittest.TestCase):
2621 @classmethod
2622 def run_in_grandchild(cls, conn):
2623 conn.send(tuple(sys.flags))
2624
2625 @classmethod
2626 def run_in_child(cls):
2627 import json
2628 r, w = multiprocessing.Pipe(duplex=False)
2629 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2630 p.start()
2631 grandchild_flags = r.recv()
2632 p.join()
2633 r.close()
2634 w.close()
2635 flags = (tuple(sys.flags), grandchild_flags)
2636 print(json.dumps(flags))
2637
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002638 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002639 def test_flags(self):
2640 import json, subprocess
2641 # start child process using unusual flags
2642 prog = ('from test.test_multiprocessing import TestFlags; ' +
2643 'TestFlags.run_in_child()')
2644 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002645 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002646 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2647 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002648
2649#
2650# Issue #17555: ForkAwareThreadLock
2651#
2652
2653class TestForkAwareThreadLock(unittest.TestCase):
2654 # We recurisvely start processes. Issue #17555 meant that the
2655 # after fork registry would get duplicate entries for the same
2656 # lock. The size of the registry at generation n was ~2**n.
2657
2658 @classmethod
2659 def child(cls, n, conn):
2660 if n > 1:
2661 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2662 p.start()
2663 p.join()
2664 else:
2665 conn.send(len(util._afterfork_registry))
2666 conn.close()
2667
2668 def test_lock(self):
2669 r, w = multiprocessing.Pipe(False)
2670 l = util.ForkAwareThreadLock()
2671 old_size = len(util._afterfork_registry)
2672 p = multiprocessing.Process(target=self.child, args=(5, w))
2673 p.start()
2674 new_size = r.recv()
2675 p.join()
2676 self.assertLessEqual(new_size, old_size)
2677
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002678#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002679# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2680#
2681
2682class TestIgnoreEINTR(unittest.TestCase):
2683
2684 @classmethod
2685 def _test_ignore(cls, conn):
2686 def handler(signum, frame):
2687 pass
2688 signal.signal(signal.SIGUSR1, handler)
2689 conn.send('ready')
2690 x = conn.recv()
2691 conn.send(x)
Victor Stinner82c456f2018-05-31 07:35:34 +02002692 conn.send_bytes(b'x' * test_support.PIPE_MAX_SIZE)
Richard Oudkerk41072db2013-07-01 18:45:28 +01002693
2694 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2695 def test_ignore(self):
2696 conn, child_conn = multiprocessing.Pipe()
2697 try:
2698 p = multiprocessing.Process(target=self._test_ignore,
2699 args=(child_conn,))
2700 p.daemon = True
2701 p.start()
2702 child_conn.close()
2703 self.assertEqual(conn.recv(), 'ready')
2704 time.sleep(0.1)
2705 os.kill(p.pid, signal.SIGUSR1)
2706 time.sleep(0.1)
2707 conn.send(1234)
2708 self.assertEqual(conn.recv(), 1234)
2709 time.sleep(0.1)
2710 os.kill(p.pid, signal.SIGUSR1)
Victor Stinner82c456f2018-05-31 07:35:34 +02002711 self.assertEqual(conn.recv_bytes(),
2712 b'x' * test_support.PIPE_MAX_SIZE)
Richard Oudkerk41072db2013-07-01 18:45:28 +01002713 time.sleep(0.1)
2714 p.join()
2715 finally:
2716 conn.close()
2717
2718 @classmethod
2719 def _test_ignore_listener(cls, conn):
2720 def handler(signum, frame):
2721 pass
2722 signal.signal(signal.SIGUSR1, handler)
2723 l = multiprocessing.connection.Listener()
2724 conn.send(l.address)
2725 a = l.accept()
2726 a.send('welcome')
2727
2728 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2729 def test_ignore_listener(self):
2730 conn, child_conn = multiprocessing.Pipe()
2731 try:
2732 p = multiprocessing.Process(target=self._test_ignore_listener,
2733 args=(child_conn,))
2734 p.daemon = True
2735 p.start()
2736 child_conn.close()
2737 address = conn.recv()
2738 time.sleep(0.1)
2739 os.kill(p.pid, signal.SIGUSR1)
2740 time.sleep(0.1)
2741 client = multiprocessing.connection.Client(address)
2742 self.assertEqual(client.recv(), 'welcome')
2743 p.join()
2744 finally:
2745 conn.close()
2746
2747#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002748#
2749#
2750
Jesse Noller1b90efb2009-06-30 17:11:52 +00002751testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002752 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002753 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002754
Benjamin Petersondfd79492008-06-13 19:13:39 +00002755#
2756#
2757#
2758
2759def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002760 if sys.platform.startswith("linux"):
2761 try:
2762 lock = multiprocessing.RLock()
2763 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002764 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002765
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002766 check_enough_semaphores()
2767
Benjamin Petersondfd79492008-06-13 19:13:39 +00002768 if run is None:
2769 from test.test_support import run_unittest as run
2770
2771 util.get_temp_dir() # creates temp directory for use by all processes
2772
2773 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2774
Jesse Noller146b7ab2008-07-02 16:44:09 +00002775 ProcessesMixin.pool = multiprocessing.Pool(4)
2776 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2777 ManagerMixin.manager.__init__()
2778 ManagerMixin.manager.start()
2779 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002780
2781 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002782 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2783 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002784 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2785 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002786 )
2787
2788 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2789 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002790 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2791 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002792 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002793 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002794 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002795 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2796 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2797 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002798 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002799
Jesse Noller146b7ab2008-07-02 16:44:09 +00002800 ThreadsMixin.pool.terminate()
2801 ProcessesMixin.pool.terminate()
2802 ManagerMixin.pool.terminate()
2803 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002804
Jesse Noller146b7ab2008-07-02 16:44:09 +00002805 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002806
2807def main():
2808 test_main(unittest.TextTestRunner(verbosity=2).run)
2809
2810if __name__ == '__main__':
2811 main()