blob: 6cff4fcd96a1d92e1b549f42cdac05b95bc00a29 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010020import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010022import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
42try:
43 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010044 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020045except ImportError:
46 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
Brian Curtinafa88b52010-10-07 01:12:19 +000048try:
49 from multiprocessing.sharedctypes import Value, copy
50 HAS_SHAREDCTYPES = True
51except ImportError:
52 HAS_SHAREDCTYPES = False
53
Antoine Pitroubcb39d42011-08-23 19:46:22 +020054try:
55 import msvcrt
56except ImportError:
57 msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
70LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000071#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000072
# Short interval (in seconds) the tests sleep for when they need to give
# another process or thread a moment to make progress.
DELTA = 0.1

# Setting this to True makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Timeouts handed to blocking calls; the longer values are only worth using
# when elapsed times are actually being verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)
82
83HAVE_GETVALUE = not getattr(_multiprocessing,
84 'HAVE_BROKEN_SEM_GETVALUE', False)
85
Jesse Noller6214edd2009-01-19 16:23:53 +000086WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* and return the result of wait().

    A negative *timeout* is passed on as None; any other value (including
    None itself) is forwarded unchanged.
    """
    is_negative = timeout is not None and timeout < 0.0
    effective_timeout = None if is_negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
# MAXFD is the value the platform reports for SC_OPEN_MAX, or 256 when that
# cannot be determined.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError: the configuration name is not known.
    # OSError: the name is known but not supported by the host system.
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    MAXFD = 256
99
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100100# To speed up tests when using the forkserver, we can preload these:
101PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
102
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000104# Some tests require ctypes
105#
106
107try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000108 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109except ImportError:
110 Structure = object
111 c_int = c_double = None
112
Charles-François Natali221ef672011-11-22 18:55:22 +0100113
def check_enough_semaphores():
    """Skip the calling test when the OS reports too few semaphores.

    Returns None when SC_SEM_NSEMS_MAX cannot be queried, is reported as -1,
    or is at least 256; otherwise raises unittest.SkipTest.
    """
    # Minimum number of semaphores available according to POSIX.
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf itself, or this particular setting, is not available:
        # handle it exactly like a reported value of -1.
        available = -1
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
127
128
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000129#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000130# Creates a wrapper for a function which records the time it takes to finish
131#
132
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call (whether it returns or raises) ``elapsed`` holds the
    duration of that call in seconds; before the first call it is None.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # A monotonic clock is used so that adjustments of the system clock
        # during the call cannot produce a bogus (even negative) duration.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so the duration is available even
            # when the wrapped call raises.
            self.elapsed = time.monotonic() - started
145
146#
147# Base class for test cases
148#
149
class BaseTestCase(object):
    # Mixin with helpers shared by the test case classes in this file.
    # The assertion helpers it calls (assertEqual, assertAlmostEqual) are
    # not defined here and must be supplied by the class it is mixed into.

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is enabled.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Check that func(*args) equals value, but silently accept a
        # NotImplementedError raised by the call itself.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
172
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173#
174# Return the value of a semaphore
175#
176
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The object's own get_value() method is tried first, then the
    attributes ``_Semaphore__value`` and ``_value``.  NotImplementedError
    is raised when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the attributes, in order of preference.
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
188
189#
190# Testcases
191#
192
class _TestProcess(BaseTestCase):
    # Tests for Process objects: attributes of the current process, the
    # daemon flag, the start/join/terminate life cycle, active_children(),
    # processes started from within processes, and the sentinel handle.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertFalse(me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertGreater(len(key), 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        inherited = self.Process(target=self._test)
        self.assertEqual(inherited.daemon, self.current_process().daemon)

        # An explicit argument wins, whichever way it is set.
        forced_on = self.Process(target=self._test, daemon=True)
        self.assertTrue(forced_on.daemon)
        forced_off = self.Process(target=self._test, daemon=False)
        self.assertFalse(forced_off.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Target run by test_process: report the received arguments and
        # some attributes of the running process back through the queue.
        me = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if cls.TYPE != 'threads':
            q.put(bytes(me.authkey))
            q.put(me.pid)

    def test_process(self):
        q = self.Queue(1)
        unused_event = self.Event()     # created, but not used by the test
        call_args = (q, 1, 2)
        call_kwargs = {'hello': 23, 'bye': 2.54}
        proc_name = 'SomeProcess'
        p = self.Process(
            target=self._test,
            args=call_args,
            kwargs=call_kwargs,
            name=proc_name,
            )
        p.daemon = True
        me = self.current_process()

        # state before the process has been started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, me.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while it is running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported by _test(), in the order it put them
        self.assertEqual(q.get(), call_args[1:])
        self.assertEqual(q.get(), call_kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), me.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after it has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Target for test_terminate: just stay alive until terminated.
        time.sleep(100)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # join() with a zero, then a negative, timeout must come back
        # while the process is still running
        for timeout in (0, -1):
            self.assertEqual(join(timeout), None)
            self.assertTimingAlmostEqual(join.elapsed, 0.0)
            self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if not hasattr(signal, 'alarm'):
            self.assertEqual(join(), None)
        else:
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def on_alarm(*args):
                raise RuntimeError('join took too long: %s' % p)
            previous_handler = signal.signal(signal.SIGALRM, on_alarm)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous_handler)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertIs(type(count), int)
        self.assertGreaterEqual(count, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our own id, then (up to a depth of two) start and wait for
        # two children, one after the other, each with an extended id.
        wconn.send(id)
        if len(id) >= 2:
            return
        for branch in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id + [branch])
                )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        # every child is joined before its sibling is started, so the ids
        # are expected in depth-first order
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1],
            ]
        self.assertEqual(received, expected)

    @classmethod
    def _test_sentinel(cls, event):
        # Target for test_sentinel: block until the event is set (or for
        # ten seconds at most).
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # no sentinel is available before the process is started
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        handle = p.sentinel
        self.assertIsInstance(handle, int)
        # not ready while the process is still waiting on the event ...
        self.assertFalse(wait_for_handle(handle, timeout=0.0))
        event.set()
        p.join()
        # ... and ready once the process has exited
        self.assertTrue(wait_for_handle(handle, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200399
Benjamin Petersone711caf2008-06-11 16:44:04 +0000400#
401#
402#
403
class _UpperCaser(multiprocessing.Process):
    # Process subclass which upper-cases every string submitted to it.
    # Requests and replies travel over a pipe; the process keeps serving
    # requests until it receives None.

    def __init__(self):
        super().__init__()
        child_end, parent_end = multiprocessing.Pipe()
        self.child_conn = child_end
        self.parent_conn = parent_end

    def run(self):
        # This side only uses child_conn.
        self.parent_conn.close()
        conn = self.child_conn
        for text in iter(conn.recv, None):
            conn.send(text.upper())
        conn.close()

    def submit(self, s):
        # Send one string to the running process and wait for the reply.
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        # Ask the process to finish, then close both ends on this side.
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
425
class _TestSubclassingProcess(BaseTestCase):
    # Tests for a Process subclass and for what a child process writes to
    # stderr / reports as exit code on an uncaught exception or sys.exit().

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as f:
            captured = f.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Redirect stderr to the file, then fail with an uncaught exception.
        # The failing source line is searched for verbatim by
        # test_stderr_flush, so it has to stay exactly as it is.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to the file, then exit with the given reason.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer reasons: exit code 1 and the reason written to stderr.
        printed_cases = [
            ([1, 2, 3], 1),
            ('ignore this', 1),
        ]
        for reason, code in printed_cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, code)

            with open(path, 'r') as f:
                written = f.read()
                self.assertEqual(written.rstrip(), str(reason))

        # Reasons which are expected to be used as the exit code itself.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
491
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492#
493#
494#
495
def queue_empty(q):
    """Return whether *q* is empty.

    Uses q.empty() when the queue has such a method, otherwise compares
    q.qsize() with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
