blob: a77bfee6718ee66afab33b28e0c69b64dfd0340c [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
Antoine Pitrou12536bd2017-06-28 13:48:38 +0200178class DummyCallable(object):
179 def __call__(self, q, c):
180 assert isinstance(c, DummyCallable)
181 q.put(5)
182
183
Benjamin Petersondfd79492008-06-13 19:13:39 +0000184class _TestProcess(BaseTestCase):
185
186 ALLOWED_TYPES = ('processes', 'threads')
187
188 def test_current(self):
189 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600190 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000191
192 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000194
195 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000196 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000197 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000198 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000199 self.assertEqual(current.ident, os.getpid())
200 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000201
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 @classmethod
203 def _test(cls, q, *args, **kwds):
204 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000205 q.put(args)
206 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000207 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000208 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000209 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000210 q.put(current.pid)
211
212 def test_process(self):
213 q = self.Queue(1)
214 e = self.Event()
215 args = (q, 1, 2)
216 kwargs = {'hello':23, 'bye':2.54}
217 name = 'SomeProcess'
218 p = self.Process(
219 target=self._test, args=args, kwargs=kwargs, name=name
220 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000221 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000222 current = self.current_process()
223
224 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000225 self.assertEqual(p.authkey, current.authkey)
226 self.assertEqual(p.is_alive(), False)
227 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000228 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000229 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000230 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
232 p.start()
233
Ezio Melotti2623a372010-11-21 13:34:58 +0000234 self.assertEqual(p.exitcode, None)
235 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000236 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000237
Ezio Melotti2623a372010-11-21 13:34:58 +0000238 self.assertEqual(q.get(), args[1:])
239 self.assertEqual(q.get(), kwargs)
240 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000241 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000242 self.assertEqual(q.get(), current.authkey)
243 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
245 p.join()
246
Ezio Melotti2623a372010-11-21 13:34:58 +0000247 self.assertEqual(p.exitcode, 0)
248 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000250
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000251 @classmethod
252 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000253 time.sleep(1000)
254
255 def test_terminate(self):
256 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600257 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000258
259 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000260 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000261 p.start()
262
263 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000264 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000265 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000266
267 p.terminate()
268
269 join = TimingWrapper(p.join)
270 self.assertEqual(join(), None)
271 self.assertTimingAlmostEqual(join.elapsed, 0.0)
272
273 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000274 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000275
276 p.join()
277
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000278 # XXX sometimes get p.exitcode == 0 on Windows ...
279 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000280
281 def test_cpu_count(self):
282 try:
283 cpus = multiprocessing.cpu_count()
284 except NotImplementedError:
285 cpus = 1
286 self.assertTrue(type(cpus) is int)
287 self.assertTrue(cpus >= 1)
288
289 def test_active_children(self):
290 self.assertEqual(type(self.active_children()), list)
291
292 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000293 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000294
Jesus Cea6f6016b2011-09-09 20:26:57 +0200295 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000296 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000297 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298
299 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000300 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000301
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 @classmethod
303 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 from multiprocessing import forking
305 wconn.send(id)
306 if len(id) < 2:
307 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000308 p = cls.Process(
309 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000310 )
311 p.start()
312 p.join()
313
314 def test_recursion(self):
315 rconn, wconn = self.Pipe(duplex=False)
316 self._test_recursion(wconn, [])
317
318 time.sleep(DELTA)
319 result = []
320 while rconn.poll():
321 result.append(rconn.recv())
322
323 expected = [
324 [],
325 [0],
326 [0, 0],
327 [0, 1],
328 [1],
329 [1, 0],
330 [1, 1]
331 ]
332 self.assertEqual(result, expected)
333
Richard Oudkerk2182e052012-06-06 19:01:14 +0100334 @classmethod
335 def _test_sys_exit(cls, reason, testfn):
336 sys.stderr = open(testfn, 'w')
337 sys.exit(reason)
338
339 def test_sys_exit(self):
340 # See Issue 13854
341 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600342 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100343
344 testfn = test_support.TESTFN
345 self.addCleanup(test_support.unlink, testfn)
346
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000347 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100348 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
349 p.daemon = True
350 p.start()
351 p.join(5)
352 self.assertEqual(p.exitcode, code)
353
354 with open(testfn, 'r') as f:
355 self.assertEqual(f.read().rstrip(), str(reason))
356
357 for reason in (True, False, 8):
358 p = self.Process(target=sys.exit, args=(reason,))
359 p.daemon = True
360 p.start()
361 p.join(5)
362 self.assertEqual(p.exitcode, reason)
363
Antoine Pitrou12536bd2017-06-28 13:48:38 +0200364 def test_lose_target_ref(self):
365 c = DummyCallable()
366 wr = weakref.ref(c)
367 q = self.Queue()
368 p = self.Process(target=c, args=(q, c))
369 del c
370 p.start()
371 p.join()
372 self.assertIs(wr(), None)
373 self.assertEqual(q.get(), 5)
374
375
Benjamin Petersondfd79492008-06-13 19:13:39 +0000376#
377#
378#
379
380class _UpperCaser(multiprocessing.Process):
381
382 def __init__(self):
383 multiprocessing.Process.__init__(self)
384 self.child_conn, self.parent_conn = multiprocessing.Pipe()
385
386 def run(self):
387 self.parent_conn.close()
388 for s in iter(self.child_conn.recv, None):
389 self.child_conn.send(s.upper())
390 self.child_conn.close()
391
392 def submit(self, s):
393 assert type(s) is str
394 self.parent_conn.send(s)
395 return self.parent_conn.recv()
396
397 def stop(self):
398 self.parent_conn.send(None)
399 self.parent_conn.close()
400 self.child_conn.close()
401
402class _TestSubclassingProcess(BaseTestCase):
403
404 ALLOWED_TYPES = ('processes',)
405
406 def test_subclassing(self):
407 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200408 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000409 uppercaser.start()
410 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
411 self.assertEqual(uppercaser.submit('world'), 'WORLD')
412 uppercaser.stop()
413 uppercaser.join()
414
415#
416#
417#
418
419def queue_empty(q):
420 if hasattr(q, 'empty'):
421 return q.empty()
422 else:
423 return q.qsize() == 0
424
425def queue_full(q, maxsize):
426 if hasattr(q, 'full'):
427 return q.full()
428 else:
429 return q.qsize() == maxsize
430
431
432class _TestQueue(BaseTestCase):
433
434
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000435 @classmethod
436 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000437 child_can_start.wait()
438 for i in range(6):
439 queue.get()
440 parent_can_continue.set()
441
442 def test_put(self):
443 MAXSIZE = 6
444 queue = self.Queue(maxsize=MAXSIZE)
445 child_can_start = self.Event()
446 parent_can_continue = self.Event()
447
448 proc = self.Process(
449 target=self._test_put,
450 args=(queue, child_can_start, parent_can_continue)
451 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000452 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000453 proc.start()
454
455 self.assertEqual(queue_empty(queue), True)
456 self.assertEqual(queue_full(queue, MAXSIZE), False)
457
458 queue.put(1)
459 queue.put(2, True)
460 queue.put(3, True, None)
461 queue.put(4, False)
462 queue.put(5, False, None)
463 queue.put_nowait(6)
464
465 # the values may be in buffer but not yet in pipe so sleep a bit
466 time.sleep(DELTA)
467
468 self.assertEqual(queue_empty(queue), False)
469 self.assertEqual(queue_full(queue, MAXSIZE), True)
470
471 put = TimingWrapper(queue.put)
472 put_nowait = TimingWrapper(queue.put_nowait)
473
474 self.assertRaises(Queue.Full, put, 7, False)
475 self.assertTimingAlmostEqual(put.elapsed, 0)
476
477 self.assertRaises(Queue.Full, put, 7, False, None)
478 self.assertTimingAlmostEqual(put.elapsed, 0)
479
480 self.assertRaises(Queue.Full, put_nowait, 7)
481 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
482
483 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
484 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
485
486 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
487 self.assertTimingAlmostEqual(put.elapsed, 0)
488
489 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
490 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
491
492 child_can_start.set()
493 parent_can_continue.wait()
494
495 self.assertEqual(queue_empty(queue), True)
496 self.assertEqual(queue_full(queue, MAXSIZE), False)
497
498 proc.join()
499
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000500 @classmethod
501 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000502 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000503 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000504 queue.put(2)
505 queue.put(3)
506 queue.put(4)
507 queue.put(5)
508 parent_can_continue.set()
509
510 def test_get(self):
511 queue = self.Queue()
512 child_can_start = self.Event()
513 parent_can_continue = self.Event()
514
515 proc = self.Process(
516 target=self._test_get,
517 args=(queue, child_can_start, parent_can_continue)
518 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000519 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000520 proc.start()
521
522 self.assertEqual(queue_empty(queue), True)
523
524 child_can_start.set()
525 parent_can_continue.wait()
526
527 time.sleep(DELTA)
528 self.assertEqual(queue_empty(queue), False)
529
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000530 # Hangs unexpectedly, remove for now
531 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000532 self.assertEqual(queue.get(True, None), 2)
533 self.assertEqual(queue.get(True), 3)
534 self.assertEqual(queue.get(timeout=1), 4)
535 self.assertEqual(queue.get_nowait(), 5)
536
537 self.assertEqual(queue_empty(queue), True)
538
539 get = TimingWrapper(queue.get)
540 get_nowait = TimingWrapper(queue.get_nowait)
541
542 self.assertRaises(Queue.Empty, get, False)
543 self.assertTimingAlmostEqual(get.elapsed, 0)
544
545 self.assertRaises(Queue.Empty, get, False, None)
546 self.assertTimingAlmostEqual(get.elapsed, 0)
547
548 self.assertRaises(Queue.Empty, get_nowait)
549 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
550
551 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
552 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
553
554 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
555 self.assertTimingAlmostEqual(get.elapsed, 0)
556
557 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
558 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
559
560 proc.join()
561
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000562 @classmethod
563 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000564 for i in range(10, 20):
565 queue.put(i)
566 # note that at this point the items may only be buffered, so the
567 # process cannot shutdown until the feeder thread has finished
568 # pushing items onto the pipe.
569
570 def test_fork(self):
571 # Old versions of Queue would fail to create a new feeder
572 # thread for a forked process if the original process had its
573 # own feeder thread. This test checks that this no longer
574 # happens.
575
576 queue = self.Queue()
577
578 # put items on queue so that main process starts a feeder thread
579 for i in range(10):
580 queue.put(i)
581
582 # wait to make sure thread starts before we fork a new process
583 time.sleep(DELTA)
584
585 # fork process
586 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200587 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000588 p.start()
589
590 # check that all expected items are in the queue
591 for i in range(20):
592 self.assertEqual(queue.get(), i)
593 self.assertRaises(Queue.Empty, queue.get, False)
594
595 p.join()
596
597 def test_qsize(self):
598 q = self.Queue()
599 try:
600 self.assertEqual(q.qsize(), 0)
601 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600602 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000603 q.put(1)
604 self.assertEqual(q.qsize(), 1)
605 q.put(5)
606 self.assertEqual(q.qsize(), 2)
607 q.get()
608 self.assertEqual(q.qsize(), 1)
609 q.get()
610 self.assertEqual(q.qsize(), 0)
611
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000612 @classmethod
613 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000614 for obj in iter(q.get, None):
615 time.sleep(DELTA)
616 q.task_done()
617
618 def test_task_done(self):
619 queue = self.JoinableQueue()
620
621 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000622 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000623
624 workers = [self.Process(target=self._test_task_done, args=(queue,))
625 for i in xrange(4)]
626
627 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200628 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000629 p.start()
630
631 for i in xrange(10):
632 queue.put(i)
633
634 queue.join()
635
636 for p in workers:
637 queue.put(None)
638
639 for p in workers:
640 p.join()
641
Serhiy Storchaka233e6982015-03-06 22:17:25 +0200642 def test_no_import_lock_contention(self):
643 with test_support.temp_cwd():
644 module_name = 'imported_by_an_imported_module'
645 with open(module_name + '.py', 'w') as f:
646 f.write("""if 1:
647 import multiprocessing
648
649 q = multiprocessing.Queue()
650 q.put('knock knock')
651 q.get(timeout=3)
652 q.close()
653 """)
654
655 with test_support.DirsOnSysPath(os.getcwd()):
656 try:
657 __import__(module_name)
658 except Queue.Empty:
659 self.fail("Probable regression on import lock contention;"
660 " see Issue #22853")
661
Antoine Pitroubdd96472017-05-25 17:53:04 +0200662 def test_queue_feeder_donot_stop_onexc(self):
663 # bpo-30414: verify feeder handles exceptions correctly
664 if self.TYPE != 'processes':
665 self.skipTest('test not appropriate for {}'.format(self.TYPE))
666
667 class NotSerializable(object):
668 def __reduce__(self):
669 raise AttributeError
670 with test.support.captured_stderr():
671 q = self.Queue()
672 q.put(NotSerializable())
673 q.put(True)
674 self.assertTrue(q.get(timeout=0.1))
675
676
Benjamin Petersondfd79492008-06-13 19:13:39 +0000677#
678#
679#
680
681class _TestLock(BaseTestCase):
682
683 def test_lock(self):
684 lock = self.Lock()
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(False), False)
687 self.assertEqual(lock.release(), None)
688 self.assertRaises((ValueError, threading.ThreadError), lock.release)
689
690 def test_rlock(self):
691 lock = self.RLock()
692 self.assertEqual(lock.acquire(), True)
693 self.assertEqual(lock.acquire(), True)
694 self.assertEqual(lock.acquire(), True)
695 self.assertEqual(lock.release(), None)
696 self.assertEqual(lock.release(), None)
697 self.assertEqual(lock.release(), None)
698 self.assertRaises((AssertionError, RuntimeError), lock.release)
699
Jesse Noller82eb5902009-03-30 23:29:31 +0000700 def test_lock_context(self):
701 with self.Lock():
702 pass
703
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704
705class _TestSemaphore(BaseTestCase):
706
707 def _test_semaphore(self, sem):
708 self.assertReturnsIfImplemented(2, get_value, sem)
709 self.assertEqual(sem.acquire(), True)
710 self.assertReturnsIfImplemented(1, get_value, sem)
711 self.assertEqual(sem.acquire(), True)
712 self.assertReturnsIfImplemented(0, get_value, sem)
713 self.assertEqual(sem.acquire(False), False)
714 self.assertReturnsIfImplemented(0, get_value, sem)
715 self.assertEqual(sem.release(), None)
716 self.assertReturnsIfImplemented(1, get_value, sem)
717 self.assertEqual(sem.release(), None)
718 self.assertReturnsIfImplemented(2, get_value, sem)
719
720 def test_semaphore(self):
721 sem = self.Semaphore(2)
722 self._test_semaphore(sem)
723 self.assertEqual(sem.release(), None)
724 self.assertReturnsIfImplemented(3, get_value, sem)
725 self.assertEqual(sem.release(), None)
726 self.assertReturnsIfImplemented(4, get_value, sem)
727
728 def test_bounded_semaphore(self):
729 sem = self.BoundedSemaphore(2)
730 self._test_semaphore(sem)
731 # Currently fails on OS/X
732 #if HAVE_GETVALUE:
733 # self.assertRaises(ValueError, sem.release)
734 # self.assertReturnsIfImplemented(2, get_value, sem)
735
736 def test_timeout(self):
737 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600738 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000739
740 sem = self.Semaphore(0)
741 acquire = TimingWrapper(sem.acquire)
742
743 self.assertEqual(acquire(False), False)
744 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
745
746 self.assertEqual(acquire(False, None), False)
747 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
748
749 self.assertEqual(acquire(False, TIMEOUT1), False)
750 self.assertTimingAlmostEqual(acquire.elapsed, 0)
751
752 self.assertEqual(acquire(True, TIMEOUT2), False)
753 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
754
755 self.assertEqual(acquire(timeout=TIMEOUT3), False)
756 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
757
758
759class _TestCondition(BaseTestCase):
760
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000761 @classmethod
762 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000763 cond.acquire()
764 sleeping.release()
765 cond.wait(timeout)
766 woken.release()
767 cond.release()
768
769 def check_invariant(self, cond):
770 # this is only supposed to succeed when there are no sleepers
771 if self.TYPE == 'processes':
772 try:
773 sleepers = (cond._sleeping_count.get_value() -
774 cond._woken_count.get_value())
775 self.assertEqual(sleepers, 0)
776 self.assertEqual(cond._wait_semaphore.get_value(), 0)
777 except NotImplementedError:
778 pass
779
780 def test_notify(self):
781 cond = self.Condition()
782 sleeping = self.Semaphore(0)
783 woken = self.Semaphore(0)
784
785 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000786 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000787 p.start()
788
789 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000790 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000791 p.start()
792
793 # wait for both children to start sleeping
794 sleeping.acquire()
795 sleeping.acquire()
796
797 # check no process/thread has woken up
798 time.sleep(DELTA)
799 self.assertReturnsIfImplemented(0, get_value, woken)
800
801 # wake up one process/thread
802 cond.acquire()
803 cond.notify()
804 cond.release()
805
806 # check one process/thread has woken up
807 time.sleep(DELTA)
808 self.assertReturnsIfImplemented(1, get_value, woken)
809
810 # wake up another
811 cond.acquire()
812 cond.notify()
813 cond.release()
814
815 # check other has woken up
816 time.sleep(DELTA)
817 self.assertReturnsIfImplemented(2, get_value, woken)
818
819 # check state is not mucked up
820 self.check_invariant(cond)
821 p.join()
822
823 def test_notify_all(self):
824 cond = self.Condition()
825 sleeping = self.Semaphore(0)
826 woken = self.Semaphore(0)
827
828 # start some threads/processes which will timeout
829 for i in range(3):
830 p = self.Process(target=self.f,
831 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000832 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000833 p.start()
834
835 t = threading.Thread(target=self.f,
836 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000837 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000838 t.start()
839
840 # wait for them all to sleep
841 for i in xrange(6):
842 sleeping.acquire()
843
844 # check they have all timed out
845 for i in xrange(6):
846 woken.acquire()
847 self.assertReturnsIfImplemented(0, get_value, woken)
848
849 # check state is not mucked up
850 self.check_invariant(cond)
851
852 # start some more threads/processes
853 for i in range(3):
854 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000855 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000856 p.start()
857
858 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000859 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000860 t.start()
861
862 # wait for them to all sleep
863 for i in xrange(6):
864 sleeping.acquire()
865
866 # check no process/thread has woken up
867 time.sleep(DELTA)
868 self.assertReturnsIfImplemented(0, get_value, woken)
869
870 # wake them all up
871 cond.acquire()
872 cond.notify_all()
873 cond.release()
874
875 # check they have all woken
Victor Stinner9d1983b2017-05-15 17:32:14 +0200876 for i in range(10):
877 try:
878 if get_value(woken) == 6:
879 break
880 except NotImplementedError:
881 break
882 time.sleep(DELTA)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000883 self.assertReturnsIfImplemented(6, get_value, woken)
884
885 # check state is not mucked up
886 self.check_invariant(cond)
887
888 def test_timeout(self):
889 cond = self.Condition()
890 wait = TimingWrapper(cond.wait)
891 cond.acquire()
892 res = wait(TIMEOUT1)
893 cond.release()
894 self.assertEqual(res, None)
895 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
896
897
898class _TestEvent(BaseTestCase):
899
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000900 @classmethod
901 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000902 time.sleep(TIMEOUT2)
903 event.set()
904
905 def test_event(self):
906 event = self.Event()
907 wait = TimingWrapper(event.wait)
908
Ezio Melottic2077b02011-03-16 12:34:31 +0200909 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000910 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000911 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000912
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000913 # Removed, threading.Event.wait() will return the value of the __flag
914 # instead of None. API Shear with the semaphore backed mp.Event
915 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000916 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000917 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000918 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
919
920 event.set()
921
922 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000923 self.assertEqual(event.is_set(), True)
924 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000925 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000926 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000927 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
928 # self.assertEqual(event.is_set(), True)
929
930 event.clear()
931
932 #self.assertEqual(event.is_set(), False)
933
Jesus Cea6f6016b2011-09-09 20:26:57 +0200934 p = self.Process(target=self._test_event, args=(event,))
935 p.daemon = True
936 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000937 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000938
939#
940#
941#
942
943class _TestValue(BaseTestCase):
944
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000945 ALLOWED_TYPES = ('processes',)
946
Benjamin Petersondfd79492008-06-13 19:13:39 +0000947 codes_values = [
948 ('i', 4343, 24234),
949 ('d', 3.625, -4.25),
950 ('h', -232, 234),
951 ('c', latin('x'), latin('y'))
952 ]
953
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000954 def setUp(self):
955 if not HAS_SHAREDCTYPES:
956 self.skipTest("requires multiprocessing.sharedctypes")
957
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000958 @classmethod
959 def _test(cls, values):
960 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000961 sv.value = cv[2]
962
963
964 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000965 if raw:
966 values = [self.RawValue(code, value)
967 for code, value, _ in self.codes_values]
968 else:
969 values = [self.Value(code, value)
970 for code, value, _ in self.codes_values]
971
972 for sv, cv in zip(values, self.codes_values):
973 self.assertEqual(sv.value, cv[1])
974
975 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200976 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000977 proc.start()
978 proc.join()
979
980 for sv, cv in zip(values, self.codes_values):
981 self.assertEqual(sv.value, cv[2])
982
983 def test_rawvalue(self):
984 self.test_value(raw=True)
985
986 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000987 val1 = self.Value('i', 5)
988 lock1 = val1.get_lock()
989 obj1 = val1.get_obj()
990
991 val2 = self.Value('i', 5, lock=None)
992 lock2 = val2.get_lock()
993 obj2 = val2.get_obj()
994
995 lock = self.Lock()
996 val3 = self.Value('i', 5, lock=lock)
997 lock3 = val3.get_lock()
998 obj3 = val3.get_obj()
999 self.assertEqual(lock, lock3)
1000
Jesse Noller6ab22152009-01-18 02:45:38 +00001001 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001002 self.assertFalse(hasattr(arr4, 'get_lock'))
1003 self.assertFalse(hasattr(arr4, 'get_obj'))
1004
Jesse Noller6ab22152009-01-18 02:45:38 +00001005 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1006
1007 arr5 = self.RawValue('i', 5)
1008 self.assertFalse(hasattr(arr5, 'get_lock'))
1009 self.assertFalse(hasattr(arr5, 'get_obj'))
1010
Benjamin Petersondfd79492008-06-13 19:13:39 +00001011
1012class _TestArray(BaseTestCase):
1013
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001014 ALLOWED_TYPES = ('processes',)
1015
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001016 @classmethod
1017 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001018 for i in range(1, len(seq)):
1019 seq[i] += seq[i-1]
1020
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001021 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001022 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001023 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1024 if raw:
1025 arr = self.RawArray('i', seq)
1026 else:
1027 arr = self.Array('i', seq)
1028
1029 self.assertEqual(len(arr), len(seq))
1030 self.assertEqual(arr[3], seq[3])
1031 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1032
1033 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1034
1035 self.assertEqual(list(arr[:]), seq)
1036
1037 self.f(seq)
1038
1039 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001040 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001041 p.start()
1042 p.join()
1043
1044 self.assertEqual(list(arr[:]), seq)
1045
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001046 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001047 def test_array_from_size(self):
1048 size = 10
1049 # Test for zeroing (see issue #11675).
1050 # The repetition below strengthens the test by increasing the chances
1051 # of previously allocated non-zero memory being used for the new array
1052 # on the 2nd and 3rd loops.
1053 for _ in range(3):
1054 arr = self.Array('i', size)
1055 self.assertEqual(len(arr), size)
1056 self.assertEqual(list(arr), [0] * size)
1057 arr[:] = range(10)
1058 self.assertEqual(list(arr), range(10))
1059 del arr
1060
1061 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001062 def test_rawarray(self):
1063 self.test_array(raw=True)
1064
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001065 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001066 def test_array_accepts_long(self):
1067 arr = self.Array('i', 10L)
1068 self.assertEqual(len(arr), 10)
1069 raw_arr = self.RawArray('i', 10L)
1070 self.assertEqual(len(raw_arr), 10)
1071
1072 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001073 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001074 arr1 = self.Array('i', range(10))
1075 lock1 = arr1.get_lock()
1076 obj1 = arr1.get_obj()
1077
1078 arr2 = self.Array('i', range(10), lock=None)
1079 lock2 = arr2.get_lock()
1080 obj2 = arr2.get_obj()
1081
1082 lock = self.Lock()
1083 arr3 = self.Array('i', range(10), lock=lock)
1084 lock3 = arr3.get_lock()
1085 obj3 = arr3.get_obj()
1086 self.assertEqual(lock, lock3)
1087
Jesse Noller6ab22152009-01-18 02:45:38 +00001088 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001089 self.assertFalse(hasattr(arr4, 'get_lock'))
1090 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001091 self.assertRaises(AttributeError,
1092 self.Array, 'i', range(10), lock='notalock')
1093
1094 arr5 = self.RawArray('i', range(10))
1095 self.assertFalse(hasattr(arr5, 'get_lock'))
1096 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001097
1098#
1099#
1100#
1101
1102class _TestContainers(BaseTestCase):
1103
1104 ALLOWED_TYPES = ('manager',)
1105
1106 def test_list(self):
1107 a = self.list(range(10))
1108 self.assertEqual(a[:], range(10))
1109
1110 b = self.list()
1111 self.assertEqual(b[:], [])
1112
1113 b.extend(range(5))
1114 self.assertEqual(b[:], range(5))
1115
1116 self.assertEqual(b[2], 2)
1117 self.assertEqual(b[2:10], [2,3,4])
1118
1119 b *= 2
1120 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1121
1122 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1123
1124 self.assertEqual(a[:], range(10))
1125
1126 d = [a, b]
1127 e = self.list(d)
1128 self.assertEqual(
1129 e[:],
1130 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1131 )
1132
1133 f = self.list([a])
1134 a.append('hello')
1135 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1136
1137 def test_dict(self):
1138 d = self.dict()
1139 indices = range(65, 70)
1140 for i in indices:
1141 d[i] = chr(i)
1142 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1143 self.assertEqual(sorted(d.keys()), indices)
1144 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1145 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1146
1147 def test_namespace(self):
1148 n = self.Namespace()
1149 n.name = 'Bob'
1150 n.job = 'Builder'
1151 n._hidden = 'hidden'
1152 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1153 del n.job
1154 self.assertEqual(str(n), "Namespace(name='Bob')")
1155 self.assertTrue(hasattr(n, 'name'))
1156 self.assertTrue(not hasattr(n, 'job'))
1157
1158#
1159#
1160#
1161
1162def sqr(x, wait=0.0):
1163 time.sleep(wait)
1164 return x*x
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001165
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001166def identity(x):
1167 return x
1168
1169class CountedObject(object):
1170 n_instances = 0
1171
1172 def __new__(cls):
1173 cls.n_instances += 1
1174 return object.__new__(cls)
1175
1176 def __del__(self):
1177 type(self).n_instances -= 1
1178
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001179class SayWhenError(ValueError): pass
1180
1181def exception_throwing_generator(total, when):
1182 for i in range(total):
1183 if i == when:
1184 raise SayWhenError("Somebody said when")
1185 yield i
1186
Benjamin Petersondfd79492008-06-13 19:13:39 +00001187class _TestPool(BaseTestCase):
1188
1189 def test_apply(self):
1190 papply = self.pool.apply
1191 self.assertEqual(papply(sqr, (5,)), sqr(5))
1192 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1193
1194 def test_map(self):
1195 pmap = self.pool.map
1196 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1197 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1198 map(sqr, range(100)))
1199
Richard Oudkerk21aad972013-10-28 23:02:22 +00001200 def test_map_unplicklable(self):
1201 # Issue #19425 -- failure to pickle should not cause a hang
1202 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001203 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001204 class A(object):
1205 def __reduce__(self):
1206 raise RuntimeError('cannot pickle')
1207 with self.assertRaises(RuntimeError):
1208 self.pool.map(sqr, [A()]*10)
1209
Jesse Noller7530e472009-07-16 14:23:04 +00001210 def test_map_chunksize(self):
1211 try:
1212 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1213 except multiprocessing.TimeoutError:
1214 self.fail("pool.map_async with chunksize stalled on null list")
1215
Benjamin Petersondfd79492008-06-13 19:13:39 +00001216 def test_async(self):
1217 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1218 get = TimingWrapper(res.get)
1219 self.assertEqual(get(), 49)
1220 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1221
1222 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001223 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001224 get = TimingWrapper(res.get)
1225 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1226 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1227
1228 def test_imap(self):
1229 it = self.pool.imap(sqr, range(10))
1230 self.assertEqual(list(it), map(sqr, range(10)))
1231
1232 it = self.pool.imap(sqr, range(10))
1233 for i in range(10):
1234 self.assertEqual(it.next(), i*i)
1235 self.assertRaises(StopIteration, it.next)
1236
1237 it = self.pool.imap(sqr, range(1000), chunksize=100)
1238 for i in range(1000):
1239 self.assertEqual(it.next(), i*i)
1240 self.assertRaises(StopIteration, it.next)
1241
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001242 def test_imap_handle_iterable_exception(self):
1243 if self.TYPE == 'manager':
1244 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1245
1246 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1247 for i in range(3):
1248 self.assertEqual(next(it), i*i)
1249 self.assertRaises(SayWhenError, it.next)
1250
1251 # SayWhenError seen at start of problematic chunk's results
1252 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1253 for i in range(6):
1254 self.assertEqual(next(it), i*i)
1255 self.assertRaises(SayWhenError, it.next)
1256 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1257 for i in range(4):
1258 self.assertEqual(next(it), i*i)
1259 self.assertRaises(SayWhenError, it.next)
1260
Benjamin Petersondfd79492008-06-13 19:13:39 +00001261 def test_imap_unordered(self):
1262 it = self.pool.imap_unordered(sqr, range(1000))
1263 self.assertEqual(sorted(it), map(sqr, range(1000)))
1264
1265 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1266 self.assertEqual(sorted(it), map(sqr, range(1000)))
1267
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001268 def test_imap_unordered_handle_iterable_exception(self):
1269 if self.TYPE == 'manager':
1270 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1271
1272 it = self.pool.imap_unordered(sqr,
1273 exception_throwing_generator(10, 3),
1274 1)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001275 expected_values = map(sqr, range(10))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001276 with self.assertRaises(SayWhenError):
1277 # imap_unordered makes it difficult to anticipate the SayWhenError
1278 for i in range(10):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001279 value = next(it)
1280 self.assertIn(value, expected_values)
1281 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001282
1283 it = self.pool.imap_unordered(sqr,
1284 exception_throwing_generator(20, 7),
1285 2)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001286 expected_values = map(sqr, range(20))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001287 with self.assertRaises(SayWhenError):
1288 for i in range(20):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001289 value = next(it)
1290 self.assertIn(value, expected_values)
1291 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001292
Benjamin Petersondfd79492008-06-13 19:13:39 +00001293 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001294 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1295 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1296
Benjamin Petersondfd79492008-06-13 19:13:39 +00001297 p = multiprocessing.Pool(3)
1298 self.assertEqual(3, len(p._pool))
1299 p.close()
1300 p.join()
1301
1302 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001303 p = self.Pool(4)
1304 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001305 time.sleep, [0.1 for i in range(10000)], chunksize=1
1306 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001307 p.terminate()
1308 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001309 join()
1310 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001311
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001312 def test_empty_iterable(self):
1313 # See Issue 12157
1314 p = self.Pool(1)
1315
1316 self.assertEqual(p.map(sqr, []), [])
1317 self.assertEqual(list(p.imap(sqr, [])), [])
1318 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1319 self.assertEqual(p.map_async(sqr, []).get(), [])
1320
1321 p.close()
1322 p.join()
1323
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001324 def test_release_task_refs(self):
1325 # Issue #29861: task arguments and results should not be kept
1326 # alive after we are done with them.
1327 objs = list(CountedObject() for i in range(10))
1328 refs = list(weakref.ref(o) for o in objs)
1329 self.pool.map(identity, objs)
1330
1331 del objs
Victor Stinnerfd6094c2017-05-05 09:47:11 +02001332 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001333 self.assertEqual(set(wr() for wr in refs), {None})
1334 # With a process pool, copies of the objects are returned, check
1335 # they were released too.
1336 self.assertEqual(CountedObject.n_instances, 0)
1337
1338
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001339def unpickleable_result():
1340 return lambda: 42
1341
1342class _TestPoolWorkerErrors(BaseTestCase):
1343 ALLOWED_TYPES = ('processes', )
1344
1345 def test_unpickleable_result(self):
1346 from multiprocessing.pool import MaybeEncodingError
1347 p = multiprocessing.Pool(2)
1348
1349 # Make sure we don't lose pool processes because of encoding errors.
1350 for iteration in range(20):
1351 res = p.apply_async(unpickleable_result)
1352 self.assertRaises(MaybeEncodingError, res.get)
1353
1354 p.close()
1355 p.join()
1356
Jesse Noller654ade32010-01-27 03:05:57 +00001357class _TestPoolWorkerLifetime(BaseTestCase):
1358
1359 ALLOWED_TYPES = ('processes', )
1360 def test_pool_worker_lifetime(self):
1361 p = multiprocessing.Pool(3, maxtasksperchild=10)
1362 self.assertEqual(3, len(p._pool))
1363 origworkerpids = [w.pid for w in p._pool]
1364 # Run many tasks so each worker gets replaced (hopefully)
1365 results = []
1366 for i in range(100):
1367 results.append(p.apply_async(sqr, (i, )))
1368 # Fetch the results and verify we got the right answers,
1369 # also ensuring all the tasks have completed.
1370 for (j, res) in enumerate(results):
1371 self.assertEqual(res.get(), sqr(j))
1372 # Refill the pool
1373 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001374 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001375 # (countdown * DELTA = 5 seconds max startup process time)
1376 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001377 while countdown and not all(w.is_alive() for w in p._pool):
1378 countdown -= 1
1379 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001380 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001381 # All pids should be assigned. See issue #7805.
1382 self.assertNotIn(None, origworkerpids)
1383 self.assertNotIn(None, finalworkerpids)
1384 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001385 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1386 p.close()
1387 p.join()
1388
Charles-François Natali46f990e2011-10-24 18:43:51 +02001389 def test_pool_worker_lifetime_early_close(self):
1390 # Issue #10332: closing a pool whose workers have limited lifetimes
1391 # before all the tasks completed would make join() hang.
1392 p = multiprocessing.Pool(3, maxtasksperchild=1)
1393 results = []
1394 for i in range(6):
1395 results.append(p.apply_async(sqr, (i, 0.3)))
1396 p.close()
1397 p.join()
1398 # check the results
1399 for (j, res) in enumerate(results):
1400 self.assertEqual(res.get(), sqr(j))
1401
1402
Benjamin Petersondfd79492008-06-13 19:13:39 +00001403#
1404# Test that manager has expected number of shared objects left
1405#
1406
1407class _TestZZZNumberOfObjects(BaseTestCase):
1408 # Because test cases are sorted alphabetically, this one will get
1409 # run after all the other tests for the manager. It tests that
1410 # there have been no "reference leaks" for the manager's shared
1411 # objects. Note the comment in _TestPool.test_terminate().
1412 ALLOWED_TYPES = ('manager',)
1413
1414 def test_number_of_objects(self):
1415 EXPECTED_NUMBER = 1 # the pool object is still alive
1416 multiprocessing.active_children() # discard dead process objs
1417 gc.collect() # do garbage collection
1418 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001419 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001420 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001421 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001422 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001423
1424 self.assertEqual(refs, EXPECTED_NUMBER)
1425
1426#
1427# Test of creating a customized manager class
1428#
1429
1430from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1431
1432class FooBar(object):
1433 def f(self):
1434 return 'f()'
1435 def g(self):
1436 raise ValueError
1437 def _h(self):
1438 return '_h()'
1439
1440def baz():
1441 for i in xrange(10):
1442 yield i*i
1443
1444class IteratorProxy(BaseProxy):
1445 _exposed_ = ('next', '__next__')
1446 def __iter__(self):
1447 return self
1448 def next(self):
1449 return self._callmethod('next')
1450 def __next__(self):
1451 return self._callmethod('__next__')
1452
1453class MyManager(BaseManager):
1454 pass
1455
1456MyManager.register('Foo', callable=FooBar)
1457MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1458MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1459
1460
1461class _TestMyManager(BaseTestCase):
1462
1463 ALLOWED_TYPES = ('manager',)
1464
1465 def test_mymanager(self):
1466 manager = MyManager()
1467 manager.start()
1468
1469 foo = manager.Foo()
1470 bar = manager.Bar()
1471 baz = manager.baz()
1472
1473 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1474 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1475
1476 self.assertEqual(foo_methods, ['f', 'g'])
1477 self.assertEqual(bar_methods, ['f', '_h'])
1478
1479 self.assertEqual(foo.f(), 'f()')
1480 self.assertRaises(ValueError, foo.g)
1481 self.assertEqual(foo._callmethod('f'), 'f()')
1482 self.assertRaises(RemoteError, foo._callmethod, '_h')
1483
1484 self.assertEqual(bar.f(), 'f()')
1485 self.assertEqual(bar._h(), '_h()')
1486 self.assertEqual(bar._callmethod('f'), 'f()')
1487 self.assertEqual(bar._callmethod('_h'), '_h()')
1488
1489 self.assertEqual(list(baz), [i*i for i in range(10)])
1490
1491 manager.shutdown()
1492
1493#
1494# Test of connecting to a remote server and using xmlrpclib for serialization
1495#
1496
1497_queue = Queue.Queue()
1498def get_queue():
1499 return _queue
1500
1501class QueueManager(BaseManager):
1502 '''manager class used by server process'''
1503QueueManager.register('get_queue', callable=get_queue)
1504
1505class QueueManager2(BaseManager):
1506 '''manager class which specifies the same interface as QueueManager'''
1507QueueManager2.register('get_queue')
1508
1509
1510SERIALIZER = 'xmlrpclib'
1511
1512class _TestRemoteManager(BaseTestCase):
1513
1514 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001515 values = ['hello world', None, True, 2.25,
1516 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1517 ]
1518 result = values[:]
1519 if test_support.have_unicode:
1520 #result[-1] = u'hall\xe5 v\xe4rlden'
1521 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1522 r'\u0441\u0432\u0456\u0442')
1523 values.append(uvalue)
1524 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001525
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001526 @classmethod
1527 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001528 manager = QueueManager2(
1529 address=address, authkey=authkey, serializer=SERIALIZER
1530 )
1531 manager.connect()
1532 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001533 # Note that xmlrpclib will deserialize object as a list not a tuple
1534 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001535
1536 def test_remote(self):
1537 authkey = os.urandom(32)
1538
1539 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001540 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001541 )
1542 manager.start()
1543
1544 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001545 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001546 p.start()
1547
1548 manager2 = QueueManager2(
1549 address=manager.address, authkey=authkey, serializer=SERIALIZER
1550 )
1551 manager2.connect()
1552 queue = manager2.get_queue()
1553
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001554 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001555
1556 # Because we are using xmlrpclib for serialization instead of
1557 # pickle this will cause a serialization error.
1558 self.assertRaises(Exception, queue.put, time.sleep)
1559
1560 # Make queue finalizer run before the server is stopped
1561 del queue
1562 manager.shutdown()
1563
Jesse Noller459a6482009-03-30 15:50:42 +00001564class _TestManagerRestart(BaseTestCase):
1565
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001566 @classmethod
1567 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001568 manager = QueueManager(
1569 address=address, authkey=authkey, serializer=SERIALIZER)
1570 manager.connect()
1571 queue = manager.get_queue()
1572 queue.put('hello world')
1573
1574 def test_rapid_restart(self):
1575 authkey = os.urandom(32)
1576 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001577 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001578 srvr = manager.get_server()
1579 addr = srvr.address
1580 # Close the connection.Listener socket which gets opened as a part
1581 # of manager.get_server(). It's not needed for the test.
1582 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001583 manager.start()
1584
1585 p = self.Process(target=self._putter, args=(manager.address, authkey))
1586 p.start()
Victor Stinner883520a2017-08-16 13:14:40 +02001587 p.join()
Jesse Noller459a6482009-03-30 15:50:42 +00001588 queue = manager.get_queue()
1589 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001590 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001591 manager.shutdown()
Victor Stinner883520a2017-08-16 13:14:40 +02001592
Jesse Noller459a6482009-03-30 15:50:42 +00001593 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001594 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001595 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001596 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001597
Benjamin Petersondfd79492008-06-13 19:13:39 +00001598#
1599#
1600#
1601
1602SENTINEL = latin('')
1603
1604class _TestConnection(BaseTestCase):
1605
1606 ALLOWED_TYPES = ('processes', 'threads')
1607
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001608 @classmethod
1609 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001610 for msg in iter(conn.recv_bytes, SENTINEL):
1611 conn.send_bytes(msg)
1612 conn.close()
1613
1614 def test_connection(self):
1615 conn, child_conn = self.Pipe()
1616
1617 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001618 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001619 p.start()
1620
1621 seq = [1, 2.25, None]
1622 msg = latin('hello world')
1623 longmsg = msg * 10
1624 arr = array.array('i', range(4))
1625
1626 if self.TYPE == 'processes':
1627 self.assertEqual(type(conn.fileno()), int)
1628
1629 self.assertEqual(conn.send(seq), None)
1630 self.assertEqual(conn.recv(), seq)
1631
1632 self.assertEqual(conn.send_bytes(msg), None)
1633 self.assertEqual(conn.recv_bytes(), msg)
1634
1635 if self.TYPE == 'processes':
1636 buffer = array.array('i', [0]*10)
1637 expected = list(arr) + [0] * (10 - len(arr))
1638 self.assertEqual(conn.send_bytes(arr), None)
1639 self.assertEqual(conn.recv_bytes_into(buffer),
1640 len(arr) * buffer.itemsize)
1641 self.assertEqual(list(buffer), expected)
1642
1643 buffer = array.array('i', [0]*10)
1644 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1645 self.assertEqual(conn.send_bytes(arr), None)
1646 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1647 len(arr) * buffer.itemsize)
1648 self.assertEqual(list(buffer), expected)
1649
1650 buffer = bytearray(latin(' ' * 40))
1651 self.assertEqual(conn.send_bytes(longmsg), None)
1652 try:
1653 res = conn.recv_bytes_into(buffer)
1654 except multiprocessing.BufferTooShort, e:
1655 self.assertEqual(e.args, (longmsg,))
1656 else:
1657 self.fail('expected BufferTooShort, got %s' % res)
1658
1659 poll = TimingWrapper(conn.poll)
1660
1661 self.assertEqual(poll(), False)
1662 self.assertTimingAlmostEqual(poll.elapsed, 0)
1663
1664 self.assertEqual(poll(TIMEOUT1), False)
1665 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1666
1667 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001668 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001669
1670 self.assertEqual(poll(TIMEOUT1), True)
1671 self.assertTimingAlmostEqual(poll.elapsed, 0)
1672
1673 self.assertEqual(conn.recv(), None)
1674
1675 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1676 conn.send_bytes(really_big_msg)
1677 self.assertEqual(conn.recv_bytes(), really_big_msg)
1678
1679 conn.send_bytes(SENTINEL) # tell child to quit
1680 child_conn.close()
1681
1682 if self.TYPE == 'processes':
1683 self.assertEqual(conn.readable, True)
1684 self.assertEqual(conn.writable, True)
1685 self.assertRaises(EOFError, conn.recv)
1686 self.assertRaises(EOFError, conn.recv_bytes)
1687
1688 p.join()
1689
1690 def test_duplex_false(self):
1691 reader, writer = self.Pipe(duplex=False)
1692 self.assertEqual(writer.send(1), None)
1693 self.assertEqual(reader.recv(), 1)
1694 if self.TYPE == 'processes':
1695 self.assertEqual(reader.readable, True)
1696 self.assertEqual(reader.writable, False)
1697 self.assertEqual(writer.readable, False)
1698 self.assertEqual(writer.writable, True)
1699 self.assertRaises(IOError, reader.send, 2)
1700 self.assertRaises(IOError, writer.recv)
1701 self.assertRaises(IOError, writer.poll)
1702
1703 def test_spawn_close(self):
1704 # We test that a pipe connection can be closed by parent
1705 # process immediately after child is spawned. On Windows this
1706 # would have sometimes failed on old versions because
1707 # child_conn would be closed before the child got a chance to
1708 # duplicate it.
1709 conn, child_conn = self.Pipe()
1710
1711 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001712 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001713 p.start()
1714 child_conn.close() # this might complete before child initializes
1715
1716 msg = latin('hello')
1717 conn.send_bytes(msg)
1718 self.assertEqual(conn.recv_bytes(), msg)
1719
1720 conn.send_bytes(SENTINEL)
1721 conn.close()
1722 p.join()
1723
1724 def test_sendbytes(self):
1725 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001726 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001727
1728 msg = latin('abcdefghijklmnopqrstuvwxyz')
1729 a, b = self.Pipe()
1730
1731 a.send_bytes(msg)
1732 self.assertEqual(b.recv_bytes(), msg)
1733
1734 a.send_bytes(msg, 5)
1735 self.assertEqual(b.recv_bytes(), msg[5:])
1736
1737 a.send_bytes(msg, 7, 8)
1738 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1739
1740 a.send_bytes(msg, 26)
1741 self.assertEqual(b.recv_bytes(), latin(''))
1742
1743 a.send_bytes(msg, 26, 0)
1744 self.assertEqual(b.recv_bytes(), latin(''))
1745
1746 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1747
1748 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1749
1750 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1751
1752 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1753
1754 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1755
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001756 @classmethod
1757 def _is_fd_assigned(cls, fd):
1758 try:
1759 os.fstat(fd)
1760 except OSError as e:
1761 if e.errno == errno.EBADF:
1762 return False
1763 raise
1764 else:
1765 return True
1766
1767 @classmethod
1768 def _writefd(cls, conn, data, create_dummy_fds=False):
1769 if create_dummy_fds:
1770 for i in range(0, 256):
1771 if not cls._is_fd_assigned(i):
1772 os.dup2(conn.fileno(), i)
1773 fd = reduction.recv_handle(conn)
1774 if msvcrt:
1775 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1776 os.write(fd, data)
1777 os.close(fd)
1778
Charles-François Natalif8413b22011-09-21 18:44:49 +02001779 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001780 def test_fd_transfer(self):
1781 if self.TYPE != 'processes':
1782 self.skipTest("only makes sense with processes")
1783 conn, child_conn = self.Pipe(duplex=True)
1784
1785 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001786 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001787 p.start()
1788 with open(test_support.TESTFN, "wb") as f:
1789 fd = f.fileno()
1790 if msvcrt:
1791 fd = msvcrt.get_osfhandle(fd)
1792 reduction.send_handle(conn, fd, p.pid)
1793 p.join()
1794 with open(test_support.TESTFN, "rb") as f:
1795 self.assertEqual(f.read(), b"foo")
1796
Charles-François Natalif8413b22011-09-21 18:44:49 +02001797 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001798 @unittest.skipIf(sys.platform == "win32",
1799 "test semantics don't make sense on Windows")
1800 @unittest.skipIf(MAXFD <= 256,
1801 "largest assignable fd number is too small")
1802 @unittest.skipUnless(hasattr(os, "dup2"),
1803 "test needs os.dup2()")
1804 def test_large_fd_transfer(self):
1805 # With fd > 256 (issue #11657)
1806 if self.TYPE != 'processes':
1807 self.skipTest("only makes sense with processes")
1808 conn, child_conn = self.Pipe(duplex=True)
1809
1810 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001811 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001812 p.start()
1813 with open(test_support.TESTFN, "wb") as f:
1814 fd = f.fileno()
1815 for newfd in range(256, MAXFD):
1816 if not self._is_fd_assigned(newfd):
1817 break
1818 else:
1819 self.fail("could not find an unassigned large file descriptor")
1820 os.dup2(fd, newfd)
1821 try:
1822 reduction.send_handle(conn, newfd, p.pid)
1823 finally:
1824 os.close(newfd)
1825 p.join()
1826 with open(test_support.TESTFN, "rb") as f:
1827 self.assertEqual(f.read(), b"bar")
1828
Jesus Ceac23484b2011-09-21 03:47:39 +02001829 @classmethod
1830 def _send_data_without_fd(self, conn):
1831 os.write(conn.fileno(), b"\0")
1832
Charles-François Natalif8413b22011-09-21 18:44:49 +02001833 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001834 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1835 def test_missing_fd_transfer(self):
1836 # Check that exception is raised when received data is not
1837 # accompanied by a file descriptor in ancillary data.
1838 if self.TYPE != 'processes':
1839 self.skipTest("only makes sense with processes")
1840 conn, child_conn = self.Pipe(duplex=True)
1841
1842 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1843 p.daemon = True
1844 p.start()
1845 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1846 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001847
Benjamin Petersondfd79492008-06-13 19:13:39 +00001848class _TestListenerClient(BaseTestCase):
1849
1850 ALLOWED_TYPES = ('processes', 'threads')
1851
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001852 @classmethod
1853 def _test(cls, address):
1854 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001855 conn.send('hello')
1856 conn.close()
1857
1858 def test_listener_client(self):
1859 for family in self.connection.families:
1860 l = self.connection.Listener(family=family)
1861 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001862 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001863 p.start()
1864 conn = l.accept()
1865 self.assertEqual(conn.recv(), 'hello')
1866 p.join()
1867 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001868
1869 def test_issue14725(self):
1870 l = self.connection.Listener()
1871 p = self.Process(target=self._test, args=(l.address,))
1872 p.daemon = True
1873 p.start()
1874 time.sleep(1)
1875 # On Windows the client process should by now have connected,
1876 # written data and closed the pipe handle by now. This causes
1877 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1878 # 14725.
1879 conn = l.accept()
1880 self.assertEqual(conn.recv(), 'hello')
1881 conn.close()
1882 p.join()
1883 l.close()
1884
Benjamin Petersondfd79492008-06-13 19:13:39 +00001885#
1886# Test of sending connection and socket objects between processes
1887#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001888"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001889class _TestPicklingConnections(BaseTestCase):
1890
1891 ALLOWED_TYPES = ('processes',)
1892
1893 def _listener(self, conn, families):
1894 for fam in families:
1895 l = self.connection.Listener(family=fam)
1896 conn.send(l.address)
1897 new_conn = l.accept()
1898 conn.send(new_conn)
1899
1900 if self.TYPE == 'processes':
1901 l = socket.socket()
1902 l.bind(('localhost', 0))
1903 conn.send(l.getsockname())
1904 l.listen(1)
1905 new_conn, addr = l.accept()
1906 conn.send(new_conn)
1907
1908 conn.recv()
1909
1910 def _remote(self, conn):
1911 for (address, msg) in iter(conn.recv, None):
1912 client = self.connection.Client(address)
1913 client.send(msg.upper())
1914 client.close()
1915
1916 if self.TYPE == 'processes':
1917 address, msg = conn.recv()
1918 client = socket.socket()
1919 client.connect(address)
1920 client.sendall(msg.upper())
1921 client.close()
1922
1923 conn.close()
1924
1925 def test_pickling(self):
1926 try:
1927 multiprocessing.allow_connection_pickling()
1928 except ImportError:
1929 return
1930
1931 families = self.connection.families
1932
1933 lconn, lconn0 = self.Pipe()
1934 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001935 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001936 lp.start()
1937 lconn0.close()
1938
1939 rconn, rconn0 = self.Pipe()
1940 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001941 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001942 rp.start()
1943 rconn0.close()
1944
1945 for fam in families:
1946 msg = ('This connection uses family %s' % fam).encode('ascii')
1947 address = lconn.recv()
1948 rconn.send((address, msg))
1949 new_conn = lconn.recv()
1950 self.assertEqual(new_conn.recv(), msg.upper())
1951
1952 rconn.send(None)
1953
1954 if self.TYPE == 'processes':
1955 msg = latin('This connection uses a normal socket')
1956 address = lconn.recv()
1957 rconn.send((address, msg))
1958 if hasattr(socket, 'fromfd'):
1959 new_conn = lconn.recv()
1960 self.assertEqual(new_conn.recv(100), msg.upper())
1961 else:
1962 # XXX On Windows with Py2.6 need to backport fromfd()
1963 discard = lconn.recv_bytes()
1964
1965 lconn.send(None)
1966
1967 rconn.close()
1968 lconn.close()
1969
1970 lp.join()
1971 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001972"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001973#
1974#
1975#
1976
1977class _TestHeap(BaseTestCase):
1978
1979 ALLOWED_TYPES = ('processes',)
1980
1981 def test_heap(self):
1982 iterations = 5000
1983 maxblocks = 50
1984 blocks = []
1985
1986 # create and destroy lots of blocks of different sizes
1987 for i in xrange(iterations):
1988 size = int(random.lognormvariate(0, 1) * 1000)
1989 b = multiprocessing.heap.BufferWrapper(size)
1990 blocks.append(b)
1991 if len(blocks) > maxblocks:
1992 i = random.randrange(maxblocks)
1993 del blocks[i]
1994
1995 # get the heap object
1996 heap = multiprocessing.heap.BufferWrapper._heap
1997
1998 # verify the state of the heap
1999 all = []
2000 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002001 heap._lock.acquire()
2002 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002003 for L in heap._len_to_seq.values():
2004 for arena, start, stop in L:
2005 all.append((heap._arenas.index(arena), start, stop,
2006 stop-start, 'free'))
2007 for arena, start, stop in heap._allocated_blocks:
2008 all.append((heap._arenas.index(arena), start, stop,
2009 stop-start, 'occupied'))
2010 occupied += (stop-start)
2011
2012 all.sort()
2013
2014 for i in range(len(all)-1):
2015 (arena, start, stop) = all[i][:3]
2016 (narena, nstart, nstop) = all[i+1][:3]
2017 self.assertTrue((arena != narena and nstart == 0) or
2018 (stop == nstart))
2019
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002020 def test_free_from_gc(self):
2021 # Check that freeing of blocks by the garbage collector doesn't deadlock
2022 # (issue #12352).
2023 # Make sure the GC is enabled, and set lower collection thresholds to
2024 # make collections more frequent (and increase the probability of
2025 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002026 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002027 gc.enable()
2028 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002029 thresholds = gc.get_threshold()
2030 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002031 gc.set_threshold(10)
2032
2033 # perform numerous block allocations, with cyclic references to make
2034 # sure objects are collected asynchronously by the gc
2035 for i in range(5000):
2036 a = multiprocessing.heap.BufferWrapper(1)
2037 b = multiprocessing.heap.BufferWrapper(1)
2038 # circular references
2039 a.buddy = b
2040 b.buddy = a
2041
Benjamin Petersondfd79492008-06-13 19:13:39 +00002042#
2043#
2044#
2045
Benjamin Petersondfd79492008-06-13 19:13:39 +00002046class _Foo(Structure):
2047 _fields_ = [
2048 ('x', c_int),
2049 ('y', c_double)
2050 ]
2051
2052class _TestSharedCTypes(BaseTestCase):
2053
2054 ALLOWED_TYPES = ('processes',)
2055
Antoine Pitrou55d935a2010-11-22 16:35:57 +00002056 def setUp(self):
2057 if not HAS_SHAREDCTYPES:
2058 self.skipTest("requires multiprocessing.sharedctypes")
2059
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002060 @classmethod
2061 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002062 x.value *= 2
2063 y.value *= 2
2064 foo.x *= 2
2065 foo.y *= 2
2066 string.value *= 2
2067 for i in range(len(arr)):
2068 arr[i] *= 2
2069
2070 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002071 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002072 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002073 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002074 arr = self.Array('d', range(10), lock=lock)
2075 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00002076 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002077
2078 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002079 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002080 p.start()
2081 p.join()
2082
2083 self.assertEqual(x.value, 14)
2084 self.assertAlmostEqual(y.value, 2.0/3.0)
2085 self.assertEqual(foo.x, 6)
2086 self.assertAlmostEqual(foo.y, 4.0)
2087 for i in range(10):
2088 self.assertAlmostEqual(arr[i], i*2)
2089 self.assertEqual(string.value, latin('hellohello'))
2090
2091 def test_synchronize(self):
2092 self.test_sharedctypes(lock=True)
2093
2094 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002095 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00002096 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002097 foo.x = 0
2098 foo.y = 0
2099 self.assertEqual(bar.x, 2)
2100 self.assertAlmostEqual(bar.y, 5.0)
2101
2102#
2103#
2104#
2105
2106class _TestFinalize(BaseTestCase):
2107
2108 ALLOWED_TYPES = ('processes',)
2109
Antoine Pitroud09f1672017-06-13 17:52:29 +02002110 def setUp(self):
2111 self.registry_backup = util._finalizer_registry.copy()
2112 util._finalizer_registry.clear()
2113
2114 def tearDown(self):
2115 self.assertFalse(util._finalizer_registry)
2116 util._finalizer_registry.update(self.registry_backup)
2117
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002118 @classmethod
2119 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002120 class Foo(object):
2121 pass
2122
2123 a = Foo()
2124 util.Finalize(a, conn.send, args=('a',))
2125 del a # triggers callback for a
2126
2127 b = Foo()
2128 close_b = util.Finalize(b, conn.send, args=('b',))
2129 close_b() # triggers callback for b
2130 close_b() # does nothing because callback has already been called
2131 del b # does nothing because callback has already been called
2132
2133 c = Foo()
2134 util.Finalize(c, conn.send, args=('c',))
2135
2136 d10 = Foo()
2137 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2138
2139 d01 = Foo()
2140 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2141 d02 = Foo()
2142 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2143 d03 = Foo()
2144 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2145
2146 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2147
2148 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2149
Ezio Melottic2077b02011-03-16 12:34:31 +02002150 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002151 # garbage collecting locals
2152 util._exit_function()
2153 conn.close()
2154 os._exit(0)
2155
2156 def test_finalize(self):
2157 conn, child_conn = self.Pipe()
2158
2159 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002160 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002161 p.start()
2162 p.join()
2163
2164 result = [obj for obj in iter(conn.recv, 'STOP')]
2165 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2166
Antoine Pitroud09f1672017-06-13 17:52:29 +02002167 def test_thread_safety(self):
2168 # bpo-24484: _run_finalizers() should be thread-safe
2169 def cb():
2170 pass
2171
2172 class Foo(object):
2173 def __init__(self):
2174 self.ref = self # create reference cycle
2175 # insert finalizer at random key
2176 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
2177
2178 finish = False
2179 exc = []
2180
2181 def run_finalizers():
2182 while not finish:
2183 time.sleep(random.random() * 1e-1)
2184 try:
2185 # A GC run will eventually happen during this,
2186 # collecting stale Foo's and mutating the registry
2187 util._run_finalizers()
2188 except Exception as e:
2189 exc.append(e)
2190
2191 def make_finalizers():
2192 d = {}
2193 while not finish:
2194 try:
2195 # Old Foo's get gradually replaced and later
2196 # collected by the GC (because of the cyclic ref)
2197 d[random.getrandbits(5)] = {Foo() for i in range(10)}
2198 except Exception as e:
2199 exc.append(e)
2200 d.clear()
2201
2202 old_interval = sys.getcheckinterval()
2203 old_threshold = gc.get_threshold()
2204 try:
2205 sys.setcheckinterval(10)
2206 gc.set_threshold(5, 5, 5)
2207 threads = [threading.Thread(target=run_finalizers),
2208 threading.Thread(target=make_finalizers)]
2209 with test_support.start_threads(threads):
2210 time.sleep(4.0) # Wait a bit to trigger race condition
2211 finish = True
2212 if exc:
2213 raise exc[0]
2214 finally:
2215 sys.setcheckinterval(old_interval)
2216 gc.set_threshold(*old_threshold)
2217 gc.collect() # Collect remaining Foo's
2218
2219
Benjamin Petersondfd79492008-06-13 19:13:39 +00002220#
2221# Test that from ... import * works for each module
2222#
2223
2224class _TestImportStar(BaseTestCase):
2225
2226 ALLOWED_TYPES = ('processes',)
2227
2228 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002229 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002230 'multiprocessing', 'multiprocessing.connection',
2231 'multiprocessing.heap', 'multiprocessing.managers',
2232 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002233 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002234 ]
2235
Charles-François Natalif8413b22011-09-21 18:44:49 +02002236 if HAS_REDUCTION:
2237 modules.append('multiprocessing.reduction')
2238
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002239 if c_int is not None:
2240 # This module requires _ctypes
2241 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002242
2243 for name in modules:
2244 __import__(name)
2245 mod = sys.modules[name]
2246
2247 for attr in getattr(mod, '__all__', ()):
2248 self.assertTrue(
2249 hasattr(mod, attr),
2250 '%r does not have attribute %r' % (mod, attr)
2251 )
2252
2253#
2254# Quick test that logging works -- does not test logging output
2255#
2256
2257class _TestLogging(BaseTestCase):
2258
2259 ALLOWED_TYPES = ('processes',)
2260
2261 def test_enable_logging(self):
2262 logger = multiprocessing.get_logger()
2263 logger.setLevel(util.SUBWARNING)
2264 self.assertTrue(logger is not None)
2265 logger.debug('this will not be printed')
2266 logger.info('nor will this')
2267 logger.setLevel(LOG_LEVEL)
2268
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002269 @classmethod
2270 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002271 logger = multiprocessing.get_logger()
2272 conn.send(logger.getEffectiveLevel())
2273
2274 def test_level(self):
2275 LEVEL1 = 32
2276 LEVEL2 = 37
2277
2278 logger = multiprocessing.get_logger()
2279 root_logger = logging.getLogger()
2280 root_level = root_logger.level
2281
2282 reader, writer = multiprocessing.Pipe(duplex=False)
2283
2284 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002285 p = self.Process(target=self._test_level, args=(writer,))
2286 p.daemon = True
2287 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002288 self.assertEqual(LEVEL1, reader.recv())
2289
2290 logger.setLevel(logging.NOTSET)
2291 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002292 p = self.Process(target=self._test_level, args=(writer,))
2293 p.daemon = True
2294 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002295 self.assertEqual(LEVEL2, reader.recv())
2296
2297 root_logger.setLevel(root_level)
2298 logger.setLevel(level=LOG_LEVEL)
2299
Jesse Noller814d02d2009-11-21 14:38:23 +00002300
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002301# class _TestLoggingProcessName(BaseTestCase):
2302#
2303# def handle(self, record):
2304# assert record.processName == multiprocessing.current_process().name
2305# self.__handled = True
2306#
2307# def test_logging(self):
2308# handler = logging.Handler()
2309# handler.handle = self.handle
2310# self.__handled = False
2311# # Bypass getLogger() and side-effects
2312# logger = logging.getLoggerClass()(
2313# 'multiprocessing.test.TestLoggingProcessName')
2314# logger.addHandler(handler)
2315# logger.propagate = False
2316#
2317# logger.warn('foo')
2318# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002319
Benjamin Petersondfd79492008-06-13 19:13:39 +00002320#
Richard Oudkerkba482642013-02-26 12:37:07 +00002321# Check that Process.join() retries if os.waitpid() fails with EINTR
2322#
2323
2324class _TestPollEintr(BaseTestCase):
2325
2326 ALLOWED_TYPES = ('processes',)
2327
2328 @classmethod
2329 def _killer(cls, pid):
2330 time.sleep(0.5)
2331 os.kill(pid, signal.SIGUSR1)
2332
2333 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2334 def test_poll_eintr(self):
2335 got_signal = [False]
2336 def record(*args):
2337 got_signal[0] = True
2338 pid = os.getpid()
2339 oldhandler = signal.signal(signal.SIGUSR1, record)
2340 try:
2341 killer = self.Process(target=self._killer, args=(pid,))
2342 killer.start()
2343 p = self.Process(target=time.sleep, args=(1,))
2344 p.start()
2345 p.join()
2346 self.assertTrue(got_signal[0])
2347 self.assertEqual(p.exitcode, 0)
2348 killer.join()
2349 finally:
2350 signal.signal(signal.SIGUSR1, oldhandler)
2351
2352#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002353# Test to verify handle verification, see issue 3321
2354#
2355
2356class TestInvalidHandle(unittest.TestCase):
2357
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002358 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002359 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002360 conn = _multiprocessing.Connection(44977608)
2361 self.assertRaises(IOError, conn.poll)
2362 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002363
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002364#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002365# Functions used to create test cases from the base ones in this module
2366#
2367
2368def get_attributes(Source, names):
2369 d = {}
2370 for name in names:
2371 obj = getattr(Source, name)
2372 if type(obj) == type(get_attributes):
2373 obj = staticmethod(obj)
2374 d[name] = obj
2375 return d
2376
2377def create_test_cases(Mixin, type):
2378 result = {}
2379 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002380 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002381
2382 for name in glob.keys():
2383 if name.startswith('_Test'):
2384 base = glob[name]
2385 if type in base.ALLOWED_TYPES:
2386 newname = 'With' + Type + name[1:]
2387 class Temp(base, unittest.TestCase, Mixin):
2388 pass
2389 result[newname] = Temp
2390 Temp.__name__ = newname
2391 Temp.__module__ = Mixin.__module__
2392 return result
2393
2394#
2395# Create test cases
2396#
2397
2398class ProcessesMixin(object):
2399 TYPE = 'processes'
2400 Process = multiprocessing.Process
2401 locals().update(get_attributes(multiprocessing, (
2402 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2403 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2404 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002405 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002406 )))
2407
2408testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2409globals().update(testcases_processes)
2410
2411
2412class ManagerMixin(object):
2413 TYPE = 'manager'
2414 Process = multiprocessing.Process
2415 manager = object.__new__(multiprocessing.managers.SyncManager)
2416 locals().update(get_attributes(manager, (
2417 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2418 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002419 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002420 )))
2421
2422testcases_manager = create_test_cases(ManagerMixin, type='manager')
2423globals().update(testcases_manager)
2424
2425
2426class ThreadsMixin(object):
2427 TYPE = 'threads'
2428 Process = multiprocessing.dummy.Process
2429 locals().update(get_attributes(multiprocessing.dummy, (
2430 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2431 'Condition', 'Event', 'Value', 'Array', 'current_process',
2432 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002433 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002434 )))
2435
2436testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2437globals().update(testcases_threads)
2438
Neal Norwitz0c519b32008-08-25 01:50:24 +00002439class OtherTest(unittest.TestCase):
2440 # TODO: add more tests for deliver/answer challenge.
2441 def test_deliver_challenge_auth_failure(self):
2442 class _FakeConnection(object):
2443 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002444 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002445 def send_bytes(self, data):
2446 pass
2447 self.assertRaises(multiprocessing.AuthenticationError,
2448 multiprocessing.connection.deliver_challenge,
2449 _FakeConnection(), b'abc')
2450
2451 def test_answer_challenge_auth_failure(self):
2452 class _FakeConnection(object):
2453 def __init__(self):
2454 self.count = 0
2455 def recv_bytes(self, size):
2456 self.count += 1
2457 if self.count == 1:
2458 return multiprocessing.connection.CHALLENGE
2459 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002460 return b'something bogus'
2461 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002462 def send_bytes(self, data):
2463 pass
2464 self.assertRaises(multiprocessing.AuthenticationError,
2465 multiprocessing.connection.answer_challenge,
2466 _FakeConnection(), b'abc')
2467
Jesse Noller7152f6d2009-04-02 05:17:26 +00002468#
2469# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2470#
2471
2472def initializer(ns):
2473 ns.test += 1
2474
2475class TestInitializers(unittest.TestCase):
2476 def setUp(self):
2477 self.mgr = multiprocessing.Manager()
2478 self.ns = self.mgr.Namespace()
2479 self.ns.test = 0
2480
2481 def tearDown(self):
2482 self.mgr.shutdown()
2483
2484 def test_manager_initializer(self):
2485 m = multiprocessing.managers.SyncManager()
2486 self.assertRaises(TypeError, m.start, 1)
2487 m.start(initializer, (self.ns,))
2488 self.assertEqual(self.ns.test, 1)
2489 m.shutdown()
2490
2491 def test_pool_initializer(self):
2492 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2493 p = multiprocessing.Pool(1, initializer, (self.ns,))
2494 p.close()
2495 p.join()
2496 self.assertEqual(self.ns.test, 1)
2497
Jesse Noller1b90efb2009-06-30 17:11:52 +00002498#
2499# Issue 5155, 5313, 5331: Test process in processes
2500# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2501#
2502
Richard Oudkerkc5496072013-09-29 17:10:40 +01002503def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002504 try:
2505 item = q.get(block=False)
2506 except Queue.Empty:
2507 pass
2508
Richard Oudkerkc5496072013-09-29 17:10:40 +01002509def _test_process(q):
2510 queue = multiprocessing.Queue()
2511 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2512 subProc.daemon = True
2513 subProc.start()
2514 subProc.join()
2515
Jesse Noller1b90efb2009-06-30 17:11:52 +00002516def _afunc(x):
2517 return x*x
2518
2519def pool_in_process():
2520 pool = multiprocessing.Pool(processes=4)
2521 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2522
2523class _file_like(object):
2524 def __init__(self, delegate):
2525 self._delegate = delegate
2526 self._pid = None
2527
2528 @property
2529 def cache(self):
2530 pid = os.getpid()
2531 # There are no race conditions since fork keeps only the running thread
2532 if pid != self._pid:
2533 self._pid = pid
2534 self._cache = []
2535 return self._cache
2536
2537 def write(self, data):
2538 self.cache.append(data)
2539
2540 def flush(self):
2541 self._delegate.write(''.join(self.cache))
2542 self._cache = []
2543
2544class TestStdinBadfiledescriptor(unittest.TestCase):
2545
2546 def test_queue_in_process(self):
2547 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002548 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002549 proc.start()
2550 proc.join()
2551
2552 def test_pool_in_process(self):
2553 p = multiprocessing.Process(target=pool_in_process)
2554 p.start()
2555 p.join()
2556
2557 def test_flushing(self):
2558 sio = StringIO()
2559 flike = _file_like(sio)
2560 flike.write('foo')
2561 proc = multiprocessing.Process(target=lambda: flike.flush())
2562 flike.flush()
2563 assert sio.getvalue() == 'foo'
2564
Richard Oudkerke4b99382012-07-27 14:05:46 +01002565#
2566# Test interaction with socket timeouts - see Issue #6056
2567#
2568
2569class TestTimeouts(unittest.TestCase):
2570 @classmethod
2571 def _test_timeout(cls, child, address):
2572 time.sleep(1)
2573 child.send(123)
2574 child.close()
2575 conn = multiprocessing.connection.Client(address)
2576 conn.send(456)
2577 conn.close()
2578
2579 def test_timeout(self):
2580 old_timeout = socket.getdefaulttimeout()
2581 try:
2582 socket.setdefaulttimeout(0.1)
2583 parent, child = multiprocessing.Pipe(duplex=True)
2584 l = multiprocessing.connection.Listener(family='AF_INET')
2585 p = multiprocessing.Process(target=self._test_timeout,
2586 args=(child, l.address))
2587 p.start()
2588 child.close()
2589 self.assertEqual(parent.recv(), 123)
2590 parent.close()
2591 conn = l.accept()
2592 self.assertEqual(conn.recv(), 456)
2593 conn.close()
2594 l.close()
2595 p.join(10)
2596 finally:
2597 socket.setdefaulttimeout(old_timeout)
2598
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002599#
2600# Test what happens with no "if __name__ == '__main__'"
2601#
2602
2603class TestNoForkBomb(unittest.TestCase):
2604 def test_noforkbomb(self):
2605 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2606 if WIN32:
2607 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002608 self.assertEqual(out, '')
2609 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002610 else:
2611 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002612 self.assertEqual(out.rstrip(), '123')
2613 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002614
2615#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002616# Issue 12098: check sys.flags of child matches that for parent
2617#
2618
2619class TestFlags(unittest.TestCase):
2620 @classmethod
2621 def run_in_grandchild(cls, conn):
2622 conn.send(tuple(sys.flags))
2623
2624 @classmethod
2625 def run_in_child(cls):
2626 import json
2627 r, w = multiprocessing.Pipe(duplex=False)
2628 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2629 p.start()
2630 grandchild_flags = r.recv()
2631 p.join()
2632 r.close()
2633 w.close()
2634 flags = (tuple(sys.flags), grandchild_flags)
2635 print(json.dumps(flags))
2636
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002637 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002638 def test_flags(self):
2639 import json, subprocess
2640 # start child process using unusual flags
2641 prog = ('from test.test_multiprocessing import TestFlags; ' +
2642 'TestFlags.run_in_child()')
2643 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002644 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002645 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2646 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002647
2648#
2649# Issue #17555: ForkAwareThreadLock
2650#
2651
2652class TestForkAwareThreadLock(unittest.TestCase):
2653 # We recurisvely start processes. Issue #17555 meant that the
2654 # after fork registry would get duplicate entries for the same
2655 # lock. The size of the registry at generation n was ~2**n.
2656
2657 @classmethod
2658 def child(cls, n, conn):
2659 if n > 1:
2660 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2661 p.start()
2662 p.join()
2663 else:
2664 conn.send(len(util._afterfork_registry))
2665 conn.close()
2666
2667 def test_lock(self):
2668 r, w = multiprocessing.Pipe(False)
2669 l = util.ForkAwareThreadLock()
2670 old_size = len(util._afterfork_registry)
2671 p = multiprocessing.Process(target=self.child, args=(5, w))
2672 p.start()
2673 new_size = r.recv()
2674 p.join()
2675 self.assertLessEqual(new_size, old_size)
2676
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002677#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002678# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2679#
2680
2681class TestIgnoreEINTR(unittest.TestCase):
2682
2683 @classmethod
2684 def _test_ignore(cls, conn):
2685 def handler(signum, frame):
2686 pass
2687 signal.signal(signal.SIGUSR1, handler)
2688 conn.send('ready')
2689 x = conn.recv()
2690 conn.send(x)
2691 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2692
2693 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2694 def test_ignore(self):
2695 conn, child_conn = multiprocessing.Pipe()
2696 try:
2697 p = multiprocessing.Process(target=self._test_ignore,
2698 args=(child_conn,))
2699 p.daemon = True
2700 p.start()
2701 child_conn.close()
2702 self.assertEqual(conn.recv(), 'ready')
2703 time.sleep(0.1)
2704 os.kill(p.pid, signal.SIGUSR1)
2705 time.sleep(0.1)
2706 conn.send(1234)
2707 self.assertEqual(conn.recv(), 1234)
2708 time.sleep(0.1)
2709 os.kill(p.pid, signal.SIGUSR1)
2710 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2711 time.sleep(0.1)
2712 p.join()
2713 finally:
2714 conn.close()
2715
2716 @classmethod
2717 def _test_ignore_listener(cls, conn):
2718 def handler(signum, frame):
2719 pass
2720 signal.signal(signal.SIGUSR1, handler)
2721 l = multiprocessing.connection.Listener()
2722 conn.send(l.address)
2723 a = l.accept()
2724 a.send('welcome')
2725
2726 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2727 def test_ignore_listener(self):
2728 conn, child_conn = multiprocessing.Pipe()
2729 try:
2730 p = multiprocessing.Process(target=self._test_ignore_listener,
2731 args=(child_conn,))
2732 p.daemon = True
2733 p.start()
2734 child_conn.close()
2735 address = conn.recv()
2736 time.sleep(0.1)
2737 os.kill(p.pid, signal.SIGUSR1)
2738 time.sleep(0.1)
2739 client = multiprocessing.connection.Client(address)
2740 self.assertEqual(client.recv(), 'welcome')
2741 p.join()
2742 finally:
2743 conn.close()
2744
2745#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002746#
2747#
2748
Jesse Noller1b90efb2009-06-30 17:11:52 +00002749testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002750 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002751 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002752
Benjamin Petersondfd79492008-06-13 19:13:39 +00002753#
2754#
2755#
2756
2757def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002758 if sys.platform.startswith("linux"):
2759 try:
2760 lock = multiprocessing.RLock()
2761 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002762 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002763
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002764 check_enough_semaphores()
2765
Benjamin Petersondfd79492008-06-13 19:13:39 +00002766 if run is None:
2767 from test.test_support import run_unittest as run
2768
2769 util.get_temp_dir() # creates temp directory for use by all processes
2770
2771 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2772
Jesse Noller146b7ab2008-07-02 16:44:09 +00002773 ProcessesMixin.pool = multiprocessing.Pool(4)
2774 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2775 ManagerMixin.manager.__init__()
2776 ManagerMixin.manager.start()
2777 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002778
2779 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002780 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2781 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002782 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2783 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002784 )
2785
2786 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2787 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002788 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2789 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002790 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002791 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002792 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002793 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2794 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2795 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002796 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002797
Jesse Noller146b7ab2008-07-02 16:44:09 +00002798 ThreadsMixin.pool.terminate()
2799 ProcessesMixin.pool.terminate()
2800 ManagerMixin.pool.terminate()
2801 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002802
Jesse Noller146b7ab2008-07-02 16:44:09 +00002803 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002804
2805def main():
2806 test_main(unittest.TextTestRunner(verbosity=2).run)
2807
2808if __name__ == '__main__':
2809 main()