blob: 9fcd3bdad4189b69396bb67a00620da8df8ff2f4 [file] [log] [blame]
Benjamin Petersondfd79492008-06-13 19:13:39 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersondfd79492008-06-13 19:13:39 +00006import Queue
7import time
8import sys
9import os
10import gc
11import signal
12import array
Benjamin Petersondfd79492008-06-13 19:13:39 +000013import socket
14import random
15import logging
Antoine Pitroua1a8da82011-08-23 19:54:20 +020016import errno
Antoine Pitrou5084ff72017-03-24 16:03:46 +010017import weakref
Richard Oudkerkfaee75c2012-08-14 11:41:19 +010018import test.script_helper
Mark Dickinsonc4920e82009-11-20 19:30:22 +000019from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000020from StringIO import StringIO
R. David Murray3db8a342009-03-30 23:05:48 +000021_multiprocessing = test_support.import_module('_multiprocessing')
Ezio Melottic2077b02011-03-16 12:34:31 +020022# import threading after _multiprocessing to raise a more relevant error
Victor Stinner613b4cf2010-04-27 21:56:26 +000023# message: "No module named _multiprocessing". _multiprocessing is not compiled
24# without thread support.
25import threading
R. David Murray3db8a342009-03-30 23:05:48 +000026
Jesse Noller37040cd2008-09-30 00:15:45 +000027# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000028test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000029
Benjamin Petersondfd79492008-06-13 19:13:39 +000030import multiprocessing.dummy
31import multiprocessing.connection
32import multiprocessing.managers
33import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000034import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000035
Charles-François Natalif8413b22011-09-21 18:44:49 +020036from multiprocessing import util
37
38try:
39 from multiprocessing import reduction
40 HAS_REDUCTION = True
41except ImportError:
42 HAS_REDUCTION = False
Benjamin Petersondfd79492008-06-13 19:13:39 +000043
Brian Curtina06e9b82010-10-07 02:27:41 +000044try:
45 from multiprocessing.sharedctypes import Value, copy
46 HAS_SHAREDCTYPES = True
47except ImportError:
48 HAS_SHAREDCTYPES = False
49
Antoine Pitroua1a8da82011-08-23 19:54:20 +020050try:
51 import msvcrt
52except ImportError:
53 msvcrt = None
54
Benjamin Petersondfd79492008-06-13 19:13:39 +000055#
56#
57#
58
Benjamin Petersone79edf52008-07-13 18:34:58 +000059latin = str
Benjamin Petersondfd79492008-06-13 19:13:39 +000060
Benjamin Petersondfd79492008-06-13 19:13:39 +000061#
62# Constants
63#
64
65LOG_LEVEL = util.SUBWARNING
Jesse Noller654ade32010-01-27 03:05:57 +000066#LOG_LEVEL = logging.DEBUG
Benjamin Petersondfd79492008-06-13 19:13:39 +000067
68DELTA = 0.1
69CHECK_TIMINGS = False # making true makes tests take a lot longer
70 # and can sometimes cause some non-serious
71 # failures because some calls block a bit
72 # longer than expected
73if CHECK_TIMINGS:
74 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
75else:
76 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
77
78HAVE_GETVALUE = not getattr(_multiprocessing,
79 'HAVE_BROKEN_SEM_GETVALUE', False)
80
Jesse Noller9a5b2ad2009-01-19 15:12:22 +000081WIN32 = (sys.platform == "win32")
82
Antoine Pitroua1a8da82011-08-23 19:54:20 +020083try:
84 MAXFD = os.sysconf("SC_OPEN_MAX")
85except:
86 MAXFD = 256
87
Benjamin Petersondfd79492008-06-13 19:13:39 +000088#
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000089# Some tests require ctypes
90#
91
92try:
Nick Coghlan13623662010-04-10 14:24:36 +000093 from ctypes import Structure, c_int, c_double
Florent Xicluna36b9fbb2010-03-24 19:33:25 +000094except ImportError:
95 Structure = object
96 c_int = c_double = None
97
Charles-François Natali6392d7f2011-11-22 18:35:18 +010098
99def check_enough_semaphores():
100 """Check that the system supports enough semaphores to run the test."""
101 # minimum number of semaphores available according to POSIX
102 nsems_min = 256
103 try:
104 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
105 except (AttributeError, ValueError):
106 # sysconf not available or setting not available
107 return
108 if nsems == -1 or nsems >= nsems_min:
109 return
110 raise unittest.SkipTest("The OS doesn't support enough semaphores "
111 "to run the test (required: %d)." % nsems_min)
112
113
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000114#
Benjamin Petersondfd79492008-06-13 19:13:39 +0000115# Creates a wrapper for a function which records the time it takes to finish
116#
117
118class TimingWrapper(object):
119
120 def __init__(self, func):
121 self.func = func
122 self.elapsed = None
123
124 def __call__(self, *args, **kwds):
125 t = time.time()
126 try:
127 return self.func(*args, **kwds)
128 finally:
129 self.elapsed = time.time() - t
130
131#
132# Base class for test cases
133#
134
135class BaseTestCase(object):
136
137 ALLOWED_TYPES = ('processes', 'manager', 'threads')
138
139 def assertTimingAlmostEqual(self, a, b):
140 if CHECK_TIMINGS:
141 self.assertAlmostEqual(a, b, 1)
142
143 def assertReturnsIfImplemented(self, value, func, *args):
144 try:
145 res = func(*args)
146 except NotImplementedError:
147 pass
148 else:
149 return self.assertEqual(value, res)
150
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000151 # For the sanity of Windows users, rather than crashing or freezing in
152 # multiple ways.
153 def __reduce__(self, *args):
154 raise NotImplementedError("shouldn't try to pickle a test case")
155
156 __reduce_ex__ = __reduce__
157
Benjamin Petersondfd79492008-06-13 19:13:39 +0000158#
159# Return the value of a semaphore
160#
161
162def get_value(self):
163 try:
164 return self.get_value()
165 except AttributeError:
166 try:
167 return self._Semaphore__value
168 except AttributeError:
169 try:
170 return self._value
171 except AttributeError:
172 raise NotImplementedError
173
174#
175# Testcases
176#
177
178class _TestProcess(BaseTestCase):
179
180 ALLOWED_TYPES = ('processes', 'threads')
181
182 def test_current(self):
183 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600184 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000185
186 current = self.current_process()
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000187 authkey = current.authkey
Benjamin Petersondfd79492008-06-13 19:13:39 +0000188
189 self.assertTrue(current.is_alive())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000190 self.assertTrue(not current.daemon)
Ezio Melottib0f5adc2010-01-24 16:58:36 +0000191 self.assertIsInstance(authkey, bytes)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000192 self.assertTrue(len(authkey) > 0)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000193 self.assertEqual(current.ident, os.getpid())
194 self.assertEqual(current.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000195
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000196 @classmethod
197 def _test(cls, q, *args, **kwds):
198 current = cls.current_process()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000199 q.put(args)
200 q.put(kwds)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000201 q.put(current.name)
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000202 if cls.TYPE != 'threads':
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000203 q.put(bytes(current.authkey))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000204 q.put(current.pid)
205
206 def test_process(self):
207 q = self.Queue(1)
208 e = self.Event()
209 args = (q, 1, 2)
210 kwargs = {'hello':23, 'bye':2.54}
211 name = 'SomeProcess'
212 p = self.Process(
213 target=self._test, args=args, kwargs=kwargs, name=name
214 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000215 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000216 current = self.current_process()
217
218 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000219 self.assertEqual(p.authkey, current.authkey)
220 self.assertEqual(p.is_alive(), False)
221 self.assertEqual(p.daemon, True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000222 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000223 self.assertTrue(type(self.active_children()) is list)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000224 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000225
226 p.start()
227
Ezio Melotti2623a372010-11-21 13:34:58 +0000228 self.assertEqual(p.exitcode, None)
229 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000230 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000231
Ezio Melotti2623a372010-11-21 13:34:58 +0000232 self.assertEqual(q.get(), args[1:])
233 self.assertEqual(q.get(), kwargs)
234 self.assertEqual(q.get(), p.name)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000235 if self.TYPE != 'threads':
Ezio Melotti2623a372010-11-21 13:34:58 +0000236 self.assertEqual(q.get(), current.authkey)
237 self.assertEqual(q.get(), p.pid)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000238
239 p.join()
240
Ezio Melotti2623a372010-11-21 13:34:58 +0000241 self.assertEqual(p.exitcode, 0)
242 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000243 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000244
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000245 @classmethod
246 def _test_terminate(cls):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000247 time.sleep(1000)
248
249 def test_terminate(self):
250 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600251 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000252
253 p = self.Process(target=self._test_terminate)
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000254 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000255 p.start()
256
257 self.assertEqual(p.is_alive(), True)
Ezio Melottiaa980582010-01-23 23:04:36 +0000258 self.assertIn(p, self.active_children())
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000259 self.assertEqual(p.exitcode, None)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000260
261 p.terminate()
262
263 join = TimingWrapper(p.join)
264 self.assertEqual(join(), None)
265 self.assertTimingAlmostEqual(join.elapsed, 0.0)
266
267 self.assertEqual(p.is_alive(), False)
Ezio Melottiaa980582010-01-23 23:04:36 +0000268 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000269
270 p.join()
271
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000272 # XXX sometimes get p.exitcode == 0 on Windows ...
273 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000274
275 def test_cpu_count(self):
276 try:
277 cpus = multiprocessing.cpu_count()
278 except NotImplementedError:
279 cpus = 1
280 self.assertTrue(type(cpus) is int)
281 self.assertTrue(cpus >= 1)
282
283 def test_active_children(self):
284 self.assertEqual(type(self.active_children()), list)
285
286 p = self.Process(target=time.sleep, args=(DELTA,))
Ezio Melottiaa980582010-01-23 23:04:36 +0000287 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000288
Jesus Cea6f6016b2011-09-09 20:26:57 +0200289 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000290 p.start()
Ezio Melottiaa980582010-01-23 23:04:36 +0000291 self.assertIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000292
293 p.join()
Ezio Melottiaa980582010-01-23 23:04:36 +0000294 self.assertNotIn(p, self.active_children())
Benjamin Petersondfd79492008-06-13 19:13:39 +0000295
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000296 @classmethod
297 def _test_recursion(cls, wconn, id):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000298 from multiprocessing import forking
299 wconn.send(id)
300 if len(id) < 2:
301 for i in range(2):
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000302 p = cls.Process(
303 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersondfd79492008-06-13 19:13:39 +0000304 )
305 p.start()
306 p.join()
307
308 def test_recursion(self):
309 rconn, wconn = self.Pipe(duplex=False)
310 self._test_recursion(wconn, [])
311
312 time.sleep(DELTA)
313 result = []
314 while rconn.poll():
315 result.append(rconn.recv())
316
317 expected = [
318 [],
319 [0],
320 [0, 0],
321 [0, 1],
322 [1],
323 [1, 0],
324 [1, 1]
325 ]
326 self.assertEqual(result, expected)
327
Richard Oudkerk2182e052012-06-06 19:01:14 +0100328 @classmethod
329 def _test_sys_exit(cls, reason, testfn):
330 sys.stderr = open(testfn, 'w')
331 sys.exit(reason)
332
333 def test_sys_exit(self):
334 # See Issue 13854
335 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -0600336 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk2182e052012-06-06 19:01:14 +0100337
338 testfn = test_support.TESTFN
339 self.addCleanup(test_support.unlink, testfn)
340
Richard Oudkerk3f8376e2013-11-17 17:24:11 +0000341 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk2182e052012-06-06 19:01:14 +0100342 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
343 p.daemon = True
344 p.start()
345 p.join(5)
346 self.assertEqual(p.exitcode, code)
347
348 with open(testfn, 'r') as f:
349 self.assertEqual(f.read().rstrip(), str(reason))
350
351 for reason in (True, False, 8):
352 p = self.Process(target=sys.exit, args=(reason,))
353 p.daemon = True
354 p.start()
355 p.join(5)
356 self.assertEqual(p.exitcode, reason)
357
Benjamin Petersondfd79492008-06-13 19:13:39 +0000358#
359#
360#
361
362class _UpperCaser(multiprocessing.Process):
363
364 def __init__(self):
365 multiprocessing.Process.__init__(self)
366 self.child_conn, self.parent_conn = multiprocessing.Pipe()
367
368 def run(self):
369 self.parent_conn.close()
370 for s in iter(self.child_conn.recv, None):
371 self.child_conn.send(s.upper())
372 self.child_conn.close()
373
374 def submit(self, s):
375 assert type(s) is str
376 self.parent_conn.send(s)
377 return self.parent_conn.recv()
378
379 def stop(self):
380 self.parent_conn.send(None)
381 self.parent_conn.close()
382 self.child_conn.close()
383
384class _TestSubclassingProcess(BaseTestCase):
385
386 ALLOWED_TYPES = ('processes',)
387
388 def test_subclassing(self):
389 uppercaser = _UpperCaser()
Jesus Cea6f6016b2011-09-09 20:26:57 +0200390 uppercaser.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000391 uppercaser.start()
392 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
393 self.assertEqual(uppercaser.submit('world'), 'WORLD')
394 uppercaser.stop()
395 uppercaser.join()
396
397#
398#
399#
400
401def queue_empty(q):
402 if hasattr(q, 'empty'):
403 return q.empty()
404 else:
405 return q.qsize() == 0
406
407def queue_full(q, maxsize):
408 if hasattr(q, 'full'):
409 return q.full()
410 else:
411 return q.qsize() == maxsize
412
413
414class _TestQueue(BaseTestCase):
415
416
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000417 @classmethod
418 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000419 child_can_start.wait()
420 for i in range(6):
421 queue.get()
422 parent_can_continue.set()
423
424 def test_put(self):
425 MAXSIZE = 6
426 queue = self.Queue(maxsize=MAXSIZE)
427 child_can_start = self.Event()
428 parent_can_continue = self.Event()
429
430 proc = self.Process(
431 target=self._test_put,
432 args=(queue, child_can_start, parent_can_continue)
433 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000434 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000435 proc.start()
436
437 self.assertEqual(queue_empty(queue), True)
438 self.assertEqual(queue_full(queue, MAXSIZE), False)
439
440 queue.put(1)
441 queue.put(2, True)
442 queue.put(3, True, None)
443 queue.put(4, False)
444 queue.put(5, False, None)
445 queue.put_nowait(6)
446
447 # the values may be in buffer but not yet in pipe so sleep a bit
448 time.sleep(DELTA)
449
450 self.assertEqual(queue_empty(queue), False)
451 self.assertEqual(queue_full(queue, MAXSIZE), True)
452
453 put = TimingWrapper(queue.put)
454 put_nowait = TimingWrapper(queue.put_nowait)
455
456 self.assertRaises(Queue.Full, put, 7, False)
457 self.assertTimingAlmostEqual(put.elapsed, 0)
458
459 self.assertRaises(Queue.Full, put, 7, False, None)
460 self.assertTimingAlmostEqual(put.elapsed, 0)
461
462 self.assertRaises(Queue.Full, put_nowait, 7)
463 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
464
465 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
466 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
467
468 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
469 self.assertTimingAlmostEqual(put.elapsed, 0)
470
471 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
472 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
473
474 child_can_start.set()
475 parent_can_continue.wait()
476
477 self.assertEqual(queue_empty(queue), True)
478 self.assertEqual(queue_full(queue, MAXSIZE), False)
479
480 proc.join()
481
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000482 @classmethod
483 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000484 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000485 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000486 queue.put(2)
487 queue.put(3)
488 queue.put(4)
489 queue.put(5)
490 parent_can_continue.set()
491
492 def test_get(self):
493 queue = self.Queue()
494 child_can_start = self.Event()
495 parent_can_continue = self.Event()
496
497 proc = self.Process(
498 target=self._test_get,
499 args=(queue, child_can_start, parent_can_continue)
500 )
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000501 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000502 proc.start()
503
504 self.assertEqual(queue_empty(queue), True)
505
506 child_can_start.set()
507 parent_can_continue.wait()
508
509 time.sleep(DELTA)
510 self.assertEqual(queue_empty(queue), False)
511
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000512 # Hangs unexpectedly, remove for now
513 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000514 self.assertEqual(queue.get(True, None), 2)
515 self.assertEqual(queue.get(True), 3)
516 self.assertEqual(queue.get(timeout=1), 4)
517 self.assertEqual(queue.get_nowait(), 5)
518
519 self.assertEqual(queue_empty(queue), True)
520
521 get = TimingWrapper(queue.get)
522 get_nowait = TimingWrapper(queue.get_nowait)
523
524 self.assertRaises(Queue.Empty, get, False)
525 self.assertTimingAlmostEqual(get.elapsed, 0)
526
527 self.assertRaises(Queue.Empty, get, False, None)
528 self.assertTimingAlmostEqual(get.elapsed, 0)
529
530 self.assertRaises(Queue.Empty, get_nowait)
531 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
532
533 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
534 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
535
536 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
537 self.assertTimingAlmostEqual(get.elapsed, 0)
538
539 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
540 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
541
542 proc.join()
543
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000544 @classmethod
545 def _test_fork(cls, queue):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000546 for i in range(10, 20):
547 queue.put(i)
548 # note that at this point the items may only be buffered, so the
549 # process cannot shutdown until the feeder thread has finished
550 # pushing items onto the pipe.
551
552 def test_fork(self):
553 # Old versions of Queue would fail to create a new feeder
554 # thread for a forked process if the original process had its
555 # own feeder thread. This test checks that this no longer
556 # happens.
557
558 queue = self.Queue()
559
560 # put items on queue so that main process starts a feeder thread
561 for i in range(10):
562 queue.put(i)
563
564 # wait to make sure thread starts before we fork a new process
565 time.sleep(DELTA)
566
567 # fork process
568 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200569 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000570 p.start()
571
572 # check that all expected items are in the queue
573 for i in range(20):
574 self.assertEqual(queue.get(), i)
575 self.assertRaises(Queue.Empty, queue.get, False)
576
577 p.join()
578
579 def test_qsize(self):
580 q = self.Queue()
581 try:
582 self.assertEqual(q.qsize(), 0)
583 except NotImplementedError:
Zachary Ware1f702212013-12-10 14:09:20 -0600584 self.skipTest('qsize method not implemented')
Benjamin Petersondfd79492008-06-13 19:13:39 +0000585 q.put(1)
586 self.assertEqual(q.qsize(), 1)
587 q.put(5)
588 self.assertEqual(q.qsize(), 2)
589 q.get()
590 self.assertEqual(q.qsize(), 1)
591 q.get()
592 self.assertEqual(q.qsize(), 0)
593
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000594 @classmethod
595 def _test_task_done(cls, q):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000596 for obj in iter(q.get, None):
597 time.sleep(DELTA)
598 q.task_done()
599
600 def test_task_done(self):
601 queue = self.JoinableQueue()
602
603 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000604 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersondfd79492008-06-13 19:13:39 +0000605
606 workers = [self.Process(target=self._test_task_done, args=(queue,))
607 for i in xrange(4)]
608
609 for p in workers:
Jesus Cea6f6016b2011-09-09 20:26:57 +0200610 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000611 p.start()
612
613 for i in xrange(10):
614 queue.put(i)
615
616 queue.join()
617
618 for p in workers:
619 queue.put(None)
620
621 for p in workers:
622 p.join()
623
Serhiy Storchaka233e6982015-03-06 22:17:25 +0200624 def test_no_import_lock_contention(self):
625 with test_support.temp_cwd():
626 module_name = 'imported_by_an_imported_module'
627 with open(module_name + '.py', 'w') as f:
628 f.write("""if 1:
629 import multiprocessing
630
631 q = multiprocessing.Queue()
632 q.put('knock knock')
633 q.get(timeout=3)
634 q.close()
635 """)
636
637 with test_support.DirsOnSysPath(os.getcwd()):
638 try:
639 __import__(module_name)
640 except Queue.Empty:
641 self.fail("Probable regression on import lock contention;"
642 " see Issue #22853")
643
Antoine Pitroubdd96472017-05-25 17:53:04 +0200644 def test_queue_feeder_donot_stop_onexc(self):
645 # bpo-30414: verify feeder handles exceptions correctly
646 if self.TYPE != 'processes':
647 self.skipTest('test not appropriate for {}'.format(self.TYPE))
648
649 class NotSerializable(object):
650 def __reduce__(self):
651 raise AttributeError
652 with test.support.captured_stderr():
653 q = self.Queue()
654 q.put(NotSerializable())
655 q.put(True)
656 self.assertTrue(q.get(timeout=0.1))
657
658
Benjamin Petersondfd79492008-06-13 19:13:39 +0000659#
660#
661#
662
663class _TestLock(BaseTestCase):
664
665 def test_lock(self):
666 lock = self.Lock()
667 self.assertEqual(lock.acquire(), True)
668 self.assertEqual(lock.acquire(False), False)
669 self.assertEqual(lock.release(), None)
670 self.assertRaises((ValueError, threading.ThreadError), lock.release)
671
672 def test_rlock(self):
673 lock = self.RLock()
674 self.assertEqual(lock.acquire(), True)
675 self.assertEqual(lock.acquire(), True)
676 self.assertEqual(lock.acquire(), True)
677 self.assertEqual(lock.release(), None)
678 self.assertEqual(lock.release(), None)
679 self.assertEqual(lock.release(), None)
680 self.assertRaises((AssertionError, RuntimeError), lock.release)
681
Jesse Noller82eb5902009-03-30 23:29:31 +0000682 def test_lock_context(self):
683 with self.Lock():
684 pass
685
Benjamin Petersondfd79492008-06-13 19:13:39 +0000686
687class _TestSemaphore(BaseTestCase):
688
689 def _test_semaphore(self, sem):
690 self.assertReturnsIfImplemented(2, get_value, sem)
691 self.assertEqual(sem.acquire(), True)
692 self.assertReturnsIfImplemented(1, get_value, sem)
693 self.assertEqual(sem.acquire(), True)
694 self.assertReturnsIfImplemented(0, get_value, sem)
695 self.assertEqual(sem.acquire(False), False)
696 self.assertReturnsIfImplemented(0, get_value, sem)
697 self.assertEqual(sem.release(), None)
698 self.assertReturnsIfImplemented(1, get_value, sem)
699 self.assertEqual(sem.release(), None)
700 self.assertReturnsIfImplemented(2, get_value, sem)
701
702 def test_semaphore(self):
703 sem = self.Semaphore(2)
704 self._test_semaphore(sem)
705 self.assertEqual(sem.release(), None)
706 self.assertReturnsIfImplemented(3, get_value, sem)
707 self.assertEqual(sem.release(), None)
708 self.assertReturnsIfImplemented(4, get_value, sem)
709
710 def test_bounded_semaphore(self):
711 sem = self.BoundedSemaphore(2)
712 self._test_semaphore(sem)
713 # Currently fails on OS/X
714 #if HAVE_GETVALUE:
715 # self.assertRaises(ValueError, sem.release)
716 # self.assertReturnsIfImplemented(2, get_value, sem)
717
718 def test_timeout(self):
719 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -0600720 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000721
722 sem = self.Semaphore(0)
723 acquire = TimingWrapper(sem.acquire)
724
725 self.assertEqual(acquire(False), False)
726 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
727
728 self.assertEqual(acquire(False, None), False)
729 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
730
731 self.assertEqual(acquire(False, TIMEOUT1), False)
732 self.assertTimingAlmostEqual(acquire.elapsed, 0)
733
734 self.assertEqual(acquire(True, TIMEOUT2), False)
735 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
736
737 self.assertEqual(acquire(timeout=TIMEOUT3), False)
738 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
739
740
741class _TestCondition(BaseTestCase):
742
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000743 @classmethod
744 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000745 cond.acquire()
746 sleeping.release()
747 cond.wait(timeout)
748 woken.release()
749 cond.release()
750
751 def check_invariant(self, cond):
752 # this is only supposed to succeed when there are no sleepers
753 if self.TYPE == 'processes':
754 try:
755 sleepers = (cond._sleeping_count.get_value() -
756 cond._woken_count.get_value())
757 self.assertEqual(sleepers, 0)
758 self.assertEqual(cond._wait_semaphore.get_value(), 0)
759 except NotImplementedError:
760 pass
761
762 def test_notify(self):
763 cond = self.Condition()
764 sleeping = self.Semaphore(0)
765 woken = self.Semaphore(0)
766
767 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000768 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000769 p.start()
770
771 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000772 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000773 p.start()
774
775 # wait for both children to start sleeping
776 sleeping.acquire()
777 sleeping.acquire()
778
779 # check no process/thread has woken up
780 time.sleep(DELTA)
781 self.assertReturnsIfImplemented(0, get_value, woken)
782
783 # wake up one process/thread
784 cond.acquire()
785 cond.notify()
786 cond.release()
787
788 # check one process/thread has woken up
789 time.sleep(DELTA)
790 self.assertReturnsIfImplemented(1, get_value, woken)
791
792 # wake up another
793 cond.acquire()
794 cond.notify()
795 cond.release()
796
797 # check other has woken up
798 time.sleep(DELTA)
799 self.assertReturnsIfImplemented(2, get_value, woken)
800
801 # check state is not mucked up
802 self.check_invariant(cond)
803 p.join()
804
805 def test_notify_all(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 # start some threads/processes which will timeout
811 for i in range(3):
812 p = self.Process(target=self.f,
813 args=(cond, sleeping, woken, TIMEOUT1))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000814 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000815 p.start()
816
817 t = threading.Thread(target=self.f,
818 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000819 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000820 t.start()
821
822 # wait for them all to sleep
823 for i in xrange(6):
824 sleeping.acquire()
825
826 # check they have all timed out
827 for i in xrange(6):
828 woken.acquire()
829 self.assertReturnsIfImplemented(0, get_value, woken)
830
831 # check state is not mucked up
832 self.check_invariant(cond)
833
834 # start some more threads/processes
835 for i in range(3):
836 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +0000837 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000838 p.start()
839
840 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Petersona9b22222008-08-18 18:01:43 +0000841 t.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000842 t.start()
843
844 # wait for them to all sleep
845 for i in xrange(6):
846 sleeping.acquire()
847
848 # check no process/thread has woken up
849 time.sleep(DELTA)
850 self.assertReturnsIfImplemented(0, get_value, woken)
851
852 # wake them all up
853 cond.acquire()
854 cond.notify_all()
855 cond.release()
856
857 # check they have all woken
Victor Stinner9d1983b2017-05-15 17:32:14 +0200858 for i in range(10):
859 try:
860 if get_value(woken) == 6:
861 break
862 except NotImplementedError:
863 break
864 time.sleep(DELTA)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000865 self.assertReturnsIfImplemented(6, get_value, woken)
866
867 # check state is not mucked up
868 self.check_invariant(cond)
869
870 def test_timeout(self):
871 cond = self.Condition()
872 wait = TimingWrapper(cond.wait)
873 cond.acquire()
874 res = wait(TIMEOUT1)
875 cond.release()
876 self.assertEqual(res, None)
877 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
878
879
880class _TestEvent(BaseTestCase):
881
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000882 @classmethod
883 def _test_event(cls, event):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000884 time.sleep(TIMEOUT2)
885 event.set()
886
887 def test_event(self):
888 event = self.Event()
889 wait = TimingWrapper(event.wait)
890
Ezio Melottic2077b02011-03-16 12:34:31 +0200891 # Removed temporarily, due to API shear, this does not
Benjamin Petersondfd79492008-06-13 19:13:39 +0000892 # work with threading._Event objects. is_set == isSet
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000893 self.assertEqual(event.is_set(), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000894
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000895 # Removed, threading.Event.wait() will return the value of the __flag
896 # instead of None. API Shear with the semaphore backed mp.Event
897 self.assertEqual(wait(0.0), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000898 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000899 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000900 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
901
902 event.set()
903
904 # See note above on the API differences
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000905 self.assertEqual(event.is_set(), True)
906 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000907 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000908 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000909 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
910 # self.assertEqual(event.is_set(), True)
911
912 event.clear()
913
914 #self.assertEqual(event.is_set(), False)
915
Jesus Cea6f6016b2011-09-09 20:26:57 +0200916 p = self.Process(target=self._test_event, args=(event,))
917 p.daemon = True
918 p.start()
Jesse Noller02cb0eb2009-04-01 03:45:50 +0000919 self.assertEqual(wait(), True)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000920
921#
922#
923#
924
925class _TestValue(BaseTestCase):
926
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000927 ALLOWED_TYPES = ('processes',)
928
Benjamin Petersondfd79492008-06-13 19:13:39 +0000929 codes_values = [
930 ('i', 4343, 24234),
931 ('d', 3.625, -4.25),
932 ('h', -232, 234),
933 ('c', latin('x'), latin('y'))
934 ]
935
Antoine Pitrou55d935a2010-11-22 16:35:57 +0000936 def setUp(self):
937 if not HAS_SHAREDCTYPES:
938 self.skipTest("requires multiprocessing.sharedctypes")
939
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000940 @classmethod
941 def _test(cls, values):
942 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000943 sv.value = cv[2]
944
945
946 def test_value(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000947 if raw:
948 values = [self.RawValue(code, value)
949 for code, value, _ in self.codes_values]
950 else:
951 values = [self.Value(code, value)
952 for code, value, _ in self.codes_values]
953
954 for sv, cv in zip(values, self.codes_values):
955 self.assertEqual(sv.value, cv[1])
956
957 proc = self.Process(target=self._test, args=(values,))
Jesus Cea6f6016b2011-09-09 20:26:57 +0200958 proc.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +0000959 proc.start()
960 proc.join()
961
962 for sv, cv in zip(values, self.codes_values):
963 self.assertEqual(sv.value, cv[2])
964
965 def test_rawvalue(self):
966 self.test_value(raw=True)
967
968 def test_getobj_getlock(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +0000969 val1 = self.Value('i', 5)
970 lock1 = val1.get_lock()
971 obj1 = val1.get_obj()
972
973 val2 = self.Value('i', 5, lock=None)
974 lock2 = val2.get_lock()
975 obj2 = val2.get_obj()
976
977 lock = self.Lock()
978 val3 = self.Value('i', 5, lock=lock)
979 lock3 = val3.get_lock()
980 obj3 = val3.get_obj()
981 self.assertEqual(lock, lock3)
982
Jesse Noller6ab22152009-01-18 02:45:38 +0000983 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000984 self.assertFalse(hasattr(arr4, 'get_lock'))
985 self.assertFalse(hasattr(arr4, 'get_obj'))
986
Jesse Noller6ab22152009-01-18 02:45:38 +0000987 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
988
989 arr5 = self.RawValue('i', 5)
990 self.assertFalse(hasattr(arr5, 'get_lock'))
991 self.assertFalse(hasattr(arr5, 'get_obj'))
992
Benjamin Petersondfd79492008-06-13 19:13:39 +0000993
994class _TestArray(BaseTestCase):
995
Florent Xicluna36b9fbb2010-03-24 19:33:25 +0000996 ALLOWED_TYPES = ('processes',)
997
Antoine Pitrou4eb2b282010-11-02 23:51:30 +0000998 @classmethod
999 def f(cls, seq):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001000 for i in range(1, len(seq)):
1001 seq[i] += seq[i-1]
1002
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001003 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001004 def test_array(self, raw=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001005 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1006 if raw:
1007 arr = self.RawArray('i', seq)
1008 else:
1009 arr = self.Array('i', seq)
1010
1011 self.assertEqual(len(arr), len(seq))
1012 self.assertEqual(arr[3], seq[3])
1013 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1014
1015 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1016
1017 self.assertEqual(list(arr[:]), seq)
1018
1019 self.f(seq)
1020
1021 p = self.Process(target=self.f, args=(arr,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001022 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001023 p.start()
1024 p.join()
1025
1026 self.assertEqual(list(arr[:]), seq)
1027
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001028 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsond3cb2f62011-03-26 10:02:37 +00001029 def test_array_from_size(self):
1030 size = 10
1031 # Test for zeroing (see issue #11675).
1032 # The repetition below strengthens the test by increasing the chances
1033 # of previously allocated non-zero memory being used for the new array
1034 # on the 2nd and 3rd loops.
1035 for _ in range(3):
1036 arr = self.Array('i', size)
1037 self.assertEqual(len(arr), size)
1038 self.assertEqual(list(arr), [0] * size)
1039 arr[:] = range(10)
1040 self.assertEqual(list(arr), range(10))
1041 del arr
1042
1043 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001044 def test_rawarray(self):
1045 self.test_array(raw=True)
1046
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00001047 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinsonf9e9a6f2011-03-25 22:01:06 +00001048 def test_array_accepts_long(self):
1049 arr = self.Array('i', 10L)
1050 self.assertEqual(len(arr), 10)
1051 raw_arr = self.RawArray('i', 10L)
1052 self.assertEqual(len(raw_arr), 10)
1053
1054 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersondfd79492008-06-13 19:13:39 +00001055 def test_getobj_getlock_obj(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001056 arr1 = self.Array('i', range(10))
1057 lock1 = arr1.get_lock()
1058 obj1 = arr1.get_obj()
1059
1060 arr2 = self.Array('i', range(10), lock=None)
1061 lock2 = arr2.get_lock()
1062 obj2 = arr2.get_obj()
1063
1064 lock = self.Lock()
1065 arr3 = self.Array('i', range(10), lock=lock)
1066 lock3 = arr3.get_lock()
1067 obj3 = arr3.get_obj()
1068 self.assertEqual(lock, lock3)
1069
Jesse Noller6ab22152009-01-18 02:45:38 +00001070 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001071 self.assertFalse(hasattr(arr4, 'get_lock'))
1072 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Noller6ab22152009-01-18 02:45:38 +00001073 self.assertRaises(AttributeError,
1074 self.Array, 'i', range(10), lock='notalock')
1075
1076 arr5 = self.RawArray('i', range(10))
1077 self.assertFalse(hasattr(arr5, 'get_lock'))
1078 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001079
1080#
1081#
1082#
1083
1084class _TestContainers(BaseTestCase):
1085
1086 ALLOWED_TYPES = ('manager',)
1087
1088 def test_list(self):
1089 a = self.list(range(10))
1090 self.assertEqual(a[:], range(10))
1091
1092 b = self.list()
1093 self.assertEqual(b[:], [])
1094
1095 b.extend(range(5))
1096 self.assertEqual(b[:], range(5))
1097
1098 self.assertEqual(b[2], 2)
1099 self.assertEqual(b[2:10], [2,3,4])
1100
1101 b *= 2
1102 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1103
1104 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1105
1106 self.assertEqual(a[:], range(10))
1107
1108 d = [a, b]
1109 e = self.list(d)
1110 self.assertEqual(
1111 e[:],
1112 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1113 )
1114
1115 f = self.list([a])
1116 a.append('hello')
1117 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1118
1119 def test_dict(self):
1120 d = self.dict()
1121 indices = range(65, 70)
1122 for i in indices:
1123 d[i] = chr(i)
1124 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1125 self.assertEqual(sorted(d.keys()), indices)
1126 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1127 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1128
1129 def test_namespace(self):
1130 n = self.Namespace()
1131 n.name = 'Bob'
1132 n.job = 'Builder'
1133 n._hidden = 'hidden'
1134 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1135 del n.job
1136 self.assertEqual(str(n), "Namespace(name='Bob')")
1137 self.assertTrue(hasattr(n, 'name'))
1138 self.assertTrue(not hasattr(n, 'job'))
1139
1140#
1141#
1142#
1143
1144def sqr(x, wait=0.0):
1145 time.sleep(wait)
1146 return x*x
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001147
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001148def identity(x):
1149 return x
1150
1151class CountedObject(object):
1152 n_instances = 0
1153
1154 def __new__(cls):
1155 cls.n_instances += 1
1156 return object.__new__(cls)
1157
1158 def __del__(self):
1159 type(self).n_instances -= 1
1160
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001161class SayWhenError(ValueError): pass
1162
1163def exception_throwing_generator(total, when):
1164 for i in range(total):
1165 if i == when:
1166 raise SayWhenError("Somebody said when")
1167 yield i
1168
Benjamin Petersondfd79492008-06-13 19:13:39 +00001169class _TestPool(BaseTestCase):
1170
1171 def test_apply(self):
1172 papply = self.pool.apply
1173 self.assertEqual(papply(sqr, (5,)), sqr(5))
1174 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1175
1176 def test_map(self):
1177 pmap = self.pool.map
1178 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
1179 self.assertEqual(pmap(sqr, range(100), chunksize=20),
1180 map(sqr, range(100)))
1181
Richard Oudkerk21aad972013-10-28 23:02:22 +00001182 def test_map_unplicklable(self):
1183 # Issue #19425 -- failure to pickle should not cause a hang
1184 if self.TYPE == 'threads':
Zachary Ware1f702212013-12-10 14:09:20 -06001185 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk21aad972013-10-28 23:02:22 +00001186 class A(object):
1187 def __reduce__(self):
1188 raise RuntimeError('cannot pickle')
1189 with self.assertRaises(RuntimeError):
1190 self.pool.map(sqr, [A()]*10)
1191
Jesse Noller7530e472009-07-16 14:23:04 +00001192 def test_map_chunksize(self):
1193 try:
1194 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1195 except multiprocessing.TimeoutError:
1196 self.fail("pool.map_async with chunksize stalled on null list")
1197
Benjamin Petersondfd79492008-06-13 19:13:39 +00001198 def test_async(self):
1199 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1200 get = TimingWrapper(res.get)
1201 self.assertEqual(get(), 49)
1202 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1203
1204 def test_async_timeout(self):
Richard Oudkerk65162a72013-11-17 17:45:16 +00001205 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001206 get = TimingWrapper(res.get)
1207 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1208 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1209
1210 def test_imap(self):
1211 it = self.pool.imap(sqr, range(10))
1212 self.assertEqual(list(it), map(sqr, range(10)))
1213
1214 it = self.pool.imap(sqr, range(10))
1215 for i in range(10):
1216 self.assertEqual(it.next(), i*i)
1217 self.assertRaises(StopIteration, it.next)
1218
1219 it = self.pool.imap(sqr, range(1000), chunksize=100)
1220 for i in range(1000):
1221 self.assertEqual(it.next(), i*i)
1222 self.assertRaises(StopIteration, it.next)
1223
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001224 def test_imap_handle_iterable_exception(self):
1225 if self.TYPE == 'manager':
1226 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1227
1228 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1229 for i in range(3):
1230 self.assertEqual(next(it), i*i)
1231 self.assertRaises(SayWhenError, it.next)
1232
1233 # SayWhenError seen at start of problematic chunk's results
1234 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1235 for i in range(6):
1236 self.assertEqual(next(it), i*i)
1237 self.assertRaises(SayWhenError, it.next)
1238 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1239 for i in range(4):
1240 self.assertEqual(next(it), i*i)
1241 self.assertRaises(SayWhenError, it.next)
1242
Benjamin Petersondfd79492008-06-13 19:13:39 +00001243 def test_imap_unordered(self):
1244 it = self.pool.imap_unordered(sqr, range(1000))
1245 self.assertEqual(sorted(it), map(sqr, range(1000)))
1246
1247 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1248 self.assertEqual(sorted(it), map(sqr, range(1000)))
1249
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001250 def test_imap_unordered_handle_iterable_exception(self):
1251 if self.TYPE == 'manager':
1252 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1253
1254 it = self.pool.imap_unordered(sqr,
1255 exception_throwing_generator(10, 3),
1256 1)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001257 expected_values = map(sqr, range(10))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001258 with self.assertRaises(SayWhenError):
1259 # imap_unordered makes it difficult to anticipate the SayWhenError
1260 for i in range(10):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001261 value = next(it)
1262 self.assertIn(value, expected_values)
1263 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001264
1265 it = self.pool.imap_unordered(sqr,
1266 exception_throwing_generator(20, 7),
1267 2)
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001268 expected_values = map(sqr, range(20))
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001269 with self.assertRaises(SayWhenError):
1270 for i in range(20):
Serhiy Storchaka89c3b8e2015-04-23 11:35:43 +03001271 value = next(it)
1272 self.assertIn(value, expected_values)
1273 expected_values.remove(value)
Serhiy Storchaka7c26be52015-03-13 08:31:34 +02001274
Benjamin Petersondfd79492008-06-13 19:13:39 +00001275 def test_make_pool(self):
Victor Stinnerf64a0cf2011-06-20 17:54:33 +02001276 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1277 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1278
Benjamin Petersondfd79492008-06-13 19:13:39 +00001279 p = multiprocessing.Pool(3)
1280 self.assertEqual(3, len(p._pool))
1281 p.close()
1282 p.join()
1283
1284 def test_terminate(self):
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001285 p = self.Pool(4)
1286 result = p.map_async(
Benjamin Petersondfd79492008-06-13 19:13:39 +00001287 time.sleep, [0.1 for i in range(10000)], chunksize=1
1288 )
Richard Oudkerk6d24a6e2013-11-21 16:35:12 +00001289 p.terminate()
1290 join = TimingWrapper(p.join)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001291 join()
1292 self.assertTrue(join.elapsed < 0.2)
Jesse Noller654ade32010-01-27 03:05:57 +00001293
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01001294 def test_empty_iterable(self):
1295 # See Issue 12157
1296 p = self.Pool(1)
1297
1298 self.assertEqual(p.map(sqr, []), [])
1299 self.assertEqual(list(p.imap(sqr, [])), [])
1300 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1301 self.assertEqual(p.map_async(sqr, []).get(), [])
1302
1303 p.close()
1304 p.join()
1305
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001306 def test_release_task_refs(self):
1307 # Issue #29861: task arguments and results should not be kept
1308 # alive after we are done with them.
1309 objs = list(CountedObject() for i in range(10))
1310 refs = list(weakref.ref(o) for o in objs)
1311 self.pool.map(identity, objs)
1312
1313 del objs
Victor Stinnerfd6094c2017-05-05 09:47:11 +02001314 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou5084ff72017-03-24 16:03:46 +01001315 self.assertEqual(set(wr() for wr in refs), {None})
1316 # With a process pool, copies of the objects are returned, check
1317 # they were released too.
1318 self.assertEqual(CountedObject.n_instances, 0)
1319
1320
Richard Oudkerk0c200c22012-05-02 16:36:26 +01001321def unpickleable_result():
1322 return lambda: 42
1323
1324class _TestPoolWorkerErrors(BaseTestCase):
1325 ALLOWED_TYPES = ('processes', )
1326
1327 def test_unpickleable_result(self):
1328 from multiprocessing.pool import MaybeEncodingError
1329 p = multiprocessing.Pool(2)
1330
1331 # Make sure we don't lose pool processes because of encoding errors.
1332 for iteration in range(20):
1333 res = p.apply_async(unpickleable_result)
1334 self.assertRaises(MaybeEncodingError, res.get)
1335
1336 p.close()
1337 p.join()
1338
Jesse Noller654ade32010-01-27 03:05:57 +00001339class _TestPoolWorkerLifetime(BaseTestCase):
1340
1341 ALLOWED_TYPES = ('processes', )
1342 def test_pool_worker_lifetime(self):
1343 p = multiprocessing.Pool(3, maxtasksperchild=10)
1344 self.assertEqual(3, len(p._pool))
1345 origworkerpids = [w.pid for w in p._pool]
1346 # Run many tasks so each worker gets replaced (hopefully)
1347 results = []
1348 for i in range(100):
1349 results.append(p.apply_async(sqr, (i, )))
1350 # Fetch the results and verify we got the right answers,
1351 # also ensuring all the tasks have completed.
1352 for (j, res) in enumerate(results):
1353 self.assertEqual(res.get(), sqr(j))
1354 # Refill the pool
1355 p._repopulate_pool()
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001356 # Wait until all workers are alive
Antoine Pitrouc2b0d762011-04-06 22:54:14 +02001357 # (countdown * DELTA = 5 seconds max startup process time)
1358 countdown = 50
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001359 while countdown and not all(w.is_alive() for w in p._pool):
1360 countdown -= 1
1361 time.sleep(DELTA)
Jesse Noller654ade32010-01-27 03:05:57 +00001362 finalworkerpids = [w.pid for w in p._pool]
Florent Xicluna3bc5cb72010-03-04 15:58:54 +00001363 # All pids should be assigned. See issue #7805.
1364 self.assertNotIn(None, origworkerpids)
1365 self.assertNotIn(None, finalworkerpids)
1366 # Finally, check that the worker pids have changed
Jesse Noller654ade32010-01-27 03:05:57 +00001367 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1368 p.close()
1369 p.join()
1370
Charles-François Natali46f990e2011-10-24 18:43:51 +02001371 def test_pool_worker_lifetime_early_close(self):
1372 # Issue #10332: closing a pool whose workers have limited lifetimes
1373 # before all the tasks completed would make join() hang.
1374 p = multiprocessing.Pool(3, maxtasksperchild=1)
1375 results = []
1376 for i in range(6):
1377 results.append(p.apply_async(sqr, (i, 0.3)))
1378 p.close()
1379 p.join()
1380 # check the results
1381 for (j, res) in enumerate(results):
1382 self.assertEqual(res.get(), sqr(j))
1383
1384
Benjamin Petersondfd79492008-06-13 19:13:39 +00001385#
1386# Test that manager has expected number of shared objects left
1387#
1388
1389class _TestZZZNumberOfObjects(BaseTestCase):
1390 # Because test cases are sorted alphabetically, this one will get
1391 # run after all the other tests for the manager. It tests that
1392 # there have been no "reference leaks" for the manager's shared
1393 # objects. Note the comment in _TestPool.test_terminate().
1394 ALLOWED_TYPES = ('manager',)
1395
1396 def test_number_of_objects(self):
1397 EXPECTED_NUMBER = 1 # the pool object is still alive
1398 multiprocessing.active_children() # discard dead process objs
1399 gc.collect() # do garbage collection
1400 refs = self.manager._number_of_objects()
Jesse Noller7314b382009-01-21 02:08:17 +00001401 debug_info = self.manager._debug_info()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001402 if refs != EXPECTED_NUMBER:
Jesse Noller7fb96402008-07-17 21:01:05 +00001403 print self.manager._debug_info()
Jesse Noller7314b382009-01-21 02:08:17 +00001404 print debug_info
Benjamin Petersondfd79492008-06-13 19:13:39 +00001405
1406 self.assertEqual(refs, EXPECTED_NUMBER)
1407
1408#
1409# Test of creating a customized manager class
1410#
1411
1412from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1413
1414class FooBar(object):
1415 def f(self):
1416 return 'f()'
1417 def g(self):
1418 raise ValueError
1419 def _h(self):
1420 return '_h()'
1421
1422def baz():
1423 for i in xrange(10):
1424 yield i*i
1425
1426class IteratorProxy(BaseProxy):
1427 _exposed_ = ('next', '__next__')
1428 def __iter__(self):
1429 return self
1430 def next(self):
1431 return self._callmethod('next')
1432 def __next__(self):
1433 return self._callmethod('__next__')
1434
1435class MyManager(BaseManager):
1436 pass
1437
1438MyManager.register('Foo', callable=FooBar)
1439MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1440MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1441
1442
1443class _TestMyManager(BaseTestCase):
1444
1445 ALLOWED_TYPES = ('manager',)
1446
1447 def test_mymanager(self):
1448 manager = MyManager()
1449 manager.start()
1450
1451 foo = manager.Foo()
1452 bar = manager.Bar()
1453 baz = manager.baz()
1454
1455 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1456 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1457
1458 self.assertEqual(foo_methods, ['f', 'g'])
1459 self.assertEqual(bar_methods, ['f', '_h'])
1460
1461 self.assertEqual(foo.f(), 'f()')
1462 self.assertRaises(ValueError, foo.g)
1463 self.assertEqual(foo._callmethod('f'), 'f()')
1464 self.assertRaises(RemoteError, foo._callmethod, '_h')
1465
1466 self.assertEqual(bar.f(), 'f()')
1467 self.assertEqual(bar._h(), '_h()')
1468 self.assertEqual(bar._callmethod('f'), 'f()')
1469 self.assertEqual(bar._callmethod('_h'), '_h()')
1470
1471 self.assertEqual(list(baz), [i*i for i in range(10)])
1472
1473 manager.shutdown()
1474
1475#
1476# Test of connecting to a remote server and using xmlrpclib for serialization
1477#
1478
1479_queue = Queue.Queue()
1480def get_queue():
1481 return _queue
1482
1483class QueueManager(BaseManager):
1484 '''manager class used by server process'''
1485QueueManager.register('get_queue', callable=get_queue)
1486
1487class QueueManager2(BaseManager):
1488 '''manager class which specifies the same interface as QueueManager'''
1489QueueManager2.register('get_queue')
1490
1491
1492SERIALIZER = 'xmlrpclib'
1493
1494class _TestRemoteManager(BaseTestCase):
1495
1496 ALLOWED_TYPES = ('manager',)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001497 values = ['hello world', None, True, 2.25,
1498 #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
1499 ]
1500 result = values[:]
1501 if test_support.have_unicode:
1502 #result[-1] = u'hall\xe5 v\xe4rlden'
1503 uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
1504 r'\u0441\u0432\u0456\u0442')
1505 values.append(uvalue)
1506 result.append(uvalue)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001507
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001508 @classmethod
1509 def _putter(cls, address, authkey):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001510 manager = QueueManager2(
1511 address=address, authkey=authkey, serializer=SERIALIZER
1512 )
1513 manager.connect()
1514 queue = manager.get_queue()
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001515 # Note that xmlrpclib will deserialize object as a list not a tuple
1516 queue.put(tuple(cls.values))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001517
1518 def test_remote(self):
1519 authkey = os.urandom(32)
1520
1521 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001522 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersondfd79492008-06-13 19:13:39 +00001523 )
1524 manager.start()
1525
1526 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001527 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001528 p.start()
1529
1530 manager2 = QueueManager2(
1531 address=manager.address, authkey=authkey, serializer=SERIALIZER
1532 )
1533 manager2.connect()
1534 queue = manager2.get_queue()
1535
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02001536 self.assertEqual(queue.get(), self.result)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001537
1538 # Because we are using xmlrpclib for serialization instead of
1539 # pickle this will cause a serialization error.
1540 self.assertRaises(Exception, queue.put, time.sleep)
1541
1542 # Make queue finalizer run before the server is stopped
1543 del queue
1544 manager.shutdown()
1545
Jesse Noller459a6482009-03-30 15:50:42 +00001546class _TestManagerRestart(BaseTestCase):
1547
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001548 @classmethod
1549 def _putter(cls, address, authkey):
Jesse Noller459a6482009-03-30 15:50:42 +00001550 manager = QueueManager(
1551 address=address, authkey=authkey, serializer=SERIALIZER)
1552 manager.connect()
1553 queue = manager.get_queue()
1554 queue.put('hello world')
1555
1556 def test_rapid_restart(self):
1557 authkey = os.urandom(32)
1558 manager = QueueManager(
Antoine Pitrou78254dc2013-08-22 00:39:46 +02001559 address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin87d86e02010-11-01 05:15:55 +00001560 srvr = manager.get_server()
1561 addr = srvr.address
1562 # Close the connection.Listener socket which gets opened as a part
1563 # of manager.get_server(). It's not needed for the test.
1564 srvr.listener.close()
Jesse Noller459a6482009-03-30 15:50:42 +00001565 manager.start()
1566
1567 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001568 p.daemon = True
Jesse Noller459a6482009-03-30 15:50:42 +00001569 p.start()
1570 queue = manager.get_queue()
1571 self.assertEqual(queue.get(), 'hello world')
Jesse Noller019ce772009-03-30 21:53:29 +00001572 del queue
Jesse Noller459a6482009-03-30 15:50:42 +00001573 manager.shutdown()
1574 manager = QueueManager(
Antoine Pitrou54f9f832010-04-30 23:08:48 +00001575 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller459a6482009-03-30 15:50:42 +00001576 manager.start()
Jesse Noller019ce772009-03-30 21:53:29 +00001577 manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001578
Benjamin Petersondfd79492008-06-13 19:13:39 +00001579#
1580#
1581#
1582
1583SENTINEL = latin('')
1584
1585class _TestConnection(BaseTestCase):
1586
1587 ALLOWED_TYPES = ('processes', 'threads')
1588
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001589 @classmethod
1590 def _echo(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00001591 for msg in iter(conn.recv_bytes, SENTINEL):
1592 conn.send_bytes(msg)
1593 conn.close()
1594
1595 def test_connection(self):
1596 conn, child_conn = self.Pipe()
1597
1598 p = self.Process(target=self._echo, args=(child_conn,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001599 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001600 p.start()
1601
1602 seq = [1, 2.25, None]
1603 msg = latin('hello world')
1604 longmsg = msg * 10
1605 arr = array.array('i', range(4))
1606
1607 if self.TYPE == 'processes':
1608 self.assertEqual(type(conn.fileno()), int)
1609
1610 self.assertEqual(conn.send(seq), None)
1611 self.assertEqual(conn.recv(), seq)
1612
1613 self.assertEqual(conn.send_bytes(msg), None)
1614 self.assertEqual(conn.recv_bytes(), msg)
1615
1616 if self.TYPE == 'processes':
1617 buffer = array.array('i', [0]*10)
1618 expected = list(arr) + [0] * (10 - len(arr))
1619 self.assertEqual(conn.send_bytes(arr), None)
1620 self.assertEqual(conn.recv_bytes_into(buffer),
1621 len(arr) * buffer.itemsize)
1622 self.assertEqual(list(buffer), expected)
1623
1624 buffer = array.array('i', [0]*10)
1625 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1626 self.assertEqual(conn.send_bytes(arr), None)
1627 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1628 len(arr) * buffer.itemsize)
1629 self.assertEqual(list(buffer), expected)
1630
1631 buffer = bytearray(latin(' ' * 40))
1632 self.assertEqual(conn.send_bytes(longmsg), None)
1633 try:
1634 res = conn.recv_bytes_into(buffer)
1635 except multiprocessing.BufferTooShort, e:
1636 self.assertEqual(e.args, (longmsg,))
1637 else:
1638 self.fail('expected BufferTooShort, got %s' % res)
1639
1640 poll = TimingWrapper(conn.poll)
1641
1642 self.assertEqual(poll(), False)
1643 self.assertTimingAlmostEqual(poll.elapsed, 0)
1644
1645 self.assertEqual(poll(TIMEOUT1), False)
1646 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1647
1648 conn.send(None)
Giampaolo Rodola'cef20062012-12-31 17:23:09 +01001649 time.sleep(.1)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001650
1651 self.assertEqual(poll(TIMEOUT1), True)
1652 self.assertTimingAlmostEqual(poll.elapsed, 0)
1653
1654 self.assertEqual(conn.recv(), None)
1655
1656 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1657 conn.send_bytes(really_big_msg)
1658 self.assertEqual(conn.recv_bytes(), really_big_msg)
1659
1660 conn.send_bytes(SENTINEL) # tell child to quit
1661 child_conn.close()
1662
1663 if self.TYPE == 'processes':
1664 self.assertEqual(conn.readable, True)
1665 self.assertEqual(conn.writable, True)
1666 self.assertRaises(EOFError, conn.recv)
1667 self.assertRaises(EOFError, conn.recv_bytes)
1668
1669 p.join()
1670
1671 def test_duplex_false(self):
1672 reader, writer = self.Pipe(duplex=False)
1673 self.assertEqual(writer.send(1), None)
1674 self.assertEqual(reader.recv(), 1)
1675 if self.TYPE == 'processes':
1676 self.assertEqual(reader.readable, True)
1677 self.assertEqual(reader.writable, False)
1678 self.assertEqual(writer.readable, False)
1679 self.assertEqual(writer.writable, True)
1680 self.assertRaises(IOError, reader.send, 2)
1681 self.assertRaises(IOError, writer.recv)
1682 self.assertRaises(IOError, writer.poll)
1683
1684 def test_spawn_close(self):
1685 # We test that a pipe connection can be closed by parent
1686 # process immediately after child is spawned. On Windows this
1687 # would have sometimes failed on old versions because
1688 # child_conn would be closed before the child got a chance to
1689 # duplicate it.
1690 conn, child_conn = self.Pipe()
1691
1692 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001693 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001694 p.start()
1695 child_conn.close() # this might complete before child initializes
1696
1697 msg = latin('hello')
1698 conn.send_bytes(msg)
1699 self.assertEqual(conn.recv_bytes(), msg)
1700
1701 conn.send_bytes(SENTINEL)
1702 conn.close()
1703 p.join()
1704
1705 def test_sendbytes(self):
1706 if self.TYPE != 'processes':
Zachary Ware1f702212013-12-10 14:09:20 -06001707 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersondfd79492008-06-13 19:13:39 +00001708
1709 msg = latin('abcdefghijklmnopqrstuvwxyz')
1710 a, b = self.Pipe()
1711
1712 a.send_bytes(msg)
1713 self.assertEqual(b.recv_bytes(), msg)
1714
1715 a.send_bytes(msg, 5)
1716 self.assertEqual(b.recv_bytes(), msg[5:])
1717
1718 a.send_bytes(msg, 7, 8)
1719 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1720
1721 a.send_bytes(msg, 26)
1722 self.assertEqual(b.recv_bytes(), latin(''))
1723
1724 a.send_bytes(msg, 26, 0)
1725 self.assertEqual(b.recv_bytes(), latin(''))
1726
1727 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1728
1729 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1730
1731 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1732
1733 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1734
1735 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1736
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001737 @classmethod
1738 def _is_fd_assigned(cls, fd):
1739 try:
1740 os.fstat(fd)
1741 except OSError as e:
1742 if e.errno == errno.EBADF:
1743 return False
1744 raise
1745 else:
1746 return True
1747
1748 @classmethod
1749 def _writefd(cls, conn, data, create_dummy_fds=False):
1750 if create_dummy_fds:
1751 for i in range(0, 256):
1752 if not cls._is_fd_assigned(i):
1753 os.dup2(conn.fileno(), i)
1754 fd = reduction.recv_handle(conn)
1755 if msvcrt:
1756 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1757 os.write(fd, data)
1758 os.close(fd)
1759
Charles-François Natalif8413b22011-09-21 18:44:49 +02001760 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001761 def test_fd_transfer(self):
1762 if self.TYPE != 'processes':
1763 self.skipTest("only makes sense with processes")
1764 conn, child_conn = self.Pipe(duplex=True)
1765
1766 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001767 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001768 p.start()
1769 with open(test_support.TESTFN, "wb") as f:
1770 fd = f.fileno()
1771 if msvcrt:
1772 fd = msvcrt.get_osfhandle(fd)
1773 reduction.send_handle(conn, fd, p.pid)
1774 p.join()
1775 with open(test_support.TESTFN, "rb") as f:
1776 self.assertEqual(f.read(), b"foo")
1777
Charles-François Natalif8413b22011-09-21 18:44:49 +02001778 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001779 @unittest.skipIf(sys.platform == "win32",
1780 "test semantics don't make sense on Windows")
1781 @unittest.skipIf(MAXFD <= 256,
1782 "largest assignable fd number is too small")
1783 @unittest.skipUnless(hasattr(os, "dup2"),
1784 "test needs os.dup2()")
1785 def test_large_fd_transfer(self):
1786 # With fd > 256 (issue #11657)
1787 if self.TYPE != 'processes':
1788 self.skipTest("only makes sense with processes")
1789 conn, child_conn = self.Pipe(duplex=True)
1790
1791 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001792 p.daemon = True
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001793 p.start()
1794 with open(test_support.TESTFN, "wb") as f:
1795 fd = f.fileno()
1796 for newfd in range(256, MAXFD):
1797 if not self._is_fd_assigned(newfd):
1798 break
1799 else:
1800 self.fail("could not find an unassigned large file descriptor")
1801 os.dup2(fd, newfd)
1802 try:
1803 reduction.send_handle(conn, newfd, p.pid)
1804 finally:
1805 os.close(newfd)
1806 p.join()
1807 with open(test_support.TESTFN, "rb") as f:
1808 self.assertEqual(f.read(), b"bar")
1809
Jesus Ceac23484b2011-09-21 03:47:39 +02001810 @classmethod
1811 def _send_data_without_fd(self, conn):
1812 os.write(conn.fileno(), b"\0")
1813
Charles-François Natalif8413b22011-09-21 18:44:49 +02001814 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Ceac23484b2011-09-21 03:47:39 +02001815 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1816 def test_missing_fd_transfer(self):
1817 # Check that exception is raised when received data is not
1818 # accompanied by a file descriptor in ancillary data.
1819 if self.TYPE != 'processes':
1820 self.skipTest("only makes sense with processes")
1821 conn, child_conn = self.Pipe(duplex=True)
1822
1823 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1824 p.daemon = True
1825 p.start()
1826 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1827 p.join()
Antoine Pitroua1a8da82011-08-23 19:54:20 +02001828
Benjamin Petersondfd79492008-06-13 19:13:39 +00001829class _TestListenerClient(BaseTestCase):
1830
1831 ALLOWED_TYPES = ('processes', 'threads')
1832
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00001833 @classmethod
1834 def _test(cls, address):
1835 conn = cls.connection.Client(address)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001836 conn.send('hello')
1837 conn.close()
1838
1839 def test_listener_client(self):
1840 for family in self.connection.families:
1841 l = self.connection.Listener(family=family)
1842 p = self.Process(target=self._test, args=(l.address,))
Jesse Noller5bc9f4c2008-08-19 19:06:19 +00001843 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001844 p.start()
1845 conn = l.accept()
1846 self.assertEqual(conn.recv(), 'hello')
1847 p.join()
1848 l.close()
Richard Oudkerk9a16fa62012-05-05 20:41:08 +01001849
1850 def test_issue14725(self):
1851 l = self.connection.Listener()
1852 p = self.Process(target=self._test, args=(l.address,))
1853 p.daemon = True
1854 p.start()
1855 time.sleep(1)
1856 # On Windows the client process should by now have connected,
1857 # written data and closed the pipe handle by now. This causes
1858 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1859 # 14725.
1860 conn = l.accept()
1861 self.assertEqual(conn.recv(), 'hello')
1862 conn.close()
1863 p.join()
1864 l.close()
1865
Benjamin Petersondfd79492008-06-13 19:13:39 +00001866#
1867# Test of sending connection and socket objects between processes
1868#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001869"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001870class _TestPicklingConnections(BaseTestCase):
1871
1872 ALLOWED_TYPES = ('processes',)
1873
1874 def _listener(self, conn, families):
1875 for fam in families:
1876 l = self.connection.Listener(family=fam)
1877 conn.send(l.address)
1878 new_conn = l.accept()
1879 conn.send(new_conn)
1880
1881 if self.TYPE == 'processes':
1882 l = socket.socket()
1883 l.bind(('localhost', 0))
1884 conn.send(l.getsockname())
1885 l.listen(1)
1886 new_conn, addr = l.accept()
1887 conn.send(new_conn)
1888
1889 conn.recv()
1890
1891 def _remote(self, conn):
1892 for (address, msg) in iter(conn.recv, None):
1893 client = self.connection.Client(address)
1894 client.send(msg.upper())
1895 client.close()
1896
1897 if self.TYPE == 'processes':
1898 address, msg = conn.recv()
1899 client = socket.socket()
1900 client.connect(address)
1901 client.sendall(msg.upper())
1902 client.close()
1903
1904 conn.close()
1905
1906 def test_pickling(self):
1907 try:
1908 multiprocessing.allow_connection_pickling()
1909 except ImportError:
1910 return
1911
1912 families = self.connection.families
1913
1914 lconn, lconn0 = self.Pipe()
1915 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001916 lp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001917 lp.start()
1918 lconn0.close()
1919
1920 rconn, rconn0 = self.Pipe()
1921 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02001922 rp.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00001923 rp.start()
1924 rconn0.close()
1925
1926 for fam in families:
1927 msg = ('This connection uses family %s' % fam).encode('ascii')
1928 address = lconn.recv()
1929 rconn.send((address, msg))
1930 new_conn = lconn.recv()
1931 self.assertEqual(new_conn.recv(), msg.upper())
1932
1933 rconn.send(None)
1934
1935 if self.TYPE == 'processes':
1936 msg = latin('This connection uses a normal socket')
1937 address = lconn.recv()
1938 rconn.send((address, msg))
1939 if hasattr(socket, 'fromfd'):
1940 new_conn = lconn.recv()
1941 self.assertEqual(new_conn.recv(100), msg.upper())
1942 else:
1943 # XXX On Windows with Py2.6 need to backport fromfd()
1944 discard = lconn.recv_bytes()
1945
1946 lconn.send(None)
1947
1948 rconn.close()
1949 lconn.close()
1950
1951 lp.join()
1952 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001953"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001954#
1955#
1956#
1957
1958class _TestHeap(BaseTestCase):
1959
1960 ALLOWED_TYPES = ('processes',)
1961
1962 def test_heap(self):
1963 iterations = 5000
1964 maxblocks = 50
1965 blocks = []
1966
1967 # create and destroy lots of blocks of different sizes
1968 for i in xrange(iterations):
1969 size = int(random.lognormvariate(0, 1) * 1000)
1970 b = multiprocessing.heap.BufferWrapper(size)
1971 blocks.append(b)
1972 if len(blocks) > maxblocks:
1973 i = random.randrange(maxblocks)
1974 del blocks[i]
1975
1976 # get the heap object
1977 heap = multiprocessing.heap.BufferWrapper._heap
1978
1979 # verify the state of the heap
1980 all = []
1981 occupied = 0
Charles-François Natali414d0fa2011-07-02 13:56:19 +02001982 heap._lock.acquire()
1983 self.addCleanup(heap._lock.release)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001984 for L in heap._len_to_seq.values():
1985 for arena, start, stop in L:
1986 all.append((heap._arenas.index(arena), start, stop,
1987 stop-start, 'free'))
1988 for arena, start, stop in heap._allocated_blocks:
1989 all.append((heap._arenas.index(arena), start, stop,
1990 stop-start, 'occupied'))
1991 occupied += (stop-start)
1992
1993 all.sort()
1994
1995 for i in range(len(all)-1):
1996 (arena, start, stop) = all[i][:3]
1997 (narena, nstart, nstop) = all[i+1][:3]
1998 self.assertTrue((arena != narena and nstart == 0) or
1999 (stop == nstart))
2000
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002001 def test_free_from_gc(self):
2002 # Check that freeing of blocks by the garbage collector doesn't deadlock
2003 # (issue #12352).
2004 # Make sure the GC is enabled, and set lower collection thresholds to
2005 # make collections more frequent (and increase the probability of
2006 # deadlock).
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002007 if not gc.isenabled():
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002008 gc.enable()
2009 self.addCleanup(gc.disable)
Charles-François Natali7c20ad32011-07-02 14:08:27 +02002010 thresholds = gc.get_threshold()
2011 self.addCleanup(gc.set_threshold, *thresholds)
Charles-François Natali414d0fa2011-07-02 13:56:19 +02002012 gc.set_threshold(10)
2013
2014 # perform numerous block allocations, with cyclic references to make
2015 # sure objects are collected asynchronously by the gc
2016 for i in range(5000):
2017 a = multiprocessing.heap.BufferWrapper(1)
2018 b = multiprocessing.heap.BufferWrapper(1)
2019 # circular references
2020 a.buddy = b
2021 b.buddy = a
2022
Benjamin Petersondfd79492008-06-13 19:13:39 +00002023#
2024#
2025#
2026
Benjamin Petersondfd79492008-06-13 19:13:39 +00002027class _Foo(Structure):
2028 _fields_ = [
2029 ('x', c_int),
2030 ('y', c_double)
2031 ]
2032
2033class _TestSharedCTypes(BaseTestCase):
2034
2035 ALLOWED_TYPES = ('processes',)
2036
Antoine Pitrou55d935a2010-11-22 16:35:57 +00002037 def setUp(self):
2038 if not HAS_SHAREDCTYPES:
2039 self.skipTest("requires multiprocessing.sharedctypes")
2040
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002041 @classmethod
2042 def _double(cls, x, y, foo, arr, string):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002043 x.value *= 2
2044 y.value *= 2
2045 foo.x *= 2
2046 foo.y *= 2
2047 string.value *= 2
2048 for i in range(len(arr)):
2049 arr[i] *= 2
2050
2051 def test_sharedctypes(self, lock=False):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002052 x = Value('i', 7, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002053 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002054 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandlbd564c32010-02-06 23:33:33 +00002055 arr = self.Array('d', range(10), lock=lock)
2056 string = self.Array('c', 20, lock=lock)
Brian Curtina06e9b82010-10-07 02:27:41 +00002057 string.value = latin('hello')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002058
2059 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002060 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002061 p.start()
2062 p.join()
2063
2064 self.assertEqual(x.value, 14)
2065 self.assertAlmostEqual(y.value, 2.0/3.0)
2066 self.assertEqual(foo.x, 6)
2067 self.assertAlmostEqual(foo.y, 4.0)
2068 for i in range(10):
2069 self.assertAlmostEqual(arr[i], i*2)
2070 self.assertEqual(string.value, latin('hellohello'))
2071
2072 def test_synchronize(self):
2073 self.test_sharedctypes(lock=True)
2074
2075 def test_copy(self):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002076 foo = _Foo(2, 5.0)
Brian Curtina06e9b82010-10-07 02:27:41 +00002077 bar = copy(foo)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002078 foo.x = 0
2079 foo.y = 0
2080 self.assertEqual(bar.x, 2)
2081 self.assertAlmostEqual(bar.y, 5.0)
2082
2083#
2084#
2085#
2086
2087class _TestFinalize(BaseTestCase):
2088
2089 ALLOWED_TYPES = ('processes',)
2090
Antoine Pitroud09f1672017-06-13 17:52:29 +02002091 def setUp(self):
2092 self.registry_backup = util._finalizer_registry.copy()
2093 util._finalizer_registry.clear()
2094
2095 def tearDown(self):
2096 self.assertFalse(util._finalizer_registry)
2097 util._finalizer_registry.update(self.registry_backup)
2098
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002099 @classmethod
2100 def _test_finalize(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002101 class Foo(object):
2102 pass
2103
2104 a = Foo()
2105 util.Finalize(a, conn.send, args=('a',))
2106 del a # triggers callback for a
2107
2108 b = Foo()
2109 close_b = util.Finalize(b, conn.send, args=('b',))
2110 close_b() # triggers callback for b
2111 close_b() # does nothing because callback has already been called
2112 del b # does nothing because callback has already been called
2113
2114 c = Foo()
2115 util.Finalize(c, conn.send, args=('c',))
2116
2117 d10 = Foo()
2118 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2119
2120 d01 = Foo()
2121 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2122 d02 = Foo()
2123 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2124 d03 = Foo()
2125 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2126
2127 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2128
2129 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2130
Ezio Melottic2077b02011-03-16 12:34:31 +02002131 # call multiprocessing's cleanup function then exit process without
Benjamin Petersondfd79492008-06-13 19:13:39 +00002132 # garbage collecting locals
2133 util._exit_function()
2134 conn.close()
2135 os._exit(0)
2136
2137 def test_finalize(self):
2138 conn, child_conn = self.Pipe()
2139
2140 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea6f6016b2011-09-09 20:26:57 +02002141 p.daemon = True
Benjamin Petersondfd79492008-06-13 19:13:39 +00002142 p.start()
2143 p.join()
2144
2145 result = [obj for obj in iter(conn.recv, 'STOP')]
2146 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2147
Antoine Pitroud09f1672017-06-13 17:52:29 +02002148 def test_thread_safety(self):
2149 # bpo-24484: _run_finalizers() should be thread-safe
2150 def cb():
2151 pass
2152
2153 class Foo(object):
2154 def __init__(self):
2155 self.ref = self # create reference cycle
2156 # insert finalizer at random key
2157 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
2158
2159 finish = False
2160 exc = []
2161
2162 def run_finalizers():
2163 while not finish:
2164 time.sleep(random.random() * 1e-1)
2165 try:
2166 # A GC run will eventually happen during this,
2167 # collecting stale Foo's and mutating the registry
2168 util._run_finalizers()
2169 except Exception as e:
2170 exc.append(e)
2171
2172 def make_finalizers():
2173 d = {}
2174 while not finish:
2175 try:
2176 # Old Foo's get gradually replaced and later
2177 # collected by the GC (because of the cyclic ref)
2178 d[random.getrandbits(5)] = {Foo() for i in range(10)}
2179 except Exception as e:
2180 exc.append(e)
2181 d.clear()
2182
2183 old_interval = sys.getcheckinterval()
2184 old_threshold = gc.get_threshold()
2185 try:
2186 sys.setcheckinterval(10)
2187 gc.set_threshold(5, 5, 5)
2188 threads = [threading.Thread(target=run_finalizers),
2189 threading.Thread(target=make_finalizers)]
2190 with test_support.start_threads(threads):
2191 time.sleep(4.0) # Wait a bit to trigger race condition
2192 finish = True
2193 if exc:
2194 raise exc[0]
2195 finally:
2196 sys.setcheckinterval(old_interval)
2197 gc.set_threshold(*old_threshold)
2198 gc.collect() # Collect remaining Foo's
2199
2200
Benjamin Petersondfd79492008-06-13 19:13:39 +00002201#
2202# Test that from ... import * works for each module
2203#
2204
2205class _TestImportStar(BaseTestCase):
2206
2207 ALLOWED_TYPES = ('processes',)
2208
2209 def test_import(self):
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002210 modules = [
Benjamin Petersondfd79492008-06-13 19:13:39 +00002211 'multiprocessing', 'multiprocessing.connection',
2212 'multiprocessing.heap', 'multiprocessing.managers',
2213 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersondfd79492008-06-13 19:13:39 +00002214 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002215 ]
2216
Charles-François Natalif8413b22011-09-21 18:44:49 +02002217 if HAS_REDUCTION:
2218 modules.append('multiprocessing.reduction')
2219
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002220 if c_int is not None:
2221 # This module requires _ctypes
2222 modules.append('multiprocessing.sharedctypes')
Benjamin Petersondfd79492008-06-13 19:13:39 +00002223
2224 for name in modules:
2225 __import__(name)
2226 mod = sys.modules[name]
2227
2228 for attr in getattr(mod, '__all__', ()):
2229 self.assertTrue(
2230 hasattr(mod, attr),
2231 '%r does not have attribute %r' % (mod, attr)
2232 )
2233
2234#
2235# Quick test that logging works -- does not test logging output
2236#
2237
2238class _TestLogging(BaseTestCase):
2239
2240 ALLOWED_TYPES = ('processes',)
2241
2242 def test_enable_logging(self):
2243 logger = multiprocessing.get_logger()
2244 logger.setLevel(util.SUBWARNING)
2245 self.assertTrue(logger is not None)
2246 logger.debug('this will not be printed')
2247 logger.info('nor will this')
2248 logger.setLevel(LOG_LEVEL)
2249
Antoine Pitrou4eb2b282010-11-02 23:51:30 +00002250 @classmethod
2251 def _test_level(cls, conn):
Benjamin Petersondfd79492008-06-13 19:13:39 +00002252 logger = multiprocessing.get_logger()
2253 conn.send(logger.getEffectiveLevel())
2254
2255 def test_level(self):
2256 LEVEL1 = 32
2257 LEVEL2 = 37
2258
2259 logger = multiprocessing.get_logger()
2260 root_logger = logging.getLogger()
2261 root_level = root_logger.level
2262
2263 reader, writer = multiprocessing.Pipe(duplex=False)
2264
2265 logger.setLevel(LEVEL1)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002266 p = self.Process(target=self._test_level, args=(writer,))
2267 p.daemon = True
2268 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002269 self.assertEqual(LEVEL1, reader.recv())
2270
2271 logger.setLevel(logging.NOTSET)
2272 root_logger.setLevel(LEVEL2)
Jesus Cea6f6016b2011-09-09 20:26:57 +02002273 p = self.Process(target=self._test_level, args=(writer,))
2274 p.daemon = True
2275 p.start()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002276 self.assertEqual(LEVEL2, reader.recv())
2277
2278 root_logger.setLevel(root_level)
2279 logger.setLevel(level=LOG_LEVEL)
2280
Jesse Noller814d02d2009-11-21 14:38:23 +00002281
Jesse Noller9a03f2f2009-11-24 14:17:29 +00002282# class _TestLoggingProcessName(BaseTestCase):
2283#
2284# def handle(self, record):
2285# assert record.processName == multiprocessing.current_process().name
2286# self.__handled = True
2287#
2288# def test_logging(self):
2289# handler = logging.Handler()
2290# handler.handle = self.handle
2291# self.__handled = False
2292# # Bypass getLogger() and side-effects
2293# logger = logging.getLoggerClass()(
2294# 'multiprocessing.test.TestLoggingProcessName')
2295# logger.addHandler(handler)
2296# logger.propagate = False
2297#
2298# logger.warn('foo')
2299# assert self.__handled
Jesse Noller814d02d2009-11-21 14:38:23 +00002300
Benjamin Petersondfd79492008-06-13 19:13:39 +00002301#
Richard Oudkerkba482642013-02-26 12:37:07 +00002302# Check that Process.join() retries if os.waitpid() fails with EINTR
2303#
2304
2305class _TestPollEintr(BaseTestCase):
2306
2307 ALLOWED_TYPES = ('processes',)
2308
2309 @classmethod
2310 def _killer(cls, pid):
2311 time.sleep(0.5)
2312 os.kill(pid, signal.SIGUSR1)
2313
2314 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2315 def test_poll_eintr(self):
2316 got_signal = [False]
2317 def record(*args):
2318 got_signal[0] = True
2319 pid = os.getpid()
2320 oldhandler = signal.signal(signal.SIGUSR1, record)
2321 try:
2322 killer = self.Process(target=self._killer, args=(pid,))
2323 killer.start()
2324 p = self.Process(target=time.sleep, args=(1,))
2325 p.start()
2326 p.join()
2327 self.assertTrue(got_signal[0])
2328 self.assertEqual(p.exitcode, 0)
2329 killer.join()
2330 finally:
2331 signal.signal(signal.SIGUSR1, oldhandler)
2332
2333#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002334# Test to verify handle verification, see issue 3321
2335#
2336
2337class TestInvalidHandle(unittest.TestCase):
2338
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002339 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002340 def test_invalid_handles(self):
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002341 conn = _multiprocessing.Connection(44977608)
2342 self.assertRaises(IOError, conn.poll)
2343 self.assertRaises(IOError, _multiprocessing.Connection, -1)
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002344
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00002345#
Benjamin Petersondfd79492008-06-13 19:13:39 +00002346# Functions used to create test cases from the base ones in this module
2347#
2348
2349def get_attributes(Source, names):
2350 d = {}
2351 for name in names:
2352 obj = getattr(Source, name)
2353 if type(obj) == type(get_attributes):
2354 obj = staticmethod(obj)
2355 d[name] = obj
2356 return d
2357
2358def create_test_cases(Mixin, type):
2359 result = {}
2360 glob = globals()
Florent Xicluna36b9fbb2010-03-24 19:33:25 +00002361 Type = type.capitalize()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002362
2363 for name in glob.keys():
2364 if name.startswith('_Test'):
2365 base = glob[name]
2366 if type in base.ALLOWED_TYPES:
2367 newname = 'With' + Type + name[1:]
2368 class Temp(base, unittest.TestCase, Mixin):
2369 pass
2370 result[newname] = Temp
2371 Temp.__name__ = newname
2372 Temp.__module__ = Mixin.__module__
2373 return result
2374
2375#
2376# Create test cases
2377#
2378
2379class ProcessesMixin(object):
2380 TYPE = 'processes'
2381 Process = multiprocessing.Process
2382 locals().update(get_attributes(multiprocessing, (
2383 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2384 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2385 'RawArray', 'current_process', 'active_children', 'Pipe',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002386 'connection', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002387 )))
2388
2389testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2390globals().update(testcases_processes)
2391
2392
2393class ManagerMixin(object):
2394 TYPE = 'manager'
2395 Process = multiprocessing.Process
2396 manager = object.__new__(multiprocessing.managers.SyncManager)
2397 locals().update(get_attributes(manager, (
2398 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2399 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002400 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002401 )))
2402
2403testcases_manager = create_test_cases(ManagerMixin, type='manager')
2404globals().update(testcases_manager)
2405
2406
2407class ThreadsMixin(object):
2408 TYPE = 'threads'
2409 Process = multiprocessing.dummy.Process
2410 locals().update(get_attributes(multiprocessing.dummy, (
2411 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2412 'Condition', 'Event', 'Value', 'Array', 'current_process',
2413 'active_children', 'Pipe', 'connection', 'dict', 'list',
Richard Oudkerkd44a4a22012-06-06 17:52:18 +01002414 'Namespace', 'JoinableQueue', 'Pool'
Benjamin Petersondfd79492008-06-13 19:13:39 +00002415 )))
2416
2417testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2418globals().update(testcases_threads)
2419
Neal Norwitz0c519b32008-08-25 01:50:24 +00002420class OtherTest(unittest.TestCase):
2421 # TODO: add more tests for deliver/answer challenge.
2422 def test_deliver_challenge_auth_failure(self):
2423 class _FakeConnection(object):
2424 def recv_bytes(self, size):
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002425 return b'something bogus'
Neal Norwitz0c519b32008-08-25 01:50:24 +00002426 def send_bytes(self, data):
2427 pass
2428 self.assertRaises(multiprocessing.AuthenticationError,
2429 multiprocessing.connection.deliver_challenge,
2430 _FakeConnection(), b'abc')
2431
2432 def test_answer_challenge_auth_failure(self):
2433 class _FakeConnection(object):
2434 def __init__(self):
2435 self.count = 0
2436 def recv_bytes(self, size):
2437 self.count += 1
2438 if self.count == 1:
2439 return multiprocessing.connection.CHALLENGE
2440 elif self.count == 2:
Neal Norwitz2a7767a2008-08-25 03:03:25 +00002441 return b'something bogus'
2442 return b''
Neal Norwitz0c519b32008-08-25 01:50:24 +00002443 def send_bytes(self, data):
2444 pass
2445 self.assertRaises(multiprocessing.AuthenticationError,
2446 multiprocessing.connection.answer_challenge,
2447 _FakeConnection(), b'abc')
2448
Jesse Noller7152f6d2009-04-02 05:17:26 +00002449#
2450# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2451#
2452
2453def initializer(ns):
2454 ns.test += 1
2455
2456class TestInitializers(unittest.TestCase):
2457 def setUp(self):
2458 self.mgr = multiprocessing.Manager()
2459 self.ns = self.mgr.Namespace()
2460 self.ns.test = 0
2461
2462 def tearDown(self):
2463 self.mgr.shutdown()
2464
2465 def test_manager_initializer(self):
2466 m = multiprocessing.managers.SyncManager()
2467 self.assertRaises(TypeError, m.start, 1)
2468 m.start(initializer, (self.ns,))
2469 self.assertEqual(self.ns.test, 1)
2470 m.shutdown()
2471
2472 def test_pool_initializer(self):
2473 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2474 p = multiprocessing.Pool(1, initializer, (self.ns,))
2475 p.close()
2476 p.join()
2477 self.assertEqual(self.ns.test, 1)
2478
Jesse Noller1b90efb2009-06-30 17:11:52 +00002479#
2480# Issue 5155, 5313, 5331: Test process in processes
2481# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2482#
2483
Richard Oudkerkc5496072013-09-29 17:10:40 +01002484def _this_sub_process(q):
Jesse Noller1b90efb2009-06-30 17:11:52 +00002485 try:
2486 item = q.get(block=False)
2487 except Queue.Empty:
2488 pass
2489
Richard Oudkerkc5496072013-09-29 17:10:40 +01002490def _test_process(q):
2491 queue = multiprocessing.Queue()
2492 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
2493 subProc.daemon = True
2494 subProc.start()
2495 subProc.join()
2496
Jesse Noller1b90efb2009-06-30 17:11:52 +00002497def _afunc(x):
2498 return x*x
2499
2500def pool_in_process():
2501 pool = multiprocessing.Pool(processes=4)
2502 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
2503
2504class _file_like(object):
2505 def __init__(self, delegate):
2506 self._delegate = delegate
2507 self._pid = None
2508
2509 @property
2510 def cache(self):
2511 pid = os.getpid()
2512 # There are no race conditions since fork keeps only the running thread
2513 if pid != self._pid:
2514 self._pid = pid
2515 self._cache = []
2516 return self._cache
2517
2518 def write(self, data):
2519 self.cache.append(data)
2520
2521 def flush(self):
2522 self._delegate.write(''.join(self.cache))
2523 self._cache = []
2524
2525class TestStdinBadfiledescriptor(unittest.TestCase):
2526
2527 def test_queue_in_process(self):
2528 queue = multiprocessing.Queue()
Richard Oudkerkc5496072013-09-29 17:10:40 +01002529 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Jesse Noller1b90efb2009-06-30 17:11:52 +00002530 proc.start()
2531 proc.join()
2532
2533 def test_pool_in_process(self):
2534 p = multiprocessing.Process(target=pool_in_process)
2535 p.start()
2536 p.join()
2537
2538 def test_flushing(self):
2539 sio = StringIO()
2540 flike = _file_like(sio)
2541 flike.write('foo')
2542 proc = multiprocessing.Process(target=lambda: flike.flush())
2543 flike.flush()
2544 assert sio.getvalue() == 'foo'
2545
Richard Oudkerke4b99382012-07-27 14:05:46 +01002546#
2547# Test interaction with socket timeouts - see Issue #6056
2548#
2549
2550class TestTimeouts(unittest.TestCase):
2551 @classmethod
2552 def _test_timeout(cls, child, address):
2553 time.sleep(1)
2554 child.send(123)
2555 child.close()
2556 conn = multiprocessing.connection.Client(address)
2557 conn.send(456)
2558 conn.close()
2559
2560 def test_timeout(self):
2561 old_timeout = socket.getdefaulttimeout()
2562 try:
2563 socket.setdefaulttimeout(0.1)
2564 parent, child = multiprocessing.Pipe(duplex=True)
2565 l = multiprocessing.connection.Listener(family='AF_INET')
2566 p = multiprocessing.Process(target=self._test_timeout,
2567 args=(child, l.address))
2568 p.start()
2569 child.close()
2570 self.assertEqual(parent.recv(), 123)
2571 parent.close()
2572 conn = l.accept()
2573 self.assertEqual(conn.recv(), 456)
2574 conn.close()
2575 l.close()
2576 p.join(10)
2577 finally:
2578 socket.setdefaulttimeout(old_timeout)
2579
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002580#
2581# Test what happens with no "if __name__ == '__main__'"
2582#
2583
2584class TestNoForkBomb(unittest.TestCase):
2585 def test_noforkbomb(self):
2586 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
2587 if WIN32:
2588 rc, out, err = test.script_helper.assert_python_failure(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002589 self.assertEqual(out, '')
2590 self.assertIn('RuntimeError', err)
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002591 else:
2592 rc, out, err = test.script_helper.assert_python_ok(name)
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002593 self.assertEqual(out.rstrip(), '123')
2594 self.assertEqual(err, '')
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002595
2596#
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002597# Issue 12098: check sys.flags of child matches that for parent
2598#
2599
2600class TestFlags(unittest.TestCase):
2601 @classmethod
2602 def run_in_grandchild(cls, conn):
2603 conn.send(tuple(sys.flags))
2604
2605 @classmethod
2606 def run_in_child(cls):
2607 import json
2608 r, w = multiprocessing.Pipe(duplex=False)
2609 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
2610 p.start()
2611 grandchild_flags = r.recv()
2612 p.join()
2613 r.close()
2614 w.close()
2615 flags = (tuple(sys.flags), grandchild_flags)
2616 print(json.dumps(flags))
2617
Serhiy Storchaka7fe04f12015-02-13 15:08:36 +02002618 @test_support.requires_unicode # XXX json needs unicode support
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002619 def test_flags(self):
2620 import json, subprocess
2621 # start child process using unusual flags
2622 prog = ('from test.test_multiprocessing import TestFlags; ' +
2623 'TestFlags.run_in_child()')
2624 data = subprocess.check_output(
Benjamin Peterson625af8e2013-03-20 12:47:57 -05002625 [sys.executable, '-E', '-B', '-O', '-c', prog])
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002626 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
2627 self.assertEqual(child_flags, grandchild_flags)
Richard Oudkerk7bdd93c2013-04-17 19:15:52 +01002628
2629#
2630# Issue #17555: ForkAwareThreadLock
2631#
2632
2633class TestForkAwareThreadLock(unittest.TestCase):
2634 # We recurisvely start processes. Issue #17555 meant that the
2635 # after fork registry would get duplicate entries for the same
2636 # lock. The size of the registry at generation n was ~2**n.
2637
2638 @classmethod
2639 def child(cls, n, conn):
2640 if n > 1:
2641 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
2642 p.start()
2643 p.join()
2644 else:
2645 conn.send(len(util._afterfork_registry))
2646 conn.close()
2647
2648 def test_lock(self):
2649 r, w = multiprocessing.Pipe(False)
2650 l = util.ForkAwareThreadLock()
2651 old_size = len(util._afterfork_registry)
2652 p = multiprocessing.Process(target=self.child, args=(5, w))
2653 p.start()
2654 new_size = r.recv()
2655 p.join()
2656 self.assertLessEqual(new_size, old_size)
2657
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002658#
Richard Oudkerk41072db2013-07-01 18:45:28 +01002659# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
2660#
2661
2662class TestIgnoreEINTR(unittest.TestCase):
2663
2664 @classmethod
2665 def _test_ignore(cls, conn):
2666 def handler(signum, frame):
2667 pass
2668 signal.signal(signal.SIGUSR1, handler)
2669 conn.send('ready')
2670 x = conn.recv()
2671 conn.send(x)
2672 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
2673
2674 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2675 def test_ignore(self):
2676 conn, child_conn = multiprocessing.Pipe()
2677 try:
2678 p = multiprocessing.Process(target=self._test_ignore,
2679 args=(child_conn,))
2680 p.daemon = True
2681 p.start()
2682 child_conn.close()
2683 self.assertEqual(conn.recv(), 'ready')
2684 time.sleep(0.1)
2685 os.kill(p.pid, signal.SIGUSR1)
2686 time.sleep(0.1)
2687 conn.send(1234)
2688 self.assertEqual(conn.recv(), 1234)
2689 time.sleep(0.1)
2690 os.kill(p.pid, signal.SIGUSR1)
2691 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
2692 time.sleep(0.1)
2693 p.join()
2694 finally:
2695 conn.close()
2696
2697 @classmethod
2698 def _test_ignore_listener(cls, conn):
2699 def handler(signum, frame):
2700 pass
2701 signal.signal(signal.SIGUSR1, handler)
2702 l = multiprocessing.connection.Listener()
2703 conn.send(l.address)
2704 a = l.accept()
2705 a.send('welcome')
2706
2707 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2708 def test_ignore_listener(self):
2709 conn, child_conn = multiprocessing.Pipe()
2710 try:
2711 p = multiprocessing.Process(target=self._test_ignore_listener,
2712 args=(child_conn,))
2713 p.daemon = True
2714 p.start()
2715 child_conn.close()
2716 address = conn.recv()
2717 time.sleep(0.1)
2718 os.kill(p.pid, signal.SIGUSR1)
2719 time.sleep(0.1)
2720 client = multiprocessing.connection.Client(address)
2721 self.assertEqual(client.recv(), 'welcome')
2722 p.join()
2723 finally:
2724 conn.close()
2725
2726#
Richard Oudkerkfaee75c2012-08-14 11:41:19 +01002727#
2728#
2729
Jesse Noller1b90efb2009-06-30 17:11:52 +00002730testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Kristján Valur Jónsson8927e8f2013-03-19 15:07:35 -07002731 TestStdinBadfiledescriptor, TestTimeouts, TestNoForkBomb,
Richard Oudkerk41072db2013-07-01 18:45:28 +01002732 TestFlags, TestForkAwareThreadLock, TestIgnoreEINTR]
Neal Norwitz0c519b32008-08-25 01:50:24 +00002733
Benjamin Petersondfd79492008-06-13 19:13:39 +00002734#
2735#
2736#
2737
2738def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00002739 if sys.platform.startswith("linux"):
2740 try:
2741 lock = multiprocessing.RLock()
2742 except OSError:
Benjamin Petersonbec087f2009-03-26 21:10:30 +00002743 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00002744
Charles-François Natali6392d7f2011-11-22 18:35:18 +01002745 check_enough_semaphores()
2746
Benjamin Petersondfd79492008-06-13 19:13:39 +00002747 if run is None:
2748 from test.test_support import run_unittest as run
2749
2750 util.get_temp_dir() # creates temp directory for use by all processes
2751
2752 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2753
Jesse Noller146b7ab2008-07-02 16:44:09 +00002754 ProcessesMixin.pool = multiprocessing.Pool(4)
2755 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2756 ManagerMixin.manager.__init__()
2757 ManagerMixin.manager.start()
2758 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002759
2760 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00002761 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2762 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz0c519b32008-08-25 01:50:24 +00002763 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2764 testcases_other
Benjamin Petersondfd79492008-06-13 19:13:39 +00002765 )
2766
2767 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2768 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Nick Coghlan13623662010-04-10 14:24:36 +00002769 # (ncoghlan): Whether or not sys.exc_clear is executed by the threading
2770 # module during these tests is at least platform dependent and possibly
Nick Coghlan14459d52010-04-10 15:01:54 +00002771 # non-deterministic on any given platform. So we don't mind if the listed
Nick Coghlan13623662010-04-10 14:24:36 +00002772 # warnings aren't actually raised.
Florent Xicluna07627882010-03-21 01:14:24 +00002773 with test_support.check_py3k_warnings(
Nick Coghlan13623662010-04-10 14:24:36 +00002774 (".+__(get|set)slice__ has been removed", DeprecationWarning),
2775 (r"sys.exc_clear\(\) not supported", DeprecationWarning),
2776 quiet=True):
Florent Xicluna07627882010-03-21 01:14:24 +00002777 run(suite)
Benjamin Petersondfd79492008-06-13 19:13:39 +00002778
Jesse Noller146b7ab2008-07-02 16:44:09 +00002779 ThreadsMixin.pool.terminate()
2780 ProcessesMixin.pool.terminate()
2781 ManagerMixin.pool.terminate()
2782 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00002783
Jesse Noller146b7ab2008-07-02 16:44:09 +00002784 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00002785
2786def main():
2787 test_main(unittest.TextTestRunner(verbosity=2).run)
2788
2789if __name__ == '__main__':
2790 main()