501
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses q.full() when the queue has such a method, otherwise compares
    q.qsize() with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
507
508
class _TestQueue(BaseTestCase):
    # Tests for Queue and JoinableQueue: blocking and non-blocking
    # put()/get() with their timeouts, use across a newly started process,
    # qsize(), task_done()/join(), and two regression tests.


    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once allowed to start, drain the six
        # items the parent has queued and tell it so.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts on the full queue fail immediately ...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ... while blocking puts fail once their timeout has expired
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once allowed to start, queue four items
        # and tell the parent so.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets on the empty queue fail immediately ...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ... while blocking gets fail once their timeout has expired
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker for test_task_done: acknowledge every item after a short
        # pause, until None is received.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # returns once task_done() has been called for every item
        queue.join()

        # one None per worker makes each of them leave its loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        # Importing a module which uses a Queue at import time must not
        # make the queue's get() time out.
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        q = multiprocessing.Queue()
        # Measure the wait with a monotonic clock, so that an adjustment of
        # the system clock during the test cannot distort the result.
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.monotonic() - start
        # Tolerate a delta of 30 ms because of the bad clock resolution on
        # Windows (usually 15.6 ms)
        self.assertGreaterEqual(delta, 0.170)
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200745
Benjamin Petersone711caf2008-06-11 16:44:04 +0000746#
747#
748#
749
class _TestLock(BaseTestCase):
    # Tests for Lock and RLock objects.

    def test_lock(self):
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        # already held, so a non-blocking acquire has to fail
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        # releasing a lock which is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), lk.release)

    def test_rlock(self):
        rlk = self.RLock()
        depth = 3
        # acquire repeatedly, then release the same number of times
        for _ in range(depth):
            self.assertEqual(rlk.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlk.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), rlk.release)

    def test_lock_context(self):
        # a lock can be used as a context manager
        with self.Lock():
            pass
772
Benjamin Petersone711caf2008-06-11 16:44:04 +0000773
class _TestSemaphore(BaseTestCase):
    # Tests for Semaphore and BoundedSemaphore objects.

    def _test_semaphore(self, sem):
        # Shared check for a semaphore created with an initial value of 2:
        # take it down to zero and bring it back up again, verifying the
        # value after every step where it can be read.
        def expect_value(expected):
            self.assertReturnsIfImplemented(expected, get_value, sem)

        expect_value(2)
        self.assertEqual(sem.acquire(), True)
        expect_value(1)
        self.assertEqual(sem.acquire(), True)
        expect_value(0)
        # exhausted: a non-blocking acquire fails and changes nothing
        self.assertEqual(sem.acquire(False), False)
        expect_value(0)
        self.assertEqual(sem.release(), None)
        expect_value(1)
        self.assertEqual(sem.release(), None)
        expect_value(2)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # a plain semaphore can be released beyond its initial value
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking attempts fail at once, whatever timeout is given
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking attempts fail once the timeout has expired
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
826
827
class _TestCondition(BaseTestCase):
    """
    Tests for Condition objects.
    """

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Worker body: signal through `sleeping` that we are about to wait,
        # wait on `cond` (optionally with a timeout), then signal through
        # `woken` that the wait has returned.
        with cond:
            sleeping.release()
            cond.wait(timeout)
            woken.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                # get_value() is not implemented on this platform
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the process and the thread so that
        # both can be joined once the test has finished with them.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        with cond:
            cond.notify()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        with cond:
            cond.notify()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both workers have been notified at this point, so joining them
        # cannot block indefinitely.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        with cond:
            cond.notify_all()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Every worker has either timed out or been notified by now, so
        # these joins only reap workers that are already finishing.
        for w in workers:
            w.join()

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        with cond:
            res = wait(TIMEOUT1)
        # nobody notifies, so the wait must time out and report False
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        # Child side of test_waitfor: publish state 0, then wait until the
        # parent has incremented the state to 4.  Exit with status 1 if
        # wait_for() did not succeed.
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda: state.value == 4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda: state.value == 0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertFalse(p.is_alive())
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        # Child side of test_waitfor_timeout: tell the parent we are running,
        # then wait (with a timeout) for a state that is never reached and
        # record whether the wait timed out after a plausible delay.
        sem.release()
        with cond:
            expected = 0.1
            dt = time.time()
            result = cond.wait_for(lambda: state.value == 4, timeout=expected)
            dt = time.time() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child side of test_wait_result: notify the waiting parent, then
        # (when a pid is given) interrupt it with SIGINT a second later.
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        # The SIGINT part of the test is only exercised for real processes
        # on non-Windows platforms.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # no notifier yet: both waits must time out
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            # the child notifies us, so this wait reports True
            self.assertTrue(c.wait(10))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1063
Benjamin Petersone711caf2008-06-11 16:44:04 +00001064
class _TestEvent(BaseTestCase):
    """
    Tests for Event objects.
    """

    @classmethod
    def _test_event(cls, event):
        # Child body: set the event after a delay of TIMEOUT2.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # While the event is cleared, wait() reports False: immediately
        # for a zero timeout, and after the timeout otherwise.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking,
        # with or without a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # A child sets the event; an untimed wait() must then return True.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has already called set(), so this join only reaps it.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001105
1106#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001107# Tests for Barrier - adapted from tests in test/lock_tests.py
1108#
1109
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1114
class _DummyList(object):
    """
    Append-only counter with a list-like interface.

    Only the number of appended items is recorded; it is kept in an
    int-sized buffer obtained from multiprocessing.heap and guarded by a
    multiprocessing lock.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        state = (multiprocessing.heap.BufferWrapper(int_size),
                 multiprocessing.Lock())
        self.__setstate__(state)
        # start counting from zero
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        view = self._wrapper.create_memoryview()
        # reinterpret the raw buffer as an array of C ints
        self._lengthbuf = view.cast('i')

    def __getstate__(self):
        return self._wrapper, self._lock

    def append(self, _):
        # the appended value itself is discarded; only the count grows
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1137
def _wait():
    """Crude wait/yield helper that does not rely on synchronization
    primitives: it simply sleeps for 10 milliseconds."""
    time.sleep(0.01)
1141
1142
class Bunch(object):
    """
    A bunch of workers (threads or processes, depending on `namespace`)
    all running the same function.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemon workers, created with `namespace.Process`, that
        each call `f(*args)`.  If `wait_before_exit` is True, the workers
        won't terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            # workers may leave as soon as `f` has returned
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        # Body run by every worker: record start, run `f`, record finish,
        # then wait (at most 30 seconds) for permission to exit.
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # poll until every worker has recorded its start
        while True:
            if len(self.started) >= self.n:
                return
            _wait()

    def wait_for_finished(self):
        # poll until every worker has recorded that `f` returned
        while True:
            if len(self.finished) >= self.n:
                return
            _wait()

    def do_finish(self):
        # allow workers created with wait_before_exit=True to terminate
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001186
1187
class AppendTrue(object):
    """Callable that appends True to the wrapped object on every call."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1193
1194
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that any worker still waiting on the barrier is released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return a list-like counter suitable for the flavour under test.
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run `f(*args)` in N-1 workers plus the calling thread, then wait
        # until all the workers have finished.
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Pass the barrier 2*n times, checking after each phase that every
        # party has appended to the counter of the previous phase.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        # Report this party's wait() return value to the parent.
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # exactly one party must have received index 0
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        # The action must have run exactly once by the time wait() returns.
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        # One party (index N//2) aborts the barrier instead of waiting a
        # second time; the others record the resulting BrokenBarrierError.
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        # After each barrier pass, send the pass number to the parent; the
        # lock serializes writes to the shared connection.
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Reap the worker when the test ends.  Cleanups run after
            # tearDown() has aborted the barrier, so a worker still blocked
            # in barrier.wait() is released before it is joined.
            self.addCleanup(p.join)

        # Every pass number must arrive exactly N times, in pass order.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1439
1440#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001441#
1442#
1443
class _TestValue(BaseTestCase):
    """
    Tests for Value and RawValue objects.
    """

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child body: overwrite each value with the third entry of its row.
        for shared, row in zip(values, cls.codes_values):
            replacement = row[2]
            shared.value = replacement

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(typecode, initial)
                  for typecode, initial, _ in self.codes_values]

        # each object starts out with its initial value
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # the assignments made in the child are visible in the parent
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock: both accessors must be callable
        with_default = self.Value('i', 5)
        with_default.get_lock()
        with_default.get_obj()

        # lock=None: both accessors must be callable as well
        with_none = self.Value('i', 5, lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        with_given = self.Value('i', 5, lock=lock)
        returned_lock = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either
        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
1511
Benjamin Petersone711caf2008-06-11 16:44:04 +00001512
class _TestArray(BaseTestCase):
    """
    Tests for Array and RawArray objects.
    """

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace `seq` in place by its running (prefix) sums.
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # apply the same slice assignment to the array and the reference
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # compute the expected result locally ...
        self.f(seq)

        # ... and let a child compute it on the array
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            zeros = [0] * size
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # default lock: both accessors must be callable
        with_default = self.Array('i', list(range(10)))
        with_default.get_lock()
        with_default.get_obj()

        # lock=None: both accessors must be callable as well
        with_none = self.Array('i', list(range(10)), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # an explicitly supplied lock is the one handed back
        lock = self.Lock()
        with_given = self.Array('i', list(range(10)), lock=lock)
        returned_lock = with_given.get_lock()
        with_given.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001591
1592#
1593#
1594#
1595
class _TestContainers(BaseTestCase):
    """
    Tests for the list, dict and Namespace containers (manager flavour only).
    """

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # operations on b must have left a untouched
        self.assertEqual(a[:], list(range(10)))

        # a container built from other containers
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # a later change to a is visible through the container holding it
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # the expected str() lists neither the deleted attribute nor the
        # underscore-prefixed one
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        # assertFalse states the intent directly and gives a clearer
        # failure message than assertTrue(not ...)
        self.assertFalse(hasattr(n, 'job'))
1651
1652#
1653#
1654#
1655
def sqr(x, wait=0.0):
    """Return x*x after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001659
def mul(x, y):
    """Return the product x*y."""
    product = x * y
    return product
