#
# Unit tests for the multiprocessing package
#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010020import operator
Antoine Pitrou89889452017-03-24 13:52:11 +010021import weakref
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030023import test.support.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Benjamin Petersone5384b02008-10-04 22:00:42 +000025
R. David Murraya21e4ca2009-03-31 23:16:50 +000026# Skip tests if _multiprocessing wasn't built.
27_multiprocessing = test.support.import_module('_multiprocessing')
28# Skip tests if sem_open implementation is broken.
29test.support.import_module('multiprocessing.synchronize')
Raymond Hettinger15f44ab2016-08-30 10:47:49 -070030# import threading after _multiprocessing to raise a more relevant error
Victor Stinner45df8202010-04-28 22:31:17 +000031# message: "No module named _multiprocessing". _multiprocessing is not compiled
32# without thread support.
33import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000034
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.dummy
36import multiprocessing.connection
37import multiprocessing.managers
38import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
Charles-François Natalibc8f0822011-09-20 20:36:51 +020041from multiprocessing import util
42
43try:
44 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010045 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020046except ImportError:
47 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Brian Curtinafa88b52010-10-07 01:12:19 +000049try:
50 from multiprocessing.sharedctypes import Value, copy
51 HAS_SHAREDCTYPES = True
52except ImportError:
53 HAS_SHAREDCTYPES = False
54
Antoine Pitroubcb39d42011-08-23 19:46:22 +020055try:
56 import msvcrt
57except ImportError:
58 msvcrt = None
59
Benjamin Petersone711caf2008-06-11 16:44:04 +000060#
61#
62#
63
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
Benjamin Petersone711caf2008-06-11 16:44:04 +000067#
68# Constants
69#
70
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay (in seconds) passed to time.sleep() by the tests below.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (in seconds) handed to the blocking calls under test; distinct
# values are only used when timings are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless _multiprocessing sets a truthy HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
Richard Oudkerk59d54042012-05-10 16:11:12 +010089from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020090
def wait_for_handle(handle, timeout):
    """Wait for *handle* to become ready and return the result of wait().

    A negative *timeout* is treated like None and passed to wait() as
    None; any other value is passed through unchanged.
    """
    is_negative = timeout is not None and timeout < 0.0
    return wait([handle], None if is_negative else timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000095
# Limit on open file descriptors as reported by the OS, or 256 when the
# limit cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except Exception:
    # Best-effort fallback (e.g. os.sysconf missing or the name rejected).
    # Catching Exception rather than using a bare "except:" lets
    # KeyboardInterrupt and SystemExit propagate.
    MAXFD = 256
100
# To speed up tests when using the forkserver, we can preload these:
PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
103
Benjamin Petersone711caf2008-06-11 16:44:04 +0000104#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000105# Some tests require ctypes
106#
107
108try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000109 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110except ImportError:
111 Structure = object
112 c_int = c_double = None
113
Charles-François Natali221ef672011-11-22 18:55:22 +0100114
def check_enough_semaphores():
    """Raise unittest.SkipTest if the OS reports too few semaphores.

    Returns None without raising when os.sysconf is unavailable, when the
    "SC_SEM_NSEMS_MAX" setting is not recognised, when the reported limit
    is -1, or when the limit is at least 256.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        limit = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if limit != -1 and limit < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
128
129
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000130#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000131# Creates a wrapper for a function which records the time it takes to finish
132#
133
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration, in seconds, of the most recent call.  The duration
    is recorded even when *func* raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: unlike time.time() it cannot go
        # backwards, so the measured duration is never negative and is not
        # affected by system clock adjustments.
        started = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - started
146
147#
148# Base class for test cases
149#
150
class BaseTestCase(object):
    """Mixin holding the helpers shared by the test classes in this file."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is switched on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Compare func(*args) with value, unless func raises
        # NotImplementedError, in which case nothing is checked.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
173
Benjamin Petersone711caf2008-06-11 16:44:04 +0000174#
175# Return the value of a semaphore
176#
177
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
189
190#
191# Testcases
192#
193
class _TestProcess(BaseTestCase):
    """Tests of the basic Process API for the 'processes' and 'threads' types."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        me = self.current_process()
        key = me.authkey

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.daemon)
        self.assertIsInstance(key, bytes)
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.ident, os.getpid())
        self.assertEqual(me.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        inherited = self.Process(target=self._test)
        self.assertEqual(inherited.daemon, self.current_process().daemon)
        forced_on = self.Process(target=self._test, daemon=True)
        self.assertTrue(forced_on.daemon)
        forced_off = self.Process(target=self._test, daemon=False)
        self.assertFalse(forced_off.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body for test_process: report call details on queue *q*."""
        me = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.name)
        if cls.TYPE != 'threads':
            q.put(bytes(me.authkey))
            q.put(me.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()    # created but not otherwise used by this test
        child_args = (q, 1, 2)
        child_kwargs = {'hello':23, 'bye':2.54}
        child_name = 'SomeProcess'
        p = self.Process(
            target=self._test,
            args=child_args,
            kwargs=child_kwargs,
            name=child_name,
            )
        p.daemon = True
        me = self.current_process()

        # State before the child is started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, me.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # State while the child is running.
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports back in the order used by _test().
        self.assertEqual(q.get(), child_args[1:])
        self.assertEqual(q.get(), child_kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), me.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after the child has finished.
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body for test_terminate: sleep until terminated."""
        time.sleep(100)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # join() with a zero or negative timeout leaves the child running.
        for timeout in (0, -1):
            self.assertEqual(join(timeout), None)
            self.assertTimingAlmostEqual(join.elapsed, 0.0)
            self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if not hasattr(signal, 'alarm'):
            self.assertEqual(join(), None)
        else:
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            previous_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, previous_handler)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(sleeper, self.active_children())

        sleeper.daemon = True
        sleeper.start()
        self.assertIn(sleeper, self.active_children())

        sleeper.join()
        self.assertNotIn(sleeper, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse into two children (depth < 2)."""
        wconn.send(id)
        if len(id) >= 2:
            return
        for branch in range(2):
            child = cls.Process(
                target=cls._test_recursion, args=(wconn, id+[branch])
                )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        received = []
        while rconn.poll():
            received.append(rconn.recv())

        # Each child is joined before its sibling starts, so ids arrive
        # in depth-first order.
        self.assertEqual(received, [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ])

    @classmethod
    def _test_sentinel(cls, event):
        """Child body for test_sentinel: wait up to 10 seconds for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        handle = p.sentinel
        self.assertIsInstance(handle, int)
        self.assertFalse(wait_for_handle(handle, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(handle, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200400
Benjamin Petersone711caf2008-06-11 16:44:04 +0000401#
402#
403#
404
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases the strings sent to it over a pipe."""

    def __init__(self):
        super().__init__()
        connections = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = connections

    def run(self):
        # Only the child end is used inside run().
        self.parent_conn.close()
        # Answer requests until the None stop marker arrives.
        for text in iter(self.child_conn.recv, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* to the worker and return its reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        """Send the None stop marker and close both connection objects."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
426
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for how child processes exit."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        child = self.Process(target=self._test_stderr_flush, args=(testfn,))
        child.start()
        child.join()
        with open(testfn, 'r') as f:
            captured = f.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child body: point sys.stderr at *testfn*, then divide by zero."""
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        fd = os.open(testfn, flags)
        sys.stderr = open(fd, 'w', closefd=False)
        # test_stderr_flush looks for the next source line in the traceback.
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child body: point sys.stderr at *testfn*, then sys.exit(reason)."""
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        fd = os.open(testfn, flags)
        sys.stderr = open(fd, 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer exit reasons: exit code 1, str(reason) written to
        # the redirected stderr.
        printed_reasons = (
            [1, 2, 3],
            'ignore this',
        )
        for reason in printed_reasons:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, testfn))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, 1)

            with open(testfn, 'r') as f:
                content = f.read()
            self.assertEqual(content.rstrip(), str(reason))

            # The child opens the file with O_EXCL, so it must not exist
            # on the next iteration.
            os.unlink(testfn)

        # Boolean/integer reasons become the exit code itself.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
500
Benjamin Petersone711caf2008-06-11 16:44:04 +0000501#
502#
503#
504
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
510
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with *maxsize* otherwise.
    """
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
516
517
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_put: drain six items once allowed to start."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # The queue is full: every further put must raise Full.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has done so.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body for test_get: put 2..5 on *queue* once allowed to start."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # The queue is empty: every further get must raise Empty.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body for test_fork: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body: acknowledge every item from *q* until None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None per worker tells each of them to stop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        q = multiprocessing.Queue()
        try:
            # Measure with the monotonic clock so that a system clock
            # adjustment during the wait cannot distort the result.
            start = time.monotonic()
            self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
            delta = time.monotonic() - start
            # Tolerate a delta of 30 ms because of the bad clock resolution on
            # Windows (usually 15.6 ms)
            self.assertGreaterEqual(delta, 0.170)
        finally:
            # Release the queue's resources instead of leaking them into
            # the tests that run afterwards.
            q.close()
            q.join_thread()
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200754
Benjamin Petersone711caf2008-06-11 16:44:04 +0000755#
756#
757#
758
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock."""

    def test_lock(self):
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        # A second, non-blocking acquire of a held lock fails.
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        # Releasing a lock that is not held is an error.
        self.assertRaises((ValueError, threading.ThreadError), lk.release)

    def test_rlock(self):
        rlk = self.RLock()
        # Acquire three times, then release three times.
        for _ in range(3):
            self.assertEqual(rlk.acquire(), True)
        for _ in range(3):
            self.assertEqual(rlk.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), rlk.release)

    def test_lock_context(self):
        lk = self.Lock()
        with lk:
            pass
781
Benjamin Petersone711caf2008-06-11 16:44:04 +0000782
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore."""

    def _test_semaphore(self, sem):
        # Expects a semaphore whose value is 2; the value is 2 again on
        # return.
        self.assertReturnsIfImplemented(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            self.assertReturnsIfImplemented(remaining, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # A plain semaphore may be released beyond its initial value.
        for available in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every
        # attempt must fail because the semaphore value is 0.
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwargs, expected_elapsed in attempts:
            self.assertEqual(acquire(*args, **kwargs), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
835
836
class _TestCondition(BaseTestCase):
    """Tests for Condition objects: wait, notify, notify_all and wait_for."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Child body: signal `sleeping`, wait on `cond`, then signal `woken`."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check that the condition's internal counters are balanced.

        Only inspected for the 'processes' flavour; platforms where
        get_value() raises NotImplementedError are silently tolerated.
        """
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate handles for the two children so both can be joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both children were notified above; reap them so nothing is left
        # dangling after the test.
        proc.join()
        thread.join()

    def test_notify_all(self):
        """notify_all() wakes every waiter; timed-out waiters leave no residue."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        workers = []

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            workers.append(p)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            workers.append(t)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # The first batch timed out and the second was notified, so every
        # worker is finishing; reap them all.
        for worker in workers:
            worker.join()

    def test_timeout(self):
        """wait(timeout) returns False after roughly `timeout` seconds."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child body: set state to 0, then wait until the parent makes it 4."""
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        """wait_for() returns True once the predicate becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertFalse(p.is_alive())
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        """Child body: wait_for() a state that is never reached and time it."""
        sem.release()
        with cond:
            expected = 0.1
            # Use the monotonic clock so a wall-clock adjustment during the
            # wait cannot distort the measured duration.
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """wait_for(timeout=...) returns False if the predicate stays false."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        """Child body: notify the parent, then optionally send it SIGINT."""
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        """wait() returns False on timeout, True on notify, and is interruptible."""
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(10))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1072
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073
class _TestEvent(BaseTestCase):
    """Tests for Event objects: set, clear, is_set and wait."""

    @classmethod
    def _test_event(cls, event):
        """Child body: set the event after TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """wait() reflects the flag and returns promptly once it is set."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # wait() returns the value of the flag, so a timed-out wait on a
        # cleared event yields False.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()
        self.assertEqual(event.is_set(), False)

        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # The child has set the event and is about to exit; reap it so it
        # does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001114
1115#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001116# Tests for Barrier - adapted from tests in test/lock_tests.py
1117#
1118
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1123
class _DummyList(object):
    """Counter with a list-like append()/len() interface.

    The count is stored as a C int in a multiprocessing.heap buffer and is
    guarded by a multiprocessing lock; both are carried through pickling.
    """

    def __init__(self):
        storage = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
        guard = multiprocessing.Lock()
        # Reuse the unpickling path to attach the buffer view.
        self.__setstate__((storage, guard))
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        view = self._wrapper.create_memoryview()
        self._lengthbuf = view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        """Increment the counter; the appended value itself is ignored."""
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1146
def _wait():
    """Crude wait/yield that does not rely on synchronization primitives."""
    pause = 0.01
    time.sleep(pause)
1150
1151
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` workers, created with namespace.Process, that all run
        `f(*args)`.  When `wait_before_exit` is true the workers do not
        terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            # Nobody will call do_finish(), so open the exit gate up front.
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Worker body: record start, run `f`, record finish, await exit gate."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that they finished."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Open the exit gate so waiting workers may terminate."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001195
1196
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it is called."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1202
1203
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that any party still blocked in the barrier is released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a counter-like list suitable for the current test flavour."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run `f(*args)` in N-1 workers plus the caller, then wait for all."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking that all parties stay in step."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """A one-party barrier never blocks."""
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        """Pass the barrier 1000 times, checking every party reports each pass."""
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Reap every child once the test is over.  Cleanups run after
            # tearDown(), which aborts the barrier, so a child still blocked
            # in wait() is released before it is joined.
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1448
1449#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001450#
1451#
1452
class _TestValue(BaseTestCase):
    """Tests for shared ctypes Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child body: overwrite each shared value with its 'final' value."""
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """Values keep their initial contents and see writes from a child."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_code, initial, _final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, (_code, _initial, final) in zip(values, self.codes_values):
            self.assertEqual(shared.value, final)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist only on synchronized wrappers."""
        # Default lock: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None behaves like the default.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Value('i', 5, lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        with self.assertRaises(AttributeError):
            self.Value('i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1520
Benjamin Petersone711caf2008-06-11 16:44:04 +00001521
class _TestArray(BaseTestCase):
    """Tests for shared ctypes Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Turn `seq` into its running sum, in place."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Arrays support indexing and slicing and see writes from a child."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the model.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Update the model locally and the shared array in a child process.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        """An Array created from a size is zero-filled."""
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist only on synchronized wrappers."""
        # Default lock: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None behaves like the default.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit.get_lock()
        explicit.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        with self.assertRaises(AttributeError):
            self.Array('i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001600
1601#
1602#
1603#
1604
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """List proxies support slicing, extend, *=, + and nesting."""
        ten = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], ten)

        # A list proxy built from proxies holds proxies to the same lists.
        e = self.list([a, b])
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Mutating the inner list is visible through the outer proxy.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_proxy_in_list(self):
        """Nested list proxies can be mutated independently."""
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for idx in (1, 2):
            self.assertEqual(a[idx][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for idx in (0, 2):
            self.assertEqual(len(a[idx]), 3)

        del a

        # A list proxy that contains itself must still be deletable.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        """Dict proxies expose copy(), keys(), values() and items()."""
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(code) for code in indices])
        self.assertEqual(sorted(d.items()),
                         [(code, chr(code)) for code in indices])

    def test_dict_proxy_nested(self):
        """Nested dict proxies stay usable after outer references go away."""
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the local proxies must not drop the nested dicts.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the fetched inner proxies usable.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        holder = self.list([pets, supplies])
        holder[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(holder[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(holder[0]['marmots'], 1)

        outer = self.list([[88, 99], holder])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        """Namespace proxies support attribute set/delete and hide _names in str()."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
1726
1727#
1728#
1729#
1730
def sqr(x, wait=0.0):
    """Return `x` squared after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001734
def mul(x, y):
    """Return the product of `x` and `y`."""
    product = x * y
    return product
1737
def raise_large_valuerror(wait):
    """Sleep for `wait` seconds, then raise a ValueError with a 1 MiB message."""
    time.sleep(wait)
    message = "x" * (1024 ** 2)
    raise ValueError(message)
1741
def identity(x):
    """Return the argument unchanged."""
    result = x
    return result
1744
class CountedObject(object):
    """Object whose class keeps a count of its live instances."""

    # Incremented in __new__ and decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        instance = object.__new__(cls)
        return instance

    def __del__(self):
        klass = type(self)
        klass.n_instances -= 1
1754
class SayWhenError(ValueError):
    """Error raised by exception_throwing_generator at the requested point."""
1756
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError on reaching index `when`.

    A `when` of -1 raises before anything is yielded.
    """
    def said_when():
        return SayWhenError("Somebody said when")

    if when == -1:
        raise said_when()
    index = 0
    while index < total:
        if index == when:
            raise said_when()
        yield index
        index += 1
1764
Antoine Pitrou89889452017-03-24 13:52:11 +01001765
class _TestPool(BaseTestCase):
    # Tests for Pool.  Most tests share one pool of four workers that is
    # created in setUpClass() and terminated in tearDownClass(); a few tests
    # create (and clean up) a private pool through self.Pool().

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        # apply() with positional and with keyword arguments.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        # map() with the default chunksize and with an explicit one.
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))),
                         list(map(sqr, range(10))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, range(100))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9, -1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99, -1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99, -1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, range(10))))

    def test_map_async_callbacks(self):
        # The same list collects both results and errors: the first call
        # must hit `callback`, the second one `error_callback`.
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')

        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty input with an explicit chunksize must complete rather
        # than time out.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_map_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
        # again, make sure it's reentrant
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

        # An iterable that reports a length but fails on the first item.
        class SpecialIterable:
            def __iter__(self):
                return self
            def __next__(self):
                raise SayWhenError
            def __len__(self):
                return 1

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, range(10))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, range(1000))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, range(10)))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, range(20)))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Non-positive worker counts are rejected; the error type depends
        # on whether the pool is created through a manager.
        expected_error = (RemoteError if self.TYPE == 'manager'
                          else ValueError)

        self.assertRaises(expected_error, self.Pool, -1)
        self.assertRaises(expected_error, self.Pool, 0)

        if self.TYPE != 'manager':
            p = self.Pool(3)
            try:
                self.assertEqual(3, len(p._pool))
            finally:
                p.close()
                p.join()

    def test_terminate(self):
        # NOTE(review): this terminates the pool shared by the whole class;
        # presumably tests that run afterwards do not use self.pool --
        # confirm before renaming or reordering tests.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # Sanity check the pool didn't wait for all tasks to finish
        self.assertLess(join.elapsed, 2.0)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            # Release the private pool even when an assertion above fails,
            # so a failure here does not leave a worker behind.
            p.close()
            p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with self.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # Once the with-block has exited the pool rejects new work.
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        # test_traceback() searches the reported traceback for the exact
        # text of the next line (trailing comment included): do not reformat.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).

        t_start = time.time()

        with self.assertRaises(ValueError):
            with self.Pool(2) as p:
                try:
                    p.map(raise_large_valuerror, [0, 1])
                finally:
                    time.sleep(0.5)
                    p.close()
                    p.join()

        # check that we indeed waited for all jobs
        self.assertGreater(time.time() - t_start, 0.9)

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
2087
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002088
def raising():
    # Task helper that always fails with KeyError("key").
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00002091
def unpickleable_result():
    # Task helper whose return value is a lambda (calling it gives 42);
    # used by the tests to provoke a result-encoding failure.
    answer = lambda: 42
    return answer
2094
class _TestPoolWorkerErrors(BaseTestCase):
    # Error reporting of tasks run in a process pool: exceptions raised by
    # the task and results that cannot be sent back to the parent.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        try:
            # One-slot list so the callback can hand the exception back.
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            # Always release the pool, even when an assertion fails.
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            # Always release the pool, even when an assertion fails.
            p.close()
            p.join()
2134
class _TestPoolWorkerLifetime(BaseTestCase):
    # Behaviour of pools created with maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            # Always release the pool, even when an assertion fails.
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        # close()/join() are the operations under test here, so they stay
        # in the main flow instead of a cleanup handler.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
2179
Benjamin Petersone711caf2008-06-11 16:44:04 +00002180#
2181# Test of creating a customized manager class
2182#
2183
2184from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2185
class FooBar:
    # Plain object served through MyManager: two public methods (one of
    # which always fails) and one underscore-prefixed method.

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError()

    def _h(self):
        return '_h()'
2193
def baz():
    # Generator producing the squares of 0 through 9, in order.
    yield from (n * n for n in range(10))
2197
class IteratorProxy(BaseProxy):
    # Proxy that behaves as an iterator: each next() is forwarded to the
    # referent's __next__ through _callmethod().
    _exposed_ = ('__next__',)

    def __next__(self):
        item = self._callmethod('__next__')
        return item

    def __iter__(self):
        return self
2204
# Customized manager used by _TestMyManager; it adds no behaviour of its
# own, only the registrations below.
class MyManager(BaseManager):
    pass

# 'Foo' serves FooBar with no explicit `exposed` list, 'Bar' serves FooBar
# restricted to f() and _h(), and 'baz' serves the baz() generator through
# IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2211
2212
class _TestMyManager(BaseTestCase):
    # Runs the shared checks in common() against a MyManager that is
    # started explicitly, used as a context manager, or both.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Checks shared by every test above: which methods each proxy
        # exposes, and what calling them returns or raises.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [attr for attr in candidates if hasattr(foo, attr)]
        bar_methods = [attr for attr in candidates if hasattr(bar, attr)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # 'Foo' proxy: _h can be reached neither as an attribute nor
        # through _callmethod().
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # 'Bar' proxy: both exposed methods work either way.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        squares = [n * n for n in range(10)]
        self.assertEqual(list(baz), squares)
2262
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002263
Benjamin Petersone711caf2008-06-11 16:44:04 +00002264#
2265# Test of connecting to a remote server and using xmlrpclib for serialization
2266#
2267
# Module-level queue returned by get_queue().
_queue = pyqueue.Queue()
def get_queue():
    # Return the module-level queue; registered below as the callable
    # behind QueueManager's 'get_queue' typeid.
    return _queue
2271
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this manager can create the object itself.
QueueManager.register('get_queue', callable=get_queue)
2275
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Same typeid as QueueManager, but registered without a callable; the tests
# below only connect() this manager to an already running server.
QueueManager2.register('get_queue')
2279
2280
# Serializer name passed to the QueueManager/QueueManager2 constructors in
# the remote-manager tests below.
SERIALIZER = 'xmlrpclib'
2282
class _TestRemoteManager(BaseTestCase):
    # A child process connects to a running QueueManager and puts a tuple
    # of values on the served queue; the parent reads it back through a
    # second connection.

    ALLOWED_TYPES = ('manager',)
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
              ]
    result = list(values)

    @classmethod
    def _putter(cls, address, authkey):
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        payload = tuple(cls.values)
        remote_queue.put(payload)

    def test_remote(self):
        authkey = os.urandom(32)

        # Port 0: the address actually used is read back from the manager.
        server = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        child = self.Process(target=self._putter,
                             args=(server.address, authkey))
        child.daemon = True
        child.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        self.assertEqual(remote_queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
2330
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address right after a
    # previous manager on that address was shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Run in a child: connect to the manager and queue one message.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The message has arrived, so the putter is done: reap it before
        # the manager it is connected to goes away.
        p.join()
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement must be started too: shutdown() only exists
            # on a manager once start() has run.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002372
Benjamin Petersone711caf2008-06-11 16:44:04 +00002373#
2374#
2375#
2376
# Empty message; _TestConnection._echo stops echoing when it receives it.
SENTINEL = latin('')
2378
2379class _TestConnection(BaseTestCase):
2380
2381 ALLOWED_TYPES = ('processes', 'threads')
2382
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002383 @classmethod
2384 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002385 for msg in iter(conn.recv_bytes, SENTINEL):
2386 conn.send_bytes(msg)
2387 conn.close()
2388
2389 def test_connection(self):
2390 conn, child_conn = self.Pipe()
2391
2392 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002393 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002394 p.start()
2395
2396 seq = [1, 2.25, None]
2397 msg = latin('hello world')
2398 longmsg = msg * 10
2399 arr = array.array('i', list(range(4)))
2400
2401 if self.TYPE == 'processes':
2402 self.assertEqual(type(conn.fileno()), int)
2403
2404 self.assertEqual(conn.send(seq), None)
2405 self.assertEqual(conn.recv(), seq)
2406
2407 self.assertEqual(conn.send_bytes(msg), None)
2408 self.assertEqual(conn.recv_bytes(), msg)
2409
2410 if self.TYPE == 'processes':
2411 buffer = array.array('i', [0]*10)
2412 expected = list(arr) + [0] * (10 - len(arr))
2413 self.assertEqual(conn.send_bytes(arr), None)
2414 self.assertEqual(conn.recv_bytes_into(buffer),
2415 len(arr) * buffer.itemsize)
2416 self.assertEqual(list(buffer), expected)
2417
2418 buffer = array.array('i', [0]*10)
2419 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2420 self.assertEqual(conn.send_bytes(arr), None)
2421 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2422 len(arr) * buffer.itemsize)
2423 self.assertEqual(list(buffer), expected)
2424
2425 buffer = bytearray(latin(' ' * 40))
2426 self.assertEqual(conn.send_bytes(longmsg), None)
2427 try:
2428 res = conn.recv_bytes_into(buffer)
2429 except multiprocessing.BufferTooShort as e:
2430 self.assertEqual(e.args, (longmsg,))
2431 else:
2432 self.fail('expected BufferTooShort, got %s' % res)
2433
2434 poll = TimingWrapper(conn.poll)
2435
2436 self.assertEqual(poll(), False)
2437 self.assertTimingAlmostEqual(poll.elapsed, 0)
2438
Richard Oudkerk59d54042012-05-10 16:11:12 +01002439 self.assertEqual(poll(-1), False)
2440 self.assertTimingAlmostEqual(poll.elapsed, 0)
2441
Benjamin Petersone711caf2008-06-11 16:44:04 +00002442 self.assertEqual(poll(TIMEOUT1), False)
2443 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2444
2445 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002446 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002447
2448 self.assertEqual(poll(TIMEOUT1), True)
2449 self.assertTimingAlmostEqual(poll.elapsed, 0)
2450
2451 self.assertEqual(conn.recv(), None)
2452
2453 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2454 conn.send_bytes(really_big_msg)
2455 self.assertEqual(conn.recv_bytes(), really_big_msg)
2456
2457 conn.send_bytes(SENTINEL) # tell child to quit
2458 child_conn.close()
2459
2460 if self.TYPE == 'processes':
2461 self.assertEqual(conn.readable, True)
2462 self.assertEqual(conn.writable, True)
2463 self.assertRaises(EOFError, conn.recv)
2464 self.assertRaises(EOFError, conn.recv_bytes)
2465
2466 p.join()
2467
2468 def test_duplex_false(self):
2469 reader, writer = self.Pipe(duplex=False)
2470 self.assertEqual(writer.send(1), None)
2471 self.assertEqual(reader.recv(), 1)
2472 if self.TYPE == 'processes':
2473 self.assertEqual(reader.readable, True)
2474 self.assertEqual(reader.writable, False)
2475 self.assertEqual(writer.readable, False)
2476 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002477 self.assertRaises(OSError, reader.send, 2)
2478 self.assertRaises(OSError, writer.recv)
2479 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002480
2481 def test_spawn_close(self):
2482 # We test that a pipe connection can be closed by parent
2483 # process immediately after child is spawned. On Windows this
2484 # would have sometimes failed on old versions because
2485 # child_conn would be closed before the child got a chance to
2486 # duplicate it.
2487 conn, child_conn = self.Pipe()
2488
2489 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002490 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002491 p.start()
2492 child_conn.close() # this might complete before child initializes
2493
2494 msg = latin('hello')
2495 conn.send_bytes(msg)
2496 self.assertEqual(conn.recv_bytes(), msg)
2497
2498 conn.send_bytes(SENTINEL)
2499 conn.close()
2500 p.join()
2501
2502 def test_sendbytes(self):
2503 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002504 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002505
2506 msg = latin('abcdefghijklmnopqrstuvwxyz')
2507 a, b = self.Pipe()
2508
2509 a.send_bytes(msg)
2510 self.assertEqual(b.recv_bytes(), msg)
2511
2512 a.send_bytes(msg, 5)
2513 self.assertEqual(b.recv_bytes(), msg[5:])
2514
2515 a.send_bytes(msg, 7, 8)
2516 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2517
2518 a.send_bytes(msg, 26)
2519 self.assertEqual(b.recv_bytes(), latin(''))
2520
2521 a.send_bytes(msg, 26, 0)
2522 self.assertEqual(b.recv_bytes(), latin(''))
2523
2524 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2525
2526 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2527
2528 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2529
2530 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2531
2532 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2533
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002534 @classmethod
2535 def _is_fd_assigned(cls, fd):
2536 try:
2537 os.fstat(fd)
2538 except OSError as e:
2539 if e.errno == errno.EBADF:
2540 return False
2541 raise
2542 else:
2543 return True
2544
2545 @classmethod
2546 def _writefd(cls, conn, data, create_dummy_fds=False):
2547 if create_dummy_fds:
2548 for i in range(0, 256):
2549 if not cls._is_fd_assigned(i):
2550 os.dup2(conn.fileno(), i)
2551 fd = reduction.recv_handle(conn)
2552 if msvcrt:
2553 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2554 os.write(fd, data)
2555 os.close(fd)
2556
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002557 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002558 def test_fd_transfer(self):
2559 if self.TYPE != 'processes':
2560 self.skipTest("only makes sense with processes")
2561 conn, child_conn = self.Pipe(duplex=True)
2562
2563 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002564 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002565 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002566 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002567 with open(test.support.TESTFN, "wb") as f:
2568 fd = f.fileno()
2569 if msvcrt:
2570 fd = msvcrt.get_osfhandle(fd)
2571 reduction.send_handle(conn, fd, p.pid)
2572 p.join()
2573 with open(test.support.TESTFN, "rb") as f:
2574 self.assertEqual(f.read(), b"foo")
2575
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002576 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002577 @unittest.skipIf(sys.platform == "win32",
2578 "test semantics don't make sense on Windows")
2579 @unittest.skipIf(MAXFD <= 256,
2580 "largest assignable fd number is too small")
2581 @unittest.skipUnless(hasattr(os, "dup2"),
2582 "test needs os.dup2()")
2583 def test_large_fd_transfer(self):
2584 # With fd > 256 (issue #11657)
2585 if self.TYPE != 'processes':
2586 self.skipTest("only makes sense with processes")
2587 conn, child_conn = self.Pipe(duplex=True)
2588
2589 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002590 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002591 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002592 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002593 with open(test.support.TESTFN, "wb") as f:
2594 fd = f.fileno()
2595 for newfd in range(256, MAXFD):
2596 if not self._is_fd_assigned(newfd):
2597 break
2598 else:
2599 self.fail("could not find an unassigned large file descriptor")
2600 os.dup2(fd, newfd)
2601 try:
2602 reduction.send_handle(conn, newfd, p.pid)
2603 finally:
2604 os.close(newfd)
2605 p.join()
2606 with open(test.support.TESTFN, "rb") as f:
2607 self.assertEqual(f.read(), b"bar")
2608
Jesus Cea4507e642011-09-21 03:53:25 +02002609 @classmethod
2610 def _send_data_without_fd(self, conn):
2611 os.write(conn.fileno(), b"\0")
2612
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
@unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
def test_missing_fd_transfer(self):
    """recv_handle() must raise RuntimeError when no descriptor arrives."""
    # The child sends data that is not accompanied by a file descriptor
    # in ancillary data, so receiving a handle has to fail.
    if self.TYPE != 'processes':
        self.skipTest("only makes sense with processes")
    parent_end, child_end = self.Pipe(duplex=True)

    sender = self.Process(target=self._send_data_without_fd,
                          args=(child_end,))
    sender.daemon = True
    sender.start()
    with self.assertRaises(RuntimeError):
        reduction.recv_handle(parent_end)
    sender.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002627
def test_context(self):
    """Both pipe ends act as context managers and are closed on exit."""
    first, second = self.Pipe()
    ends = (first, second)

    with first, second:
        first.send(1729)
        self.assertEqual(second.recv(), 1729)
        if self.TYPE == 'processes':
            for end in ends:
                self.assertFalse(end.closed)

    # The closed-state checks are only made for real process pipes.
    if self.TYPE != 'processes':
        return
    for end in ends:
        self.assertTrue(end.closed)
    for end in ends:
        with self.assertRaises(OSError):
            end.recv()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002643
class _TestListener(BaseTestCase):
    """Listener checks that run entirely inside the test process."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        """A second Listener on an address already bound raises OSError."""
        for fam in self.connection.families:
            first = self.connection.Listener(family=fam)
            self.addCleanup(first.close)
            listener_factory = self.connection.Listener
            taken_address = first.address
            with self.assertRaises(OSError):
                listener_factory(taken_address, fam)

    def test_context(self):
        """Listener, Client and accepted connection all support ``with``."""
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client:
                with listener.accept() as accepted:
                    client.send(1729)
                    self.assertEqual(accepted.recv(), 1729)

        # Once the with-block has exited, accepting must fail.
        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002664
class _TestListenerClient(BaseTestCase):
    """Round trips between a Listener and a Client in another process/thread."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child target: connect to *address*, send 'hello', then close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """Every connection family can carry a message from child to parent."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection explicitly (as test_issue14725
            # does) instead of leaving one open per family until collected.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        """accept() still works when the client has already come and gone."""
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        """poll() on the client sees bytes sent from the accepted side."""
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2712
class _TestPoll(BaseTestCase):
    """Checks of Connection.poll() on pipes."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """An empty message makes poll() report True, and keeps doing so."""
        left, right = self.Pipe()
        self.assertEqual(left.poll(), False)
        right.send_bytes(b'')
        # Polling twice: the first poll must not consume the message.
        for _ in range(2):
            self.assertEqual(left.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Send each item of *strings* after a 0.1s pause, then close *conn*."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        """Messages, including empty ones, arrive one by one and in order."""
        expected = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        near, far = self.Pipe()
        child = self.Process(target=self._child_strings, args=(far, expected))
        child.start()

        for want in expected:
            # At most 200 short polls, stopping at the first success.
            attempts = 0
            while attempts < 200 and not near.poll(0.01):
                attempts += 1
            got = near.recv_bytes()
            self.assertEqual(want, got)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A child that only polls leaves whole messages for the parent."""
        recv_end, send_end = self.Pipe(False)
        poller = self.Process(target=self._child_boundaries, args=(recv_end,))
        poller.start()
        time.sleep(2)
        payloads = [b"first", b"second"]
        for payload in payloads:
            send_end.send_bytes(payload)
        send_end.close()
        poller.join()
        self.assertIn(recv_end.recv_bytes(), payloads)

    @classmethod
    def _child_dont_merge(cls, b):
        """Send three separate messages: b'a', b'b' and b'cd'."""
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        """Repeated polling between reads never merges separate messages."""
        near, far = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(near.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(far,))
        child.start()

        self.assertEqual(near.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'cd')

        child.join()
2790
Benjamin Petersone711caf2008-06-11 16:44:04 +00002791#
2792# Test of sending connection and socket objects between processes
2793#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002794
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Send Connection and socket objects to other processes over pipes."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Child target: accept one peer per family, then one plain socket.

        For each listener its address is sent over *conn*, followed by the
        accepted connection object itself.  Finally waits for one message.
        """
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        server = socket.socket()
        server.bind((test.support.HOST, 0))
        server.listen()
        conn.send(server.getsockname())
        accepted, _peer = server.accept()
        conn.send(accepted)
        accepted.close()
        server.close()

        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Child target: connect to each received address, send msg.upper().

        (address, msg) pairs are handled with a Client until None arrives;
        one more pair is then handled with a plain socket.
        """
        while True:
            request = conn.recv()
            if request is None:
                break
            address, msg = request
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        sock = socket.socket()
        sock.connect(address)
        sock.sendall(msg.upper())
        sock.close()

        conn.close()

    def test_pickling(self):
        """Connections and sockets received over a pipe are usable here."""
        families = self.connection.families

        listener_pipe, listener_child_end = self.Pipe()
        listener_proc = self.Process(target=self._listener,
                                     args=(listener_child_end, families))
        listener_proc.daemon = True
        listener_proc.start()
        listener_child_end.close()

        remote_pipe, remote_child_end = self.Pipe()
        remote_proc = self.Process(target=self._remote,
                                   args=(remote_child_end,))
        remote_proc.daemon = True
        remote_proc.start()
        remote_child_end.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = listener_pipe.recv()
            remote_pipe.send((address, msg))
            received_conn = listener_pipe.recv()
            self.assertEqual(received_conn.recv(), msg.upper())

        # Tell the remote side to switch to the plain-socket exchange.
        remote_pipe.send(None)

        msg = latin('This connection uses a normal socket')
        address = listener_pipe.recv()
        remote_pipe.send((address, msg))
        received_sock = listener_pipe.recv()
        # Read in chunks of up to 100 bytes until recv() returns nothing.
        data = b''.join(iter(lambda: received_sock.recv(100), b''))
        self.assertEqual(data, msg.upper())
        received_sock.close()

        listener_pipe.send(None)

        remote_pipe.close()
        listener_pipe.close()

        listener_proc.join()
        remote_proc.join()

    @classmethod
    def child_access(cls, conn):
        """Child target for test_access: use a received writer, then a reader."""
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        text = reader.recv()
        conn.send(text*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        child = self.Process(target=self.child_access, args=(child_conn,))
        child.daemon = True
        child.start()
        child_conn.close()

        # Hand the write end to the child and read what it sends back.
        reader, writer = self.Pipe(duplex=False)
        conn.send(writer)
        writer.close()
        self.assertEqual(reader.recv(), 'all is well')
        reader.close()

        # Hand the read end to the child and check the doubled echo.
        reader, writer = self.Pipe(duplex=False)
        conn.send(reader)
        reader.close()
        writer.send('foobar')
        writer.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2925
Benjamin Petersone711caf2008-06-11 16:44:04 +00002926#
2927#
2928#
2929
class _TestHeap(BaseTestCase):
    """Consistency checks on the multiprocessing.heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """After many allocations, free and occupied blocks tile each arena."""
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(nbytes))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        records = []
        for free_list in list(heap._len_to_seq.values()):
            records.extend(
                (heap._arenas.index(arena), start, stop, stop - start, 'free')
                for arena, start, stop in free_list)
        records.extend(
            (heap._arenas.index(arena), start, stop, stop - start, 'occupied')
            for arena, start, stop in heap._allocated_blocks)

        records.sort()

        # Each block must end where the next begins, unless the next block
        # opens a different arena at offset 0.
        for current, following in zip(records, records[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
2994
Benjamin Petersone711caf2008-06-11 16:44:04 +00002995#
2996#
2997#
2998
class _Foo(Structure):
    """Two-field structure: an int ``x`` followed by a double ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
3004
class _TestSharedCTypes(BaseTestCase):
    """Shared ctypes values and arrays modified by a child process."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child target: double every value it is handed, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made by the child are visible in the parent afterwards."""
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Same scenario as test_sharedctypes, with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() yields a structure unaffected by later changes to the source."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
3054
3055#
3056#
3057#
3058
class _TestFinalize(BaseTestCase):
    """Ordering of util.Finalize callbacks, observed through a pipe."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child target: register finalizers that each send a tag on *conn*."""
        class Foo(object):
            pass

        def watch(obj, tag, **options):
            # Register a finalizer that reports *tag* through the pipe.
            return util.Finalize(obj, conn.send, args=(tag,), **options)

        a = Foo()
        watch(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = watch(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # The remaining objects stay referenced by these locals until the
        # process exits.
        c = Foo()
        watch(c, 'c')

        d10 = Foo()
        watch(d10, 'd10', exitpriority=1)

        d01 = Foo()
        watch(d01, 'd01', exitpriority=0)
        d02 = Foo()
        watch(d02, 'd02', exitpriority=0)
        d03 = Foo()
        watch(d03, 'd03', exitpriority=0)

        watch(None, 'e', exitpriority=-10)

        watch(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Tags arrive in the expected order, terminated by 'STOP'."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Keep receiving until the 'STOP' sentinel shows up.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
3111
3112#
3113# Test that from ... import * works for each module
3114#
3115
class _TestImportStar(unittest.TestCase):
    """Every multiprocessing module defines a truthful ``__all__``."""

    def get_module_names(self):
        """Return dotted names for each ``*.py`` file next to multiprocessing.

        ``multiprocessing.__init__`` is dropped and the package name itself
        is appended as the last entry.
        """
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        names = []
        for path in glob.glob(os.path.join(package_dir, '*.py')):
            stem = os.path.splitext(os.path.basename(path))[0]
            names.append('multiprocessing.' + stem)
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        """Import each applicable module and check the names in ``__all__``."""
        modules = self.get_module_names()
        # Leave out modules that are not checked on this platform.
        if sys.platform == 'win32':
            excluded = ['multiprocessing.popen_fork',
                        'multiprocessing.popen_forkserver',
                        'multiprocessing.popen_spawn_posix']
        else:
            excluded = ['multiprocessing.popen_spawn_win32']
            if not HAS_REDUCTION:
                excluded.append('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            excluded.append('multiprocessing.sharedctypes')

        for unwanted in excluded:
            modules.remove(unwanted)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )
3154
3155#
3156# Quick test that logging works -- does not test logging output
3157#
3158
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing's logger can be used at all."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        # Restore the level even if one of the calls below raises.
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        """Child target: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Run _test_level in a child, wait for it, and return what it sent."""
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            # Join the child so it is not left behind after the test.
            p.join()

    def test_level(self):
        """A child reports the effective level that was set in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level
        # Registered up front so both levels are restored even when an
        # assertion below fails.
        self.addCleanup(logger.setLevel, level=LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, self._child_effective_level(reader, writer))

        # With no level of its own, the logger falls back to the root level.
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, self._child_effective_level(reader, writer))
3201
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003202
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00003203# class _TestLoggingProcessName(BaseTestCase):
3204#
3205# def handle(self, record):
3206# assert record.processName == multiprocessing.current_process().name
3207# self.__handled = True
3208#
3209# def test_logging(self):
3210# handler = logging.Handler()
3211# handler.handle = self.handle
3212# self.__handled = False
3213# # Bypass getLogger() and side-effects
3214# logger = logging.getLoggerClass()(
3215# 'multiprocessing.test.TestLoggingProcessName')
3216# logger.addHandler(handler)
3217# logger.propagate = False
3218#
3219# logger.warn('foo')
3220# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003221
Benjamin Petersone711caf2008-06-11 16:44:04 +00003222#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003223# Check that Process.join() retries if os.waitpid() fails with EINTR
3224#
3225
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Child target: after 0.1s, send SIGUSR1 to process *pid*."""
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        """join() on a sleeping child completes although SIGUSR1 arrives."""
        signalled = False

        def on_signal(*args):
            nonlocal signalled
            signalled = True

        own_pid = os.getpid()
        previous = signal.signal(signal.SIGUSR1, on_signal)
        try:
            killer = self.Process(target=self._killer, args=(own_pid,))
            killer.start()
            try:
                sleeper = self.Process(target=time.sleep, args=(2,))
                sleeper.start()
                sleeper.join()
            finally:
                killer.join()
            self.assertTrue(signalled)
            self.assertEqual(sleeper.exitcode, 0)
        finally:
            # Put the previous SIGUSR1 handler back.
            signal.signal(signal.SIGUSR1, previous)
3255
3256#
Jesse Noller6214edd2009-01-19 16:23:53 +00003257# Test to verify handle verification, see issue 3321
3258#
3259
class TestInvalidHandle(unittest.TestCase):
    """Handle verification in Connection (see issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        """A bogus handle must not crash poll(); -1 is rejected outright."""
        tolerated = (ValueError, OSError)
        bogus = multiprocessing.connection.Connection(44977608)
        # check that poll() doesn't crash
        try:
            bogus.poll()
        except tolerated:
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None
        with self.assertRaises(tolerated):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003276
Benjamin Petersone711caf2008-06-11 16:44:04 +00003277
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003278
class OtherTest(unittest.TestCase):
    """Authentication handshake failures, driven by fake connections."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() rejects a peer that answers with garbage."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() rejects a bogus reply after the challenge."""
        class _FakeConnection(object):
            def __init__(self):
                # Scripted replies: the challenge first, then garbage;
                # every later read yields empty bytes.
                self._replies = iter([multiprocessing.connection.CHALLENGE,
                                      b'something bogus'])

            def recv_bytes(self, size):
                return next(self._replies, b'')

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3307
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003308#
3309# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3310#
3311
def initializer(ns):
    """Increment ``ns.test`` by one."""
    ns.test = ns.test + 1
3314
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A manager-backed namespace holding a counter the initializer bumps.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a real initializer once."""
        manager = multiprocessing.managers.SyncManager()
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a real initializer once."""
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3339
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003340#
3341# Issue 5155, 5313, 5331: Test process in processes
3342# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3343#
3344
def _this_sub_process(q):
    """Take one item from *q* without blocking; an empty queue is ignored."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        # Nothing was queued, which is fine for this helper.
        pass
3350
def _test_process(q):
    """Start a daemonic grandchild running _this_sub_process and wait for it.

    The grandchild is given a freshly created queue; *q* itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
3357
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
3360
def pool_in_process():
    """Create a 4-worker pool, map _afunc over seven integers, shut it down."""
    workers = multiprocessing.Pool(processes=4)
    # The mapped results themselves are not needed.
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003366
class _file_like(object):
    """Writer that buffers data per process and flushes it to a delegate."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """Buffer of the current process; started afresh when the pid changes."""
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid == current:
            return self._cache
        self._pid = current
        self._cache = []
        return self._cache

    def write(self, data):
        """Append *data* to the buffer of the current process."""
        self.cache.append(data)

    def flush(self):
        """Write the joined buffer to the delegate and empty the buffer."""
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3387
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child can itself start a child that uses a Queue."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child can create and use a Pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """Flushing in the parent delivers the buffered text to the delegate."""
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process is created but never started; the
        # construction is kept so the scenario stays as it was.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # A unittest assertion, unlike a bare assert, still runs under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3408
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003409
3410class TestWait(unittest.TestCase):
3411
@classmethod
def _child_test_wait(cls, w, slow):
    """Send ten ``(index, pid)`` messages through *w*, then close it.

    When *slow* is true, a random pause of under 0.1s precedes each send.
    """
    for index in range(10):
        if slow:
            time.sleep(random.random()*0.1)
        message = (index, os.getpid())
        w.send(message)
    w.close()
3419
def test_wait(self, slow=False):
    """wait() multiplexes four child pipes until every one reaches EOF."""
    from multiprocessing.connection import wait
    pending = []
    children = []
    received = []

    for _ in range(4):
        recv_end, send_end = multiprocessing.Pipe(duplex=False)
        child = multiprocessing.Process(target=self._child_test_wait,
                                        args=(send_end, slow))
        child.daemon = True
        child.start()
        # The parent's copy of the write end is no longer needed.
        send_end.close()
        pending.append(recv_end)
        children.append(child)
        self.addCleanup(child.join)

    while pending:
        for ready in wait(pending):
            try:
                item = ready.recv()
            except EOFError:
                # This child is done; stop waiting on its pipe.
                pending.remove(ready)
                ready.close()
            else:
                received.append(item)

    received.sort()
    expected = sorted((i, child.pid) for i in range(10) for child in children)
    self.assertEqual(received, expected)
3449
@classmethod
def _child_test_wait_socket(cls, address, slow):
    """Connect to *address* and send the lines "0" to "9", then close.

    When *slow* is true, a random pause of under 0.1s precedes each line.
    """
    client = socket.socket()
    client.connect(address)
    for number in range(10):
        if slow:
            time.sleep(random.random()*0.1)
        line = ('%s\n' % number).encode('ascii')
        client.sendall(line)
    client.close()
3459
def test_wait_socket(self, slow=False):
    """wait() multiplexes four plain sockets until every peer closes."""
    from multiprocessing.connection import wait
    server = socket.socket()
    server.bind((test.support.HOST, 0))
    server.listen()
    address = server.getsockname()
    pending = []
    children = []
    chunks_by_socket = {}

    for _ in range(4):
        child = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(address, slow))
        child.daemon = True
        child.start()
        children.append(child)
        self.addCleanup(child.join)

    for _ in range(4):
        accepted, _peer = server.accept()
        pending.append(accepted)
        chunks_by_socket[accepted] = []
    server.close()

    while pending:
        for ready in wait(pending):
            data = ready.recv(32)
            if data:
                chunks_by_socket[ready].append(data)
            else:
                # An empty read means this peer closed its end.
                pending.remove(ready)
                ready.close()

    expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
    for chunks in chunks_by_socket.values():
        self.assertEqual(b''.join(chunks), expected)
3496
3497 def test_wait_slow(self):
3498 self.test_wait(True)
3499
3500 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003501 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003502
def test_wait_timeout(self):
    """Check that wait() honours its timeout and returns promptly on data.

    First, with nothing readable, wait() must return an empty list after
    roughly ``expected`` seconds.  Then, once a message has been sent from
    one end of the pipe, wait() must report the other end within 0.4s.
    """
    from multiprocessing.connection import wait

    expected = 5
    a, b = multiprocessing.Pipe()
    # Close both ends even when an assertion below fails.
    self.addCleanup(a.close)
    self.addCleanup(b.close)

    # Durations are measured with the monotonic clock so that wall-clock
    # adjustments during the test cannot distort them.
    start = time.monotonic()
    res = wait([a, b], expected)
    delta = time.monotonic() - start

    self.assertEqual(res, [])
    self.assertLess(delta, expected * 2)
    self.assertGreater(delta, expected * 0.5)

    b.send(None)

    start = time.monotonic()
    res = wait([a, b], 20)
    delta = time.monotonic() - start

    self.assertEqual(res, [a])
    self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003525
@classmethod
def signal_and_sleep(cls, sem, period):
    """Release *sem* once, then sleep for *period* seconds."""
    sem.release()
    time.sleep(period)
3530
def test_wait_integer(self):
    """Check wait() on a mix of connections and an integer process sentinel.

    A child releases a semaphore and then sleeps for ``expected`` seconds.
    wait() must first report only the child's sentinel (after about that
    long), and afterwards also each pipe end as soon as data has been sent
    to it from the other end.
    """
    from multiprocessing.connection import wait

    def by_identity(objs):
        # Order by id() so list comparisons ignore the order in which
        # wait() happens to return the ready objects.
        return sorted(objs, key=id)

    expected = 3
    sem = multiprocessing.Semaphore(0)
    a, b = multiprocessing.Pipe()
    # Close both ends even when an assertion below fails.
    self.addCleanup(a.close)
    self.addCleanup(b.close)
    p = multiprocessing.Process(target=self.signal_and_sleep,
                                args=(sem, expected))

    p.start()
    try:
        self.assertIsInstance(p.sentinel, int)
        # Wait until the child is running before starting the clock.
        self.assertTrue(sem.acquire(timeout=20))

        # The monotonic clock is immune to wall-clock adjustments.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(by_identity(res), by_identity([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(by_identity(res), by_identity([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)
    finally:
        # Never leave the child behind, whatever happened above.
        p.terminate()
        p.join()
3573
Richard Oudkerk59d54042012-05-10 16:11:12 +01003574 def test_neg_timeout(self):
3575 from multiprocessing.connection import wait
3576 a, b = multiprocessing.Pipe()
3577 t = time.time()
3578 res = wait([a], timeout=-1)
3579 t = time.time() - t
3580 self.assertEqual(res, [])
3581 self.assertLess(t, 1)
3582 a.close()
3583 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003584
Antoine Pitrou709176f2012-04-01 17:19:09 +02003585#
3586# Issue 14151: Test invalid family on invalid environment
3587#
3588
class TestInvalidFamily(unittest.TestCase):
    """Listener() must raise ValueError for an address the platform rejects."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Backslash pipe-style address, tried on non-Windows platforms.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # Filesystem-path style address, tried on Windows only.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003600
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003601#
3602# Issue 12098: check sys.flags of child matches that for parent
3603#
3604
class TestFlags(unittest.TestCase):
    """Check that a process started by multiprocessing sees the same
    sys.flags as the interpreter that started it."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's sys.flags, as a tuple, over *conn*."""
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        """Print a JSON pair: (own sys.flags, sys.flags of a started process)."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # Start the child interpreter with unusual flags.
        prog = ('from test._test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        argv = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(argv)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3632
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003633#
3634# Test interaction with socket timeouts - see Issue #6056
3635#
3636
class TestTimeouts(unittest.TestCase):
    """Pipes and listeners must keep working while a short default socket
    timeout is in force (see Issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child body: after a one-second delay send 123 over the pipe,
        then connect to *address* and send 456."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # Much shorter than the child's one-second delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(
                target=self._test_timeout, args=(child, listener.address))
            worker.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            # Always restore the process-wide default.
            socket.setdefaulttimeout(saved_timeout)
3666
Richard Oudkerke88a2442012-08-14 11:41:32 +01003667#
3668# Test what happens with no "if __name__ == '__main__'"
3669#
3670
class TestNoForkBomb(unittest.TestCase):
    """Run mp_fork_bomb.py, a script with no ``if __name__ == '__main__'``
    guard, under the current start method."""

    def test_noforkbomb(self):
        start_method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        helper = test.support.script_helper
        if start_method == 'fork':
            # With fork the script is expected to succeed and print 123.
            rc, out, err = helper.assert_python_ok(script, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # Otherwise it must fail with a RuntimeError and no output.
            rc, out, err = helper.assert_python_failure(script, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01003683
3684#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003685# Issue #17555: ForkAwareThreadLock
3686#
3687
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Start n-1 further generations; the last one reports over *conn*
        the size of its after-fork registry."""
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            conn.close()
            descendant.join(timeout=5)
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Created before measuring the registry size; kept referenced for
        # the duration of the test.
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        first = multiprocessing.Process(target=self.child, args=(5, writer))
        first.start()
        writer.close()
        new_size = reader.recv()
        first.join(timeout=5)
        self.assertLessEqual(new_size, old_size)
3714
3715#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003716# Check that non-forked child processes do not inherit unneeded fds/handles
3717#
3718
3719class TestCloseFds(unittest.TestCase):
3720
3721 def get_high_socket_fd(self):
3722 if WIN32:
3723 # The child process will not have any socket handles, so
3724 # calling socket.fromfd() should produce WSAENOTSOCK even
3725 # if there is a handle of the same number.
3726 return socket.socket().detach()
3727 else:
3728 # We want to produce a socket with an fd high enough that a
3729 # freshly created child process will not have any fds as high.
3730 fd = socket.socket().detach()
3731 to_close = []
3732 while fd < 50:
3733 to_close.append(fd)
3734 fd = os.dup(fd)
3735 for x in to_close:
3736 os.close(x)
3737 return fd
3738
3739 def close(self, fd):
3740 if WIN32:
3741 socket.socket(fileno=fd).close()
3742 else:
3743 os.close(fd)
3744
3745 @classmethod
3746 def _test_closefds(cls, conn, fd):
3747 try:
3748 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
3749 except Exception as e:
3750 conn.send(e)
3751 else:
3752 s.close()
3753 conn.send(None)
3754
3755 def test_closefd(self):
3756 if not HAS_REDUCTION:
3757 raise unittest.SkipTest('requires fd pickling')
3758
3759 reader, writer = multiprocessing.Pipe()
3760 fd = self.get_high_socket_fd()
3761 try:
3762 p = multiprocessing.Process(target=self._test_closefds,
3763 args=(writer, fd))
3764 p.start()
3765 writer.close()
3766 e = reader.recv()
3767 p.join(timeout=5)
3768 finally:
3769 self.close(fd)
3770 writer.close()
3771 reader.close()
3772
3773 if multiprocessing.get_start_method() == 'fork':
3774 self.assertIs(e, None)
3775 else:
3776 WSAENOTSOCK = 10038
3777 self.assertIsInstance(e, OSError)
3778 self.assertTrue(e.errno == errno.EBADF or
3779 e.winerror == WSAENOTSOCK, e)
3780
3781#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003782# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3783#
3784
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: connection and listener calls in a child must carry on
    after the child's (do-nothing) SIGUSR1 handler has run."""

    @classmethod
    def _test_ignore(cls, conn):
        """Child body: install a no-op SIGUSR1 handler, then announce
        readiness, echo one message and send a 1 MB byte string."""
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        payload = b'x'*(1024*1024)
        conn.send_bytes(payload)   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(target=self._test_ignore,
                                            args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # First signal, sent before the child is given its message.
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # Second signal, sent before the large payload is read.
            os.kill(child.pid, signal.SIGUSR1)
            expected_payload = b'x'*(1024*1024)
            self.assertEqual(conn.recv_bytes(), expected_payload)
            time.sleep(0.1)
            child.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Child body: install a no-op SIGUSR1 handler, publish a
        listener's address over *conn*, accept one client and greet it."""
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
            # Signal the child before any client connects.
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            child.join()
        finally:
            conn.close()
3848
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003849class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003850 @classmethod
3851 def _check_context(cls, conn):
3852 conn.send(multiprocessing.get_start_method())
3853
3854 def check_context(self, ctx):
3855 r, w = ctx.Pipe(duplex=False)
3856 p = ctx.Process(target=self._check_context, args=(w,))
3857 p.start()
3858 w.close()
3859 child_method = r.recv()
3860 r.close()
3861 p.join()
3862 self.assertEqual(child_method, ctx.get_start_method())
3863
3864 def test_context(self):
3865 for method in ('fork', 'spawn', 'forkserver'):
3866 try:
3867 ctx = multiprocessing.get_context(method)
3868 except ValueError:
3869 continue
3870 self.assertEqual(ctx.get_start_method(), method)
3871 self.assertIs(ctx.get_context(), ctx)
3872 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
3873 self.assertRaises(ValueError, ctx.set_start_method, None)
3874 self.check_context(ctx)
3875
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003876 def test_set_get(self):
3877 multiprocessing.set_forkserver_preload(PRELOAD)
3878 count = 0
3879 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003880 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003881 for method in ('fork', 'spawn', 'forkserver'):
3882 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003883 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003884 except ValueError:
3885 continue
3886 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003887 ctx = multiprocessing.get_context()
3888 self.assertEqual(ctx.get_start_method(), method)
3889 self.assertTrue(type(ctx).__name__.lower().startswith(method))
3890 self.assertTrue(
3891 ctx.Process.__name__.lower().startswith(method))
3892 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003893 count += 1
3894 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01003895 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003896 self.assertGreaterEqual(count, 1)
3897
3898 def test_get_all(self):
3899 methods = multiprocessing.get_all_start_methods()
3900 if sys.platform == 'win32':
3901 self.assertEqual(methods, ['spawn'])
3902 else:
3903 self.assertTrue(methods == ['fork', 'spawn'] or
3904 methods == ['fork', 'spawn', 'forkserver'])
3905
Antoine Pitroucd2a2012016-12-10 17:13:16 +01003906 def test_preload_resources(self):
3907 if multiprocessing.get_start_method() != 'forkserver':
3908 self.skipTest("test only relevant for 'forkserver' method")
3909 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
3910 rc, out, err = test.support.script_helper.assert_python_ok(name)
3911 out = out.decode()
3912 err = err.decode()
3913 if out.rstrip() != 'ok' or err != '':
3914 print(out)
3915 print(err)
3916 self.fail("failed spawning forkserver or grandchild")
3917
3918
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003919#
3920# Check that killing process does not leak named semaphores
3921#
3922
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
    """Check that killing a process does not leak named semaphores."""

    def test_semaphore_tracker(self):
        import subprocess
        # The child creates two locks, writes their semaphore names to the
        # inherited pipe fd and then sleeps until it is terminated.
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        read_fd, write_fd = os.pipe()
        argv = [sys.executable, '-c', cmd % (write_fd, write_fd)]
        child = subprocess.Popen(argv,
                                 pass_fds=[write_fd],
                                 stderr=subprocess.PIPE)
        os.close(write_fd)
        with open(read_fd, 'rb', closefd=True) as names:
            name1 = names.readline().rstrip().decode('ascii')
            name2 = names.readline().rstrip().decode('ascii')
        # Unlink the first semaphore ourselves, then terminate the child.
        _multiprocessing.sem_unlink(name1)
        child.terminate()
        child.wait()
        time.sleep(2.0)
        # By now the second semaphore must no longer exist.
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = child.stderr.read().decode('utf-8')
        child.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003959
3960#
3961# Mixins
3962#
3963
class ProcessesMixin:
    """Mixin exposing the real ``multiprocessing`` API under the attribute
    names the test cases use; callables are wrapped in staticmethod."""

    TYPE = 'processes'

    # Process handling and connections.
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)

    # Queues.
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared values and arrays.
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003985
Benjamin Petersone711caf2008-06-11 16:44:04 +00003986
class ManagerMixin:
    """Mixin that resolves the test API names through a manager.

    ``setUpClass`` stores a ``multiprocessing.Manager()`` in ``cls.manager``;
    each property below returns the attribute of the same name on the
    ``manager`` of the object it is read from.
    """

    TYPE = 'manager'
    Process = multiprocessing.Process

    def _via_manager(name):
        # Build a read-only property returning ``self.manager.<name>``.
        return property(operator.attrgetter('manager.' + name))

    Queue = _via_manager('Queue')
    JoinableQueue = _via_manager('JoinableQueue')
    Lock = _via_manager('Lock')
    RLock = _via_manager('RLock')
    Semaphore = _via_manager('Semaphore')
    BoundedSemaphore = _via_manager('BoundedSemaphore')
    Condition = _via_manager('Condition')
    Event = _via_manager('Event')
    Barrier = _via_manager('Barrier')
    Value = _via_manager('Value')
    Array = _via_manager('Array')
    list = _via_manager('list')
    dict = _via_manager('dict')
    Namespace = _via_manager('Namespace')

    # The factory is only needed while the class body executes.
    del _via_manager

    @classmethod
    def Pool(cls, *args, **kwds):
        """Forward all arguments to ``cls.manager.Pool``."""
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        delay = 0.01
        while len(multiprocessing.active_children()) > 1 and delay < 5:
            time.sleep(delay)
            delay *= 2
        gc.collect()                       # do garbage collection
        manager = cls.manager
        if manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(manager._debug_info())
        manager.shutdown()
        manager.join()
        cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01004032
4033
class ThreadsMixin:
    """Mixin exposing the ``multiprocessing.dummy`` API under the attribute
    names the test cases use; callables are wrapped in staticmethod."""

    TYPE = 'threads'

    # Process handling and connections.
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)

    # Queues.
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization primitives.
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared values and arrays.
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
4053
4054#
4055# Functions used to create test cases from the base ones in this module
4056#
4057
def install_tests_in_module_dict(remote_globs, start_method):
    """Populate *remote_globs* with concrete test classes and module hooks.

    For every BaseTestCase subclass in this module, one class per entry of
    its ALLOWED_TYPES is created (combined with the matching ``*Mixin`` and
    unittest.TestCase) and stored as ``With<Type><name minus first char>``.
    Every other unittest.TestCase subclass is re-created under its own name.
    All generated classes get ``remote_globs['__name__']`` as ``__module__``.
    Finally ``setUpModule``/``tearDownModule`` functions that switch to
    *start_method* and restore the previous one are installed.
    """
    module_name = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    def publish(cls, cls_name):
        # Rename the generated class and expose it in the target module.
        cls.__name__ = cls.__qualname__ = cls_name
        cls.__module__ = module_name
        remote_globs[cls_name] = cls

    for name, base in local_globs.items():
        if not isinstance(base, type) or base is BaseTestCase:
            continue
        if issubclass(base, BaseTestCase):
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                publish(Temp, 'With' + type_.capitalize() + name[1:])
        elif issubclass(base, unittest.TestCase):
            class Temp(base, object):
                pass
            publish(Temp, name)

    # State recorded by setUpModule() and consumed by tearDownModule().
    dangling_processes = None
    dangling_threads = None
    previous_start_method = None

    def setUpModule():
        nonlocal dangling_processes, dangling_threads, previous_start_method
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling_processes = multiprocessing.process._dangling.copy()
        dangling_threads = threading._dangling.copy()
        previous_start_method = multiprocessing.get_start_method(
            allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                probe = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        multiprocessing.set_start_method(previous_start_method, force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        time.sleep(0.5)
        multiprocessing.process._cleanup()
        gc.collect()
        leaked = set(multiprocessing.process._dangling) - set(dangling_processes)
        if leaked:
            print('Dangling processes:', leaked, file=sys.stderr)
        del leaked
        leaked = set(threading._dangling) - set(dangling_threads)
        if leaked:
            print('Dangling threads:', leaked, file=sys.stderr)

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule