blob: 7993fcb08e465a36ab1f65cfe50a3d421c9fbb6c [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Julien Palard5d236ca2018-11-04 23:40:32 +01006import unittest.mock
Benjamin Petersone711caf2008-06-11 16:44:04 +00007import queue as pyqueue
Antoine Pitroucbe17562017-11-03 14:31:38 +01008import contextlib
Benjamin Petersone711caf2008-06-11 16:44:04 +00009import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
Antoine Pitrou89889452017-03-24 13:52:11 +010023import weakref
Pablo Galindoec74d182018-09-04 09:53:54 +010024import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000025import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030026import test.support.script_helper
Victor Stinnerb9b69002017-09-14 14:40:56 -070027from test import support
Benjamin Petersone711caf2008-06-11 16:44:04 +000028
Benjamin Petersone5384b02008-10-04 22:00:42 +000029
R. David Murraya21e4ca2009-03-31 23:16:50 +000030# Skip tests if _multiprocessing wasn't built.
31_multiprocessing = test.support.import_module('_multiprocessing')
32# Skip tests if sem_open implementation is broken.
33test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000034import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.connection
Victor Stinnerd7e64d92017-07-25 00:33:56 +020037import multiprocessing.dummy
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.heap
Victor Stinnerd7e64d92017-07-25 00:33:56 +020039import multiprocessing.managers
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Victor Stinnerd7e64d92017-07-25 00:33:56 +020041import multiprocessing.queues
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Charles-François Natalibc8f0822011-09-20 20:36:51 +020043from multiprocessing import util
44
45try:
46 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010047 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020048except ImportError:
49 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
Brian Curtinafa88b52010-10-07 01:12:19 +000051try:
52 from multiprocessing.sharedctypes import Value, copy
53 HAS_SHAREDCTYPES = True
54except ImportError:
55 HAS_SHAREDCTYPES = False
56
Antoine Pitroubcb39d42011-08-23 19:46:22 +020057try:
58 import msvcrt
59except ImportError:
60 msvcrt = None
61
Benjamin Petersone711caf2008-06-11 16:44:04 +000062#
63#
64#
65
Victor Stinner11f08072017-09-15 06:55:31 -070066# Timeout to wait until a process completes
67TIMEOUT = 30.0 # seconds
68
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000069def latin(s):
70 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
Victor Stinnerd7e64d92017-07-25 00:33:56 +020072
73def close_queue(queue):
74 if isinstance(queue, multiprocessing.queues.Queue):
75 queue.close()
76 queue.join_thread()
77
78
Victor Stinner11f08072017-09-15 06:55:31 -070079def join_process(process):
Victor Stinnerb9b69002017-09-14 14:40:56 -070080 # Since multiprocessing.Process has the same API than threading.Thread
81 # (join() and is_alive(), the support function can be reused
Victor Stinner11f08072017-09-15 06:55:31 -070082 support.join_thread(process, timeout=TIMEOUT)
Victor Stinnerb9b69002017-09-14 14:40:56 -070083
84
Benjamin Petersone711caf2008-06-11 16:44:04 +000085#
86# Constants
87#
88
89LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000090#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000091
92DELTA = 0.1
93CHECK_TIMINGS = False # making true makes tests take a lot longer
94 # and can sometimes cause some non-serious
95 # failures because some calls block a bit
96 # longer than expected
97if CHECK_TIMINGS:
98 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
99else:
100 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
101
102HAVE_GETVALUE = not getattr(_multiprocessing,
103 'HAVE_BROKEN_SEM_GETVALUE', False)
104
Victor Stinner937ee9e2018-06-26 02:11:06 +0200105WIN32 = (sys.platform == "win32")
106
Richard Oudkerk59d54042012-05-10 16:11:12 +0100107from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200108
Richard Oudkerk59d54042012-05-10 16:11:12 +0100109def wait_for_handle(handle, timeout):
110 if timeout is not None and timeout < 0.0:
111 timeout = None
112 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +0000113
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200114try:
115 MAXFD = os.sysconf("SC_OPEN_MAX")
116except:
117 MAXFD = 256
118
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100119# To speed up tests when using the forkserver, we can preload these:
120PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
121
Benjamin Petersone711caf2008-06-11 16:44:04 +0000122#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000123# Some tests require ctypes
124#
125
126try:
Gareth Rees3913bad2017-07-21 11:35:33 +0100127 from ctypes import Structure, c_int, c_double, c_longlong
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128except ImportError:
129 Structure = object
Antoine Pitrouff92ff52017-07-21 13:24:05 +0200130 c_int = c_double = c_longlong = None
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000131
Charles-François Natali221ef672011-11-22 18:55:22 +0100132
133def check_enough_semaphores():
134 """Check that the system supports enough semaphores to run the test."""
135 # minimum number of semaphores available according to POSIX
136 nsems_min = 256
137 try:
138 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
139 except (AttributeError, ValueError):
140 # sysconf not available or setting not available
141 return
142 if nsems == -1 or nsems >= nsems_min:
143 return
144 raise unittest.SkipTest("The OS doesn't support enough semaphores "
145 "to run the test (required: %d)." % nsems_min)
146
147
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000148#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149# Creates a wrapper for a function which records the time it takes to finish
150#
151
152class TimingWrapper(object):
153
154 def __init__(self, func):
155 self.func = func
156 self.elapsed = None
157
158 def __call__(self, *args, **kwds):
159 t = time.time()
160 try:
161 return self.func(*args, **kwds)
162 finally:
163 self.elapsed = time.time() - t
164
165#
166# Base class for test cases
167#
168
169class BaseTestCase(object):
170
171 ALLOWED_TYPES = ('processes', 'manager', 'threads')
172
173 def assertTimingAlmostEqual(self, a, b):
174 if CHECK_TIMINGS:
175 self.assertAlmostEqual(a, b, 1)
176
177 def assertReturnsIfImplemented(self, value, func, *args):
178 try:
179 res = func(*args)
180 except NotImplementedError:
181 pass
182 else:
183 return self.assertEqual(value, res)
184
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000185 # For the sanity of Windows users, rather than crashing or freezing in
186 # multiple ways.
187 def __reduce__(self, *args):
188 raise NotImplementedError("shouldn't try to pickle a test case")
189
190 __reduce_ex__ = __reduce__
191
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192#
193# Return the value of a semaphore
194#
195
196def get_value(self):
197 try:
198 return self.get_value()
199 except AttributeError:
200 try:
201 return self._Semaphore__value
202 except AttributeError:
203 try:
204 return self._value
205 except AttributeError:
206 raise NotImplementedError
207
208#
209# Testcases
210#
211
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200212class DummyCallable:
213 def __call__(self, q, c):
214 assert isinstance(c, DummyCallable)
215 q.put(5)
216
217
Benjamin Petersone711caf2008-06-11 16:44:04 +0000218class _TestProcess(BaseTestCase):
219
220 ALLOWED_TYPES = ('processes', 'threads')
221
222 def test_current(self):
223 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600224 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225
226 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228
229 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000230 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000231 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000233 self.assertEqual(current.ident, os.getpid())
234 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000235
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000236 def test_daemon_argument(self):
237 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600238 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000239
240 # By default uses the current process's daemon flag.
241 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000242 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000243 proc1 = self.Process(target=self._test, daemon=True)
244 self.assertTrue(proc1.daemon)
245 proc2 = self.Process(target=self._test, daemon=False)
246 self.assertFalse(proc2.daemon)
247
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000248 @classmethod
249 def _test(cls, q, *args, **kwds):
250 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251 q.put(args)
252 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000253 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000254 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000255 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 q.put(current.pid)
257
258 def test_process(self):
259 q = self.Queue(1)
260 e = self.Event()
261 args = (q, 1, 2)
262 kwargs = {'hello':23, 'bye':2.54}
263 name = 'SomeProcess'
264 p = self.Process(
265 target=self._test, args=args, kwargs=kwargs, name=name
266 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000267 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 current = self.current_process()
269
270 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000271 self.assertEqual(p.authkey, current.authkey)
272 self.assertEqual(p.is_alive(), False)
273 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000274 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000275 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000276 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
278 p.start()
279
Ezio Melottib3aedd42010-11-20 19:04:17 +0000280 self.assertEqual(p.exitcode, None)
281 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000282 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000283
Ezio Melottib3aedd42010-11-20 19:04:17 +0000284 self.assertEqual(q.get(), args[1:])
285 self.assertEqual(q.get(), kwargs)
286 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000287 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000288 self.assertEqual(q.get(), current.authkey)
289 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000290
291 p.join()
292
Ezio Melottib3aedd42010-11-20 19:04:17 +0000293 self.assertEqual(p.exitcode, 0)
294 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000295 self.assertNotIn(p, self.active_children())
Victor Stinnerb4c52962017-07-25 02:40:55 +0200296 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000298 @classmethod
Vitor Pereiraba75af72017-07-18 16:34:23 +0100299 def _sleep_some(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100300 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200302 @classmethod
303 def _test_sleep(cls, delay):
304 time.sleep(delay)
305
Vitor Pereiraba75af72017-07-18 16:34:23 +0100306 def _kill_process(self, meth):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600308 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309
Vitor Pereiraba75af72017-07-18 16:34:23 +0100310 p = self.Process(target=self._sleep_some)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000311 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000312 p.start()
313
314 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000315 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000316 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000317
Richard Oudkerk59d54042012-05-10 16:11:12 +0100318 join = TimingWrapper(p.join)
319
320 self.assertEqual(join(0), None)
321 self.assertTimingAlmostEqual(join.elapsed, 0.0)
322 self.assertEqual(p.is_alive(), True)
323
324 self.assertEqual(join(-1), None)
325 self.assertTimingAlmostEqual(join.elapsed, 0.0)
326 self.assertEqual(p.is_alive(), True)
327
Richard Oudkerk26f92682013-10-17 13:56:18 +0100328 # XXX maybe terminating too soon causes the problems on Gentoo...
329 time.sleep(1)
330
Vitor Pereiraba75af72017-07-18 16:34:23 +0100331 meth(p)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000332
Richard Oudkerk4f350792013-10-13 00:49:27 +0100333 if hasattr(signal, 'alarm'):
Richard Oudkerkd44500a2013-10-17 10:38:37 +0100334 # On the Gentoo buildbot waitpid() often seems to block forever.
Richard Oudkerk26f92682013-10-17 13:56:18 +0100335 # We use alarm() to interrupt it if it blocks for too long.
Richard Oudkerk4f350792013-10-13 00:49:27 +0100336 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100337 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100338 old_handler = signal.signal(signal.SIGALRM, handler)
339 try:
340 signal.alarm(10)
341 self.assertEqual(join(), None)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100342 finally:
Richard Oudkerk1e2f67c2013-10-17 14:24:06 +0100343 signal.alarm(0)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100344 signal.signal(signal.SIGALRM, old_handler)
345 else:
346 self.assertEqual(join(), None)
347
Benjamin Petersone711caf2008-06-11 16:44:04 +0000348 self.assertTimingAlmostEqual(join.elapsed, 0.0)
349
350 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000351 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000352
353 p.join()
354
Vitor Pereiraba75af72017-07-18 16:34:23 +0100355 return p.exitcode
356
357 def test_terminate(self):
358 exitcode = self._kill_process(multiprocessing.Process.terminate)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200359 if os.name != 'nt':
Vitor Pereiraba75af72017-07-18 16:34:23 +0100360 self.assertEqual(exitcode, -signal.SIGTERM)
361
362 def test_kill(self):
363 exitcode = self._kill_process(multiprocessing.Process.kill)
364 if os.name != 'nt':
365 self.assertEqual(exitcode, -signal.SIGKILL)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000366
367 def test_cpu_count(self):
368 try:
369 cpus = multiprocessing.cpu_count()
370 except NotImplementedError:
371 cpus = 1
372 self.assertTrue(type(cpus) is int)
373 self.assertTrue(cpus >= 1)
374
375 def test_active_children(self):
376 self.assertEqual(type(self.active_children()), list)
377
378 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000379 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380
Jesus Cea94f964f2011-09-09 20:26:57 +0200381 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000382 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000383 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384
385 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000386 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000387
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000388 @classmethod
389 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000390 wconn.send(id)
391 if len(id) < 2:
392 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000393 p = cls.Process(
394 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395 )
396 p.start()
397 p.join()
398
399 def test_recursion(self):
400 rconn, wconn = self.Pipe(duplex=False)
401 self._test_recursion(wconn, [])
402
403 time.sleep(DELTA)
404 result = []
405 while rconn.poll():
406 result.append(rconn.recv())
407
408 expected = [
409 [],
410 [0],
411 [0, 0],
412 [0, 1],
413 [1],
414 [1, 0],
415 [1, 1]
416 ]
417 self.assertEqual(result, expected)
418
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200419 @classmethod
420 def _test_sentinel(cls, event):
421 event.wait(10.0)
422
423 def test_sentinel(self):
424 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600425 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200426 event = self.Event()
427 p = self.Process(target=self._test_sentinel, args=(event,))
428 with self.assertRaises(ValueError):
429 p.sentinel
430 p.start()
431 self.addCleanup(p.join)
432 sentinel = p.sentinel
433 self.assertIsInstance(sentinel, int)
434 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
435 event.set()
436 p.join()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100437 self.assertTrue(wait_for_handle(sentinel, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200438
Antoine Pitrou13e96cc2017-06-24 19:22:23 +0200439 @classmethod
440 def _test_close(cls, rc=0, q=None):
441 if q is not None:
442 q.get()
443 sys.exit(rc)
444
445 def test_close(self):
446 if self.TYPE == "threads":
447 self.skipTest('test not appropriate for {}'.format(self.TYPE))
448 q = self.Queue()
449 p = self.Process(target=self._test_close, kwargs={'q': q})
450 p.daemon = True
451 p.start()
452 self.assertEqual(p.is_alive(), True)
453 # Child is still alive, cannot close
454 with self.assertRaises(ValueError):
455 p.close()
456
457 q.put(None)
458 p.join()
459 self.assertEqual(p.is_alive(), False)
460 self.assertEqual(p.exitcode, 0)
461 p.close()
462 with self.assertRaises(ValueError):
463 p.is_alive()
464 with self.assertRaises(ValueError):
465 p.join()
466 with self.assertRaises(ValueError):
467 p.terminate()
468 p.close()
469
470 wr = weakref.ref(p)
471 del p
472 gc.collect()
473 self.assertIs(wr(), None)
474
Victor Stinnerb4c52962017-07-25 02:40:55 +0200475 close_queue(q)
476
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200477 def test_many_processes(self):
478 if self.TYPE == 'threads':
479 self.skipTest('test not appropriate for {}'.format(self.TYPE))
480
481 sm = multiprocessing.get_start_method()
482 N = 5 if sm == 'spawn' else 100
483
484 # Try to overwhelm the forkserver loop with events
485 procs = [self.Process(target=self._test_sleep, args=(0.01,))
486 for i in range(N)]
487 for p in procs:
488 p.start()
489 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700490 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200491 for p in procs:
492 self.assertEqual(p.exitcode, 0)
493
Vitor Pereiraba75af72017-07-18 16:34:23 +0100494 procs = [self.Process(target=self._sleep_some)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200495 for i in range(N)]
496 for p in procs:
497 p.start()
498 time.sleep(0.001) # let the children start...
499 for p in procs:
500 p.terminate()
501 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700502 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200503 if os.name != 'nt':
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700504 exitcodes = [-signal.SIGTERM]
505 if sys.platform == 'darwin':
506 # bpo-31510: On macOS, killing a freshly started process with
507 # SIGTERM sometimes kills the process with SIGKILL.
508 exitcodes.append(-signal.SIGKILL)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200509 for p in procs:
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700510 self.assertIn(p.exitcode, exitcodes)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200511
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200512 def test_lose_target_ref(self):
513 c = DummyCallable()
514 wr = weakref.ref(c)
515 q = self.Queue()
516 p = self.Process(target=c, args=(q, c))
517 del c
518 p.start()
519 p.join()
520 self.assertIs(wr(), None)
521 self.assertEqual(q.get(), 5)
Victor Stinnerb4c52962017-07-25 02:40:55 +0200522 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200523
Antoine Pitrou896145d2017-07-22 13:22:54 +0200524 @classmethod
525 def _test_child_fd_inflation(self, evt, q):
526 q.put(test.support.fd_count())
527 evt.wait()
528
529 def test_child_fd_inflation(self):
530 # Number of fds in child processes should not grow with the
531 # number of running children.
532 if self.TYPE == 'threads':
533 self.skipTest('test not appropriate for {}'.format(self.TYPE))
534
535 sm = multiprocessing.get_start_method()
536 if sm == 'fork':
537 # The fork method by design inherits all fds from the parent,
538 # trying to go against it is a lost battle
539 self.skipTest('test not appropriate for {}'.format(sm))
540
541 N = 5
542 evt = self.Event()
543 q = self.Queue()
544
545 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
546 for i in range(N)]
547 for p in procs:
548 p.start()
549
550 try:
551 fd_counts = [q.get() for i in range(N)]
552 self.assertEqual(len(set(fd_counts)), 1, fd_counts)
553
554 finally:
555 evt.set()
556 for p in procs:
557 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200558 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200559
Antoine Pitrouee84a602017-08-16 20:53:28 +0200560 @classmethod
561 def _test_wait_for_threads(self, evt):
562 def func1():
563 time.sleep(0.5)
564 evt.set()
565
566 def func2():
567 time.sleep(20)
568 evt.clear()
569
570 threading.Thread(target=func1).start()
571 threading.Thread(target=func2, daemon=True).start()
572
573 def test_wait_for_threads(self):
574 # A child process should wait for non-daemonic threads to end
575 # before exiting
576 if self.TYPE == 'threads':
577 self.skipTest('test not appropriate for {}'.format(self.TYPE))
578
579 evt = self.Event()
580 proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
581 proc.start()
582 proc.join()
583 self.assertTrue(evt.is_set())
584
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200585 @classmethod
Antoine Pitroue756f662018-03-11 19:21:38 +0100586 def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
587 for stream_name, action in break_std_streams.items():
588 if action == 'close':
589 stream = io.StringIO()
590 stream.close()
591 else:
592 assert action == 'remove'
593 stream = None
594 setattr(sys, stream_name, None)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200595 evt.set()
596
Antoine Pitroue756f662018-03-11 19:21:38 +0100597 def test_error_on_stdio_flush_1(self):
598 # Check that Process works with broken standard streams
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200599 streams = [io.StringIO(), None]
600 streams[0].close()
601 for stream_name in ('stdout', 'stderr'):
602 for stream in streams:
603 old_stream = getattr(sys, stream_name)
604 setattr(sys, stream_name, stream)
605 try:
606 evt = self.Event()
607 proc = self.Process(target=self._test_error_on_stdio_flush,
608 args=(evt,))
609 proc.start()
610 proc.join()
611 self.assertTrue(evt.is_set())
Antoine Pitroue756f662018-03-11 19:21:38 +0100612 self.assertEqual(proc.exitcode, 0)
613 finally:
614 setattr(sys, stream_name, old_stream)
615
616 def test_error_on_stdio_flush_2(self):
617 # Same as test_error_on_stdio_flush_1(), but standard streams are
618 # broken by the child process
619 for stream_name in ('stdout', 'stderr'):
620 for action in ('close', 'remove'):
621 old_stream = getattr(sys, stream_name)
622 try:
623 evt = self.Event()
624 proc = self.Process(target=self._test_error_on_stdio_flush,
625 args=(evt, {stream_name: action}))
626 proc.start()
627 proc.join()
628 self.assertTrue(evt.is_set())
629 self.assertEqual(proc.exitcode, 0)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200630 finally:
631 setattr(sys, stream_name, old_stream)
632
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100633 @classmethod
634 def _sleep_and_set_event(self, evt, delay=0.0):
635 time.sleep(delay)
636 evt.set()
637
638 def check_forkserver_death(self, signum):
639 # bpo-31308: if the forkserver process has died, we should still
640 # be able to create and run new Process instances (the forkserver
641 # is implicitly restarted).
642 if self.TYPE == 'threads':
643 self.skipTest('test not appropriate for {}'.format(self.TYPE))
644 sm = multiprocessing.get_start_method()
645 if sm != 'forkserver':
646 # The fork method by design inherits all fds from the parent,
647 # trying to go against it is a lost battle
648 self.skipTest('test not appropriate for {}'.format(sm))
649
650 from multiprocessing.forkserver import _forkserver
651 _forkserver.ensure_running()
652
Victor Stinner07888e12018-07-04 11:49:41 +0200653 # First process sleeps 500 ms
654 delay = 0.5
655
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100656 evt = self.Event()
Victor Stinner07888e12018-07-04 11:49:41 +0200657 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100658 proc.start()
659
660 pid = _forkserver._forkserver_pid
661 os.kill(pid, signum)
Victor Stinner07888e12018-07-04 11:49:41 +0200662 # give time to the fork server to die and time to proc to complete
663 time.sleep(delay * 2.0)
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100664
665 evt2 = self.Event()
666 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
667 proc2.start()
668 proc2.join()
669 self.assertTrue(evt2.is_set())
670 self.assertEqual(proc2.exitcode, 0)
671
672 proc.join()
673 self.assertTrue(evt.is_set())
674 self.assertIn(proc.exitcode, (0, 255))
675
676 def test_forkserver_sigint(self):
677 # Catchable signal
678 self.check_forkserver_death(signal.SIGINT)
679
680 def test_forkserver_sigkill(self):
681 # Uncatchable signal
682 if os.name != 'nt':
683 self.check_forkserver_death(signal.SIGKILL)
684
Antoine Pitrouee84a602017-08-16 20:53:28 +0200685
Benjamin Petersone711caf2008-06-11 16:44:04 +0000686#
687#
688#
689
690class _UpperCaser(multiprocessing.Process):
691
692 def __init__(self):
693 multiprocessing.Process.__init__(self)
694 self.child_conn, self.parent_conn = multiprocessing.Pipe()
695
696 def run(self):
697 self.parent_conn.close()
698 for s in iter(self.child_conn.recv, None):
699 self.child_conn.send(s.upper())
700 self.child_conn.close()
701
702 def submit(self, s):
703 assert type(s) is str
704 self.parent_conn.send(s)
705 return self.parent_conn.recv()
706
707 def stop(self):
708 self.parent_conn.send(None)
709 self.parent_conn.close()
710 self.child_conn.close()
711
712class _TestSubclassingProcess(BaseTestCase):
713
714 ALLOWED_TYPES = ('processes',)
715
716 def test_subclassing(self):
717 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200718 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000719 uppercaser.start()
720 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
721 self.assertEqual(uppercaser.submit('world'), 'WORLD')
722 uppercaser.stop()
723 uppercaser.join()
724
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100725 def test_stderr_flush(self):
726 # sys.stderr is flushed at process shutdown (issue #13812)
727 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600728 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100729
730 testfn = test.support.TESTFN
731 self.addCleanup(test.support.unlink, testfn)
732 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
733 proc.start()
734 proc.join()
735 with open(testfn, 'r') as f:
736 err = f.read()
737 # The whole traceback was printed
738 self.assertIn("ZeroDivisionError", err)
739 self.assertIn("test_multiprocessing.py", err)
740 self.assertIn("1/0 # MARKER", err)
741
742 @classmethod
743 def _test_stderr_flush(cls, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100744 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
745 sys.stderr = open(fd, 'w', closefd=False)
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100746 1/0 # MARKER
747
748
Richard Oudkerk29471de2012-06-06 19:04:57 +0100749 @classmethod
750 def _test_sys_exit(cls, reason, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100751 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
752 sys.stderr = open(fd, 'w', closefd=False)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100753 sys.exit(reason)
754
755 def test_sys_exit(self):
756 # See Issue 13854
757 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600758 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk29471de2012-06-06 19:04:57 +0100759
760 testfn = test.support.TESTFN
761 self.addCleanup(test.support.unlink, testfn)
762
Victor Stinnera6d865c2016-03-25 09:29:50 +0100763 for reason in (
764 [1, 2, 3],
765 'ignore this',
766 ):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100767 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
768 p.daemon = True
769 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700770 join_process(p)
Victor Stinnera6d865c2016-03-25 09:29:50 +0100771 self.assertEqual(p.exitcode, 1)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100772
773 with open(testfn, 'r') as f:
Victor Stinnera6d865c2016-03-25 09:29:50 +0100774 content = f.read()
775 self.assertEqual(content.rstrip(), str(reason))
776
777 os.unlink(testfn)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100778
779 for reason in (True, False, 8):
780 p = self.Process(target=sys.exit, args=(reason,))
781 p.daemon = True
782 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700783 join_process(p)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100784 self.assertEqual(p.exitcode, reason)
785
Benjamin Petersone711caf2008-06-11 16:44:04 +0000786#
787#
788#
789
790def queue_empty(q):
791 if hasattr(q, 'empty'):
792 return q.empty()
793 else:
794 return q.qsize() == 0
795
796def queue_full(q, maxsize):
797 if hasattr(q, 'full'):
798 return q.full()
799 else:
800 return q.qsize() == maxsize
801
802
803class _TestQueue(BaseTestCase):
804
805
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000806 @classmethod
807 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000808 child_can_start.wait()
809 for i in range(6):
810 queue.get()
811 parent_can_continue.set()
812
813 def test_put(self):
814 MAXSIZE = 6
815 queue = self.Queue(maxsize=MAXSIZE)
816 child_can_start = self.Event()
817 parent_can_continue = self.Event()
818
819 proc = self.Process(
820 target=self._test_put,
821 args=(queue, child_can_start, parent_can_continue)
822 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000823 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000824 proc.start()
825
826 self.assertEqual(queue_empty(queue), True)
827 self.assertEqual(queue_full(queue, MAXSIZE), False)
828
829 queue.put(1)
830 queue.put(2, True)
831 queue.put(3, True, None)
832 queue.put(4, False)
833 queue.put(5, False, None)
834 queue.put_nowait(6)
835
836 # the values may be in buffer but not yet in pipe so sleep a bit
837 time.sleep(DELTA)
838
839 self.assertEqual(queue_empty(queue), False)
840 self.assertEqual(queue_full(queue, MAXSIZE), True)
841
842 put = TimingWrapper(queue.put)
843 put_nowait = TimingWrapper(queue.put_nowait)
844
845 self.assertRaises(pyqueue.Full, put, 7, False)
846 self.assertTimingAlmostEqual(put.elapsed, 0)
847
848 self.assertRaises(pyqueue.Full, put, 7, False, None)
849 self.assertTimingAlmostEqual(put.elapsed, 0)
850
851 self.assertRaises(pyqueue.Full, put_nowait, 7)
852 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
853
854 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
855 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
856
857 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
858 self.assertTimingAlmostEqual(put.elapsed, 0)
859
860 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
861 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
862
863 child_can_start.set()
864 parent_can_continue.wait()
865
866 self.assertEqual(queue_empty(queue), True)
867 self.assertEqual(queue_full(queue, MAXSIZE), False)
868
869 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200870 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000871
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000872 @classmethod
873 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000874 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000875 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 queue.put(2)
877 queue.put(3)
878 queue.put(4)
879 queue.put(5)
880 parent_can_continue.set()
881
882 def test_get(self):
883 queue = self.Queue()
884 child_can_start = self.Event()
885 parent_can_continue = self.Event()
886
887 proc = self.Process(
888 target=self._test_get,
889 args=(queue, child_can_start, parent_can_continue)
890 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000891 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000892 proc.start()
893
894 self.assertEqual(queue_empty(queue), True)
895
896 child_can_start.set()
897 parent_can_continue.wait()
898
899 time.sleep(DELTA)
900 self.assertEqual(queue_empty(queue), False)
901
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000902 # Hangs unexpectedly, remove for now
903 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 self.assertEqual(queue.get(True, None), 2)
905 self.assertEqual(queue.get(True), 3)
906 self.assertEqual(queue.get(timeout=1), 4)
907 self.assertEqual(queue.get_nowait(), 5)
908
909 self.assertEqual(queue_empty(queue), True)
910
911 get = TimingWrapper(queue.get)
912 get_nowait = TimingWrapper(queue.get_nowait)
913
914 self.assertRaises(pyqueue.Empty, get, False)
915 self.assertTimingAlmostEqual(get.elapsed, 0)
916
917 self.assertRaises(pyqueue.Empty, get, False, None)
918 self.assertTimingAlmostEqual(get.elapsed, 0)
919
920 self.assertRaises(pyqueue.Empty, get_nowait)
921 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
922
923 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
924 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
925
926 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
927 self.assertTimingAlmostEqual(get.elapsed, 0)
928
929 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
930 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
931
932 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200933 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000934
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000935 @classmethod
936 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000937 for i in range(10, 20):
938 queue.put(i)
939 # note that at this point the items may only be buffered, so the
940 # process cannot shutdown until the feeder thread has finished
941 # pushing items onto the pipe.
942
943 def test_fork(self):
944 # Old versions of Queue would fail to create a new feeder
945 # thread for a forked process if the original process had its
946 # own feeder thread. This test checks that this no longer
947 # happens.
948
949 queue = self.Queue()
950
951 # put items on queue so that main process starts a feeder thread
952 for i in range(10):
953 queue.put(i)
954
955 # wait to make sure thread starts before we fork a new process
956 time.sleep(DELTA)
957
958 # fork process
959 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200960 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000961 p.start()
962
963 # check that all expected items are in the queue
964 for i in range(20):
965 self.assertEqual(queue.get(), i)
966 self.assertRaises(pyqueue.Empty, queue.get, False)
967
968 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200969 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970
971 def test_qsize(self):
972 q = self.Queue()
973 try:
974 self.assertEqual(q.qsize(), 0)
975 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600976 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000977 q.put(1)
978 self.assertEqual(q.qsize(), 1)
979 q.put(5)
980 self.assertEqual(q.qsize(), 2)
981 q.get()
982 self.assertEqual(q.qsize(), 1)
983 q.get()
984 self.assertEqual(q.qsize(), 0)
Victor Stinnerd7e64d92017-07-25 00:33:56 +0200985 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000986
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000987 @classmethod
988 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000989 for obj in iter(q.get, None):
990 time.sleep(DELTA)
991 q.task_done()
992
993 def test_task_done(self):
994 queue = self.JoinableQueue()
995
Benjamin Petersone711caf2008-06-11 16:44:04 +0000996 workers = [self.Process(target=self._test_task_done, args=(queue,))
997 for i in range(4)]
998
999 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +02001000 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001001 p.start()
1002
1003 for i in range(10):
1004 queue.put(i)
1005
1006 queue.join()
1007
1008 for p in workers:
1009 queue.put(None)
1010
1011 for p in workers:
1012 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +02001013 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001014
Serhiy Storchakaf8904e92015-03-06 23:32:54 +02001015 def test_no_import_lock_contention(self):
1016 with test.support.temp_cwd():
1017 module_name = 'imported_by_an_imported_module'
1018 with open(module_name + '.py', 'w') as f:
1019 f.write("""if 1:
1020 import multiprocessing
1021
1022 q = multiprocessing.Queue()
1023 q.put('knock knock')
1024 q.get(timeout=3)
1025 q.close()
1026 del q
1027 """)
1028
1029 with test.support.DirsOnSysPath(os.getcwd()):
1030 try:
1031 __import__(module_name)
1032 except pyqueue.Empty:
1033 self.fail("Probable regression on import lock contention;"
1034 " see Issue #22853")
1035
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001036 def test_timeout(self):
1037 q = multiprocessing.Queue()
1038 start = time.time()
Victor Stinneraad7b2e2015-02-05 14:25:05 +01001039 self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001040 delta = time.time() - start
Victor Stinner5640d032018-08-03 02:09:00 +02001041 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1042 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1043 # failed because the delta was only 135.8 ms.
1044 self.assertGreaterEqual(delta, 0.100)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001045 close_queue(q)
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001046
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001047 def test_queue_feeder_donot_stop_onexc(self):
1048 # bpo-30414: verify feeder handles exceptions correctly
1049 if self.TYPE != 'processes':
1050 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1051
1052 class NotSerializable(object):
1053 def __reduce__(self):
1054 raise AttributeError
1055 with test.support.captured_stderr():
1056 q = self.Queue()
1057 q.put(NotSerializable())
1058 q.put(True)
Victor Stinner8f6eeaf2017-06-13 23:48:47 +02001059 # bpo-30595: use a timeout of 1 second for slow buildbots
1060 self.assertTrue(q.get(timeout=1.0))
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001061 close_queue(q)
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001062
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001063 with test.support.captured_stderr():
1064 # bpo-33078: verify that the queue size is correctly handled
1065 # on errors.
1066 q = self.Queue(maxsize=1)
1067 q.put(NotSerializable())
1068 q.put(True)
Thomas Moreaudec1c772018-03-21 18:56:27 +01001069 try:
1070 self.assertEqual(q.qsize(), 1)
1071 except NotImplementedError:
1072 # qsize is not available on all platform as it
1073 # relies on sem_getvalue
1074 pass
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001075 # bpo-30595: use a timeout of 1 second for slow buildbots
1076 self.assertTrue(q.get(timeout=1.0))
1077 # Check that the size of the queue is correct
Thomas Moreaudec1c772018-03-21 18:56:27 +01001078 self.assertTrue(q.empty())
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001079 close_queue(q)
1080
Thomas Moreau94459fd2018-01-05 11:15:54 +01001081 def test_queue_feeder_on_queue_feeder_error(self):
1082 # bpo-30006: verify feeder handles exceptions using the
1083 # _on_queue_feeder_error hook.
1084 if self.TYPE != 'processes':
1085 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1086
1087 class NotSerializable(object):
1088 """Mock unserializable object"""
1089 def __init__(self):
1090 self.reduce_was_called = False
1091 self.on_queue_feeder_error_was_called = False
1092
1093 def __reduce__(self):
1094 self.reduce_was_called = True
1095 raise AttributeError
1096
1097 class SafeQueue(multiprocessing.queues.Queue):
1098 """Queue with overloaded _on_queue_feeder_error hook"""
1099 @staticmethod
1100 def _on_queue_feeder_error(e, obj):
1101 if (isinstance(e, AttributeError) and
1102 isinstance(obj, NotSerializable)):
1103 obj.on_queue_feeder_error_was_called = True
1104
1105 not_serializable_obj = NotSerializable()
1106 # The captured_stderr reduces the noise in the test report
1107 with test.support.captured_stderr():
1108 q = SafeQueue(ctx=multiprocessing.get_context())
1109 q.put(not_serializable_obj)
1110
Ville Skyttä61f82e02018-04-20 23:08:45 +03001111 # Verify that q is still functioning correctly
Thomas Moreau94459fd2018-01-05 11:15:54 +01001112 q.put(True)
1113 self.assertTrue(q.get(timeout=1.0))
1114
1115 # Assert that the serialization and the hook have been called correctly
1116 self.assertTrue(not_serializable_obj.reduce_was_called)
1117 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
Zackery Spytz04617042018-10-13 03:26:09 -06001118
1119 def test_closed_queue_put_get_exceptions(self):
1120 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1121 q.close()
1122 with self.assertRaisesRegex(ValueError, 'is closed'):
1123 q.put('foo')
1124 with self.assertRaisesRegex(ValueError, 'is closed'):
1125 q.get()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001126#
1127#
1128#
1129
1130class _TestLock(BaseTestCase):
1131
1132 def test_lock(self):
1133 lock = self.Lock()
1134 self.assertEqual(lock.acquire(), True)
1135 self.assertEqual(lock.acquire(False), False)
1136 self.assertEqual(lock.release(), None)
1137 self.assertRaises((ValueError, threading.ThreadError), lock.release)
1138
1139 def test_rlock(self):
1140 lock = self.RLock()
1141 self.assertEqual(lock.acquire(), True)
1142 self.assertEqual(lock.acquire(), True)
1143 self.assertEqual(lock.acquire(), True)
1144 self.assertEqual(lock.release(), None)
1145 self.assertEqual(lock.release(), None)
1146 self.assertEqual(lock.release(), None)
1147 self.assertRaises((AssertionError, RuntimeError), lock.release)
1148
Jesse Nollerf8d00852009-03-31 03:25:07 +00001149 def test_lock_context(self):
1150 with self.Lock():
1151 pass
1152
Benjamin Petersone711caf2008-06-11 16:44:04 +00001153
1154class _TestSemaphore(BaseTestCase):
1155
1156 def _test_semaphore(self, sem):
1157 self.assertReturnsIfImplemented(2, get_value, sem)
1158 self.assertEqual(sem.acquire(), True)
1159 self.assertReturnsIfImplemented(1, get_value, sem)
1160 self.assertEqual(sem.acquire(), True)
1161 self.assertReturnsIfImplemented(0, get_value, sem)
1162 self.assertEqual(sem.acquire(False), False)
1163 self.assertReturnsIfImplemented(0, get_value, sem)
1164 self.assertEqual(sem.release(), None)
1165 self.assertReturnsIfImplemented(1, get_value, sem)
1166 self.assertEqual(sem.release(), None)
1167 self.assertReturnsIfImplemented(2, get_value, sem)
1168
1169 def test_semaphore(self):
1170 sem = self.Semaphore(2)
1171 self._test_semaphore(sem)
1172 self.assertEqual(sem.release(), None)
1173 self.assertReturnsIfImplemented(3, get_value, sem)
1174 self.assertEqual(sem.release(), None)
1175 self.assertReturnsIfImplemented(4, get_value, sem)
1176
1177 def test_bounded_semaphore(self):
1178 sem = self.BoundedSemaphore(2)
1179 self._test_semaphore(sem)
1180 # Currently fails on OS/X
1181 #if HAVE_GETVALUE:
1182 # self.assertRaises(ValueError, sem.release)
1183 # self.assertReturnsIfImplemented(2, get_value, sem)
1184
1185 def test_timeout(self):
1186 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001187 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001188
1189 sem = self.Semaphore(0)
1190 acquire = TimingWrapper(sem.acquire)
1191
1192 self.assertEqual(acquire(False), False)
1193 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1194
1195 self.assertEqual(acquire(False, None), False)
1196 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1197
1198 self.assertEqual(acquire(False, TIMEOUT1), False)
1199 self.assertTimingAlmostEqual(acquire.elapsed, 0)
1200
1201 self.assertEqual(acquire(True, TIMEOUT2), False)
1202 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1203
1204 self.assertEqual(acquire(timeout=TIMEOUT3), False)
1205 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1206
1207
1208class _TestCondition(BaseTestCase):
1209
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001210 @classmethod
1211 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001212 cond.acquire()
1213 sleeping.release()
1214 cond.wait(timeout)
1215 woken.release()
1216 cond.release()
1217
Antoine Pitrou48350412017-07-04 08:59:22 +02001218 def assertReachesEventually(self, func, value):
1219 for i in range(10):
1220 try:
1221 if func() == value:
1222 break
1223 except NotImplementedError:
1224 break
1225 time.sleep(DELTA)
1226 time.sleep(DELTA)
1227 self.assertReturnsIfImplemented(value, func)
1228
Benjamin Petersone711caf2008-06-11 16:44:04 +00001229 def check_invariant(self, cond):
1230 # this is only supposed to succeed when there are no sleepers
1231 if self.TYPE == 'processes':
1232 try:
1233 sleepers = (cond._sleeping_count.get_value() -
1234 cond._woken_count.get_value())
1235 self.assertEqual(sleepers, 0)
1236 self.assertEqual(cond._wait_semaphore.get_value(), 0)
1237 except NotImplementedError:
1238 pass
1239
1240 def test_notify(self):
1241 cond = self.Condition()
1242 sleeping = self.Semaphore(0)
1243 woken = self.Semaphore(0)
1244
1245 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001246 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001247 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001248 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001249
1250 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001251 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001252 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001253 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001254
1255 # wait for both children to start sleeping
1256 sleeping.acquire()
1257 sleeping.acquire()
1258
1259 # check no process/thread has woken up
1260 time.sleep(DELTA)
1261 self.assertReturnsIfImplemented(0, get_value, woken)
1262
1263 # wake up one process/thread
1264 cond.acquire()
1265 cond.notify()
1266 cond.release()
1267
1268 # check one process/thread has woken up
1269 time.sleep(DELTA)
1270 self.assertReturnsIfImplemented(1, get_value, woken)
1271
1272 # wake up another
1273 cond.acquire()
1274 cond.notify()
1275 cond.release()
1276
1277 # check other has woken up
1278 time.sleep(DELTA)
1279 self.assertReturnsIfImplemented(2, get_value, woken)
1280
1281 # check state is not mucked up
1282 self.check_invariant(cond)
1283 p.join()
1284
1285 def test_notify_all(self):
1286 cond = self.Condition()
1287 sleeping = self.Semaphore(0)
1288 woken = self.Semaphore(0)
1289
1290 # start some threads/processes which will timeout
1291 for i in range(3):
1292 p = self.Process(target=self.f,
1293 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001294 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001295 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001296 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001297
1298 t = threading.Thread(target=self.f,
1299 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +00001300 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001301 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001302 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001303
1304 # wait for them all to sleep
1305 for i in range(6):
1306 sleeping.acquire()
1307
1308 # check they have all timed out
1309 for i in range(6):
1310 woken.acquire()
1311 self.assertReturnsIfImplemented(0, get_value, woken)
1312
1313 # check state is not mucked up
1314 self.check_invariant(cond)
1315
1316 # start some more threads/processes
1317 for i in range(3):
1318 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001319 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001320 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001321 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001322
1323 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +00001324 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001325 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001326 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001327
1328 # wait for them to all sleep
1329 for i in range(6):
1330 sleeping.acquire()
1331
1332 # check no process/thread has woken up
1333 time.sleep(DELTA)
1334 self.assertReturnsIfImplemented(0, get_value, woken)
1335
1336 # wake them all up
1337 cond.acquire()
1338 cond.notify_all()
1339 cond.release()
1340
1341 # check they have all woken
Antoine Pitrou48350412017-07-04 08:59:22 +02001342 self.assertReachesEventually(lambda: get_value(woken), 6)
1343
1344 # check state is not mucked up
1345 self.check_invariant(cond)
1346
1347 def test_notify_n(self):
1348 cond = self.Condition()
1349 sleeping = self.Semaphore(0)
1350 woken = self.Semaphore(0)
1351
1352 # start some threads/processes
1353 for i in range(3):
1354 p = self.Process(target=self.f, args=(cond, sleeping, woken))
1355 p.daemon = True
1356 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001357 self.addCleanup(p.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001358
1359 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1360 t.daemon = True
1361 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001362 self.addCleanup(t.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001363
1364 # wait for them to all sleep
1365 for i in range(6):
1366 sleeping.acquire()
1367
1368 # check no process/thread has woken up
1369 time.sleep(DELTA)
1370 self.assertReturnsIfImplemented(0, get_value, woken)
1371
1372 # wake some of them up
1373 cond.acquire()
1374 cond.notify(n=2)
1375 cond.release()
1376
1377 # check 2 have woken
1378 self.assertReachesEventually(lambda: get_value(woken), 2)
1379
1380 # wake the rest of them
1381 cond.acquire()
1382 cond.notify(n=4)
1383 cond.release()
1384
1385 self.assertReachesEventually(lambda: get_value(woken), 6)
1386
1387 # doesn't do anything more
1388 cond.acquire()
1389 cond.notify(n=3)
1390 cond.release()
1391
Benjamin Petersone711caf2008-06-11 16:44:04 +00001392 self.assertReturnsIfImplemented(6, get_value, woken)
1393
1394 # check state is not mucked up
1395 self.check_invariant(cond)
1396
1397 def test_timeout(self):
1398 cond = self.Condition()
1399 wait = TimingWrapper(cond.wait)
1400 cond.acquire()
1401 res = wait(TIMEOUT1)
1402 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +00001403 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1405
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001406 @classmethod
1407 def _test_waitfor_f(cls, cond, state):
1408 with cond:
1409 state.value = 0
1410 cond.notify()
1411 result = cond.wait_for(lambda : state.value==4)
1412 if not result or state.value != 4:
1413 sys.exit(1)
1414
1415 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1416 def test_waitfor(self):
1417 # based on test in test/lock_tests.py
1418 cond = self.Condition()
1419 state = self.Value('i', -1)
1420
1421 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1422 p.daemon = True
1423 p.start()
1424
1425 with cond:
1426 result = cond.wait_for(lambda : state.value==0)
1427 self.assertTrue(result)
1428 self.assertEqual(state.value, 0)
1429
1430 for i in range(4):
1431 time.sleep(0.01)
1432 with cond:
1433 state.value += 1
1434 cond.notify()
1435
Victor Stinner11f08072017-09-15 06:55:31 -07001436 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001437 self.assertEqual(p.exitcode, 0)
1438
1439 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001440 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1441 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001442 with cond:
1443 expected = 0.1
1444 dt = time.time()
1445 result = cond.wait_for(lambda : state.value==4, timeout=expected)
1446 dt = time.time() - dt
1447 # borrow logic in assertTimeout() from test/lock_tests.py
1448 if not result and expected * 0.6 < dt < expected * 10.0:
1449 success.value = True
1450
1451 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1452 def test_waitfor_timeout(self):
1453 # based on test in test/lock_tests.py
1454 cond = self.Condition()
1455 state = self.Value('i', 0)
1456 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001457 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001458
1459 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001460 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001461 p.daemon = True
1462 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -07001463 self.assertTrue(sem.acquire(timeout=TIMEOUT))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001464
1465 # Only increment 3 times, so state == 4 is never reached.
1466 for i in range(3):
1467 time.sleep(0.01)
1468 with cond:
1469 state.value += 1
1470 cond.notify()
1471
Victor Stinner11f08072017-09-15 06:55:31 -07001472 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001473 self.assertTrue(success.value)
1474
Richard Oudkerk98449932012-06-05 13:15:29 +01001475 @classmethod
1476 def _test_wait_result(cls, c, pid):
1477 with c:
1478 c.notify()
1479 time.sleep(1)
1480 if pid is not None:
1481 os.kill(pid, signal.SIGINT)
1482
1483 def test_wait_result(self):
1484 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1485 pid = os.getpid()
1486 else:
1487 pid = None
1488
1489 c = self.Condition()
1490 with c:
1491 self.assertFalse(c.wait(0))
1492 self.assertFalse(c.wait(0.1))
1493
1494 p = self.Process(target=self._test_wait_result, args=(c, pid))
1495 p.start()
1496
Victor Stinner49257272018-06-27 22:24:02 +02001497 self.assertTrue(c.wait(60))
Richard Oudkerk98449932012-06-05 13:15:29 +01001498 if pid is not None:
Victor Stinner49257272018-06-27 22:24:02 +02001499 self.assertRaises(KeyboardInterrupt, c.wait, 60)
Richard Oudkerk98449932012-06-05 13:15:29 +01001500
1501 p.join()
1502
Benjamin Petersone711caf2008-06-11 16:44:04 +00001503
1504class _TestEvent(BaseTestCase):
1505
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001506 @classmethod
1507 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001508 time.sleep(TIMEOUT2)
1509 event.set()
1510
1511 def test_event(self):
1512 event = self.Event()
1513 wait = TimingWrapper(event.wait)
1514
Ezio Melotti13925002011-03-16 11:05:33 +02001515 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001516 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001517 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001518
Benjamin Peterson965ce872009-04-05 21:24:58 +00001519 # Removed, threading.Event.wait() will return the value of the __flag
1520 # instead of None. API Shear with the semaphore backed mp.Event
1521 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001522 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001523 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1525
1526 event.set()
1527
1528 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001529 self.assertEqual(event.is_set(), True)
1530 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001531 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001532 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001533 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1534 # self.assertEqual(event.is_set(), True)
1535
1536 event.clear()
1537
1538 #self.assertEqual(event.is_set(), False)
1539
Jesus Cea94f964f2011-09-09 20:26:57 +02001540 p = self.Process(target=self._test_event, args=(event,))
1541 p.daemon = True
1542 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001543 self.assertEqual(wait(), True)
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001544 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545
1546#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001547# Tests for Barrier - adapted from tests in test/lock_tests.py
1548#
1549
1550# Many of the tests for threading.Barrier use a list as an atomic
1551# counter: a value is appended to increment the counter, and the
1552# length of the list gives the value. We use the class DummyList
1553# for the same purpose.
1554
1555class _DummyList(object):
1556
1557 def __init__(self):
1558 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1559 lock = multiprocessing.Lock()
1560 self.__setstate__((wrapper, lock))
1561 self._lengthbuf[0] = 0
1562
1563 def __setstate__(self, state):
1564 (self._wrapper, self._lock) = state
1565 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1566
1567 def __getstate__(self):
1568 return (self._wrapper, self._lock)
1569
1570 def append(self, _):
1571 with self._lock:
1572 self._lengthbuf[0] += 1
1573
1574 def __len__(self):
1575 with self._lock:
1576 return self._lengthbuf[0]
1577
1578def _wait():
1579 # A crude wait/yield function not relying on synchronization primitives.
1580 time.sleep(0.01)
1581
1582
1583class Bunch(object):
1584 """
1585 A bunch of threads.
1586 """
1587 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1588 """
1589 Construct a bunch of `n` threads running the same function `f`.
1590 If `wait_before_exit` is True, the threads won't terminate until
1591 do_finish() is called.
1592 """
1593 self.f = f
1594 self.args = args
1595 self.n = n
1596 self.started = namespace.DummyList()
1597 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001598 self._can_exit = namespace.Event()
1599 if not wait_before_exit:
1600 self._can_exit.set()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001601
1602 threads = []
Richard Oudkerk3730a172012-06-15 18:26:07 +01001603 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001604 p = namespace.Process(target=self.task)
1605 p.daemon = True
1606 p.start()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001607 threads.append(p)
1608
1609 def finalize(threads):
1610 for p in threads:
1611 p.join()
1612
1613 self._finalizer = weakref.finalize(self, finalize, threads)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001614
1615 def task(self):
1616 pid = os.getpid()
1617 self.started.append(pid)
1618 try:
1619 self.f(*self.args)
1620 finally:
1621 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001622 self._can_exit.wait(30)
1623 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001624
1625 def wait_for_started(self):
1626 while len(self.started) < self.n:
1627 _wait()
1628
1629 def wait_for_finished(self):
1630 while len(self.finished) < self.n:
1631 _wait()
1632
1633 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001634 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001635
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001636 def close(self):
1637 self._finalizer()
1638
Richard Oudkerk3730a172012-06-15 18:26:07 +01001639
1640class AppendTrue(object):
1641 def __init__(self, obj):
1642 self.obj = obj
1643 def __call__(self):
1644 self.obj.append(True)
1645
1646
1647class _TestBarrier(BaseTestCase):
1648 """
1649 Tests for Barrier objects.
1650 """
1651 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001652 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001653
1654 def setUp(self):
1655 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1656
1657 def tearDown(self):
1658 self.barrier.abort()
1659 self.barrier = None
1660
1661 def DummyList(self):
1662 if self.TYPE == 'threads':
1663 return []
1664 elif self.TYPE == 'manager':
1665 return self.manager.list()
1666 else:
1667 return _DummyList()
1668
1669 def run_threads(self, f, args):
1670 b = Bunch(self, f, args, self.N-1)
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001671 try:
1672 f(*args)
1673 b.wait_for_finished()
1674 finally:
1675 b.close()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001676
1677 @classmethod
1678 def multipass(cls, barrier, results, n):
1679 m = barrier.parties
1680 assert m == cls.N
1681 for i in range(n):
1682 results[0].append(True)
1683 assert len(results[1]) == i * m
1684 barrier.wait()
1685 results[1].append(True)
1686 assert len(results[0]) == (i + 1) * m
1687 barrier.wait()
1688 try:
1689 assert barrier.n_waiting == 0
1690 except NotImplementedError:
1691 pass
1692 assert not barrier.broken
1693
1694 def test_barrier(self, passes=1):
1695 """
1696 Test that a barrier is passed in lockstep
1697 """
1698 results = [self.DummyList(), self.DummyList()]
1699 self.run_threads(self.multipass, (self.barrier, results, passes))
1700
1701 def test_barrier_10(self):
1702 """
1703 Test that a barrier works for 10 consecutive runs
1704 """
1705 return self.test_barrier(10)
1706
1707 @classmethod
1708 def _test_wait_return_f(cls, barrier, queue):
1709 res = barrier.wait()
1710 queue.put(res)
1711
1712 def test_wait_return(self):
1713 """
1714 test the return value from barrier.wait
1715 """
1716 queue = self.Queue()
1717 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1718 results = [queue.get() for i in range(self.N)]
1719 self.assertEqual(results.count(0), 1)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001720 close_queue(queue)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001721
1722 @classmethod
1723 def _test_action_f(cls, barrier, results):
1724 barrier.wait()
1725 if len(results) != 1:
1726 raise RuntimeError
1727
1728 def test_action(self):
1729 """
1730 Test the 'action' callback
1731 """
1732 results = self.DummyList()
1733 barrier = self.Barrier(self.N, action=AppendTrue(results))
1734 self.run_threads(self._test_action_f, (barrier, results))
1735 self.assertEqual(len(results), 1)
1736
1737 @classmethod
1738 def _test_abort_f(cls, barrier, results1, results2):
1739 try:
1740 i = barrier.wait()
1741 if i == cls.N//2:
1742 raise RuntimeError
1743 barrier.wait()
1744 results1.append(True)
1745 except threading.BrokenBarrierError:
1746 results2.append(True)
1747 except RuntimeError:
1748 barrier.abort()
1749
1750 def test_abort(self):
1751 """
1752 Test that an abort will put the barrier in a broken state
1753 """
1754 results1 = self.DummyList()
1755 results2 = self.DummyList()
1756 self.run_threads(self._test_abort_f,
1757 (self.barrier, results1, results2))
1758 self.assertEqual(len(results1), 0)
1759 self.assertEqual(len(results2), self.N-1)
1760 self.assertTrue(self.barrier.broken)
1761
1762 @classmethod
1763 def _test_reset_f(cls, barrier, results1, results2, results3):
1764 i = barrier.wait()
1765 if i == cls.N//2:
1766 # Wait until the other threads are all in the barrier.
1767 while barrier.n_waiting < cls.N-1:
1768 time.sleep(0.001)
1769 barrier.reset()
1770 else:
1771 try:
1772 barrier.wait()
1773 results1.append(True)
1774 except threading.BrokenBarrierError:
1775 results2.append(True)
1776 # Now, pass the barrier again
1777 barrier.wait()
1778 results3.append(True)
1779
1780 def test_reset(self):
1781 """
1782 Test that a 'reset' on a barrier frees the waiting threads
1783 """
1784 results1 = self.DummyList()
1785 results2 = self.DummyList()
1786 results3 = self.DummyList()
1787 self.run_threads(self._test_reset_f,
1788 (self.barrier, results1, results2, results3))
1789 self.assertEqual(len(results1), 0)
1790 self.assertEqual(len(results2), self.N-1)
1791 self.assertEqual(len(results3), self.N)
1792
1793 @classmethod
1794 def _test_abort_and_reset_f(cls, barrier, barrier2,
1795 results1, results2, results3):
1796 try:
1797 i = barrier.wait()
1798 if i == cls.N//2:
1799 raise RuntimeError
1800 barrier.wait()
1801 results1.append(True)
1802 except threading.BrokenBarrierError:
1803 results2.append(True)
1804 except RuntimeError:
1805 barrier.abort()
1806 # Synchronize and reset the barrier. Must synchronize first so
1807 # that everyone has left it when we reset, and after so that no
1808 # one enters it before the reset.
1809 if barrier2.wait() == cls.N//2:
1810 barrier.reset()
1811 barrier2.wait()
1812 barrier.wait()
1813 results3.append(True)
1814
1815 def test_abort_and_reset(self):
1816 """
1817 Test that a barrier can be reset after being broken.
1818 """
1819 results1 = self.DummyList()
1820 results2 = self.DummyList()
1821 results3 = self.DummyList()
1822 barrier2 = self.Barrier(self.N)
1823
1824 self.run_threads(self._test_abort_and_reset_f,
1825 (self.barrier, barrier2, results1, results2, results3))
1826 self.assertEqual(len(results1), 0)
1827 self.assertEqual(len(results2), self.N-1)
1828 self.assertEqual(len(results3), self.N)
1829
1830 @classmethod
1831 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001832 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001833 if i == cls.N//2:
1834 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001835 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001836 try:
1837 barrier.wait(0.5)
1838 except threading.BrokenBarrierError:
1839 results.append(True)
1840
1841 def test_timeout(self):
1842 """
1843 Test wait(timeout)
1844 """
1845 results = self.DummyList()
1846 self.run_threads(self._test_timeout_f, (self.barrier, results))
1847 self.assertEqual(len(results), self.barrier.parties)
1848
1849 @classmethod
1850 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001851 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001852 if i == cls.N//2:
1853 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001854 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001855 try:
1856 barrier.wait()
1857 except threading.BrokenBarrierError:
1858 results.append(True)
1859
1860 def test_default_timeout(self):
1861 """
1862 Test the barrier's default timeout
1863 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001864 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001865 results = self.DummyList()
1866 self.run_threads(self._test_default_timeout_f, (barrier, results))
1867 self.assertEqual(len(results), barrier.parties)
1868
1869 def test_single_thread(self):
1870 b = self.Barrier(1)
1871 b.wait()
1872 b.wait()
1873
1874 @classmethod
1875 def _test_thousand_f(cls, barrier, passes, conn, lock):
1876 for i in range(passes):
1877 barrier.wait()
1878 with lock:
1879 conn.send(i)
1880
1881 def test_thousand(self):
1882 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001883 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001884 passes = 1000
1885 lock = self.Lock()
1886 conn, child_conn = self.Pipe(False)
1887 for j in range(self.N):
1888 p = self.Process(target=self._test_thousand_f,
1889 args=(self.barrier, passes, child_conn, lock))
1890 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001891 self.addCleanup(p.join)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001892
1893 for i in range(passes):
1894 for j in range(self.N):
1895 self.assertEqual(conn.recv(), i)
1896
1897#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001898#
1899#
1900
1901class _TestValue(BaseTestCase):
1902
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001903 ALLOWED_TYPES = ('processes',)
1904
Benjamin Petersone711caf2008-06-11 16:44:04 +00001905 codes_values = [
1906 ('i', 4343, 24234),
1907 ('d', 3.625, -4.25),
1908 ('h', -232, 234),
Gareth Rees3913bad2017-07-21 11:35:33 +01001909 ('q', 2 ** 33, 2 ** 34),
Benjamin Petersone711caf2008-06-11 16:44:04 +00001910 ('c', latin('x'), latin('y'))
1911 ]
1912
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001913 def setUp(self):
1914 if not HAS_SHAREDCTYPES:
1915 self.skipTest("requires multiprocessing.sharedctypes")
1916
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001917 @classmethod
1918 def _test(cls, values):
1919 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001920 sv.value = cv[2]
1921
1922
1923 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001924 if raw:
1925 values = [self.RawValue(code, value)
1926 for code, value, _ in self.codes_values]
1927 else:
1928 values = [self.Value(code, value)
1929 for code, value, _ in self.codes_values]
1930
1931 for sv, cv in zip(values, self.codes_values):
1932 self.assertEqual(sv.value, cv[1])
1933
1934 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001935 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001936 proc.start()
1937 proc.join()
1938
1939 for sv, cv in zip(values, self.codes_values):
1940 self.assertEqual(sv.value, cv[2])
1941
1942 def test_rawvalue(self):
1943 self.test_value(raw=True)
1944
1945 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001946 val1 = self.Value('i', 5)
1947 lock1 = val1.get_lock()
1948 obj1 = val1.get_obj()
1949
1950 val2 = self.Value('i', 5, lock=None)
1951 lock2 = val2.get_lock()
1952 obj2 = val2.get_obj()
1953
1954 lock = self.Lock()
1955 val3 = self.Value('i', 5, lock=lock)
1956 lock3 = val3.get_lock()
1957 obj3 = val3.get_obj()
1958 self.assertEqual(lock, lock3)
1959
Jesse Nollerb0516a62009-01-18 03:11:38 +00001960 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001961 self.assertFalse(hasattr(arr4, 'get_lock'))
1962 self.assertFalse(hasattr(arr4, 'get_obj'))
1963
Jesse Nollerb0516a62009-01-18 03:11:38 +00001964 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1965
1966 arr5 = self.RawValue('i', 5)
1967 self.assertFalse(hasattr(arr5, 'get_lock'))
1968 self.assertFalse(hasattr(arr5, 'get_obj'))
1969
Benjamin Petersone711caf2008-06-11 16:44:04 +00001970
1971class _TestArray(BaseTestCase):
1972
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001973 ALLOWED_TYPES = ('processes',)
1974
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001975 @classmethod
1976 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001977 for i in range(1, len(seq)):
1978 seq[i] += seq[i-1]
1979
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001980 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001981 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001982 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1983 if raw:
1984 arr = self.RawArray('i', seq)
1985 else:
1986 arr = self.Array('i', seq)
1987
1988 self.assertEqual(len(arr), len(seq))
1989 self.assertEqual(arr[3], seq[3])
1990 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1991
1992 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1993
1994 self.assertEqual(list(arr[:]), seq)
1995
1996 self.f(seq)
1997
1998 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001999 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002000 p.start()
2001 p.join()
2002
2003 self.assertEqual(list(arr[:]), seq)
2004
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002005 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00002006 def test_array_from_size(self):
2007 size = 10
2008 # Test for zeroing (see issue #11675).
2009 # The repetition below strengthens the test by increasing the chances
2010 # of previously allocated non-zero memory being used for the new array
2011 # on the 2nd and 3rd loops.
2012 for _ in range(3):
2013 arr = self.Array('i', size)
2014 self.assertEqual(len(arr), size)
2015 self.assertEqual(list(arr), [0] * size)
2016 arr[:] = range(10)
2017 self.assertEqual(list(arr), list(range(10)))
2018 del arr
2019
2020 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002021 def test_rawarray(self):
2022 self.test_array(raw=True)
2023
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002024 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002025 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002026 arr1 = self.Array('i', list(range(10)))
2027 lock1 = arr1.get_lock()
2028 obj1 = arr1.get_obj()
2029
2030 arr2 = self.Array('i', list(range(10)), lock=None)
2031 lock2 = arr2.get_lock()
2032 obj2 = arr2.get_obj()
2033
2034 lock = self.Lock()
2035 arr3 = self.Array('i', list(range(10)), lock=lock)
2036 lock3 = arr3.get_lock()
2037 obj3 = arr3.get_obj()
2038 self.assertEqual(lock, lock3)
2039
Jesse Nollerb0516a62009-01-18 03:11:38 +00002040 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002041 self.assertFalse(hasattr(arr4, 'get_lock'))
2042 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00002043 self.assertRaises(AttributeError,
2044 self.Array, 'i', range(10), lock='notalock')
2045
2046 arr5 = self.RawArray('i', range(10))
2047 self.assertFalse(hasattr(arr5, 'get_lock'))
2048 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002049
2050#
2051#
2052#
2053
2054class _TestContainers(BaseTestCase):
2055
2056 ALLOWED_TYPES = ('manager',)
2057
2058 def test_list(self):
2059 a = self.list(list(range(10)))
2060 self.assertEqual(a[:], list(range(10)))
2061
2062 b = self.list()
2063 self.assertEqual(b[:], [])
2064
2065 b.extend(list(range(5)))
2066 self.assertEqual(b[:], list(range(5)))
2067
2068 self.assertEqual(b[2], 2)
2069 self.assertEqual(b[2:10], [2,3,4])
2070
2071 b *= 2
2072 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2073
2074 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2075
2076 self.assertEqual(a[:], list(range(10)))
2077
2078 d = [a, b]
2079 e = self.list(d)
2080 self.assertEqual(
Davin Potts86a76682016-09-07 18:48:01 -05002081 [element[:] for element in e],
Benjamin Petersone711caf2008-06-11 16:44:04 +00002082 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2083 )
2084
2085 f = self.list([a])
2086 a.append('hello')
Davin Potts86a76682016-09-07 18:48:01 -05002087 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2088
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002089 def test_list_iter(self):
2090 a = self.list(list(range(10)))
2091 it = iter(a)
2092 self.assertEqual(list(it), list(range(10)))
2093 self.assertEqual(list(it), []) # exhausted
2094 # list modified during iteration
2095 it = iter(a)
2096 a[0] = 100
2097 self.assertEqual(next(it), 100)
2098
Davin Potts86a76682016-09-07 18:48:01 -05002099 def test_list_proxy_in_list(self):
2100 a = self.list([self.list(range(3)) for _i in range(3)])
2101 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2102
2103 a[0][-1] = 55
2104 self.assertEqual(a[0][:], [0, 1, 55])
2105 for i in range(1, 3):
2106 self.assertEqual(a[i][:], [0, 1, 2])
2107
2108 self.assertEqual(a[1].pop(), 2)
2109 self.assertEqual(len(a[1]), 2)
2110 for i in range(0, 3, 2):
2111 self.assertEqual(len(a[i]), 3)
2112
2113 del a
2114
2115 b = self.list()
2116 b.append(b)
2117 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00002118
2119 def test_dict(self):
2120 d = self.dict()
2121 indices = list(range(65, 70))
2122 for i in indices:
2123 d[i] = chr(i)
2124 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2125 self.assertEqual(sorted(d.keys()), indices)
2126 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2127 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2128
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002129 def test_dict_iter(self):
2130 d = self.dict()
2131 indices = list(range(65, 70))
2132 for i in indices:
2133 d[i] = chr(i)
2134 it = iter(d)
2135 self.assertEqual(list(it), indices)
2136 self.assertEqual(list(it), []) # exhausted
2137 # dictionary changed size during iteration
2138 it = iter(d)
2139 d.clear()
2140 self.assertRaises(RuntimeError, next, it)
2141
Davin Potts86a76682016-09-07 18:48:01 -05002142 def test_dict_proxy_nested(self):
2143 pets = self.dict(ferrets=2, hamsters=4)
2144 supplies = self.dict(water=10, feed=3)
2145 d = self.dict(pets=pets, supplies=supplies)
2146
2147 self.assertEqual(supplies['water'], 10)
2148 self.assertEqual(d['supplies']['water'], 10)
2149
2150 d['supplies']['blankets'] = 5
2151 self.assertEqual(supplies['blankets'], 5)
2152 self.assertEqual(d['supplies']['blankets'], 5)
2153
2154 d['supplies']['water'] = 7
2155 self.assertEqual(supplies['water'], 7)
2156 self.assertEqual(d['supplies']['water'], 7)
2157
2158 del pets
2159 del supplies
2160 self.assertEqual(d['pets']['ferrets'], 2)
2161 d['supplies']['blankets'] = 11
2162 self.assertEqual(d['supplies']['blankets'], 11)
2163
2164 pets = d['pets']
2165 supplies = d['supplies']
2166 supplies['water'] = 7
2167 self.assertEqual(supplies['water'], 7)
2168 self.assertEqual(d['supplies']['water'], 7)
2169
2170 d.clear()
2171 self.assertEqual(len(d), 0)
2172 self.assertEqual(supplies['water'], 7)
2173 self.assertEqual(pets['hamsters'], 4)
2174
2175 l = self.list([pets, supplies])
2176 l[0]['marmots'] = 1
2177 self.assertEqual(pets['marmots'], 1)
2178 self.assertEqual(l[0]['marmots'], 1)
2179
2180 del pets
2181 del supplies
2182 self.assertEqual(l[0]['marmots'], 1)
2183
2184 outer = self.list([[88, 99], l])
2185 self.assertIsInstance(outer[0], list) # Not a ListProxy
2186 self.assertEqual(outer[-1][-1]['feed'], 3)
2187
Benjamin Petersone711caf2008-06-11 16:44:04 +00002188 def test_namespace(self):
2189 n = self.Namespace()
2190 n.name = 'Bob'
2191 n.job = 'Builder'
2192 n._hidden = 'hidden'
2193 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2194 del n.job
2195 self.assertEqual(str(n), "Namespace(name='Bob')")
2196 self.assertTrue(hasattr(n, 'name'))
2197 self.assertTrue(not hasattr(n, 'job'))
2198
2199#
2200#
2201#
2202
2203def sqr(x, wait=0.0):
2204 time.sleep(wait)
2205 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00002206
Antoine Pitroude911b22011-12-21 11:03:24 +01002207def mul(x, y):
2208 return x*y
2209
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002210def raise_large_valuerror(wait):
2211 time.sleep(wait)
2212 raise ValueError("x" * 1024**2)
2213
Antoine Pitrou89889452017-03-24 13:52:11 +01002214def identity(x):
2215 return x
2216
2217class CountedObject(object):
2218 n_instances = 0
2219
2220 def __new__(cls):
2221 cls.n_instances += 1
2222 return object.__new__(cls)
2223
2224 def __del__(self):
2225 type(self).n_instances -= 1
2226
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002227class SayWhenError(ValueError): pass
2228
2229def exception_throwing_generator(total, when):
Xiang Zhang794623b2017-03-29 11:58:54 +08002230 if when == -1:
2231 raise SayWhenError("Somebody said when")
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002232 for i in range(total):
2233 if i == when:
2234 raise SayWhenError("Somebody said when")
2235 yield i
2236
Antoine Pitrou89889452017-03-24 13:52:11 +01002237
Benjamin Petersone711caf2008-06-11 16:44:04 +00002238class _TestPool(BaseTestCase):
2239
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002240 @classmethod
2241 def setUpClass(cls):
2242 super().setUpClass()
2243 cls.pool = cls.Pool(4)
2244
2245 @classmethod
2246 def tearDownClass(cls):
2247 cls.pool.terminate()
2248 cls.pool.join()
2249 cls.pool = None
2250 super().tearDownClass()
2251
Benjamin Petersone711caf2008-06-11 16:44:04 +00002252 def test_apply(self):
2253 papply = self.pool.apply
2254 self.assertEqual(papply(sqr, (5,)), sqr(5))
2255 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2256
2257 def test_map(self):
2258 pmap = self.pool.map
2259 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2260 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2261 list(map(sqr, list(range(100)))))
2262
Antoine Pitroude911b22011-12-21 11:03:24 +01002263 def test_starmap(self):
2264 psmap = self.pool.starmap
2265 tuples = list(zip(range(10), range(9,-1, -1)))
2266 self.assertEqual(psmap(mul, tuples),
2267 list(itertools.starmap(mul, tuples)))
2268 tuples = list(zip(range(100), range(99,-1, -1)))
2269 self.assertEqual(psmap(mul, tuples, chunksize=20),
2270 list(itertools.starmap(mul, tuples)))
2271
2272 def test_starmap_async(self):
2273 tuples = list(zip(range(100), range(99,-1, -1)))
2274 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2275 list(itertools.starmap(mul, tuples)))
2276
Hynek Schlawack254af262012-10-27 12:53:02 +02002277 def test_map_async(self):
2278 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2279 list(map(sqr, list(range(10)))))
2280
2281 def test_map_async_callbacks(self):
2282 call_args = self.manager.list() if self.TYPE == 'manager' else []
2283 self.pool.map_async(int, ['1'],
2284 callback=call_args.append,
2285 error_callback=call_args.append).wait()
2286 self.assertEqual(1, len(call_args))
2287 self.assertEqual([1], call_args[0])
2288 self.pool.map_async(int, ['a'],
2289 callback=call_args.append,
2290 error_callback=call_args.append).wait()
2291 self.assertEqual(2, len(call_args))
2292 self.assertIsInstance(call_args[1], ValueError)
2293
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002294 def test_map_unplicklable(self):
2295 # Issue #19425 -- failure to pickle should not cause a hang
2296 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002297 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002298 class A(object):
2299 def __reduce__(self):
2300 raise RuntimeError('cannot pickle')
2301 with self.assertRaises(RuntimeError):
2302 self.pool.map(sqr, [A()]*10)
2303
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00002304 def test_map_chunksize(self):
2305 try:
2306 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2307 except multiprocessing.TimeoutError:
2308 self.fail("pool.map_async with chunksize stalled on null list")
2309
Xiang Zhang794623b2017-03-29 11:58:54 +08002310 def test_map_handle_iterable_exception(self):
2311 if self.TYPE == 'manager':
2312 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2313
2314 # SayWhenError seen at the very first of the iterable
2315 with self.assertRaises(SayWhenError):
2316 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2317 # again, make sure it's reentrant
2318 with self.assertRaises(SayWhenError):
2319 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2320
2321 with self.assertRaises(SayWhenError):
2322 self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2323
2324 class SpecialIterable:
2325 def __iter__(self):
2326 return self
2327 def __next__(self):
2328 raise SayWhenError
2329 def __len__(self):
2330 return 1
2331 with self.assertRaises(SayWhenError):
2332 self.pool.map(sqr, SpecialIterable(), 1)
2333 with self.assertRaises(SayWhenError):
2334 self.pool.map(sqr, SpecialIterable(), 1)
2335
Benjamin Petersone711caf2008-06-11 16:44:04 +00002336 def test_async(self):
2337 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2338 get = TimingWrapper(res.get)
2339 self.assertEqual(get(), 49)
2340 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2341
2342 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00002343 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002344 get = TimingWrapper(res.get)
2345 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2346 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2347
2348 def test_imap(self):
2349 it = self.pool.imap(sqr, list(range(10)))
2350 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2351
2352 it = self.pool.imap(sqr, list(range(10)))
2353 for i in range(10):
2354 self.assertEqual(next(it), i*i)
2355 self.assertRaises(StopIteration, it.__next__)
2356
2357 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2358 for i in range(1000):
2359 self.assertEqual(next(it), i*i)
2360 self.assertRaises(StopIteration, it.__next__)
2361
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002362 def test_imap_handle_iterable_exception(self):
2363 if self.TYPE == 'manager':
2364 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2365
Xiang Zhang794623b2017-03-29 11:58:54 +08002366 # SayWhenError seen at the very first of the iterable
2367 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2368 self.assertRaises(SayWhenError, it.__next__)
2369 # again, make sure it's reentrant
2370 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2371 self.assertRaises(SayWhenError, it.__next__)
2372
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002373 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2374 for i in range(3):
2375 self.assertEqual(next(it), i*i)
2376 self.assertRaises(SayWhenError, it.__next__)
2377
2378 # SayWhenError seen at start of problematic chunk's results
2379 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2380 for i in range(6):
2381 self.assertEqual(next(it), i*i)
2382 self.assertRaises(SayWhenError, it.__next__)
2383 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2384 for i in range(4):
2385 self.assertEqual(next(it), i*i)
2386 self.assertRaises(SayWhenError, it.__next__)
2387
Benjamin Petersone711caf2008-06-11 16:44:04 +00002388 def test_imap_unordered(self):
Victor Stinner23401fb2018-07-03 13:20:35 +02002389 it = self.pool.imap_unordered(sqr, list(range(10)))
2390 self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002391
Victor Stinner23401fb2018-07-03 13:20:35 +02002392 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002393 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2394
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002395 def test_imap_unordered_handle_iterable_exception(self):
2396 if self.TYPE == 'manager':
2397 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2398
Xiang Zhang794623b2017-03-29 11:58:54 +08002399 # SayWhenError seen at the very first of the iterable
2400 it = self.pool.imap_unordered(sqr,
2401 exception_throwing_generator(1, -1),
2402 1)
2403 self.assertRaises(SayWhenError, it.__next__)
2404 # again, make sure it's reentrant
2405 it = self.pool.imap_unordered(sqr,
2406 exception_throwing_generator(1, -1),
2407 1)
2408 self.assertRaises(SayWhenError, it.__next__)
2409
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002410 it = self.pool.imap_unordered(sqr,
2411 exception_throwing_generator(10, 3),
2412 1)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002413 expected_values = list(map(sqr, list(range(10))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002414 with self.assertRaises(SayWhenError):
2415 # imap_unordered makes it difficult to anticipate the SayWhenError
2416 for i in range(10):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002417 value = next(it)
2418 self.assertIn(value, expected_values)
2419 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002420
2421 it = self.pool.imap_unordered(sqr,
2422 exception_throwing_generator(20, 7),
2423 2)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002424 expected_values = list(map(sqr, list(range(20))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002425 with self.assertRaises(SayWhenError):
2426 for i in range(20):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002427 value = next(it)
2428 self.assertIn(value, expected_values)
2429 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002430
Benjamin Petersone711caf2008-06-11 16:44:04 +00002431 def test_make_pool(self):
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002432 expected_error = (RemoteError if self.TYPE == 'manager'
2433 else ValueError)
Victor Stinner2fae27b2011-06-20 17:53:35 +02002434
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002435 self.assertRaises(expected_error, self.Pool, -1)
2436 self.assertRaises(expected_error, self.Pool, 0)
2437
2438 if self.TYPE != 'manager':
2439 p = self.Pool(3)
2440 try:
2441 self.assertEqual(3, len(p._pool))
2442 finally:
2443 p.close()
2444 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002445
2446 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002447 result = self.pool.map_async(
2448 time.sleep, [0.1 for i in range(10000)], chunksize=1
2449 )
2450 self.pool.terminate()
2451 join = TimingWrapper(self.pool.join)
2452 join()
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002453 # Sanity check the pool didn't wait for all tasks to finish
2454 self.assertLess(join.elapsed, 2.0)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002455
Richard Oudkerke41682b2012-06-06 19:04:57 +01002456 def test_empty_iterable(self):
2457 # See Issue 12157
2458 p = self.Pool(1)
2459
2460 self.assertEqual(p.map(sqr, []), [])
2461 self.assertEqual(list(p.imap(sqr, [])), [])
2462 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2463 self.assertEqual(p.map_async(sqr, []).get(), [])
2464
2465 p.close()
2466 p.join()
2467
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002468 def test_context(self):
2469 if self.TYPE == 'processes':
2470 L = list(range(10))
2471 expected = [sqr(i) for i in L]
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002472 with self.Pool(2) as p:
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002473 r = p.map_async(sqr, L)
2474 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04002475 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002476
Richard Oudkerk85757832013-05-06 11:38:25 +01002477 @classmethod
2478 def _test_traceback(cls):
2479 raise RuntimeError(123) # some comment
2480
2481 def test_traceback(self):
2482 # We want ensure that the traceback from the child process is
2483 # contained in the traceback raised in the main process.
2484 if self.TYPE == 'processes':
2485 with self.Pool(1) as p:
2486 try:
2487 p.apply(self._test_traceback)
2488 except Exception as e:
2489 exc = e
2490 else:
Xiang Zhang794623b2017-03-29 11:58:54 +08002491 self.fail('expected RuntimeError')
Richard Oudkerk85757832013-05-06 11:38:25 +01002492 self.assertIs(type(exc), RuntimeError)
2493 self.assertEqual(exc.args, (123,))
2494 cause = exc.__cause__
2495 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2496 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2497
2498 with test.support.captured_stderr() as f1:
2499 try:
2500 raise exc
2501 except RuntimeError:
2502 sys.excepthook(*sys.exc_info())
2503 self.assertIn('raise RuntimeError(123) # some comment',
2504 f1.getvalue())
Xiang Zhang794623b2017-03-29 11:58:54 +08002505 # _helper_reraises_exception should not make the error
2506 # a remote exception
2507 with self.Pool(1) as p:
2508 try:
2509 p.map(sqr, exception_throwing_generator(1, -1), 1)
2510 except Exception as e:
2511 exc = e
2512 else:
2513 self.fail('expected SayWhenError')
2514 self.assertIs(type(exc), SayWhenError)
2515 self.assertIs(exc.__cause__, None)
Richard Oudkerk85757832013-05-06 11:38:25 +01002516
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002517 @classmethod
2518 def _test_wrapped_exception(cls):
2519 raise RuntimeError('foo')
2520
2521 def test_wrapped_exception(self):
2522 # Issue #20980: Should not wrap exception when using thread pool
2523 with self.Pool(1) as p:
2524 with self.assertRaises(RuntimeError):
2525 p.apply(self._test_wrapped_exception)
2526
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002527 def test_map_no_failfast(self):
2528 # Issue #23992: the fail-fast behaviour when an exception is raised
2529 # during map() would make Pool.join() deadlock, because a worker
2530 # process would fill the result queue (after the result handler thread
2531 # terminated, hence not draining it anymore).
2532
2533 t_start = time.time()
2534
2535 with self.assertRaises(ValueError):
2536 with self.Pool(2) as p:
2537 try:
2538 p.map(raise_large_valuerror, [0, 1])
2539 finally:
2540 time.sleep(0.5)
2541 p.close()
2542 p.join()
2543
2544 # check that we indeed waited for all jobs
2545 self.assertGreater(time.time() - t_start, 0.9)
2546
Antoine Pitrou89889452017-03-24 13:52:11 +01002547 def test_release_task_refs(self):
2548 # Issue #29861: task arguments and results should not be kept
2549 # alive after we are done with them.
2550 objs = [CountedObject() for i in range(10)]
2551 refs = [weakref.ref(o) for o in objs]
2552 self.pool.map(identity, objs)
2553
2554 del objs
Antoine Pitrou685cdb92017-04-14 13:10:00 +02002555 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou89889452017-03-24 13:52:11 +01002556 self.assertEqual(set(wr() for wr in refs), {None})
2557 # With a process pool, copies of the objects are returned, check
2558 # they were released too.
2559 self.assertEqual(CountedObject.n_instances, 0)
2560
tzickel97bfe8d2018-10-03 00:01:23 +03002561 def test_del_pool(self):
2562 p = self.Pool(1)
2563 wr = weakref.ref(p)
2564 del p
2565 gc.collect()
2566 self.assertIsNone(wr())
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002567
Ask Solem2afcbf22010-11-09 20:55:52 +00002568def raising():
2569 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00002570
Ask Solem2afcbf22010-11-09 20:55:52 +00002571def unpickleable_result():
2572 return lambda: 42
2573
2574class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00002575 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00002576
2577 def test_async_error_callback(self):
2578 p = multiprocessing.Pool(2)
2579
2580 scratchpad = [None]
2581 def errback(exc):
2582 scratchpad[0] = exc
2583
2584 res = p.apply_async(raising, error_callback=errback)
2585 self.assertRaises(KeyError, res.get)
2586 self.assertTrue(scratchpad[0])
2587 self.assertIsInstance(scratchpad[0], KeyError)
2588
2589 p.close()
2590 p.join()
2591
2592 def test_unpickleable_result(self):
2593 from multiprocessing.pool import MaybeEncodingError
2594 p = multiprocessing.Pool(2)
2595
2596 # Make sure we don't lose pool processes because of encoding errors.
2597 for iteration in range(20):
2598
2599 scratchpad = [None]
2600 def errback(exc):
2601 scratchpad[0] = exc
2602
2603 res = p.apply_async(unpickleable_result, error_callback=errback)
2604 self.assertRaises(MaybeEncodingError, res.get)
2605 wrapped = scratchpad[0]
2606 self.assertTrue(wrapped)
2607 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2608 self.assertIsNotNone(wrapped.exc)
2609 self.assertIsNotNone(wrapped.value)
2610
2611 p.close()
2612 p.join()
2613
2614class _TestPoolWorkerLifetime(BaseTestCase):
2615 ALLOWED_TYPES = ('processes', )
2616
Jesse Noller1f0b6582010-01-27 03:36:01 +00002617 def test_pool_worker_lifetime(self):
2618 p = multiprocessing.Pool(3, maxtasksperchild=10)
2619 self.assertEqual(3, len(p._pool))
2620 origworkerpids = [w.pid for w in p._pool]
2621 # Run many tasks so each worker gets replaced (hopefully)
2622 results = []
2623 for i in range(100):
2624 results.append(p.apply_async(sqr, (i, )))
2625 # Fetch the results and verify we got the right answers,
2626 # also ensuring all the tasks have completed.
2627 for (j, res) in enumerate(results):
2628 self.assertEqual(res.get(), sqr(j))
2629 # Refill the pool
2630 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00002631 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02002632 # (countdown * DELTA = 5 seconds max startup process time)
2633 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00002634 while countdown and not all(w.is_alive() for w in p._pool):
2635 countdown -= 1
2636 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002637 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00002638 # All pids should be assigned. See issue #7805.
2639 self.assertNotIn(None, origworkerpids)
2640 self.assertNotIn(None, finalworkerpids)
2641 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00002642 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2643 p.close()
2644 p.join()
2645
Charles-François Natalif8859e12011-10-24 18:45:29 +02002646 def test_pool_worker_lifetime_early_close(self):
2647 # Issue #10332: closing a pool whose workers have limited lifetimes
2648 # before all the tasks completed would make join() hang.
2649 p = multiprocessing.Pool(3, maxtasksperchild=1)
2650 results = []
2651 for i in range(6):
2652 results.append(p.apply_async(sqr, (i, 0.3)))
2653 p.close()
2654 p.join()
2655 # check the results
2656 for (j, res) in enumerate(results):
2657 self.assertEqual(res.get(), sqr(j))
2658
Benjamin Petersone711caf2008-06-11 16:44:04 +00002659#
2660# Test of creating a customized manager class
2661#
2662
2663from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2664
2665class FooBar(object):
2666 def f(self):
2667 return 'f()'
2668 def g(self):
2669 raise ValueError
2670 def _h(self):
2671 return '_h()'
2672
2673def baz():
2674 for i in range(10):
2675 yield i*i
2676
2677class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00002678 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002679 def __iter__(self):
2680 return self
2681 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002682 return self._callmethod('__next__')
2683
2684class MyManager(BaseManager):
2685 pass
2686
2687MyManager.register('Foo', callable=FooBar)
2688MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2689MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2690
2691
2692class _TestMyManager(BaseTestCase):
2693
2694 ALLOWED_TYPES = ('manager',)
2695
2696 def test_mymanager(self):
2697 manager = MyManager()
2698 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01002699 self.common(manager)
2700 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002701
Richard Oudkerkac385712012-06-18 21:29:30 +01002702 # If the manager process exited cleanly then the exitcode
2703 # will be zero. Otherwise (after a short timeout)
2704 # terminate() is used, resulting in an exitcode of -SIGTERM.
2705 self.assertEqual(manager._process.exitcode, 0)
2706
2707 def test_mymanager_context(self):
2708 with MyManager() as manager:
2709 self.common(manager)
Victor Stinnerfbd71722018-06-27 18:18:10 +02002710 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2711 # to the manager process if it takes longer than 1 second to stop.
2712 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
Richard Oudkerkac385712012-06-18 21:29:30 +01002713
2714 def test_mymanager_context_prestarted(self):
2715 manager = MyManager()
2716 manager.start()
2717 with manager:
2718 self.common(manager)
2719 self.assertEqual(manager._process.exitcode, 0)
2720
2721 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002722 foo = manager.Foo()
2723 bar = manager.Bar()
2724 baz = manager.baz()
2725
2726 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2727 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2728
2729 self.assertEqual(foo_methods, ['f', 'g'])
2730 self.assertEqual(bar_methods, ['f', '_h'])
2731
2732 self.assertEqual(foo.f(), 'f()')
2733 self.assertRaises(ValueError, foo.g)
2734 self.assertEqual(foo._callmethod('f'), 'f()')
2735 self.assertRaises(RemoteError, foo._callmethod, '_h')
2736
2737 self.assertEqual(bar.f(), 'f()')
2738 self.assertEqual(bar._h(), '_h()')
2739 self.assertEqual(bar._callmethod('f'), 'f()')
2740 self.assertEqual(bar._callmethod('_h'), '_h()')
2741
2742 self.assertEqual(list(baz), [i*i for i in range(10)])
2743
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002744
Benjamin Petersone711caf2008-06-11 16:44:04 +00002745#
2746# Test of connecting to a remote server and using xmlrpclib for serialization
2747#
2748
2749_queue = pyqueue.Queue()
2750def get_queue():
2751 return _queue
2752
2753class QueueManager(BaseManager):
2754 '''manager class used by server process'''
2755QueueManager.register('get_queue', callable=get_queue)
2756
2757class QueueManager2(BaseManager):
2758 '''manager class which specifies the same interface as QueueManager'''
2759QueueManager2.register('get_queue')
2760
2761
2762SERIALIZER = 'xmlrpclib'
2763
2764class _TestRemoteManager(BaseTestCase):
2765
2766 ALLOWED_TYPES = ('manager',)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002767 values = ['hello world', None, True, 2.25,
2768 'hall\xe5 v\xe4rlden',
2769 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2770 b'hall\xe5 v\xe4rlden',
2771 ]
2772 result = values[:]
Benjamin Petersone711caf2008-06-11 16:44:04 +00002773
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002774 @classmethod
2775 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002776 manager = QueueManager2(
2777 address=address, authkey=authkey, serializer=SERIALIZER
2778 )
2779 manager.connect()
2780 queue = manager.get_queue()
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002781 # Note that xmlrpclib will deserialize object as a list not a tuple
2782 queue.put(tuple(cls.values))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002783
2784 def test_remote(self):
2785 authkey = os.urandom(32)
2786
2787 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002788 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00002789 )
2790 manager.start()
2791
2792 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002793 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002794 p.start()
2795
2796 manager2 = QueueManager2(
2797 address=manager.address, authkey=authkey, serializer=SERIALIZER
2798 )
2799 manager2.connect()
2800 queue = manager2.get_queue()
2801
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002802 self.assertEqual(queue.get(), self.result)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002803
2804 # Because we are using xmlrpclib for serialization instead of
2805 # pickle this will cause a serialization error.
2806 self.assertRaises(Exception, queue.put, time.sleep)
2807
2808 # Make queue finalizer run before the server is stopped
2809 del queue
2810 manager.shutdown()
2811
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002812class _TestManagerRestart(BaseTestCase):
2813
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002814 @classmethod
2815 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002816 manager = QueueManager(
2817 address=address, authkey=authkey, serializer=SERIALIZER)
2818 manager.connect()
2819 queue = manager.get_queue()
2820 queue.put('hello world')
2821
2822 def test_rapid_restart(self):
2823 authkey = os.urandom(32)
2824 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002825 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002826 srvr = manager.get_server()
2827 addr = srvr.address
2828 # Close the connection.Listener socket which gets opened as a part
2829 # of manager.get_server(). It's not needed for the test.
2830 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002831 manager.start()
2832
2833 p = self.Process(target=self._putter, args=(manager.address, authkey))
2834 p.start()
Victor Stinner17657bb2017-08-16 12:46:04 +02002835 p.join()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002836 queue = manager.get_queue()
2837 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002838 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002839 manager.shutdown()
Victor Stinner17657bb2017-08-16 12:46:04 +02002840
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002841 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002842 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002843 try:
2844 manager.start()
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002845 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002846 if e.errno != errno.EADDRINUSE:
2847 raise
2848 # Retry after some time, in case the old socket was lingering
2849 # (sporadic failure on buildbots)
2850 time.sleep(1.0)
2851 manager = QueueManager(
2852 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002853 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002854
Benjamin Petersone711caf2008-06-11 16:44:04 +00002855#
2856#
2857#
2858
2859SENTINEL = latin('')
2860
2861class _TestConnection(BaseTestCase):
2862
2863 ALLOWED_TYPES = ('processes', 'threads')
2864
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002865 @classmethod
2866 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002867 for msg in iter(conn.recv_bytes, SENTINEL):
2868 conn.send_bytes(msg)
2869 conn.close()
2870
2871 def test_connection(self):
2872 conn, child_conn = self.Pipe()
2873
2874 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002875 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002876 p.start()
2877
2878 seq = [1, 2.25, None]
2879 msg = latin('hello world')
2880 longmsg = msg * 10
2881 arr = array.array('i', list(range(4)))
2882
2883 if self.TYPE == 'processes':
2884 self.assertEqual(type(conn.fileno()), int)
2885
2886 self.assertEqual(conn.send(seq), None)
2887 self.assertEqual(conn.recv(), seq)
2888
2889 self.assertEqual(conn.send_bytes(msg), None)
2890 self.assertEqual(conn.recv_bytes(), msg)
2891
2892 if self.TYPE == 'processes':
2893 buffer = array.array('i', [0]*10)
2894 expected = list(arr) + [0] * (10 - len(arr))
2895 self.assertEqual(conn.send_bytes(arr), None)
2896 self.assertEqual(conn.recv_bytes_into(buffer),
2897 len(arr) * buffer.itemsize)
2898 self.assertEqual(list(buffer), expected)
2899
2900 buffer = array.array('i', [0]*10)
2901 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2902 self.assertEqual(conn.send_bytes(arr), None)
2903 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2904 len(arr) * buffer.itemsize)
2905 self.assertEqual(list(buffer), expected)
2906
2907 buffer = bytearray(latin(' ' * 40))
2908 self.assertEqual(conn.send_bytes(longmsg), None)
2909 try:
2910 res = conn.recv_bytes_into(buffer)
2911 except multiprocessing.BufferTooShort as e:
2912 self.assertEqual(e.args, (longmsg,))
2913 else:
2914 self.fail('expected BufferTooShort, got %s' % res)
2915
2916 poll = TimingWrapper(conn.poll)
2917
2918 self.assertEqual(poll(), False)
2919 self.assertTimingAlmostEqual(poll.elapsed, 0)
2920
Richard Oudkerk59d54042012-05-10 16:11:12 +01002921 self.assertEqual(poll(-1), False)
2922 self.assertTimingAlmostEqual(poll.elapsed, 0)
2923
Benjamin Petersone711caf2008-06-11 16:44:04 +00002924 self.assertEqual(poll(TIMEOUT1), False)
2925 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2926
2927 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002928 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002929
2930 self.assertEqual(poll(TIMEOUT1), True)
2931 self.assertTimingAlmostEqual(poll.elapsed, 0)
2932
2933 self.assertEqual(conn.recv(), None)
2934
2935 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2936 conn.send_bytes(really_big_msg)
2937 self.assertEqual(conn.recv_bytes(), really_big_msg)
2938
2939 conn.send_bytes(SENTINEL) # tell child to quit
2940 child_conn.close()
2941
2942 if self.TYPE == 'processes':
2943 self.assertEqual(conn.readable, True)
2944 self.assertEqual(conn.writable, True)
2945 self.assertRaises(EOFError, conn.recv)
2946 self.assertRaises(EOFError, conn.recv_bytes)
2947
2948 p.join()
2949
2950 def test_duplex_false(self):
2951 reader, writer = self.Pipe(duplex=False)
2952 self.assertEqual(writer.send(1), None)
2953 self.assertEqual(reader.recv(), 1)
2954 if self.TYPE == 'processes':
2955 self.assertEqual(reader.readable, True)
2956 self.assertEqual(reader.writable, False)
2957 self.assertEqual(writer.readable, False)
2958 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002959 self.assertRaises(OSError, reader.send, 2)
2960 self.assertRaises(OSError, writer.recv)
2961 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002962
2963 def test_spawn_close(self):
2964 # We test that a pipe connection can be closed by parent
2965 # process immediately after child is spawned. On Windows this
2966 # would have sometimes failed on old versions because
2967 # child_conn would be closed before the child got a chance to
2968 # duplicate it.
2969 conn, child_conn = self.Pipe()
2970
2971 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002972 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002973 p.start()
2974 child_conn.close() # this might complete before child initializes
2975
2976 msg = latin('hello')
2977 conn.send_bytes(msg)
2978 self.assertEqual(conn.recv_bytes(), msg)
2979
2980 conn.send_bytes(SENTINEL)
2981 conn.close()
2982 p.join()
2983
2984 def test_sendbytes(self):
2985 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002986 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002987
2988 msg = latin('abcdefghijklmnopqrstuvwxyz')
2989 a, b = self.Pipe()
2990
2991 a.send_bytes(msg)
2992 self.assertEqual(b.recv_bytes(), msg)
2993
2994 a.send_bytes(msg, 5)
2995 self.assertEqual(b.recv_bytes(), msg[5:])
2996
2997 a.send_bytes(msg, 7, 8)
2998 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2999
3000 a.send_bytes(msg, 26)
3001 self.assertEqual(b.recv_bytes(), latin(''))
3002
3003 a.send_bytes(msg, 26, 0)
3004 self.assertEqual(b.recv_bytes(), latin(''))
3005
3006 self.assertRaises(ValueError, a.send_bytes, msg, 27)
3007
3008 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3009
3010 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3011
3012 self.assertRaises(ValueError, a.send_bytes, msg, -1)
3013
3014 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3015
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003016 @classmethod
3017 def _is_fd_assigned(cls, fd):
3018 try:
3019 os.fstat(fd)
3020 except OSError as e:
3021 if e.errno == errno.EBADF:
3022 return False
3023 raise
3024 else:
3025 return True
3026
3027 @classmethod
3028 def _writefd(cls, conn, data, create_dummy_fds=False):
3029 if create_dummy_fds:
3030 for i in range(0, 256):
3031 if not cls._is_fd_assigned(i):
3032 os.dup2(conn.fileno(), i)
3033 fd = reduction.recv_handle(conn)
3034 if msvcrt:
3035 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3036 os.write(fd, data)
3037 os.close(fd)
3038
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003039 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003040 def test_fd_transfer(self):
3041 if self.TYPE != 'processes':
3042 self.skipTest("only makes sense with processes")
3043 conn, child_conn = self.Pipe(duplex=True)
3044
3045 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02003046 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003047 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003048 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003049 with open(test.support.TESTFN, "wb") as f:
3050 fd = f.fileno()
3051 if msvcrt:
3052 fd = msvcrt.get_osfhandle(fd)
3053 reduction.send_handle(conn, fd, p.pid)
3054 p.join()
3055 with open(test.support.TESTFN, "rb") as f:
3056 self.assertEqual(f.read(), b"foo")
3057
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003058 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003059 @unittest.skipIf(sys.platform == "win32",
3060 "test semantics don't make sense on Windows")
3061 @unittest.skipIf(MAXFD <= 256,
3062 "largest assignable fd number is too small")
3063 @unittest.skipUnless(hasattr(os, "dup2"),
3064 "test needs os.dup2()")
3065 def test_large_fd_transfer(self):
3066 # With fd > 256 (issue #11657)
3067 if self.TYPE != 'processes':
3068 self.skipTest("only makes sense with processes")
3069 conn, child_conn = self.Pipe(duplex=True)
3070
3071 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02003072 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003073 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003074 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003075 with open(test.support.TESTFN, "wb") as f:
3076 fd = f.fileno()
3077 for newfd in range(256, MAXFD):
3078 if not self._is_fd_assigned(newfd):
3079 break
3080 else:
3081 self.fail("could not find an unassigned large file descriptor")
3082 os.dup2(fd, newfd)
3083 try:
3084 reduction.send_handle(conn, newfd, p.pid)
3085 finally:
3086 os.close(newfd)
3087 p.join()
3088 with open(test.support.TESTFN, "rb") as f:
3089 self.assertEqual(f.read(), b"bar")
3090
Jesus Cea4507e642011-09-21 03:53:25 +02003091 @classmethod
3092 def _send_data_without_fd(self, conn):
3093 os.write(conn.fileno(), b"\0")
3094
Charles-François Natalie51c8da2011-09-21 18:48:21 +02003095 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02003096 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3097 def test_missing_fd_transfer(self):
3098 # Check that exception is raised when received data is not
3099 # accompanied by a file descriptor in ancillary data.
3100 if self.TYPE != 'processes':
3101 self.skipTest("only makes sense with processes")
3102 conn, child_conn = self.Pipe(duplex=True)
3103
3104 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3105 p.daemon = True
3106 p.start()
3107 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3108 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003109
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003110 def test_context(self):
3111 a, b = self.Pipe()
3112
3113 with a, b:
3114 a.send(1729)
3115 self.assertEqual(b.recv(), 1729)
3116 if self.TYPE == 'processes':
3117 self.assertFalse(a.closed)
3118 self.assertFalse(b.closed)
3119
3120 if self.TYPE == 'processes':
3121 self.assertTrue(a.closed)
3122 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003123 self.assertRaises(OSError, a.recv)
3124 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003125
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003126class _TestListener(BaseTestCase):
3127
Richard Oudkerk91257752012-06-15 21:53:34 +01003128 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003129
3130 def test_multiple_bind(self):
3131 for family in self.connection.families:
3132 l = self.connection.Listener(family=family)
3133 self.addCleanup(l.close)
3134 self.assertRaises(OSError, self.connection.Listener,
3135 l.address, family)
3136
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003137 def test_context(self):
3138 with self.connection.Listener() as l:
3139 with self.connection.Client(l.address) as c:
3140 with l.accept() as d:
3141 c.send(1729)
3142 self.assertEqual(d.recv(), 1729)
3143
3144 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003145 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003146
Benjamin Petersone711caf2008-06-11 16:44:04 +00003147class _TestListenerClient(BaseTestCase):
3148
3149 ALLOWED_TYPES = ('processes', 'threads')
3150
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003151 @classmethod
3152 def _test(cls, address):
3153 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003154 conn.send('hello')
3155 conn.close()
3156
3157 def test_listener_client(self):
3158 for family in self.connection.families:
3159 l = self.connection.Listener(family=family)
3160 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00003161 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003162 p.start()
3163 conn = l.accept()
3164 self.assertEqual(conn.recv(), 'hello')
3165 p.join()
3166 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003167
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01003168 def test_issue14725(self):
3169 l = self.connection.Listener()
3170 p = self.Process(target=self._test, args=(l.address,))
3171 p.daemon = True
3172 p.start()
3173 time.sleep(1)
3174 # On Windows the client process should by now have connected,
3175 # written data and closed the pipe handle by now. This causes
3176 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
3177 # 14725.
3178 conn = l.accept()
3179 self.assertEqual(conn.recv(), 'hello')
3180 conn.close()
3181 p.join()
3182 l.close()
3183
Richard Oudkerked9e06c2013-01-13 22:46:48 +00003184 def test_issue16955(self):
3185 for fam in self.connection.families:
3186 l = self.connection.Listener(family=fam)
3187 c = self.connection.Client(l.address)
3188 a = l.accept()
3189 a.send_bytes(b"hello")
3190 self.assertTrue(c.poll(1))
3191 a.close()
3192 c.close()
3193 l.close()
3194
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003195class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003196
3197 ALLOWED_TYPES = ('processes', 'threads')
3198
3199 def test_empty_string(self):
3200 a, b = self.Pipe()
3201 self.assertEqual(a.poll(), False)
3202 b.send_bytes(b'')
3203 self.assertEqual(a.poll(), True)
3204 self.assertEqual(a.poll(), True)
3205
3206 @classmethod
3207 def _child_strings(cls, conn, strings):
3208 for s in strings:
3209 time.sleep(0.1)
3210 conn.send_bytes(s)
3211 conn.close()
3212
3213 def test_strings(self):
3214 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3215 a, b = self.Pipe()
3216 p = self.Process(target=self._child_strings, args=(b, strings))
3217 p.start()
3218
3219 for s in strings:
3220 for i in range(200):
3221 if a.poll(0.01):
3222 break
3223 x = a.recv_bytes()
3224 self.assertEqual(s, x)
3225
3226 p.join()
3227
3228 @classmethod
3229 def _child_boundaries(cls, r):
3230 # Polling may "pull" a message in to the child process, but we
3231 # don't want it to pull only part of a message, as that would
3232 # corrupt the pipe for any other processes which might later
3233 # read from it.
3234 r.poll(5)
3235
3236 def test_boundaries(self):
3237 r, w = self.Pipe(False)
3238 p = self.Process(target=self._child_boundaries, args=(r,))
3239 p.start()
3240 time.sleep(2)
3241 L = [b"first", b"second"]
3242 for obj in L:
3243 w.send_bytes(obj)
3244 w.close()
3245 p.join()
3246 self.assertIn(r.recv_bytes(), L)
3247
3248 @classmethod
3249 def _child_dont_merge(cls, b):
3250 b.send_bytes(b'a')
3251 b.send_bytes(b'b')
3252 b.send_bytes(b'cd')
3253
3254 def test_dont_merge(self):
3255 a, b = self.Pipe()
3256 self.assertEqual(a.poll(0.0), False)
3257 self.assertEqual(a.poll(0.1), False)
3258
3259 p = self.Process(target=self._child_dont_merge, args=(b,))
3260 p.start()
3261
3262 self.assertEqual(a.recv_bytes(), b'a')
3263 self.assertEqual(a.poll(1.0), True)
3264 self.assertEqual(a.poll(1.0), True)
3265 self.assertEqual(a.recv_bytes(), b'b')
3266 self.assertEqual(a.poll(1.0), True)
3267 self.assertEqual(a.poll(1.0), True)
3268 self.assertEqual(a.poll(0.0), True)
3269 self.assertEqual(a.recv_bytes(), b'cd')
3270
3271 p.join()
3272
Benjamin Petersone711caf2008-06-11 16:44:04 +00003273#
3274# Test of sending connection and socket objects between processes
3275#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003276
3277@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00003278class _TestPicklingConnections(BaseTestCase):
3279
3280 ALLOWED_TYPES = ('processes',)
3281
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003282 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003283 def tearDownClass(cls):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003284 from multiprocessing import resource_sharer
Victor Stinner11f08072017-09-15 06:55:31 -07003285 resource_sharer.stop(timeout=TIMEOUT)
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003286
3287 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003288 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003289 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003290 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003291 conn.send(l.address)
3292 new_conn = l.accept()
3293 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003294 new_conn.close()
3295 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003296
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003297 l = socket.socket()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02003298 l.bind((test.support.HOST, 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01003299 l.listen()
Richard Oudkerk5d73c172012-05-08 22:24:47 +01003300 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003301 new_conn, addr = l.accept()
3302 conn.send(new_conn)
3303 new_conn.close()
3304 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003305
3306 conn.recv()
3307
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003308 @classmethod
3309 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003310 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003311 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003312 client.send(msg.upper())
3313 client.close()
3314
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003315 address, msg = conn.recv()
3316 client = socket.socket()
3317 client.connect(address)
3318 client.sendall(msg.upper())
3319 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003320
3321 conn.close()
3322
3323 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003324 families = self.connection.families
3325
3326 lconn, lconn0 = self.Pipe()
3327 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02003328 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003329 lp.start()
3330 lconn0.close()
3331
3332 rconn, rconn0 = self.Pipe()
3333 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003334 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003335 rp.start()
3336 rconn0.close()
3337
3338 for fam in families:
3339 msg = ('This connection uses family %s' % fam).encode('ascii')
3340 address = lconn.recv()
3341 rconn.send((address, msg))
3342 new_conn = lconn.recv()
3343 self.assertEqual(new_conn.recv(), msg.upper())
3344
3345 rconn.send(None)
3346
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003347 msg = latin('This connection uses a normal socket')
3348 address = lconn.recv()
3349 rconn.send((address, msg))
3350 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01003351 buf = []
3352 while True:
3353 s = new_conn.recv(100)
3354 if not s:
3355 break
3356 buf.append(s)
3357 buf = b''.join(buf)
3358 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003359 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003360
3361 lconn.send(None)
3362
3363 rconn.close()
3364 lconn.close()
3365
3366 lp.join()
3367 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003368
3369 @classmethod
3370 def child_access(cls, conn):
3371 w = conn.recv()
3372 w.send('all is well')
3373 w.close()
3374
3375 r = conn.recv()
3376 msg = r.recv()
3377 conn.send(msg*2)
3378
3379 conn.close()
3380
3381 def test_access(self):
3382 # On Windows, if we do not specify a destination pid when
3383 # using DupHandle then we need to be careful to use the
3384 # correct access flags for DuplicateHandle(), or else
3385 # DupHandle.detach() will raise PermissionError. For example,
3386 # for a read only pipe handle we should use
3387 # access=FILE_GENERIC_READ. (Unfortunately
3388 # DUPLICATE_SAME_ACCESS does not work.)
3389 conn, child_conn = self.Pipe()
3390 p = self.Process(target=self.child_access, args=(child_conn,))
3391 p.daemon = True
3392 p.start()
3393 child_conn.close()
3394
3395 r, w = self.Pipe(duplex=False)
3396 conn.send(w)
3397 w.close()
3398 self.assertEqual(r.recv(), 'all is well')
3399 r.close()
3400
3401 r, w = self.Pipe(duplex=False)
3402 conn.send(r)
3403 r.close()
3404 w.send('foobar')
3405 w.close()
3406 self.assertEqual(conn.recv(), 'foobar'*2)
3407
Victor Stinnerb4c52962017-07-25 02:40:55 +02003408 p.join()
3409
Benjamin Petersone711caf2008-06-11 16:44:04 +00003410#
3411#
3412#
3413
3414class _TestHeap(BaseTestCase):
3415
3416 ALLOWED_TYPES = ('processes',)
3417
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003418 def setUp(self):
3419 super().setUp()
3420 # Make pristine heap for these tests
3421 self.old_heap = multiprocessing.heap.BufferWrapper._heap
3422 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3423
3424 def tearDown(self):
3425 multiprocessing.heap.BufferWrapper._heap = self.old_heap
3426 super().tearDown()
3427
Benjamin Petersone711caf2008-06-11 16:44:04 +00003428 def test_heap(self):
3429 iterations = 5000
3430 maxblocks = 50
3431 blocks = []
3432
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003433 # get the heap object
3434 heap = multiprocessing.heap.BufferWrapper._heap
3435 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3436
Benjamin Petersone711caf2008-06-11 16:44:04 +00003437 # create and destroy lots of blocks of different sizes
3438 for i in range(iterations):
3439 size = int(random.lognormvariate(0, 1) * 1000)
3440 b = multiprocessing.heap.BufferWrapper(size)
3441 blocks.append(b)
3442 if len(blocks) > maxblocks:
3443 i = random.randrange(maxblocks)
3444 del blocks[i]
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003445 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00003446
3447 # verify the state of the heap
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003448 with heap._lock:
3449 all = []
3450 free = 0
3451 occupied = 0
3452 for L in list(heap._len_to_seq.values()):
3453 # count all free blocks in arenas
3454 for arena, start, stop in L:
3455 all.append((heap._arenas.index(arena), start, stop,
3456 stop-start, 'free'))
3457 free += (stop-start)
3458 for arena, arena_blocks in heap._allocated_blocks.items():
3459 # count all allocated blocks in arenas
3460 for start, stop in arena_blocks:
3461 all.append((heap._arenas.index(arena), start, stop,
3462 stop-start, 'occupied'))
3463 occupied += (stop-start)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003464
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003465 self.assertEqual(free + occupied,
3466 sum(arena.size for arena in heap._arenas))
Benjamin Petersone711caf2008-06-11 16:44:04 +00003467
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003468 all.sort()
3469
3470 for i in range(len(all)-1):
3471 (arena, start, stop) = all[i][:3]
3472 (narena, nstart, nstop) = all[i+1][:3]
3473 if arena != narena:
3474 # Two different arenas
3475 self.assertEqual(stop, heap._arenas[arena].size) # last block
3476 self.assertEqual(nstart, 0) # first block
3477 else:
3478 # Same arena: two adjacent blocks
3479 self.assertEqual(stop, nstart)
3480
3481 # test free'ing all blocks
3482 random.shuffle(blocks)
3483 while blocks:
3484 blocks.pop()
3485
3486 self.assertEqual(heap._n_frees, heap._n_mallocs)
3487 self.assertEqual(len(heap._pending_free_blocks), 0)
3488 self.assertEqual(len(heap._arenas), 0)
3489 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3490 self.assertEqual(len(heap._len_to_seq), 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003491
Charles-François Natali778db492011-07-02 14:35:49 +02003492 def test_free_from_gc(self):
3493 # Check that freeing of blocks by the garbage collector doesn't deadlock
3494 # (issue #12352).
3495 # Make sure the GC is enabled, and set lower collection thresholds to
3496 # make collections more frequent (and increase the probability of
3497 # deadlock).
3498 if not gc.isenabled():
3499 gc.enable()
3500 self.addCleanup(gc.disable)
3501 thresholds = gc.get_threshold()
3502 self.addCleanup(gc.set_threshold, *thresholds)
3503 gc.set_threshold(10)
3504
3505 # perform numerous block allocations, with cyclic references to make
3506 # sure objects are collected asynchronously by the gc
3507 for i in range(5000):
3508 a = multiprocessing.heap.BufferWrapper(1)
3509 b = multiprocessing.heap.BufferWrapper(1)
3510 # circular references
3511 a.buddy = b
3512 b.buddy = a
3513
Benjamin Petersone711caf2008-06-11 16:44:04 +00003514#
3515#
3516#
3517
Benjamin Petersone711caf2008-06-11 16:44:04 +00003518class _Foo(Structure):
3519 _fields_ = [
3520 ('x', c_int),
Gareth Rees3913bad2017-07-21 11:35:33 +01003521 ('y', c_double),
3522 ('z', c_longlong,)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003523 ]
3524
3525class _TestSharedCTypes(BaseTestCase):
3526
3527 ALLOWED_TYPES = ('processes',)
3528
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00003529 def setUp(self):
3530 if not HAS_SHAREDCTYPES:
3531 self.skipTest("requires multiprocessing.sharedctypes")
3532
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003533 @classmethod
Gareth Rees3913bad2017-07-21 11:35:33 +01003534 def _double(cls, x, y, z, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003535 x.value *= 2
3536 y.value *= 2
Gareth Rees3913bad2017-07-21 11:35:33 +01003537 z.value *= 2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003538 foo.x *= 2
3539 foo.y *= 2
3540 string.value *= 2
3541 for i in range(len(arr)):
3542 arr[i] *= 2
3543
3544 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003545 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003546 y = Value(c_double, 1.0/3.0, lock=lock)
Gareth Rees3913bad2017-07-21 11:35:33 +01003547 z = Value(c_longlong, 2 ** 33, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003548 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003549 arr = self.Array('d', list(range(10)), lock=lock)
3550 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00003551 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00003552
Gareth Rees3913bad2017-07-21 11:35:33 +01003553 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02003554 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003555 p.start()
3556 p.join()
3557
3558 self.assertEqual(x.value, 14)
3559 self.assertAlmostEqual(y.value, 2.0/3.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003560 self.assertEqual(z.value, 2 ** 34)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003561 self.assertEqual(foo.x, 6)
3562 self.assertAlmostEqual(foo.y, 4.0)
3563 for i in range(10):
3564 self.assertAlmostEqual(arr[i], i*2)
3565 self.assertEqual(string.value, latin('hellohello'))
3566
3567 def test_synchronize(self):
3568 self.test_sharedctypes(lock=True)
3569
3570 def test_copy(self):
Gareth Rees3913bad2017-07-21 11:35:33 +01003571 foo = _Foo(2, 5.0, 2 ** 33)
Brian Curtinafa88b52010-10-07 01:12:19 +00003572 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003573 foo.x = 0
3574 foo.y = 0
Gareth Rees3913bad2017-07-21 11:35:33 +01003575 foo.z = 0
Benjamin Petersone711caf2008-06-11 16:44:04 +00003576 self.assertEqual(bar.x, 2)
3577 self.assertAlmostEqual(bar.y, 5.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003578 self.assertEqual(bar.z, 2 ** 33)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003579
3580#
3581#
3582#
3583
3584class _TestFinalize(BaseTestCase):
3585
3586 ALLOWED_TYPES = ('processes',)
3587
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02003588 def setUp(self):
3589 self.registry_backup = util._finalizer_registry.copy()
3590 util._finalizer_registry.clear()
3591
3592 def tearDown(self):
3593 self.assertFalse(util._finalizer_registry)
3594 util._finalizer_registry.update(self.registry_backup)
3595
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003596 @classmethod
3597 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003598 class Foo(object):
3599 pass
3600
3601 a = Foo()
3602 util.Finalize(a, conn.send, args=('a',))
3603 del a # triggers callback for a
3604
3605 b = Foo()
3606 close_b = util.Finalize(b, conn.send, args=('b',))
3607 close_b() # triggers callback for b
3608 close_b() # does nothing because callback has already been called
3609 del b # does nothing because callback has already been called
3610
3611 c = Foo()
3612 util.Finalize(c, conn.send, args=('c',))
3613
3614 d10 = Foo()
3615 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
3616
3617 d01 = Foo()
3618 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
3619 d02 = Foo()
3620 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
3621 d03 = Foo()
3622 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
3623
3624 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
3625
3626 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
3627
Ezio Melotti13925002011-03-16 11:05:33 +02003628 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00003629 # garbage collecting locals
3630 util._exit_function()
3631 conn.close()
3632 os._exit(0)
3633
3634 def test_finalize(self):
3635 conn, child_conn = self.Pipe()
3636
3637 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003638 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003639 p.start()
3640 p.join()
3641
3642 result = [obj for obj in iter(conn.recv, 'STOP')]
3643 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
3644
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02003645 def test_thread_safety(self):
3646 # bpo-24484: _run_finalizers() should be thread-safe
3647 def cb():
3648 pass
3649
3650 class Foo(object):
3651 def __init__(self):
3652 self.ref = self # create reference cycle
3653 # insert finalizer at random key
3654 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
3655
3656 finish = False
3657 exc = None
3658
3659 def run_finalizers():
3660 nonlocal exc
3661 while not finish:
3662 time.sleep(random.random() * 1e-1)
3663 try:
3664 # A GC run will eventually happen during this,
3665 # collecting stale Foo's and mutating the registry
3666 util._run_finalizers()
3667 except Exception as e:
3668 exc = e
3669
3670 def make_finalizers():
3671 nonlocal exc
3672 d = {}
3673 while not finish:
3674 try:
3675 # Old Foo's get gradually replaced and later
3676 # collected by the GC (because of the cyclic ref)
3677 d[random.getrandbits(5)] = {Foo() for i in range(10)}
3678 except Exception as e:
3679 exc = e
3680 d.clear()
3681
3682 old_interval = sys.getswitchinterval()
3683 old_threshold = gc.get_threshold()
3684 try:
3685 sys.setswitchinterval(1e-6)
3686 gc.set_threshold(5, 5, 5)
3687 threads = [threading.Thread(target=run_finalizers),
3688 threading.Thread(target=make_finalizers)]
3689 with test.support.start_threads(threads):
3690 time.sleep(4.0) # Wait a bit to trigger race condition
3691 finish = True
3692 if exc is not None:
3693 raise exc
3694 finally:
3695 sys.setswitchinterval(old_interval)
3696 gc.set_threshold(*old_threshold)
3697 gc.collect() # Collect remaining Foo's
3698
3699
Benjamin Petersone711caf2008-06-11 16:44:04 +00003700#
3701# Test that from ... import * works for each module
3702#
3703
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003704class _TestImportStar(unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003705
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003706 def get_module_names(self):
3707 import glob
3708 folder = os.path.dirname(multiprocessing.__file__)
3709 pattern = os.path.join(folder, '*.py')
3710 files = glob.glob(pattern)
3711 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
3712 modules = ['multiprocessing.' + m for m in modules]
3713 modules.remove('multiprocessing.__init__')
3714 modules.append('multiprocessing')
3715 return modules
Benjamin Petersone711caf2008-06-11 16:44:04 +00003716
3717 def test_import(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003718 modules = self.get_module_names()
3719 if sys.platform == 'win32':
3720 modules.remove('multiprocessing.popen_fork')
3721 modules.remove('multiprocessing.popen_forkserver')
3722 modules.remove('multiprocessing.popen_spawn_posix')
3723 else:
3724 modules.remove('multiprocessing.popen_spawn_win32')
3725 if not HAS_REDUCTION:
3726 modules.remove('multiprocessing.popen_forkserver')
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003727
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003728 if c_int is None:
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003729 # This module requires _ctypes
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003730 modules.remove('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00003731
3732 for name in modules:
3733 __import__(name)
3734 mod = sys.modules[name]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003735 self.assertTrue(hasattr(mod, '__all__'), name)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003736
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003737 for attr in mod.__all__:
Benjamin Petersone711caf2008-06-11 16:44:04 +00003738 self.assertTrue(
3739 hasattr(mod, attr),
3740 '%r does not have attribute %r' % (mod, attr)
3741 )
3742
3743#
3744# Quick test that logging works -- does not test logging output
3745#
3746
3747class _TestLogging(BaseTestCase):
3748
3749 ALLOWED_TYPES = ('processes',)
3750
3751 def test_enable_logging(self):
3752 logger = multiprocessing.get_logger()
3753 logger.setLevel(util.SUBWARNING)
3754 self.assertTrue(logger is not None)
3755 logger.debug('this will not be printed')
3756 logger.info('nor will this')
3757 logger.setLevel(LOG_LEVEL)
3758
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003759 @classmethod
3760 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003761 logger = multiprocessing.get_logger()
3762 conn.send(logger.getEffectiveLevel())
3763
3764 def test_level(self):
3765 LEVEL1 = 32
3766 LEVEL2 = 37
3767
3768 logger = multiprocessing.get_logger()
3769 root_logger = logging.getLogger()
3770 root_level = root_logger.level
3771
3772 reader, writer = multiprocessing.Pipe(duplex=False)
3773
3774 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02003775 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003776 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003777 self.assertEqual(LEVEL1, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02003778 p.join()
3779 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003780
3781 logger.setLevel(logging.NOTSET)
3782 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02003783 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003784 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003785 self.assertEqual(LEVEL2, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02003786 p.join()
3787 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003788
3789 root_logger.setLevel(root_level)
3790 logger.setLevel(level=LOG_LEVEL)
3791
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003792
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00003793# class _TestLoggingProcessName(BaseTestCase):
3794#
3795# def handle(self, record):
3796# assert record.processName == multiprocessing.current_process().name
3797# self.__handled = True
3798#
3799# def test_logging(self):
3800# handler = logging.Handler()
3801# handler.handle = self.handle
3802# self.__handled = False
3803# # Bypass getLogger() and side-effects
3804# logger = logging.getLoggerClass()(
3805# 'multiprocessing.test.TestLoggingProcessName')
3806# logger.addHandler(handler)
3807# logger.propagate = False
3808#
3809# logger.warn('foo')
3810# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003811
Benjamin Petersone711caf2008-06-11 16:44:04 +00003812#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003813# Check that Process.join() retries if os.waitpid() fails with EINTR
3814#
3815
3816class _TestPollEintr(BaseTestCase):
3817
3818 ALLOWED_TYPES = ('processes',)
3819
3820 @classmethod
3821 def _killer(cls, pid):
Richard Oudkerk6a53af82013-08-28 13:50:19 +01003822 time.sleep(0.1)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003823 os.kill(pid, signal.SIGUSR1)
3824
3825 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3826 def test_poll_eintr(self):
3827 got_signal = [False]
3828 def record(*args):
3829 got_signal[0] = True
3830 pid = os.getpid()
3831 oldhandler = signal.signal(signal.SIGUSR1, record)
3832 try:
3833 killer = self.Process(target=self._killer, args=(pid,))
3834 killer.start()
Richard Oudkerk6a53af82013-08-28 13:50:19 +01003835 try:
3836 p = self.Process(target=time.sleep, args=(2,))
3837 p.start()
3838 p.join()
3839 finally:
3840 killer.join()
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003841 self.assertTrue(got_signal[0])
3842 self.assertEqual(p.exitcode, 0)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003843 finally:
3844 signal.signal(signal.SIGUSR1, oldhandler)
3845
3846#
Jesse Noller6214edd2009-01-19 16:23:53 +00003847# Test to verify handle verification, see issue 3321
3848#
3849
3850class TestInvalidHandle(unittest.TestCase):
3851
Victor Stinner937ee9e2018-06-26 02:11:06 +02003852 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00003853 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02003854 conn = multiprocessing.connection.Connection(44977608)
Charles-François Natali6703bb42013-09-06 21:12:22 +02003855 # check that poll() doesn't crash
Antoine Pitrou87cf2202011-05-09 17:04:27 +02003856 try:
Charles-François Natali6703bb42013-09-06 21:12:22 +02003857 conn.poll()
3858 except (ValueError, OSError):
3859 pass
Antoine Pitrou87cf2202011-05-09 17:04:27 +02003860 finally:
3861 # Hack private attribute _handle to avoid printing an error
3862 # in conn.__del__
3863 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003864 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02003865 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003866
Benjamin Petersone711caf2008-06-11 16:44:04 +00003867
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003868
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003869class OtherTest(unittest.TestCase):
3870 # TODO: add more tests for deliver/answer challenge.
3871 def test_deliver_challenge_auth_failure(self):
3872 class _FakeConnection(object):
3873 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003874 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003875 def send_bytes(self, data):
3876 pass
3877 self.assertRaises(multiprocessing.AuthenticationError,
3878 multiprocessing.connection.deliver_challenge,
3879 _FakeConnection(), b'abc')
3880
3881 def test_answer_challenge_auth_failure(self):
3882 class _FakeConnection(object):
3883 def __init__(self):
3884 self.count = 0
3885 def recv_bytes(self, size):
3886 self.count += 1
3887 if self.count == 1:
3888 return multiprocessing.connection.CHALLENGE
3889 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003890 return b'something bogus'
3891 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003892 def send_bytes(self, data):
3893 pass
3894 self.assertRaises(multiprocessing.AuthenticationError,
3895 multiprocessing.connection.answer_challenge,
3896 _FakeConnection(), b'abc')
3897
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003898#
3899# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3900#
3901
3902def initializer(ns):
3903 ns.test += 1
3904
3905class TestInitializers(unittest.TestCase):
3906 def setUp(self):
3907 self.mgr = multiprocessing.Manager()
3908 self.ns = self.mgr.Namespace()
3909 self.ns.test = 0
3910
3911 def tearDown(self):
3912 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003913 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003914
3915 def test_manager_initializer(self):
3916 m = multiprocessing.managers.SyncManager()
3917 self.assertRaises(TypeError, m.start, 1)
3918 m.start(initializer, (self.ns,))
3919 self.assertEqual(self.ns.test, 1)
3920 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003921 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003922
3923 def test_pool_initializer(self):
3924 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3925 p = multiprocessing.Pool(1, initializer, (self.ns,))
3926 p.close()
3927 p.join()
3928 self.assertEqual(self.ns.test, 1)
3929
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003930#
3931# Issue 5155, 5313, 5331: Test process in processes
3932# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3933#
3934
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003935def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003936 try:
3937 item = q.get(block=False)
3938 except pyqueue.Empty:
3939 pass
3940
Victor Stinnerb4c52962017-07-25 02:40:55 +02003941def _test_process():
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003942 queue = multiprocessing.Queue()
3943 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3944 subProc.daemon = True
3945 subProc.start()
3946 subProc.join()
3947
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003948def _afunc(x):
3949 return x*x
3950
3951def pool_in_process():
3952 pool = multiprocessing.Pool(processes=4)
3953 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003954 pool.close()
3955 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003956
3957class _file_like(object):
3958 def __init__(self, delegate):
3959 self._delegate = delegate
3960 self._pid = None
3961
3962 @property
3963 def cache(self):
3964 pid = os.getpid()
3965 # There are no race conditions since fork keeps only the running thread
3966 if pid != self._pid:
3967 self._pid = pid
3968 self._cache = []
3969 return self._cache
3970
3971 def write(self, data):
3972 self.cache.append(data)
3973
3974 def flush(self):
3975 self._delegate.write(''.join(self.cache))
3976 self._cache = []
3977
3978class TestStdinBadfiledescriptor(unittest.TestCase):
3979
3980 def test_queue_in_process(self):
Victor Stinnerb4c52962017-07-25 02:40:55 +02003981 proc = multiprocessing.Process(target=_test_process)
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003982 proc.start()
3983 proc.join()
3984
3985 def test_pool_in_process(self):
3986 p = multiprocessing.Process(target=pool_in_process)
3987 p.start()
3988 p.join()
3989
3990 def test_flushing(self):
3991 sio = io.StringIO()
3992 flike = _file_like(sio)
3993 flike.write('foo')
3994 proc = multiprocessing.Process(target=lambda: flike.flush())
3995 flike.flush()
3996 assert sio.getvalue() == 'foo'
3997
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003998
3999class TestWait(unittest.TestCase):
4000
4001 @classmethod
4002 def _child_test_wait(cls, w, slow):
4003 for i in range(10):
4004 if slow:
4005 time.sleep(random.random()*0.1)
4006 w.send((i, os.getpid()))
4007 w.close()
4008
4009 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004010 from multiprocessing.connection import wait
4011 readers = []
4012 procs = []
4013 messages = []
4014
4015 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004016 r, w = multiprocessing.Pipe(duplex=False)
4017 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004018 p.daemon = True
4019 p.start()
4020 w.close()
4021 readers.append(r)
4022 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004023 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004024
4025 while readers:
4026 for r in wait(readers):
4027 try:
4028 msg = r.recv()
4029 except EOFError:
4030 readers.remove(r)
4031 r.close()
4032 else:
4033 messages.append(msg)
4034
4035 messages.sort()
4036 expected = sorted((i, p.pid) for i in range(10) for p in procs)
4037 self.assertEqual(messages, expected)
4038
4039 @classmethod
4040 def _child_test_wait_socket(cls, address, slow):
4041 s = socket.socket()
4042 s.connect(address)
4043 for i in range(10):
4044 if slow:
4045 time.sleep(random.random()*0.1)
4046 s.sendall(('%s\n' % i).encode('ascii'))
4047 s.close()
4048
4049 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004050 from multiprocessing.connection import wait
4051 l = socket.socket()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02004052 l.bind((test.support.HOST, 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01004053 l.listen()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02004054 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004055 readers = []
4056 procs = []
4057 dic = {}
4058
4059 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004060 p = multiprocessing.Process(target=self._child_test_wait_socket,
4061 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004062 p.daemon = True
4063 p.start()
4064 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004065 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004066
4067 for i in range(4):
4068 r, _ = l.accept()
4069 readers.append(r)
4070 dic[r] = []
4071 l.close()
4072
4073 while readers:
4074 for r in wait(readers):
4075 msg = r.recv(32)
4076 if not msg:
4077 readers.remove(r)
4078 r.close()
4079 else:
4080 dic[r].append(msg)
4081
4082 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4083 for v in dic.values():
4084 self.assertEqual(b''.join(v), expected)
4085
4086 def test_wait_slow(self):
4087 self.test_wait(True)
4088
4089 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01004090 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004091
4092 def test_wait_timeout(self):
4093 from multiprocessing.connection import wait
4094
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004095 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004096 a, b = multiprocessing.Pipe()
4097
4098 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004099 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004100 delta = time.time() - start
4101
4102 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01004103 self.assertLess(delta, expected * 2)
4104 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004105
4106 b.send(None)
4107
4108 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004109 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004110 delta = time.time() - start
4111
4112 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01004113 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004114
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004115 @classmethod
4116 def signal_and_sleep(cls, sem, period):
4117 sem.release()
4118 time.sleep(period)
4119
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004120 def test_wait_integer(self):
4121 from multiprocessing.connection import wait
4122
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004123 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01004124 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004125 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004126 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004127 p = multiprocessing.Process(target=self.signal_and_sleep,
4128 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004129
4130 p.start()
4131 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004132 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004133
4134 start = time.time()
4135 res = wait([a, p.sentinel, b], expected + 20)
4136 delta = time.time() - start
4137
4138 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01004139 self.assertLess(delta, expected + 2)
4140 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004141
4142 a.send(None)
4143
4144 start = time.time()
4145 res = wait([a, p.sentinel, b], 20)
4146 delta = time.time() - start
4147
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004148 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004149 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004150
4151 b.send(None)
4152
4153 start = time.time()
4154 res = wait([a, p.sentinel, b], 20)
4155 delta = time.time() - start
4156
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004157 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004158 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004159
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004160 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004161 p.join()
4162
Richard Oudkerk59d54042012-05-10 16:11:12 +01004163 def test_neg_timeout(self):
4164 from multiprocessing.connection import wait
4165 a, b = multiprocessing.Pipe()
4166 t = time.time()
4167 res = wait([a], timeout=-1)
4168 t = time.time() - t
4169 self.assertEqual(res, [])
4170 self.assertLess(t, 1)
4171 a.close()
4172 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004173
Antoine Pitrou709176f2012-04-01 17:19:09 +02004174#
4175# Issue 14151: Test invalid family on invalid environment
4176#
4177
4178class TestInvalidFamily(unittest.TestCase):
4179
Victor Stinner937ee9e2018-06-26 02:11:06 +02004180 @unittest.skipIf(WIN32, "skipped on Windows")
Antoine Pitrou709176f2012-04-01 17:19:09 +02004181 def test_invalid_family(self):
4182 with self.assertRaises(ValueError):
4183 multiprocessing.connection.Listener(r'\\.\test')
4184
Victor Stinner937ee9e2018-06-26 02:11:06 +02004185 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02004186 def test_invalid_family_win32(self):
4187 with self.assertRaises(ValueError):
4188 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02004189
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004190#
4191# Issue 12098: check sys.flags of child matches that for parent
4192#
4193
4194class TestFlags(unittest.TestCase):
4195 @classmethod
4196 def run_in_grandchild(cls, conn):
4197 conn.send(tuple(sys.flags))
4198
4199 @classmethod
4200 def run_in_child(cls):
4201 import json
4202 r, w = multiprocessing.Pipe(duplex=False)
4203 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4204 p.start()
4205 grandchild_flags = r.recv()
4206 p.join()
4207 r.close()
4208 w.close()
4209 flags = (tuple(sys.flags), grandchild_flags)
4210 print(json.dumps(flags))
4211
4212 def test_flags(self):
4213 import json, subprocess
4214 # start child process using unusual flags
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004215 prog = ('from test._test_multiprocessing import TestFlags; ' +
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004216 'TestFlags.run_in_child()')
4217 data = subprocess.check_output(
4218 [sys.executable, '-E', '-S', '-O', '-c', prog])
4219 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4220 self.assertEqual(child_flags, grandchild_flags)
4221
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004222#
4223# Test interaction with socket timeouts - see Issue #6056
4224#
4225
4226class TestTimeouts(unittest.TestCase):
4227 @classmethod
4228 def _test_timeout(cls, child, address):
4229 time.sleep(1)
4230 child.send(123)
4231 child.close()
4232 conn = multiprocessing.connection.Client(address)
4233 conn.send(456)
4234 conn.close()
4235
4236 def test_timeout(self):
4237 old_timeout = socket.getdefaulttimeout()
4238 try:
4239 socket.setdefaulttimeout(0.1)
4240 parent, child = multiprocessing.Pipe(duplex=True)
4241 l = multiprocessing.connection.Listener(family='AF_INET')
4242 p = multiprocessing.Process(target=self._test_timeout,
4243 args=(child, l.address))
4244 p.start()
4245 child.close()
4246 self.assertEqual(parent.recv(), 123)
4247 parent.close()
4248 conn = l.accept()
4249 self.assertEqual(conn.recv(), 456)
4250 conn.close()
4251 l.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004252 join_process(p)
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004253 finally:
4254 socket.setdefaulttimeout(old_timeout)
4255
Richard Oudkerke88a2442012-08-14 11:41:32 +01004256#
4257# Test what happens with no "if __name__ == '__main__'"
4258#
4259
4260class TestNoForkBomb(unittest.TestCase):
4261 def test_noforkbomb(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004262 sm = multiprocessing.get_start_method()
Richard Oudkerke88a2442012-08-14 11:41:32 +01004263 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004264 if sm != 'fork':
Berker Peksag076dbd02015-05-06 07:01:52 +03004265 rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004266 self.assertEqual(out, b'')
4267 self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01004268 else:
Berker Peksag076dbd02015-05-06 07:01:52 +03004269 rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004270 self.assertEqual(out.rstrip(), b'123')
4271 self.assertEqual(err, b'')
Richard Oudkerke88a2442012-08-14 11:41:32 +01004272
4273#
Richard Oudkerk409c3132013-04-17 20:58:00 +01004274# Issue #17555: ForkAwareThreadLock
4275#
4276
4277class TestForkAwareThreadLock(unittest.TestCase):
Mike53f7a7c2017-12-14 14:04:53 +03004278 # We recursively start processes. Issue #17555 meant that the
Richard Oudkerk409c3132013-04-17 20:58:00 +01004279 # after fork registry would get duplicate entries for the same
4280 # lock. The size of the registry at generation n was ~2**n.
4281
4282 @classmethod
4283 def child(cls, n, conn):
4284 if n > 1:
4285 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4286 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004287 conn.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004288 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004289 else:
4290 conn.send(len(util._afterfork_registry))
4291 conn.close()
4292
4293 def test_lock(self):
4294 r, w = multiprocessing.Pipe(False)
4295 l = util.ForkAwareThreadLock()
4296 old_size = len(util._afterfork_registry)
4297 p = multiprocessing.Process(target=self.child, args=(5, w))
4298 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004299 w.close()
Richard Oudkerk409c3132013-04-17 20:58:00 +01004300 new_size = r.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004301 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004302 self.assertLessEqual(new_size, old_size)
4303
4304#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004305# Check that non-forked child processes do not inherit unneeded fds/handles
4306#
4307
4308class TestCloseFds(unittest.TestCase):
4309
4310 def get_high_socket_fd(self):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004311 if WIN32:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004312 # The child process will not have any socket handles, so
4313 # calling socket.fromfd() should produce WSAENOTSOCK even
4314 # if there is a handle of the same number.
4315 return socket.socket().detach()
4316 else:
4317 # We want to produce a socket with an fd high enough that a
4318 # freshly created child process will not have any fds as high.
4319 fd = socket.socket().detach()
4320 to_close = []
4321 while fd < 50:
4322 to_close.append(fd)
4323 fd = os.dup(fd)
4324 for x in to_close:
4325 os.close(x)
4326 return fd
4327
4328 def close(self, fd):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004329 if WIN32:
Christian Heimesb6e43af2018-01-29 22:37:58 +01004330 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004331 else:
4332 os.close(fd)
4333
4334 @classmethod
4335 def _test_closefds(cls, conn, fd):
4336 try:
4337 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4338 except Exception as e:
4339 conn.send(e)
4340 else:
4341 s.close()
4342 conn.send(None)
4343
4344 def test_closefd(self):
4345 if not HAS_REDUCTION:
4346 raise unittest.SkipTest('requires fd pickling')
4347
4348 reader, writer = multiprocessing.Pipe()
4349 fd = self.get_high_socket_fd()
4350 try:
4351 p = multiprocessing.Process(target=self._test_closefds,
4352 args=(writer, fd))
4353 p.start()
4354 writer.close()
4355 e = reader.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004356 join_process(p)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004357 finally:
4358 self.close(fd)
4359 writer.close()
4360 reader.close()
4361
4362 if multiprocessing.get_start_method() == 'fork':
4363 self.assertIs(e, None)
4364 else:
4365 WSAENOTSOCK = 10038
4366 self.assertIsInstance(e, OSError)
4367 self.assertTrue(e.errno == errno.EBADF or
4368 e.winerror == WSAENOTSOCK, e)
4369
4370#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004371# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4372#
4373
4374class TestIgnoreEINTR(unittest.TestCase):
4375
Victor Stinner252f6ab2018-06-01 16:48:34 +02004376 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
4377 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
4378
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004379 @classmethod
4380 def _test_ignore(cls, conn):
4381 def handler(signum, frame):
4382 pass
4383 signal.signal(signal.SIGUSR1, handler)
4384 conn.send('ready')
4385 x = conn.recv()
4386 conn.send(x)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004387 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004388
4389 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4390 def test_ignore(self):
4391 conn, child_conn = multiprocessing.Pipe()
4392 try:
4393 p = multiprocessing.Process(target=self._test_ignore,
4394 args=(child_conn,))
4395 p.daemon = True
4396 p.start()
4397 child_conn.close()
4398 self.assertEqual(conn.recv(), 'ready')
4399 time.sleep(0.1)
4400 os.kill(p.pid, signal.SIGUSR1)
4401 time.sleep(0.1)
4402 conn.send(1234)
4403 self.assertEqual(conn.recv(), 1234)
4404 time.sleep(0.1)
4405 os.kill(p.pid, signal.SIGUSR1)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004406 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004407 time.sleep(0.1)
4408 p.join()
4409 finally:
4410 conn.close()
4411
4412 @classmethod
4413 def _test_ignore_listener(cls, conn):
4414 def handler(signum, frame):
4415 pass
4416 signal.signal(signal.SIGUSR1, handler)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004417 with multiprocessing.connection.Listener() as l:
4418 conn.send(l.address)
4419 a = l.accept()
4420 a.send('welcome')
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004421
4422 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4423 def test_ignore_listener(self):
4424 conn, child_conn = multiprocessing.Pipe()
4425 try:
4426 p = multiprocessing.Process(target=self._test_ignore_listener,
4427 args=(child_conn,))
4428 p.daemon = True
4429 p.start()
4430 child_conn.close()
4431 address = conn.recv()
4432 time.sleep(0.1)
4433 os.kill(p.pid, signal.SIGUSR1)
4434 time.sleep(0.1)
4435 client = multiprocessing.connection.Client(address)
4436 self.assertEqual(client.recv(), 'welcome')
4437 p.join()
4438 finally:
4439 conn.close()
4440
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004441class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004442 @classmethod
4443 def _check_context(cls, conn):
4444 conn.send(multiprocessing.get_start_method())
4445
4446 def check_context(self, ctx):
4447 r, w = ctx.Pipe(duplex=False)
4448 p = ctx.Process(target=self._check_context, args=(w,))
4449 p.start()
4450 w.close()
4451 child_method = r.recv()
4452 r.close()
4453 p.join()
4454 self.assertEqual(child_method, ctx.get_start_method())
4455
4456 def test_context(self):
4457 for method in ('fork', 'spawn', 'forkserver'):
4458 try:
4459 ctx = multiprocessing.get_context(method)
4460 except ValueError:
4461 continue
4462 self.assertEqual(ctx.get_start_method(), method)
4463 self.assertIs(ctx.get_context(), ctx)
4464 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4465 self.assertRaises(ValueError, ctx.set_start_method, None)
4466 self.check_context(ctx)
4467
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004468 def test_set_get(self):
4469 multiprocessing.set_forkserver_preload(PRELOAD)
4470 count = 0
4471 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00004472 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004473 for method in ('fork', 'spawn', 'forkserver'):
4474 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004475 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004476 except ValueError:
4477 continue
4478 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004479 ctx = multiprocessing.get_context()
4480 self.assertEqual(ctx.get_start_method(), method)
4481 self.assertTrue(type(ctx).__name__.lower().startswith(method))
4482 self.assertTrue(
4483 ctx.Process.__name__.lower().startswith(method))
4484 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004485 count += 1
4486 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004487 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004488 self.assertGreaterEqual(count, 1)
4489
4490 def test_get_all(self):
4491 methods = multiprocessing.get_all_start_methods()
4492 if sys.platform == 'win32':
4493 self.assertEqual(methods, ['spawn'])
4494 else:
4495 self.assertTrue(methods == ['fork', 'spawn'] or
4496 methods == ['fork', 'spawn', 'forkserver'])
4497
Antoine Pitroucd2a2012016-12-10 17:13:16 +01004498 def test_preload_resources(self):
4499 if multiprocessing.get_start_method() != 'forkserver':
4500 self.skipTest("test only relevant for 'forkserver' method")
4501 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4502 rc, out, err = test.support.script_helper.assert_python_ok(name)
4503 out = out.decode()
4504 err = err.decode()
4505 if out.rstrip() != 'ok' or err != '':
4506 print(out)
4507 print(err)
4508 self.fail("failed spawning forkserver or grandchild")
4509
4510
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004511@unittest.skipIf(sys.platform == "win32",
4512 "test semantics don't make sense on Windows")
4513class TestSemaphoreTracker(unittest.TestCase):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004514
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004515 def test_semaphore_tracker(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004516 #
4517 # Check that killing process does not leak named semaphores
4518 #
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004519 import subprocess
4520 cmd = '''if 1:
4521 import multiprocessing as mp, time, os
4522 mp.set_start_method("spawn")
4523 lock1 = mp.Lock()
4524 lock2 = mp.Lock()
4525 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
4526 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
4527 time.sleep(10)
4528 '''
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004529 r, w = os.pipe()
4530 p = subprocess.Popen([sys.executable,
Victor Stinner9402c832017-12-15 16:29:24 +01004531 '-E', '-c', cmd % (w, w)],
Richard Oudkerk67e51982013-08-22 23:37:23 +01004532 pass_fds=[w],
4533 stderr=subprocess.PIPE)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004534 os.close(w)
4535 with open(r, 'rb', closefd=True) as f:
4536 name1 = f.readline().rstrip().decode('ascii')
4537 name2 = f.readline().rstrip().decode('ascii')
4538 _multiprocessing.sem_unlink(name1)
4539 p.terminate()
4540 p.wait()
Richard Oudkerk42a526c2014-02-21 22:29:58 +00004541 time.sleep(2.0)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004542 with self.assertRaises(OSError) as ctx:
4543 _multiprocessing.sem_unlink(name2)
4544 # docs say it should be ENOENT, but OSX seems to give EINVAL
4545 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
Richard Oudkerk67e51982013-08-22 23:37:23 +01004546 err = p.stderr.read().decode('utf-8')
4547 p.stderr.close()
4548 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
4549 self.assertRegex(err, expected)
R David Murray44b548d2016-09-08 13:59:53 -04004550 self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004551
Antoine Pitroucbe17562017-11-03 14:31:38 +01004552 def check_semaphore_tracker_death(self, signum, should_die):
4553 # bpo-31310: if the semaphore tracker process has died, it should
4554 # be restarted implicitly.
4555 from multiprocessing.semaphore_tracker import _semaphore_tracker
Antoine Pitroucbe17562017-11-03 14:31:38 +01004556 pid = _semaphore_tracker._pid
Pablo Galindoec74d182018-09-04 09:53:54 +01004557 if pid is not None:
4558 os.kill(pid, signal.SIGKILL)
4559 os.waitpid(pid, 0)
Pablo Galindo3058b7d2018-10-10 08:40:14 +01004560 with warnings.catch_warnings():
4561 warnings.simplefilter("ignore")
Pablo Galindoec74d182018-09-04 09:53:54 +01004562 _semaphore_tracker.ensure_running()
4563 pid = _semaphore_tracker._pid
4564
Antoine Pitroucbe17562017-11-03 14:31:38 +01004565 os.kill(pid, signum)
4566 time.sleep(1.0) # give it time to die
4567
4568 ctx = multiprocessing.get_context("spawn")
Pablo Galindoec74d182018-09-04 09:53:54 +01004569 with warnings.catch_warnings(record=True) as all_warn:
Pablo Galindo3058b7d2018-10-10 08:40:14 +01004570 warnings.simplefilter("always")
Antoine Pitroucbe17562017-11-03 14:31:38 +01004571 sem = ctx.Semaphore()
4572 sem.acquire()
4573 sem.release()
4574 wr = weakref.ref(sem)
4575 # ensure `sem` gets collected, which triggers communication with
4576 # the semaphore tracker
4577 del sem
4578 gc.collect()
4579 self.assertIsNone(wr())
Pablo Galindoec74d182018-09-04 09:53:54 +01004580 if should_die:
4581 self.assertEqual(len(all_warn), 1)
4582 the_warn = all_warn[0]
Pablo Galindo3058b7d2018-10-10 08:40:14 +01004583 self.assertTrue(issubclass(the_warn.category, UserWarning))
Pablo Galindoec74d182018-09-04 09:53:54 +01004584 self.assertTrue("semaphore_tracker: process died"
4585 in str(the_warn.message))
4586 else:
4587 self.assertEqual(len(all_warn), 0)
Antoine Pitroucbe17562017-11-03 14:31:38 +01004588
4589 def test_semaphore_tracker_sigint(self):
4590 # Catchable signal (ignored by semaphore tracker)
4591 self.check_semaphore_tracker_death(signal.SIGINT, False)
4592
Pablo Galindoec74d182018-09-04 09:53:54 +01004593 def test_semaphore_tracker_sigterm(self):
4594 # Catchable signal (ignored by semaphore tracker)
4595 self.check_semaphore_tracker_death(signal.SIGTERM, False)
4596
Antoine Pitroucbe17562017-11-03 14:31:38 +01004597 def test_semaphore_tracker_sigkill(self):
4598 # Uncatchable signal.
4599 self.check_semaphore_tracker_death(signal.SIGKILL, True)
4600
4601
Xiang Zhang6f75bc02017-05-17 21:04:00 +08004602class TestSimpleQueue(unittest.TestCase):
4603
4604 @classmethod
4605 def _test_empty(cls, queue, child_can_start, parent_can_continue):
4606 child_can_start.wait()
4607 # issue 30301, could fail under spawn and forkserver
4608 try:
4609 queue.put(queue.empty())
4610 queue.put(queue.empty())
4611 finally:
4612 parent_can_continue.set()
4613
4614 def test_empty(self):
4615 queue = multiprocessing.SimpleQueue()
4616 child_can_start = multiprocessing.Event()
4617 parent_can_continue = multiprocessing.Event()
4618
4619 proc = multiprocessing.Process(
4620 target=self._test_empty,
4621 args=(queue, child_can_start, parent_can_continue)
4622 )
4623 proc.daemon = True
4624 proc.start()
4625
4626 self.assertTrue(queue.empty())
4627
4628 child_can_start.set()
4629 parent_can_continue.wait()
4630
4631 self.assertFalse(queue.empty())
4632 self.assertEqual(queue.get(), True)
4633 self.assertEqual(queue.get(), False)
4634 self.assertTrue(queue.empty())
4635
4636 proc.join()
4637
Derek B. Kimc40278e2018-07-11 19:22:28 +09004638
Julien Palard5d236ca2018-11-04 23:40:32 +01004639class TestPoolNotLeakOnFailure(unittest.TestCase):
4640
4641 def test_release_unused_processes(self):
4642 # Issue #19675: During pool creation, if we can't create a process,
4643 # don't leak already created ones.
4644 will_fail_in = 3
4645 forked_processes = []
4646
4647 class FailingForkProcess:
4648 def __init__(self, **kwargs):
4649 self.name = 'Fake Process'
4650 self.exitcode = None
4651 self.state = None
4652 forked_processes.append(self)
4653
4654 def start(self):
4655 nonlocal will_fail_in
4656 if will_fail_in <= 0:
4657 raise OSError("Manually induced OSError")
4658 will_fail_in -= 1
4659 self.state = 'started'
4660
4661 def terminate(self):
4662 self.state = 'stopping'
4663
4664 def join(self):
4665 if self.state == 'stopping':
4666 self.state = 'stopped'
4667
4668 def is_alive(self):
4669 return self.state == 'started' or self.state == 'stopping'
4670
4671 with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
4672 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
4673 Process=FailingForkProcess))
4674 p.close()
4675 p.join()
4676 self.assertFalse(
4677 any(process.is_alive() for process in forked_processes))
4678
4679
4680
Derek B. Kimc40278e2018-07-11 19:22:28 +09004681class MiscTestCase(unittest.TestCase):
4682 def test__all__(self):
4683 # Just make sure names in blacklist are excluded
4684 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
4685 blacklist=['SUBDEBUG', 'SUBWARNING'])
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004686#
4687# Mixins
4688#
4689
Victor Stinnerffb49402017-07-25 01:55:54 +02004690class BaseMixin(object):
4691 @classmethod
4692 def setUpClass(cls):
4693 cls.dangling = (multiprocessing.process._dangling.copy(),
4694 threading._dangling.copy())
4695
4696 @classmethod
4697 def tearDownClass(cls):
4698 # bpo-26762: Some multiprocessing objects like Pool create reference
4699 # cycles. Trigger a garbage collection to break these cycles.
4700 test.support.gc_collect()
4701
4702 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
4703 if processes:
Victor Stinner957d0e92017-08-10 17:36:50 +02004704 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004705 print('Warning -- Dangling processes: %s' % processes,
4706 file=sys.stderr)
4707 processes = None
4708
4709 threads = set(threading._dangling) - set(cls.dangling[1])
4710 if threads:
Victor Stinner957d0e92017-08-10 17:36:50 +02004711 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004712 print('Warning -- Dangling threads: %s' % threads,
4713 file=sys.stderr)
4714 threads = None
4715
4716
4717class ProcessesMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004718 TYPE = 'processes'
4719 Process = multiprocessing.Process
4720 connection = multiprocessing.connection
4721 current_process = staticmethod(multiprocessing.current_process)
4722 active_children = staticmethod(multiprocessing.active_children)
4723 Pool = staticmethod(multiprocessing.Pool)
4724 Pipe = staticmethod(multiprocessing.Pipe)
4725 Queue = staticmethod(multiprocessing.Queue)
4726 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
4727 Lock = staticmethod(multiprocessing.Lock)
4728 RLock = staticmethod(multiprocessing.RLock)
4729 Semaphore = staticmethod(multiprocessing.Semaphore)
4730 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
4731 Condition = staticmethod(multiprocessing.Condition)
4732 Event = staticmethod(multiprocessing.Event)
4733 Barrier = staticmethod(multiprocessing.Barrier)
4734 Value = staticmethod(multiprocessing.Value)
4735 Array = staticmethod(multiprocessing.Array)
4736 RawValue = staticmethod(multiprocessing.RawValue)
4737 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00004738
Benjamin Petersone711caf2008-06-11 16:44:04 +00004739
Victor Stinnerffb49402017-07-25 01:55:54 +02004740class ManagerMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004741 TYPE = 'manager'
4742 Process = multiprocessing.Process
4743 Queue = property(operator.attrgetter('manager.Queue'))
4744 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
4745 Lock = property(operator.attrgetter('manager.Lock'))
4746 RLock = property(operator.attrgetter('manager.RLock'))
4747 Semaphore = property(operator.attrgetter('manager.Semaphore'))
4748 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
4749 Condition = property(operator.attrgetter('manager.Condition'))
4750 Event = property(operator.attrgetter('manager.Event'))
4751 Barrier = property(operator.attrgetter('manager.Barrier'))
4752 Value = property(operator.attrgetter('manager.Value'))
4753 Array = property(operator.attrgetter('manager.Array'))
4754 list = property(operator.attrgetter('manager.list'))
4755 dict = property(operator.attrgetter('manager.dict'))
4756 Namespace = property(operator.attrgetter('manager.Namespace'))
4757
4758 @classmethod
4759 def Pool(cls, *args, **kwds):
4760 return cls.manager.Pool(*args, **kwds)
4761
4762 @classmethod
4763 def setUpClass(cls):
Victor Stinnerffb49402017-07-25 01:55:54 +02004764 super().setUpClass()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004765 cls.manager = multiprocessing.Manager()
4766
4767 @classmethod
4768 def tearDownClass(cls):
4769 # only the manager process should be returned by active_children()
4770 # but this can take a bit on slow machines, so wait a few seconds
4771 # if there are other children too (see #17395)
Victor Stinnerffb49402017-07-25 01:55:54 +02004772 start_time = time.monotonic()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004773 t = 0.01
Victor Stinnerffb49402017-07-25 01:55:54 +02004774 while len(multiprocessing.active_children()) > 1:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004775 time.sleep(t)
4776 t *= 2
Victor Stinnerffb49402017-07-25 01:55:54 +02004777 dt = time.monotonic() - start_time
4778 if dt >= 5.0:
Victor Stinner957d0e92017-08-10 17:36:50 +02004779 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004780 print("Warning -- multiprocessing.Manager still has %s active "
4781 "children after %s seconds"
4782 % (multiprocessing.active_children(), dt),
4783 file=sys.stderr)
4784 break
4785
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004786 gc.collect() # do garbage collection
4787 if cls.manager._number_of_objects() != 0:
4788 # This is not really an error since some tests do not
4789 # ensure that all processes which hold a reference to a
4790 # managed object have been joined.
Victor Stinner957d0e92017-08-10 17:36:50 +02004791 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004792 print('Warning -- Shared objects which still exist at manager '
4793 'shutdown:')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004794 print(cls.manager._debug_info())
4795 cls.manager.shutdown()
4796 cls.manager.join()
4797 cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01004798
Victor Stinnerffb49402017-07-25 01:55:54 +02004799 super().tearDownClass()
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01004800
Victor Stinnerffb49402017-07-25 01:55:54 +02004801
4802class ThreadsMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004803 TYPE = 'threads'
4804 Process = multiprocessing.dummy.Process
4805 connection = multiprocessing.dummy.connection
4806 current_process = staticmethod(multiprocessing.dummy.current_process)
4807 active_children = staticmethod(multiprocessing.dummy.active_children)
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01004808 Pool = staticmethod(multiprocessing.dummy.Pool)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004809 Pipe = staticmethod(multiprocessing.dummy.Pipe)
4810 Queue = staticmethod(multiprocessing.dummy.Queue)
4811 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
4812 Lock = staticmethod(multiprocessing.dummy.Lock)
4813 RLock = staticmethod(multiprocessing.dummy.RLock)
4814 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
4815 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
4816 Condition = staticmethod(multiprocessing.dummy.Condition)
4817 Event = staticmethod(multiprocessing.dummy.Event)
4818 Barrier = staticmethod(multiprocessing.dummy.Barrier)
4819 Value = staticmethod(multiprocessing.dummy.Value)
4820 Array = staticmethod(multiprocessing.dummy.Array)
4821
4822#
4823# Functions used to create test cases from the base ones in this module
4824#
4825
4826def install_tests_in_module_dict(remote_globs, start_method):
4827 __module__ = remote_globs['__name__']
4828 local_globs = globals()
4829 ALL_TYPES = {'processes', 'threads', 'manager'}
4830
4831 for name, base in local_globs.items():
4832 if not isinstance(base, type):
4833 continue
4834 if issubclass(base, BaseTestCase):
4835 if base is BaseTestCase:
4836 continue
4837 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
4838 for type_ in base.ALLOWED_TYPES:
4839 newname = 'With' + type_.capitalize() + name[1:]
4840 Mixin = local_globs[type_.capitalize() + 'Mixin']
4841 class Temp(base, Mixin, unittest.TestCase):
4842 pass
4843 Temp.__name__ = Temp.__qualname__ = newname
4844 Temp.__module__ = __module__
4845 remote_globs[newname] = Temp
4846 elif issubclass(base, unittest.TestCase):
4847 class Temp(base, object):
4848 pass
4849 Temp.__name__ = Temp.__qualname__ = name
4850 Temp.__module__ = __module__
4851 remote_globs[name] = Temp
4852
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01004853 dangling = [None, None]
4854 old_start_method = [None]
4855
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004856 def setUpModule():
4857 multiprocessing.set_forkserver_preload(PRELOAD)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01004858 multiprocessing.process._cleanup()
4859 dangling[0] = multiprocessing.process._dangling.copy()
4860 dangling[1] = threading._dangling.copy()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004861 old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004862 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004863 multiprocessing.set_start_method(start_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004864 except ValueError:
4865 raise unittest.SkipTest(start_method +
4866 ' start method not supported')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004867
4868 if sys.platform.startswith("linux"):
4869 try:
4870 lock = multiprocessing.RLock()
4871 except OSError:
4872 raise unittest.SkipTest("OSError raises on RLock creation, "
4873 "see issue 3111!")
4874 check_enough_semaphores()
4875 util.get_temp_dir() # creates temp directory
4876 multiprocessing.get_logger().setLevel(LOG_LEVEL)
4877
4878 def tearDownModule():
Victor Stinnerffb49402017-07-25 01:55:54 +02004879 need_sleep = False
4880
4881 # bpo-26762: Some multiprocessing objects like Pool create reference
4882 # cycles. Trigger a garbage collection to break these cycles.
4883 test.support.gc_collect()
4884
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004885 multiprocessing.set_start_method(old_start_method[0], force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004886 # pause a bit so we don't get warning about dangling threads/processes
Victor Stinnerffb49402017-07-25 01:55:54 +02004887 processes = set(multiprocessing.process._dangling) - set(dangling[0])
4888 if processes:
4889 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02004890 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004891 print('Warning -- Dangling processes: %s' % processes,
4892 file=sys.stderr)
4893 processes = None
4894
4895 threads = set(threading._dangling) - set(dangling[1])
4896 if threads:
4897 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02004898 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02004899 print('Warning -- Dangling threads: %s' % threads,
4900 file=sys.stderr)
4901 threads = None
4902
4903 # Sleep 500 ms to give time to child processes to complete.
4904 if need_sleep:
4905 time.sleep(0.5)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01004906 multiprocessing.process._cleanup()
Victor Stinnerffb49402017-07-25 01:55:54 +02004907 test.support.gc_collect()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004908
4909 remote_globs['setUpModule'] = setUpModule
4910 remote_globs['tearDownModule'] = tearDownModule