1662
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator() at the requested index."""
1664
def exception_throwing_generator(total, when):
    """Yield 0, 1, ..., total-1, but raise SayWhenError on reaching `when`."""
    for value in range(total):
        if value != when:
            yield value
            continue
        raise SayWhenError("Somebody said when")
1670
Benjamin Petersone711caf2008-06-11 16:44:04 +00001671class _TestPool(BaseTestCase):
1672
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001673 @classmethod
1674 def setUpClass(cls):
1675 super().setUpClass()
1676 cls.pool = cls.Pool(4)
1677
1678 @classmethod
1679 def tearDownClass(cls):
1680 cls.pool.terminate()
1681 cls.pool.join()
1682 cls.pool = None
1683 super().tearDownClass()
1684
Benjamin Petersone711caf2008-06-11 16:44:04 +00001685 def test_apply(self):
1686 papply = self.pool.apply
1687 self.assertEqual(papply(sqr, (5,)), sqr(5))
1688 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1689
1690 def test_map(self):
1691 pmap = self.pool.map
1692 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1693 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1694 list(map(sqr, list(range(100)))))
1695
Antoine Pitroude911b22011-12-21 11:03:24 +01001696 def test_starmap(self):
1697 psmap = self.pool.starmap
1698 tuples = list(zip(range(10), range(9,-1, -1)))
1699 self.assertEqual(psmap(mul, tuples),
1700 list(itertools.starmap(mul, tuples)))
1701 tuples = list(zip(range(100), range(99,-1, -1)))
1702 self.assertEqual(psmap(mul, tuples, chunksize=20),
1703 list(itertools.starmap(mul, tuples)))
1704
1705 def test_starmap_async(self):
1706 tuples = list(zip(range(100), range(99,-1, -1)))
1707 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1708 list(itertools.starmap(mul, tuples)))
1709
Hynek Schlawack254af262012-10-27 12:53:02 +02001710 def test_map_async(self):
1711 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1712 list(map(sqr, list(range(10)))))
1713
1714 def test_map_async_callbacks(self):
1715 call_args = self.manager.list() if self.TYPE == 'manager' else []
1716 self.pool.map_async(int, ['1'],
1717 callback=call_args.append,
1718 error_callback=call_args.append).wait()
1719 self.assertEqual(1, len(call_args))
1720 self.assertEqual([1], call_args[0])
1721 self.pool.map_async(int, ['a'],
1722 callback=call_args.append,
1723 error_callback=call_args.append).wait()
1724 self.assertEqual(2, len(call_args))
1725 self.assertIsInstance(call_args[1], ValueError)
1726
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001727 def test_map_unplicklable(self):
1728 # Issue #19425 -- failure to pickle should not cause a hang
1729 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001730 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001731 class A(object):
1732 def __reduce__(self):
1733 raise RuntimeError('cannot pickle')
1734 with self.assertRaises(RuntimeError):
1735 self.pool.map(sqr, [A()]*10)
1736
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001737 def test_map_chunksize(self):
1738 try:
1739 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1740 except multiprocessing.TimeoutError:
1741 self.fail("pool.map_async with chunksize stalled on null list")
1742
Benjamin Petersone711caf2008-06-11 16:44:04 +00001743 def test_async(self):
1744 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1745 get = TimingWrapper(res.get)
1746 self.assertEqual(get(), 49)
1747 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1748
1749 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00001750 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001751 get = TimingWrapper(res.get)
1752 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1753 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1754
1755 def test_imap(self):
1756 it = self.pool.imap(sqr, list(range(10)))
1757 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1758
1759 it = self.pool.imap(sqr, list(range(10)))
1760 for i in range(10):
1761 self.assertEqual(next(it), i*i)
1762 self.assertRaises(StopIteration, it.__next__)
1763
1764 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1765 for i in range(1000):
1766 self.assertEqual(next(it), i*i)
1767 self.assertRaises(StopIteration, it.__next__)
1768
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02001769 def test_imap_handle_iterable_exception(self):
1770 if self.TYPE == 'manager':
1771 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1772
1773 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
1774 for i in range(3):
1775 self.assertEqual(next(it), i*i)
1776 self.assertRaises(SayWhenError, it.__next__)
1777
1778 # SayWhenError seen at start of problematic chunk's results
1779 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
1780 for i in range(6):
1781 self.assertEqual(next(it), i*i)
1782 self.assertRaises(SayWhenError, it.__next__)
1783 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
1784 for i in range(4):
1785 self.assertEqual(next(it), i*i)
1786 self.assertRaises(SayWhenError, it.__next__)
1787
Benjamin Petersone711caf2008-06-11 16:44:04 +00001788 def test_imap_unordered(self):
1789 it = self.pool.imap_unordered(sqr, list(range(1000)))
1790 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1791
1792 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1793 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1794
def test_imap_unordered_handle_iterable_exception(self):
    # An exception raised by the input iterable must eventually surface
    # from the imap_unordered() result iterator.
    if self.TYPE == 'manager':
        self.skipTest('test not appropriate for {}'.format(self.TYPE))

    # (total items, failing position, chunksize)
    for total, when, chunksize in ((10, 3, 1), (20, 7, 2)):
        it = self.pool.imap_unordered(
            sqr, exception_throwing_generator(total, when), chunksize)
        # imap_unordered gives no ordering guarantee, so we can neither
        # predict when SayWhenError shows up nor which value comes next.
        # Only require that every value received before the error is one
        # of the expected squares and is delivered at most once.
        outstanding = [sqr(n) for n in range(total)]
        with self.assertRaises(SayWhenError):
            for _ in range(total):
                value = next(it)
                self.assertIn(value, outstanding)
                outstanding.remove(value)
1813
def test_make_pool(self):
    # Non-positive worker counts are rejected up front.
    for bad_size in (-1, 0):
        self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

    # A valid count creates exactly that many workers.
    workers = multiprocessing.Pool(3)
    self.assertEqual(3, len(workers._pool))
    workers.close()
    workers.join()
1822
def test_terminate(self):
    # Queue far more sleeping work than could ever finish quickly, then
    # check that terminate() lets join() return without waiting for it.
    naps = [0.1] * 10000
    result = self.pool.map_async(time.sleep, naps, chunksize=1)
    self.pool.terminate()
    timed_join = TimingWrapper(self.pool.join)
    timed_join()
    self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001831
def test_empty_iterable(self):
    # See Issue 12157: every mapping flavour must cope with an empty
    # input and produce an empty result.
    single = self.Pool(1)

    self.assertEqual(single.map(sqr, []), [])
    self.assertEqual(list(single.imap(sqr, [])), [])
    self.assertEqual(list(single.imap_unordered(sqr, [])), [])
    pending = single.map_async(sqr, [])
    self.assertEqual(pending.get(), [])

    single.close()
    single.join()
1843
def test_context(self):
    # Only exercised for real process pools.
    if self.TYPE != 'processes':
        return

    numbers = list(range(10))
    squares = [sqr(n) for n in numbers]
    with multiprocessing.Pool(2) as workers:
        pending = workers.map_async(sqr, numbers)
        self.assertEqual(pending.get(), squares)
    # Leaving the with-block shuts the pool down; new work is refused.
    self.assertRaises(ValueError, workers.map_async, sqr, numbers)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001852
Richard Oudkerk85757832013-05-06 11:38:25 +01001853 @classmethod
def _test_traceback(cls):
    # Task run in a pool worker by test_traceback(); it always fails.
    # test_traceback() searches the reported traceback for the exact text
    # of the next line, trailing comment included -- do not reformat it.
    raise RuntimeError(123) # some comment
1856
def test_traceback(self):
    # We want to ensure that the traceback from the child process is
    # contained in the traceback raised in the main process.
    if self.TYPE != 'processes':
        return

    with self.Pool(1) as workers:
        try:
            workers.apply(self._test_traceback)
        except Exception as caught:
            # Keep a reference: the "as" name is unbound after the clause.
            exc = caught
        else:
            raise AssertionError('expected RuntimeError')
    self.assertIs(type(exc), RuntimeError)
    self.assertEqual(exc.args, (123,))
    remote = exc.__cause__
    self.assertIs(type(remote), multiprocessing.pool.RemoteTraceback)
    self.assertIn('raise RuntimeError(123) # some comment', remote.tb)

    # The default excepthook must print the remote traceback as well.
    with test.support.captured_stderr() as stderr:
        try:
            raise exc
        except RuntimeError:
            sys.excepthook(*sys.exc_info())
    printed = stderr.getvalue()
    self.assertIn('raise RuntimeError(123) # some comment', printed)
1881
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001882 @classmethod
1883 def _test_wrapped_exception(cls):
1884 raise RuntimeError('foo')
1885
def test_wrapped_exception(self):
    # Issue #20980: Should not wrap exception when using thread pool
    with self.Pool(1) as workers:
        self.assertRaises(
            RuntimeError, workers.apply, self._test_wrapped_exception)
1891
1892
def raising():
    """Always fail with ``KeyError("key")``; used as a failing pool task."""
    failure = KeyError("key")
    raise failure
Jesse Noller1f0b6582010-01-27 03:36:01 +00001895
def unpickleable_result():
    """Return a lambda, i.e. a result that pickle cannot serialize."""
    return (lambda: 42)
1898
class _TestPoolWorkerErrors(BaseTestCase):
    """Error reporting for tasks that fail inside pool workers."""

    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        # A failing task must both re-raise from get() and be handed to
        # the error callback.
        workers = multiprocessing.Pool(2)

        reported = None

        def remember(exc):
            nonlocal reported
            reported = exc

        pending = workers.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, pending.get)
        self.assertTrue(reported)
        self.assertIsInstance(reported, KeyError)

        workers.close()
        workers.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        workers = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            reported = None

            def remember(exc):
                nonlocal reported
                reported = exc

            pending = workers.apply_async(unpickleable_result,
                                          error_callback=remember)
            self.assertRaises(MaybeEncodingError, pending.get)
            wrapped = reported
            self.assertTrue(wrapped)
            self.assertIsInstance(reported, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        workers.close()
        workers.join()
1938
class _TestPoolWorkerLifetime(BaseTestCase):
    """Behaviour of pools whose workers retire after ``maxtasksperchild``."""

    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        workers = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(workers._pool))
        pids_before = [w.pid for w in workers._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [workers.apply_async(sqr, (n, )) for n in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for n, outcome in enumerate(pending):
            self.assertEqual(outcome.get(), sqr(n))
        # Refill the pool
        workers._repopulate_pool()
        # Wait until all workers are alive
        # (50 attempts * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(w.is_alive() for w in workers._pool):
                break
            time.sleep(DELTA)
        pids_after = [w.pid for w in workers._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        workers.close()
        workers.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        workers = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [workers.apply_async(sqr, (n, 0.3)) for n in range(6)]
        workers.close()
        workers.join()
        # check the results
        for n, outcome in enumerate(pending):
            self.assertEqual(outcome.get(), sqr(n))
1983
Benjamin Petersone711caf2008-06-11 16:44:04 +00001984#
1985# Test of creating a customized manager class
1986#
1987
1988from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1989
class FooBar(object):
    """Small object served through MyManager in the tests below."""

    def f(self):
        # Public method that succeeds.
        answer = 'f()'
        return answer

    def g(self):
        # Public method that always fails.
        raise ValueError()

    def _h(self):
        # Underscore-prefixed method.
        answer = '_h()'
        return answer
1997
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    yield from (n * n for n in range(10))
2001
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator over its referent."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Fetch the next item by calling __next__ on the referent.
        item = self._callmethod('__next__')
        return item
2008
class MyManager(BaseManager):
    """Manager subclass that carries the test registrations below."""


# 'Foo' uses the default exposure, 'Bar' names its exposed methods
# explicitly (including the underscore-prefixed one), and 'baz' is served
# through the custom iterator proxy.
for _typeid, _options in (
        ('Foo', {'callable': FooBar}),
        ('Bar', {'callable': FooBar, 'exposed': ('f', '_h')}),
        ('baz', {'callable': baz, 'proxytype': IteratorProxy}),
):
    MyManager.register(_typeid, **_options)
del _typeid, _options
2015
2016
class _TestMyManager(BaseTestCase):
    """Exercise MyManager started explicitly and as a context manager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Shared checks run against an already started manager.
        foo, bar, baz = manager.Foo(), manager.Bar(), manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = list(filter(lambda name: hasattr(foo, name),
                                  candidates))
        bar_methods = list(filter(lambda name: hasattr(bar, name),
                                  candidates))

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: direct calls, then the same through _callmethod().
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both explicitly exposed methods are reachable either way.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        squares = [n * n for n in range(10)]
        self.assertEqual(list(baz), squares)
2066
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002067
Benjamin Petersone711caf2008-06-11 16:44:04 +00002068#
2069# Test of connecting to a remote server and using xmlrpclib for serialization
2070#
2071
# Single queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue."""
    return _queue


class QueueManager(BaseManager):
    """Manager class used by the server process."""


# The server side registers the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Same typeid, but registered without a callable.
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'
2086
class _TestRemoteManager(BaseTestCase):
    """Talk to a manager server from two clients using SERIALIZER."""

    ALLOWED_TYPES = ('manager',)
    values = [
        'hello world', None, True, 2.25,
        'hall\xe5 v\xe4rlden',
        '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
        b'hall\xe5 v\xe4rlden',
    ]
    # What the receiving side is expected to get back: a list copy.
    result = list(values)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect as a client and enqueue the sample values.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
        )
        client.connect()
        remote_queue = client.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        remote_queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
        )
        server.start()

        child = self.Process(target=self._putter,
                             args=(server.address, authkey))
        child.daemon = True
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
        )
        client.connect()
        queue = client.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
2134
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on an address just released."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and enqueue one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter,
                         args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Reap the child here so it is not left dangling after the test;
        # it only connects, puts one item and returns.
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Make the queue proxy's finalizer run before the server stops.
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started as well: without
            # this the retry tested nothing and the shutdown() below was
            # called on a manager that had never been started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002176
Benjamin Petersone711caf2008-06-11 16:44:04 +00002177#
2178#
2179#
2180
# Sentinel message: _TestConnection._echo stops echoing once it receives
# a message equal to this value.
SENTINEL = latin('')
2182
class _TestConnection(BaseTestCase):
    """Tests for the Connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received message straight back until SENTINEL arrives.
        for payload in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(payload)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))
        is_process = self.TYPE == 'processes'

        if is_process:
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both survive the round trip.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if is_process:
            nbytes = len(arr) * arr.itemsize

            # Receive into the start of a zeroed buffer.
            target = array.array('i', [0] * 10)
            wanted = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(target), nbytes)
            self.assertEqual(list(target), wanted)

            # Receive at an offset of three items.
            target = array.array('i', [0] * 10)
            wanted = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize), nbytes)
            self.assertEqual(list(target), wanted)

            # A buffer that is too small reports the complete message.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as too_short:
                self.assertEqual(too_short.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() reports False after the given wait.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        # With the echoed message pending, poll() returns at once.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if is_process:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        # Each end of a one-way pipe supports a single direction only.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(OSError, reader.send, 2)
        self.assertRaises(OSError, writer.recv)
        self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes() arguments, bytes expected on the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
        )
        for extra, received in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), received)

        # Offset/size combinations that do not fit the message.
        for extra in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if fstat() accepts the descriptor, False on EBADF; any
        # other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as err:
            if err.errno != errno.EBADF:
                raise
            return False
        return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 first.
            for candidate in range(0, 256):
                if not cls._is_fd_assigned(candidate):
                    os.dup2(conn.fileno(), candidate)
        handle = reduction.recv_handle(conn)
        if msvcrt:
            handle = msvcrt.open_osfhandle(handle, os.O_WRONLY)
        os.write(handle, data)
        os.close(handle)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        writer = self.Process(target=self._writefd,
                              args=(child_conn, b"foo"))
        writer.daemon = True
        writer.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as out:
            handle = out.fileno()
            if msvcrt:
                handle = msvcrt.get_osfhandle(handle)
            reduction.send_handle(conn, handle, writer.pid)
        writer.join()
        with open(test.support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        writer = self.Process(target=self._writefd,
                              args=(child_conn, b"bar", True))
        writer.daemon = True
        writer.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as out:
            fd = out.fileno()
            # First unassigned descriptor number at or above 256.
            newfd = next(
                (n for n in range(256, MAXFD)
                 if not self._is_fd_assigned(n)),
                None)
            if newfd is None:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, writer.pid)
            finally:
                os.close(newfd)
        writer.join()
        with open(test.support.TESTFN, "rb") as written:
            self.assertEqual(written.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Write one plain byte to the connection's descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        sender = self.Process(target=self._send_data_without_fd,
                              args=(child_conn,))
        sender.daemon = True
        sender.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        sender.join()

    def test_context(self):
        a, b = self.Pipe()
        is_process = self.TYPE == 'processes'

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if is_process:
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the with-block closes both ends.
        if is_process:
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002447
class _TestListener(BaseTestCase):
    """Tests for ``connection.Listener`` on its own."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an address already in use fails.
        for family in self.connection.families:
            first = self.connection.Listener(family=family)
            self.addCleanup(first.close)
            self.assertRaises(OSError, self.connection.Listener,
                              first.address, family)

    def test_context(self):
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client, \
                    listener.accept() as accepted:
                client.send(1729)
                self.assertEqual(accepted.recv(), 1729)

        # Once the with-block has closed the listener, accept() fails.
        if self.TYPE == 'processes':
            self.assertRaises(OSError, listener.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002468
class _TestListenerClient(BaseTestCase):
    """Tests pairing a ``connection.Listener`` with a ``Client``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send a single greeting and disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(l.address,))
                p.daemon = True
                p.start()
                try:
                    conn = l.accept()
                    try:
                        self.assertEqual(conn.recv(), 'hello')
                    finally:
                        # The accepted connection used to be left open,
                        # unlike in the sibling tests below.
                        conn.close()
                finally:
                    p.join()
            finally:
                # Release the listener even when an assertion fails.
                l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # poll() on the client must notice bytes sent by the accepted end.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2516
class _TestPoll(BaseTestCase):
    """Tests for ``Connection.poll()`` and message boundaries."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as pending data, and polling
        # does not consume it.
        near, far = self.Pipe()
        self.assertEqual(near.poll(), False)
        far.send_bytes(b'')
        for _ in range(2):
            self.assertEqual(near.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Send each message after a short pause, then close.
        for chunk in strings:
            time.sleep(0.1)
            conn.send_bytes(chunk)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        near, far = self.Pipe()
        sender = self.Process(target=self._child_strings,
                              args=(far, strings))
        sender.start()

        for wanted in strings:
            # Poll up to 200 times, stopping at the first success.
            any(near.poll(0.01) for _ in range(200))
            received = near.recv_bytes()
            self.assertEqual(wanted, received)

        sender.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        poller = self.Process(target=self._child_boundaries, args=(r,))
        poller.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for item in messages:
            w.send_bytes(item)
        w.close()
        poller.join()
        self.assertIn(r.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Three separate messages, sent back to back.
        for piece in (b'a', b'b', b'cd'):
            b.send_bytes(piece)

    def test_dont_merge(self):
        a, b = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(a.poll(timeout), False)

        sender = self.Process(target=self._child_dont_merge, args=(b,))
        sender.start()

        # (message expected next, poll timeouts tried just before it)
        steps = (
            (b'a', ()),
            (b'b', (1.0, 1.0)),
            (b'cd', (1.0, 1.0, 0.0)),
        )
        for wanted, timeouts in steps:
            for timeout in timeouts:
                self.assertEqual(a.poll(timeout), True)
            self.assertEqual(a.recv_bytes(), wanted)

        sender.join()
2594
Benjamin Petersone711caf2008-06-11 16:44:04 +00002595#
2596# Test of sending connection and socket objects between processes
2597#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002598
2599@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002600class _TestPicklingConnections(BaseTestCase):
2601
2602 ALLOWED_TYPES = ('processes',)
2603
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002604 @classmethod
def tearDownClass(cls):
    # Stop the resource sharer once this class's tests are done, waiting
    # at most five seconds for it.
    import multiprocessing.resource_sharer as sharer
    sharer.stop(timeout=5)
2608
2609 @classmethod
def _listener(cls, conn, families):
    # Child side: for each family, listen, report the address over conn,
    # accept one connection and send that connection object over conn.
    for family in families:
        listener = cls.connection.Listener(family=family)
        conn.send(listener.address)
        accepted = listener.accept()
        conn.send(accepted)
        accepted.close()
        listener.close()

    # Then do the same with a plain socket.
    server = socket.socket()
    server.bind((test.support.HOST, 0))
    server.listen()
    conn.send(server.getsockname())
    accepted, _peer = server.accept()
    conn.send(accepted)
    accepted.close()
    server.close()

    # Block until the parent sends its final message.
    conn.recv()
2629
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002630 @classmethod
def _remote(cls, conn):
    # Child side: for every (address, message) request, connect to the
    # address and send the upper-cased message.  None ends this phase.
    for target, text in iter(conn.recv, None):
        link = cls.connection.Client(target)
        link.send(text.upper())
        link.close()

    # One last request is answered over a plain socket.
    target, text = conn.recv()
    sock = socket.socket()
    sock.connect(target)
    sock.sendall(text.upper())
    sock.close()

    conn.close()
2644
2645 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002646 families = self.connection.families
2647
2648 lconn, lconn0 = self.Pipe()
2649 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002650 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002651 lp.start()
2652 lconn0.close()
2653
2654 rconn, rconn0 = self.Pipe()
2655 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002656 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002657 rp.start()
2658 rconn0.close()
2659
2660 for fam in families:
2661 msg = ('This connection uses family %s' % fam).encode('ascii')
2662 address = lconn.recv()
2663 rconn.send((address, msg))
2664 new_conn = lconn.recv()
2665 self.assertEqual(new_conn.recv(), msg.upper())
2666
2667 rconn.send(None)
2668
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002669 msg = latin('This connection uses a normal socket')
2670 address = lconn.recv()
2671 rconn.send((address, msg))
2672 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002673 buf = []
2674 while True:
2675 s = new_conn.recv(100)
2676 if not s:
2677 break
2678 buf.append(s)
2679 buf = b''.join(buf)
2680 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002681 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002682
2683 lconn.send(None)
2684
2685 rconn.close()
2686 lconn.close()
2687
2688 lp.join()
2689 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002690
2691 @classmethod
2692 def child_access(cls, conn):
2693 w = conn.recv()
2694 w.send('all is well')
2695 w.close()
2696
2697 r = conn.recv()
2698 msg = r.recv()
2699 conn.send(msg*2)
2700
2701 conn.close()
2702
2703 def test_access(self):
2704 # On Windows, if we do not specify a destination pid when
2705 # using DupHandle then we need to be careful to use the
2706 # correct access flags for DuplicateHandle(), or else
2707 # DupHandle.detach() will raise PermissionError. For example,
2708 # for a read only pipe handle we should use
2709 # access=FILE_GENERIC_READ. (Unfortunately
2710 # DUPLICATE_SAME_ACCESS does not work.)
2711 conn, child_conn = self.Pipe()
2712 p = self.Process(target=self.child_access, args=(child_conn,))
2713 p.daemon = True
2714 p.start()
2715 child_conn.close()
2716
2717 r, w = self.Pipe(duplex=False)
2718 conn.send(w)
2719 w.close()
2720 self.assertEqual(r.recv(), 'all is well')
2721 r.close()
2722
2723 r, w = self.Pipe(duplex=False)
2724 conn.send(r)
2725 r.close()
2726 w.send('foobar')
2727 w.close()
2728 self.assertEqual(conn.recv(), 'foobar'*2)
2729
#
#
#
2733
class _TestHeap(BaseTestCase):
    """Tests for the allocator behind multiprocessing.heap.BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive segments must either touch each other or the second
        # one must start a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2798
#
#
#
2802
class _Foo(Structure):
    """Structure with an int field ``x`` and a double field ``y``."""

    _fields_ = [
        ('x', c_int),
        ('y', c_double),
    ]
2808
class _TestSharedCTypes(BaseTestCase):
    """Tests for ctypes objects shared with a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Double every shared value in place (runs in the child)."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.daemon = True
        p.start()
        p.join()

        # The modifications made by the child must be visible here.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # Changing the original must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
2858
#
#
#
2862
class _TestFinalize(BaseTestCase):
    """Check the order in which util.Finalize callbacks are run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Register finalizers that report their name over *conn*, run
        multiprocessing's exit function and leave the process."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read names until the 'STOP' sentinel sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2915
#
# Test that from ... import * works for each module
#
2919
class _TestImportStar(unittest.TestCase):
    """Check that every multiprocessing module defines a usable __all__."""

    def get_module_names(self):
        """Return the dotted names of the multiprocessing package and of
        every ``*.py`` module found in its directory."""
        import glob
        folder = os.path.dirname(multiprocessing.__file__)
        pattern = os.path.join(folder, '*.py')
        modules = [
            'multiprocessing.' + os.path.splitext(os.path.basename(path))[0]
            for path in glob.glob(pattern)
        ]
        # The package itself is imported by its plain name.
        modules.remove('multiprocessing.__init__')
        modules.append('multiprocessing')
        return modules

    def test_import(self):
        modules = self.get_module_names()
        # Drop the modules that cannot be imported on this platform.
        if sys.platform == 'win32':
            modules.remove('multiprocessing.popen_fork')
            modules.remove('multiprocessing.popen_forkserver')
            modules.remove('multiprocessing.popen_spawn_posix')
        else:
            modules.remove('multiprocessing.popen_spawn_win32')
            if not HAS_REDUCTION:
                modules.remove('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            modules.remove('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
2958
#
# Quick test that logging works -- does not test logging output
#
2962
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (levels only)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertIsNotNone(logger)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Report the effective level of the multiprocessing logger."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL1, reader.recv())
            p.join()

            # With NOTSET the child falls back to the root logger's level.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.daemon = True
            p.start()
            self.assertEqual(LEVEL2, reader.recv())
            p.join()
        finally:
            # Restore the logging state and release the pipe even when an
            # assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
3005
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003006
# class _TestLoggingProcessName(BaseTestCase):
#
#     def handle(self, record):
#         assert record.processName == multiprocessing.current_process().name
#         self.__handled = True
#
#     def test_logging(self):
#         handler = logging.Handler()
#         handler.handle = self.handle
#         self.__handled = False
#         # Bypass getLogger() and side-effects
#         logger = logging.getLoggerClass()(
#                 'multiprocessing.test.TestLoggingProcessName')
#         logger.addHandler(handler)
#         logger.propagate = False
#
#         logger.warn('foo')
#         assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003025
#
# Check that Process.join() retries if os.waitpid() fails with EINTR
#
3029
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Send SIGUSR1 to *pid* after a short delay."""
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                # The signal is sent while we are blocked in p.join().
                p = self.Process(target=time.sleep, args=(2,))
                p.start()
                p.join()
            finally:
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
3059
#
# Test to verify handle verification, see issue 3321
#
3063
class TestInvalidHandle(unittest.TestCase):
    """Connection objects built from bogus handles must fail cleanly."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        # check that poll() doesn't crash
        try:
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        self.assertRaises((ValueError, OSError),
                          multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003080
Benjamin Petersone711caf2008-06-11 16:44:04 +00003081
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003082
class OtherTest(unittest.TestCase):
    """Authentication failures of the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        connection = multiprocessing.connection
        # NOTE(review): newer CPython releases appear to expose the marker
        # only as the private ``_CHALLENGE``; fall back to it when the
        # public name is missing -- confirm against the targeted version.
        challenge = getattr(connection, 'CHALLENGE', None)
        if challenge is None:
            challenge = connection._CHALLENGE

        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                # First the challenge marker, then a bogus reply, then
                # nothing.
                self.count += 1
                if self.count == 1:
                    return challenge
                elif self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          _FakeConnection(), b'abc')
3111
#
# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
#
3115
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    ns.test += 1
3118
class TestInitializers(unittest.TestCase):
    """Initializer callbacks of Manager.start() and Pool()."""

    def setUp(self):
        # The shared namespace lets the initializer's effect be observed
        # from this process.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the extra manager even if the assertion fails.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3143
#
# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
#
3148
def _this_sub_process(q):
    """Try a non-blocking get on *q*, ignoring an empty queue."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
3154
def _test_process(q):
    """Start a daemonic sub-process that polls a fresh queue and wait for it.

    The *q* argument is accepted but not used; the sub-process gets a
    queue created here.
    """
    queue = multiprocessing.Queue()
    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
    subProc.daemon = True
    subProc.start()
    subProc.join()
3161
def _afunc(x):
    """Return the square of *x*."""
    return x*x
3164
def pool_in_process():
    """Run a small map over a four-worker pool, then shut the pool down."""
    pool = multiprocessing.Pool(processes=4)
    # Only the fact that the call completes matters; the result is unused.
    pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    pool.close()
    pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003170
class _file_like(object):
    """Buffer written data per process and hand it to *delegate* on flush."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """The list of pending writes, reset whenever the pid changes."""
        pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if pid != self._pid:
            self._pid = pid
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Write everything buffered so far in one go, then start over.
        self._delegate.write(''.join(self.cache))
        self._cache = []
3191
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # The process object is only created, never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A real assertion: a bare ``assert`` disappears under ``python -O``.
        self.assertEqual(sio.getvalue(), 'foo')
3212
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003213
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait()."""

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Send ten ``(index, pid)`` messages over *w*, optionally with
        random pauses, then close it."""
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Drain every reader until its writer has been closed.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Connect to *address* and send the numbers 0..9, one per line."""
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen()
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        # An empty read means the peer has closed its socket.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must return after about `expected`
        # seconds.  A monotonic clock keeps the measurement immune to
        # wall-clock adjustments.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Now `a` is readable and wait() must return promptly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Release *sem*, then sleep for *period* seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        # Connections and integer sentinels are not mutually orderable,
        # so compare the result lists ordered by object identity.
        sorted_ = lambda l: sorted(l, key=id)
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child has finished
        # sleeping.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        # A negative timeout must return immediately.
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003388
#
# Issue 14151: Test invalid family on invalid environment
#
3392
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject an address that belongs to the other platform."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003404
#
# Issue 12098: check sys.flags of child matches that for parent
#
3408
class TestFlags(unittest.TestCase):
    """sys.flags of a child process must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's sys.flags over *conn*."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child
        process started from here."""
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3436
#
# Test interaction with socket timeouts - see Issue #6056
#
3440
class TestTimeouts(unittest.TestCase):
    """Connections must keep working with a short default socket timeout."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Send 123 over *child* after a one second pause, then connect
        to *address* and send 456."""
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # Much shorter than the one second pause in the child.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            socket.setdefaulttimeout(old_timeout)
3470
#
# Test what happens with no "if __name__ == '__main__'"
#
3474
class TestNoForkBomb(unittest.TestCase):
    """Run mp_fork_bomb.py, a script without a ``__main__`` guard."""

    def test_noforkbomb(self):
        sm = multiprocessing.get_start_method()
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if sm != 'fork':
            # Without fork the script is expected to fail with a
            # RuntimeError on stderr.
            rc, out, err = test.script_helper.assert_python_failure(name, sm)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
        else:
            rc, out, err = test.script_helper.assert_python_ok(name, sm)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
Richard Oudkerke88a2442012-08-14 11:41:32 +01003487
#
# Issue #17555: ForkAwareThreadLock
#
3491
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Start a chain of *n* nested processes; the innermost one sends
        the size of the after-fork registry over *conn*."""
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            p.join(timeout=5)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        # NOTE(review): `l` is never used again; presumably it only has to
        # stay referenced so that its registry entry exists -- confirm.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        p.join(timeout=5)
        # The registry must not have grown across the generations.
        self.assertLessEqual(new_size, old_size)
3518
#
# Check that non-forked child processes do not inherit unneeded fds/handles
#
3522
3523class TestCloseFds(unittest.TestCase):
3524
3525 def get_high_socket_fd(self):
3526 if WIN32:
3527 # The child process will not have any socket handles, so
3528 # calling socket.fromfd() should produce WSAENOTSOCK even
3529 # if there is a handle of the same number.
3530 return socket.socket().detach()
3531 else:
3532 # We want to produce a socket with an fd high enough that a
3533 # freshly created child process will not have any fds as high.
3534 fd = socket.socket().detach()
3535 to_close = []
3536 while fd < 50:
3537 to_close.append(fd)
3538 fd = os.dup(fd)
3539 for x in to_close:
3540 os.close(x)
3541 return fd
3542
3543 def close(self, fd):
3544 if WIN32:
3545 socket.socket(fileno=fd).close()
3546 else:
3547 os.close(fd)
3548
3549 @classmethod
3550 def _test_closefds(cls, conn, fd):
3551 try:
3552 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
3553 except Exception as e:
3554 conn.send(e)
3555 else:
3556 s.close()
3557 conn.send(None)
3558
3559 def test_closefd(self):
3560 if not HAS_REDUCTION:
3561 raise unittest.SkipTest('requires fd pickling')
3562
3563 reader, writer = multiprocessing.Pipe()
3564 fd = self.get_high_socket_fd()
3565 try:
3566 p = multiprocessing.Process(target=self._test_closefds,
3567 args=(writer, fd))
3568 p.start()
3569 writer.close()
3570 e = reader.recv()
3571 p.join(timeout=5)
3572 finally:
3573 self.close(fd)
3574 writer.close()
3575 reader.close()
3576
3577 if multiprocessing.get_start_method() == 'fork':
3578 self.assertIs(e, None)
3579 else:
3580 WSAENOTSOCK = 10038
3581 self.assertIsInstance(e, OSError)
3582 self.assertTrue(e.errno == errno.EBADF or
3583 e.winerror == WSAENOTSOCK, e)
3584
#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#
3588
class TestIgnoreEINTR(unittest.TestCase):

    @classmethod
    def _test_ignore(cls, conn):
        # Child side: install a do-nothing SIGUSR1 handler so the signal
        # interrupts system calls instead of killing the process, then
        # echo one object back and push a large payload.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        conn.send(conn.recv())
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            proc = multiprocessing.Process(target=self._test_ignore,
                                           args=(child_end,))
            proc.daemon = True
            proc.start()
            child_end.close()
            self.assertEqual(parent_end.recv(), 'ready')

            # Signal the child between the handshake and the echo.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            time.sleep(0.1)
            parent_end.send(1234)
            self.assertEqual(parent_end.recv(), 1234)

            # Signal it again before draining the 1 MB payload.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            self.assertEqual(parent_end.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            proc.join()
        finally:
            parent_end.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child side: same do-nothing handler, then publish a listener
        # address, accept one connection and greet the client.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        parent_end, child_end = multiprocessing.Pipe()
        try:
            proc = multiprocessing.Process(target=self._test_ignore_listener,
                                           args=(child_end,))
            proc.daemon = True
            proc.start()
            child_end.close()
            address = parent_end.recv()

            # Signal the child after it has published its address and
            # before connecting to it.
            time.sleep(0.1)
            os.kill(proc.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            proc.join()
        finally:
            parent_end.close()
3652
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003653class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003654 @classmethod
3655 def _check_context(cls, conn):
3656 conn.send(multiprocessing.get_start_method())
3657
3658 def check_context(self, ctx):
3659 r, w = ctx.Pipe(duplex=False)
3660 p = ctx.Process(target=self._check_context, args=(w,))
3661 p.start()
3662 w.close()
3663 child_method = r.recv()
3664 r.close()
3665 p.join()
3666 self.assertEqual(child_method, ctx.get_start_method())
3667
3668 def test_context(self):
3669 for method in ('fork', 'spawn', 'forkserver'):
3670 try:
3671 ctx = multiprocessing.get_context(method)
3672 except ValueError:
3673 continue
3674 self.assertEqual(ctx.get_start_method(), method)
3675 self.assertIs(ctx.get_context(), ctx)
3676 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3677 self.assertRaises(ValueError, ctx.set_start_method, None)
3678 self.check_context(ctx)
3679
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003680 def test_set_get(self):
3681 multiprocessing.set_forkserver_preload(PRELOAD)
3682 count = 0
3683 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003684 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003685 for method in ('fork', 'spawn', 'forkserver'):
3686 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003687 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003688 except ValueError:
3689 continue
3690 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003691 ctx = multiprocessing.get_context()
3692 self.assertEqual(ctx.get_start_method(), method)
3693 self.assertTrue(type(ctx).__name__.lower().startswith(method))
3694 self.assertTrue(
3695 ctx.Process.__name__.lower().startswith(method))
3696 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003697 count += 1
3698 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003699 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003700 self.assertGreaterEqual(count, 1)
3701
3702 def test_get_all(self):
3703 methods = multiprocessing.get_all_start_methods()
3704 if sys.platform == 'win32':
3705 self.assertEqual(methods, ['spawn'])
3706 else:
3707 self.assertTrue(methods == ['fork', 'spawn'] or
3708 methods == ['fork', 'spawn', 'forkserver'])
3709
#
# Check that killing process does not leak named semaphores
#
3713
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
    def test_semaphore_tracker(self):
        # Start a child that creates two locks with the spawn start
        # method, kill it, and check that the second named semaphore is
        # gone afterwards and that the expected semaphore_tracker messages
        # were written to the child's stderr.
        import subprocess
        # The child writes the names of both semaphores to the inherited
        # pipe fd and then sleeps so that it can be terminated.
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        r, w = os.pipe()
        p = subprocess.Popen([sys.executable,
                             '-c', cmd % (w, w)],
                             pass_fds=[w],
                             stderr=subprocess.PIPE)
        # Close the parent's copy of the write end; the child has its own.
        os.close(w)
        with open(r, 'rb', closefd=True) as f:
            name1 = f.readline().rstrip().decode('ascii')
            name2 = f.readline().rstrip().decode('ascii')
        # Unlink the first semaphore by hand; the stderr check at the end
        # expects an "[Errno ..." report for this name.
        _multiprocessing.sem_unlink(name1)
        p.terminate()
        p.wait()
        # NOTE(review): presumably gives the tracker time to clean up
        # after the child has died -- confirm.
        time.sleep(2.0)
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        # Raw string: "\[" is a regex escape, not a string escape.
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003750
#
# Mixins
#
3754
class ProcessesMixin(object):
    # Binds the generic names used by the test cases to the real
    # process-based implementations in the multiprocessing package.
    TYPE = 'processes'

    # Process class and connection module.
    Process = multiprocessing.Process
    connection = multiprocessing.connection

    # Process introspection.
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)

    # Pools, pipes and queues.
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared state.
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003776
Benjamin Petersone711caf2008-06-11 16:44:04 +00003777
class ManagerMixin(object):
    # Binds the generic names used by the test cases to factories on a
    # manager shared by the whole class (created in setUpClass).
    TYPE = 'manager'
    Process = multiprocessing.Process

    def _managed(name):
        # Property that resolves *name* on the manager at access time.
        return property(operator.attrgetter('manager.' + name))

    Queue = _managed('Queue')
    JoinableQueue = _managed('JoinableQueue')
    Lock = _managed('Lock')
    RLock = _managed('RLock')
    Semaphore = _managed('Semaphore')
    BoundedSemaphore = _managed('BoundedSemaphore')
    Condition = _managed('Condition')
    Event = _managed('Event')
    Barrier = _managed('Barrier')
    Value = _managed('Value')
    Array = _managed('Array')
    list = _managed('list')
    dict = _managed('dict')
    Namespace = _managed('Namespace')

    # Class-construction helper only; not part of the mixin.
    del _managed

    @classmethod
    def Pool(cls, *args, **kwds):
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        delay = 0.01
        while len(multiprocessing.active_children()) > 1 and delay < 5:
            time.sleep(delay)
            delay *= 2
        gc.collect()                       # do garbage collection

        manager = cls.manager
        if manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(manager._debug_info())
        manager.shutdown()
        manager.join()
        cls.manager = None
3822 cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003823
3824
class ThreadsMixin(object):
    # Binds the generic names used by the test cases to the thread-based
    # stand-ins provided by multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    # Use the thread pool from multiprocessing.dummy, like every other
    # attribute here; multiprocessing.Pool would create a process pool.
    # NOTE(review): pool tests that assume process semantics for the
    # 'threads' type may need adjusting -- confirm against the Pool tests.
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
3844
#
# Functions used to create test cases from the base ones in this module
#
3848
def install_tests_in_module_dict(remote_globs, start_method):
    # Fill *remote_globs* (a module namespace; its '__name__' entry is read)
    # with concrete test classes derived from the classes defined in this
    # module, plus setUpModule/tearDownModule hooks that switch to
    # *start_method* for the duration of that module's tests.
    module_name = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            # One concrete class per allowed type, combining the base with
            # the matching *Mixin class; e.g. '_TestFoo' with 'processes'
            # is published as 'WithProcessesTestFoo'.
            for type_ in base.ALLOWED_TYPES:
                flavour = type_.capitalize()
                newname = 'With' + flavour + name[1:]
                Mixin = local_globs[flavour + 'Mixin']
                remote_globs[newname] = type(
                    newname, (base, Mixin, unittest.TestCase),
                    {'__module__': module_name})
        elif issubclass(base, unittest.TestCase):
            # Plain TestCase classes are re-published under their own name
            # through a trivial subclass owned by the remote module.
            remote_globs[name] = type(
                name, (base, object), {'__module__': module_name})

    # State recorded by setUpModule() and consumed by tearDownModule().
    processes_before = None
    threads_before = None
    previous_start_method = None

    def setUpModule():
        nonlocal processes_before, threads_before, previous_start_method
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        processes_before = multiprocessing.process._dangling.copy()
        threads_before = threading._dangling.copy()
        previous_start_method = multiprocessing.get_start_method(
            allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        multiprocessing.set_start_method(previous_start_method, force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        time.sleep(0.5)
        multiprocessing.process._cleanup()
        gc.collect()
        # Report anything that appeared since setUpModule() and is still
        # registered as dangling.
        leaked_processes = (set(multiprocessing.process._dangling)
                            - set(processes_before))
        if leaked_processes:
            print('Dangling processes:', leaked_processes, file=sys.stderr)
        del leaked_processes
        leaked_threads = set(threading._dangling) - set(threads_before)
        if leaked_threads:
            print('Dangling threads:', leaked_threads, file=sys.stderr)

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule