blob: 78208c30499c0094938b0a4afb7b7548953b7f07 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010020import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010022import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000029# import threading after _multiprocessing to raise a more revelant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
42try:
43 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010044 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020045except ImportError:
46 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
Brian Curtinafa88b52010-10-07 01:12:19 +000048try:
49 from multiprocessing.sharedctypes import Value, copy
50 HAS_SHAREDCTYPES = True
51except ImportError:
52 HAS_SHAREDCTYPES = False
53
Antoine Pitroubcb39d42011-08-23 19:46:22 +020054try:
55 import msvcrt
56except ImportError:
57 msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000063def latin(s):
64 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
70LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000071#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000072
73DELTA = 0.1
74CHECK_TIMINGS = False # making true makes tests take a lot longer
75 # and can sometimes cause some non-serious
76 # failures because some calls block a bit
77 # longer than expected
78if CHECK_TIMINGS:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
80else:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
82
83HAVE_GETVALUE = not getattr(_multiprocessing,
84 'HAVE_BROKEN_SEM_GETVALUE', False)
85
Jesse Noller6214edd2009-01-19 16:23:53 +000086WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090def wait_for_handle(handle, timeout):
91 if timeout is not None and timeout < 0.0:
92 timeout = None
93 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
Antoine Pitroubcb39d42011-08-23 19:46:22 +020095try:
96 MAXFD = os.sysconf("SC_OPEN_MAX")
97except:
98 MAXFD = 256
99
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100100# To speed up tests when using the forkserver, we can preload these:
101PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
102
Benjamin Petersone711caf2008-06-11 16:44:04 +0000103#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000104# Some tests require ctypes
105#
106
107try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000108 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000109except ImportError:
110 Structure = object
111 c_int = c_double = None
112
Charles-François Natali221ef672011-11-22 18:55:22 +0100113
114def check_enough_semaphores():
115 """Check that the system supports enough semaphores to run the test."""
116 # minimum number of semaphores available according to POSIX
117 nsems_min = 256
118 try:
119 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
120 except (AttributeError, ValueError):
121 # sysconf not available or setting not available
122 return
123 if nsems == -1 or nsems >= nsems_min:
124 return
125 raise unittest.SkipTest("The OS doesn't support enough semaphores "
126 "to run the test (required: %d)." % nsems_min)
127
128
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000129#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000130# Creates a wrapper for a function which records the time it takes to finish
131#
132
133class TimingWrapper(object):
134
135 def __init__(self, func):
136 self.func = func
137 self.elapsed = None
138
139 def __call__(self, *args, **kwds):
140 t = time.time()
141 try:
142 return self.func(*args, **kwds)
143 finally:
144 self.elapsed = time.time() - t
145
146#
147# Base class for test cases
148#
149
150class BaseTestCase(object):
151
152 ALLOWED_TYPES = ('processes', 'manager', 'threads')
153
154 def assertTimingAlmostEqual(self, a, b):
155 if CHECK_TIMINGS:
156 self.assertAlmostEqual(a, b, 1)
157
158 def assertReturnsIfImplemented(self, value, func, *args):
159 try:
160 res = func(*args)
161 except NotImplementedError:
162 pass
163 else:
164 return self.assertEqual(value, res)
165
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000166 # For the sanity of Windows users, rather than crashing or freezing in
167 # multiple ways.
168 def __reduce__(self, *args):
169 raise NotImplementedError("shouldn't try to pickle a test case")
170
171 __reduce_ex__ = __reduce__
172
Benjamin Petersone711caf2008-06-11 16:44:04 +0000173#
174# Return the value of a semaphore
175#
176
177def get_value(self):
178 try:
179 return self.get_value()
180 except AttributeError:
181 try:
182 return self._Semaphore__value
183 except AttributeError:
184 try:
185 return self._value
186 except AttributeError:
187 raise NotImplementedError
188
189#
190# Testcases
191#
192
193class _TestProcess(BaseTestCase):
194
195 ALLOWED_TYPES = ('processes', 'threads')
196
197 def test_current(self):
198 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600199 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000200
201 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000203
204 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000205 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000206 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000208 self.assertEqual(current.ident, os.getpid())
209 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000210
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000211 def test_daemon_argument(self):
212 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600213 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000214
215 # By default uses the current process's daemon flag.
216 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000217 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000218 proc1 = self.Process(target=self._test, daemon=True)
219 self.assertTrue(proc1.daemon)
220 proc2 = self.Process(target=self._test, daemon=False)
221 self.assertFalse(proc2.daemon)
222
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000223 @classmethod
224 def _test(cls, q, *args, **kwds):
225 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000226 q.put(args)
227 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000228 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000229 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000231 q.put(current.pid)
232
233 def test_process(self):
234 q = self.Queue(1)
235 e = self.Event()
236 args = (q, 1, 2)
237 kwargs = {'hello':23, 'bye':2.54}
238 name = 'SomeProcess'
239 p = self.Process(
240 target=self._test, args=args, kwargs=kwargs, name=name
241 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000242 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000243 current = self.current_process()
244
245 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000246 self.assertEqual(p.authkey, current.authkey)
247 self.assertEqual(p.is_alive(), False)
248 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000249 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000250 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000251 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000252
253 p.start()
254
Ezio Melottib3aedd42010-11-20 19:04:17 +0000255 self.assertEqual(p.exitcode, None)
256 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000257 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258
Ezio Melottib3aedd42010-11-20 19:04:17 +0000259 self.assertEqual(q.get(), args[1:])
260 self.assertEqual(q.get(), kwargs)
261 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000263 self.assertEqual(q.get(), current.authkey)
264 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265
266 p.join()
267
Ezio Melottib3aedd42010-11-20 19:04:17 +0000268 self.assertEqual(p.exitcode, 0)
269 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000270 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000272 @classmethod
273 def _test_terminate(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100274 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275
276 def test_terminate(self):
277 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600278 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279
280 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000281 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000282 p.start()
283
284 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000285 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000286 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000287
Richard Oudkerk59d54042012-05-10 16:11:12 +0100288 join = TimingWrapper(p.join)
289
290 self.assertEqual(join(0), None)
291 self.assertTimingAlmostEqual(join.elapsed, 0.0)
292 self.assertEqual(p.is_alive(), True)
293
294 self.assertEqual(join(-1), None)
295 self.assertTimingAlmostEqual(join.elapsed, 0.0)
296 self.assertEqual(p.is_alive(), True)
297
Richard Oudkerk26f92682013-10-17 13:56:18 +0100298 # XXX maybe terminating too soon causes the problems on Gentoo...
299 time.sleep(1)
300
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 p.terminate()
302
Richard Oudkerk4f350792013-10-13 00:49:27 +0100303 if hasattr(signal, 'alarm'):
Richard Oudkerkd44500a2013-10-17 10:38:37 +0100304 # On the Gentoo buildbot waitpid() often seems to block forever.
Richard Oudkerk26f92682013-10-17 13:56:18 +0100305 # We use alarm() to interrupt it if it blocks for too long.
Richard Oudkerk4f350792013-10-13 00:49:27 +0100306 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100307 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100308 old_handler = signal.signal(signal.SIGALRM, handler)
309 try:
310 signal.alarm(10)
311 self.assertEqual(join(), None)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100312 finally:
Richard Oudkerk1e2f67c2013-10-17 14:24:06 +0100313 signal.alarm(0)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100314 signal.signal(signal.SIGALRM, old_handler)
315 else:
316 self.assertEqual(join(), None)
317
Benjamin Petersone711caf2008-06-11 16:44:04 +0000318 self.assertTimingAlmostEqual(join.elapsed, 0.0)
319
320 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000321 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000322
323 p.join()
324
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000325 # XXX sometimes get p.exitcode == 0 on Windows ...
326 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
328 def test_cpu_count(self):
329 try:
330 cpus = multiprocessing.cpu_count()
331 except NotImplementedError:
332 cpus = 1
333 self.assertTrue(type(cpus) is int)
334 self.assertTrue(cpus >= 1)
335
336 def test_active_children(self):
337 self.assertEqual(type(self.active_children()), list)
338
339 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000340 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000341
Jesus Cea94f964f2011-09-09 20:26:57 +0200342 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000343 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000344 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000345
346 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000347 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000348
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000349 @classmethod
350 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000351 wconn.send(id)
352 if len(id) < 2:
353 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000354 p = cls.Process(
355 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000356 )
357 p.start()
358 p.join()
359
360 def test_recursion(self):
361 rconn, wconn = self.Pipe(duplex=False)
362 self._test_recursion(wconn, [])
363
364 time.sleep(DELTA)
365 result = []
366 while rconn.poll():
367 result.append(rconn.recv())
368
369 expected = [
370 [],
371 [0],
372 [0, 0],
373 [0, 1],
374 [1],
375 [1, 0],
376 [1, 1]
377 ]
378 self.assertEqual(result, expected)
379
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200380 @classmethod
381 def _test_sentinel(cls, event):
382 event.wait(10.0)
383
384 def test_sentinel(self):
385 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600386 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200387 event = self.Event()
388 p = self.Process(target=self._test_sentinel, args=(event,))
389 with self.assertRaises(ValueError):
390 p.sentinel
391 p.start()
392 self.addCleanup(p.join)
393 sentinel = p.sentinel
394 self.assertIsInstance(sentinel, int)
395 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
396 event.set()
397 p.join()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100398 self.assertTrue(wait_for_handle(sentinel, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200399
Benjamin Petersone711caf2008-06-11 16:44:04 +0000400#
401#
402#
403
404class _UpperCaser(multiprocessing.Process):
405
406 def __init__(self):
407 multiprocessing.Process.__init__(self)
408 self.child_conn, self.parent_conn = multiprocessing.Pipe()
409
410 def run(self):
411 self.parent_conn.close()
412 for s in iter(self.child_conn.recv, None):
413 self.child_conn.send(s.upper())
414 self.child_conn.close()
415
416 def submit(self, s):
417 assert type(s) is str
418 self.parent_conn.send(s)
419 return self.parent_conn.recv()
420
421 def stop(self):
422 self.parent_conn.send(None)
423 self.parent_conn.close()
424 self.child_conn.close()
425
426class _TestSubclassingProcess(BaseTestCase):
427
428 ALLOWED_TYPES = ('processes',)
429
430 def test_subclassing(self):
431 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200432 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000433 uppercaser.start()
434 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
435 self.assertEqual(uppercaser.submit('world'), 'WORLD')
436 uppercaser.stop()
437 uppercaser.join()
438
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100439 def test_stderr_flush(self):
440 # sys.stderr is flushed at process shutdown (issue #13812)
441 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600442 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100443
444 testfn = test.support.TESTFN
445 self.addCleanup(test.support.unlink, testfn)
446 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
447 proc.start()
448 proc.join()
449 with open(testfn, 'r') as f:
450 err = f.read()
451 # The whole traceback was printed
452 self.assertIn("ZeroDivisionError", err)
453 self.assertIn("test_multiprocessing.py", err)
454 self.assertIn("1/0 # MARKER", err)
455
456 @classmethod
457 def _test_stderr_flush(cls, testfn):
458 sys.stderr = open(testfn, 'w')
459 1/0 # MARKER
460
461
Richard Oudkerk29471de2012-06-06 19:04:57 +0100462 @classmethod
463 def _test_sys_exit(cls, reason, testfn):
464 sys.stderr = open(testfn, 'w')
465 sys.exit(reason)
466
467 def test_sys_exit(self):
468 # See Issue 13854
469 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600470 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk29471de2012-06-06 19:04:57 +0100471
472 testfn = test.support.TESTFN
473 self.addCleanup(test.support.unlink, testfn)
474
Richard Oudkerk8731d7b2013-11-17 17:24:11 +0000475 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100476 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
477 p.daemon = True
478 p.start()
479 p.join(5)
480 self.assertEqual(p.exitcode, code)
481
482 with open(testfn, 'r') as f:
483 self.assertEqual(f.read().rstrip(), str(reason))
484
485 for reason in (True, False, 8):
486 p = self.Process(target=sys.exit, args=(reason,))
487 p.daemon = True
488 p.start()
489 p.join(5)
490 self.assertEqual(p.exitcode, reason)
491
Benjamin Petersone711caf2008-06-11 16:44:04 +0000492#
493#
494#
495
496def queue_empty(q):
497 if hasattr(q, 'empty'):
498 return q.empty()
499 else:
500 return q.qsize() == 0
501
502def queue_full(q, maxsize):
503 if hasattr(q, 'full'):
504 return q.full()
505 else:
506 return q.qsize() == maxsize
507
508
509class _TestQueue(BaseTestCase):
510
511
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000512 @classmethod
513 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000514 child_can_start.wait()
515 for i in range(6):
516 queue.get()
517 parent_can_continue.set()
518
519 def test_put(self):
520 MAXSIZE = 6
521 queue = self.Queue(maxsize=MAXSIZE)
522 child_can_start = self.Event()
523 parent_can_continue = self.Event()
524
525 proc = self.Process(
526 target=self._test_put,
527 args=(queue, child_can_start, parent_can_continue)
528 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000529 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 proc.start()
531
532 self.assertEqual(queue_empty(queue), True)
533 self.assertEqual(queue_full(queue, MAXSIZE), False)
534
535 queue.put(1)
536 queue.put(2, True)
537 queue.put(3, True, None)
538 queue.put(4, False)
539 queue.put(5, False, None)
540 queue.put_nowait(6)
541
542 # the values may be in buffer but not yet in pipe so sleep a bit
543 time.sleep(DELTA)
544
545 self.assertEqual(queue_empty(queue), False)
546 self.assertEqual(queue_full(queue, MAXSIZE), True)
547
548 put = TimingWrapper(queue.put)
549 put_nowait = TimingWrapper(queue.put_nowait)
550
551 self.assertRaises(pyqueue.Full, put, 7, False)
552 self.assertTimingAlmostEqual(put.elapsed, 0)
553
554 self.assertRaises(pyqueue.Full, put, 7, False, None)
555 self.assertTimingAlmostEqual(put.elapsed, 0)
556
557 self.assertRaises(pyqueue.Full, put_nowait, 7)
558 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
559
560 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
561 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
562
563 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
564 self.assertTimingAlmostEqual(put.elapsed, 0)
565
566 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
567 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
568
569 child_can_start.set()
570 parent_can_continue.wait()
571
572 self.assertEqual(queue_empty(queue), True)
573 self.assertEqual(queue_full(queue, MAXSIZE), False)
574
575 proc.join()
576
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000577 @classmethod
578 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000579 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000580 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000581 queue.put(2)
582 queue.put(3)
583 queue.put(4)
584 queue.put(5)
585 parent_can_continue.set()
586
587 def test_get(self):
588 queue = self.Queue()
589 child_can_start = self.Event()
590 parent_can_continue = self.Event()
591
592 proc = self.Process(
593 target=self._test_get,
594 args=(queue, child_can_start, parent_can_continue)
595 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000596 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000597 proc.start()
598
599 self.assertEqual(queue_empty(queue), True)
600
601 child_can_start.set()
602 parent_can_continue.wait()
603
604 time.sleep(DELTA)
605 self.assertEqual(queue_empty(queue), False)
606
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000607 # Hangs unexpectedly, remove for now
608 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000609 self.assertEqual(queue.get(True, None), 2)
610 self.assertEqual(queue.get(True), 3)
611 self.assertEqual(queue.get(timeout=1), 4)
612 self.assertEqual(queue.get_nowait(), 5)
613
614 self.assertEqual(queue_empty(queue), True)
615
616 get = TimingWrapper(queue.get)
617 get_nowait = TimingWrapper(queue.get_nowait)
618
619 self.assertRaises(pyqueue.Empty, get, False)
620 self.assertTimingAlmostEqual(get.elapsed, 0)
621
622 self.assertRaises(pyqueue.Empty, get, False, None)
623 self.assertTimingAlmostEqual(get.elapsed, 0)
624
625 self.assertRaises(pyqueue.Empty, get_nowait)
626 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
627
628 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
629 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
630
631 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
632 self.assertTimingAlmostEqual(get.elapsed, 0)
633
634 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
635 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
636
637 proc.join()
638
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000639 @classmethod
640 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000641 for i in range(10, 20):
642 queue.put(i)
643 # note that at this point the items may only be buffered, so the
644 # process cannot shutdown until the feeder thread has finished
645 # pushing items onto the pipe.
646
647 def test_fork(self):
648 # Old versions of Queue would fail to create a new feeder
649 # thread for a forked process if the original process had its
650 # own feeder thread. This test checks that this no longer
651 # happens.
652
653 queue = self.Queue()
654
655 # put items on queue so that main process starts a feeder thread
656 for i in range(10):
657 queue.put(i)
658
659 # wait to make sure thread starts before we fork a new process
660 time.sleep(DELTA)
661
662 # fork process
663 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200664 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000665 p.start()
666
667 # check that all expected items are in the queue
668 for i in range(20):
669 self.assertEqual(queue.get(), i)
670 self.assertRaises(pyqueue.Empty, queue.get, False)
671
672 p.join()
673
674 def test_qsize(self):
675 q = self.Queue()
676 try:
677 self.assertEqual(q.qsize(), 0)
678 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600679 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000680 q.put(1)
681 self.assertEqual(q.qsize(), 1)
682 q.put(5)
683 self.assertEqual(q.qsize(), 2)
684 q.get()
685 self.assertEqual(q.qsize(), 1)
686 q.get()
687 self.assertEqual(q.qsize(), 0)
688
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000689 @classmethod
690 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000691 for obj in iter(q.get, None):
692 time.sleep(DELTA)
693 q.task_done()
694
695 def test_task_done(self):
696 queue = self.JoinableQueue()
697
Benjamin Petersone711caf2008-06-11 16:44:04 +0000698 workers = [self.Process(target=self._test_task_done, args=(queue,))
699 for i in range(4)]
700
701 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200702 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000703 p.start()
704
705 for i in range(10):
706 queue.put(i)
707
708 queue.join()
709
710 for p in workers:
711 queue.put(None)
712
713 for p in workers:
714 p.join()
715
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200716 def test_timeout(self):
717 q = multiprocessing.Queue()
718 start = time.time()
719 self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
720 delta = time.time() - start
Richard Oudkerkb8ec1e32013-11-02 16:46:32 +0000721 self.assertGreaterEqual(delta, 0.18)
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200722
Benjamin Petersone711caf2008-06-11 16:44:04 +0000723#
724#
725#
726
727class _TestLock(BaseTestCase):
728
729 def test_lock(self):
730 lock = self.Lock()
731 self.assertEqual(lock.acquire(), True)
732 self.assertEqual(lock.acquire(False), False)
733 self.assertEqual(lock.release(), None)
734 self.assertRaises((ValueError, threading.ThreadError), lock.release)
735
736 def test_rlock(self):
737 lock = self.RLock()
738 self.assertEqual(lock.acquire(), True)
739 self.assertEqual(lock.acquire(), True)
740 self.assertEqual(lock.acquire(), True)
741 self.assertEqual(lock.release(), None)
742 self.assertEqual(lock.release(), None)
743 self.assertEqual(lock.release(), None)
744 self.assertRaises((AssertionError, RuntimeError), lock.release)
745
Jesse Nollerf8d00852009-03-31 03:25:07 +0000746 def test_lock_context(self):
747 with self.Lock():
748 pass
749
Benjamin Petersone711caf2008-06-11 16:44:04 +0000750
751class _TestSemaphore(BaseTestCase):
752
753 def _test_semaphore(self, sem):
754 self.assertReturnsIfImplemented(2, get_value, sem)
755 self.assertEqual(sem.acquire(), True)
756 self.assertReturnsIfImplemented(1, get_value, sem)
757 self.assertEqual(sem.acquire(), True)
758 self.assertReturnsIfImplemented(0, get_value, sem)
759 self.assertEqual(sem.acquire(False), False)
760 self.assertReturnsIfImplemented(0, get_value, sem)
761 self.assertEqual(sem.release(), None)
762 self.assertReturnsIfImplemented(1, get_value, sem)
763 self.assertEqual(sem.release(), None)
764 self.assertReturnsIfImplemented(2, get_value, sem)
765
766 def test_semaphore(self):
767 sem = self.Semaphore(2)
768 self._test_semaphore(sem)
769 self.assertEqual(sem.release(), None)
770 self.assertReturnsIfImplemented(3, get_value, sem)
771 self.assertEqual(sem.release(), None)
772 self.assertReturnsIfImplemented(4, get_value, sem)
773
774 def test_bounded_semaphore(self):
775 sem = self.BoundedSemaphore(2)
776 self._test_semaphore(sem)
777 # Currently fails on OS/X
778 #if HAVE_GETVALUE:
779 # self.assertRaises(ValueError, sem.release)
780 # self.assertReturnsIfImplemented(2, get_value, sem)
781
782 def test_timeout(self):
783 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600784 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785
786 sem = self.Semaphore(0)
787 acquire = TimingWrapper(sem.acquire)
788
789 self.assertEqual(acquire(False), False)
790 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
791
792 self.assertEqual(acquire(False, None), False)
793 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
794
795 self.assertEqual(acquire(False, TIMEOUT1), False)
796 self.assertTimingAlmostEqual(acquire.elapsed, 0)
797
798 self.assertEqual(acquire(True, TIMEOUT2), False)
799 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
800
801 self.assertEqual(acquire(timeout=TIMEOUT3), False)
802 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
803
804
805class _TestCondition(BaseTestCase):
806
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000807 @classmethod
808 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000809 cond.acquire()
810 sleeping.release()
811 cond.wait(timeout)
812 woken.release()
813 cond.release()
814
815 def check_invariant(self, cond):
816 # this is only supposed to succeed when there are no sleepers
817 if self.TYPE == 'processes':
818 try:
819 sleepers = (cond._sleeping_count.get_value() -
820 cond._woken_count.get_value())
821 self.assertEqual(sleepers, 0)
822 self.assertEqual(cond._wait_semaphore.get_value(), 0)
823 except NotImplementedError:
824 pass
825
826 def test_notify(self):
827 cond = self.Condition()
828 sleeping = self.Semaphore(0)
829 woken = self.Semaphore(0)
830
831 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000832 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000833 p.start()
834
835 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000836 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837 p.start()
838
839 # wait for both children to start sleeping
840 sleeping.acquire()
841 sleeping.acquire()
842
843 # check no process/thread has woken up
844 time.sleep(DELTA)
845 self.assertReturnsIfImplemented(0, get_value, woken)
846
847 # wake up one process/thread
848 cond.acquire()
849 cond.notify()
850 cond.release()
851
852 # check one process/thread has woken up
853 time.sleep(DELTA)
854 self.assertReturnsIfImplemented(1, get_value, woken)
855
856 # wake up another
857 cond.acquire()
858 cond.notify()
859 cond.release()
860
861 # check other has woken up
862 time.sleep(DELTA)
863 self.assertReturnsIfImplemented(2, get_value, woken)
864
865 # check state is not mucked up
866 self.check_invariant(cond)
867 p.join()
868
869 def test_notify_all(self):
870 cond = self.Condition()
871 sleeping = self.Semaphore(0)
872 woken = self.Semaphore(0)
873
874 # start some threads/processes which will timeout
875 for i in range(3):
876 p = self.Process(target=self.f,
877 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000878 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000879 p.start()
880
881 t = threading.Thread(target=self.f,
882 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000883 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000884 t.start()
885
886 # wait for them all to sleep
887 for i in range(6):
888 sleeping.acquire()
889
890 # check they have all timed out
891 for i in range(6):
892 woken.acquire()
893 self.assertReturnsIfImplemented(0, get_value, woken)
894
895 # check state is not mucked up
896 self.check_invariant(cond)
897
898 # start some more threads/processes
899 for i in range(3):
900 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000901 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 p.start()
903
904 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000905 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906 t.start()
907
908 # wait for them to all sleep
909 for i in range(6):
910 sleeping.acquire()
911
912 # check no process/thread has woken up
913 time.sleep(DELTA)
914 self.assertReturnsIfImplemented(0, get_value, woken)
915
916 # wake them all up
917 cond.acquire()
918 cond.notify_all()
919 cond.release()
920
921 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200922 for i in range(10):
923 try:
924 if get_value(woken) == 6:
925 break
926 except NotImplementedError:
927 break
928 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000929 self.assertReturnsIfImplemented(6, get_value, woken)
930
931 # check state is not mucked up
932 self.check_invariant(cond)
933
934 def test_timeout(self):
935 cond = self.Condition()
936 wait = TimingWrapper(cond.wait)
937 cond.acquire()
938 res = wait(TIMEOUT1)
939 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000940 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000941 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
942
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200943 @classmethod
944 def _test_waitfor_f(cls, cond, state):
945 with cond:
946 state.value = 0
947 cond.notify()
948 result = cond.wait_for(lambda : state.value==4)
949 if not result or state.value != 4:
950 sys.exit(1)
951
952 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
953 def test_waitfor(self):
954 # based on test in test/lock_tests.py
955 cond = self.Condition()
956 state = self.Value('i', -1)
957
958 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
959 p.daemon = True
960 p.start()
961
962 with cond:
963 result = cond.wait_for(lambda : state.value==0)
964 self.assertTrue(result)
965 self.assertEqual(state.value, 0)
966
967 for i in range(4):
968 time.sleep(0.01)
969 with cond:
970 state.value += 1
971 cond.notify()
972
973 p.join(5)
974 self.assertFalse(p.is_alive())
975 self.assertEqual(p.exitcode, 0)
976
977 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100978 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
979 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200980 with cond:
981 expected = 0.1
982 dt = time.time()
983 result = cond.wait_for(lambda : state.value==4, timeout=expected)
984 dt = time.time() - dt
985 # borrow logic in assertTimeout() from test/lock_tests.py
986 if not result and expected * 0.6 < dt < expected * 10.0:
987 success.value = True
988
989 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
990 def test_waitfor_timeout(self):
991 # based on test in test/lock_tests.py
992 cond = self.Condition()
993 state = self.Value('i', 0)
994 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100995 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200996
997 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100998 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200999 p.daemon = True
1000 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001001 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001002
1003 # Only increment 3 times, so state == 4 is never reached.
1004 for i in range(3):
1005 time.sleep(0.01)
1006 with cond:
1007 state.value += 1
1008 cond.notify()
1009
1010 p.join(5)
1011 self.assertTrue(success.value)
1012
Richard Oudkerk98449932012-06-05 13:15:29 +01001013 @classmethod
1014 def _test_wait_result(cls, c, pid):
1015 with c:
1016 c.notify()
1017 time.sleep(1)
1018 if pid is not None:
1019 os.kill(pid, signal.SIGINT)
1020
1021 def test_wait_result(self):
1022 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1023 pid = os.getpid()
1024 else:
1025 pid = None
1026
1027 c = self.Condition()
1028 with c:
1029 self.assertFalse(c.wait(0))
1030 self.assertFalse(c.wait(0.1))
1031
1032 p = self.Process(target=self._test_wait_result, args=(c, pid))
1033 p.start()
1034
1035 self.assertTrue(c.wait(10))
1036 if pid is not None:
1037 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1038
1039 p.join()
1040
Benjamin Petersone711caf2008-06-11 16:44:04 +00001041
1042class _TestEvent(BaseTestCase):
1043
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001044 @classmethod
1045 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001046 time.sleep(TIMEOUT2)
1047 event.set()
1048
1049 def test_event(self):
1050 event = self.Event()
1051 wait = TimingWrapper(event.wait)
1052
Ezio Melotti13925002011-03-16 11:05:33 +02001053 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001054 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001055 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001056
Benjamin Peterson965ce872009-04-05 21:24:58 +00001057 # Removed, threading.Event.wait() will return the value of the __flag
1058 # instead of None. API Shear with the semaphore backed mp.Event
1059 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001061 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001062 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1063
1064 event.set()
1065
1066 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001067 self.assertEqual(event.is_set(), True)
1068 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001069 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001070 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001071 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1072 # self.assertEqual(event.is_set(), True)
1073
1074 event.clear()
1075
1076 #self.assertEqual(event.is_set(), False)
1077
Jesus Cea94f964f2011-09-09 20:26:57 +02001078 p = self.Process(target=self._test_event, args=(event,))
1079 p.daemon = True
1080 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001081 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001082
1083#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001084# Tests for Barrier - adapted from tests in test/lock_tests.py
1085#
1086
1087# Many of the tests for threading.Barrier use a list as an atomic
1088# counter: a value is appended to increment the counter, and the
1089# length of the list gives the value. We use the class DummyList
1090# for the same purpose.
1091
1092class _DummyList(object):
1093
1094 def __init__(self):
1095 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1096 lock = multiprocessing.Lock()
1097 self.__setstate__((wrapper, lock))
1098 self._lengthbuf[0] = 0
1099
1100 def __setstate__(self, state):
1101 (self._wrapper, self._lock) = state
1102 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1103
1104 def __getstate__(self):
1105 return (self._wrapper, self._lock)
1106
1107 def append(self, _):
1108 with self._lock:
1109 self._lengthbuf[0] += 1
1110
1111 def __len__(self):
1112 with self._lock:
1113 return self._lengthbuf[0]
1114
1115def _wait():
1116 # A crude wait/yield function not relying on synchronization primitives.
1117 time.sleep(0.01)
1118
1119
1120class Bunch(object):
1121 """
1122 A bunch of threads.
1123 """
1124 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1125 """
1126 Construct a bunch of `n` threads running the same function `f`.
1127 If `wait_before_exit` is True, the threads won't terminate until
1128 do_finish() is called.
1129 """
1130 self.f = f
1131 self.args = args
1132 self.n = n
1133 self.started = namespace.DummyList()
1134 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001135 self._can_exit = namespace.Event()
1136 if not wait_before_exit:
1137 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001138 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001139 p = namespace.Process(target=self.task)
1140 p.daemon = True
1141 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001142
1143 def task(self):
1144 pid = os.getpid()
1145 self.started.append(pid)
1146 try:
1147 self.f(*self.args)
1148 finally:
1149 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001150 self._can_exit.wait(30)
1151 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001152
1153 def wait_for_started(self):
1154 while len(self.started) < self.n:
1155 _wait()
1156
1157 def wait_for_finished(self):
1158 while len(self.finished) < self.n:
1159 _wait()
1160
1161 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001162 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001163
1164
1165class AppendTrue(object):
1166 def __init__(self, obj):
1167 self.obj = obj
1168 def __call__(self):
1169 self.obj.append(True)
1170
1171
1172class _TestBarrier(BaseTestCase):
1173 """
1174 Tests for Barrier objects.
1175 """
1176 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001177 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001178
1179 def setUp(self):
1180 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1181
1182 def tearDown(self):
1183 self.barrier.abort()
1184 self.barrier = None
1185
1186 def DummyList(self):
1187 if self.TYPE == 'threads':
1188 return []
1189 elif self.TYPE == 'manager':
1190 return self.manager.list()
1191 else:
1192 return _DummyList()
1193
1194 def run_threads(self, f, args):
1195 b = Bunch(self, f, args, self.N-1)
1196 f(*args)
1197 b.wait_for_finished()
1198
1199 @classmethod
1200 def multipass(cls, barrier, results, n):
1201 m = barrier.parties
1202 assert m == cls.N
1203 for i in range(n):
1204 results[0].append(True)
1205 assert len(results[1]) == i * m
1206 barrier.wait()
1207 results[1].append(True)
1208 assert len(results[0]) == (i + 1) * m
1209 barrier.wait()
1210 try:
1211 assert barrier.n_waiting == 0
1212 except NotImplementedError:
1213 pass
1214 assert not barrier.broken
1215
1216 def test_barrier(self, passes=1):
1217 """
1218 Test that a barrier is passed in lockstep
1219 """
1220 results = [self.DummyList(), self.DummyList()]
1221 self.run_threads(self.multipass, (self.barrier, results, passes))
1222
1223 def test_barrier_10(self):
1224 """
1225 Test that a barrier works for 10 consecutive runs
1226 """
1227 return self.test_barrier(10)
1228
1229 @classmethod
1230 def _test_wait_return_f(cls, barrier, queue):
1231 res = barrier.wait()
1232 queue.put(res)
1233
1234 def test_wait_return(self):
1235 """
1236 test the return value from barrier.wait
1237 """
1238 queue = self.Queue()
1239 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1240 results = [queue.get() for i in range(self.N)]
1241 self.assertEqual(results.count(0), 1)
1242
1243 @classmethod
1244 def _test_action_f(cls, barrier, results):
1245 barrier.wait()
1246 if len(results) != 1:
1247 raise RuntimeError
1248
1249 def test_action(self):
1250 """
1251 Test the 'action' callback
1252 """
1253 results = self.DummyList()
1254 barrier = self.Barrier(self.N, action=AppendTrue(results))
1255 self.run_threads(self._test_action_f, (barrier, results))
1256 self.assertEqual(len(results), 1)
1257
1258 @classmethod
1259 def _test_abort_f(cls, barrier, results1, results2):
1260 try:
1261 i = barrier.wait()
1262 if i == cls.N//2:
1263 raise RuntimeError
1264 barrier.wait()
1265 results1.append(True)
1266 except threading.BrokenBarrierError:
1267 results2.append(True)
1268 except RuntimeError:
1269 barrier.abort()
1270
1271 def test_abort(self):
1272 """
1273 Test that an abort will put the barrier in a broken state
1274 """
1275 results1 = self.DummyList()
1276 results2 = self.DummyList()
1277 self.run_threads(self._test_abort_f,
1278 (self.barrier, results1, results2))
1279 self.assertEqual(len(results1), 0)
1280 self.assertEqual(len(results2), self.N-1)
1281 self.assertTrue(self.barrier.broken)
1282
1283 @classmethod
1284 def _test_reset_f(cls, barrier, results1, results2, results3):
1285 i = barrier.wait()
1286 if i == cls.N//2:
1287 # Wait until the other threads are all in the barrier.
1288 while barrier.n_waiting < cls.N-1:
1289 time.sleep(0.001)
1290 barrier.reset()
1291 else:
1292 try:
1293 barrier.wait()
1294 results1.append(True)
1295 except threading.BrokenBarrierError:
1296 results2.append(True)
1297 # Now, pass the barrier again
1298 barrier.wait()
1299 results3.append(True)
1300
1301 def test_reset(self):
1302 """
1303 Test that a 'reset' on a barrier frees the waiting threads
1304 """
1305 results1 = self.DummyList()
1306 results2 = self.DummyList()
1307 results3 = self.DummyList()
1308 self.run_threads(self._test_reset_f,
1309 (self.barrier, results1, results2, results3))
1310 self.assertEqual(len(results1), 0)
1311 self.assertEqual(len(results2), self.N-1)
1312 self.assertEqual(len(results3), self.N)
1313
1314 @classmethod
1315 def _test_abort_and_reset_f(cls, barrier, barrier2,
1316 results1, results2, results3):
1317 try:
1318 i = barrier.wait()
1319 if i == cls.N//2:
1320 raise RuntimeError
1321 barrier.wait()
1322 results1.append(True)
1323 except threading.BrokenBarrierError:
1324 results2.append(True)
1325 except RuntimeError:
1326 barrier.abort()
1327 # Synchronize and reset the barrier. Must synchronize first so
1328 # that everyone has left it when we reset, and after so that no
1329 # one enters it before the reset.
1330 if barrier2.wait() == cls.N//2:
1331 barrier.reset()
1332 barrier2.wait()
1333 barrier.wait()
1334 results3.append(True)
1335
1336 def test_abort_and_reset(self):
1337 """
1338 Test that a barrier can be reset after being broken.
1339 """
1340 results1 = self.DummyList()
1341 results2 = self.DummyList()
1342 results3 = self.DummyList()
1343 barrier2 = self.Barrier(self.N)
1344
1345 self.run_threads(self._test_abort_and_reset_f,
1346 (self.barrier, barrier2, results1, results2, results3))
1347 self.assertEqual(len(results1), 0)
1348 self.assertEqual(len(results2), self.N-1)
1349 self.assertEqual(len(results3), self.N)
1350
1351 @classmethod
1352 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001353 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001354 if i == cls.N//2:
1355 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001356 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001357 try:
1358 barrier.wait(0.5)
1359 except threading.BrokenBarrierError:
1360 results.append(True)
1361
1362 def test_timeout(self):
1363 """
1364 Test wait(timeout)
1365 """
1366 results = self.DummyList()
1367 self.run_threads(self._test_timeout_f, (self.barrier, results))
1368 self.assertEqual(len(results), self.barrier.parties)
1369
1370 @classmethod
1371 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001372 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001373 if i == cls.N//2:
1374 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001375 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001376 try:
1377 barrier.wait()
1378 except threading.BrokenBarrierError:
1379 results.append(True)
1380
1381 def test_default_timeout(self):
1382 """
1383 Test the barrier's default timeout
1384 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001385 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001386 results = self.DummyList()
1387 self.run_threads(self._test_default_timeout_f, (barrier, results))
1388 self.assertEqual(len(results), barrier.parties)
1389
1390 def test_single_thread(self):
1391 b = self.Barrier(1)
1392 b.wait()
1393 b.wait()
1394
1395 @classmethod
1396 def _test_thousand_f(cls, barrier, passes, conn, lock):
1397 for i in range(passes):
1398 barrier.wait()
1399 with lock:
1400 conn.send(i)
1401
1402 def test_thousand(self):
1403 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001404 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001405 passes = 1000
1406 lock = self.Lock()
1407 conn, child_conn = self.Pipe(False)
1408 for j in range(self.N):
1409 p = self.Process(target=self._test_thousand_f,
1410 args=(self.barrier, passes, child_conn, lock))
1411 p.start()
1412
1413 for i in range(passes):
1414 for j in range(self.N):
1415 self.assertEqual(conn.recv(), i)
1416
1417#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418#
1419#
1420
1421class _TestValue(BaseTestCase):
1422
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001423 ALLOWED_TYPES = ('processes',)
1424
Benjamin Petersone711caf2008-06-11 16:44:04 +00001425 codes_values = [
1426 ('i', 4343, 24234),
1427 ('d', 3.625, -4.25),
1428 ('h', -232, 234),
1429 ('c', latin('x'), latin('y'))
1430 ]
1431
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001432 def setUp(self):
1433 if not HAS_SHAREDCTYPES:
1434 self.skipTest("requires multiprocessing.sharedctypes")
1435
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001436 @classmethod
1437 def _test(cls, values):
1438 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001439 sv.value = cv[2]
1440
1441
1442 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001443 if raw:
1444 values = [self.RawValue(code, value)
1445 for code, value, _ in self.codes_values]
1446 else:
1447 values = [self.Value(code, value)
1448 for code, value, _ in self.codes_values]
1449
1450 for sv, cv in zip(values, self.codes_values):
1451 self.assertEqual(sv.value, cv[1])
1452
1453 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001454 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001455 proc.start()
1456 proc.join()
1457
1458 for sv, cv in zip(values, self.codes_values):
1459 self.assertEqual(sv.value, cv[2])
1460
1461 def test_rawvalue(self):
1462 self.test_value(raw=True)
1463
1464 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001465 val1 = self.Value('i', 5)
1466 lock1 = val1.get_lock()
1467 obj1 = val1.get_obj()
1468
1469 val2 = self.Value('i', 5, lock=None)
1470 lock2 = val2.get_lock()
1471 obj2 = val2.get_obj()
1472
1473 lock = self.Lock()
1474 val3 = self.Value('i', 5, lock=lock)
1475 lock3 = val3.get_lock()
1476 obj3 = val3.get_obj()
1477 self.assertEqual(lock, lock3)
1478
Jesse Nollerb0516a62009-01-18 03:11:38 +00001479 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001480 self.assertFalse(hasattr(arr4, 'get_lock'))
1481 self.assertFalse(hasattr(arr4, 'get_obj'))
1482
Jesse Nollerb0516a62009-01-18 03:11:38 +00001483 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1484
1485 arr5 = self.RawValue('i', 5)
1486 self.assertFalse(hasattr(arr5, 'get_lock'))
1487 self.assertFalse(hasattr(arr5, 'get_obj'))
1488
Benjamin Petersone711caf2008-06-11 16:44:04 +00001489
1490class _TestArray(BaseTestCase):
1491
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001492 ALLOWED_TYPES = ('processes',)
1493
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001494 @classmethod
1495 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001496 for i in range(1, len(seq)):
1497 seq[i] += seq[i-1]
1498
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001499 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001500 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001501 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1502 if raw:
1503 arr = self.RawArray('i', seq)
1504 else:
1505 arr = self.Array('i', seq)
1506
1507 self.assertEqual(len(arr), len(seq))
1508 self.assertEqual(arr[3], seq[3])
1509 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1510
1511 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1512
1513 self.assertEqual(list(arr[:]), seq)
1514
1515 self.f(seq)
1516
1517 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001518 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001519 p.start()
1520 p.join()
1521
1522 self.assertEqual(list(arr[:]), seq)
1523
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001524 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001525 def test_array_from_size(self):
1526 size = 10
1527 # Test for zeroing (see issue #11675).
1528 # The repetition below strengthens the test by increasing the chances
1529 # of previously allocated non-zero memory being used for the new array
1530 # on the 2nd and 3rd loops.
1531 for _ in range(3):
1532 arr = self.Array('i', size)
1533 self.assertEqual(len(arr), size)
1534 self.assertEqual(list(arr), [0] * size)
1535 arr[:] = range(10)
1536 self.assertEqual(list(arr), list(range(10)))
1537 del arr
1538
1539 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001540 def test_rawarray(self):
1541 self.test_array(raw=True)
1542
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001543 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001544 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545 arr1 = self.Array('i', list(range(10)))
1546 lock1 = arr1.get_lock()
1547 obj1 = arr1.get_obj()
1548
1549 arr2 = self.Array('i', list(range(10)), lock=None)
1550 lock2 = arr2.get_lock()
1551 obj2 = arr2.get_obj()
1552
1553 lock = self.Lock()
1554 arr3 = self.Array('i', list(range(10)), lock=lock)
1555 lock3 = arr3.get_lock()
1556 obj3 = arr3.get_obj()
1557 self.assertEqual(lock, lock3)
1558
Jesse Nollerb0516a62009-01-18 03:11:38 +00001559 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001560 self.assertFalse(hasattr(arr4, 'get_lock'))
1561 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001562 self.assertRaises(AttributeError,
1563 self.Array, 'i', range(10), lock='notalock')
1564
1565 arr5 = self.RawArray('i', range(10))
1566 self.assertFalse(hasattr(arr5, 'get_lock'))
1567 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001568
1569#
1570#
1571#
1572
1573class _TestContainers(BaseTestCase):
1574
1575 ALLOWED_TYPES = ('manager',)
1576
1577 def test_list(self):
1578 a = self.list(list(range(10)))
1579 self.assertEqual(a[:], list(range(10)))
1580
1581 b = self.list()
1582 self.assertEqual(b[:], [])
1583
1584 b.extend(list(range(5)))
1585 self.assertEqual(b[:], list(range(5)))
1586
1587 self.assertEqual(b[2], 2)
1588 self.assertEqual(b[2:10], [2,3,4])
1589
1590 b *= 2
1591 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1592
1593 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1594
1595 self.assertEqual(a[:], list(range(10)))
1596
1597 d = [a, b]
1598 e = self.list(d)
1599 self.assertEqual(
1600 e[:],
1601 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1602 )
1603
1604 f = self.list([a])
1605 a.append('hello')
1606 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1607
1608 def test_dict(self):
1609 d = self.dict()
1610 indices = list(range(65, 70))
1611 for i in indices:
1612 d[i] = chr(i)
1613 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1614 self.assertEqual(sorted(d.keys()), indices)
1615 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1616 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1617
1618 def test_namespace(self):
1619 n = self.Namespace()
1620 n.name = 'Bob'
1621 n.job = 'Builder'
1622 n._hidden = 'hidden'
1623 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1624 del n.job
1625 self.assertEqual(str(n), "Namespace(name='Bob')")
1626 self.assertTrue(hasattr(n, 'name'))
1627 self.assertTrue(not hasattr(n, 'job'))
1628
1629#
1630#
1631#
1632
1633def sqr(x, wait=0.0):
1634 time.sleep(wait)
1635 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001636
Antoine Pitroude911b22011-12-21 11:03:24 +01001637def mul(x, y):
1638 return x*y
1639
Benjamin Petersone711caf2008-06-11 16:44:04 +00001640class _TestPool(BaseTestCase):
1641
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001642 @classmethod
1643 def setUpClass(cls):
1644 super().setUpClass()
1645 cls.pool = cls.Pool(4)
1646
1647 @classmethod
1648 def tearDownClass(cls):
1649 cls.pool.terminate()
1650 cls.pool.join()
1651 cls.pool = None
1652 super().tearDownClass()
1653
Benjamin Petersone711caf2008-06-11 16:44:04 +00001654 def test_apply(self):
1655 papply = self.pool.apply
1656 self.assertEqual(papply(sqr, (5,)), sqr(5))
1657 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1658
1659 def test_map(self):
1660 pmap = self.pool.map
1661 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1662 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1663 list(map(sqr, list(range(100)))))
1664
Antoine Pitroude911b22011-12-21 11:03:24 +01001665 def test_starmap(self):
1666 psmap = self.pool.starmap
1667 tuples = list(zip(range(10), range(9,-1, -1)))
1668 self.assertEqual(psmap(mul, tuples),
1669 list(itertools.starmap(mul, tuples)))
1670 tuples = list(zip(range(100), range(99,-1, -1)))
1671 self.assertEqual(psmap(mul, tuples, chunksize=20),
1672 list(itertools.starmap(mul, tuples)))
1673
1674 def test_starmap_async(self):
1675 tuples = list(zip(range(100), range(99,-1, -1)))
1676 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1677 list(itertools.starmap(mul, tuples)))
1678
Hynek Schlawack254af262012-10-27 12:53:02 +02001679 def test_map_async(self):
1680 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1681 list(map(sqr, list(range(10)))))
1682
1683 def test_map_async_callbacks(self):
1684 call_args = self.manager.list() if self.TYPE == 'manager' else []
1685 self.pool.map_async(int, ['1'],
1686 callback=call_args.append,
1687 error_callback=call_args.append).wait()
1688 self.assertEqual(1, len(call_args))
1689 self.assertEqual([1], call_args[0])
1690 self.pool.map_async(int, ['a'],
1691 callback=call_args.append,
1692 error_callback=call_args.append).wait()
1693 self.assertEqual(2, len(call_args))
1694 self.assertIsInstance(call_args[1], ValueError)
1695
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001696 def test_map_unplicklable(self):
1697 # Issue #19425 -- failure to pickle should not cause a hang
1698 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001699 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001700 class A(object):
1701 def __reduce__(self):
1702 raise RuntimeError('cannot pickle')
1703 with self.assertRaises(RuntimeError):
1704 self.pool.map(sqr, [A()]*10)
1705
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001706 def test_map_chunksize(self):
1707 try:
1708 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1709 except multiprocessing.TimeoutError:
1710 self.fail("pool.map_async with chunksize stalled on null list")
1711
Benjamin Petersone711caf2008-06-11 16:44:04 +00001712 def test_async(self):
1713 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1714 get = TimingWrapper(res.get)
1715 self.assertEqual(get(), 49)
1716 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1717
1718 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00001719 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001720 get = TimingWrapper(res.get)
1721 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1722 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1723
1724 def test_imap(self):
1725 it = self.pool.imap(sqr, list(range(10)))
1726 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1727
1728 it = self.pool.imap(sqr, list(range(10)))
1729 for i in range(10):
1730 self.assertEqual(next(it), i*i)
1731 self.assertRaises(StopIteration, it.__next__)
1732
1733 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1734 for i in range(1000):
1735 self.assertEqual(next(it), i*i)
1736 self.assertRaises(StopIteration, it.__next__)
1737
1738 def test_imap_unordered(self):
1739 it = self.pool.imap_unordered(sqr, list(range(1000)))
1740 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1741
1742 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1743 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1744
1745 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001746 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1747 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1748
Benjamin Petersone711caf2008-06-11 16:44:04 +00001749 p = multiprocessing.Pool(3)
1750 self.assertEqual(3, len(p._pool))
1751 p.close()
1752 p.join()
1753
1754 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001755 result = self.pool.map_async(
1756 time.sleep, [0.1 for i in range(10000)], chunksize=1
1757 )
1758 self.pool.terminate()
1759 join = TimingWrapper(self.pool.join)
1760 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001761 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001762
Richard Oudkerke41682b2012-06-06 19:04:57 +01001763 def test_empty_iterable(self):
1764 # See Issue 12157
1765 p = self.Pool(1)
1766
1767 self.assertEqual(p.map(sqr, []), [])
1768 self.assertEqual(list(p.imap(sqr, [])), [])
1769 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1770 self.assertEqual(p.map_async(sqr, []).get(), [])
1771
1772 p.close()
1773 p.join()
1774
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001775 def test_context(self):
1776 if self.TYPE == 'processes':
1777 L = list(range(10))
1778 expected = [sqr(i) for i in L]
1779 with multiprocessing.Pool(2) as p:
1780 r = p.map_async(sqr, L)
1781 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001782 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001783
Richard Oudkerk85757832013-05-06 11:38:25 +01001784 @classmethod
1785 def _test_traceback(cls):
1786 raise RuntimeError(123) # some comment
1787
1788 def test_traceback(self):
1789 # We want ensure that the traceback from the child process is
1790 # contained in the traceback raised in the main process.
1791 if self.TYPE == 'processes':
1792 with self.Pool(1) as p:
1793 try:
1794 p.apply(self._test_traceback)
1795 except Exception as e:
1796 exc = e
1797 else:
1798 raise AssertionError('expected RuntimeError')
1799 self.assertIs(type(exc), RuntimeError)
1800 self.assertEqual(exc.args, (123,))
1801 cause = exc.__cause__
1802 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
1803 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
1804
1805 with test.support.captured_stderr() as f1:
1806 try:
1807 raise exc
1808 except RuntimeError:
1809 sys.excepthook(*sys.exc_info())
1810 self.assertIn('raise RuntimeError(123) # some comment',
1811 f1.getvalue())
1812
Richard Oudkerk80a5be12014-03-23 12:30:54 +00001813 @classmethod
1814 def _test_wrapped_exception(cls):
1815 raise RuntimeError('foo')
1816
1817 def test_wrapped_exception(self):
1818 # Issue #20980: Should not wrap exception when using thread pool
1819 with self.Pool(1) as p:
1820 with self.assertRaises(RuntimeError):
1821 p.apply(self._test_wrapped_exception)
1822
1823
Ask Solem2afcbf22010-11-09 20:55:52 +00001824def raising():
1825 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001826
Ask Solem2afcbf22010-11-09 20:55:52 +00001827def unpickleable_result():
1828 return lambda: 42
1829
1830class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001831 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001832
1833 def test_async_error_callback(self):
1834 p = multiprocessing.Pool(2)
1835
1836 scratchpad = [None]
1837 def errback(exc):
1838 scratchpad[0] = exc
1839
1840 res = p.apply_async(raising, error_callback=errback)
1841 self.assertRaises(KeyError, res.get)
1842 self.assertTrue(scratchpad[0])
1843 self.assertIsInstance(scratchpad[0], KeyError)
1844
1845 p.close()
1846 p.join()
1847
1848 def test_unpickleable_result(self):
1849 from multiprocessing.pool import MaybeEncodingError
1850 p = multiprocessing.Pool(2)
1851
1852 # Make sure we don't lose pool processes because of encoding errors.
1853 for iteration in range(20):
1854
1855 scratchpad = [None]
1856 def errback(exc):
1857 scratchpad[0] = exc
1858
1859 res = p.apply_async(unpickleable_result, error_callback=errback)
1860 self.assertRaises(MaybeEncodingError, res.get)
1861 wrapped = scratchpad[0]
1862 self.assertTrue(wrapped)
1863 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1864 self.assertIsNotNone(wrapped.exc)
1865 self.assertIsNotNone(wrapped.value)
1866
1867 p.close()
1868 p.join()
1869
1870class _TestPoolWorkerLifetime(BaseTestCase):
1871 ALLOWED_TYPES = ('processes', )
1872
Jesse Noller1f0b6582010-01-27 03:36:01 +00001873 def test_pool_worker_lifetime(self):
1874 p = multiprocessing.Pool(3, maxtasksperchild=10)
1875 self.assertEqual(3, len(p._pool))
1876 origworkerpids = [w.pid for w in p._pool]
1877 # Run many tasks so each worker gets replaced (hopefully)
1878 results = []
1879 for i in range(100):
1880 results.append(p.apply_async(sqr, (i, )))
1881 # Fetch the results and verify we got the right answers,
1882 # also ensuring all the tasks have completed.
1883 for (j, res) in enumerate(results):
1884 self.assertEqual(res.get(), sqr(j))
1885 # Refill the pool
1886 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001887 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001888 # (countdown * DELTA = 5 seconds max startup process time)
1889 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001890 while countdown and not all(w.is_alive() for w in p._pool):
1891 countdown -= 1
1892 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001893 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001894 # All pids should be assigned. See issue #7805.
1895 self.assertNotIn(None, origworkerpids)
1896 self.assertNotIn(None, finalworkerpids)
1897 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001898 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1899 p.close()
1900 p.join()
1901
Charles-François Natalif8859e12011-10-24 18:45:29 +02001902 def test_pool_worker_lifetime_early_close(self):
1903 # Issue #10332: closing a pool whose workers have limited lifetimes
1904 # before all the tasks completed would make join() hang.
1905 p = multiprocessing.Pool(3, maxtasksperchild=1)
1906 results = []
1907 for i in range(6):
1908 results.append(p.apply_async(sqr, (i, 0.3)))
1909 p.close()
1910 p.join()
1911 # check the results
1912 for (j, res) in enumerate(results):
1913 self.assertEqual(res.get(), sqr(j))
1914
Benjamin Petersone711caf2008-06-11 16:44:04 +00001915#
1916# Test of creating a customized manager class
1917#
1918
1919from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1920
1921class FooBar(object):
1922 def f(self):
1923 return 'f()'
1924 def g(self):
1925 raise ValueError
1926 def _h(self):
1927 return '_h()'
1928
1929def baz():
1930 for i in range(10):
1931 yield i*i
1932
1933class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001934 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001935 def __iter__(self):
1936 return self
1937 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001938 return self._callmethod('__next__')
1939
1940class MyManager(BaseManager):
1941 pass
1942
1943MyManager.register('Foo', callable=FooBar)
1944MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1945MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1946
1947
1948class _TestMyManager(BaseTestCase):
1949
1950 ALLOWED_TYPES = ('manager',)
1951
1952 def test_mymanager(self):
1953 manager = MyManager()
1954 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001955 self.common(manager)
1956 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001957
Richard Oudkerkac385712012-06-18 21:29:30 +01001958 # If the manager process exited cleanly then the exitcode
1959 # will be zero. Otherwise (after a short timeout)
1960 # terminate() is used, resulting in an exitcode of -SIGTERM.
1961 self.assertEqual(manager._process.exitcode, 0)
1962
1963 def test_mymanager_context(self):
1964 with MyManager() as manager:
1965 self.common(manager)
1966 self.assertEqual(manager._process.exitcode, 0)
1967
1968 def test_mymanager_context_prestarted(self):
1969 manager = MyManager()
1970 manager.start()
1971 with manager:
1972 self.common(manager)
1973 self.assertEqual(manager._process.exitcode, 0)
1974
1975 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001976 foo = manager.Foo()
1977 bar = manager.Bar()
1978 baz = manager.baz()
1979
1980 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1981 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1982
1983 self.assertEqual(foo_methods, ['f', 'g'])
1984 self.assertEqual(bar_methods, ['f', '_h'])
1985
1986 self.assertEqual(foo.f(), 'f()')
1987 self.assertRaises(ValueError, foo.g)
1988 self.assertEqual(foo._callmethod('f'), 'f()')
1989 self.assertRaises(RemoteError, foo._callmethod, '_h')
1990
1991 self.assertEqual(bar.f(), 'f()')
1992 self.assertEqual(bar._h(), '_h()')
1993 self.assertEqual(bar._callmethod('f'), 'f()')
1994 self.assertEqual(bar._callmethod('_h'), '_h()')
1995
1996 self.assertEqual(list(baz), [i*i for i in range(10)])
1997
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001998
Benjamin Petersone711caf2008-06-11 16:44:04 +00001999#
2000# Test of connecting to a remote server and using xmlrpclib for serialization
2001#
2002
2003_queue = pyqueue.Queue()
2004def get_queue():
2005 return _queue
2006
2007class QueueManager(BaseManager):
2008 '''manager class used by server process'''
2009QueueManager.register('get_queue', callable=get_queue)
2010
2011class QueueManager2(BaseManager):
2012 '''manager class which specifies the same interface as QueueManager'''
2013QueueManager2.register('get_queue')
2014
2015
2016SERIALIZER = 'xmlrpclib'
2017
2018class _TestRemoteManager(BaseTestCase):
2019
2020 ALLOWED_TYPES = ('manager',)
2021
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002022 @classmethod
2023 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002024 manager = QueueManager2(
2025 address=address, authkey=authkey, serializer=SERIALIZER
2026 )
2027 manager.connect()
2028 queue = manager.get_queue()
2029 queue.put(('hello world', None, True, 2.25))
2030
2031 def test_remote(self):
2032 authkey = os.urandom(32)
2033
2034 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002035 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00002036 )
2037 manager.start()
2038
2039 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002040 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002041 p.start()
2042
2043 manager2 = QueueManager2(
2044 address=manager.address, authkey=authkey, serializer=SERIALIZER
2045 )
2046 manager2.connect()
2047 queue = manager2.get_queue()
2048
2049 # Note that xmlrpclib will deserialize object as a list not a tuple
2050 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
2051
2052 # Because we are using xmlrpclib for serialization instead of
2053 # pickle this will cause a serialization error.
2054 self.assertRaises(Exception, queue.put, time.sleep)
2055
2056 # Make queue finalizer run before the server is stopped
2057 del queue
2058 manager.shutdown()
2059
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002060class _TestManagerRestart(BaseTestCase):
2061
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002062 @classmethod
2063 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002064 manager = QueueManager(
2065 address=address, authkey=authkey, serializer=SERIALIZER)
2066 manager.connect()
2067 queue = manager.get_queue()
2068 queue.put('hello world')
2069
2070 def test_rapid_restart(self):
2071 authkey = os.urandom(32)
2072 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002073 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002074 srvr = manager.get_server()
2075 addr = srvr.address
2076 # Close the connection.Listener socket which gets opened as a part
2077 # of manager.get_server(). It's not needed for the test.
2078 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002079 manager.start()
2080
2081 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002082 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002083 p.start()
2084 queue = manager.get_queue()
2085 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002086 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002087 manager.shutdown()
2088 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002089 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002090 try:
2091 manager.start()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002092 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002093 if e.errno != errno.EADDRINUSE:
2094 raise
2095 # Retry after some time, in case the old socket was lingering
2096 # (sporadic failure on buildbots)
2097 time.sleep(1.0)
2098 manager = QueueManager(
2099 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002100 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002101
Benjamin Petersone711caf2008-06-11 16:44:04 +00002102#
2103#
2104#
2105
2106SENTINEL = latin('')
2107
2108class _TestConnection(BaseTestCase):
2109
2110 ALLOWED_TYPES = ('processes', 'threads')
2111
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002112 @classmethod
2113 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002114 for msg in iter(conn.recv_bytes, SENTINEL):
2115 conn.send_bytes(msg)
2116 conn.close()
2117
2118 def test_connection(self):
2119 conn, child_conn = self.Pipe()
2120
2121 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002122 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002123 p.start()
2124
2125 seq = [1, 2.25, None]
2126 msg = latin('hello world')
2127 longmsg = msg * 10
2128 arr = array.array('i', list(range(4)))
2129
2130 if self.TYPE == 'processes':
2131 self.assertEqual(type(conn.fileno()), int)
2132
2133 self.assertEqual(conn.send(seq), None)
2134 self.assertEqual(conn.recv(), seq)
2135
2136 self.assertEqual(conn.send_bytes(msg), None)
2137 self.assertEqual(conn.recv_bytes(), msg)
2138
2139 if self.TYPE == 'processes':
2140 buffer = array.array('i', [0]*10)
2141 expected = list(arr) + [0] * (10 - len(arr))
2142 self.assertEqual(conn.send_bytes(arr), None)
2143 self.assertEqual(conn.recv_bytes_into(buffer),
2144 len(arr) * buffer.itemsize)
2145 self.assertEqual(list(buffer), expected)
2146
2147 buffer = array.array('i', [0]*10)
2148 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2149 self.assertEqual(conn.send_bytes(arr), None)
2150 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2151 len(arr) * buffer.itemsize)
2152 self.assertEqual(list(buffer), expected)
2153
2154 buffer = bytearray(latin(' ' * 40))
2155 self.assertEqual(conn.send_bytes(longmsg), None)
2156 try:
2157 res = conn.recv_bytes_into(buffer)
2158 except multiprocessing.BufferTooShort as e:
2159 self.assertEqual(e.args, (longmsg,))
2160 else:
2161 self.fail('expected BufferTooShort, got %s' % res)
2162
2163 poll = TimingWrapper(conn.poll)
2164
2165 self.assertEqual(poll(), False)
2166 self.assertTimingAlmostEqual(poll.elapsed, 0)
2167
Richard Oudkerk59d54042012-05-10 16:11:12 +01002168 self.assertEqual(poll(-1), False)
2169 self.assertTimingAlmostEqual(poll.elapsed, 0)
2170
Benjamin Petersone711caf2008-06-11 16:44:04 +00002171 self.assertEqual(poll(TIMEOUT1), False)
2172 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2173
2174 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002175 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002176
2177 self.assertEqual(poll(TIMEOUT1), True)
2178 self.assertTimingAlmostEqual(poll.elapsed, 0)
2179
2180 self.assertEqual(conn.recv(), None)
2181
2182 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2183 conn.send_bytes(really_big_msg)
2184 self.assertEqual(conn.recv_bytes(), really_big_msg)
2185
2186 conn.send_bytes(SENTINEL) # tell child to quit
2187 child_conn.close()
2188
2189 if self.TYPE == 'processes':
2190 self.assertEqual(conn.readable, True)
2191 self.assertEqual(conn.writable, True)
2192 self.assertRaises(EOFError, conn.recv)
2193 self.assertRaises(EOFError, conn.recv_bytes)
2194
2195 p.join()
2196
2197 def test_duplex_false(self):
2198 reader, writer = self.Pipe(duplex=False)
2199 self.assertEqual(writer.send(1), None)
2200 self.assertEqual(reader.recv(), 1)
2201 if self.TYPE == 'processes':
2202 self.assertEqual(reader.readable, True)
2203 self.assertEqual(reader.writable, False)
2204 self.assertEqual(writer.readable, False)
2205 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002206 self.assertRaises(OSError, reader.send, 2)
2207 self.assertRaises(OSError, writer.recv)
2208 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002209
2210 def test_spawn_close(self):
2211 # We test that a pipe connection can be closed by parent
2212 # process immediately after child is spawned. On Windows this
2213 # would have sometimes failed on old versions because
2214 # child_conn would be closed before the child got a chance to
2215 # duplicate it.
2216 conn, child_conn = self.Pipe()
2217
2218 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002219 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002220 p.start()
2221 child_conn.close() # this might complete before child initializes
2222
2223 msg = latin('hello')
2224 conn.send_bytes(msg)
2225 self.assertEqual(conn.recv_bytes(), msg)
2226
2227 conn.send_bytes(SENTINEL)
2228 conn.close()
2229 p.join()
2230
2231 def test_sendbytes(self):
2232 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002233 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002234
2235 msg = latin('abcdefghijklmnopqrstuvwxyz')
2236 a, b = self.Pipe()
2237
2238 a.send_bytes(msg)
2239 self.assertEqual(b.recv_bytes(), msg)
2240
2241 a.send_bytes(msg, 5)
2242 self.assertEqual(b.recv_bytes(), msg[5:])
2243
2244 a.send_bytes(msg, 7, 8)
2245 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2246
2247 a.send_bytes(msg, 26)
2248 self.assertEqual(b.recv_bytes(), latin(''))
2249
2250 a.send_bytes(msg, 26, 0)
2251 self.assertEqual(b.recv_bytes(), latin(''))
2252
2253 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2254
2255 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2256
2257 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2258
2259 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2260
2261 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2262
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002263 @classmethod
2264 def _is_fd_assigned(cls, fd):
2265 try:
2266 os.fstat(fd)
2267 except OSError as e:
2268 if e.errno == errno.EBADF:
2269 return False
2270 raise
2271 else:
2272 return True
2273
2274 @classmethod
2275 def _writefd(cls, conn, data, create_dummy_fds=False):
2276 if create_dummy_fds:
2277 for i in range(0, 256):
2278 if not cls._is_fd_assigned(i):
2279 os.dup2(conn.fileno(), i)
2280 fd = reduction.recv_handle(conn)
2281 if msvcrt:
2282 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2283 os.write(fd, data)
2284 os.close(fd)
2285
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002286 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002287 def test_fd_transfer(self):
2288 if self.TYPE != 'processes':
2289 self.skipTest("only makes sense with processes")
2290 conn, child_conn = self.Pipe(duplex=True)
2291
2292 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002293 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002294 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002295 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002296 with open(test.support.TESTFN, "wb") as f:
2297 fd = f.fileno()
2298 if msvcrt:
2299 fd = msvcrt.get_osfhandle(fd)
2300 reduction.send_handle(conn, fd, p.pid)
2301 p.join()
2302 with open(test.support.TESTFN, "rb") as f:
2303 self.assertEqual(f.read(), b"foo")
2304
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002305 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002306 @unittest.skipIf(sys.platform == "win32",
2307 "test semantics don't make sense on Windows")
2308 @unittest.skipIf(MAXFD <= 256,
2309 "largest assignable fd number is too small")
2310 @unittest.skipUnless(hasattr(os, "dup2"),
2311 "test needs os.dup2()")
2312 def test_large_fd_transfer(self):
2313 # With fd > 256 (issue #11657)
2314 if self.TYPE != 'processes':
2315 self.skipTest("only makes sense with processes")
2316 conn, child_conn = self.Pipe(duplex=True)
2317
2318 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002319 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002320 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002321 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002322 with open(test.support.TESTFN, "wb") as f:
2323 fd = f.fileno()
2324 for newfd in range(256, MAXFD):
2325 if not self._is_fd_assigned(newfd):
2326 break
2327 else:
2328 self.fail("could not find an unassigned large file descriptor")
2329 os.dup2(fd, newfd)
2330 try:
2331 reduction.send_handle(conn, newfd, p.pid)
2332 finally:
2333 os.close(newfd)
2334 p.join()
2335 with open(test.support.TESTFN, "rb") as f:
2336 self.assertEqual(f.read(), b"bar")
2337
Jesus Cea4507e642011-09-21 03:53:25 +02002338 @classmethod
2339 def _send_data_without_fd(self, conn):
2340 os.write(conn.fileno(), b"\0")
2341
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002342 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002343 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2344 def test_missing_fd_transfer(self):
2345 # Check that exception is raised when received data is not
2346 # accompanied by a file descriptor in ancillary data.
2347 if self.TYPE != 'processes':
2348 self.skipTest("only makes sense with processes")
2349 conn, child_conn = self.Pipe(duplex=True)
2350
2351 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2352 p.daemon = True
2353 p.start()
2354 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2355 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002356
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002357 def test_context(self):
2358 a, b = self.Pipe()
2359
2360 with a, b:
2361 a.send(1729)
2362 self.assertEqual(b.recv(), 1729)
2363 if self.TYPE == 'processes':
2364 self.assertFalse(a.closed)
2365 self.assertFalse(b.closed)
2366
2367 if self.TYPE == 'processes':
2368 self.assertTrue(a.closed)
2369 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002370 self.assertRaises(OSError, a.recv)
2371 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002372
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002373class _TestListener(BaseTestCase):
2374
Richard Oudkerk91257752012-06-15 21:53:34 +01002375 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002376
2377 def test_multiple_bind(self):
2378 for family in self.connection.families:
2379 l = self.connection.Listener(family=family)
2380 self.addCleanup(l.close)
2381 self.assertRaises(OSError, self.connection.Listener,
2382 l.address, family)
2383
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002384 def test_context(self):
2385 with self.connection.Listener() as l:
2386 with self.connection.Client(l.address) as c:
2387 with l.accept() as d:
2388 c.send(1729)
2389 self.assertEqual(d.recv(), 1729)
2390
2391 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002392 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002393
Benjamin Petersone711caf2008-06-11 16:44:04 +00002394class _TestListenerClient(BaseTestCase):
2395
2396 ALLOWED_TYPES = ('processes', 'threads')
2397
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002398 @classmethod
2399 def _test(cls, address):
2400 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002401 conn.send('hello')
2402 conn.close()
2403
2404 def test_listener_client(self):
2405 for family in self.connection.families:
2406 l = self.connection.Listener(family=family)
2407 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002408 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002409 p.start()
2410 conn = l.accept()
2411 self.assertEqual(conn.recv(), 'hello')
2412 p.join()
2413 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002414
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002415 def test_issue14725(self):
2416 l = self.connection.Listener()
2417 p = self.Process(target=self._test, args=(l.address,))
2418 p.daemon = True
2419 p.start()
2420 time.sleep(1)
2421 # On Windows the client process should by now have connected,
2422 # written data and closed the pipe handle by now. This causes
2423 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2424 # 14725.
2425 conn = l.accept()
2426 self.assertEqual(conn.recv(), 'hello')
2427 conn.close()
2428 p.join()
2429 l.close()
2430
Richard Oudkerked9e06c2013-01-13 22:46:48 +00002431 def test_issue16955(self):
2432 for fam in self.connection.families:
2433 l = self.connection.Listener(family=fam)
2434 c = self.connection.Client(l.address)
2435 a = l.accept()
2436 a.send_bytes(b"hello")
2437 self.assertTrue(c.poll(1))
2438 a.close()
2439 c.close()
2440 l.close()
2441
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002442class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002443
2444 ALLOWED_TYPES = ('processes', 'threads')
2445
2446 def test_empty_string(self):
2447 a, b = self.Pipe()
2448 self.assertEqual(a.poll(), False)
2449 b.send_bytes(b'')
2450 self.assertEqual(a.poll(), True)
2451 self.assertEqual(a.poll(), True)
2452
2453 @classmethod
2454 def _child_strings(cls, conn, strings):
2455 for s in strings:
2456 time.sleep(0.1)
2457 conn.send_bytes(s)
2458 conn.close()
2459
2460 def test_strings(self):
2461 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2462 a, b = self.Pipe()
2463 p = self.Process(target=self._child_strings, args=(b, strings))
2464 p.start()
2465
2466 for s in strings:
2467 for i in range(200):
2468 if a.poll(0.01):
2469 break
2470 x = a.recv_bytes()
2471 self.assertEqual(s, x)
2472
2473 p.join()
2474
2475 @classmethod
2476 def _child_boundaries(cls, r):
2477 # Polling may "pull" a message in to the child process, but we
2478 # don't want it to pull only part of a message, as that would
2479 # corrupt the pipe for any other processes which might later
2480 # read from it.
2481 r.poll(5)
2482
2483 def test_boundaries(self):
2484 r, w = self.Pipe(False)
2485 p = self.Process(target=self._child_boundaries, args=(r,))
2486 p.start()
2487 time.sleep(2)
2488 L = [b"first", b"second"]
2489 for obj in L:
2490 w.send_bytes(obj)
2491 w.close()
2492 p.join()
2493 self.assertIn(r.recv_bytes(), L)
2494
2495 @classmethod
2496 def _child_dont_merge(cls, b):
2497 b.send_bytes(b'a')
2498 b.send_bytes(b'b')
2499 b.send_bytes(b'cd')
2500
2501 def test_dont_merge(self):
2502 a, b = self.Pipe()
2503 self.assertEqual(a.poll(0.0), False)
2504 self.assertEqual(a.poll(0.1), False)
2505
2506 p = self.Process(target=self._child_dont_merge, args=(b,))
2507 p.start()
2508
2509 self.assertEqual(a.recv_bytes(), b'a')
2510 self.assertEqual(a.poll(1.0), True)
2511 self.assertEqual(a.poll(1.0), True)
2512 self.assertEqual(a.recv_bytes(), b'b')
2513 self.assertEqual(a.poll(1.0), True)
2514 self.assertEqual(a.poll(1.0), True)
2515 self.assertEqual(a.poll(0.0), True)
2516 self.assertEqual(a.recv_bytes(), b'cd')
2517
2518 p.join()
2519
Benjamin Petersone711caf2008-06-11 16:44:04 +00002520#
2521# Test of sending connection and socket objects between processes
2522#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002523
2524@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002525class _TestPicklingConnections(BaseTestCase):
2526
2527 ALLOWED_TYPES = ('processes',)
2528
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002529 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002530 def tearDownClass(cls):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002531 from multiprocessing import resource_sharer
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002532 resource_sharer.stop(timeout=5)
2533
2534 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002535 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002536 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002537 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002538 conn.send(l.address)
2539 new_conn = l.accept()
2540 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002541 new_conn.close()
2542 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002543
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002544 l = socket.socket()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002545 l.bind((test.support.HOST, 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01002546 l.listen()
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002547 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002548 new_conn, addr = l.accept()
2549 conn.send(new_conn)
2550 new_conn.close()
2551 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002552
2553 conn.recv()
2554
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002555 @classmethod
2556 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002557 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002558 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002559 client.send(msg.upper())
2560 client.close()
2561
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002562 address, msg = conn.recv()
2563 client = socket.socket()
2564 client.connect(address)
2565 client.sendall(msg.upper())
2566 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002567
2568 conn.close()
2569
2570 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002571 families = self.connection.families
2572
2573 lconn, lconn0 = self.Pipe()
2574 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002575 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002576 lp.start()
2577 lconn0.close()
2578
2579 rconn, rconn0 = self.Pipe()
2580 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002581 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002582 rp.start()
2583 rconn0.close()
2584
2585 for fam in families:
2586 msg = ('This connection uses family %s' % fam).encode('ascii')
2587 address = lconn.recv()
2588 rconn.send((address, msg))
2589 new_conn = lconn.recv()
2590 self.assertEqual(new_conn.recv(), msg.upper())
2591
2592 rconn.send(None)
2593
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002594 msg = latin('This connection uses a normal socket')
2595 address = lconn.recv()
2596 rconn.send((address, msg))
2597 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002598 buf = []
2599 while True:
2600 s = new_conn.recv(100)
2601 if not s:
2602 break
2603 buf.append(s)
2604 buf = b''.join(buf)
2605 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002606 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002607
2608 lconn.send(None)
2609
2610 rconn.close()
2611 lconn.close()
2612
2613 lp.join()
2614 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002615
2616 @classmethod
2617 def child_access(cls, conn):
2618 w = conn.recv()
2619 w.send('all is well')
2620 w.close()
2621
2622 r = conn.recv()
2623 msg = r.recv()
2624 conn.send(msg*2)
2625
2626 conn.close()
2627
2628 def test_access(self):
2629 # On Windows, if we do not specify a destination pid when
2630 # using DupHandle then we need to be careful to use the
2631 # correct access flags for DuplicateHandle(), or else
2632 # DupHandle.detach() will raise PermissionError. For example,
2633 # for a read only pipe handle we should use
2634 # access=FILE_GENERIC_READ. (Unfortunately
2635 # DUPLICATE_SAME_ACCESS does not work.)
2636 conn, child_conn = self.Pipe()
2637 p = self.Process(target=self.child_access, args=(child_conn,))
2638 p.daemon = True
2639 p.start()
2640 child_conn.close()
2641
2642 r, w = self.Pipe(duplex=False)
2643 conn.send(w)
2644 w.close()
2645 self.assertEqual(r.recv(), 'all is well')
2646 r.close()
2647
2648 r, w = self.Pipe(duplex=False)
2649 conn.send(r)
2650 r.close()
2651 w.send('foobar')
2652 w.close()
2653 self.assertEqual(conn.recv(), 'foobar'*2)
2654
Benjamin Petersone711caf2008-06-11 16:44:04 +00002655#
2656#
2657#
2658
2659class _TestHeap(BaseTestCase):
2660
2661 ALLOWED_TYPES = ('processes',)
2662
2663 def test_heap(self):
2664 iterations = 5000
2665 maxblocks = 50
2666 blocks = []
2667
2668 # create and destroy lots of blocks of different sizes
2669 for i in range(iterations):
2670 size = int(random.lognormvariate(0, 1) * 1000)
2671 b = multiprocessing.heap.BufferWrapper(size)
2672 blocks.append(b)
2673 if len(blocks) > maxblocks:
2674 i = random.randrange(maxblocks)
2675 del blocks[i]
2676
2677 # get the heap object
2678 heap = multiprocessing.heap.BufferWrapper._heap
2679
2680 # verify the state of the heap
2681 all = []
2682 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002683 heap._lock.acquire()
2684 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002685 for L in list(heap._len_to_seq.values()):
2686 for arena, start, stop in L:
2687 all.append((heap._arenas.index(arena), start, stop,
2688 stop-start, 'free'))
2689 for arena, start, stop in heap._allocated_blocks:
2690 all.append((heap._arenas.index(arena), start, stop,
2691 stop-start, 'occupied'))
2692 occupied += (stop-start)
2693
2694 all.sort()
2695
2696 for i in range(len(all)-1):
2697 (arena, start, stop) = all[i][:3]
2698 (narena, nstart, nstop) = all[i+1][:3]
2699 self.assertTrue((arena != narena and nstart == 0) or
2700 (stop == nstart))
2701
Charles-François Natali778db492011-07-02 14:35:49 +02002702 def test_free_from_gc(self):
2703 # Check that freeing of blocks by the garbage collector doesn't deadlock
2704 # (issue #12352).
2705 # Make sure the GC is enabled, and set lower collection thresholds to
2706 # make collections more frequent (and increase the probability of
2707 # deadlock).
2708 if not gc.isenabled():
2709 gc.enable()
2710 self.addCleanup(gc.disable)
2711 thresholds = gc.get_threshold()
2712 self.addCleanup(gc.set_threshold, *thresholds)
2713 gc.set_threshold(10)
2714
2715 # perform numerous block allocations, with cyclic references to make
2716 # sure objects are collected asynchronously by the gc
2717 for i in range(5000):
2718 a = multiprocessing.heap.BufferWrapper(1)
2719 b = multiprocessing.heap.BufferWrapper(1)
2720 # circular references
2721 a.buddy = b
2722 b.buddy = a
2723
Benjamin Petersone711caf2008-06-11 16:44:04 +00002724#
2725#
2726#
2727
Benjamin Petersone711caf2008-06-11 16:44:04 +00002728class _Foo(Structure):
2729 _fields_ = [
2730 ('x', c_int),
2731 ('y', c_double)
2732 ]
2733
2734class _TestSharedCTypes(BaseTestCase):
2735
2736 ALLOWED_TYPES = ('processes',)
2737
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002738 def setUp(self):
2739 if not HAS_SHAREDCTYPES:
2740 self.skipTest("requires multiprocessing.sharedctypes")
2741
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002742 @classmethod
2743 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002744 x.value *= 2
2745 y.value *= 2
2746 foo.x *= 2
2747 foo.y *= 2
2748 string.value *= 2
2749 for i in range(len(arr)):
2750 arr[i] *= 2
2751
2752 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002753 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002754 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002755 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002756 arr = self.Array('d', list(range(10)), lock=lock)
2757 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002758 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002759
2760 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002761 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002762 p.start()
2763 p.join()
2764
2765 self.assertEqual(x.value, 14)
2766 self.assertAlmostEqual(y.value, 2.0/3.0)
2767 self.assertEqual(foo.x, 6)
2768 self.assertAlmostEqual(foo.y, 4.0)
2769 for i in range(10):
2770 self.assertAlmostEqual(arr[i], i*2)
2771 self.assertEqual(string.value, latin('hellohello'))
2772
2773 def test_synchronize(self):
2774 self.test_sharedctypes(lock=True)
2775
2776 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002777 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002778 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002779 foo.x = 0
2780 foo.y = 0
2781 self.assertEqual(bar.x, 2)
2782 self.assertAlmostEqual(bar.y, 5.0)
2783
2784#
2785#
2786#
2787
2788class _TestFinalize(BaseTestCase):
2789
2790 ALLOWED_TYPES = ('processes',)
2791
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002792 @classmethod
2793 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002794 class Foo(object):
2795 pass
2796
2797 a = Foo()
2798 util.Finalize(a, conn.send, args=('a',))
2799 del a # triggers callback for a
2800
2801 b = Foo()
2802 close_b = util.Finalize(b, conn.send, args=('b',))
2803 close_b() # triggers callback for b
2804 close_b() # does nothing because callback has already been called
2805 del b # does nothing because callback has already been called
2806
2807 c = Foo()
2808 util.Finalize(c, conn.send, args=('c',))
2809
2810 d10 = Foo()
2811 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2812
2813 d01 = Foo()
2814 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2815 d02 = Foo()
2816 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2817 d03 = Foo()
2818 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2819
2820 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2821
2822 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2823
Ezio Melotti13925002011-03-16 11:05:33 +02002824 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002825 # garbage collecting locals
2826 util._exit_function()
2827 conn.close()
2828 os._exit(0)
2829
2830 def test_finalize(self):
2831 conn, child_conn = self.Pipe()
2832
2833 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002834 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002835 p.start()
2836 p.join()
2837
2838 result = [obj for obj in iter(conn.recv, 'STOP')]
2839 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2840
2841#
2842# Test that from ... import * works for each module
2843#
2844
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002845class _TestImportStar(unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002846
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002847 def get_module_names(self):
2848 import glob
2849 folder = os.path.dirname(multiprocessing.__file__)
2850 pattern = os.path.join(folder, '*.py')
2851 files = glob.glob(pattern)
2852 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
2853 modules = ['multiprocessing.' + m for m in modules]
2854 modules.remove('multiprocessing.__init__')
2855 modules.append('multiprocessing')
2856 return modules
Benjamin Petersone711caf2008-06-11 16:44:04 +00002857
2858 def test_import(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002859 modules = self.get_module_names()
2860 if sys.platform == 'win32':
2861 modules.remove('multiprocessing.popen_fork')
2862 modules.remove('multiprocessing.popen_forkserver')
2863 modules.remove('multiprocessing.popen_spawn_posix')
2864 else:
2865 modules.remove('multiprocessing.popen_spawn_win32')
2866 if not HAS_REDUCTION:
2867 modules.remove('multiprocessing.popen_forkserver')
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002868
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002869 if c_int is None:
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002870 # This module requires _ctypes
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002871 modules.remove('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002872
2873 for name in modules:
2874 __import__(name)
2875 mod = sys.modules[name]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002876 self.assertTrue(hasattr(mod, '__all__'), name)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002877
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01002878 for attr in mod.__all__:
Benjamin Petersone711caf2008-06-11 16:44:04 +00002879 self.assertTrue(
2880 hasattr(mod, attr),
2881 '%r does not have attribute %r' % (mod, attr)
2882 )
2883
2884#
2885# Quick test that logging works -- does not test logging output
2886#
2887
2888class _TestLogging(BaseTestCase):
2889
2890 ALLOWED_TYPES = ('processes',)
2891
2892 def test_enable_logging(self):
2893 logger = multiprocessing.get_logger()
2894 logger.setLevel(util.SUBWARNING)
2895 self.assertTrue(logger is not None)
2896 logger.debug('this will not be printed')
2897 logger.info('nor will this')
2898 logger.setLevel(LOG_LEVEL)
2899
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002900 @classmethod
2901 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002902 logger = multiprocessing.get_logger()
2903 conn.send(logger.getEffectiveLevel())
2904
2905 def test_level(self):
2906 LEVEL1 = 32
2907 LEVEL2 = 37
2908
2909 logger = multiprocessing.get_logger()
2910 root_logger = logging.getLogger()
2911 root_level = root_logger.level
2912
2913 reader, writer = multiprocessing.Pipe(duplex=False)
2914
2915 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002916 p = self.Process(target=self._test_level, args=(writer,))
2917 p.daemon = True
2918 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002919 self.assertEqual(LEVEL1, reader.recv())
2920
2921 logger.setLevel(logging.NOTSET)
2922 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002923 p = self.Process(target=self._test_level, args=(writer,))
2924 p.daemon = True
2925 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002926 self.assertEqual(LEVEL2, reader.recv())
2927
2928 root_logger.setLevel(root_level)
2929 logger.setLevel(level=LOG_LEVEL)
2930
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002931
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002932# class _TestLoggingProcessName(BaseTestCase):
2933#
2934# def handle(self, record):
2935# assert record.processName == multiprocessing.current_process().name
2936# self.__handled = True
2937#
2938# def test_logging(self):
2939# handler = logging.Handler()
2940# handler.handle = self.handle
2941# self.__handled = False
2942# # Bypass getLogger() and side-effects
2943# logger = logging.getLoggerClass()(
2944# 'multiprocessing.test.TestLoggingProcessName')
2945# logger.addHandler(handler)
2946# logger.propagate = False
2947#
2948# logger.warn('foo')
2949# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002950
Benjamin Petersone711caf2008-06-11 16:44:04 +00002951#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002952# Check that Process.join() retries if os.waitpid() fails with EINTR
2953#
2954
2955class _TestPollEintr(BaseTestCase):
2956
2957 ALLOWED_TYPES = ('processes',)
2958
2959 @classmethod
2960 def _killer(cls, pid):
Richard Oudkerk6a53af82013-08-28 13:50:19 +01002961 time.sleep(0.1)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002962 os.kill(pid, signal.SIGUSR1)
2963
2964 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2965 def test_poll_eintr(self):
2966 got_signal = [False]
2967 def record(*args):
2968 got_signal[0] = True
2969 pid = os.getpid()
2970 oldhandler = signal.signal(signal.SIGUSR1, record)
2971 try:
2972 killer = self.Process(target=self._killer, args=(pid,))
2973 killer.start()
Richard Oudkerk6a53af82013-08-28 13:50:19 +01002974 try:
2975 p = self.Process(target=time.sleep, args=(2,))
2976 p.start()
2977 p.join()
2978 finally:
2979 killer.join()
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002980 self.assertTrue(got_signal[0])
2981 self.assertEqual(p.exitcode, 0)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002982 finally:
2983 signal.signal(signal.SIGUSR1, oldhandler)
2984
2985#
Jesse Noller6214edd2009-01-19 16:23:53 +00002986# Test to verify handle verification, see issue 3321
2987#
2988
2989class TestInvalidHandle(unittest.TestCase):
2990
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002991 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002992 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002993 conn = multiprocessing.connection.Connection(44977608)
Charles-François Natali6703bb42013-09-06 21:12:22 +02002994 # check that poll() doesn't crash
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002995 try:
Charles-François Natali6703bb42013-09-06 21:12:22 +02002996 conn.poll()
2997 except (ValueError, OSError):
2998 pass
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002999 finally:
3000 # Hack private attribute _handle to avoid printing an error
3001 # in conn.__del__
3002 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003003 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02003004 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003005
Benjamin Petersone711caf2008-06-11 16:44:04 +00003006
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003007
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003008class OtherTest(unittest.TestCase):
3009 # TODO: add more tests for deliver/answer challenge.
3010 def test_deliver_challenge_auth_failure(self):
3011 class _FakeConnection(object):
3012 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003013 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003014 def send_bytes(self, data):
3015 pass
3016 self.assertRaises(multiprocessing.AuthenticationError,
3017 multiprocessing.connection.deliver_challenge,
3018 _FakeConnection(), b'abc')
3019
3020 def test_answer_challenge_auth_failure(self):
3021 class _FakeConnection(object):
3022 def __init__(self):
3023 self.count = 0
3024 def recv_bytes(self, size):
3025 self.count += 1
3026 if self.count == 1:
3027 return multiprocessing.connection.CHALLENGE
3028 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003029 return b'something bogus'
3030 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003031 def send_bytes(self, data):
3032 pass
3033 self.assertRaises(multiprocessing.AuthenticationError,
3034 multiprocessing.connection.answer_challenge,
3035 _FakeConnection(), b'abc')
3036
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003037#
3038# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3039#
3040
3041def initializer(ns):
3042 ns.test += 1
3043
3044class TestInitializers(unittest.TestCase):
3045 def setUp(self):
3046 self.mgr = multiprocessing.Manager()
3047 self.ns = self.mgr.Namespace()
3048 self.ns.test = 0
3049
3050 def tearDown(self):
3051 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003052 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003053
3054 def test_manager_initializer(self):
3055 m = multiprocessing.managers.SyncManager()
3056 self.assertRaises(TypeError, m.start, 1)
3057 m.start(initializer, (self.ns,))
3058 self.assertEqual(self.ns.test, 1)
3059 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003060 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003061
3062 def test_pool_initializer(self):
3063 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3064 p = multiprocessing.Pool(1, initializer, (self.ns,))
3065 p.close()
3066 p.join()
3067 self.assertEqual(self.ns.test, 1)
3068
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003069#
3070# Issue 5155, 5313, 5331: Test process in processes
3071# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3072#
3073
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003074def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003075 try:
3076 item = q.get(block=False)
3077 except pyqueue.Empty:
3078 pass
3079
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003080def _test_process(q):
3081 queue = multiprocessing.Queue()
3082 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3083 subProc.daemon = True
3084 subProc.start()
3085 subProc.join()
3086
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003087def _afunc(x):
3088 return x*x
3089
3090def pool_in_process():
3091 pool = multiprocessing.Pool(processes=4)
3092 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003093 pool.close()
3094 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003095
3096class _file_like(object):
3097 def __init__(self, delegate):
3098 self._delegate = delegate
3099 self._pid = None
3100
3101 @property
3102 def cache(self):
3103 pid = os.getpid()
3104 # There are no race conditions since fork keeps only the running thread
3105 if pid != self._pid:
3106 self._pid = pid
3107 self._cache = []
3108 return self._cache
3109
3110 def write(self, data):
3111 self.cache.append(data)
3112
3113 def flush(self):
3114 self._delegate.write(''.join(self.cache))
3115 self._cache = []
3116
3117class TestStdinBadfiledescriptor(unittest.TestCase):
3118
3119 def test_queue_in_process(self):
3120 queue = multiprocessing.Queue()
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003121 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003122 proc.start()
3123 proc.join()
3124
3125 def test_pool_in_process(self):
3126 p = multiprocessing.Process(target=pool_in_process)
3127 p.start()
3128 p.join()
3129
3130 def test_flushing(self):
3131 sio = io.StringIO()
3132 flike = _file_like(sio)
3133 flike.write('foo')
3134 proc = multiprocessing.Process(target=lambda: flike.flush())
3135 flike.flush()
3136 assert sio.getvalue() == 'foo'
3137
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003138
3139class TestWait(unittest.TestCase):
3140
3141 @classmethod
3142 def _child_test_wait(cls, w, slow):
3143 for i in range(10):
3144 if slow:
3145 time.sleep(random.random()*0.1)
3146 w.send((i, os.getpid()))
3147 w.close()
3148
3149 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003150 from multiprocessing.connection import wait
3151 readers = []
3152 procs = []
3153 messages = []
3154
3155 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003156 r, w = multiprocessing.Pipe(duplex=False)
3157 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003158 p.daemon = True
3159 p.start()
3160 w.close()
3161 readers.append(r)
3162 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003163 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003164
3165 while readers:
3166 for r in wait(readers):
3167 try:
3168 msg = r.recv()
3169 except EOFError:
3170 readers.remove(r)
3171 r.close()
3172 else:
3173 messages.append(msg)
3174
3175 messages.sort()
3176 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3177 self.assertEqual(messages, expected)
3178
3179 @classmethod
3180 def _child_test_wait_socket(cls, address, slow):
3181 s = socket.socket()
3182 s.connect(address)
3183 for i in range(10):
3184 if slow:
3185 time.sleep(random.random()*0.1)
3186 s.sendall(('%s\n' % i).encode('ascii'))
3187 s.close()
3188
3189 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003190 from multiprocessing.connection import wait
3191 l = socket.socket()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02003192 l.bind((test.support.HOST, 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01003193 l.listen()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02003194 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003195 readers = []
3196 procs = []
3197 dic = {}
3198
3199 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003200 p = multiprocessing.Process(target=self._child_test_wait_socket,
3201 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003202 p.daemon = True
3203 p.start()
3204 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003205 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003206
3207 for i in range(4):
3208 r, _ = l.accept()
3209 readers.append(r)
3210 dic[r] = []
3211 l.close()
3212
3213 while readers:
3214 for r in wait(readers):
3215 msg = r.recv(32)
3216 if not msg:
3217 readers.remove(r)
3218 r.close()
3219 else:
3220 dic[r].append(msg)
3221
3222 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3223 for v in dic.values():
3224 self.assertEqual(b''.join(v), expected)
3225
3226 def test_wait_slow(self):
3227 self.test_wait(True)
3228
3229 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003230 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003231
3232 def test_wait_timeout(self):
3233 from multiprocessing.connection import wait
3234
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003235 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003236 a, b = multiprocessing.Pipe()
3237
3238 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003239 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003240 delta = time.time() - start
3241
3242 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003243 self.assertLess(delta, expected * 2)
3244 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003245
3246 b.send(None)
3247
3248 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003249 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003250 delta = time.time() - start
3251
3252 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003253 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003254
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003255 @classmethod
3256 def signal_and_sleep(cls, sem, period):
3257 sem.release()
3258 time.sleep(period)
3259
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003260 def test_wait_integer(self):
3261 from multiprocessing.connection import wait
3262
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003263 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01003264 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003265 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003266 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003267 p = multiprocessing.Process(target=self.signal_and_sleep,
3268 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003269
3270 p.start()
3271 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003272 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003273
3274 start = time.time()
3275 res = wait([a, p.sentinel, b], expected + 20)
3276 delta = time.time() - start
3277
3278 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003279 self.assertLess(delta, expected + 2)
3280 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003281
3282 a.send(None)
3283
3284 start = time.time()
3285 res = wait([a, p.sentinel, b], 20)
3286 delta = time.time() - start
3287
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003288 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003289 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003290
3291 b.send(None)
3292
3293 start = time.time()
3294 res = wait([a, p.sentinel, b], 20)
3295 delta = time.time() - start
3296
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003297 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003298 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003299
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003300 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003301 p.join()
3302
Richard Oudkerk59d54042012-05-10 16:11:12 +01003303 def test_neg_timeout(self):
3304 from multiprocessing.connection import wait
3305 a, b = multiprocessing.Pipe()
3306 t = time.time()
3307 res = wait([a], timeout=-1)
3308 t = time.time() - t
3309 self.assertEqual(res, [])
3310 self.assertLess(t, 1)
3311 a.close()
3312 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003313
Antoine Pitrou709176f2012-04-01 17:19:09 +02003314#
3315# Issue 14151: Test invalid family on invalid environment
3316#
3317
3318class TestInvalidFamily(unittest.TestCase):
3319
3320 @unittest.skipIf(WIN32, "skipped on Windows")
3321 def test_invalid_family(self):
3322 with self.assertRaises(ValueError):
3323 multiprocessing.connection.Listener(r'\\.\test')
3324
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003325 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3326 def test_invalid_family_win32(self):
3327 with self.assertRaises(ValueError):
3328 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003329
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003330#
3331# Issue 12098: check sys.flags of child matches that for parent
3332#
3333
3334class TestFlags(unittest.TestCase):
3335 @classmethod
3336 def run_in_grandchild(cls, conn):
3337 conn.send(tuple(sys.flags))
3338
3339 @classmethod
3340 def run_in_child(cls):
3341 import json
3342 r, w = multiprocessing.Pipe(duplex=False)
3343 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3344 p.start()
3345 grandchild_flags = r.recv()
3346 p.join()
3347 r.close()
3348 w.close()
3349 flags = (tuple(sys.flags), grandchild_flags)
3350 print(json.dumps(flags))
3351
3352 def test_flags(self):
3353 import json, subprocess
3354 # start child process using unusual flags
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003355 prog = ('from test._test_multiprocessing import TestFlags; ' +
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003356 'TestFlags.run_in_child()')
3357 data = subprocess.check_output(
3358 [sys.executable, '-E', '-S', '-O', '-c', prog])
3359 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3360 self.assertEqual(child_flags, grandchild_flags)
3361
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003362#
3363# Test interaction with socket timeouts - see Issue #6056
3364#
3365
3366class TestTimeouts(unittest.TestCase):
3367 @classmethod
3368 def _test_timeout(cls, child, address):
3369 time.sleep(1)
3370 child.send(123)
3371 child.close()
3372 conn = multiprocessing.connection.Client(address)
3373 conn.send(456)
3374 conn.close()
3375
3376 def test_timeout(self):
3377 old_timeout = socket.getdefaulttimeout()
3378 try:
3379 socket.setdefaulttimeout(0.1)
3380 parent, child = multiprocessing.Pipe(duplex=True)
3381 l = multiprocessing.connection.Listener(family='AF_INET')
3382 p = multiprocessing.Process(target=self._test_timeout,
3383 args=(child, l.address))
3384 p.start()
3385 child.close()
3386 self.assertEqual(parent.recv(), 123)
3387 parent.close()
3388 conn = l.accept()
3389 self.assertEqual(conn.recv(), 456)
3390 conn.close()
3391 l.close()
3392 p.join(10)
3393 finally:
3394 socket.setdefaulttimeout(old_timeout)
3395
Richard Oudkerke88a2442012-08-14 11:41:32 +01003396#
3397# Test what happens with no "if __name__ == '__main__'"
3398#
3399
3400class TestNoForkBomb(unittest.TestCase):
3401 def test_noforkbomb(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003402 sm = multiprocessing.get_start_method()
Richard Oudkerke88a2442012-08-14 11:41:32 +01003403 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003404 if sm != 'fork':
3405 rc, out, err = test.script_helper.assert_python_failure(name, sm)
Richard Oudkerke88a2442012-08-14 11:41:32 +01003406 self.assertEqual('', out.decode('ascii'))
3407 self.assertIn('RuntimeError', err.decode('ascii'))
3408 else:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003409 rc, out, err = test.script_helper.assert_python_ok(name, sm)
Richard Oudkerke88a2442012-08-14 11:41:32 +01003410 self.assertEqual('123', out.decode('ascii').rstrip())
3411 self.assertEqual('', err.decode('ascii'))
3412
3413#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003414# Issue #17555: ForkAwareThreadLock
3415#
3416
3417class TestForkAwareThreadLock(unittest.TestCase):
3418 # We recurisvely start processes. Issue #17555 meant that the
3419 # after fork registry would get duplicate entries for the same
3420 # lock. The size of the registry at generation n was ~2**n.
3421
3422 @classmethod
3423 def child(cls, n, conn):
3424 if n > 1:
3425 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
3426 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01003427 conn.close()
3428 p.join(timeout=5)
Richard Oudkerk409c3132013-04-17 20:58:00 +01003429 else:
3430 conn.send(len(util._afterfork_registry))
3431 conn.close()
3432
3433 def test_lock(self):
3434 r, w = multiprocessing.Pipe(False)
3435 l = util.ForkAwareThreadLock()
3436 old_size = len(util._afterfork_registry)
3437 p = multiprocessing.Process(target=self.child, args=(5, w))
3438 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01003439 w.close()
Richard Oudkerk409c3132013-04-17 20:58:00 +01003440 new_size = r.recv()
Richard Oudkerka01fb392013-08-21 19:45:19 +01003441 p.join(timeout=5)
Richard Oudkerk409c3132013-04-17 20:58:00 +01003442 self.assertLessEqual(new_size, old_size)
3443
3444#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003445# Check that non-forked child processes do not inherit unneeded fds/handles
3446#
3447
3448class TestCloseFds(unittest.TestCase):
3449
3450 def get_high_socket_fd(self):
3451 if WIN32:
3452 # The child process will not have any socket handles, so
3453 # calling socket.fromfd() should produce WSAENOTSOCK even
3454 # if there is a handle of the same number.
3455 return socket.socket().detach()
3456 else:
3457 # We want to produce a socket with an fd high enough that a
3458 # freshly created child process will not have any fds as high.
3459 fd = socket.socket().detach()
3460 to_close = []
3461 while fd < 50:
3462 to_close.append(fd)
3463 fd = os.dup(fd)
3464 for x in to_close:
3465 os.close(x)
3466 return fd
3467
3468 def close(self, fd):
3469 if WIN32:
3470 socket.socket(fileno=fd).close()
3471 else:
3472 os.close(fd)
3473
3474 @classmethod
3475 def _test_closefds(cls, conn, fd):
3476 try:
3477 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
3478 except Exception as e:
3479 conn.send(e)
3480 else:
3481 s.close()
3482 conn.send(None)
3483
3484 def test_closefd(self):
3485 if not HAS_REDUCTION:
3486 raise unittest.SkipTest('requires fd pickling')
3487
3488 reader, writer = multiprocessing.Pipe()
3489 fd = self.get_high_socket_fd()
3490 try:
3491 p = multiprocessing.Process(target=self._test_closefds,
3492 args=(writer, fd))
3493 p.start()
3494 writer.close()
3495 e = reader.recv()
3496 p.join(timeout=5)
3497 finally:
3498 self.close(fd)
3499 writer.close()
3500 reader.close()
3501
3502 if multiprocessing.get_start_method() == 'fork':
3503 self.assertIs(e, None)
3504 else:
3505 WSAENOTSOCK = 10038
3506 self.assertIsInstance(e, OSError)
3507 self.assertTrue(e.errno == errno.EBADF or
3508 e.winerror == WSAENOTSOCK, e)
3509
3510#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003511# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3512#
3513
3514class TestIgnoreEINTR(unittest.TestCase):
3515
3516 @classmethod
3517 def _test_ignore(cls, conn):
3518 def handler(signum, frame):
3519 pass
3520 signal.signal(signal.SIGUSR1, handler)
3521 conn.send('ready')
3522 x = conn.recv()
3523 conn.send(x)
3524 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
3525
3526 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3527 def test_ignore(self):
3528 conn, child_conn = multiprocessing.Pipe()
3529 try:
3530 p = multiprocessing.Process(target=self._test_ignore,
3531 args=(child_conn,))
3532 p.daemon = True
3533 p.start()
3534 child_conn.close()
3535 self.assertEqual(conn.recv(), 'ready')
3536 time.sleep(0.1)
3537 os.kill(p.pid, signal.SIGUSR1)
3538 time.sleep(0.1)
3539 conn.send(1234)
3540 self.assertEqual(conn.recv(), 1234)
3541 time.sleep(0.1)
3542 os.kill(p.pid, signal.SIGUSR1)
3543 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
3544 time.sleep(0.1)
3545 p.join()
3546 finally:
3547 conn.close()
3548
3549 @classmethod
3550 def _test_ignore_listener(cls, conn):
3551 def handler(signum, frame):
3552 pass
3553 signal.signal(signal.SIGUSR1, handler)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003554 with multiprocessing.connection.Listener() as l:
3555 conn.send(l.address)
3556 a = l.accept()
3557 a.send('welcome')
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003558
3559 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3560 def test_ignore_listener(self):
3561 conn, child_conn = multiprocessing.Pipe()
3562 try:
3563 p = multiprocessing.Process(target=self._test_ignore_listener,
3564 args=(child_conn,))
3565 p.daemon = True
3566 p.start()
3567 child_conn.close()
3568 address = conn.recv()
3569 time.sleep(0.1)
3570 os.kill(p.pid, signal.SIGUSR1)
3571 time.sleep(0.1)
3572 client = multiprocessing.connection.Client(address)
3573 self.assertEqual(client.recv(), 'welcome')
3574 p.join()
3575 finally:
3576 conn.close()
3577
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003578class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003579 @classmethod
3580 def _check_context(cls, conn):
3581 conn.send(multiprocessing.get_start_method())
3582
3583 def check_context(self, ctx):
3584 r, w = ctx.Pipe(duplex=False)
3585 p = ctx.Process(target=self._check_context, args=(w,))
3586 p.start()
3587 w.close()
3588 child_method = r.recv()
3589 r.close()
3590 p.join()
3591 self.assertEqual(child_method, ctx.get_start_method())
3592
3593 def test_context(self):
3594 for method in ('fork', 'spawn', 'forkserver'):
3595 try:
3596 ctx = multiprocessing.get_context(method)
3597 except ValueError:
3598 continue
3599 self.assertEqual(ctx.get_start_method(), method)
3600 self.assertIs(ctx.get_context(), ctx)
3601 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3602 self.assertRaises(ValueError, ctx.set_start_method, None)
3603 self.check_context(ctx)
3604
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003605 def test_set_get(self):
3606 multiprocessing.set_forkserver_preload(PRELOAD)
3607 count = 0
3608 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003609 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003610 for method in ('fork', 'spawn', 'forkserver'):
3611 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003612 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003613 except ValueError:
3614 continue
3615 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003616 ctx = multiprocessing.get_context()
3617 self.assertEqual(ctx.get_start_method(), method)
3618 self.assertTrue(type(ctx).__name__.lower().startswith(method))
3619 self.assertTrue(
3620 ctx.Process.__name__.lower().startswith(method))
3621 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003622 count += 1
3623 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003624 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003625 self.assertGreaterEqual(count, 1)
3626
3627 def test_get_all(self):
3628 methods = multiprocessing.get_all_start_methods()
3629 if sys.platform == 'win32':
3630 self.assertEqual(methods, ['spawn'])
3631 else:
3632 self.assertTrue(methods == ['fork', 'spawn'] or
3633 methods == ['fork', 'spawn', 'forkserver'])
3634
3635#
3636# Check that killing process does not leak named semaphores
3637#
3638
3639@unittest.skipIf(sys.platform == "win32",
3640 "test semantics don't make sense on Windows")
3641class TestSemaphoreTracker(unittest.TestCase):
3642 def test_semaphore_tracker(self):
3643 import subprocess
3644 cmd = '''if 1:
3645 import multiprocessing as mp, time, os
3646 mp.set_start_method("spawn")
3647 lock1 = mp.Lock()
3648 lock2 = mp.Lock()
3649 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
3650 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
3651 time.sleep(10)
3652 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003653 r, w = os.pipe()
3654 p = subprocess.Popen([sys.executable,
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003655 '-c', cmd % (w, w)],
Richard Oudkerk67e51982013-08-22 23:37:23 +01003656 pass_fds=[w],
3657 stderr=subprocess.PIPE)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003658 os.close(w)
3659 with open(r, 'rb', closefd=True) as f:
3660 name1 = f.readline().rstrip().decode('ascii')
3661 name2 = f.readline().rstrip().decode('ascii')
3662 _multiprocessing.sem_unlink(name1)
3663 p.terminate()
3664 p.wait()
Richard Oudkerk42a526c2014-02-21 22:29:58 +00003665 time.sleep(2.0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003666 with self.assertRaises(OSError) as ctx:
3667 _multiprocessing.sem_unlink(name2)
3668 # docs say it should be ENOENT, but OSX seems to give EINVAL
3669 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
Richard Oudkerk67e51982013-08-22 23:37:23 +01003670 err = p.stderr.read().decode('utf-8')
3671 p.stderr.close()
3672 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
3673 self.assertRegex(err, expected)
3674 self.assertRegex(err, 'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003675
3676#
3677# Mixins
3678#
3679
3680class ProcessesMixin(object):
3681 TYPE = 'processes'
3682 Process = multiprocessing.Process
3683 connection = multiprocessing.connection
3684 current_process = staticmethod(multiprocessing.current_process)
3685 active_children = staticmethod(multiprocessing.active_children)
3686 Pool = staticmethod(multiprocessing.Pool)
3687 Pipe = staticmethod(multiprocessing.Pipe)
3688 Queue = staticmethod(multiprocessing.Queue)
3689 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
3690 Lock = staticmethod(multiprocessing.Lock)
3691 RLock = staticmethod(multiprocessing.RLock)
3692 Semaphore = staticmethod(multiprocessing.Semaphore)
3693 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
3694 Condition = staticmethod(multiprocessing.Condition)
3695 Event = staticmethod(multiprocessing.Event)
3696 Barrier = staticmethod(multiprocessing.Barrier)
3697 Value = staticmethod(multiprocessing.Value)
3698 Array = staticmethod(multiprocessing.Array)
3699 RawValue = staticmethod(multiprocessing.RawValue)
3700 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003701
Benjamin Petersone711caf2008-06-11 16:44:04 +00003702
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003703class ManagerMixin(object):
3704 TYPE = 'manager'
3705 Process = multiprocessing.Process
3706 Queue = property(operator.attrgetter('manager.Queue'))
3707 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3708 Lock = property(operator.attrgetter('manager.Lock'))
3709 RLock = property(operator.attrgetter('manager.RLock'))
3710 Semaphore = property(operator.attrgetter('manager.Semaphore'))
3711 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3712 Condition = property(operator.attrgetter('manager.Condition'))
3713 Event = property(operator.attrgetter('manager.Event'))
3714 Barrier = property(operator.attrgetter('manager.Barrier'))
3715 Value = property(operator.attrgetter('manager.Value'))
3716 Array = property(operator.attrgetter('manager.Array'))
3717 list = property(operator.attrgetter('manager.list'))
3718 dict = property(operator.attrgetter('manager.dict'))
3719 Namespace = property(operator.attrgetter('manager.Namespace'))
3720
3721 @classmethod
3722 def Pool(cls, *args, **kwds):
3723 return cls.manager.Pool(*args, **kwds)
3724
3725 @classmethod
3726 def setUpClass(cls):
3727 cls.manager = multiprocessing.Manager()
3728
3729 @classmethod
3730 def tearDownClass(cls):
3731 # only the manager process should be returned by active_children()
3732 # but this can take a bit on slow machines, so wait a few seconds
3733 # if there are other children too (see #17395)
3734 t = 0.01
3735 while len(multiprocessing.active_children()) > 1 and t < 5:
3736 time.sleep(t)
3737 t *= 2
3738 gc.collect() # do garbage collection
3739 if cls.manager._number_of_objects() != 0:
3740 # This is not really an error since some tests do not
3741 # ensure that all processes which hold a reference to a
3742 # managed object have been joined.
3743 print('Shared objects which still exist at manager shutdown:')
3744 print(cls.manager._debug_info())
3745 cls.manager.shutdown()
3746 cls.manager.join()
3747 cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003748
3749
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003750class ThreadsMixin(object):
3751 TYPE = 'threads'
3752 Process = multiprocessing.dummy.Process
3753 connection = multiprocessing.dummy.connection
3754 current_process = staticmethod(multiprocessing.dummy.current_process)
3755 active_children = staticmethod(multiprocessing.dummy.active_children)
3756 Pool = staticmethod(multiprocessing.Pool)
3757 Pipe = staticmethod(multiprocessing.dummy.Pipe)
3758 Queue = staticmethod(multiprocessing.dummy.Queue)
3759 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3760 Lock = staticmethod(multiprocessing.dummy.Lock)
3761 RLock = staticmethod(multiprocessing.dummy.RLock)
3762 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3763 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3764 Condition = staticmethod(multiprocessing.dummy.Condition)
3765 Event = staticmethod(multiprocessing.dummy.Event)
3766 Barrier = staticmethod(multiprocessing.dummy.Barrier)
3767 Value = staticmethod(multiprocessing.dummy.Value)
3768 Array = staticmethod(multiprocessing.dummy.Array)
3769
3770#
3771# Functions used to create test cases from the base ones in this module
3772#
3773
3774def install_tests_in_module_dict(remote_globs, start_method):
3775 __module__ = remote_globs['__name__']
3776 local_globs = globals()
3777 ALL_TYPES = {'processes', 'threads', 'manager'}
3778
3779 for name, base in local_globs.items():
3780 if not isinstance(base, type):
3781 continue
3782 if issubclass(base, BaseTestCase):
3783 if base is BaseTestCase:
3784 continue
3785 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
3786 for type_ in base.ALLOWED_TYPES:
3787 newname = 'With' + type_.capitalize() + name[1:]
3788 Mixin = local_globs[type_.capitalize() + 'Mixin']
3789 class Temp(base, Mixin, unittest.TestCase):
3790 pass
3791 Temp.__name__ = Temp.__qualname__ = newname
3792 Temp.__module__ = __module__
3793 remote_globs[newname] = Temp
3794 elif issubclass(base, unittest.TestCase):
3795 class Temp(base, object):
3796 pass
3797 Temp.__name__ = Temp.__qualname__ = name
3798 Temp.__module__ = __module__
3799 remote_globs[name] = Temp
3800
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01003801 dangling = [None, None]
3802 old_start_method = [None]
3803
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003804 def setUpModule():
3805 multiprocessing.set_forkserver_preload(PRELOAD)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01003806 multiprocessing.process._cleanup()
3807 dangling[0] = multiprocessing.process._dangling.copy()
3808 dangling[1] = threading._dangling.copy()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003809 old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003810 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003811 multiprocessing.set_start_method(start_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003812 except ValueError:
3813 raise unittest.SkipTest(start_method +
3814 ' start method not supported')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003815
3816 if sys.platform.startswith("linux"):
3817 try:
3818 lock = multiprocessing.RLock()
3819 except OSError:
3820 raise unittest.SkipTest("OSError raises on RLock creation, "
3821 "see issue 3111!")
3822 check_enough_semaphores()
3823 util.get_temp_dir() # creates temp directory
3824 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3825
3826 def tearDownModule():
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003827 multiprocessing.set_start_method(old_start_method[0], force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003828 # pause a bit so we don't get warning about dangling threads/processes
3829 time.sleep(0.5)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01003830 multiprocessing.process._cleanup()
3831 gc.collect()
3832 tmp = set(multiprocessing.process._dangling) - set(dangling[0])
3833 if tmp:
3834 print('Dangling processes:', tmp, file=sys.stderr)
3835 del tmp
3836 tmp = set(threading._dangling) - set(dangling[1])
3837 if tmp:
3838 print('Dangling threads:', tmp, file=sys.stderr)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003839
3840 remote_globs['setUpModule'] = setUpModule
3841 remote_globs['tearDownModule'] = tearDownModule