blob: b62c119e9ae0694290a410f3cee1e3316ee8bdea [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Julien Palard5d236ca2018-11-04 23:40:32 +01006import unittest.mock
Benjamin Petersone711caf2008-06-11 16:44:04 +00007import queue as pyqueue
8import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00009import io
Antoine Pitroude911b22011-12-21 11:03:24 +010010import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010020import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010021import operator
Antoine Pitrou89889452017-03-24 13:52:11 +010022import weakref
Pablo Galindoec74d182018-09-04 09:53:54 +010023import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000024import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030025import test.support.script_helper
Victor Stinnerb9b69002017-09-14 14:40:56 -070026from test import support
Benjamin Petersone711caf2008-06-11 16:44:04 +000027
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
R. David Murraya21e4ca2009-03-31 23:16:50 +000029# Skip tests if _multiprocessing wasn't built.
30_multiprocessing = test.support.import_module('_multiprocessing')
31# Skip tests if sem_open implementation is broken.
32test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000033import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000034
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.connection
Victor Stinnerd7e64d92017-07-25 00:33:56 +020036import multiprocessing.dummy
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.heap
Victor Stinnerd7e64d92017-07-25 00:33:56 +020038import multiprocessing.managers
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.pool
Victor Stinnerd7e64d92017-07-25 00:33:56 +020040import multiprocessing.queues
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010046 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020047except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
Victor Stinner11f08072017-09-15 06:55:31 -070065# Timeout to wait until a process completes
66TIMEOUT = 30.0 # seconds
67
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000070
Victor Stinnerd7e64d92017-07-25 00:33:56 +020071
def close_queue(queue):
    """Close *queue* and join its thread if it is a multiprocessing Queue.

    Any other kind of object is left untouched.
    """
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
76
77
def join_process(process):
    """Join *process*, waiting at most TIMEOUT seconds."""
    # support.join_thread() is handed the process directly, together with
    # the module-wide TIMEOUT as its timeout.
    support.join_thread(process, timeout=TIMEOUT)
Victor Stinnerb9b69002017-09-14 14:40:56 -070082
83
Benjamin Petersone711caf2008-06-11 16:44:04 +000084#
85# Constants
86#
87
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

DELTA = 0.1

# Timing checks are off by default: turning them on makes the tests take a
# lot longer and can sometimes cause some non-serious failures because some
# calls block a bit longer than expected.
CHECK_TIMINGS = False
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# True unless the C extension reports a broken sem_getvalue().
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
105
Richard Oudkerk59d54042012-05-10 16:11:12 +0100106from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200107
def wait_for_handle(handle, timeout):
    """Call wait() on the single *handle* and return its result.

    A negative *timeout* is passed on as None; any other value (including
    None) is passed through unchanged.
    """
    negative = timeout is not None and timeout < 0.0
    return wait([handle], None if negative else timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +0000112
# MAXFD is the system's SC_OPEN_MAX value when it can be queried, otherwise a
# fallback of 256.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf() is missing on this platform, the name is not known, or
    # the query itself failed.  Anything else (e.g. KeyboardInterrupt) is
    # deliberately allowed to propagate instead of being swallowed.
    MAXFD = 256
117
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100118# To speed up tests when using the forkserver, we can preload these:
119PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
120
Benjamin Petersone711caf2008-06-11 16:44:04 +0000121#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000122# Some tests require ctypes
123#
124
125try:
Gareth Rees3913bad2017-07-21 11:35:33 +0100126 from ctypes import Structure, c_int, c_double, c_longlong
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000127except ImportError:
128 Structure = object
Antoine Pitrouff92ff52017-07-21 13:24:05 +0200129 c_int = c_double = c_longlong = None
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000130
Charles-François Natali221ef672011-11-22 18:55:22 +0100131
def check_enough_semaphores():
    """Skip the calling test when the OS semaphore limit is too low.

    Raises unittest.SkipTest if SC_SEM_NSEMS_MAX is reported and is below
    the POSIX minimum of 256.  Returns None when the limit is high enough,
    reported as -1, or cannot be queried at all.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
145
146
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000147#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000148# Creates a wrapper for a function which records the time it takes to finish
149#
150
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call (whether it returned or raised) ``elapsed`` holds the
    duration of that call in seconds.  It is None until the first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: the wall clock (time.time()) can be
        # adjusted while the call is running, which would produce bogus or
        # even negative durations.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Recorded in ``finally`` so that failing calls are timed too.
            self.elapsed = time.monotonic() - start
163
164#
165# Base class for test cases
166#
167
class BaseTestCase(object):
    """Shared helpers for the multiprocessing test classes in this file."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value unless it raises NotImplementedError."""
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
190
Benjamin Petersone711caf2008-06-11 16:44:04 +0000191#
192# Return the value of a semaphore
193#
194
def get_value(self):
    """Return the value of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  Raises NotImplementedError
    when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
206
207#
208# Testcases
209#
210
class DummyCallable:
    """Callable object used as a Process target by the tests."""

    def __call__(self, q, c):
        # *c* must be a DummyCallable; the value 5 is then put on *q*.
        assert isinstance(c, DummyCallable)
        q.put(5)
215
216
class _TestProcess(BaseTestCase):
    # Tests for Process objects.  The methods rely on attributes that are
    # not defined in this class (self.TYPE, self.Process, self.Queue,
    # self.Event, self.Pipe, self.current_process, self.active_children);
    # presumably they are supplied by the concrete classes this one is
    # mixed into.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwargs, name, authkey and pid on *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports everything except the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @classmethod
    def _sleep_some(cls):
        """Child target: sleep long enough that the parent has to kill it."""
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        """Child target: sleep for *delay* seconds."""
        time.sleep(delay)

    def _kill_process(self, meth):
        """Start a sleeping child, stop it with meth(p), return its exitcode."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in two children up to depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait up to 10 seconds for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        """Child target: optionally block on q.get(), then exit with *rc*."""
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        # Once closed, the other methods raise; close() itself may be repeated.
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        p.close()

        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIs(wr(), None)

        close_queue(q)

    def test_many_processes(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for i in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for i in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        # The Process must not keep its target alive after it has run.
        self.assertIs(wr(), None)
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        """Child target: report the fd count on *q*, then wait for *evt*."""
        q.put(test.support.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for i in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for i in range(N)]
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        """Child target: start one regular and one daemon thread.

        The regular thread sets *evt* after 0.5 seconds; the daemon thread
        would clear it again after 20 seconds.
        """
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())

    @classmethod
    def _test_error_on_stdio_flush(cls, evt, break_std_streams=None):
        """Child target: optionally break standard streams, then set *evt*.

        *break_std_streams* maps a ``sys`` attribute name ('stdout' or
        'stderr') to an action: 'close' replaces the stream with a closed
        io.StringIO, 'remove' replaces it with None.
        """
        for stream_name, action in (break_std_streams or {}).items():
            if action == 'close':
                stream = io.StringIO()
                stream.close()
            else:
                assert action == 'remove'
                stream = None
            # Install the stream built above.  Always installing None here
            # would make the 'close' action indistinguishable from 'remove'.
            setattr(sys, stream_name, stream)
        evt.set()

    def test_error_on_stdio_flush_1(self):
        # Check that Process works with broken standard streams
        streams = [io.StringIO(), None]
        streams[0].close()
        for stream_name in ('stdout', 'stderr'):
            for stream in streams:
                old_stream = getattr(sys, stream_name)
                setattr(sys, stream_name, stream)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt,))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    def test_error_on_stdio_flush_2(self):
        # Same as test_error_on_stdio_flush_1(), but standard streams are
        # broken by the child process
        for stream_name in ('stdout', 'stderr'):
            for action in ('close', 'remove'):
                old_stream = getattr(sys, stream_name)
                try:
                    evt = self.Event()
                    proc = self.Process(target=self._test_error_on_stdio_flush,
                                        args=(evt, {stream_name: action}))
                    proc.start()
                    proc.join()
                    self.assertTrue(evt.is_set())
                    self.assertEqual(proc.exitcode, 0)
                finally:
                    setattr(sys, stream_name, old_stream)

    @classmethod
    def _sleep_and_set_event(cls, evt, delay=0.0):
        """Child target: sleep for *delay* seconds, then set *evt*."""
        time.sleep(delay)
        evt.set()

    def check_forkserver_death(self, signum):
        # bpo-31308: if the forkserver process has died, we should still
        # be able to create and run new Process instances (the forkserver
        # is implicitly restarted).
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        sm = multiprocessing.get_start_method()
        if sm != 'forkserver':
            # Only the forkserver start method has a forkserver process
            # that can be killed.
            self.skipTest('test not appropriate for {}'.format(sm))

        from multiprocessing.forkserver import _forkserver
        _forkserver.ensure_running()

        # First process sleeps 500 ms
        delay = 0.5

        evt = self.Event()
        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
        proc.start()

        pid = _forkserver._forkserver_pid
        os.kill(pid, signum)
        # give time to the fork server to die and time to proc to complete
        time.sleep(delay * 2.0)

        evt2 = self.Event()
        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
        proc2.start()
        proc2.join()
        self.assertTrue(evt2.is_set())
        self.assertEqual(proc2.exitcode, 0)

        proc.join()
        self.assertTrue(evt.is_set())
        self.assertIn(proc.exitcode, (0, 255))

    def test_forkserver_sigint(self):
        # Catchable signal
        self.check_forkserver_death(signal.SIGINT)

    def test_forkserver_sigkill(self):
        # Uncatchable signal
        if os.name != 'nt':
            self.check_forkserver_death(signal.SIGKILL)
683
Antoine Pitrouee84a602017-08-16 20:53:28 +0200684
Benjamin Petersone711caf2008-06-11 16:44:04 +0000685#
686#
687#
688
689class _UpperCaser(multiprocessing.Process):
690
691 def __init__(self):
692 multiprocessing.Process.__init__(self)
693 self.child_conn, self.parent_conn = multiprocessing.Pipe()
694
695 def run(self):
696 self.parent_conn.close()
697 for s in iter(self.child_conn.recv, None):
698 self.child_conn.send(s.upper())
699 self.child_conn.close()
700
701 def submit(self, s):
702 assert type(s) is str
703 self.parent_conn.send(s)
704 return self.parent_conn.recv()
705
706 def stop(self):
707 self.parent_conn.send(None)
708 self.parent_conn.close()
709 self.child_conn.close()
710
class _TestSubclassingProcess(BaseTestCase):
    # Tests for user-defined Process subclasses and for what a child
    # process writes to sys.stderr when it exits.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            captured = stream.read()
        # The whole traceback was printed
        for fragment in ("ZeroDivisionError",
                         "test_multiprocessing.py",
                         "1/0 # MARKER"):
            self.assertIn(fragment, captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # Child target: redirect sys.stderr to a fresh file, then fail.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        fd = os.open(testfn, flags)
        sys.stderr = open(fd, 'w', closefd=False)
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Child target: redirect sys.stderr to a fresh file, then exit.
        flags = os.O_WRONLY | os.O_CREAT | os.O_EXCL
        fd = os.open(testfn, flags)
        sys.stderr = open(fd, 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer reasons: exit code 1 and the reason written to stderr.
        for reason in ([1, 2, 3], 'ignore this'):
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            join_process(child)
            self.assertEqual(child.exitcode, 1)

            with open(path, 'r') as stream:
                written = stream.read()
            self.assertEqual(written.rstrip(), str(reason))

            # The child creates the file with O_EXCL, so remove it before
            # the next iteration.
            os.unlink(path)

        # Integer-like reasons become the exit code itself.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            join_process(child)
            self.assertEqual(child.exitcode, reason)
784
Benjamin Petersone711caf2008-06-11 16:44:04 +0000785#
786#
787#
788
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
794
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses q.full() when available; otherwise compares q.qsize() with
    *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
800
801
class _TestQueue(BaseTestCase):
    # Tests for Queue / JoinableQueue: blocking and non-blocking put/get,
    # timeouts, feeder-thread behaviour across a fork, qsize(),
    # task_done()/join(), and error handling inside the feeder thread.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_put: once released by the parent, drain the six
        # items the parent queued, then tell the parent the queue is empty.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue fail immediately; blocking puts
        # fail only after their timeout has elapsed.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue and wait until it has done so.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child side of test_get: once released by the parent, queue the
        # values 2..5 and signal that they have been put.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Retrieve the items using every calling convention of get().
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue fail immediately; blocking
        # gets fail only after their timeout has elapsed.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()
        close_queue(queue)

    @classmethod
    def _test_fork(cls, queue):
        # Child side of test_fork: append the values 10..19 to the queue.
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()
        close_queue(queue)

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)
        close_queue(q)

    @classmethod
    def _test_task_done(cls, q):
        # Worker for test_task_done: acknowledge every item received until
        # the None sentinel arrives.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # Returns only once task_done() has been called for all ten items.
        queue.join()

        # One sentinel per worker so that each of them leaves its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
        close_queue(queue)

    def test_no_import_lock_contention(self):
        # Importing a module that uses a Queue at import time must not make
        # q.get() time out (reported to the user as a probable regression of
        # Issue #22853).
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        q = multiprocessing.Queue()
        # Measure with a monotonic clock: the wall clock (time.time()) can be
        # adjusted while the test runs, which would corrupt the delta.
        start = time.monotonic()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.monotonic() - start
        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
        # failed because the delta was only 135.8 ms.
        self.assertGreaterEqual(delta, 0.100)
        close_queue(q)

    def test_queue_feeder_donot_stop_onexc(self):
        # bpo-30414: verify feeder handles exceptions correctly
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            def __reduce__(self):
                raise AttributeError
        with test.support.captured_stderr():
            q = self.Queue()
            q.put(NotSerializable())
            q.put(True)
            # bpo-30595: use a timeout of 1 second for slow buildbots
            self.assertTrue(q.get(timeout=1.0))
            close_queue(q)

        with test.support.captured_stderr():
            # bpo-33078: verify that the queue size is correctly handled
            # on errors.
            q = self.Queue(maxsize=1)
            q.put(NotSerializable())
            q.put(True)
            try:
                self.assertEqual(q.qsize(), 1)
            except NotImplementedError:
                # qsize is not available on all platform as it
                # relies on sem_getvalue
                pass
            # bpo-30595: use a timeout of 1 second for slow buildbots
            self.assertTrue(q.get(timeout=1.0))
            # Check that the size of the queue is correct
            self.assertTrue(q.empty())
            close_queue(q)

    def test_queue_feeder_on_queue_feeder_error(self):
        # bpo-30006: verify feeder handles exceptions using the
        # _on_queue_feeder_error hook.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        class NotSerializable(object):
            """Mock unserializable object"""
            def __init__(self):
                self.reduce_was_called = False
                self.on_queue_feeder_error_was_called = False

            def __reduce__(self):
                self.reduce_was_called = True
                raise AttributeError

        class SafeQueue(multiprocessing.queues.Queue):
            """Queue with overloaded _on_queue_feeder_error hook"""
            @staticmethod
            def _on_queue_feeder_error(e, obj):
                if (isinstance(e, AttributeError) and
                        isinstance(obj, NotSerializable)):
                    obj.on_queue_feeder_error_was_called = True

        not_serializable_obj = NotSerializable()
        # The captured_stderr reduces the noise in the test report
        with test.support.captured_stderr():
            q = SafeQueue(ctx=multiprocessing.get_context())
            q.put(not_serializable_obj)

            # Verify that q is still functioning correctly
            q.put(True)
            self.assertTrue(q.get(timeout=1.0))

            # Release the queue's resources like the other tests in this
            # class do, instead of leaving it open.
            close_queue(q)

        # Assert that the serialization and the hook have been called correctly
        self.assertTrue(not_serializable_obj.reduce_was_called)
        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)

    def test_closed_queue_put_get_exceptions(self):
        # put() and get() on a closed queue must raise ValueError.
        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
            q.close()
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.put('foo')
            with self.assertRaisesRegex(ValueError, 'is closed'):
                q.get()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001125#
1126#
1127#
1128
class _TestLock(BaseTestCase):
    # Basic acquire/release semantics of Lock and RLock.

    def test_lock(self):
        # A plain lock can be taken once; a second, non-blocking attempt
        # fails, and releasing a lock that is not held raises.
        plain = self.Lock()
        self.assertEqual(plain.acquire(), True)
        self.assertEqual(plain.acquire(False), False)
        self.assertEqual(plain.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            plain.release()

    def test_rlock(self):
        # A recursive lock can be taken repeatedly by its owner and must be
        # released the same number of times; one release too many raises.
        depth = 3
        recursive = self.RLock()
        for _ in range(depth):
            self.assertEqual(recursive.acquire(), True)
        for _ in range(depth):
            self.assertEqual(recursive.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            recursive.release()

    def test_lock_context(self):
        # A lock can be used as a context manager.
        guard = self.Lock()
        with guard:
            pass
1151
Benjamin Petersone711caf2008-06-11 16:44:04 +00001152
class _TestSemaphore(BaseTestCase):
    # Semaphore / BoundedSemaphore tests: counter bookkeeping across
    # acquire()/release() and the timing of acquire() timeouts.

    def _test_semaphore(self, sem):
        # Walk a semaphore created with value 2 down to 0 and back up to 2,
        # checking the counter (where get_value is implemented) at each step.
        check_value = self.assertReturnsIfImplemented

        check_value(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check_value(remaining, get_value, sem)

        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)

        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            check_value(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released beyond its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every call
        # must return False because the semaphore's value is 0.
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwargs, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwargs), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
1205
1206
class _TestCondition(BaseTestCase):
    # Tests for Condition: notify(), notify_all(), notify(n), wait()
    # timeouts, wait_for() and the return value of wait().

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        # Waiter used by the notify tests: announce through `sleeping` that
        # the condition is held and about to be waited on, wait (optionally
        # with a timeout), then announce through `woken` that wait() returned.
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        # Poll func() up to ten times, DELTA apart, until it returns `value`
        # (or turns out not to be implemented), then wait one more DELTA and
        # assert that the value is the expected one.
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()
        self.addCleanup(p.join)

        # The thread gets its own name so that the process handle above is
        # not shadowed and both can be joined at the end of the test.
        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()
        self.addCleanup(t.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up; poll instead of relying on
        # a single fixed sleep, as test_notify_all and test_notify_n do
        self.assertReachesEventually(lambda: get_value(woken), 1)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # check state is not mucked up
        self.check_invariant(cond)
        t.join()
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        # wait() with nobody notifying returns False after the timeout.
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        # Child side of test_waitfor: publish state 0, then wait until the
        # parent has raised the state to 4; exit with status 1 otherwise.
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        # Child side of test_waitfor_timeout: wait_for() a state that is
        # never reached and record success when it times out after a
        # plausible delay.
        sem.release()
        with cond:
            expected = 0.1
            # Monotonic clock: immune to wall-clock adjustments while the
            # wait is in progress.
            dt = time.monotonic()
            result = cond.wait_for(lambda : state.value==4, timeout=expected)
            dt = time.monotonic() - dt
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < dt < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        self.assertTrue(sem.acquire(timeout=TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        # Child side of test_wait_result: notify the parent, then, when a
        # pid was supplied, send it SIGINT one second later.
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        # The SIGINT part only applies to real processes on non-Windows
        # platforms.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # Nobody notifies yet, so both waits time out.
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(60))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 60)

            p.join()
1501
Benjamin Petersone711caf2008-06-11 16:44:04 +00001502
class _TestEvent(BaseTestCase):
    # Checks Event.is_set()/set()/clear()/wait(), including how long wait()
    # takes and an event being set from another process/thread.

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after TIMEOUT2 seconds.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        ev = self.Event()
        timed_wait = TimingWrapper(ev.wait)

        # A freshly created event is not set.
        self.assertEqual(ev.is_set(), False)

        # While the event is clear, wait() returns False once the given
        # timeout has elapsed.
        for timeout in (0.0, TIMEOUT1):
            self.assertEqual(timed_wait(timeout), False)
            self.assertTimingAlmostEqual(timed_wait.elapsed, timeout)

        ev.set()

        # Once the event is set, wait() returns True straight away, with or
        # without a timeout.
        self.assertEqual(ev.is_set(), True)
        for wait_args in ((), (TIMEOUT1,)):
            self.assertEqual(timed_wait(*wait_args), True)
            self.assertTimingAlmostEqual(timed_wait.elapsed, 0.0)

        ev.clear()

        # After clear(), an untimed wait() returns True once the child has
        # set the event again.
        child = self.Process(target=self._test_event, args=(ev,))
        child.daemon = True
        child.start()
        self.assertEqual(timed_wait(), True)
        child.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001544
1545#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001546# Tests for Barrier - adapted from tests in test/lock_tests.py
1547#
1548
1549# Many of the tests for threading.Barrier use a list as an atomic
1550# counter: a value is appended to increment the counter, and the
1551# length of the list gives the value. We use the class DummyList
1552# for the same purpose.
1553
class _DummyList(object):
    """Counter that mimics the append()/len() part of the list interface.

    The count is a single C int stored in a buffer obtained from
    multiprocessing.heap and guarded by a multiprocessing.Lock; pickling
    transfers the buffer wrapper and the lock rather than a copy of the
    count.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        state = (multiprocessing.heap.BufferWrapper(int_size),
                 multiprocessing.Lock())
        self.__setstate__(state)
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        # Rebuild the int view of the buffer from the (wrapper, lock) pair.
        wrapper, lock = state
        self._wrapper = wrapper
        self._lock = lock
        self._lengthbuf = wrapper.create_memoryview().cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        # The appended value is ignored; only the count is incremented.
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1576
def _wait():
    """Crude wait/yield that does not rely on synchronization primitives.

    Simply sleeps for 10 milliseconds.
    """
    pause = 0.01
    time.sleep(pause)
1580
1581
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.

        `namespace` supplies the DummyList, Event and Process factories
        used for the counters, the exit flag and the workers.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        workers = []
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        # The callback is a plain function that only receives the worker
        # list, so the finalizer itself holds no reference to this object.
        self._finalizer = weakref.finalize(self, self._join_all, workers)

    @staticmethod
    def _join_all(workers):
        # Wait for every worker to terminate.
        for worker in workers:
            worker.join()

    def task(self):
        # Body of each worker: record start, run f, record completion (even
        # when f raises), then wait up to 30 seconds for permission to exit.
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # Poll until all n workers have recorded that they started.
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        # Poll until all n workers have recorded that f completed.
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Allow workers created with wait_before_exit=True to terminate.
        self._can_exit.set()

    def close(self):
        # Run the finalizer, which joins every worker.
        self._finalizer()
1637
Richard Oudkerk3730a172012-06-15 18:26:07 +01001638
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it runs."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1644
1645
1646class _TestBarrier(BaseTestCase):
1647 """
1648 Tests for Barrier objects.
1649 """
1650 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001651 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001652
def setUp(self):
    # Give every test a fresh barrier for N parties that uses the
    # class-wide default timeout.
    make_barrier = self.Barrier
    self.barrier = make_barrier(self.N, timeout=self.defaultTimeout)
1655
def tearDown(self):
    # Abort the barrier created by setUp, then drop the reference to it.
    stale = self.barrier
    stale.abort()
    self.barrier = None
1659
def DummyList(self):
    # Return the counter-list flavour matching the test type: a plain list
    # for threads, a manager list for managers, a _DummyList otherwise.
    kind = self.TYPE
    if kind == 'threads':
        return []
    if kind == 'manager':
        return self.manager.list()
    return _DummyList()
1667
def run_threads(self, f, args):
    # Run f(*args) in N-1 workers plus the calling thread, wait until every
    # worker has finished f, and always close the bunch afterwards.
    worker_count = self.N - 1
    bunch = Bunch(self, f, args, worker_count)
    try:
        f(*args)
        bunch.wait_for_finished()
    finally:
        bunch.close()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001675
1676 @classmethod
def multipass(cls, barrier, results, n):
    # Pass the barrier in lockstep n times.  Each pass has two phases, each
    # ended by barrier.wait(); the asserts check that no party runs ahead
    # of the others.
    parties = barrier.parties
    assert parties == cls.N
    first_phase, second_phase = results[0], results[1]
    for round_no in range(n):
        first_phase.append(True)
        assert len(second_phase) == round_no * parties
        barrier.wait()
        second_phase.append(True)
        assert len(first_phase) == (round_no + 1) * parties
        barrier.wait()
    try:
        assert barrier.n_waiting == 0
    except NotImplementedError:
        pass
    assert not barrier.broken
1692
def test_barrier(self, passes=1):
    """
    Test that a barrier is passed in lockstep
    """
    first_phase = self.DummyList()
    second_phase = self.DummyList()
    job_args = (self.barrier, [first_phase, second_phase], passes)
    self.run_threads(self.multipass, job_args)
1699
def test_barrier_10(self):
    """
    Test that a barrier works for 10 consecutive runs
    """
    consecutive_runs = 10
    return self.test_barrier(consecutive_runs)
1705
1706 @classmethod
1707 def _test_wait_return_f(cls, barrier, queue):
1708 res = barrier.wait()
1709 queue.put(res)
1710
1711 def test_wait_return(self):
1712 """
1713 test the return value from barrier.wait
1714 """
1715 queue = self.Queue()
1716 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1717 results = [queue.get() for i in range(self.N)]
1718 self.assertEqual(results.count(0), 1)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001719 close_queue(queue)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001720
1721 @classmethod
1722 def _test_action_f(cls, barrier, results):
1723 barrier.wait()
1724 if len(results) != 1:
1725 raise RuntimeError
1726
1727 def test_action(self):
1728 """
1729 Test the 'action' callback
1730 """
1731 results = self.DummyList()
1732 barrier = self.Barrier(self.N, action=AppendTrue(results))
1733 self.run_threads(self._test_action_f, (barrier, results))
1734 self.assertEqual(len(results), 1)
1735
1736 @classmethod
1737 def _test_abort_f(cls, barrier, results1, results2):
1738 try:
1739 i = barrier.wait()
1740 if i == cls.N//2:
1741 raise RuntimeError
1742 barrier.wait()
1743 results1.append(True)
1744 except threading.BrokenBarrierError:
1745 results2.append(True)
1746 except RuntimeError:
1747 barrier.abort()
1748
1749 def test_abort(self):
1750 """
1751 Test that an abort will put the barrier in a broken state
1752 """
1753 results1 = self.DummyList()
1754 results2 = self.DummyList()
1755 self.run_threads(self._test_abort_f,
1756 (self.barrier, results1, results2))
1757 self.assertEqual(len(results1), 0)
1758 self.assertEqual(len(results2), self.N-1)
1759 self.assertTrue(self.barrier.broken)
1760
1761 @classmethod
1762 def _test_reset_f(cls, barrier, results1, results2, results3):
1763 i = barrier.wait()
1764 if i == cls.N//2:
1765 # Wait until the other threads are all in the barrier.
1766 while barrier.n_waiting < cls.N-1:
1767 time.sleep(0.001)
1768 barrier.reset()
1769 else:
1770 try:
1771 barrier.wait()
1772 results1.append(True)
1773 except threading.BrokenBarrierError:
1774 results2.append(True)
1775 # Now, pass the barrier again
1776 barrier.wait()
1777 results3.append(True)
1778
1779 def test_reset(self):
1780 """
1781 Test that a 'reset' on a barrier frees the waiting threads
1782 """
1783 results1 = self.DummyList()
1784 results2 = self.DummyList()
1785 results3 = self.DummyList()
1786 self.run_threads(self._test_reset_f,
1787 (self.barrier, results1, results2, results3))
1788 self.assertEqual(len(results1), 0)
1789 self.assertEqual(len(results2), self.N-1)
1790 self.assertEqual(len(results3), self.N)
1791
1792 @classmethod
1793 def _test_abort_and_reset_f(cls, barrier, barrier2,
1794 results1, results2, results3):
1795 try:
1796 i = barrier.wait()
1797 if i == cls.N//2:
1798 raise RuntimeError
1799 barrier.wait()
1800 results1.append(True)
1801 except threading.BrokenBarrierError:
1802 results2.append(True)
1803 except RuntimeError:
1804 barrier.abort()
1805 # Synchronize and reset the barrier. Must synchronize first so
1806 # that everyone has left it when we reset, and after so that no
1807 # one enters it before the reset.
1808 if barrier2.wait() == cls.N//2:
1809 barrier.reset()
1810 barrier2.wait()
1811 barrier.wait()
1812 results3.append(True)
1813
1814 def test_abort_and_reset(self):
1815 """
1816 Test that a barrier can be reset after being broken.
1817 """
1818 results1 = self.DummyList()
1819 results2 = self.DummyList()
1820 results3 = self.DummyList()
1821 barrier2 = self.Barrier(self.N)
1822
1823 self.run_threads(self._test_abort_and_reset_f,
1824 (self.barrier, barrier2, results1, results2, results3))
1825 self.assertEqual(len(results1), 0)
1826 self.assertEqual(len(results2), self.N-1)
1827 self.assertEqual(len(results3), self.N)
1828
1829 @classmethod
1830 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001831 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001832 if i == cls.N//2:
1833 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001834 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001835 try:
1836 barrier.wait(0.5)
1837 except threading.BrokenBarrierError:
1838 results.append(True)
1839
1840 def test_timeout(self):
1841 """
1842 Test wait(timeout)
1843 """
1844 results = self.DummyList()
1845 self.run_threads(self._test_timeout_f, (self.barrier, results))
1846 self.assertEqual(len(results), self.barrier.parties)
1847
1848 @classmethod
1849 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001850 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001851 if i == cls.N//2:
1852 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001853 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001854 try:
1855 barrier.wait()
1856 except threading.BrokenBarrierError:
1857 results.append(True)
1858
1859 def test_default_timeout(self):
1860 """
1861 Test the barrier's default timeout
1862 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001863 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001864 results = self.DummyList()
1865 self.run_threads(self._test_default_timeout_f, (barrier, results))
1866 self.assertEqual(len(results), barrier.parties)
1867
1868 def test_single_thread(self):
1869 b = self.Barrier(1)
1870 b.wait()
1871 b.wait()
1872
1873 @classmethod
1874 def _test_thousand_f(cls, barrier, passes, conn, lock):
1875 for i in range(passes):
1876 barrier.wait()
1877 with lock:
1878 conn.send(i)
1879
1880 def test_thousand(self):
1881 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001882 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001883 passes = 1000
1884 lock = self.Lock()
1885 conn, child_conn = self.Pipe(False)
1886 for j in range(self.N):
1887 p = self.Process(target=self._test_thousand_f,
1888 args=(self.barrier, passes, child_conn, lock))
1889 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001890 self.addCleanup(p.join)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001891
1892 for i in range(passes):
1893 for j in range(self.N):
1894 self.assertEqual(conn.recv(), i)
1895
1896#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001897#
1898#
1899
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # Each entry: (typecode, initial value, value stored by _test()).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Process target: overwrite every value with its entry's third field."""
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        """Values keep their initial content until another process rewrites them."""
        make = self.RawValue if raw else self.Value
        values = [make(code, initial) for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        worker = self.Process(target=self._test, args=(values,))
        worker.daemon = True
        worker.start()
        worker.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() availability for each ``lock`` argument."""
        # Default lock: both accessors must be callable.
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        # lock=None behaves like the default here.
        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: neither accessor exists.
        val4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # Raw values never expose the accessors.
        val5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(val5, accessor))
1968
Benjamin Petersone711caf2008-06-11 16:44:04 +00001969
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace seq in place with its running (prefix) sums."""
        for pos in range(1, len(seq)):
            seq[pos] = seq[pos] + seq[pos - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """An array mirrors a plain list, including after a child mutates it."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to both, shared array first.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then let a child process do
        # the same work on the shared array.
        self.f(seq)

        worker = self.Process(target=self.f, args=(arr,))
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() availability for each ``lock`` argument."""
        # Default lock: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        # lock=None behaves like the default here.
        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False: neither accessor exists.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # Raw arrays never expose the accessors.
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002048
2049#
2050#
2051#
2052
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        a = self.list(digits)
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # ``a`` must be untouched by the operations on ``b``.
        self.assertEqual(a[:], digits)

        # A proxy list built from proxies.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            [element[:] for element in e],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made through ``a`` is seen through the containing list.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_iter(self):
        digits = list(range(10))
        a = self.list(digits)
        it = iter(a)
        self.assertEqual(list(it), digits)
        self.assertEqual(list(it), [])  # exhausted
        # list modified during iteration
        it = iter(a)
        a[0] = 100
        self.assertEqual(next(it), 100)

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        # Mutating one inner list leaves the other two alone.
        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for i in (1, 2):
            self.assertEqual(a[i][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for i in (0, 2):
            self.assertEqual(len(a[i]), 3)

        del a

        # A proxy list that contains itself can be created and dropped.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        letters = [chr(i) for i in indices]
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_dict_iter(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        it = iter(d)
        self.assertEqual(list(it), indices)
        self.assertEqual(list(it), [])  # exhausted
        # dictionary changed size during iteration
        it = iter(d)
        d.clear()
        self.assertRaises(RuntimeError, next, it)

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        # Writes through the outer dict show up on the inner proxy too.
        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # The nested dicts stay reachable once the local proxies are gone.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        # Fetch the nested proxies back out of the outer dict.
        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict does not empty the nested ones.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        l = self.list([pets, supplies])
        l[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(l[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(l[0]['marmots'], 1)

        outer = self.list([[88, 99], l])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows only ``name``: ``job`` was deleted and
        # ``_hidden`` does not appear.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
2197
2198#
2199#
2200#
2201
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00002205
def mul(x, y):
    """Return the product ``x * y``."""
    product = x * y
    return product
2208
def raise_large_valuerror(wait):
    """Sleep for *wait* seconds, then raise ValueError with a huge message.

    The message is ``1024**2`` characters long.
    """
    time.sleep(wait)
    payload = "x" * 1024**2
    raise ValueError(payload)
2212
def identity(x):
    """Return the argument unchanged."""
    same = x
    return same
2215
class CountedObject(object):
    """Object whose class keeps a count of instances created and not yet deleted."""

    # Incremented in __new__, decremented in __del__.
    n_instances = 0

    def __new__(cls):
        cls.n_instances = cls.n_instances + 1
        return object.__new__(cls)

    def __del__(self):
        klass = type(self)
        klass.n_instances = klass.n_instances - 1
2225
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator() at the requested position."""


def exception_throwing_generator(total, when):
    """Yield 0, 1, ... below *total*, raising SayWhenError at index *when*.

    With ``when == -1`` the error is raised before anything is yielded.
    Because this is a generator, nothing is raised until it is iterated.
    """
    message = "Somebody said when"
    if when == -1:
        raise SayWhenError(message)
    index = 0
    while index < total:
        if index == when:
            raise SayWhenError(message)
        yield index
        index += 1
2235
Antoine Pitrou89889452017-03-24 13:52:11 +01002236
Benjamin Petersone711caf2008-06-11 16:44:04 +00002237class _TestPool(BaseTestCase):
2238
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002239 @classmethod
2240 def setUpClass(cls):
2241 super().setUpClass()
2242 cls.pool = cls.Pool(4)
2243
2244 @classmethod
2245 def tearDownClass(cls):
2246 cls.pool.terminate()
2247 cls.pool.join()
2248 cls.pool = None
2249 super().tearDownClass()
2250
Benjamin Petersone711caf2008-06-11 16:44:04 +00002251 def test_apply(self):
2252 papply = self.pool.apply
2253 self.assertEqual(papply(sqr, (5,)), sqr(5))
2254 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2255
2256 def test_map(self):
2257 pmap = self.pool.map
2258 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2259 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2260 list(map(sqr, list(range(100)))))
2261
Antoine Pitroude911b22011-12-21 11:03:24 +01002262 def test_starmap(self):
2263 psmap = self.pool.starmap
2264 tuples = list(zip(range(10), range(9,-1, -1)))
2265 self.assertEqual(psmap(mul, tuples),
2266 list(itertools.starmap(mul, tuples)))
2267 tuples = list(zip(range(100), range(99,-1, -1)))
2268 self.assertEqual(psmap(mul, tuples, chunksize=20),
2269 list(itertools.starmap(mul, tuples)))
2270
2271 def test_starmap_async(self):
2272 tuples = list(zip(range(100), range(99,-1, -1)))
2273 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2274 list(itertools.starmap(mul, tuples)))
2275
Hynek Schlawack254af262012-10-27 12:53:02 +02002276 def test_map_async(self):
2277 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2278 list(map(sqr, list(range(10)))))
2279
2280 def test_map_async_callbacks(self):
2281 call_args = self.manager.list() if self.TYPE == 'manager' else []
2282 self.pool.map_async(int, ['1'],
2283 callback=call_args.append,
2284 error_callback=call_args.append).wait()
2285 self.assertEqual(1, len(call_args))
2286 self.assertEqual([1], call_args[0])
2287 self.pool.map_async(int, ['a'],
2288 callback=call_args.append,
2289 error_callback=call_args.append).wait()
2290 self.assertEqual(2, len(call_args))
2291 self.assertIsInstance(call_args[1], ValueError)
2292
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002293 def test_map_unplicklable(self):
2294 # Issue #19425 -- failure to pickle should not cause a hang
2295 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002296 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002297 class A(object):
2298 def __reduce__(self):
2299 raise RuntimeError('cannot pickle')
2300 with self.assertRaises(RuntimeError):
2301 self.pool.map(sqr, [A()]*10)
2302
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00002303 def test_map_chunksize(self):
2304 try:
2305 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2306 except multiprocessing.TimeoutError:
2307 self.fail("pool.map_async with chunksize stalled on null list")
2308
Xiang Zhang794623b2017-03-29 11:58:54 +08002309 def test_map_handle_iterable_exception(self):
2310 if self.TYPE == 'manager':
2311 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2312
2313 # SayWhenError seen at the very first of the iterable
2314 with self.assertRaises(SayWhenError):
2315 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2316 # again, make sure it's reentrant
2317 with self.assertRaises(SayWhenError):
2318 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2319
2320 with self.assertRaises(SayWhenError):
2321 self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2322
2323 class SpecialIterable:
2324 def __iter__(self):
2325 return self
2326 def __next__(self):
2327 raise SayWhenError
2328 def __len__(self):
2329 return 1
2330 with self.assertRaises(SayWhenError):
2331 self.pool.map(sqr, SpecialIterable(), 1)
2332 with self.assertRaises(SayWhenError):
2333 self.pool.map(sqr, SpecialIterable(), 1)
2334
Benjamin Petersone711caf2008-06-11 16:44:04 +00002335 def test_async(self):
2336 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2337 get = TimingWrapper(res.get)
2338 self.assertEqual(get(), 49)
2339 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2340
2341 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00002342 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002343 get = TimingWrapper(res.get)
2344 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2345 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2346
2347 def test_imap(self):
2348 it = self.pool.imap(sqr, list(range(10)))
2349 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2350
2351 it = self.pool.imap(sqr, list(range(10)))
2352 for i in range(10):
2353 self.assertEqual(next(it), i*i)
2354 self.assertRaises(StopIteration, it.__next__)
2355
2356 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2357 for i in range(1000):
2358 self.assertEqual(next(it), i*i)
2359 self.assertRaises(StopIteration, it.__next__)
2360
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002361 def test_imap_handle_iterable_exception(self):
2362 if self.TYPE == 'manager':
2363 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2364
Xiang Zhang794623b2017-03-29 11:58:54 +08002365 # SayWhenError seen at the very first of the iterable
2366 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2367 self.assertRaises(SayWhenError, it.__next__)
2368 # again, make sure it's reentrant
2369 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2370 self.assertRaises(SayWhenError, it.__next__)
2371
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002372 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2373 for i in range(3):
2374 self.assertEqual(next(it), i*i)
2375 self.assertRaises(SayWhenError, it.__next__)
2376
2377 # SayWhenError seen at start of problematic chunk's results
2378 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2379 for i in range(6):
2380 self.assertEqual(next(it), i*i)
2381 self.assertRaises(SayWhenError, it.__next__)
2382 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2383 for i in range(4):
2384 self.assertEqual(next(it), i*i)
2385 self.assertRaises(SayWhenError, it.__next__)
2386
Benjamin Petersone711caf2008-06-11 16:44:04 +00002387 def test_imap_unordered(self):
Victor Stinner23401fb2018-07-03 13:20:35 +02002388 it = self.pool.imap_unordered(sqr, list(range(10)))
2389 self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002390
Victor Stinner23401fb2018-07-03 13:20:35 +02002391 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002392 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2393
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002394 def test_imap_unordered_handle_iterable_exception(self):
2395 if self.TYPE == 'manager':
2396 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2397
Xiang Zhang794623b2017-03-29 11:58:54 +08002398 # SayWhenError seen at the very first of the iterable
2399 it = self.pool.imap_unordered(sqr,
2400 exception_throwing_generator(1, -1),
2401 1)
2402 self.assertRaises(SayWhenError, it.__next__)
2403 # again, make sure it's reentrant
2404 it = self.pool.imap_unordered(sqr,
2405 exception_throwing_generator(1, -1),
2406 1)
2407 self.assertRaises(SayWhenError, it.__next__)
2408
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002409 it = self.pool.imap_unordered(sqr,
2410 exception_throwing_generator(10, 3),
2411 1)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002412 expected_values = list(map(sqr, list(range(10))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002413 with self.assertRaises(SayWhenError):
2414 # imap_unordered makes it difficult to anticipate the SayWhenError
2415 for i in range(10):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002416 value = next(it)
2417 self.assertIn(value, expected_values)
2418 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002419
2420 it = self.pool.imap_unordered(sqr,
2421 exception_throwing_generator(20, 7),
2422 2)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002423 expected_values = list(map(sqr, list(range(20))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002424 with self.assertRaises(SayWhenError):
2425 for i in range(20):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002426 value = next(it)
2427 self.assertIn(value, expected_values)
2428 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002429
Benjamin Petersone711caf2008-06-11 16:44:04 +00002430 def test_make_pool(self):
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002431 expected_error = (RemoteError if self.TYPE == 'manager'
2432 else ValueError)
Victor Stinner2fae27b2011-06-20 17:53:35 +02002433
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002434 self.assertRaises(expected_error, self.Pool, -1)
2435 self.assertRaises(expected_error, self.Pool, 0)
2436
2437 if self.TYPE != 'manager':
2438 p = self.Pool(3)
2439 try:
2440 self.assertEqual(3, len(p._pool))
2441 finally:
2442 p.close()
2443 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002444
2445 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002446 result = self.pool.map_async(
2447 time.sleep, [0.1 for i in range(10000)], chunksize=1
2448 )
2449 self.pool.terminate()
2450 join = TimingWrapper(self.pool.join)
2451 join()
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002452 # Sanity check the pool didn't wait for all tasks to finish
2453 self.assertLess(join.elapsed, 2.0)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002454
Richard Oudkerke41682b2012-06-06 19:04:57 +01002455 def test_empty_iterable(self):
2456 # See Issue 12157
2457 p = self.Pool(1)
2458
2459 self.assertEqual(p.map(sqr, []), [])
2460 self.assertEqual(list(p.imap(sqr, [])), [])
2461 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2462 self.assertEqual(p.map_async(sqr, []).get(), [])
2463
2464 p.close()
2465 p.join()
2466
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002467 def test_context(self):
2468 if self.TYPE == 'processes':
2469 L = list(range(10))
2470 expected = [sqr(i) for i in L]
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002471 with self.Pool(2) as p:
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002472 r = p.map_async(sqr, L)
2473 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04002474 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002475
Richard Oudkerk85757832013-05-06 11:38:25 +01002476 @classmethod
2477 def _test_traceback(cls):
2478 raise RuntimeError(123) # some comment
2479
2480 def test_traceback(self):
2481 # We want ensure that the traceback from the child process is
2482 # contained in the traceback raised in the main process.
2483 if self.TYPE == 'processes':
2484 with self.Pool(1) as p:
2485 try:
2486 p.apply(self._test_traceback)
2487 except Exception as e:
2488 exc = e
2489 else:
Xiang Zhang794623b2017-03-29 11:58:54 +08002490 self.fail('expected RuntimeError')
Richard Oudkerk85757832013-05-06 11:38:25 +01002491 self.assertIs(type(exc), RuntimeError)
2492 self.assertEqual(exc.args, (123,))
2493 cause = exc.__cause__
2494 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2495 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2496
2497 with test.support.captured_stderr() as f1:
2498 try:
2499 raise exc
2500 except RuntimeError:
2501 sys.excepthook(*sys.exc_info())
2502 self.assertIn('raise RuntimeError(123) # some comment',
2503 f1.getvalue())
Xiang Zhang794623b2017-03-29 11:58:54 +08002504 # _helper_reraises_exception should not make the error
2505 # a remote exception
2506 with self.Pool(1) as p:
2507 try:
2508 p.map(sqr, exception_throwing_generator(1, -1), 1)
2509 except Exception as e:
2510 exc = e
2511 else:
2512 self.fail('expected SayWhenError')
2513 self.assertIs(type(exc), SayWhenError)
2514 self.assertIs(exc.__cause__, None)
Richard Oudkerk85757832013-05-06 11:38:25 +01002515
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002516 @classmethod
2517 def _test_wrapped_exception(cls):
2518 raise RuntimeError('foo')
2519
2520 def test_wrapped_exception(self):
2521 # Issue #20980: Should not wrap exception when using thread pool
2522 with self.Pool(1) as p:
2523 with self.assertRaises(RuntimeError):
2524 p.apply(self._test_wrapped_exception)
Victor Stinnerb7278732018-11-28 01:14:31 +01002525 p.join()
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002526
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002527 def test_map_no_failfast(self):
2528 # Issue #23992: the fail-fast behaviour when an exception is raised
2529 # during map() would make Pool.join() deadlock, because a worker
2530 # process would fill the result queue (after the result handler thread
2531 # terminated, hence not draining it anymore).
2532
2533 t_start = time.time()
2534
2535 with self.assertRaises(ValueError):
2536 with self.Pool(2) as p:
2537 try:
2538 p.map(raise_large_valuerror, [0, 1])
2539 finally:
2540 time.sleep(0.5)
2541 p.close()
2542 p.join()
2543
2544 # check that we indeed waited for all jobs
2545 self.assertGreater(time.time() - t_start, 0.9)
2546
def test_release_task_refs(self):
    # Issue #29861: task arguments and results should not be kept
    # alive after we are done with them.
    tracked = [CountedObject() for _ in range(10)]
    weak_refs = list(map(weakref.ref, tracked))
    self.pool.map(identity, tracked)

    # Drop the test's own strong references before checking the weak ones.
    del tracked
    time.sleep(DELTA)   # let threaded cleanup code run
    self.assertEqual({ref() for ref in weak_refs}, {None})
    # With a process pool, copies of the objects are returned, check
    # they were released too.
    self.assertEqual(CountedObject.n_instances, 0)
2560
@support.reap_threads
def test_del_pool(self):
    # Once the last reference to a pool is dropped and a collection has
    # run, the weak reference to it must be dead.
    pool = self.Pool(1)
    pool_ref = weakref.ref(pool)
    del pool
    gc.collect()
    self.assertIsNone(pool_ref())
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002568
def raising():
    """Always raise KeyError("key"); used as a failing pool task."""
    raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00002571
def unpickleable_result():
    """Return a lambda, i.e. a value that pickle cannot serialize."""
    return lambda: 42
2574
class _TestPoolWorkerErrors(BaseTestCase):
    # Errors produced by pool workers must be delivered both to the
    # error_callback and to AsyncResult.get().
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        # Always close and join the pool, even when an assertion fails, so
        # that a failing test does not leave worker processes behind.
        try:
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            # get() re-raises the task's exception ...
            self.assertRaises(KeyError, res.get)
            # ... and the callback has received it as well.
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        # Always close and join the pool, even when an assertion fails.
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                # Fresh slot per iteration so a stale exception from an
                # earlier round cannot satisfy the checks below.
                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
2614
class _TestPoolWorkerLifetime(BaseTestCase):
    # Behaviour of pools whose workers are limited by maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        # Always close and join the pool, even when an assertion fails, so
        # that a failing test does not leave worker processes behind.
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = [p.apply_async(sqr, (i, )) for i in range(100)]
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [p.apply_async(sqr, (i, 0.3)) for i in range(6)]
        # Close and join while tasks are still outstanding: this is the
        # scenario under test, so it is deliberately not deferred.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
2659
Benjamin Petersone711caf2008-06-11 16:44:04 +00002660#
2661# Test of creating a customized manager class
2662#
2663
2664from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2665
class FooBar:
    """Simple referent with two public methods and one private one."""

    def f(self):
        """Return the fixed string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the fixed string '_h()'."""
        return '_h()'
2673
def baz():
    """Yield the squares of 0 through 9, in increasing order."""
    yield from (n * n for n in range(10))
2677
class IteratorProxy(BaseProxy):
    # Proxy that behaves as an iterator: only '__next__' is exposed, and
    # each step is forwarded to the referent through _callmethod().
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        next_item = self._callmethod('__next__')
        return next_item
2684
class MyManager(BaseManager):
    '''Customized manager class used by _TestMyManager.'''


# 'Foo' relies on the default choice of exposed methods, 'Bar' names its
# exposed methods explicitly (including the private '_h'), and 'baz' is a
# generator function served through the iterator proxy type.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2691
2692
class _TestMyManager(BaseTestCase):
    # Runs the same checks against MyManager started in three ways:
    # start()/shutdown(), a "with" block, and a "with" block around a
    # manager that was started beforehand.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as mgr:
            self.common(mgr)
        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
        # to the manager process if it takes longer than 1 second to stop.
        acceptable_codes = (0, -signal.SIGTERM)
        self.assertIn(mgr._process.exitcode, acceptable_codes)

    def test_mymanager_context_prestarted(self):
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Shared checks: which methods each proxy offers, and what calling
        # them directly or through _callmethod() produces.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        with self.assertRaises(ValueError):
            foo.g()
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on Foo, so the call is refused remotely.
        with self.assertRaises(RemoteError):
            foo._callmethod('_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        expected_squares = [n * n for n in range(10)]
        self.assertEqual(list(baz), expected_squares)
2744
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002745
Benjamin Petersone711caf2008-06-11 16:44:04 +00002746#
2747# Test of connecting to a remote server and using xmlrpclib for serialization
2748#
2749
# Single queue instance created at import time and handed out by
# get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
2753
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Server side: 'get_queue' is backed by the get_queue() function above.
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable; in these tests instances of this class
# only connect() to a server that was started elsewhere.
QueueManager2.register('get_queue')


# Serializer name passed to every manager created by the tests below.
SERIALIZER = 'xmlrpclib'
2764
class _TestRemoteManager(BaseTestCase):
    # Sends a set of values through a queue served by a manager that uses
    # the xmlrpclib serializer, with the producer in a separate process.

    ALLOWED_TYPES = ('manager',)
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    # What the consumer is expected to read back (a list, see _putter()).
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect to the running server and put the
        # test values on its queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Shut the server down even when an assertion below fails, so that
        # a failing test does not leave the manager process running.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            self.assertEqual(queue.get(), self.result)

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
2812
class _TestManagerRestart(BaseTestCase):
    # Starts a manager, shuts it down, and immediately starts another one
    # on a previously used address.

    @classmethod
    def _putter(cls, address, authkey):
        # Child-process body: connect to the running server and put one
        # message on its queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must actually be started: otherwise
            # the retry tests nothing and the shutdown() below is called on
            # a manager that never ran.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002855
Benjamin Petersone711caf2008-06-11 16:44:04 +00002856#
2857#
2858#
2859
# Message that makes _TestConnection._echo() stop echoing and close its end.
SENTINEL = latin('')
2861
class _TestConnection(BaseTestCase):
    # Tests for the Connection objects returned by Pipe(): sending and
    # receiving objects and bytes, polling, closing, and passing file
    # descriptors/handles between processes.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child body: send every received message straight back until the
        # SENTINEL message arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Objects and raw bytes both come back unchanged from the echo.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing is pending: poll() without a timeout and with a negative
        # timeout both return immediately.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child time to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child has closed its end, so reading hits end-of-file.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        # A one-way pipe: the first end only reads, the second only writes.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # Valid combinations of the optional offset and size arguments.
        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the 26-byte message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fstat() succeeds on fd, False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child body: receive a handle over conn, write data to it and
        # close it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 beforehand.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the handle it received.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of 256 or more to duplicate to.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child body: write a single byte directly to the connection's
        # descriptor, i.e. data that is not accompanied by a descriptor.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        # Both ends stay open inside the "with" block ...
        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # ... and are closed, and unusable, once it has been left.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003126
class _TestListener(BaseTestCase):
    # Listener behaviour that does not need a second process.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an address already in use must fail.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            with self.assertRaises(OSError):
                self.connection.Listener(listener.address, family)

    def test_context(self):
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as server_side:
            client.send(1729)
            self.assertEqual(server_side.recv(), 1729)

        # Leaving the "with" block closed the listener.
        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003147
class _TestListenerClient(BaseTestCase):
    # Listener/Client round trips, with the client in another process or
    # in the test itself.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child body: connect to the listener, send one message, disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection explicitly, as test_issue14725
            # does, instead of leaving it open until garbage collection.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # Data sent on the accepted connection must make poll() on the
        # client end return true within the timeout.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
3195
class _TestPoll(BaseTestCase):
    # Connection.poll() and message boundaries.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        near, far = self.Pipe()
        self.assertEqual(near.poll(), False)
        far.send_bytes(b'')
        # The empty message is reported as pending, twice in a row.
        for _ in range(2):
            self.assertEqual(near.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child body: send each string after a short pause, then close.
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        near, far = self.Pipe()
        child = self.Process(target=self._child_strings, args=(far, strings))
        child.start()

        for expected in strings:
            # Poll up to 200 times, stopping at the first success.
            any(near.poll(0.01) for _ in range(200))
            received = near.recv_bytes()
            self.assertEqual(expected, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        read_end, write_end = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(read_end,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            write_end.send_bytes(message)
        write_end.close()
        child.join()
        # Whatever is read here must be one complete message.
        self.assertIn(read_end.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child body: three separate messages sent back to back.
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        near, far = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(near.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(far,))
        child.start()

        # Repeated polling between reads must not merge or drop messages.
        self.assertEqual(near.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(near.poll(timeout), True)
        self.assertEqual(near.recv_bytes(), b'cd')

        child.join()
3273
Benjamin Petersone711caf2008-06-11 16:44:04 +00003274#
3275# Test of sending connection and socket objects between processes
3276#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003277
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Connections and sockets are themselves sent over pipes between
    # processes and must remain usable on the receiving side.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=TIMEOUT)

    @classmethod
    def _listener(cls, conn, families):
        # Child body.  For each family: listen, report the address, accept
        # one connection and send the accepted connection back over conn.
        for fam in families:
            listener = cls.connection.Listener(family=fam)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        # Same again with a plain socket.
        sock = socket.socket()
        sock.bind((test.support.HOST, 0))
        sock.listen()
        conn.send(sock.getsockname())
        accepted, addr = sock.accept()
        conn.send(accepted)
        accepted.close()
        sock.close()

        # Wait for the parent's final message before returning.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Child body.  Until None arrives: connect to each given address
        # and send the upper-cased message.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # One more round, this time over a plain socket.
        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        listener_proc = self.Process(target=self._listener,
                                     args=(lconn0, families))
        listener_proc.daemon = True
        listener_proc.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        remote_proc = self.Process(target=self._remote, args=(rconn0,))
        remote_proc.daemon = True
        remote_proc.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            # Relay the listener's address to the remote process, then read
            # its message from the connection the listener passed to us.
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the socket in chunks of at most 100 bytes until an
        # empty read.
        buf = b''.join(iter(lambda: new_conn.recv(100), b''))
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        listener_proc.join()
        remote_proc.join()

    @classmethod
    def child_access(cls, conn):
        # Child body: use a received write end, then a received read end.
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        msg = reader.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        child = self.Process(target=self.child_access, args=(child_conn,))
        child.daemon = True
        child.start()
        child_conn.close()

        # Hand the write end to the child and read what it sends.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Hand the read end to the child and check what it read.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        child.join()
3410
Benjamin Petersone711caf2008-06-11 16:44:04 +00003411#
3412#
3413#
3414
class _TestHeap(BaseTestCase):
    # Tests for multiprocessing.heap: block bookkeeping and freeing of
    # blocks from the garbage collector.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        super().setUp()
        # Swap in a brand-new Heap so these tests start from a pristine
        # state; the previous one is put back in tearDown().
        self.old_heap = multiprocessing.heap.BufferWrapper._heap
        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()

    def tearDown(self):
        multiprocessing.heap.BufferWrapper._heap = self.old_heap
        super().tearDown()

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # The heap under test is the fresh one installed by setUp().
        heap = multiprocessing.heap.BufferWrapper._heap
        # NOTE(review): presumably makes the heap give arenas back as soon
        # as they are completely free -- the final assertions below expect
        # no arena to remain.  Confirm against multiprocessing.heap.
        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0

        # Allocate many blocks of random size, evicting a random older
        # block whenever more than `maxblocks` are alive.
        for _ in range(iterations):
            nbytes = int(random.lognormvariate(0, 1) * 1000)
            wrapper = multiprocessing.heap.BufferWrapper(nbytes)
            live.append(wrapper)
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]
            del wrapper

        # Check the heap's bookkeeping while holding its lock.
        with heap._lock:
            layout = []
            free = 0
            occupied = 0
            for free_list in list(heap._len_to_seq.values()):
                # every free block recorded for this length
                for arena, start, stop in free_list:
                    span = stop - start
                    layout.append((heap._arenas.index(arena), start, stop,
                                   span, 'free'))
                    free += span
            for arena, arena_blocks in heap._allocated_blocks.items():
                # every allocated block of this arena
                for start, stop in arena_blocks:
                    span = stop - start
                    layout.append((heap._arenas.index(arena), start, stop,
                                   span, 'occupied'))
                    occupied += span

            # Free plus occupied space must account for every arena byte.
            self.assertEqual(free + occupied,
                             sum(arena.size for arena in heap._arenas))

            layout.sort()

            # Walk consecutive blocks in (arena, start) order: they must
            # tile each arena without gaps or overlaps.
            for current, following in zip(layout, layout[1:]):
                arena, start, stop = current[:3]
                narena, nstart, nstop = following[:3]
                if arena == narena:
                    # Same arena: two adjacent blocks
                    self.assertEqual(stop, nstart)
                else:
                    # Two different arenas
                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
                    self.assertEqual(nstart, 0)         # first block

        # Drop the remaining blocks in random order.
        random.shuffle(live)
        while live:
            live.pop()

        self.assertEqual(heap._n_frees, heap._n_mallocs)
        self.assertEqual(len(heap._pending_free_blocks), 0)
        self.assertEqual(len(heap._arenas), 0)
        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
        self.assertEqual(len(heap._len_to_seq), 0)

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        saved_thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *saved_thresholds)
        gc.set_threshold(10)

        # Allocate many pairs of blocks that reference each other, so that
        # only the cycle collector can reclaim them.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            # circular references
            first.buddy = second
            second.buddy = first
3514
Benjamin Petersone711caf2008-06-11 16:44:04 +00003515#
3516#
3517#
3518
# ctypes structure used by the sharedctypes tests: one int, one double and
# one long long field.
class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double),
        ('z', c_longlong),
    ]
3525
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, structures and arrays modified by a child
    # process must show the new contents in the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        # Runs in the child: double every shared object in place.
        for scalar in (x, y, z):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # `lock` is forwarded to every shared object; test_synchronize()
        # re-runs this test with lock=True.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        z = Value(c_longlong, 2 ** 33, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, z, foo, arr, string)
        worker = self.Process(target=self._double, args=shared)
        worker.daemon = True
        worker.start()
        worker.join()

        # Every object must now hold twice its initial content.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(z.value, 2 ** 34)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # The copy must keep the original field values after the source
        # structure has been zeroed.
        original = _Foo(2, 5.0, 2 ** 33)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        original.z = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
        self.assertEqual(duplicate.z, 2 ** 33)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003580
3581#
3582#
3583#
3584
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize: callback ordering at exit and thread safety
    # of util._run_finalizers().

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Run each test against an empty finalizer registry; the previous
        # content is saved and put back in tearDown().
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        # The test must not leave finalizers behind.  Restore the saved
        # entries even when that check fails, so a failure here does not
        # leave the process-wide registry without them.
        try:
            self.assertFalse(util._finalizer_registry)
        finally:
            util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        # Runs in the child: register finalizers that each send a tag over
        # `conn`, so the parent can observe the order in which they fire.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # Registered without an exitpriority; test_finalize() expects no
        # 'c' message.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read tags until the 'STOP' sentinel sent by the last finalizer.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        # Shared with the two worker closures below: `finish` stops them,
        # `exc` carries out the last exception either of them caught.
        finish = False
        exc = None

        def run_finalizers():
            nonlocal exc
            while not finish:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    exc = e

        def make_finalizers():
            nonlocal exc
            d = {}
            while not finish:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    exc = e
                    d.clear()

        old_interval = sys.getswitchinterval()
        old_threshold = gc.get_threshold()
        try:
            # Switch threads and collect garbage as often as possible to
            # make a race more likely.
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            threads = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test.support.start_threads(threads):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                finish = True
            if exc is not None:
                raise exc
        finally:
            sys.setswitchinterval(old_interval)
            gc.set_threshold(*old_threshold)
            gc.collect()  # Collect remaining Foo's
3699
3700
Benjamin Petersone711caf2008-06-11 16:44:04 +00003701#
3702# Test that from ... import * works for each module
3703#
3704
class _TestImportStar(unittest.TestCase):
    # Every multiprocessing module must define __all__, and every name
    # listed there must really exist.

    def get_module_names(self):
        # Return the dotted name of every *.py file found in the
        # multiprocessing package directory, with the package itself
        # standing in for its __init__ module.
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        names = []
        for path in glob.glob(os.path.join(package_dir, '*.py')):
            stem = os.path.splitext(os.path.basename(path))[0]
            names.append('multiprocessing.' + stem)
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        modules = self.get_module_names()

        # Modules that must not be imported in this environment.
        if sys.platform == 'win32':
            excluded = ['multiprocessing.popen_fork',
                        'multiprocessing.popen_forkserver',
                        'multiprocessing.popen_spawn_posix']
        else:
            excluded = ['multiprocessing.popen_spawn_win32']
            if not HAS_REDUCTION:
                excluded.append('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            excluded.append('multiprocessing.sharedctypes')

        for unwanted in excluded:
            modules.remove(unwanted)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
3743
3744#
3745# Quick test that logging works -- does not test logging output
3746#
3747
class _TestLogging(BaseTestCase):
    # Quick checks that the multiprocessing logger can be configured and
    # that a child process sees the expected effective level.  Logging
    # output itself is not checked.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before using it; checking after setLevel()
        # could never fail.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always restore the level used by the rest of the test suite.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the effective level of the
        # multiprocessing logger back to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on the multiprocessing logger.
            logger.setLevel(LEVEL1)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            try:
                self.assertEqual(LEVEL1, reader.recv())
            finally:
                p.join()
                p.close()

            # With the multiprocessing logger at NOTSET the child is
            # expected to report the level set on the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            p = self.Process(target=self._test_level, args=(writer,))
            p.start()
            try:
                self.assertEqual(LEVEL2, reader.recv())
            finally:
                p.join()
                p.close()
        finally:
            # Undo the level changes and release the pipe even when an
            # assertion above failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
3792
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003793
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00003794# class _TestLoggingProcessName(BaseTestCase):
3795#
3796# def handle(self, record):
3797# assert record.processName == multiprocessing.current_process().name
3798# self.__handled = True
3799#
3800# def test_logging(self):
3801# handler = logging.Handler()
3802# handler.handle = self.handle
3803# self.__handled = False
3804# # Bypass getLogger() and side-effects
3805# logger = logging.getLoggerClass()(
3806# 'multiprocessing.test.TestLoggingProcessName')
3807# logger.addHandler(handler)
3808# logger.propagate = False
3809#
3810# logger.warn('foo')
3811# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003812
Benjamin Petersone711caf2008-06-11 16:44:04 +00003813#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003814# Check that Process.join() retries if os.waitpid() fails with EINTR
3815#
3816
class _TestPollEintr(BaseTestCase):
    # Process.join() must keep waiting when a signal arrives in the
    # parent while it is blocked.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Runs in a helper process: wait briefly, then send SIGUSR1 to `pid`.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        signalled = False

        def record(*args):
            nonlocal signalled
            signalled = True

        parent_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(parent_pid,))
            killer.start()
            try:
                # The signal is sent by the killer while we are blocked in
                # join() on a child that sleeps for two seconds.
                p = self.Process(target=time.sleep, args=(2,))
                p.start()
                p.join()
            finally:
                killer.join()
            self.assertTrue(signalled)
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
3846
3847#
Jesse Noller6214edd2009-01-19 16:23:53 +00003848# Test to verify handle verification, see issue 3321
3849#
3850
class TestInvalidHandle(unittest.TestCase):
    # Connection objects built from bogus handles must fail cleanly.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus = multiprocessing.connection.Connection(44977608)
        # check that poll() doesn't crash
        try:
            bogus.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None

        # A negative handle must be rejected by the constructor.
        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003867
Benjamin Petersone711caf2008-06-11 16:44:04 +00003868
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003869
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends the challenge prefix first, garbage second and
        # empty bytes on any later read.
        class _FakeConnection(object):
            def __init__(self):
                self._replies = iter([multiprocessing.connection.CHALLENGE,
                                      b'something bogus'])

            def recv_bytes(self, size):
                return next(self._replies, b'')

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3898
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003899#
3900# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3901#
3902
def initializer(ns):
    # Bump the `test` counter on the namespace so the initializer tests can
    # see that this function ran.
    ns.test = ns.test + 1
3905
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must call the given initializer with the
    # given arguments and reject a non-callable initializer.

    def setUp(self):
        # A manager-hosted namespace whose `test` counter is incremented
        # by initializer().
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the manager even when the assertion fails, so a failing
            # test does not leave it running.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3930
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003931#
3932# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
3934#
3935
def _this_sub_process(q):
    # Try once, without blocking, to take an item from the queue; an empty
    # queue is not an error and the item itself is discarded.
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
3941
def _test_process():
    # Start a daemonic sub-process that polls a fresh queue once, and wait
    # for it to finish.
    shared_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_this_sub_process,
                                     args=(shared_queue,))
    worker.daemon = True
    worker.start()
    worker.join()
3948
def _afunc(x):
    # Return x multiplied by itself.
    squared = x * x
    return squared
3951
def pool_in_process():
    # Create a four-worker pool, map _afunc over a few integers, then shut
    # the pool down.  The mapped result is not used.
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003957
class _file_like(object):
    # Buffers written data per process and passes it to `delegate` on
    # flush().

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        # The buffer belongs to the process that created it: when the pid
        # differs from the recorded one, start over with an empty buffer.
        # There are no race conditions since fork keeps only the running thread
        current_pid = os.getpid()
        if self._pid == current_pid:
            return self._cache
        self._pid = current_pid
        self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        # Write everything buffered so far in one call, then reset.
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3978
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Queues and pools must be usable from inside a child process, and the
    # _file_like helper must flush what was written to it.

    def test_queue_in_process(self):
        proc = multiprocessing.Process(target=_test_process)
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process object is created but never started;
        # it is kept only so the set of objects created here is unchanged.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare `assert` is removed under -O.
        self.assertEqual(sio.getvalue(), 'foo')
3998
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003999
class TestWait(unittest.TestCase):
    # Tests for multiprocessing.connection.wait() on pipes, sockets and
    # process sentinels.  Elapsed time is measured with time.monotonic()
    # so that wall-clock adjustments cannot disturb the timing checks.

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Runs in a child: send ten (index, pid) messages, optionally
        # pausing a random short time before each, then close the pipe.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            # Release the read end even if the test fails part-way.
            self.addCleanup(r.close)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # Drain every reader until its writer has been closed.
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Runs in a child: connect to `address` and send the numbers 0-9,
        # one per line, optionally pausing before each.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        # Release the listening socket even if the test fails early.
        self.addCleanup(l.close)
        l.bind((test.support.HOST, 0))
        l.listen()
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            self.addCleanup(r.close)
            readers.append(r)
            dic[r] = []
        l.close()

        # Collect the data of each connection until the peer closes it.
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must give up after about `expected`
        # seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data is pending on `a`: wait() must return promptly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Runs in a child: release `sem`, then sleep for `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        # Connections and integer sentinels cannot be ordered against each
        # other, so order results by object identity before comparing.
        sorted_ = lambda l: sorted(l, key=id)
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        try:
            self.assertIsInstance(p.sentinel, int)
            self.assertTrue(sem.acquire(timeout=20))

            # Only the sentinel becomes ready, once the child has slept
            # for about `expected` seconds.
            start = time.monotonic()
            res = wait([a, p.sentinel, b], expected + 20)
            delta = time.monotonic() - start

            self.assertEqual(res, [p.sentinel])
            self.assertLess(delta, expected + 2)
            self.assertGreater(delta, expected - 2)

            a.send(None)

            start = time.monotonic()
            res = wait([a, p.sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
            self.assertLess(delta, 0.4)

            b.send(None)

            start = time.monotonic()
            res = wait([a, p.sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
            self.assertLess(delta, 0.4)
        finally:
            # Reap the helper process even when an assertion failed.
            p.terminate()
            p.join()

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return at once with nothing ready.
        start = time.monotonic()
        res = wait([a], timeout=-1)
        elapsed = time.monotonic() - start
        self.assertEqual(res, [])
        self.assertLess(elapsed, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004174
Antoine Pitrou709176f2012-04-01 17:19:09 +02004175#
4176# Issue 14151: Test invalid family on invalid environment
4177#
4178
class TestInvalidFamily(unittest.TestCase):
    # A Listener address written in the other platform's format must be
    # rejected with ValueError.

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # Windows-style pipe address on a non-Windows platform.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # Filesystem path address on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02004190
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004191#
4192# Issue 12098: check sys.flags of child matches that for parent
4193#
4194
class TestFlags(unittest.TestCase):
    # sys.flags seen by a multiprocessing child must match its parent's.

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this process's interpreter flags to the parent.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        # Start a grandchild, collect its flags and print them together
        # with our own as one JSON document on stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
4222
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004223#
4224# Test interaction with socket timeouts - see Issue #6056
4225#
4226
class TestTimeouts(unittest.TestCase):
    # Pipes and Listener/Client connections must keep working while a
    # short default socket timeout is set.

    @classmethod
    def _test_timeout(cls, child, address):
        # Runs in the child: after one second send 123 over the pipe, then
        # connect to the listener at `address` and send 456.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # Much shorter than the child's one-second delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child, listener.address))
            proc.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            join_process(proc)
        finally:
            socket.setdefaulttimeout(saved_timeout)
4256
Richard Oudkerke88a2442012-08-14 11:41:32 +01004257#
4258# Test what happens with no "if __name__ == '__main__'"
4259#
4260
class TestNoForkBomb(unittest.TestCase):
    def test_noforkbomb(self):
        # Run mp_fork_bomb.py (a script located next to this file) with the
        # current start method passed as its argument.
        start_method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        helper = test.support.script_helper
        if start_method == 'fork':
            # Under 'fork' the script is expected to succeed and print 123.
            rc, out, err = helper.assert_python_ok(script, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # Any other start method must fail with a RuntimeError and
            # produce no output on stdout.
            rc, out, err = helper.assert_python_failure(script, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01004273
4274#
Richard Oudkerk409c3132013-04-17 20:58:00 +01004275# Issue #17555: ForkAwareThreadLock
4276#
4277
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse through n generations of processes; only the last
        # generation reports the size of its after-fork registry.
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n - 1, conn))
            descendant.start()
            conn.close()
            join_process(descendant)
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Keep a reference so the lock stays alive for the whole test.
        lock = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        first_child = multiprocessing.Process(target=self.child,
                                              args=(5, writer))
        first_child.start()
        writer.close()
        size_in_last_generation = reader.recv()
        join_process(first_child)
        self.assertLessEqual(size_in_last_generation, size_before)
4304
4305#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004306# Check that non-forked child processes do not inherit unneeded fds/handles
4307#
4308
class TestCloseFds(unittest.TestCase):
    # Passes the number of a socket fd/handle that is open in the parent to
    # a child process and checks whether the child can wrap it in a socket.

    def get_high_socket_fd(self):
        # Return a detached socket fd/handle owned by the caller; release it
        # with self.close().
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            # Only the last (high) duplicate is kept; drop the low ones.
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        # Release an fd/handle obtained from get_high_socket_fd().
        if WIN32:
            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Process target: try to build a socket from `fd` and send back
        # either the exception raised or None on success.
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            join_process(p)
        finally:
            self.close(fd)
            # Repeated here so the writer is released even when the process
            # could not be started.
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            self.assertIs(e, None)
        else:
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            # OSError only has a `winerror` attribute on Windows; use
            # getattr() so that an unexpected errno on other platforms is
            # reported as an assertion failure, not an AttributeError.
            self.assertTrue(e.errno == errno.EBADF or
                            getattr(e, 'winerror', None) == WSAENOTSOCK, e)
4370
4371#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004372# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4373#
4374
class TestIgnoreEINTR(unittest.TestCase):

    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)

    @classmethod
    def _test_ignore(cls, conn):
        # Process target: install a do-nothing SIGUSR1 handler, announce
        # readiness, echo one object back and finally send a large payload.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore,
                                             args=(child_conn,))
            worker.daemon = True
            worker.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # First signal: sent between the 'ready' message and our reply.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Second signal: sent before we start reading the big payload.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            payload = conn.recv_bytes()
            self.assertEqual(payload, b'x' * self.CONN_MAX_SIZE)
            time.sleep(0.1)
            worker.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Process target: install a do-nothing SIGUSR1 handler, publish the
        # listener address, then greet the first client that connects.
        signal.signal(signal.SIGUSR1, lambda signum, frame: None)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            worker = multiprocessing.Process(target=self._test_ignore_listener,
                                             args=(child_conn,))
            worker.daemon = True
            worker.start()
            child_conn.close()
            address = conn.recv()
            # Signal the worker before any client has connected.
            time.sleep(0.1)
            os.kill(worker.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            worker.join()
        finally:
            conn.close()
4441
class TestStartMethod(unittest.TestCase):
    @classmethod
    def _check_context(cls, conn):
        # Process target: report the start method seen inside the child.
        conn.send(multiprocessing.get_start_method())

    def check_context(self, ctx):
        # Start a process through `ctx` and verify that the child reports
        # the same start method as `ctx` itself.
        reader, writer = ctx.Pipe(duplex=False)
        child = ctx.Process(target=self._check_context, args=(writer,))
        child.start()
        writer.close()
        reported = reader.recv()
        reader.close()
        child.join()
        self.assertEqual(reported, ctx.get_start_method())

    def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                # This start method is not available here.
                continue
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            # A context obtained for a specific method rejects changes.
            with self.assertRaises(ValueError):
                ctx.set_start_method('spawn')
            with self.assertRaises(ValueError):
                ctx.set_start_method(None)
            self.check_context(ctx)

    def test_set_get(self):
        multiprocessing.set_forkserver_preload(PRELOAD)
        tested = 0
        saved_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                context_name = type(ctx).__name__.lower()
                self.assertTrue(context_name.startswith(method))
                process_name = ctx.Process.__name__.lower()
                self.assertTrue(process_name.startswith(method))
                self.check_context(multiprocessing)
                tested += 1
        finally:
            multiprocessing.set_start_method(saved_method, force=True)
        # At least one start method must have been exercised.
        self.assertGreaterEqual(tested, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
        else:
            without_forkserver = ['fork', 'spawn']
            with_forkserver = ['fork', 'spawn', 'forkserver']
            self.assertTrue(methods == without_forkserver or
                            methods == with_forkserver)

    def test_preload_resources(self):
        if multiprocessing.get_start_method() != 'forkserver':
            self.skipTest("test only relevant for 'forkserver' method")
        script = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
        rc, raw_out, raw_err = test.support.script_helper.assert_python_ok(script)
        out = raw_out.decode()
        err = raw_err.decode()
        succeeded = out.rstrip() == 'ok' and err == ''
        if not succeeded:
            # Show what the script produced before failing.
            print(out)
            print(err)
            self.fail("failed spawning forkserver or grandchild")
4510
4511
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):

    def test_semaphore_tracker(self):
        #
        # Check that killing process does not leak named semaphores
        #
        import subprocess
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        read_fd, write_fd = os.pipe()
        argv = [sys.executable, '-E', '-c', cmd % (write_fd, write_fd)]
        proc = subprocess.Popen(argv,
                                pass_fds=[write_fd],
                                stderr=subprocess.PIPE)
        # Only the child needs the write end from here on.
        os.close(write_fd)
        with open(read_fd, 'rb', closefd=True) as names:
            name1 = names.readline().rstrip().decode('ascii')
            name2 = names.readline().rstrip().decode('ascii')
        # Unlink the first semaphore ourselves, then kill the child.
        _multiprocessing.sem_unlink(name1)
        proc.terminate()
        proc.wait()
        time.sleep(2.0)
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = proc.stderr.read().decode('utf-8')
        proc.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)

    def check_semaphore_tracker_death(self, signum, should_die):
        # bpo-31310: if the semaphore tracker process has died, it should
        # be restarted implicitly.
        from multiprocessing.semaphore_tracker import _semaphore_tracker
        tracker_pid = _semaphore_tracker._pid
        if tracker_pid is not None:
            # Get rid of any tracker that is already running.
            os.kill(tracker_pid, signal.SIGKILL)
            os.waitpid(tracker_pid, 0)
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            _semaphore_tracker.ensure_running()
        tracker_pid = _semaphore_tracker._pid

        os.kill(tracker_pid, signum)
        time.sleep(1.0)  # give it time to die

        spawn_ctx = multiprocessing.get_context("spawn")
        with warnings.catch_warnings(record=True) as caught:
            warnings.simplefilter("always")
            sem = spawn_ctx.Semaphore()
            sem.acquire()
            sem.release()
            sem_ref = weakref.ref(sem)
            # ensure `sem` gets collected, which triggers communication with
            # the semaphore tracker
            del sem
            gc.collect()
            self.assertIsNone(sem_ref())
            if not should_die:
                self.assertEqual(len(caught), 0)
            else:
                self.assertEqual(len(caught), 1)
                only_warning = caught[0]
                self.assertTrue(issubclass(only_warning.category, UserWarning))
                self.assertTrue("semaphore_tracker: process died"
                                in str(only_warning.message))

    def test_semaphore_tracker_sigint(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_semaphore_tracker_death(signal.SIGINT, False)

    def test_semaphore_tracker_sigterm(self):
        # Catchable signal (ignored by semaphore tracker)
        self.check_semaphore_tracker_death(signal.SIGTERM, False)

    def test_semaphore_tracker_sigkill(self):
        # Uncatchable signal.
        self.check_semaphore_tracker_death(signal.SIGKILL, True)
4601
4602
class TestSimpleQueue(unittest.TestCase):

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Process target: once released by the parent, put the result of
        # queue.empty() on the queue twice, then release the parent.
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            for _ in range(2):
                queue.put(queue.empty())
        finally:
            parent_can_continue.set()

    def test_empty(self):
        queue = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        worker = multiprocessing.Process(
            target=self._test_empty,
            args=(queue, child_can_start, parent_can_continue),
        )
        worker.daemon = True
        worker.start()

        # Nothing has been put yet: the worker is still waiting.
        self.assertTrue(queue.empty())

        child_can_start.set()
        parent_can_continue.wait()

        # The worker saw an empty queue first, then a non-empty one.
        self.assertFalse(queue.empty())
        first = queue.get()
        self.assertEqual(first, True)
        second = queue.get()
        self.assertEqual(second, False)
        self.assertTrue(queue.empty())

        worker.join()
4638
Derek B. Kimc40278e2018-07-11 19:22:28 +09004639
class TestPoolNotLeakOnFailure(unittest.TestCase):

    def test_release_unused_processes(self):
        # Issue #19675: During pool creation, if we can't create a process,
        # don't leak already created ones.
        starts_remaining = 3
        created = []

        class FailingForkProcess:
            # Stand-in for a Process class: the first three start() calls
            # succeed, every later one raises OSError.  `state` records the
            # life cycle: None -> 'started' -> 'stopping' -> 'stopped'.

            def __init__(self, **kwargs):
                self.name = 'Fake Process'
                self.exitcode = None
                self.state = None
                created.append(self)

            def start(self):
                nonlocal starts_remaining
                if starts_remaining <= 0:
                    raise OSError("Manually induced OSError")
                starts_remaining -= 1
                self.state = 'started'

            def terminate(self):
                self.state = 'stopping'

            def join(self):
                # Joining only completes a process that was terminated.
                if self.state == 'stopping':
                    self.state = 'stopped'

            def is_alive(self):
                return self.state in ('started', 'stopping')

        fake_context = unittest.mock.MagicMock(Process=FailingForkProcess)
        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
            pool = multiprocessing.pool.Pool(5, context=fake_context)
            pool.close()
            pool.join()
        still_alive = [proc for proc in created if proc.is_alive()]
        self.assertFalse(bool(still_alive))
4679
4680
4681
class MiscTestCase(unittest.TestCase):
    def test__all__(self):
        # Just make sure names in blacklist are excluded
        excluded = ['SUBDEBUG', 'SUBWARNING']
        declared = multiprocessing.__all__
        support.check__all__(self, multiprocessing,
                             extra=declared, blacklist=excluded)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004687#
4688# Mixins
4689#
4690
class BaseMixin(object):
    # Records the dangling processes/threads when a test class starts and
    # warns on stderr about any new ones when the class finishes.

    @classmethod
    def setUpClass(cls):
        # Snapshot as a (processes, threads) pair.
        known_processes = multiprocessing.process._dangling.copy()
        known_threads = threading._dangling.copy()
        cls.dangling = (known_processes, known_threads)

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        known_processes, known_threads = cls.dangling

        leaked_processes = set(multiprocessing.process._dangling) - set(known_processes)
        if leaked_processes:
            test.support.environment_altered = True
            print('Warning -- Dangling processes: %s' % leaked_processes,
                  file=sys.stderr)
        # Drop the reference to the leaked objects.
        leaked_processes = None

        leaked_threads = set(threading._dangling) - set(known_threads)
        if leaked_threads:
            test.support.environment_altered = True
            print('Warning -- Dangling threads: %s' % leaked_threads,
                  file=sys.stderr)
        # Drop the reference to the leaked objects.
        leaked_threads = None
4716
4717
class ProcessesMixin(BaseMixin):
    # Binds the generic names used by the test cases to the top-level
    # `multiprocessing` API.  Callables are wrapped in staticmethod() so
    # that looking them up on an instance does not bind `self`.
    TYPE = 'processes'

    # Processes and process bookkeeping
    Process = multiprocessing.Process
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)

    # Communication
    connection = multiprocessing.connection
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared state
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00004739
Benjamin Petersone711caf2008-06-11 16:44:04 +00004740
class ManagerMixin(BaseMixin):
    # Binds the generic names used by the test cases to a manager created
    # once per test class.  Each property resolves `self.manager.<name>` at
    # access time, so it always uses the manager set up by setUpClass().
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        # Forward all arguments to the class-wide manager's Pool().
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            t *= 2  # back off exponentially between polls
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                print("Warning -- multiprocessing.Manager still has %s active "
                      "children after %s seconds"
                      % (multiprocessing.active_children(), dt),
                      file=sys.stderr)
                break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            # Report on stderr, like every other "Warning --" message
            # emitted by the mixins.
            print('Warning -- Shared objects which still exist at manager '
                  'shutdown:', file=sys.stderr)
            print(cls.manager._debug_info(), file=sys.stderr)
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01004801
Victor Stinnerffb49402017-07-25 01:55:54 +02004802
class ThreadsMixin(BaseMixin):
    # Binds the generic names used by the test cases to the
    # `multiprocessing.dummy` API.  Callables are wrapped in staticmethod()
    # so that looking them up on an instance does not bind `self`.
    TYPE = 'threads'

    # Processes and process bookkeeping
    Process = multiprocessing.dummy.Process
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.dummy.Pool)

    # Communication
    connection = multiprocessing.dummy.connection
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared state
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
4822
4823#
4824# Functions used to create test cases from the base ones in this module
4825#
4826
def install_tests_in_module_dict(remote_globs, start_method):
    # Populate `remote_globs` (the globals of the module whose name is
    # remote_globs['__name__']) with copies of the test classes defined in
    # this module, plus setUpModule()/tearDownModule() functions that switch
    # to `start_method` for the duration of that module's tests.
    target_module = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    def publish(new_name, *bases):
        # Create a subclass of `bases` that presents itself as `new_name`
        # defined in the remote module, and store it there.
        class Temp(*bases):
            pass
        Temp.__name__ = Temp.__qualname__ = new_name
        Temp.__module__ = target_module
        remote_globs[new_name] = Temp

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            # One concrete class per allowed type; the first character of
            # the base name is replaced by 'With<Type>'.
            for type_ in base.ALLOWED_TYPES:
                flavour = type_.capitalize()
                Mixin = local_globs[flavour + 'Mixin']
                publish('With' + flavour + name[1:],
                        base, Mixin, unittest.TestCase)
        elif issubclass(base, unittest.TestCase):
            # Plain test cases are copied under their own name.
            publish(name, base, object)

    # State shared between setUpModule() and tearDownModule().
    baseline_processes = None
    baseline_threads = None
    saved_start_method = None

    def setUpModule():
        nonlocal baseline_processes, baseline_threads, saved_start_method
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        baseline_processes = multiprocessing.process._dangling.copy()
        baseline_threads = threading._dangling.copy()
        saved_start_method = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                probe_lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(saved_start_method, force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        leaked_processes = (set(multiprocessing.process._dangling)
                            - set(baseline_processes))
        if leaked_processes:
            need_sleep = True
            test.support.environment_altered = True
            print('Warning -- Dangling processes: %s' % leaked_processes,
                  file=sys.stderr)
        leaked_processes = None

        leaked_threads = set(threading._dangling) - set(baseline_threads)
        if leaked_threads:
            need_sleep = True
            test.support.environment_altered = True
            print('Warning -- Dangling threads: %s' % leaked_threads,
                  file=sys.stderr)
        leaked_threads = None

        # Sleep 500 ms to give time to child processes to complete.
        if need_sleep:
            time.sleep(0.5)
        multiprocessing.process._cleanup()
        test.support.gc_collect()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule