blob: 69bd8f79a85568895bf93970787f4b553e4adbc0 [file] [log] [blame]
#
# Unit tests for the multiprocessing package
#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
# Alias for the native string type; used to build the single-character
# values passed with the 'c' typecode.
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause (in seconds) used by the tests to let other processes or
# threads make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the extension module reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")

try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform, the name is unknown, or the
    # host does not support the setting: fall back to a fixed default.
    MAXFD = 256

#
# Some tests require ctypes
#

try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    # Placeholders so that module-level definitions which mention these
    # names can still be evaluated when ctypes is unavailable.
    Structure = object
    c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be determined, is reported as
    unlimited (-1), or is at least the POSIX minimum.

    Raises:
        unittest.SkipTest: if the reported limit is below the POSIX minimum.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError, OSError):
        # sysconf not available or setting not available/supported
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
112
113
#
# Creates a wrapper for a function which records the time it takes to finish
#
117
class TimingWrapper(object):
    """Callable proxy that records how long each call to *func* takes.

    ``elapsed`` is None until the first call completes; afterwards it holds
    the duration, in seconds as measured by time.time(), of the most recent
    call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Forward the call to the wrapped function and time it."""
        started = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            # Runs on both the return and the exception path, so calls that
            # raise are timed as well.
            self.elapsed = time.time() - started
130
#
# Base class for test cases
#
134
class BaseTestCase(object):
    """Helpers shared by the multiprocessing test case classes.

    The helper methods call ``self.assertEqual`` / ``self.assertAlmostEqual``,
    which this class does not define; they must be supplied by whatever the
    class is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set.

        Does nothing when CHECK_TIMINGS is false.
        """
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless func is not implemented.

        A NotImplementedError raised by *func* is silently accepted.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
157
#
# Return the value of a semaphore
#
161
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.

    Raises:
        NotImplementedError: if none of the three is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, in the original lookup order.
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
173
#
# Testcases
#
177
class DummyCallable(object):
    """Callable used as a process/thread target in the tests.

    When called it checks that *c* is a DummyCallable and puts the value 5
    on the queue *q*.
    """

    def __call__(self, q, c):
        assert isinstance(c, DummyCallable)
        q.put(5)
182
183
class _TestProcess(BaseTestCase):
    """Tests of the Process API; allowed for 'processes' and 'threads'."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the object returned by current_process().
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child side of test_process: report what the child received and
        # who it is through the queue, in the order the parent reads it.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        # Check a Process object's state before start(), while running and
        # after join(), and that args/kwargs/name reach the child.
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The queue itself (args[0]) is consumed by _test's ``q`` parameter.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Child side of test_terminate: sleep until terminated.
        time.sleep(1000)

    def test_terminate(self):
        # Check that terminate() stops a sleeping child and that join()
        # then returns.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        # cpu_count() must return an int >= 1 when it is implemented.
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        # A process is listed by active_children() only between start()
        # and join().
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send ``id`` through wconn, then (up to depth 2) start and join two
        # children that do the same with id+[0] and id+[1].
        # NOTE(review): ``forking`` is not referenced below; presumably a
        # leftover or imported for its side effects -- confirm before
        # removing.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        # Processes started from within processes must all report back,
        # in depth-first order.
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Child side of test_sys_exit: redirect stderr to the file testfn,
        # then exit with ``reason``.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test_support.TESTFN
        self.addCleanup(test_support.unlink, testfn)

        # Non-integer reasons: expect exit code 1 and the reason's str()
        # in the file the child redirected stderr to.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Integer-like reasons: expect the exit code to equal the reason.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)

    def test_lose_target_ref(self):
        # After start() and join(), the Process must no longer keep its
        # target (which is also passed as an argument) alive.
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
374
375
Benjamin Petersondfd79492008-06-13 19:13:39 +0000376#
377#
378#
379
380class _UpperCaser(multiprocessing.Process):
381
382 def __init__(self):
383 multiprocessing.Process.__init__(self)
384 self.child_conn, self.parent_conn = multiprocessing.Pipe()
385
386 def run(self):
387 self.parent_conn.close()
388 for s in iter(self.child_conn.recv, None):
389 self.child_conn.send(s.upper())
390 self.child_conn.close()
391
392 def submit(self, s):
393 assert type(s) is str
394 self.parent_conn.send(s)
395 return self.parent_conn.recv()
396
397 def stop(self):
398 self.parent_conn.send(None)
399 self.parent_conn.close()
400 self.child_conn.close()
401
class _TestSubclassingProcess(BaseTestCase):
    """Test that a multiprocessing.Process subclass can be run and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through an _UpperCaser child process.
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()
414
415#
416#
417#
418
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue provides it, otherwise compares
    ``q.qsize()`` with zero.
    """
    if hasattr(q, 'empty'):
        return q.empty()
    return q.qsize() == 0
424
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue provides it, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if hasattr(q, 'full'):
        return q.full()
    return q.qsize() == maxsize
430
431
class _TestQueue(BaseTestCase):
    """Tests of the Queue and JoinableQueue API."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released, drain the six items the
        # parent queued, then let the parent continue.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        # Fill a bounded queue through every put() calling convention, then
        # check that further puts fail with Queue.Full.
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(Queue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released, queue four items, then let
        # the parent continue.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        # Read items through every get() calling convention, then check
        # that further gets fail with Queue.Empty.
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(Queue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(Queue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        # qsize() must track put()/get(); skipped where it is not
        # implemented.
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker for test_task_done: acknowledge every item until the None
        # sentinel is received.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        # JoinableQueue.join() must return once four workers have called
        # task_done() for all ten items.
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            self.skipTest("requires 'queue.task_done()' method")

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in xrange(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in xrange(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker so that each of them exits its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        # Importing a module that uses a Queue at import time must not time
        # out in q.get().
        with test_support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                """)

            with test_support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except Queue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            def __reduce__(self):
                raise AttributeError
        # Use the test_support alias like the rest of this file; the module
        # never imports ``test.support`` itself.
        with test_support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            q.put(True)
            self.assertTrue(q.get(timeout=0.1))
675
676
Benjamin Petersondfd79492008-06-13 19:13:39 +0000677#
678#
679#
680
class _TestLock(BaseTestCase):
    """Tests of the Lock and RLock API."""

    def test_lock(self):
        # A held Lock cannot be re-acquired without blocking, and releasing
        # an unheld Lock raises.
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        # An RLock can be acquired repeatedly by its owner and must be
        # released the same number of times; one release too many raises.
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        # A Lock must be usable as a context manager.
        with self.Lock():
            pass
703
Benjamin Petersondfd79492008-06-13 19:13:39 +0000704
class _TestSemaphore(BaseTestCase):
    """Tests of the Semaphore and BoundedSemaphore API."""

    def _test_semaphore(self, sem):
        # Shared scenario for a semaphore created with value 2: take it down
        # to 0, check that a non-blocking acquire fails, then restore it.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        # A plain Semaphore may be released beyond its initial value.
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # acquire() on an exhausted semaphore must return False for every
        # combination of the block and timeout arguments.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
757
758
class _TestCondition(BaseTestCase):
    """Tests of the Condition API."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter used by the tests: signal ``sleeping`` just before waiting
        # on the condition, and ``woken`` once the wait has returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        # Two waiters (one process, one thread) must be woken one at a time
        # by successive notify() calls.
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified; wait for each of them to finish
        # so that neither is left running after the test.
        proc.join()
        thread.join()

    def test_notify_all(self):
        # Waiters with a timeout must wake on their own; waiters without
        # one must all be woken by a single notify_all().
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken; poll up to ten times instead of relying
        # on a single fixed sleep
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with a timeout and no notifier must return None.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
896
897
class _TestEvent(BaseTestCase):
    """Tests of the Event API."""

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out unset ...
        self.assertEqual(event.is_set(), False)

        # ... and wait() reports that by returning False, with or without
        # a non-zero timeout.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # After clear(), an untimed wait() must block until the child sets
        # the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child only sleeps and sets the event, so it is about to exit;
        # wait for it so that it is not left running after the test.
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000938
939#
940#
941#
942
class _TestValue(BaseTestCase):
    """Tests for shared scalar objects created by ``Value``/``RawValue``."""

    ALLOWED_TYPES = ('processes',)

    # Each entry is (typecode, initial value, value written by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        """Skip every test in this class when sharedctypes is missing."""
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child-side helper: overwrite each shared value in place."""
        for shared, spec in zip(values, cls.codes_values):
            replacement = spec[2]
            shared.value = replacement

    def test_value(self, raw=False):
        """Check that values written by a child are seen by the parent."""
        if raw:
            values = [self.RawValue(typecode, initial)
                      for typecode, initial, _ in self.codes_values]
        else:
            values = [self.Value(typecode, initial)
                      for typecode, initial, _ in self.codes_values]

        # Before the child runs, every object holds its initial value.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # After the child has finished, the replacements are visible here.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Run the same round trip with lock-free ``RawValue`` objects."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for each accepted ``lock`` argument."""
        # Default lock.
        default_val = self.Value('i', 5)
        default_lock = default_val.get_lock()
        default_obj = default_val.get_obj()

        # lock=None is accepted as well.
        none_val = self.Value('i', 5, lock=None)
        none_lock = none_val.get_lock()
        none_obj = none_val.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        explicit_val = self.Value('i', 5, lock=lock)
        explicit_lock = explicit_val.get_lock()
        explicit_obj = explicit_val.get_obj()
        self.assertEqual(lock, explicit_lock)

        # lock=False yields an object without the wrapper accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects never carry the accessors.
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1010
Benjamin Petersondfd79492008-06-13 19:13:39 +00001011
class _TestArray(BaseTestCase):
    """Tests for shared arrays created by ``Array``/``RawArray``."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(len(seq) - 1):
            seq[idx + 1] += seq[idx]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Check indexing, slicing and modification by a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally ...
        self.f(seq)

        # ... and let a child compute it on the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """Check that an array created from a size is zero-filled."""
        size = 10
        # Test for zeroing (see issue #11675).
        # Repeating the allocation raises the chance that the second and
        # third arrays reuse memory that previously held non-zero data.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = list(range(10))
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        """Run the array test against lock-free ``RawArray`` objects."""
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_accepts_long(self):
        """Check that a ``long`` is accepted as the array size."""
        locked = self.Array('i', long(10))
        self.assertEqual(len(locked), 10)
        unlocked = self.RawArray('i', long(10))
        self.assertEqual(len(unlocked), 10)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() for each accepted ``lock`` argument."""
        # Default lock.
        default_arr = self.Array('i', range(10))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # lock=None is accepted as well.
        none_arr = self.Array('i', range(10), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        explicit_arr = self.Array('i', range(10), lock=lock)
        explicit_lock = explicit_arr.get_lock()
        explicit_obj = explicit_arr.get_obj()
        self.assertEqual(lock, explicit_lock)

        # lock=False yields an object without the wrapper accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects never carry the accessors.
        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001097
1098#
1099#
1100#
1101
class _TestContainers(BaseTestCase):
    """Tests for the manager-hosted list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Check slicing, extending, repeating and nesting of list proxies."""
        digits = list(range(10))
        a = self.list(range(10))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # None of the operations on b may have affected a.
        self.assertEqual(a[:], digits)

        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # A later change to a is visible through a list that contains it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [digits + ['hello']])

    def test_dict(self):
        """Check copy(), keys(), values() and items() of a dict proxy."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(code) for code in indices])
        self.assertEqual(sorted(d.items()),
                         [(code, chr(code)) for code in indices])

    def test_namespace(self):
        """Check attribute set/delete and str() on a Namespace proxy."""
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows only the remaining public attribute.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1157
1158#
1159#
1160#
1161
def sqr(x, wait=0.0):
    """Return ``x*x`` after first sleeping for *wait* seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001165
def identity(x):
    """Return the argument unchanged."""
    result = x
    return result
1168
class CountedObject(object):
    """Object that keeps a per-class count of its live instances."""

    # Incremented in __new__ and decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        instance = object.__new__(cls)
        return instance

    def __del__(self):
        klass = type(self)
        klass.n_instances = klass.n_instances - 1
1178
class SayWhenError(ValueError):
    """Error raised on purpose by ``exception_throwing_generator``."""
1180
def exception_throwing_generator(total, when):
    """Yield 0, 1, ... but raise SayWhenError on reaching index *when*.

    At most *total* values are produced; if *when* is not one of the
    indices in ``range(total)`` the generator finishes normally.
    """
    for position in range(total):
        if position != when:
            yield position
            continue
        raise SayWhenError("Somebody said when")
1186
class _TestPool(BaseTestCase):
    """Tests for the pool API: apply, map, imap and their variants."""

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        apply_ = self.pool.apply
        self.assertEqual(apply_(sqr, (5,)), sqr(5))
        self.assertEqual(apply_(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        """map() matches a sequential map, with or without chunksize."""
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, range(10)),
                         [sqr(i) for i in range(10)])
        self.assertEqual(pool_map(sqr, range(100), chunksize=20),
                         [sqr(i) for i in range(100)])

    def test_map_unplicklable(self):
        """An argument that cannot be pickled raises instead of hanging."""
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        """map_async() over an empty list with a chunksize must not stall."""
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        """apply_async().get() blocks until the delayed result is ready."""
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError if the task is too slow."""
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then raises StopIteration."""
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), [sqr(i) for i in range(10)])

        # Step through by hand, first without and then with a chunksize.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, range(count), **options)
            for i in range(count):
                self.assertEqual(it.next(), i*i)
            self.assertRaises(StopIteration, it.next)

    def test_imap_handle_iterable_exception(self):
        """An exception raised by the input iterable surfaces from imap()."""
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Each case is (total, when, chunksize, number of good results).
        # SayWhenError is seen at the start of the problematic chunk's
        # results, so only results of earlier chunks are returned first.
        cases = (
            (10, 3, 1, 3),
            (20, 7, 2, 6),
            (20, 7, 4, 4),
        )
        for total, when, chunksize, n_good in cases:
            it = self.pool.imap(
                sqr, exception_throwing_generator(total, when), chunksize)
            for i in range(n_good):
                self.assertEqual(next(it), i*i)
            self.assertRaises(SayWhenError, it.next)

    def test_imap_unordered(self):
        """imap_unordered() returns every result, in any order."""
        expected = [sqr(i) for i in range(1000)]

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_imap_unordered_handle_iterable_exception(self):
        """An input-iterable exception surfaces from imap_unordered()."""
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # Each case is (total, when, chunksize).
        for total, when, chunksize in ((10, 3, 1), (20, 7, 2)):
            it = self.pool.imap_unordered(
                sqr, exception_throwing_generator(total, when), chunksize)
            expected_values = [sqr(i) for i in range(total)]
            with self.assertRaises(SayWhenError):
                # imap_unordered makes it difficult to anticipate the
                # SayWhenError, so only check that each value received
                # before it is an expected one that was not seen yet.
                for _ in range(total):
                    value = next(it)
                    self.assertIn(value, expected_values)
                    expected_values.remove(value)

    def test_make_pool(self):
        """Pool() rejects non-positive sizes and honours a valid one."""
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        """terminate() lets join() return quickly despite queued work."""
        pool = self.Pool(4)
        # Queue far more sleeping tasks than could finish in time.
        result = pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        pool.terminate()
        timed_join = TimingWrapper(pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)

    def test_empty_iterable(self):
        """Every map flavour returns an empty result for empty input."""
        # See Issue 12157
        pool = self.Pool(1)

        self.assertEqual(pool.map(sqr, []), [])
        self.assertEqual(list(pool.imap(sqr, [])), [])
        self.assertEqual(list(pool.imap_unordered(sqr, [])), [])
        self.assertEqual(pool.map_async(sqr, []).get(), [])

        pool.close()
        pool.join()

    def test_release_task_refs(self):
        """Task arguments and results are released once map() is done."""
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for _ in range(10)]
        # Built from a generator expression on purpose: a Python 2 list
        # comprehension would leave its loop variable bound in this frame
        # and so keep the last object alive.
        refs = list(weakref.ref(o) for o in objs)
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
1337
1338
def unpickleable_result():
    """Return a lambda, i.e. a result that pickle cannot serialize."""
    answer = lambda: 42
    return answer
1341
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for how a pool copes with errors inside its workers."""

    ALLOWED_TYPES = ('processes', )

    def test_unpickleable_result(self):
        """A result that cannot be encoded raises MaybeEncodingError."""
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding
        # errors: with 2 workers, 20 failing tasks must all be answered.
        for _ in range(20):
            pending = pool.apply_async(unpickleable_result)
            self.assertRaises(MaybeEncodingError, pending.get)

        pool.close()
        pool.join()
1355 p.join()
1356
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools whose workers are limited by ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """Workers are replaced after they have run their task quota."""
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]

        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]

        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))

        # Refill the pool
        pool._repopulate_pool()

        # Wait until all workers are alive
        # (50 * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]

        # All pids should be assigned. See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        """close() before the tasks finish must not make join() hang."""
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()

        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1401
1402
Benjamin Petersondfd79492008-06-13 19:13:39 +00001403#
1404# Test that manager has expected number of shared objects left
1405#
1406
class _TestZZZNumberOfObjects(BaseTestCase):
    """Check that the manager is left with the expected shared objects."""

    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Compare the manager's object count with the expected value.

        On a mismatch the manager's debug info is printed (once) before
        the assertion fails, to help track down the leaked object.
        """
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Taken right after the count so both describe the same state.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1425
1426#
1427# Test of creating a customized manager class
1428#
1429
1430from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1431
class FooBar(object):
    """Plain class whose methods are exposed through the test managers."""

    def f(self):
        """Return the marker string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the marker string '_h()' (underscore-prefixed method)."""
        return '_h()'
1439
def baz():
    """Generate the squares of 0 through 9."""
    i = 0
    while i < 10:
        yield i*i
        i += 1
1443
class IteratorProxy(BaseProxy):
    """Proxy that makes a manager-hosted iterator iterable locally."""

    # Methods of the referent that may be invoked through this proxy.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        """Return the proxy itself; it is its own iterator."""
        return self

    def __next__(self):
        """Forward to the referent's ``__next__`` method."""
        return self._callmethod('__next__')

    def next(self):
        """Forward to the referent's ``next`` method."""
        return self._callmethod('next')
1452
class MyManager(BaseManager):
    """Customized manager class used by the manager tests."""
1455
# 'Foo' is registered without an explicit list of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' explicitly exposes f() and the underscore-prefixed _h(), not g().
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
)
# 'baz' returns a generator, which is accessed through IteratorProxy.
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
)
1459
1460
class _TestMyManager(BaseTestCase):
    """Tests for a user-defined manager class with registered types."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods the proxies expose and that calls work.

        The manager is shut down in a ``finally`` clause so that its
        server process is not left running when an assertion fails.
        """
        manager = MyManager()
        manager.start()
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # Foo has no explicit ``exposed`` list and ends up without
            # _h; Bar exposes exactly the names it was registered with.
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            # Calling a method that is not exposed is refused remotely.
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1492
1493#
1494# Test of connecting to a remote server and using xmlrpclib for serialization
1495#
1496
# Single module-level queue handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1500
class QueueManager(BaseManager):
    '''manager class used by server process'''


# The server side supplies the callable that produces the shared queue.
QueueManager.register(
    'get_queue',
    callable=get_queue,
)
1504
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by typeid only: no callable is given on this side.
QueueManager2.register(
    'get_queue',
)
1508
1509
# Name of the serializer passed to the managers in the remote tests.
SERIALIZER = 'xmlrpclib'
1511
class _TestRemoteManager(BaseTestCase):
    """Test talking to a manager server using xmlrpclib serialization."""

    ALLOWED_TYPES = ('manager',)

    # ``values`` is what the child sends (as a tuple); ``result`` is what
    # the parent expects to receive.
    values = ['hello world', None, True, 2.25]
    result = values[:]
    if test_support.have_unicode:
        uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
                                r'\u0441\u0432\u0456\u0442')
        values.append(uvalue)
        result.append(uvalue)

    @classmethod
    def _putter(cls, address, authkey):
        """Child-side helper: connect to the server and put ``values``."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        """Pass data from a child to the parent through a remote queue.

        The server is shut down in a ``finally`` clause so that it does
        not outlive a failing assertion, and the child is joined before
        the test returns.
        """
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # The tuple sent by the child arrives as a list.
            self.assertEqual(queue.get(), self.result)

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The child has delivered its data by now; reap it.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1563
class _TestManagerRestart(BaseTestCase):
    """Test that a manager can be restarted on the same address."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child-side helper: connect and put a greeting on the queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and start a new one on the same address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test_support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            # The child has delivered its message by now; reap it.
            p.join()
            del queue
        finally:
            # Stop the first server even if an assertion above failed.
            manager.shutdown()

        # A second manager must be able to start on the saved address
        # straight away.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001596
Benjamin Petersondfd79492008-06-13 19:13:39 +00001597#
1598#
1599#
1600
# Empty message that tells the echoing child process to stop.
SENTINEL = latin('')
1602
1603class _TestConnection(BaseTestCase):
1604
1605 ALLOWED_TYPES = ('processes', 'threads')
1606
@classmethod
def _echo(cls, conn):
    """Send every received message back until SENTINEL arrives.

    The connection is closed once the sentinel has been received.
    """
    while True:
        msg = conn.recv_bytes()
        if msg == SENTINEL:
            break
        conn.send_bytes(msg)
    conn.close()
1612
def test_connection(self):
    """Round-trip objects, bytes and buffers through an echoing child."""
    conn, child_conn = self.Pipe()

    p = self.Process(target=self._echo, args=(child_conn,))
    p.daemon = True
    p.start()

    seq = [1, 2.25, None]
    msg = latin('hello world')
    longmsg = msg * 10
    arr = array.array('i', range(4))

    if self.TYPE == 'processes':
        self.assertEqual(type(conn.fileno()), int)

    self.assertEqual(conn.send(seq), None)
    self.assertEqual(conn.recv(), seq)

    self.assertEqual(conn.send_bytes(msg), None)
    self.assertEqual(conn.recv_bytes(), msg)

    if self.TYPE == 'processes':
        # Receive into the start of a larger, zero-filled buffer.
        buf = array.array('i', [0]*10)
        expected = list(arr) + [0] * (10 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buf),
                         len(arr) * buf.itemsize)
        self.assertEqual(list(buf), expected)

        # Receive at an offset of three items into the buffer.
        buf = array.array('i', [0]*10)
        expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
        self.assertEqual(conn.send_bytes(arr), None)
        self.assertEqual(conn.recv_bytes_into(buf, 3 * buf.itemsize),
                         len(arr) * buf.itemsize)
        self.assertEqual(list(buf), expected)

        # A buffer that is too small raises BufferTooShort, and the
        # exception carries the complete message.
        buf = bytearray(latin(' ' * 40))
        self.assertEqual(conn.send_bytes(longmsg), None)
        try:
            res = conn.recv_bytes_into(buf)
        except multiprocessing.BufferTooShort as e:
            self.assertEqual(e.args, (longmsg,))
        else:
            self.fail('expected BufferTooShort, got %s' % res)

    poll = TimingWrapper(conn.poll)

    # With nothing pending, poll() returns False immediately, or after
    # the timeout when one is given.
    self.assertEqual(poll(), False)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(poll(TIMEOUT1), False)
    self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

    conn.send(None)
    time.sleep(.1)   # give the child time to echo the message back

    self.assertEqual(poll(TIMEOUT1), True)
    self.assertTimingAlmostEqual(poll.elapsed, 0)

    self.assertEqual(conn.recv(), None)

    really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
    conn.send_bytes(really_big_msg)
    self.assertEqual(conn.recv_bytes(), really_big_msg)

    conn.send_bytes(SENTINEL)                          # tell child to quit
    child_conn.close()

    if self.TYPE == 'processes':
        self.assertEqual(conn.readable, True)
        self.assertEqual(conn.writable, True)
        self.assertRaises(EOFError, conn.recv)
        self.assertRaises(EOFError, conn.recv_bytes)

    p.join()
1688
def test_duplex_false(self):
    """Check that a one-way pipe only works in its intended direction."""
    reader, writer = self.Pipe(duplex=False)
    self.assertEqual(writer.send(1), None)
    self.assertEqual(reader.recv(), 1)

    if self.TYPE != 'processes':
        return

    # Each end advertises exactly one capability ...
    self.assertEqual(reader.readable, True)
    self.assertEqual(reader.writable, False)
    self.assertEqual(writer.readable, False)
    self.assertEqual(writer.writable, True)
    # ... and using an end the wrong way round raises IOError.
    self.assertRaises(IOError, reader.send, 2)
    self.assertRaises(IOError, writer.recv)
    self.assertRaises(IOError, writer.poll)
1701
def test_spawn_close(self):
    """Close the child's pipe end in the parent right after spawning."""
    # We test that a pipe connection can be closed by parent
    # process immediately after child is spawned.  On Windows this
    # would have sometimes failed on old versions because
    # child_conn would be closed before the child got a chance to
    # duplicate it.
    conn, child_conn = self.Pipe()

    child = self.Process(target=self._echo, args=(child_conn,))
    child.daemon = True
    child.start()
    child_conn.close()    # this might complete before child initializes

    greeting = latin('hello')
    conn.send_bytes(greeting)
    self.assertEqual(conn.recv_bytes(), greeting)

    # Ask the child to stop, then clean up.
    conn.send_bytes(SENTINEL)
    conn.close()
    child.join()
1722
def test_sendbytes(self):
    """Check the optional offset and size arguments of send_bytes()."""
    if self.TYPE != 'processes':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    msg = latin('abcdefghijklmnopqrstuvwxyz')
    a, b = self.Pipe()

    # (extra arguments for send_bytes, bytes expected at the other end)
    accepted = (
        ((), msg),
        ((5,), msg[5:]),
        ((7, 8), msg[7:7+8]),
        ((26,), latin('')),
        ((26, 0), latin('')),
    )
    for extra, expected in accepted:
        a.send_bytes(msg, *extra)
        self.assertEqual(b.recv_bytes(), expected)

    # Offset/size combinations that send_bytes() must reject.
    rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
    for extra in rejected:
        self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1754
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001755 @classmethod
1756 def _is_fd_assigned(cls, fd):
1757 try:
1758 os.fstat(fd)
1759 except OSError as e:
1760 if e.errno == errno.EBADF:
1761 return False
1762 raise
1763 else:
1764 return True
1765
@classmethod
def _writefd(cls, conn, data, create_dummy_fds=False):
    """Receive a handle over *conn*, write *data* to it and close it.

    With *create_dummy_fds* true, every unassigned descriptor below 256
    is first made a duplicate of the connection's descriptor.
    """
    if create_dummy_fds:
        for candidate in range(256):
            if cls._is_fd_assigned(candidate):
                continue
            os.dup2(conn.fileno(), candidate)

    handle = reduction.recv_handle(conn)
    if msvcrt:
        # Convert the received OS handle into a C runtime descriptor.
        handle = msvcrt.open_osfhandle(handle, os.O_WRONLY)
    os.write(handle, data)
    os.close(handle)
1776 os.close(fd)
1777
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
def test_fd_transfer(self):
    """Send a file handle to a child and check what the child wrote."""
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
    p.daemon = True
    p.start()
    # Remove the scratch file whatever the outcome of the test.
    self.addCleanup(test_support.unlink, test_support.TESTFN)
    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        if msvcrt:
            fd = msvcrt.get_osfhandle(fd)
        reduction.send_handle(conn, fd, p.pid)
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"foo")
1795
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
@unittest.skipIf(MAXFD <= 256,
                 "largest assignable fd number is too small")
@unittest.skipUnless(hasattr(os, "dup2"),
                     "test needs os.dup2()")
def test_large_fd_transfer(self):
    # With fd > 256 (issue #11657)
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    conn, child_conn = self.Pipe(duplex=True)

    p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
    p.daemon = True
    p.start()
    # Remove the scratch file once the test is over, whether it passed
    # or failed, so it cannot leak into later tests.
    self.addCleanup(test_support.unlink, test_support.TESTFN)
    with open(test_support.TESTFN, "wb") as f:
        fd = f.fileno()
        # Find the first unassigned descriptor number at or above 256.
        for newfd in range(256, MAXFD):
            if not self._is_fd_assigned(newfd):
                break
        else:
            self.fail("could not find an unassigned large file descriptor")
        os.dup2(fd, newfd)
        try:
            reduction.send_handle(conn, newfd, p.pid)
        finally:
            # The duplicate is only needed for the transfer itself.
            os.close(newfd)
    p.join()
    with open(test_support.TESTFN, "rb") as f:
        self.assertEqual(f.read(), b"bar")
1827
@classmethod
def _send_data_without_fd(cls, conn):
    # Write a single NUL byte straight to the connection's descriptor,
    # i.e. plain data that is not accompanied by a passed file descriptor.
    os.write(conn.fileno(), b"\0")
1831
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    # recv_handle() must raise RuntimeError when the data it receives
    # carries no file descriptor in its ancillary data.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_end,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(parent_end)
    sender.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001846
class _TestListenerClient(BaseTestCase):
    # A child connects to a Listener owned by the test, sends 'hello' and
    # disconnects; the test accepts the connection and checks the message.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting, hang up.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # instead of leaving one open per address family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
1883
Benjamin Petersondfd79492008-06-13 19:13:39 +00001884#
1885# Test of sending connection and socket objects between processes
1886#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001887"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001888class _TestPicklingConnections(BaseTestCase):
1889
1890 ALLOWED_TYPES = ('processes',)
1891
1892 def _listener(self, conn, families):
1893 for fam in families:
1894 l = self.connection.Listener(family=fam)
1895 conn.send(l.address)
1896 new_conn = l.accept()
1897 conn.send(new_conn)
1898
1899 if self.TYPE == 'processes':
1900 l = socket.socket()
1901 l.bind(('localhost', 0))
1902 conn.send(l.getsockname())
1903 l.listen(1)
1904 new_conn, addr = l.accept()
1905 conn.send(new_conn)
1906
1907 conn.recv()
1908
1909 def _remote(self, conn):
1910 for (address, msg) in iter(conn.recv, None):
1911 client = self.connection.Client(address)
1912 client.send(msg.upper())
1913 client.close()
1914
1915 if self.TYPE == 'processes':
1916 address, msg = conn.recv()
1917 client = socket.socket()
1918 client.connect(address)
1919 client.sendall(msg.upper())
1920 client.close()
1921
1922 conn.close()
1923
1924 def test_pickling(self):
1925 try:
1926 multiprocessing.allow_connection_pickling()
1927 except ImportError:
1928 return
1929
1930 families = self.connection.families
1931
1932 lconn, lconn0 = self.Pipe()
1933 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001934 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001935 lp.start()
1936 lconn0.close()
1937
1938 rconn, rconn0 = self.Pipe()
1939 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001940 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001941 rp.start()
1942 rconn0.close()
1943
1944 for fam in families:
1945 msg = ('This connection uses family %s' % fam).encode('ascii')
1946 address = lconn.recv()
1947 rconn.send((address, msg))
1948 new_conn = lconn.recv()
1949 self.assertEqual(new_conn.recv(), msg.upper())
1950
1951 rconn.send(None)
1952
1953 if self.TYPE == 'processes':
1954 msg = latin('This connection uses a normal socket')
1955 address = lconn.recv()
1956 rconn.send((address, msg))
1957 if hasattr(socket, 'fromfd'):
1958 new_conn = lconn.recv()
1959 self.assertEqual(new_conn.recv(100), msg.upper())
1960 else:
1961 # XXX On Windows with Py2.6 need to backport fromfd()
1962 discard = lconn.recv_bytes()
1963
1964 lconn.send(None)
1965
1966 rconn.close()
1967 lconn.close()
1968
1969 lp.join()
1970 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001971"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001972#
1973#
1974#
1975
class _TestHeap(BaseTestCase):
    # Consistency checks for the arena allocator behind
    # multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a random older block (separate name, so the loop
                # counter is not clobbered).
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        # The lock stays held for the rest of the test and is released by
        # the cleanup hook.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end exactly where the next one starts,
        # or the next one must be the first segment of another arena.
        for current, following in zip(segments, segments[1:]):
            (arena, _, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2040
Benjamin Petersondfd79492008-06-13 19:13:39 +00002041#
2042#
2043#
2044
class _Foo(Structure):
    # Two-field ctypes structure used by the shared-ctypes tests:
    # an int ``x`` followed by a double ``y``.
    _fields_ = [('x', c_int), ('y', c_double)]
2050
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, structures and arrays are modified by a child
    # process and the changes must be visible in the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock-protected shared objects.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the source after copy() must not affect the copy.
        source = _Foo(2, 5.0)
        duplicate = copy(source)
        source.x = 0
        source.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2100
2101#
2102#
2103#
2104
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize callbacks: when they fire, in which order
    # they run at exit, and that util._run_finalizers() copes with the
    # registry changing underneath it.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Start every test with an empty finalizer registry; the previous
        # content is kept so tearDown() can put it back.
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        # A test must not leave finalizers behind in the registry.
        self.assertFalse(util._finalizer_registry)
        util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        # Child-process body.  Every finalizer sends a short tag over
        # conn, so the parent can check which callbacks ran and in what
        # order.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given here; 'c' is absent from the sequence
        # test_finalize() expects.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0; test_finalize() expects
        # them back in reverse order of registration.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # Lowest priority of all: 'STOP' is the sentinel the parent reads up to.
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run _test_finalize() in a child and compare the tags it sent
        # (up to the 'STOP' sentinel) with the expected callback order.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        # 'finish' is read by both worker functions below and flipped by
        # this method to stop them; 'exc' collects exceptions they catch.
        finish = False
        exc = []

        def run_finalizers():
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc.append(e)

        def make_finalizers():
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc.append(e)
                    d.clear()

        # Lower the check interval and the GC thresholds for the duration
        # of the test; both are restored in the finally clause.
        old_interval = sys.getcheckinterval()
        old_threshold = gc.get_threshold()
        try:
            sys.setcheckinterval(10)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test_support.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            # Re-raise the first exception recorded by a worker, if any.
            if exc:
                raise exc[0]
        finally:
            sys.setcheckinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
2217
2218
Benjamin Petersondfd79492008-06-13 19:13:39 +00002219#
2220# Test that from ... import * works for each module
2221#
2222
class _TestImportStar(BaseTestCase):
    # Every name a multiprocessing module lists in __all__ must exist as
    # an attribute of that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        # Optional modules are only checked when they are usable here.
        if HAS_REDUCTION:
            modules += ['multiprocessing.reduction']
        if c_int is not None:
            # This module requires _ctypes
            modules += ['multiprocessing.sharedctypes']

        for modname in modules:
            __import__(modname)
            module = sys.modules[modname]
            exported = getattr(module, '__all__', ())
            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
2251
2252#
2253# Quick test that logging works -- does not test logging output
2254#
2255
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; log output itself is not
    # inspected.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it, so a missing logger shows up
        # as a test failure rather than an AttributeError.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report the effective level of multiprocessing's logger.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # A level set on multiprocessing's own logger is seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With NOTSET on that logger, the child reports the root
            # logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore both loggers even when an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2298
Jesse Noller814d02d2009-11-21 14:38:23 +00002299
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002300# class _TestLoggingProcessName(BaseTestCase):
2301#
2302# def handle(self, record):
2303# assert record.processName == multiprocessing.current_process().name
2304# self.__handled = True
2305#
2306# def test_logging(self):
2307# handler = logging.Handler()
2308# handler.handle = self.handle
2309# self.__handled = False
2310# # Bypass getLogger() and side-effects
2311# logger = logging.getLoggerClass()(
2312# 'multiprocessing.test.TestLoggingProcessName')
2313# logger.addHandler(handler)
2314# logger.propagate = False
2315#
2316# logger.warn('foo')
2317# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002318
Benjamin Petersondfd79492008-06-13 19:13:39 +00002319#
Richard Oudkerkba482642013-02-26 12:37:07 +00002320# Check that Process.join() retries if os.waitpid() fails with EINTR
2321#
2322
class _TestPollEintr(BaseTestCase):
    # Process.join() must still complete when SIGUSR1 is delivered to the
    # joining process while it waits.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Helper process body: wait half a second, then send SIGUSR1 to pid.
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        # One-element list so the nested handler can record the signal.
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            # The child sleeps for 1s while the killer fires after 0.5s,
            # so the signal is expected to arrive during p.join().
            p = self.Process(target=time.sleep, args=(1,))
            p.start()
            p.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
            killer.join()
        finally:
            # Reinstall whatever handler was active before the test.
            signal.signal(signal.SIGUSR1, oldhandler)
2350
2351#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002352# Test to verify handle verification, see issue 3321
2353#
2354
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built on invalid handles must fail with IOError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            bogus.poll()
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002362
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002363#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002364# Functions used to create test cases from the base ones in this module
2365#
2366
def get_attributes(Source, names):
    """Return a dict mapping each of *names* to ``getattr(Source, name)``.

    Attributes that are plain Python functions are wrapped in
    ``staticmethod`` so that storing them in a class namespace (as the
    mixin classes in this file do) does not turn them into methods.
    Raises AttributeError if *Source* lacks one of the names.
    """
    function_type = type(get_attributes)
    attrs = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attrs[name] = obj
    return attrs
2375
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level name starting with ``_Test`` whose ALLOWED_TYPES
    contains *type* yields a class deriving from that base,
    unittest.TestCase and *Mixin*.  The result maps the generated class
    name (``'With' + type.capitalize() + name[1:]``) to the class.
    """
    capitalized = type.capitalize()
    cases = {}

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + capitalized + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        # Give the generated class its final name and report it as living
        # in the mixin's module.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp
    return cases
2392
2393#
2394# Create test cases
2395#
2396
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the tests: the names below are
    # taken straight from the multiprocessing package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Add the listed multiprocessing attributes to the class namespace
    # being built; get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes for this flavour and publish them as
# module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2409
2410
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the tests: the names below are
    # attributes of a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The instance is created with object.__new__, so SyncManager.__init__
    # is not run here.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Add the listed manager attributes to the class namespace being built.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes for this flavour and publish them as
# module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2423
2424
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the tests: the names below come
    # from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Add the listed multiprocessing.dummy attributes to the class
    # namespace being built; get_attributes() wraps plain functions in
    # staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))

# Generate the concrete test classes for this flavour and publish them as
# module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2437
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer first sends the challenge marker, then garbage, then
        # empty messages; answering it must end in AuthenticationError.
        class _ScriptedPeer(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _ScriptedPeer(), b'abc')
2466
Jesse Noller7152f6d2009-04-02 05:17:26 +00002467#
2468# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2469#
2470
def initializer(ns):
    """Initializer callback for TestInitializers: add one to ``ns.test``."""
    ns.test = ns.test + 1
2473
class TestInitializers(unittest.TestCase):
    # The initializer passed to SyncManager.start() / Pool() must be
    # called with its arguments, and a non-callable must be rejected.

    def setUp(self):
        # A manager-backed namespace lets the initializer's effect be
        # observed from the test.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        second = multiprocessing.managers.SyncManager()
        with self.assertRaises(TypeError):
            second.start(1)
        second.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        second.shutdown()

    def test_pool_initializer(self):
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
2496
Jesse Noller1b90efb2009-06-30 17:11:52 +00002497#
2498# Issue 5155, 5313, 5331: Test process in processes
2499# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2500#
2501
def _this_sub_process(q):
    """Attempt one non-blocking get on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except Queue.Empty:
        pass
2507
def _test_process(q):
    """Start a daemonic grandchild that polls a fresh queue, and wait for it.

    The *q* argument is accepted but not used.
    """
    fresh_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(target=_this_sub_process,
                                         args=(fresh_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
2514
def _afunc(x):
    """Return *x* multiplied by itself (work item for pool_in_process)."""
    squared = x * x
    return squared
2517
def pool_in_process():
    """Create a four-worker pool, map _afunc over a few integers, shut down.

    Used as a Process target, so the pool lives inside a child process.
    """
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Shut the workers down explicitly instead of leaving the pool to
        # be torn down at interpreter exit.
        pool.close()
        pool.join()
2521
class _file_like(object):
    # Minimal file-like buffer: write() collects chunks, flush() hands
    # them to the delegate in one write.  The buffer is tied to the
    # process id, so it starts out empty again in a different process.

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        # There are no race conditions since fork keeps only the running thread
        current = os.getpid()
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2542
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issues 5155, 5313, 5331: starting processes (and pools) from inside
    # a child process.

    def test_queue_in_process(self):
        # The child itself starts a further process (see _test_process).
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a Pool (see pool_in_process).
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE: this Process object is constructed but never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert is stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')
2563
Richard Oudkerke4b99382012-07-27 14:05:46 +01002564#
2565# Test interaction with socket timeouts - see Issue #6056
2566#
2567
class TestTimeouts(unittest.TestCase):
    # Issue #6056: pipes and listeners must keep working while a short
    # default socket timeout is in effect.

    @classmethod
    def _test_timeout(cls, child, address):
        # Child side: wait longer than the default socket timeout, answer
        # over the pipe, then connect to the listener and send a second
        # value.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
2597
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002598#
2599# Test what happens with no "if __name__ == '__main__'"
2600#
2601
class TestNoForkBomb(unittest.TestCase):
    # Run mp_fork_bomb.py, a script without an
    # "if __name__ == '__main__'" guard, and check how it ends.

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script must succeed and print '123'.
            _, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual(out.rstrip(), '123')
            self.assertEqual(err, '')
            return
        # On Windows the script must fail with a RuntimeError and
        # produce no output.
        _, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual(out, '')
        self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002613
2614#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002615# Issue 12098: check sys.flags of child matches that for parent
2616#
2617
class TestFlags(unittest.TestCase):
    # Issue 12098: sys.flags seen in a multiprocessing child must match
    # the flags of the process that started it.

    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Print, as JSON, this process's flags together with the flags
        # reported by a process started from here.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    @test_support.requires_unicode # XXX json needs unicode support
    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-B', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002646
2647#
2648# Issue #17555: ForkAwareThreadLock
2649#
2650
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # The last generation (n <= 1) reports its registry size; earlier
        # generations only start the next one and wait for it.
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            nested = multiprocessing.Process(target=cls.child,
                                             args=(n-1, conn))
            nested.start()
            nested.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Created before the registry size is measured and kept referenced
        # for the duration of the test.
        l = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        size_after = reader.recv()
        first.join()
        self.assertLessEqual(size_after, size_before)
2675
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002676#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002677# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2678#
2679
class TestIgnoreEINTR(unittest.TestCase):
    # Issue #17097: a signal arriving while a child is in recv(), send()
    # or accept() must not make those calls fail.

    @classmethod
    def _test_ignore(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, announce
        # readiness, echo one object back, then send a large message.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # First signal: sent after the child reported 'ready' and
            # before anything is sent to it.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Second signal: sent after the echo, before the 1 MB message
            # is read here.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler, publish the
        # listener address, then greet the first client that connects.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        l = multiprocessing.connection.Listener()
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child after it has published its address and
            # before any client connects.
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
2743
2744#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002745#
2746#
2747
# Stand-alone test cases.  test_main() appends them, in this order, after
# the sorted processes/threads/manager test cases.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestTimeouts,
    TestNoForkBomb,
    TestFlags,
    TestForkAwareThreadLock,
    TestIgnoreEINTR,
]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002751
Benjamin Petersondfd79492008-06-13 19:13:39 +00002752#
2753#
2754#
2755
def test_main(run=None):
    """Build and run the complete multiprocessing test suite.

    run -- callable invoked with the assembled unittest.TestSuite.  When
           None, test.test_support.run_unittest is used.

    Raises unittest.SkipTest on Linux if creating a multiprocessing.RLock
    fails with OSError (see issue 3111).

    The shared pools and the manager used by the mixin classes are created
    here and are always torn down again, even if running the suite raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
        # module during these tests is at least platform dependent and possibly
        # non-deterministic on any given platform. So we don't mind if the listed
        # warnings aren't actually raised.
        with test_support.check_py3k_warnings(
                (".+__(get|set)slice__ has been removed", DeprecationWarning),
                (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                quiet=True):
            run(suite)
    finally:
        # Tear down the shared workers even when the run was interrupted or
        # failed, so that no pool or manager processes are left behind.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002803
def main():
    """Run the whole suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

# Allow the file to be executed directly as a script.
if __name__ == '__main__':
    main()