blob: 78ec53beb0f01d5ba62ac63626dad9a35cc1e0db [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Julien Palard5d236ca2018-11-04 23:40:32 +01006import unittest.mock
Benjamin Petersone711caf2008-06-11 16:44:04 +00007import queue as pyqueue
8import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00009import io
Antoine Pitroude911b22011-12-21 11:03:24 +010010import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020020import subprocess
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
Davin Pottse895de32019-02-23 22:08:16 -060023import pickle
Antoine Pitrou89889452017-03-24 13:52:11 +010024import weakref
Pablo Galindoec74d182018-09-04 09:53:54 +010025import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000026import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030027import test.support.script_helper
Victor Stinnerb9b69002017-09-14 14:40:56 -070028from test import support
Benjamin Petersone711caf2008-06-11 16:44:04 +000029
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
R. David Murraya21e4ca2009-03-31 23:16:50 +000031# Skip tests if _multiprocessing wasn't built.
32_multiprocessing = test.support.import_module('_multiprocessing')
33# Skip tests if sem_open implementation is broken.
34test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000035import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000036
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.connection
Victor Stinnerd7e64d92017-07-25 00:33:56 +020038import multiprocessing.dummy
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.heap
Victor Stinnerd7e64d92017-07-25 00:33:56 +020040import multiprocessing.managers
Benjamin Petersone711caf2008-06-11 16:44:04 +000041import multiprocessing.pool
Victor Stinnerd7e64d92017-07-25 00:33:56 +020042import multiprocessing.queues
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Charles-François Natalibc8f0822011-09-20 20:36:51 +020044from multiprocessing import util
45
46try:
47 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010048 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020049except ImportError:
50 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Brian Curtinafa88b52010-10-07 01:12:19 +000052try:
53 from multiprocessing.sharedctypes import Value, copy
54 HAS_SHAREDCTYPES = True
55except ImportError:
56 HAS_SHAREDCTYPES = False
57
Antoine Pitroubcb39d42011-08-23 19:46:22 +020058try:
Davin Pottse895de32019-02-23 22:08:16 -060059 from multiprocessing import shared_memory
60 HAS_SHMEM = True
61except ImportError:
62 HAS_SHMEM = False
63
64try:
Antoine Pitroubcb39d42011-08-23 19:46:22 +020065 import msvcrt
66except ImportError:
67 msvcrt = None
68
Benjamin Petersone711caf2008-06-11 16:44:04 +000069#
70#
71#
72
Victor Stinner11f08072017-09-15 06:55:31 -070073# Timeout to wait until a process completes
Pablo Galindo40b69072019-03-22 07:36:56 +000074TIMEOUT = 60.0 # seconds
Victor Stinner11f08072017-09-15 06:55:31 -070075
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000076def latin(s):
77 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000078
Victor Stinnerd7e64d92017-07-25 00:33:56 +020079
80def close_queue(queue):
81 if isinstance(queue, multiprocessing.queues.Queue):
82 queue.close()
83 queue.join_thread()
84
85
Victor Stinner11f08072017-09-15 06:55:31 -070086def join_process(process):
Victor Stinnerb9b69002017-09-14 14:40:56 -070087 # Since multiprocessing.Process has the same API than threading.Thread
88 # (join() and is_alive(), the support function can be reused
Victor Stinner11f08072017-09-15 06:55:31 -070089 support.join_thread(process, timeout=TIMEOUT)
Victor Stinnerb9b69002017-09-14 14:40:56 -070090
91
Pierre Glaserf22cc692019-05-10 22:59:08 +020092if os.name == "posix":
93 from multiprocessing import resource_tracker
94
95 def _resource_unlink(name, rtype):
96 resource_tracker._CLEANUP_FUNCS[rtype](name)
97
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
100# Constants
101#
102
103LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +0000104#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
106DELTA = 0.1
107CHECK_TIMINGS = False # making true makes tests take a lot longer
108 # and can sometimes cause some non-serious
109 # failures because some calls block a bit
110 # longer than expected
111if CHECK_TIMINGS:
112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
113else:
114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
115
116HAVE_GETVALUE = not getattr(_multiprocessing,
117 'HAVE_BROKEN_SEM_GETVALUE', False)
118
Victor Stinner937ee9e2018-06-26 02:11:06 +0200119WIN32 = (sys.platform == "win32")
120
Richard Oudkerk59d54042012-05-10 16:11:12 +0100121from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200122
Richard Oudkerk59d54042012-05-10 16:11:12 +0100123def wait_for_handle(handle, timeout):
124 if timeout is not None and timeout < 0.0:
125 timeout = None
126 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +0000127
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200128try:
129 MAXFD = os.sysconf("SC_OPEN_MAX")
130except:
131 MAXFD = 256
132
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100133# To speed up tests when using the forkserver, we can preload these:
134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
135
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000137# Some tests require ctypes
138#
139
140try:
Gareth Rees3913bad2017-07-21 11:35:33 +0100141 from ctypes import Structure, c_int, c_double, c_longlong
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000142except ImportError:
143 Structure = object
Antoine Pitrouff92ff52017-07-21 13:24:05 +0200144 c_int = c_double = c_longlong = None
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000145
Charles-François Natali221ef672011-11-22 18:55:22 +0100146
147def check_enough_semaphores():
148 """Check that the system supports enough semaphores to run the test."""
149 # minimum number of semaphores available according to POSIX
150 nsems_min = 256
151 try:
152 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
153 except (AttributeError, ValueError):
154 # sysconf not available or setting not available
155 return
156 if nsems == -1 or nsems >= nsems_min:
157 return
158 raise unittest.SkipTest("The OS doesn't support enough semaphores "
159 "to run the test (required: %d)." % nsems_min)
160
161
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000162#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163# Creates a wrapper for a function which records the time it takes to finish
164#
165
166class TimingWrapper(object):
167
168 def __init__(self, func):
169 self.func = func
170 self.elapsed = None
171
172 def __call__(self, *args, **kwds):
Victor Stinner2cf4c202018-12-17 09:36:36 +0100173 t = time.monotonic()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000174 try:
175 return self.func(*args, **kwds)
176 finally:
Victor Stinner2cf4c202018-12-17 09:36:36 +0100177 self.elapsed = time.monotonic() - t
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178
179#
180# Base class for test cases
181#
182
183class BaseTestCase(object):
184
185 ALLOWED_TYPES = ('processes', 'manager', 'threads')
186
187 def assertTimingAlmostEqual(self, a, b):
188 if CHECK_TIMINGS:
189 self.assertAlmostEqual(a, b, 1)
190
191 def assertReturnsIfImplemented(self, value, func, *args):
192 try:
193 res = func(*args)
194 except NotImplementedError:
195 pass
196 else:
197 return self.assertEqual(value, res)
198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000199 # For the sanity of Windows users, rather than crashing or freezing in
200 # multiple ways.
201 def __reduce__(self, *args):
202 raise NotImplementedError("shouldn't try to pickle a test case")
203
204 __reduce_ex__ = __reduce__
205
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206#
207# Return the value of a semaphore
208#
209
210def get_value(self):
211 try:
212 return self.get_value()
213 except AttributeError:
214 try:
215 return self._Semaphore__value
216 except AttributeError:
217 try:
218 return self._value
219 except AttributeError:
220 raise NotImplementedError
221
222#
223# Testcases
224#
225
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200226class DummyCallable:
227 def __call__(self, q, c):
228 assert isinstance(c, DummyCallable)
229 q.put(5)
230
231
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232class _TestProcess(BaseTestCase):
233
234 ALLOWED_TYPES = ('processes', 'threads')
235
236 def test_current(self):
237 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600238 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239
240 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242
243 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000244 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000245 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000246 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000247 self.assertEqual(current.ident, os.getpid())
248 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000250 def test_daemon_argument(self):
251 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600252 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000253
254 # By default uses the current process's daemon flag.
255 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000256 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000257 proc1 = self.Process(target=self._test, daemon=True)
258 self.assertTrue(proc1.daemon)
259 proc2 = self.Process(target=self._test, daemon=False)
260 self.assertFalse(proc2.daemon)
261
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000262 @classmethod
263 def _test(cls, q, *args, **kwds):
264 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265 q.put(args)
266 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000267 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000268 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000269 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270 q.put(current.pid)
271
272 def test_process(self):
273 q = self.Queue(1)
274 e = self.Event()
275 args = (q, 1, 2)
276 kwargs = {'hello':23, 'bye':2.54}
277 name = 'SomeProcess'
278 p = self.Process(
279 target=self._test, args=args, kwargs=kwargs, name=name
280 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000281 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000282 current = self.current_process()
283
284 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000285 self.assertEqual(p.authkey, current.authkey)
286 self.assertEqual(p.is_alive(), False)
287 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000288 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000289 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000290 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000291
292 p.start()
293
Ezio Melottib3aedd42010-11-20 19:04:17 +0000294 self.assertEqual(p.exitcode, None)
295 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000296 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297
Ezio Melottib3aedd42010-11-20 19:04:17 +0000298 self.assertEqual(q.get(), args[1:])
299 self.assertEqual(q.get(), kwargs)
300 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000301 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000302 self.assertEqual(q.get(), current.authkey)
303 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
305 p.join()
306
Ezio Melottib3aedd42010-11-20 19:04:17 +0000307 self.assertEqual(p.exitcode, 0)
308 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000309 self.assertNotIn(p, self.active_children())
Victor Stinnerb4c52962017-07-25 02:40:55 +0200310 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000311
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000312 @classmethod
Vitor Pereiraba75af72017-07-18 16:34:23 +0100313 def _sleep_some(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100314 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000315
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200316 @classmethod
317 def _test_sleep(cls, delay):
318 time.sleep(delay)
319
Vitor Pereiraba75af72017-07-18 16:34:23 +0100320 def _kill_process(self, meth):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600322 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323
Vitor Pereiraba75af72017-07-18 16:34:23 +0100324 p = self.Process(target=self._sleep_some)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000325 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000326 p.start()
327
328 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000329 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000330 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331
Richard Oudkerk59d54042012-05-10 16:11:12 +0100332 join = TimingWrapper(p.join)
333
334 self.assertEqual(join(0), None)
335 self.assertTimingAlmostEqual(join.elapsed, 0.0)
336 self.assertEqual(p.is_alive(), True)
337
338 self.assertEqual(join(-1), None)
339 self.assertTimingAlmostEqual(join.elapsed, 0.0)
340 self.assertEqual(p.is_alive(), True)
341
Richard Oudkerk26f92682013-10-17 13:56:18 +0100342 # XXX maybe terminating too soon causes the problems on Gentoo...
343 time.sleep(1)
344
Vitor Pereiraba75af72017-07-18 16:34:23 +0100345 meth(p)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000346
Richard Oudkerk4f350792013-10-13 00:49:27 +0100347 if hasattr(signal, 'alarm'):
Richard Oudkerkd44500a2013-10-17 10:38:37 +0100348 # On the Gentoo buildbot waitpid() often seems to block forever.
Richard Oudkerk26f92682013-10-17 13:56:18 +0100349 # We use alarm() to interrupt it if it blocks for too long.
Richard Oudkerk4f350792013-10-13 00:49:27 +0100350 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100351 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100352 old_handler = signal.signal(signal.SIGALRM, handler)
353 try:
354 signal.alarm(10)
355 self.assertEqual(join(), None)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100356 finally:
Richard Oudkerk1e2f67c2013-10-17 14:24:06 +0100357 signal.alarm(0)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100358 signal.signal(signal.SIGALRM, old_handler)
359 else:
360 self.assertEqual(join(), None)
361
Benjamin Petersone711caf2008-06-11 16:44:04 +0000362 self.assertTimingAlmostEqual(join.elapsed, 0.0)
363
364 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000365 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000366
367 p.join()
368
Vitor Pereiraba75af72017-07-18 16:34:23 +0100369 return p.exitcode
370
371 def test_terminate(self):
372 exitcode = self._kill_process(multiprocessing.Process.terminate)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200373 if os.name != 'nt':
Vitor Pereiraba75af72017-07-18 16:34:23 +0100374 self.assertEqual(exitcode, -signal.SIGTERM)
375
376 def test_kill(self):
377 exitcode = self._kill_process(multiprocessing.Process.kill)
378 if os.name != 'nt':
379 self.assertEqual(exitcode, -signal.SIGKILL)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000380
381 def test_cpu_count(self):
382 try:
383 cpus = multiprocessing.cpu_count()
384 except NotImplementedError:
385 cpus = 1
386 self.assertTrue(type(cpus) is int)
387 self.assertTrue(cpus >= 1)
388
389 def test_active_children(self):
390 self.assertEqual(type(self.active_children()), list)
391
392 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000393 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000394
Jesus Cea94f964f2011-09-09 20:26:57 +0200395 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000396 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000397 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000398
399 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000400 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000401
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000402 @classmethod
403 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000404 wconn.send(id)
405 if len(id) < 2:
406 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000407 p = cls.Process(
408 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000409 )
410 p.start()
411 p.join()
412
413 def test_recursion(self):
414 rconn, wconn = self.Pipe(duplex=False)
415 self._test_recursion(wconn, [])
416
417 time.sleep(DELTA)
418 result = []
419 while rconn.poll():
420 result.append(rconn.recv())
421
422 expected = [
423 [],
424 [0],
425 [0, 0],
426 [0, 1],
427 [1],
428 [1, 0],
429 [1, 1]
430 ]
431 self.assertEqual(result, expected)
432
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200433 @classmethod
434 def _test_sentinel(cls, event):
435 event.wait(10.0)
436
437 def test_sentinel(self):
438 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600439 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200440 event = self.Event()
441 p = self.Process(target=self._test_sentinel, args=(event,))
442 with self.assertRaises(ValueError):
443 p.sentinel
444 p.start()
445 self.addCleanup(p.join)
446 sentinel = p.sentinel
447 self.assertIsInstance(sentinel, int)
448 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
449 event.set()
450 p.join()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100451 self.assertTrue(wait_for_handle(sentinel, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200452
Antoine Pitrou13e96cc2017-06-24 19:22:23 +0200453 @classmethod
454 def _test_close(cls, rc=0, q=None):
455 if q is not None:
456 q.get()
457 sys.exit(rc)
458
459 def test_close(self):
460 if self.TYPE == "threads":
461 self.skipTest('test not appropriate for {}'.format(self.TYPE))
462 q = self.Queue()
463 p = self.Process(target=self._test_close, kwargs={'q': q})
464 p.daemon = True
465 p.start()
466 self.assertEqual(p.is_alive(), True)
467 # Child is still alive, cannot close
468 with self.assertRaises(ValueError):
469 p.close()
470
471 q.put(None)
472 p.join()
473 self.assertEqual(p.is_alive(), False)
474 self.assertEqual(p.exitcode, 0)
475 p.close()
476 with self.assertRaises(ValueError):
477 p.is_alive()
478 with self.assertRaises(ValueError):
479 p.join()
480 with self.assertRaises(ValueError):
481 p.terminate()
482 p.close()
483
484 wr = weakref.ref(p)
485 del p
486 gc.collect()
487 self.assertIs(wr(), None)
488
Victor Stinnerb4c52962017-07-25 02:40:55 +0200489 close_queue(q)
490
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200491 def test_many_processes(self):
492 if self.TYPE == 'threads':
493 self.skipTest('test not appropriate for {}'.format(self.TYPE))
494
495 sm = multiprocessing.get_start_method()
496 N = 5 if sm == 'spawn' else 100
497
498 # Try to overwhelm the forkserver loop with events
499 procs = [self.Process(target=self._test_sleep, args=(0.01,))
500 for i in range(N)]
501 for p in procs:
502 p.start()
503 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700504 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200505 for p in procs:
506 self.assertEqual(p.exitcode, 0)
507
Vitor Pereiraba75af72017-07-18 16:34:23 +0100508 procs = [self.Process(target=self._sleep_some)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200509 for i in range(N)]
510 for p in procs:
511 p.start()
512 time.sleep(0.001) # let the children start...
513 for p in procs:
514 p.terminate()
515 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700516 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200517 if os.name != 'nt':
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700518 exitcodes = [-signal.SIGTERM]
519 if sys.platform == 'darwin':
520 # bpo-31510: On macOS, killing a freshly started process with
521 # SIGTERM sometimes kills the process with SIGKILL.
522 exitcodes.append(-signal.SIGKILL)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200523 for p in procs:
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700524 self.assertIn(p.exitcode, exitcodes)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200525
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200526 def test_lose_target_ref(self):
527 c = DummyCallable()
528 wr = weakref.ref(c)
529 q = self.Queue()
530 p = self.Process(target=c, args=(q, c))
531 del c
532 p.start()
533 p.join()
534 self.assertIs(wr(), None)
535 self.assertEqual(q.get(), 5)
Victor Stinnerb4c52962017-07-25 02:40:55 +0200536 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200537
Antoine Pitrou896145d2017-07-22 13:22:54 +0200538 @classmethod
539 def _test_child_fd_inflation(self, evt, q):
540 q.put(test.support.fd_count())
541 evt.wait()
542
543 def test_child_fd_inflation(self):
544 # Number of fds in child processes should not grow with the
545 # number of running children.
546 if self.TYPE == 'threads':
547 self.skipTest('test not appropriate for {}'.format(self.TYPE))
548
549 sm = multiprocessing.get_start_method()
550 if sm == 'fork':
551 # The fork method by design inherits all fds from the parent,
552 # trying to go against it is a lost battle
553 self.skipTest('test not appropriate for {}'.format(sm))
554
555 N = 5
556 evt = self.Event()
557 q = self.Queue()
558
559 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
560 for i in range(N)]
561 for p in procs:
562 p.start()
563
564 try:
565 fd_counts = [q.get() for i in range(N)]
566 self.assertEqual(len(set(fd_counts)), 1, fd_counts)
567
568 finally:
569 evt.set()
570 for p in procs:
571 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200572 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200573
Antoine Pitrouee84a602017-08-16 20:53:28 +0200574 @classmethod
575 def _test_wait_for_threads(self, evt):
576 def func1():
577 time.sleep(0.5)
578 evt.set()
579
580 def func2():
581 time.sleep(20)
582 evt.clear()
583
584 threading.Thread(target=func1).start()
585 threading.Thread(target=func2, daemon=True).start()
586
587 def test_wait_for_threads(self):
588 # A child process should wait for non-daemonic threads to end
589 # before exiting
590 if self.TYPE == 'threads':
591 self.skipTest('test not appropriate for {}'.format(self.TYPE))
592
593 evt = self.Event()
594 proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
595 proc.start()
596 proc.join()
597 self.assertTrue(evt.is_set())
598
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200599 @classmethod
Antoine Pitroue756f662018-03-11 19:21:38 +0100600 def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
601 for stream_name, action in break_std_streams.items():
602 if action == 'close':
603 stream = io.StringIO()
604 stream.close()
605 else:
606 assert action == 'remove'
607 stream = None
608 setattr(sys, stream_name, None)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200609 evt.set()
610
Antoine Pitroue756f662018-03-11 19:21:38 +0100611 def test_error_on_stdio_flush_1(self):
612 # Check that Process works with broken standard streams
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200613 streams = [io.StringIO(), None]
614 streams[0].close()
615 for stream_name in ('stdout', 'stderr'):
616 for stream in streams:
617 old_stream = getattr(sys, stream_name)
618 setattr(sys, stream_name, stream)
619 try:
620 evt = self.Event()
621 proc = self.Process(target=self._test_error_on_stdio_flush,
622 args=(evt,))
623 proc.start()
624 proc.join()
625 self.assertTrue(evt.is_set())
Antoine Pitroue756f662018-03-11 19:21:38 +0100626 self.assertEqual(proc.exitcode, 0)
627 finally:
628 setattr(sys, stream_name, old_stream)
629
630 def test_error_on_stdio_flush_2(self):
631 # Same as test_error_on_stdio_flush_1(), but standard streams are
632 # broken by the child process
633 for stream_name in ('stdout', 'stderr'):
634 for action in ('close', 'remove'):
635 old_stream = getattr(sys, stream_name)
636 try:
637 evt = self.Event()
638 proc = self.Process(target=self._test_error_on_stdio_flush,
639 args=(evt, {stream_name: action}))
640 proc.start()
641 proc.join()
642 self.assertTrue(evt.is_set())
643 self.assertEqual(proc.exitcode, 0)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200644 finally:
645 setattr(sys, stream_name, old_stream)
646
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100647 @classmethod
648 def _sleep_and_set_event(self, evt, delay=0.0):
649 time.sleep(delay)
650 evt.set()
651
652 def check_forkserver_death(self, signum):
653 # bpo-31308: if the forkserver process has died, we should still
654 # be able to create and run new Process instances (the forkserver
655 # is implicitly restarted).
656 if self.TYPE == 'threads':
657 self.skipTest('test not appropriate for {}'.format(self.TYPE))
658 sm = multiprocessing.get_start_method()
659 if sm != 'forkserver':
660 # The fork method by design inherits all fds from the parent,
661 # trying to go against it is a lost battle
662 self.skipTest('test not appropriate for {}'.format(sm))
663
664 from multiprocessing.forkserver import _forkserver
665 _forkserver.ensure_running()
666
Victor Stinner07888e12018-07-04 11:49:41 +0200667 # First process sleeps 500 ms
668 delay = 0.5
669
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100670 evt = self.Event()
Victor Stinner07888e12018-07-04 11:49:41 +0200671 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100672 proc.start()
673
674 pid = _forkserver._forkserver_pid
675 os.kill(pid, signum)
Victor Stinner07888e12018-07-04 11:49:41 +0200676 # give time to the fork server to die and time to proc to complete
677 time.sleep(delay * 2.0)
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100678
679 evt2 = self.Event()
680 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
681 proc2.start()
682 proc2.join()
683 self.assertTrue(evt2.is_set())
684 self.assertEqual(proc2.exitcode, 0)
685
686 proc.join()
687 self.assertTrue(evt.is_set())
688 self.assertIn(proc.exitcode, (0, 255))
689
690 def test_forkserver_sigint(self):
691 # Catchable signal
692 self.check_forkserver_death(signal.SIGINT)
693
694 def test_forkserver_sigkill(self):
695 # Uncatchable signal
696 if os.name != 'nt':
697 self.check_forkserver_death(signal.SIGKILL)
698
Antoine Pitrouee84a602017-08-16 20:53:28 +0200699
Benjamin Petersone711caf2008-06-11 16:44:04 +0000700#
701#
702#
703
704class _UpperCaser(multiprocessing.Process):
705
706 def __init__(self):
707 multiprocessing.Process.__init__(self)
708 self.child_conn, self.parent_conn = multiprocessing.Pipe()
709
710 def run(self):
711 self.parent_conn.close()
712 for s in iter(self.child_conn.recv, None):
713 self.child_conn.send(s.upper())
714 self.child_conn.close()
715
716 def submit(self, s):
717 assert type(s) is str
718 self.parent_conn.send(s)
719 return self.parent_conn.recv()
720
721 def stop(self):
722 self.parent_conn.send(None)
723 self.parent_conn.close()
724 self.child_conn.close()
725
726class _TestSubclassingProcess(BaseTestCase):
727
728 ALLOWED_TYPES = ('processes',)
729
730 def test_subclassing(self):
731 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200732 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000733 uppercaser.start()
734 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
735 self.assertEqual(uppercaser.submit('world'), 'WORLD')
736 uppercaser.stop()
737 uppercaser.join()
738
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100739 def test_stderr_flush(self):
740 # sys.stderr is flushed at process shutdown (issue #13812)
741 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600742 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100743
744 testfn = test.support.TESTFN
745 self.addCleanup(test.support.unlink, testfn)
746 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
747 proc.start()
748 proc.join()
749 with open(testfn, 'r') as f:
750 err = f.read()
751 # The whole traceback was printed
752 self.assertIn("ZeroDivisionError", err)
753 self.assertIn("test_multiprocessing.py", err)
754 self.assertIn("1/0 # MARKER", err)
755
756 @classmethod
757 def _test_stderr_flush(cls, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100758 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
759 sys.stderr = open(fd, 'w', closefd=False)
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100760 1/0 # MARKER
761
762
Richard Oudkerk29471de2012-06-06 19:04:57 +0100763 @classmethod
764 def _test_sys_exit(cls, reason, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100765 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
766 sys.stderr = open(fd, 'w', closefd=False)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100767 sys.exit(reason)
768
769 def test_sys_exit(self):
770 # See Issue 13854
771 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600772 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk29471de2012-06-06 19:04:57 +0100773
774 testfn = test.support.TESTFN
775 self.addCleanup(test.support.unlink, testfn)
776
Victor Stinnera6d865c2016-03-25 09:29:50 +0100777 for reason in (
778 [1, 2, 3],
779 'ignore this',
780 ):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100781 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
782 p.daemon = True
783 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700784 join_process(p)
Victor Stinnera6d865c2016-03-25 09:29:50 +0100785 self.assertEqual(p.exitcode, 1)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100786
787 with open(testfn, 'r') as f:
Victor Stinnera6d865c2016-03-25 09:29:50 +0100788 content = f.read()
789 self.assertEqual(content.rstrip(), str(reason))
790
791 os.unlink(testfn)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100792
793 for reason in (True, False, 8):
794 p = self.Process(target=sys.exit, args=(reason,))
795 p.daemon = True
796 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700797 join_process(p)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100798 self.assertEqual(p.exitcode, reason)
799
Benjamin Petersone711caf2008-06-11 16:44:04 +0000800#
801#
802#
803
804def queue_empty(q):
805 if hasattr(q, 'empty'):
806 return q.empty()
807 else:
808 return q.qsize() == 0
809
810def queue_full(q, maxsize):
811 if hasattr(q, 'full'):
812 return q.full()
813 else:
814 return q.qsize() == maxsize
815
816
817class _TestQueue(BaseTestCase):
818
819
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000820 @classmethod
821 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000822 child_can_start.wait()
823 for i in range(6):
824 queue.get()
825 parent_can_continue.set()
826
827 def test_put(self):
828 MAXSIZE = 6
829 queue = self.Queue(maxsize=MAXSIZE)
830 child_can_start = self.Event()
831 parent_can_continue = self.Event()
832
833 proc = self.Process(
834 target=self._test_put,
835 args=(queue, child_can_start, parent_can_continue)
836 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000837 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000838 proc.start()
839
840 self.assertEqual(queue_empty(queue), True)
841 self.assertEqual(queue_full(queue, MAXSIZE), False)
842
843 queue.put(1)
844 queue.put(2, True)
845 queue.put(3, True, None)
846 queue.put(4, False)
847 queue.put(5, False, None)
848 queue.put_nowait(6)
849
850 # the values may be in buffer but not yet in pipe so sleep a bit
851 time.sleep(DELTA)
852
853 self.assertEqual(queue_empty(queue), False)
854 self.assertEqual(queue_full(queue, MAXSIZE), True)
855
856 put = TimingWrapper(queue.put)
857 put_nowait = TimingWrapper(queue.put_nowait)
858
859 self.assertRaises(pyqueue.Full, put, 7, False)
860 self.assertTimingAlmostEqual(put.elapsed, 0)
861
862 self.assertRaises(pyqueue.Full, put, 7, False, None)
863 self.assertTimingAlmostEqual(put.elapsed, 0)
864
865 self.assertRaises(pyqueue.Full, put_nowait, 7)
866 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
867
868 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
869 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
870
871 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
872 self.assertTimingAlmostEqual(put.elapsed, 0)
873
874 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
875 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
876
877 child_can_start.set()
878 parent_can_continue.wait()
879
880 self.assertEqual(queue_empty(queue), True)
881 self.assertEqual(queue_full(queue, MAXSIZE), False)
882
883 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200884 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000886 @classmethod
887 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000889 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000890 queue.put(2)
891 queue.put(3)
892 queue.put(4)
893 queue.put(5)
894 parent_can_continue.set()
895
896 def test_get(self):
897 queue = self.Queue()
898 child_can_start = self.Event()
899 parent_can_continue = self.Event()
900
901 proc = self.Process(
902 target=self._test_get,
903 args=(queue, child_can_start, parent_can_continue)
904 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000905 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000906 proc.start()
907
908 self.assertEqual(queue_empty(queue), True)
909
910 child_can_start.set()
911 parent_can_continue.wait()
912
913 time.sleep(DELTA)
914 self.assertEqual(queue_empty(queue), False)
915
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000916 # Hangs unexpectedly, remove for now
917 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000918 self.assertEqual(queue.get(True, None), 2)
919 self.assertEqual(queue.get(True), 3)
920 self.assertEqual(queue.get(timeout=1), 4)
921 self.assertEqual(queue.get_nowait(), 5)
922
923 self.assertEqual(queue_empty(queue), True)
924
925 get = TimingWrapper(queue.get)
926 get_nowait = TimingWrapper(queue.get_nowait)
927
928 self.assertRaises(pyqueue.Empty, get, False)
929 self.assertTimingAlmostEqual(get.elapsed, 0)
930
931 self.assertRaises(pyqueue.Empty, get, False, None)
932 self.assertTimingAlmostEqual(get.elapsed, 0)
933
934 self.assertRaises(pyqueue.Empty, get_nowait)
935 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
936
937 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
938 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
939
940 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
941 self.assertTimingAlmostEqual(get.elapsed, 0)
942
943 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
944 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
945
946 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200947 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000949 @classmethod
950 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000951 for i in range(10, 20):
952 queue.put(i)
953 # note that at this point the items may only be buffered, so the
954 # process cannot shutdown until the feeder thread has finished
955 # pushing items onto the pipe.
956
957 def test_fork(self):
958 # Old versions of Queue would fail to create a new feeder
959 # thread for a forked process if the original process had its
960 # own feeder thread. This test checks that this no longer
961 # happens.
962
963 queue = self.Queue()
964
965 # put items on queue so that main process starts a feeder thread
966 for i in range(10):
967 queue.put(i)
968
969 # wait to make sure thread starts before we fork a new process
970 time.sleep(DELTA)
971
972 # fork process
973 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200974 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000975 p.start()
976
977 # check that all expected items are in the queue
978 for i in range(20):
979 self.assertEqual(queue.get(), i)
980 self.assertRaises(pyqueue.Empty, queue.get, False)
981
982 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200983 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000984
985 def test_qsize(self):
986 q = self.Queue()
987 try:
988 self.assertEqual(q.qsize(), 0)
989 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600990 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000991 q.put(1)
992 self.assertEqual(q.qsize(), 1)
993 q.put(5)
994 self.assertEqual(q.qsize(), 2)
995 q.get()
996 self.assertEqual(q.qsize(), 1)
997 q.get()
998 self.assertEqual(q.qsize(), 0)
Victor Stinnerd7e64d92017-07-25 00:33:56 +0200999 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001000
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001001 @classmethod
1002 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001003 for obj in iter(q.get, None):
1004 time.sleep(DELTA)
1005 q.task_done()
1006
1007 def test_task_done(self):
1008 queue = self.JoinableQueue()
1009
Benjamin Petersone711caf2008-06-11 16:44:04 +00001010 workers = [self.Process(target=self._test_task_done, args=(queue,))
1011 for i in range(4)]
1012
1013 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +02001014 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001015 p.start()
1016
1017 for i in range(10):
1018 queue.put(i)
1019
1020 queue.join()
1021
1022 for p in workers:
1023 queue.put(None)
1024
1025 for p in workers:
1026 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +02001027 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001028
Serhiy Storchakaf8904e92015-03-06 23:32:54 +02001029 def test_no_import_lock_contention(self):
1030 with test.support.temp_cwd():
1031 module_name = 'imported_by_an_imported_module'
1032 with open(module_name + '.py', 'w') as f:
1033 f.write("""if 1:
1034 import multiprocessing
1035
1036 q = multiprocessing.Queue()
1037 q.put('knock knock')
1038 q.get(timeout=3)
1039 q.close()
1040 del q
1041 """)
1042
1043 with test.support.DirsOnSysPath(os.getcwd()):
1044 try:
1045 __import__(module_name)
1046 except pyqueue.Empty:
1047 self.fail("Probable regression on import lock contention;"
1048 " see Issue #22853")
1049
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001050 def test_timeout(self):
1051 q = multiprocessing.Queue()
Victor Stinner2cf4c202018-12-17 09:36:36 +01001052 start = time.monotonic()
Victor Stinneraad7b2e2015-02-05 14:25:05 +01001053 self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
Victor Stinner2cf4c202018-12-17 09:36:36 +01001054 delta = time.monotonic() - start
Victor Stinner5640d032018-08-03 02:09:00 +02001055 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1056 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1057 # failed because the delta was only 135.8 ms.
1058 self.assertGreaterEqual(delta, 0.100)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001059 close_queue(q)
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001060
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001061 def test_queue_feeder_donot_stop_onexc(self):
1062 # bpo-30414: verify feeder handles exceptions correctly
1063 if self.TYPE != 'processes':
1064 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1065
1066 class NotSerializable(object):
1067 def __reduce__(self):
1068 raise AttributeError
1069 with test.support.captured_stderr():
1070 q = self.Queue()
1071 q.put(NotSerializable())
1072 q.put(True)
Victor Stinner8f6eeaf2017-06-13 23:48:47 +02001073 # bpo-30595: use a timeout of 1 second for slow buildbots
1074 self.assertTrue(q.get(timeout=1.0))
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001075 close_queue(q)
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001076
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001077 with test.support.captured_stderr():
1078 # bpo-33078: verify that the queue size is correctly handled
1079 # on errors.
1080 q = self.Queue(maxsize=1)
1081 q.put(NotSerializable())
1082 q.put(True)
Thomas Moreaudec1c772018-03-21 18:56:27 +01001083 try:
1084 self.assertEqual(q.qsize(), 1)
1085 except NotImplementedError:
1086 # qsize is not available on all platform as it
1087 # relies on sem_getvalue
1088 pass
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001089 # bpo-30595: use a timeout of 1 second for slow buildbots
1090 self.assertTrue(q.get(timeout=1.0))
1091 # Check that the size of the queue is correct
Thomas Moreaudec1c772018-03-21 18:56:27 +01001092 self.assertTrue(q.empty())
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001093 close_queue(q)
1094
Thomas Moreau94459fd2018-01-05 11:15:54 +01001095 def test_queue_feeder_on_queue_feeder_error(self):
1096 # bpo-30006: verify feeder handles exceptions using the
1097 # _on_queue_feeder_error hook.
1098 if self.TYPE != 'processes':
1099 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1100
1101 class NotSerializable(object):
1102 """Mock unserializable object"""
1103 def __init__(self):
1104 self.reduce_was_called = False
1105 self.on_queue_feeder_error_was_called = False
1106
1107 def __reduce__(self):
1108 self.reduce_was_called = True
1109 raise AttributeError
1110
1111 class SafeQueue(multiprocessing.queues.Queue):
1112 """Queue with overloaded _on_queue_feeder_error hook"""
1113 @staticmethod
1114 def _on_queue_feeder_error(e, obj):
1115 if (isinstance(e, AttributeError) and
1116 isinstance(obj, NotSerializable)):
1117 obj.on_queue_feeder_error_was_called = True
1118
1119 not_serializable_obj = NotSerializable()
1120 # The captured_stderr reduces the noise in the test report
1121 with test.support.captured_stderr():
1122 q = SafeQueue(ctx=multiprocessing.get_context())
1123 q.put(not_serializable_obj)
1124
Ville Skyttä61f82e02018-04-20 23:08:45 +03001125 # Verify that q is still functioning correctly
Thomas Moreau94459fd2018-01-05 11:15:54 +01001126 q.put(True)
1127 self.assertTrue(q.get(timeout=1.0))
1128
1129 # Assert that the serialization and the hook have been called correctly
1130 self.assertTrue(not_serializable_obj.reduce_was_called)
1131 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
Zackery Spytz04617042018-10-13 03:26:09 -06001132
1133 def test_closed_queue_put_get_exceptions(self):
1134 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1135 q.close()
1136 with self.assertRaisesRegex(ValueError, 'is closed'):
1137 q.put('foo')
1138 with self.assertRaisesRegex(ValueError, 'is closed'):
1139 q.get()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001140#
1141#
1142#
1143
1144class _TestLock(BaseTestCase):
1145
1146 def test_lock(self):
1147 lock = self.Lock()
1148 self.assertEqual(lock.acquire(), True)
1149 self.assertEqual(lock.acquire(False), False)
1150 self.assertEqual(lock.release(), None)
1151 self.assertRaises((ValueError, threading.ThreadError), lock.release)
1152
1153 def test_rlock(self):
1154 lock = self.RLock()
1155 self.assertEqual(lock.acquire(), True)
1156 self.assertEqual(lock.acquire(), True)
1157 self.assertEqual(lock.acquire(), True)
1158 self.assertEqual(lock.release(), None)
1159 self.assertEqual(lock.release(), None)
1160 self.assertEqual(lock.release(), None)
1161 self.assertRaises((AssertionError, RuntimeError), lock.release)
1162
Jesse Nollerf8d00852009-03-31 03:25:07 +00001163 def test_lock_context(self):
1164 with self.Lock():
1165 pass
1166
Benjamin Petersone711caf2008-06-11 16:44:04 +00001167
1168class _TestSemaphore(BaseTestCase):
1169
1170 def _test_semaphore(self, sem):
1171 self.assertReturnsIfImplemented(2, get_value, sem)
1172 self.assertEqual(sem.acquire(), True)
1173 self.assertReturnsIfImplemented(1, get_value, sem)
1174 self.assertEqual(sem.acquire(), True)
1175 self.assertReturnsIfImplemented(0, get_value, sem)
1176 self.assertEqual(sem.acquire(False), False)
1177 self.assertReturnsIfImplemented(0, get_value, sem)
1178 self.assertEqual(sem.release(), None)
1179 self.assertReturnsIfImplemented(1, get_value, sem)
1180 self.assertEqual(sem.release(), None)
1181 self.assertReturnsIfImplemented(2, get_value, sem)
1182
1183 def test_semaphore(self):
1184 sem = self.Semaphore(2)
1185 self._test_semaphore(sem)
1186 self.assertEqual(sem.release(), None)
1187 self.assertReturnsIfImplemented(3, get_value, sem)
1188 self.assertEqual(sem.release(), None)
1189 self.assertReturnsIfImplemented(4, get_value, sem)
1190
1191 def test_bounded_semaphore(self):
1192 sem = self.BoundedSemaphore(2)
1193 self._test_semaphore(sem)
1194 # Currently fails on OS/X
1195 #if HAVE_GETVALUE:
1196 # self.assertRaises(ValueError, sem.release)
1197 # self.assertReturnsIfImplemented(2, get_value, sem)
1198
1199 def test_timeout(self):
1200 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001201 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001202
1203 sem = self.Semaphore(0)
1204 acquire = TimingWrapper(sem.acquire)
1205
1206 self.assertEqual(acquire(False), False)
1207 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1208
1209 self.assertEqual(acquire(False, None), False)
1210 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1211
1212 self.assertEqual(acquire(False, TIMEOUT1), False)
1213 self.assertTimingAlmostEqual(acquire.elapsed, 0)
1214
1215 self.assertEqual(acquire(True, TIMEOUT2), False)
1216 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1217
1218 self.assertEqual(acquire(timeout=TIMEOUT3), False)
1219 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1220
1221
1222class _TestCondition(BaseTestCase):
1223
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001224 @classmethod
1225 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001226 cond.acquire()
1227 sleeping.release()
1228 cond.wait(timeout)
1229 woken.release()
1230 cond.release()
1231
Antoine Pitrou48350412017-07-04 08:59:22 +02001232 def assertReachesEventually(self, func, value):
1233 for i in range(10):
1234 try:
1235 if func() == value:
1236 break
1237 except NotImplementedError:
1238 break
1239 time.sleep(DELTA)
1240 time.sleep(DELTA)
1241 self.assertReturnsIfImplemented(value, func)
1242
Benjamin Petersone711caf2008-06-11 16:44:04 +00001243 def check_invariant(self, cond):
1244 # this is only supposed to succeed when there are no sleepers
1245 if self.TYPE == 'processes':
1246 try:
1247 sleepers = (cond._sleeping_count.get_value() -
1248 cond._woken_count.get_value())
1249 self.assertEqual(sleepers, 0)
1250 self.assertEqual(cond._wait_semaphore.get_value(), 0)
1251 except NotImplementedError:
1252 pass
1253
1254 def test_notify(self):
1255 cond = self.Condition()
1256 sleeping = self.Semaphore(0)
1257 woken = self.Semaphore(0)
1258
1259 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001260 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001261 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001262 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001263
1264 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001265 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001266 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001267 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001268
1269 # wait for both children to start sleeping
1270 sleeping.acquire()
1271 sleeping.acquire()
1272
1273 # check no process/thread has woken up
1274 time.sleep(DELTA)
1275 self.assertReturnsIfImplemented(0, get_value, woken)
1276
1277 # wake up one process/thread
1278 cond.acquire()
1279 cond.notify()
1280 cond.release()
1281
1282 # check one process/thread has woken up
1283 time.sleep(DELTA)
1284 self.assertReturnsIfImplemented(1, get_value, woken)
1285
1286 # wake up another
1287 cond.acquire()
1288 cond.notify()
1289 cond.release()
1290
1291 # check other has woken up
1292 time.sleep(DELTA)
1293 self.assertReturnsIfImplemented(2, get_value, woken)
1294
1295 # check state is not mucked up
1296 self.check_invariant(cond)
1297 p.join()
1298
1299 def test_notify_all(self):
1300 cond = self.Condition()
1301 sleeping = self.Semaphore(0)
1302 woken = self.Semaphore(0)
1303
1304 # start some threads/processes which will timeout
1305 for i in range(3):
1306 p = self.Process(target=self.f,
1307 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001308 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001309 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001310 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001311
1312 t = threading.Thread(target=self.f,
1313 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +00001314 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001315 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001316 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001317
1318 # wait for them all to sleep
1319 for i in range(6):
1320 sleeping.acquire()
1321
1322 # check they have all timed out
1323 for i in range(6):
1324 woken.acquire()
1325 self.assertReturnsIfImplemented(0, get_value, woken)
1326
1327 # check state is not mucked up
1328 self.check_invariant(cond)
1329
1330 # start some more threads/processes
1331 for i in range(3):
1332 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001333 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001334 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001335 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001336
1337 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +00001338 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001339 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001340 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001341
1342 # wait for them to all sleep
1343 for i in range(6):
1344 sleeping.acquire()
1345
1346 # check no process/thread has woken up
1347 time.sleep(DELTA)
1348 self.assertReturnsIfImplemented(0, get_value, woken)
1349
1350 # wake them all up
1351 cond.acquire()
1352 cond.notify_all()
1353 cond.release()
1354
1355 # check they have all woken
Antoine Pitrou48350412017-07-04 08:59:22 +02001356 self.assertReachesEventually(lambda: get_value(woken), 6)
1357
1358 # check state is not mucked up
1359 self.check_invariant(cond)
1360
1361 def test_notify_n(self):
1362 cond = self.Condition()
1363 sleeping = self.Semaphore(0)
1364 woken = self.Semaphore(0)
1365
1366 # start some threads/processes
1367 for i in range(3):
1368 p = self.Process(target=self.f, args=(cond, sleeping, woken))
1369 p.daemon = True
1370 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001371 self.addCleanup(p.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001372
1373 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1374 t.daemon = True
1375 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001376 self.addCleanup(t.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001377
1378 # wait for them to all sleep
1379 for i in range(6):
1380 sleeping.acquire()
1381
1382 # check no process/thread has woken up
1383 time.sleep(DELTA)
1384 self.assertReturnsIfImplemented(0, get_value, woken)
1385
1386 # wake some of them up
1387 cond.acquire()
1388 cond.notify(n=2)
1389 cond.release()
1390
1391 # check 2 have woken
1392 self.assertReachesEventually(lambda: get_value(woken), 2)
1393
1394 # wake the rest of them
1395 cond.acquire()
1396 cond.notify(n=4)
1397 cond.release()
1398
1399 self.assertReachesEventually(lambda: get_value(woken), 6)
1400
1401 # doesn't do anything more
1402 cond.acquire()
1403 cond.notify(n=3)
1404 cond.release()
1405
Benjamin Petersone711caf2008-06-11 16:44:04 +00001406 self.assertReturnsIfImplemented(6, get_value, woken)
1407
1408 # check state is not mucked up
1409 self.check_invariant(cond)
1410
1411 def test_timeout(self):
1412 cond = self.Condition()
1413 wait = TimingWrapper(cond.wait)
1414 cond.acquire()
1415 res = wait(TIMEOUT1)
1416 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +00001417 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1419
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001420 @classmethod
1421 def _test_waitfor_f(cls, cond, state):
1422 with cond:
1423 state.value = 0
1424 cond.notify()
1425 result = cond.wait_for(lambda : state.value==4)
1426 if not result or state.value != 4:
1427 sys.exit(1)
1428
1429 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1430 def test_waitfor(self):
1431 # based on test in test/lock_tests.py
1432 cond = self.Condition()
1433 state = self.Value('i', -1)
1434
1435 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1436 p.daemon = True
1437 p.start()
1438
1439 with cond:
1440 result = cond.wait_for(lambda : state.value==0)
1441 self.assertTrue(result)
1442 self.assertEqual(state.value, 0)
1443
1444 for i in range(4):
1445 time.sleep(0.01)
1446 with cond:
1447 state.value += 1
1448 cond.notify()
1449
Victor Stinner11f08072017-09-15 06:55:31 -07001450 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001451 self.assertEqual(p.exitcode, 0)
1452
1453 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001454 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1455 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001456 with cond:
1457 expected = 0.1
Victor Stinner2cf4c202018-12-17 09:36:36 +01001458 dt = time.monotonic()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001459 result = cond.wait_for(lambda : state.value==4, timeout=expected)
Victor Stinner2cf4c202018-12-17 09:36:36 +01001460 dt = time.monotonic() - dt
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001461 # borrow logic in assertTimeout() from test/lock_tests.py
1462 if not result and expected * 0.6 < dt < expected * 10.0:
1463 success.value = True
1464
1465 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1466 def test_waitfor_timeout(self):
1467 # based on test in test/lock_tests.py
1468 cond = self.Condition()
1469 state = self.Value('i', 0)
1470 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001471 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001472
1473 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001474 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001475 p.daemon = True
1476 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -07001477 self.assertTrue(sem.acquire(timeout=TIMEOUT))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001478
1479 # Only increment 3 times, so state == 4 is never reached.
1480 for i in range(3):
1481 time.sleep(0.01)
1482 with cond:
1483 state.value += 1
1484 cond.notify()
1485
Victor Stinner11f08072017-09-15 06:55:31 -07001486 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001487 self.assertTrue(success.value)
1488
Richard Oudkerk98449932012-06-05 13:15:29 +01001489 @classmethod
1490 def _test_wait_result(cls, c, pid):
1491 with c:
1492 c.notify()
1493 time.sleep(1)
1494 if pid is not None:
1495 os.kill(pid, signal.SIGINT)
1496
1497 def test_wait_result(self):
1498 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1499 pid = os.getpid()
1500 else:
1501 pid = None
1502
1503 c = self.Condition()
1504 with c:
1505 self.assertFalse(c.wait(0))
1506 self.assertFalse(c.wait(0.1))
1507
1508 p = self.Process(target=self._test_wait_result, args=(c, pid))
1509 p.start()
1510
Victor Stinner49257272018-06-27 22:24:02 +02001511 self.assertTrue(c.wait(60))
Richard Oudkerk98449932012-06-05 13:15:29 +01001512 if pid is not None:
Victor Stinner49257272018-06-27 22:24:02 +02001513 self.assertRaises(KeyboardInterrupt, c.wait, 60)
Richard Oudkerk98449932012-06-05 13:15:29 +01001514
1515 p.join()
1516
Benjamin Petersone711caf2008-06-11 16:44:04 +00001517
1518class _TestEvent(BaseTestCase):
1519
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001520 @classmethod
1521 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001522 time.sleep(TIMEOUT2)
1523 event.set()
1524
1525 def test_event(self):
1526 event = self.Event()
1527 wait = TimingWrapper(event.wait)
1528
Ezio Melotti13925002011-03-16 11:05:33 +02001529 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001530 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001531 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001532
Benjamin Peterson965ce872009-04-05 21:24:58 +00001533 # Removed, threading.Event.wait() will return the value of the __flag
1534 # instead of None. API Shear with the semaphore backed mp.Event
1535 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001536 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001537 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001538 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1539
1540 event.set()
1541
1542 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001543 self.assertEqual(event.is_set(), True)
1544 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001545 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001546 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001547 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1548 # self.assertEqual(event.is_set(), True)
1549
1550 event.clear()
1551
1552 #self.assertEqual(event.is_set(), False)
1553
Jesus Cea94f964f2011-09-09 20:26:57 +02001554 p = self.Process(target=self._test_event, args=(event,))
1555 p.daemon = True
1556 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001557 self.assertEqual(wait(), True)
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001558 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001559
1560#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001561# Tests for Barrier - adapted from tests in test/lock_tests.py
1562#
1563
1564# Many of the tests for threading.Barrier use a list as an atomic
1565# counter: a value is appended to increment the counter, and the
1566# length of the list gives the value. We use the class DummyList
1567# for the same purpose.
1568
1569class _DummyList(object):
1570
1571 def __init__(self):
1572 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1573 lock = multiprocessing.Lock()
1574 self.__setstate__((wrapper, lock))
1575 self._lengthbuf[0] = 0
1576
1577 def __setstate__(self, state):
1578 (self._wrapper, self._lock) = state
1579 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1580
1581 def __getstate__(self):
1582 return (self._wrapper, self._lock)
1583
1584 def append(self, _):
1585 with self._lock:
1586 self._lengthbuf[0] += 1
1587
1588 def __len__(self):
1589 with self._lock:
1590 return self._lengthbuf[0]
1591
1592def _wait():
1593 # A crude wait/yield function not relying on synchronization primitives.
1594 time.sleep(0.01)
1595
1596
1597class Bunch(object):
1598 """
1599 A bunch of threads.
1600 """
1601 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1602 """
1603 Construct a bunch of `n` threads running the same function `f`.
1604 If `wait_before_exit` is True, the threads won't terminate until
1605 do_finish() is called.
1606 """
1607 self.f = f
1608 self.args = args
1609 self.n = n
1610 self.started = namespace.DummyList()
1611 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001612 self._can_exit = namespace.Event()
1613 if not wait_before_exit:
1614 self._can_exit.set()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001615
1616 threads = []
Richard Oudkerk3730a172012-06-15 18:26:07 +01001617 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001618 p = namespace.Process(target=self.task)
1619 p.daemon = True
1620 p.start()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001621 threads.append(p)
1622
1623 def finalize(threads):
1624 for p in threads:
1625 p.join()
1626
1627 self._finalizer = weakref.finalize(self, finalize, threads)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001628
1629 def task(self):
1630 pid = os.getpid()
1631 self.started.append(pid)
1632 try:
1633 self.f(*self.args)
1634 finally:
1635 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001636 self._can_exit.wait(30)
1637 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001638
1639 def wait_for_started(self):
1640 while len(self.started) < self.n:
1641 _wait()
1642
1643 def wait_for_finished(self):
1644 while len(self.finished) < self.n:
1645 _wait()
1646
1647 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001648 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001649
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001650 def close(self):
1651 self._finalizer()
1652
Richard Oudkerk3730a172012-06-15 18:26:07 +01001653
1654class AppendTrue(object):
1655 def __init__(self, obj):
1656 self.obj = obj
1657 def __call__(self):
1658 self.obj.append(True)
1659
1660
1661class _TestBarrier(BaseTestCase):
1662 """
1663 Tests for Barrier objects.
1664 """
1665 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001666 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001667
1668 def setUp(self):
1669 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1670
1671 def tearDown(self):
1672 self.barrier.abort()
1673 self.barrier = None
1674
1675 def DummyList(self):
1676 if self.TYPE == 'threads':
1677 return []
1678 elif self.TYPE == 'manager':
1679 return self.manager.list()
1680 else:
1681 return _DummyList()
1682
1683 def run_threads(self, f, args):
1684 b = Bunch(self, f, args, self.N-1)
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001685 try:
1686 f(*args)
1687 b.wait_for_finished()
1688 finally:
1689 b.close()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001690
1691 @classmethod
1692 def multipass(cls, barrier, results, n):
1693 m = barrier.parties
1694 assert m == cls.N
1695 for i in range(n):
1696 results[0].append(True)
1697 assert len(results[1]) == i * m
1698 barrier.wait()
1699 results[1].append(True)
1700 assert len(results[0]) == (i + 1) * m
1701 barrier.wait()
1702 try:
1703 assert barrier.n_waiting == 0
1704 except NotImplementedError:
1705 pass
1706 assert not barrier.broken
1707
1708 def test_barrier(self, passes=1):
1709 """
1710 Test that a barrier is passed in lockstep
1711 """
1712 results = [self.DummyList(), self.DummyList()]
1713 self.run_threads(self.multipass, (self.barrier, results, passes))
1714
1715 def test_barrier_10(self):
1716 """
1717 Test that a barrier works for 10 consecutive runs
1718 """
1719 return self.test_barrier(10)
1720
1721 @classmethod
1722 def _test_wait_return_f(cls, barrier, queue):
1723 res = barrier.wait()
1724 queue.put(res)
1725
1726 def test_wait_return(self):
1727 """
1728 test the return value from barrier.wait
1729 """
1730 queue = self.Queue()
1731 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1732 results = [queue.get() for i in range(self.N)]
1733 self.assertEqual(results.count(0), 1)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001734 close_queue(queue)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001735
1736 @classmethod
1737 def _test_action_f(cls, barrier, results):
1738 barrier.wait()
1739 if len(results) != 1:
1740 raise RuntimeError
1741
1742 def test_action(self):
1743 """
1744 Test the 'action' callback
1745 """
1746 results = self.DummyList()
1747 barrier = self.Barrier(self.N, action=AppendTrue(results))
1748 self.run_threads(self._test_action_f, (barrier, results))
1749 self.assertEqual(len(results), 1)
1750
1751 @classmethod
1752 def _test_abort_f(cls, barrier, results1, results2):
1753 try:
1754 i = barrier.wait()
1755 if i == cls.N//2:
1756 raise RuntimeError
1757 barrier.wait()
1758 results1.append(True)
1759 except threading.BrokenBarrierError:
1760 results2.append(True)
1761 except RuntimeError:
1762 barrier.abort()
1763
1764 def test_abort(self):
1765 """
1766 Test that an abort will put the barrier in a broken state
1767 """
1768 results1 = self.DummyList()
1769 results2 = self.DummyList()
1770 self.run_threads(self._test_abort_f,
1771 (self.barrier, results1, results2))
1772 self.assertEqual(len(results1), 0)
1773 self.assertEqual(len(results2), self.N-1)
1774 self.assertTrue(self.barrier.broken)
1775
1776 @classmethod
1777 def _test_reset_f(cls, barrier, results1, results2, results3):
1778 i = barrier.wait()
1779 if i == cls.N//2:
1780 # Wait until the other threads are all in the barrier.
1781 while barrier.n_waiting < cls.N-1:
1782 time.sleep(0.001)
1783 barrier.reset()
1784 else:
1785 try:
1786 barrier.wait()
1787 results1.append(True)
1788 except threading.BrokenBarrierError:
1789 results2.append(True)
1790 # Now, pass the barrier again
1791 barrier.wait()
1792 results3.append(True)
1793
1794 def test_reset(self):
1795 """
1796 Test that a 'reset' on a barrier frees the waiting threads
1797 """
1798 results1 = self.DummyList()
1799 results2 = self.DummyList()
1800 results3 = self.DummyList()
1801 self.run_threads(self._test_reset_f,
1802 (self.barrier, results1, results2, results3))
1803 self.assertEqual(len(results1), 0)
1804 self.assertEqual(len(results2), self.N-1)
1805 self.assertEqual(len(results3), self.N)
1806
1807 @classmethod
1808 def _test_abort_and_reset_f(cls, barrier, barrier2,
1809 results1, results2, results3):
1810 try:
1811 i = barrier.wait()
1812 if i == cls.N//2:
1813 raise RuntimeError
1814 barrier.wait()
1815 results1.append(True)
1816 except threading.BrokenBarrierError:
1817 results2.append(True)
1818 except RuntimeError:
1819 barrier.abort()
1820 # Synchronize and reset the barrier. Must synchronize first so
1821 # that everyone has left it when we reset, and after so that no
1822 # one enters it before the reset.
1823 if barrier2.wait() == cls.N//2:
1824 barrier.reset()
1825 barrier2.wait()
1826 barrier.wait()
1827 results3.append(True)
1828
1829 def test_abort_and_reset(self):
1830 """
1831 Test that a barrier can be reset after being broken.
1832 """
1833 results1 = self.DummyList()
1834 results2 = self.DummyList()
1835 results3 = self.DummyList()
1836 barrier2 = self.Barrier(self.N)
1837
1838 self.run_threads(self._test_abort_and_reset_f,
1839 (self.barrier, barrier2, results1, results2, results3))
1840 self.assertEqual(len(results1), 0)
1841 self.assertEqual(len(results2), self.N-1)
1842 self.assertEqual(len(results3), self.N)
1843
1844 @classmethod
1845 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001846 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001847 if i == cls.N//2:
1848 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001849 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001850 try:
1851 barrier.wait(0.5)
1852 except threading.BrokenBarrierError:
1853 results.append(True)
1854
1855 def test_timeout(self):
1856 """
1857 Test wait(timeout)
1858 """
1859 results = self.DummyList()
1860 self.run_threads(self._test_timeout_f, (self.barrier, results))
1861 self.assertEqual(len(results), self.barrier.parties)
1862
1863 @classmethod
1864 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001865 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001866 if i == cls.N//2:
1867 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001868 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001869 try:
1870 barrier.wait()
1871 except threading.BrokenBarrierError:
1872 results.append(True)
1873
1874 def test_default_timeout(self):
1875 """
1876 Test the barrier's default timeout
1877 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001878 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001879 results = self.DummyList()
1880 self.run_threads(self._test_default_timeout_f, (barrier, results))
1881 self.assertEqual(len(results), barrier.parties)
1882
1883 def test_single_thread(self):
1884 b = self.Barrier(1)
1885 b.wait()
1886 b.wait()
1887
1888 @classmethod
1889 def _test_thousand_f(cls, barrier, passes, conn, lock):
1890 for i in range(passes):
1891 barrier.wait()
1892 with lock:
1893 conn.send(i)
1894
1895 def test_thousand(self):
1896 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001897 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001898 passes = 1000
1899 lock = self.Lock()
1900 conn, child_conn = self.Pipe(False)
1901 for j in range(self.N):
1902 p = self.Process(target=self._test_thousand_f,
1903 args=(self.barrier, passes, child_conn, lock))
1904 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001905 self.addCleanup(p.join)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001906
1907 for i in range(passes):
1908 for j in range(self.N):
1909 self.assertEqual(conn.recv(), i)
1910
1911#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001912#
1913#
1914
1915class _TestValue(BaseTestCase):
1916
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001917 ALLOWED_TYPES = ('processes',)
1918
Benjamin Petersone711caf2008-06-11 16:44:04 +00001919 codes_values = [
1920 ('i', 4343, 24234),
1921 ('d', 3.625, -4.25),
1922 ('h', -232, 234),
Gareth Rees3913bad2017-07-21 11:35:33 +01001923 ('q', 2 ** 33, 2 ** 34),
Benjamin Petersone711caf2008-06-11 16:44:04 +00001924 ('c', latin('x'), latin('y'))
1925 ]
1926
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001927 def setUp(self):
1928 if not HAS_SHAREDCTYPES:
1929 self.skipTest("requires multiprocessing.sharedctypes")
1930
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001931 @classmethod
1932 def _test(cls, values):
1933 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001934 sv.value = cv[2]
1935
1936
1937 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001938 if raw:
1939 values = [self.RawValue(code, value)
1940 for code, value, _ in self.codes_values]
1941 else:
1942 values = [self.Value(code, value)
1943 for code, value, _ in self.codes_values]
1944
1945 for sv, cv in zip(values, self.codes_values):
1946 self.assertEqual(sv.value, cv[1])
1947
1948 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001949 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001950 proc.start()
1951 proc.join()
1952
1953 for sv, cv in zip(values, self.codes_values):
1954 self.assertEqual(sv.value, cv[2])
1955
1956 def test_rawvalue(self):
1957 self.test_value(raw=True)
1958
1959 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001960 val1 = self.Value('i', 5)
1961 lock1 = val1.get_lock()
1962 obj1 = val1.get_obj()
1963
1964 val2 = self.Value('i', 5, lock=None)
1965 lock2 = val2.get_lock()
1966 obj2 = val2.get_obj()
1967
1968 lock = self.Lock()
1969 val3 = self.Value('i', 5, lock=lock)
1970 lock3 = val3.get_lock()
1971 obj3 = val3.get_obj()
1972 self.assertEqual(lock, lock3)
1973
Jesse Nollerb0516a62009-01-18 03:11:38 +00001974 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001975 self.assertFalse(hasattr(arr4, 'get_lock'))
1976 self.assertFalse(hasattr(arr4, 'get_obj'))
1977
Jesse Nollerb0516a62009-01-18 03:11:38 +00001978 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1979
1980 arr5 = self.RawValue('i', 5)
1981 self.assertFalse(hasattr(arr5, 'get_lock'))
1982 self.assertFalse(hasattr(arr5, 'get_obj'))
1983
Benjamin Petersone711caf2008-06-11 16:44:04 +00001984
1985class _TestArray(BaseTestCase):
1986
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001987 ALLOWED_TYPES = ('processes',)
1988
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001989 @classmethod
1990 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991 for i in range(1, len(seq)):
1992 seq[i] += seq[i-1]
1993
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001994 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001995 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001996 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1997 if raw:
1998 arr = self.RawArray('i', seq)
1999 else:
2000 arr = self.Array('i', seq)
2001
2002 self.assertEqual(len(arr), len(seq))
2003 self.assertEqual(arr[3], seq[3])
2004 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2005
2006 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2007
2008 self.assertEqual(list(arr[:]), seq)
2009
2010 self.f(seq)
2011
2012 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002013 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002014 p.start()
2015 p.join()
2016
2017 self.assertEqual(list(arr[:]), seq)
2018
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002019 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00002020 def test_array_from_size(self):
2021 size = 10
2022 # Test for zeroing (see issue #11675).
2023 # The repetition below strengthens the test by increasing the chances
2024 # of previously allocated non-zero memory being used for the new array
2025 # on the 2nd and 3rd loops.
2026 for _ in range(3):
2027 arr = self.Array('i', size)
2028 self.assertEqual(len(arr), size)
2029 self.assertEqual(list(arr), [0] * size)
2030 arr[:] = range(10)
2031 self.assertEqual(list(arr), list(range(10)))
2032 del arr
2033
2034 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002035 def test_rawarray(self):
2036 self.test_array(raw=True)
2037
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002038 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002039 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002040 arr1 = self.Array('i', list(range(10)))
2041 lock1 = arr1.get_lock()
2042 obj1 = arr1.get_obj()
2043
2044 arr2 = self.Array('i', list(range(10)), lock=None)
2045 lock2 = arr2.get_lock()
2046 obj2 = arr2.get_obj()
2047
2048 lock = self.Lock()
2049 arr3 = self.Array('i', list(range(10)), lock=lock)
2050 lock3 = arr3.get_lock()
2051 obj3 = arr3.get_obj()
2052 self.assertEqual(lock, lock3)
2053
Jesse Nollerb0516a62009-01-18 03:11:38 +00002054 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002055 self.assertFalse(hasattr(arr4, 'get_lock'))
2056 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00002057 self.assertRaises(AttributeError,
2058 self.Array, 'i', range(10), lock='notalock')
2059
2060 arr5 = self.RawArray('i', range(10))
2061 self.assertFalse(hasattr(arr5, 'get_lock'))
2062 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002063
2064#
2065#
2066#
2067
2068class _TestContainers(BaseTestCase):
2069
2070 ALLOWED_TYPES = ('manager',)
2071
2072 def test_list(self):
2073 a = self.list(list(range(10)))
2074 self.assertEqual(a[:], list(range(10)))
2075
2076 b = self.list()
2077 self.assertEqual(b[:], [])
2078
2079 b.extend(list(range(5)))
2080 self.assertEqual(b[:], list(range(5)))
2081
2082 self.assertEqual(b[2], 2)
2083 self.assertEqual(b[2:10], [2,3,4])
2084
2085 b *= 2
2086 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2087
2088 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2089
2090 self.assertEqual(a[:], list(range(10)))
2091
2092 d = [a, b]
2093 e = self.list(d)
2094 self.assertEqual(
Davin Potts86a76682016-09-07 18:48:01 -05002095 [element[:] for element in e],
Benjamin Petersone711caf2008-06-11 16:44:04 +00002096 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2097 )
2098
2099 f = self.list([a])
2100 a.append('hello')
Davin Potts86a76682016-09-07 18:48:01 -05002101 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2102
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002103 def test_list_iter(self):
2104 a = self.list(list(range(10)))
2105 it = iter(a)
2106 self.assertEqual(list(it), list(range(10)))
2107 self.assertEqual(list(it), []) # exhausted
2108 # list modified during iteration
2109 it = iter(a)
2110 a[0] = 100
2111 self.assertEqual(next(it), 100)
2112
Davin Potts86a76682016-09-07 18:48:01 -05002113 def test_list_proxy_in_list(self):
2114 a = self.list([self.list(range(3)) for _i in range(3)])
2115 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2116
2117 a[0][-1] = 55
2118 self.assertEqual(a[0][:], [0, 1, 55])
2119 for i in range(1, 3):
2120 self.assertEqual(a[i][:], [0, 1, 2])
2121
2122 self.assertEqual(a[1].pop(), 2)
2123 self.assertEqual(len(a[1]), 2)
2124 for i in range(0, 3, 2):
2125 self.assertEqual(len(a[i]), 3)
2126
2127 del a
2128
2129 b = self.list()
2130 b.append(b)
2131 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00002132
2133 def test_dict(self):
2134 d = self.dict()
2135 indices = list(range(65, 70))
2136 for i in indices:
2137 d[i] = chr(i)
2138 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2139 self.assertEqual(sorted(d.keys()), indices)
2140 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2141 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2142
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002143 def test_dict_iter(self):
2144 d = self.dict()
2145 indices = list(range(65, 70))
2146 for i in indices:
2147 d[i] = chr(i)
2148 it = iter(d)
2149 self.assertEqual(list(it), indices)
2150 self.assertEqual(list(it), []) # exhausted
2151 # dictionary changed size during iteration
2152 it = iter(d)
2153 d.clear()
2154 self.assertRaises(RuntimeError, next, it)
2155
Davin Potts86a76682016-09-07 18:48:01 -05002156 def test_dict_proxy_nested(self):
2157 pets = self.dict(ferrets=2, hamsters=4)
2158 supplies = self.dict(water=10, feed=3)
2159 d = self.dict(pets=pets, supplies=supplies)
2160
2161 self.assertEqual(supplies['water'], 10)
2162 self.assertEqual(d['supplies']['water'], 10)
2163
2164 d['supplies']['blankets'] = 5
2165 self.assertEqual(supplies['blankets'], 5)
2166 self.assertEqual(d['supplies']['blankets'], 5)
2167
2168 d['supplies']['water'] = 7
2169 self.assertEqual(supplies['water'], 7)
2170 self.assertEqual(d['supplies']['water'], 7)
2171
2172 del pets
2173 del supplies
2174 self.assertEqual(d['pets']['ferrets'], 2)
2175 d['supplies']['blankets'] = 11
2176 self.assertEqual(d['supplies']['blankets'], 11)
2177
2178 pets = d['pets']
2179 supplies = d['supplies']
2180 supplies['water'] = 7
2181 self.assertEqual(supplies['water'], 7)
2182 self.assertEqual(d['supplies']['water'], 7)
2183
2184 d.clear()
2185 self.assertEqual(len(d), 0)
2186 self.assertEqual(supplies['water'], 7)
2187 self.assertEqual(pets['hamsters'], 4)
2188
2189 l = self.list([pets, supplies])
2190 l[0]['marmots'] = 1
2191 self.assertEqual(pets['marmots'], 1)
2192 self.assertEqual(l[0]['marmots'], 1)
2193
2194 del pets
2195 del supplies
2196 self.assertEqual(l[0]['marmots'], 1)
2197
2198 outer = self.list([[88, 99], l])
2199 self.assertIsInstance(outer[0], list) # Not a ListProxy
2200 self.assertEqual(outer[-1][-1]['feed'], 3)
2201
Benjamin Petersone711caf2008-06-11 16:44:04 +00002202 def test_namespace(self):
2203 n = self.Namespace()
2204 n.name = 'Bob'
2205 n.job = 'Builder'
2206 n._hidden = 'hidden'
2207 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2208 del n.job
2209 self.assertEqual(str(n), "Namespace(name='Bob')")
2210 self.assertTrue(hasattr(n, 'name'))
2211 self.assertTrue(not hasattr(n, 'job'))
2212
2213#
2214#
2215#
2216
2217def sqr(x, wait=0.0):
2218 time.sleep(wait)
2219 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00002220
Antoine Pitroude911b22011-12-21 11:03:24 +01002221def mul(x, y):
2222 return x*y
2223
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002224def raise_large_valuerror(wait):
2225 time.sleep(wait)
2226 raise ValueError("x" * 1024**2)
2227
Antoine Pitrou89889452017-03-24 13:52:11 +01002228def identity(x):
2229 return x
2230
2231class CountedObject(object):
2232 n_instances = 0
2233
2234 def __new__(cls):
2235 cls.n_instances += 1
2236 return object.__new__(cls)
2237
2238 def __del__(self):
2239 type(self).n_instances -= 1
2240
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002241class SayWhenError(ValueError): pass
2242
2243def exception_throwing_generator(total, when):
Xiang Zhang794623b2017-03-29 11:58:54 +08002244 if when == -1:
2245 raise SayWhenError("Somebody said when")
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002246 for i in range(total):
2247 if i == when:
2248 raise SayWhenError("Somebody said when")
2249 yield i
2250
Antoine Pitrou89889452017-03-24 13:52:11 +01002251
Benjamin Petersone711caf2008-06-11 16:44:04 +00002252class _TestPool(BaseTestCase):
2253
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002254 @classmethod
2255 def setUpClass(cls):
2256 super().setUpClass()
2257 cls.pool = cls.Pool(4)
2258
2259 @classmethod
2260 def tearDownClass(cls):
2261 cls.pool.terminate()
2262 cls.pool.join()
2263 cls.pool = None
2264 super().tearDownClass()
2265
Benjamin Petersone711caf2008-06-11 16:44:04 +00002266 def test_apply(self):
2267 papply = self.pool.apply
2268 self.assertEqual(papply(sqr, (5,)), sqr(5))
2269 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2270
2271 def test_map(self):
2272 pmap = self.pool.map
2273 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2274 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2275 list(map(sqr, list(range(100)))))
2276
Antoine Pitroude911b22011-12-21 11:03:24 +01002277 def test_starmap(self):
2278 psmap = self.pool.starmap
2279 tuples = list(zip(range(10), range(9,-1, -1)))
2280 self.assertEqual(psmap(mul, tuples),
2281 list(itertools.starmap(mul, tuples)))
2282 tuples = list(zip(range(100), range(99,-1, -1)))
2283 self.assertEqual(psmap(mul, tuples, chunksize=20),
2284 list(itertools.starmap(mul, tuples)))
2285
2286 def test_starmap_async(self):
2287 tuples = list(zip(range(100), range(99,-1, -1)))
2288 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2289 list(itertools.starmap(mul, tuples)))
2290
Hynek Schlawack254af262012-10-27 12:53:02 +02002291 def test_map_async(self):
2292 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2293 list(map(sqr, list(range(10)))))
2294
2295 def test_map_async_callbacks(self):
2296 call_args = self.manager.list() if self.TYPE == 'manager' else []
2297 self.pool.map_async(int, ['1'],
2298 callback=call_args.append,
2299 error_callback=call_args.append).wait()
2300 self.assertEqual(1, len(call_args))
2301 self.assertEqual([1], call_args[0])
2302 self.pool.map_async(int, ['a'],
2303 callback=call_args.append,
2304 error_callback=call_args.append).wait()
2305 self.assertEqual(2, len(call_args))
2306 self.assertIsInstance(call_args[1], ValueError)
2307
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002308 def test_map_unplicklable(self):
2309 # Issue #19425 -- failure to pickle should not cause a hang
2310 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002311 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002312 class A(object):
2313 def __reduce__(self):
2314 raise RuntimeError('cannot pickle')
2315 with self.assertRaises(RuntimeError):
2316 self.pool.map(sqr, [A()]*10)
2317
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00002318 def test_map_chunksize(self):
2319 try:
2320 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2321 except multiprocessing.TimeoutError:
2322 self.fail("pool.map_async with chunksize stalled on null list")
2323
Xiang Zhang794623b2017-03-29 11:58:54 +08002324 def test_map_handle_iterable_exception(self):
2325 if self.TYPE == 'manager':
2326 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2327
2328 # SayWhenError seen at the very first of the iterable
2329 with self.assertRaises(SayWhenError):
2330 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2331 # again, make sure it's reentrant
2332 with self.assertRaises(SayWhenError):
2333 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2334
2335 with self.assertRaises(SayWhenError):
2336 self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2337
2338 class SpecialIterable:
2339 def __iter__(self):
2340 return self
2341 def __next__(self):
2342 raise SayWhenError
2343 def __len__(self):
2344 return 1
2345 with self.assertRaises(SayWhenError):
2346 self.pool.map(sqr, SpecialIterable(), 1)
2347 with self.assertRaises(SayWhenError):
2348 self.pool.map(sqr, SpecialIterable(), 1)
2349
Benjamin Petersone711caf2008-06-11 16:44:04 +00002350 def test_async(self):
2351 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2352 get = TimingWrapper(res.get)
2353 self.assertEqual(get(), 49)
2354 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2355
2356 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00002357 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002358 get = TimingWrapper(res.get)
2359 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2360 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2361
2362 def test_imap(self):
2363 it = self.pool.imap(sqr, list(range(10)))
2364 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2365
2366 it = self.pool.imap(sqr, list(range(10)))
2367 for i in range(10):
2368 self.assertEqual(next(it), i*i)
2369 self.assertRaises(StopIteration, it.__next__)
2370
2371 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2372 for i in range(1000):
2373 self.assertEqual(next(it), i*i)
2374 self.assertRaises(StopIteration, it.__next__)
2375
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002376 def test_imap_handle_iterable_exception(self):
2377 if self.TYPE == 'manager':
2378 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2379
Xiang Zhang794623b2017-03-29 11:58:54 +08002380 # SayWhenError seen at the very first of the iterable
2381 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2382 self.assertRaises(SayWhenError, it.__next__)
2383 # again, make sure it's reentrant
2384 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2385 self.assertRaises(SayWhenError, it.__next__)
2386
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002387 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2388 for i in range(3):
2389 self.assertEqual(next(it), i*i)
2390 self.assertRaises(SayWhenError, it.__next__)
2391
2392 # SayWhenError seen at start of problematic chunk's results
2393 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2394 for i in range(6):
2395 self.assertEqual(next(it), i*i)
2396 self.assertRaises(SayWhenError, it.__next__)
2397 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2398 for i in range(4):
2399 self.assertEqual(next(it), i*i)
2400 self.assertRaises(SayWhenError, it.__next__)
2401
Benjamin Petersone711caf2008-06-11 16:44:04 +00002402 def test_imap_unordered(self):
Victor Stinner23401fb2018-07-03 13:20:35 +02002403 it = self.pool.imap_unordered(sqr, list(range(10)))
2404 self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002405
Victor Stinner23401fb2018-07-03 13:20:35 +02002406 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002407 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2408
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002409 def test_imap_unordered_handle_iterable_exception(self):
2410 if self.TYPE == 'manager':
2411 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2412
Xiang Zhang794623b2017-03-29 11:58:54 +08002413 # SayWhenError seen at the very first of the iterable
2414 it = self.pool.imap_unordered(sqr,
2415 exception_throwing_generator(1, -1),
2416 1)
2417 self.assertRaises(SayWhenError, it.__next__)
2418 # again, make sure it's reentrant
2419 it = self.pool.imap_unordered(sqr,
2420 exception_throwing_generator(1, -1),
2421 1)
2422 self.assertRaises(SayWhenError, it.__next__)
2423
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002424 it = self.pool.imap_unordered(sqr,
2425 exception_throwing_generator(10, 3),
2426 1)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002427 expected_values = list(map(sqr, list(range(10))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002428 with self.assertRaises(SayWhenError):
2429 # imap_unordered makes it difficult to anticipate the SayWhenError
2430 for i in range(10):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002431 value = next(it)
2432 self.assertIn(value, expected_values)
2433 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002434
2435 it = self.pool.imap_unordered(sqr,
2436 exception_throwing_generator(20, 7),
2437 2)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002438 expected_values = list(map(sqr, list(range(20))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002439 with self.assertRaises(SayWhenError):
2440 for i in range(20):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002441 value = next(it)
2442 self.assertIn(value, expected_values)
2443 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002444
Benjamin Petersone711caf2008-06-11 16:44:04 +00002445 def test_make_pool(self):
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002446 expected_error = (RemoteError if self.TYPE == 'manager'
2447 else ValueError)
Victor Stinner2fae27b2011-06-20 17:53:35 +02002448
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002449 self.assertRaises(expected_error, self.Pool, -1)
2450 self.assertRaises(expected_error, self.Pool, 0)
2451
2452 if self.TYPE != 'manager':
2453 p = self.Pool(3)
2454 try:
2455 self.assertEqual(3, len(p._pool))
2456 finally:
2457 p.close()
2458 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002459
2460 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002461 result = self.pool.map_async(
2462 time.sleep, [0.1 for i in range(10000)], chunksize=1
2463 )
2464 self.pool.terminate()
2465 join = TimingWrapper(self.pool.join)
2466 join()
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002467 # Sanity check the pool didn't wait for all tasks to finish
2468 self.assertLess(join.elapsed, 2.0)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002469
Richard Oudkerke41682b2012-06-06 19:04:57 +01002470 def test_empty_iterable(self):
2471 # See Issue 12157
2472 p = self.Pool(1)
2473
2474 self.assertEqual(p.map(sqr, []), [])
2475 self.assertEqual(list(p.imap(sqr, [])), [])
2476 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2477 self.assertEqual(p.map_async(sqr, []).get(), [])
2478
2479 p.close()
2480 p.join()
2481
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002482 def test_context(self):
2483 if self.TYPE == 'processes':
2484 L = list(range(10))
2485 expected = [sqr(i) for i in L]
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002486 with self.Pool(2) as p:
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002487 r = p.map_async(sqr, L)
2488 self.assertEqual(r.get(), expected)
Victor Stinner388c8c22018-12-06 11:56:52 +01002489 p.join()
Benjamin Peterson3095f472012-09-25 12:45:42 -04002490 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002491
Richard Oudkerk85757832013-05-06 11:38:25 +01002492 @classmethod
2493 def _test_traceback(cls):
2494 raise RuntimeError(123) # some comment
2495
2496 def test_traceback(self):
2497 # We want ensure that the traceback from the child process is
2498 # contained in the traceback raised in the main process.
2499 if self.TYPE == 'processes':
2500 with self.Pool(1) as p:
2501 try:
2502 p.apply(self._test_traceback)
2503 except Exception as e:
2504 exc = e
2505 else:
Xiang Zhang794623b2017-03-29 11:58:54 +08002506 self.fail('expected RuntimeError')
Victor Stinner388c8c22018-12-06 11:56:52 +01002507 p.join()
Richard Oudkerk85757832013-05-06 11:38:25 +01002508 self.assertIs(type(exc), RuntimeError)
2509 self.assertEqual(exc.args, (123,))
2510 cause = exc.__cause__
2511 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2512 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2513
2514 with test.support.captured_stderr() as f1:
2515 try:
2516 raise exc
2517 except RuntimeError:
2518 sys.excepthook(*sys.exc_info())
2519 self.assertIn('raise RuntimeError(123) # some comment',
2520 f1.getvalue())
Xiang Zhang794623b2017-03-29 11:58:54 +08002521 # _helper_reraises_exception should not make the error
2522 # a remote exception
2523 with self.Pool(1) as p:
2524 try:
2525 p.map(sqr, exception_throwing_generator(1, -1), 1)
2526 except Exception as e:
2527 exc = e
2528 else:
2529 self.fail('expected SayWhenError')
2530 self.assertIs(type(exc), SayWhenError)
2531 self.assertIs(exc.__cause__, None)
Victor Stinner388c8c22018-12-06 11:56:52 +01002532 p.join()
Richard Oudkerk85757832013-05-06 11:38:25 +01002533
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002534 @classmethod
2535 def _test_wrapped_exception(cls):
2536 raise RuntimeError('foo')
2537
2538 def test_wrapped_exception(self):
2539 # Issue #20980: Should not wrap exception when using thread pool
2540 with self.Pool(1) as p:
2541 with self.assertRaises(RuntimeError):
2542 p.apply(self._test_wrapped_exception)
Victor Stinnerb7278732018-11-28 01:14:31 +01002543 p.join()
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002544
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002545 def test_map_no_failfast(self):
2546 # Issue #23992: the fail-fast behaviour when an exception is raised
2547 # during map() would make Pool.join() deadlock, because a worker
2548 # process would fill the result queue (after the result handler thread
2549 # terminated, hence not draining it anymore).
2550
Victor Stinner2cf4c202018-12-17 09:36:36 +01002551 t_start = time.monotonic()
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002552
2553 with self.assertRaises(ValueError):
2554 with self.Pool(2) as p:
2555 try:
2556 p.map(raise_large_valuerror, [0, 1])
2557 finally:
2558 time.sleep(0.5)
2559 p.close()
2560 p.join()
2561
2562 # check that we indeed waited for all jobs
Victor Stinner2cf4c202018-12-17 09:36:36 +01002563 self.assertGreater(time.monotonic() - t_start, 0.9)
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002564
Antoine Pitrou89889452017-03-24 13:52:11 +01002565 def test_release_task_refs(self):
2566 # Issue #29861: task arguments and results should not be kept
2567 # alive after we are done with them.
2568 objs = [CountedObject() for i in range(10)]
2569 refs = [weakref.ref(o) for o in objs]
2570 self.pool.map(identity, objs)
2571
2572 del objs
Antoine Pitrou685cdb92017-04-14 13:10:00 +02002573 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou89889452017-03-24 13:52:11 +01002574 self.assertEqual(set(wr() for wr in refs), {None})
2575 # With a process pool, copies of the objects are returned, check
2576 # they were released too.
2577 self.assertEqual(CountedObject.n_instances, 0)
2578
Victor Stinner08c2ba02018-12-13 02:15:30 +01002579 def test_enter(self):
2580 if self.TYPE == 'manager':
2581 self.skipTest("test not applicable to manager")
2582
2583 pool = self.Pool(1)
2584 with pool:
2585 pass
2586 # call pool.terminate()
2587 # pool is no longer running
2588
2589 with self.assertRaises(ValueError):
2590 # bpo-35477: pool.__enter__() fails if the pool is not running
2591 with pool:
2592 pass
2593 pool.join()
2594
Victor Stinner9a8d1d72018-12-20 20:33:51 +01002595 def test_resource_warning(self):
2596 if self.TYPE == 'manager':
2597 self.skipTest("test not applicable to manager")
2598
2599 pool = self.Pool(1)
2600 pool.terminate()
2601 pool.join()
2602
2603 # force state to RUN to emit ResourceWarning in __del__()
2604 pool._state = multiprocessing.pool.RUN
2605
2606 with support.check_warnings(('unclosed running multiprocessing pool',
2607 ResourceWarning)):
2608 pool = None
2609 support.gc_collect()
2610
Ask Solem2afcbf22010-11-09 20:55:52 +00002611def raising():
2612 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00002613
Ask Solem2afcbf22010-11-09 20:55:52 +00002614def unpickleable_result():
2615 return lambda: 42
2616
2617class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00002618 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00002619
2620 def test_async_error_callback(self):
2621 p = multiprocessing.Pool(2)
2622
2623 scratchpad = [None]
2624 def errback(exc):
2625 scratchpad[0] = exc
2626
2627 res = p.apply_async(raising, error_callback=errback)
2628 self.assertRaises(KeyError, res.get)
2629 self.assertTrue(scratchpad[0])
2630 self.assertIsInstance(scratchpad[0], KeyError)
2631
2632 p.close()
2633 p.join()
2634
2635 def test_unpickleable_result(self):
2636 from multiprocessing.pool import MaybeEncodingError
2637 p = multiprocessing.Pool(2)
2638
2639 # Make sure we don't lose pool processes because of encoding errors.
2640 for iteration in range(20):
2641
2642 scratchpad = [None]
2643 def errback(exc):
2644 scratchpad[0] = exc
2645
2646 res = p.apply_async(unpickleable_result, error_callback=errback)
2647 self.assertRaises(MaybeEncodingError, res.get)
2648 wrapped = scratchpad[0]
2649 self.assertTrue(wrapped)
2650 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2651 self.assertIsNotNone(wrapped.exc)
2652 self.assertIsNotNone(wrapped.value)
2653
2654 p.close()
2655 p.join()
2656
2657class _TestPoolWorkerLifetime(BaseTestCase):
2658 ALLOWED_TYPES = ('processes', )
2659
Jesse Noller1f0b6582010-01-27 03:36:01 +00002660 def test_pool_worker_lifetime(self):
2661 p = multiprocessing.Pool(3, maxtasksperchild=10)
2662 self.assertEqual(3, len(p._pool))
2663 origworkerpids = [w.pid for w in p._pool]
2664 # Run many tasks so each worker gets replaced (hopefully)
2665 results = []
2666 for i in range(100):
2667 results.append(p.apply_async(sqr, (i, )))
2668 # Fetch the results and verify we got the right answers,
2669 # also ensuring all the tasks have completed.
2670 for (j, res) in enumerate(results):
2671 self.assertEqual(res.get(), sqr(j))
2672 # Refill the pool
2673 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00002674 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02002675 # (countdown * DELTA = 5 seconds max startup process time)
2676 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00002677 while countdown and not all(w.is_alive() for w in p._pool):
2678 countdown -= 1
2679 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002680 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00002681 # All pids should be assigned. See issue #7805.
2682 self.assertNotIn(None, origworkerpids)
2683 self.assertNotIn(None, finalworkerpids)
2684 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00002685 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2686 p.close()
2687 p.join()
2688
Charles-François Natalif8859e12011-10-24 18:45:29 +02002689 def test_pool_worker_lifetime_early_close(self):
2690 # Issue #10332: closing a pool whose workers have limited lifetimes
2691 # before all the tasks completed would make join() hang.
2692 p = multiprocessing.Pool(3, maxtasksperchild=1)
2693 results = []
2694 for i in range(6):
2695 results.append(p.apply_async(sqr, (i, 0.3)))
2696 p.close()
2697 p.join()
2698 # check the results
2699 for (j, res) in enumerate(results):
2700 self.assertEqual(res.get(), sqr(j))
2701
Benjamin Petersone711caf2008-06-11 16:44:04 +00002702#
2703# Test of creating a customized manager class
2704#
2705
2706from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2707
2708class FooBar(object):
2709 def f(self):
2710 return 'f()'
2711 def g(self):
2712 raise ValueError
2713 def _h(self):
2714 return '_h()'
2715
2716def baz():
2717 for i in range(10):
2718 yield i*i
2719
2720class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00002721 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002722 def __iter__(self):
2723 return self
2724 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002725 return self._callmethod('__next__')
2726
2727class MyManager(BaseManager):
2728 pass
2729
2730MyManager.register('Foo', callable=FooBar)
2731MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2732MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2733
2734
2735class _TestMyManager(BaseTestCase):
2736
2737 ALLOWED_TYPES = ('manager',)
2738
2739 def test_mymanager(self):
2740 manager = MyManager()
2741 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01002742 self.common(manager)
2743 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002744
Richard Oudkerkac385712012-06-18 21:29:30 +01002745 # If the manager process exited cleanly then the exitcode
2746 # will be zero. Otherwise (after a short timeout)
2747 # terminate() is used, resulting in an exitcode of -SIGTERM.
2748 self.assertEqual(manager._process.exitcode, 0)
2749
2750 def test_mymanager_context(self):
2751 with MyManager() as manager:
2752 self.common(manager)
Victor Stinnerfbd71722018-06-27 18:18:10 +02002753 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2754 # to the manager process if it takes longer than 1 second to stop.
2755 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
Richard Oudkerkac385712012-06-18 21:29:30 +01002756
2757 def test_mymanager_context_prestarted(self):
2758 manager = MyManager()
2759 manager.start()
2760 with manager:
2761 self.common(manager)
2762 self.assertEqual(manager._process.exitcode, 0)
2763
2764 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002765 foo = manager.Foo()
2766 bar = manager.Bar()
2767 baz = manager.baz()
2768
2769 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2770 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2771
2772 self.assertEqual(foo_methods, ['f', 'g'])
2773 self.assertEqual(bar_methods, ['f', '_h'])
2774
2775 self.assertEqual(foo.f(), 'f()')
2776 self.assertRaises(ValueError, foo.g)
2777 self.assertEqual(foo._callmethod('f'), 'f()')
2778 self.assertRaises(RemoteError, foo._callmethod, '_h')
2779
2780 self.assertEqual(bar.f(), 'f()')
2781 self.assertEqual(bar._h(), '_h()')
2782 self.assertEqual(bar._callmethod('f'), 'f()')
2783 self.assertEqual(bar._callmethod('_h'), '_h()')
2784
2785 self.assertEqual(list(baz), [i*i for i in range(10)])
2786
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002787
Benjamin Petersone711caf2008-06-11 16:44:04 +00002788#
2789# Test of connecting to a remote server and using xmlrpclib for serialization
2790#
2791
2792_queue = pyqueue.Queue()
2793def get_queue():
2794 return _queue
2795
2796class QueueManager(BaseManager):
2797 '''manager class used by server process'''
2798QueueManager.register('get_queue', callable=get_queue)
2799
2800class QueueManager2(BaseManager):
2801 '''manager class which specifies the same interface as QueueManager'''
2802QueueManager2.register('get_queue')
2803
2804
2805SERIALIZER = 'xmlrpclib'
2806
2807class _TestRemoteManager(BaseTestCase):
2808
2809 ALLOWED_TYPES = ('manager',)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002810 values = ['hello world', None, True, 2.25,
2811 'hall\xe5 v\xe4rlden',
2812 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2813 b'hall\xe5 v\xe4rlden',
2814 ]
2815 result = values[:]
Benjamin Petersone711caf2008-06-11 16:44:04 +00002816
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002817 @classmethod
2818 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002819 manager = QueueManager2(
2820 address=address, authkey=authkey, serializer=SERIALIZER
2821 )
2822 manager.connect()
2823 queue = manager.get_queue()
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002824 # Note that xmlrpclib will deserialize object as a list not a tuple
2825 queue.put(tuple(cls.values))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002826
2827 def test_remote(self):
2828 authkey = os.urandom(32)
2829
2830 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002831 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00002832 )
2833 manager.start()
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002834 self.addCleanup(manager.shutdown)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002835
2836 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002837 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002838 p.start()
2839
2840 manager2 = QueueManager2(
2841 address=manager.address, authkey=authkey, serializer=SERIALIZER
2842 )
2843 manager2.connect()
2844 queue = manager2.get_queue()
2845
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002846 self.assertEqual(queue.get(), self.result)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002847
2848 # Because we are using xmlrpclib for serialization instead of
2849 # pickle this will cause a serialization error.
2850 self.assertRaises(Exception, queue.put, time.sleep)
2851
2852 # Make queue finalizer run before the server is stopped
2853 del queue
Benjamin Petersone711caf2008-06-11 16:44:04 +00002854
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002855class _TestManagerRestart(BaseTestCase):
2856
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002857 @classmethod
2858 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002859 manager = QueueManager(
2860 address=address, authkey=authkey, serializer=SERIALIZER)
2861 manager.connect()
2862 queue = manager.get_queue()
2863 queue.put('hello world')
2864
2865 def test_rapid_restart(self):
2866 authkey = os.urandom(32)
2867 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002868 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002869 try:
2870 srvr = manager.get_server()
2871 addr = srvr.address
2872 # Close the connection.Listener socket which gets opened as a part
2873 # of manager.get_server(). It's not needed for the test.
2874 srvr.listener.close()
2875 manager.start()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002876
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002877 p = self.Process(target=self._putter, args=(manager.address, authkey))
2878 p.start()
2879 p.join()
2880 queue = manager.get_queue()
2881 self.assertEqual(queue.get(), 'hello world')
2882 del queue
2883 finally:
2884 if hasattr(manager, "shutdown"):
2885 manager.shutdown()
Victor Stinner17657bb2017-08-16 12:46:04 +02002886
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002887 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002888 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002889 try:
2890 manager.start()
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002891 self.addCleanup(manager.shutdown)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002892 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002893 if e.errno != errno.EADDRINUSE:
2894 raise
2895 # Retry after some time, in case the old socket was lingering
2896 # (sporadic failure on buildbots)
2897 time.sleep(1.0)
2898 manager = QueueManager(
2899 address=addr, authkey=authkey, serializer=SERIALIZER)
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002900 if hasattr(manager, "shutdown"):
2901 self.addCleanup(manager.shutdown)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002902
Benjamin Petersone711caf2008-06-11 16:44:04 +00002903#
2904#
2905#
2906
2907SENTINEL = latin('')
2908
2909class _TestConnection(BaseTestCase):
2910
2911 ALLOWED_TYPES = ('processes', 'threads')
2912
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002913 @classmethod
2914 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002915 for msg in iter(conn.recv_bytes, SENTINEL):
2916 conn.send_bytes(msg)
2917 conn.close()
2918
2919 def test_connection(self):
2920 conn, child_conn = self.Pipe()
2921
2922 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002923 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002924 p.start()
2925
2926 seq = [1, 2.25, None]
2927 msg = latin('hello world')
2928 longmsg = msg * 10
2929 arr = array.array('i', list(range(4)))
2930
2931 if self.TYPE == 'processes':
2932 self.assertEqual(type(conn.fileno()), int)
2933
2934 self.assertEqual(conn.send(seq), None)
2935 self.assertEqual(conn.recv(), seq)
2936
2937 self.assertEqual(conn.send_bytes(msg), None)
2938 self.assertEqual(conn.recv_bytes(), msg)
2939
2940 if self.TYPE == 'processes':
2941 buffer = array.array('i', [0]*10)
2942 expected = list(arr) + [0] * (10 - len(arr))
2943 self.assertEqual(conn.send_bytes(arr), None)
2944 self.assertEqual(conn.recv_bytes_into(buffer),
2945 len(arr) * buffer.itemsize)
2946 self.assertEqual(list(buffer), expected)
2947
2948 buffer = array.array('i', [0]*10)
2949 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2950 self.assertEqual(conn.send_bytes(arr), None)
2951 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2952 len(arr) * buffer.itemsize)
2953 self.assertEqual(list(buffer), expected)
2954
2955 buffer = bytearray(latin(' ' * 40))
2956 self.assertEqual(conn.send_bytes(longmsg), None)
2957 try:
2958 res = conn.recv_bytes_into(buffer)
2959 except multiprocessing.BufferTooShort as e:
2960 self.assertEqual(e.args, (longmsg,))
2961 else:
2962 self.fail('expected BufferTooShort, got %s' % res)
2963
2964 poll = TimingWrapper(conn.poll)
2965
2966 self.assertEqual(poll(), False)
2967 self.assertTimingAlmostEqual(poll.elapsed, 0)
2968
Richard Oudkerk59d54042012-05-10 16:11:12 +01002969 self.assertEqual(poll(-1), False)
2970 self.assertTimingAlmostEqual(poll.elapsed, 0)
2971
Benjamin Petersone711caf2008-06-11 16:44:04 +00002972 self.assertEqual(poll(TIMEOUT1), False)
2973 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2974
2975 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002976 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002977
2978 self.assertEqual(poll(TIMEOUT1), True)
2979 self.assertTimingAlmostEqual(poll.elapsed, 0)
2980
2981 self.assertEqual(conn.recv(), None)
2982
2983 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2984 conn.send_bytes(really_big_msg)
2985 self.assertEqual(conn.recv_bytes(), really_big_msg)
2986
2987 conn.send_bytes(SENTINEL) # tell child to quit
2988 child_conn.close()
2989
2990 if self.TYPE == 'processes':
2991 self.assertEqual(conn.readable, True)
2992 self.assertEqual(conn.writable, True)
2993 self.assertRaises(EOFError, conn.recv)
2994 self.assertRaises(EOFError, conn.recv_bytes)
2995
2996 p.join()
2997
2998 def test_duplex_false(self):
2999 reader, writer = self.Pipe(duplex=False)
3000 self.assertEqual(writer.send(1), None)
3001 self.assertEqual(reader.recv(), 1)
3002 if self.TYPE == 'processes':
3003 self.assertEqual(reader.readable, True)
3004 self.assertEqual(reader.writable, False)
3005 self.assertEqual(writer.readable, False)
3006 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003007 self.assertRaises(OSError, reader.send, 2)
3008 self.assertRaises(OSError, writer.recv)
3009 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003010
3011 def test_spawn_close(self):
3012 # We test that a pipe connection can be closed by parent
3013 # process immediately after child is spawned. On Windows this
3014 # would have sometimes failed on old versions because
3015 # child_conn would be closed before the child got a chance to
3016 # duplicate it.
3017 conn, child_conn = self.Pipe()
3018
3019 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003020 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003021 p.start()
3022 child_conn.close() # this might complete before child initializes
3023
3024 msg = latin('hello')
3025 conn.send_bytes(msg)
3026 self.assertEqual(conn.recv_bytes(), msg)
3027
3028 conn.send_bytes(SENTINEL)
3029 conn.close()
3030 p.join()
3031
3032 def test_sendbytes(self):
3033 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06003034 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00003035
3036 msg = latin('abcdefghijklmnopqrstuvwxyz')
3037 a, b = self.Pipe()
3038
3039 a.send_bytes(msg)
3040 self.assertEqual(b.recv_bytes(), msg)
3041
3042 a.send_bytes(msg, 5)
3043 self.assertEqual(b.recv_bytes(), msg[5:])
3044
3045 a.send_bytes(msg, 7, 8)
3046 self.assertEqual(b.recv_bytes(), msg[7:7+8])
3047
3048 a.send_bytes(msg, 26)
3049 self.assertEqual(b.recv_bytes(), latin(''))
3050
3051 a.send_bytes(msg, 26, 0)
3052 self.assertEqual(b.recv_bytes(), latin(''))
3053
3054 self.assertRaises(ValueError, a.send_bytes, msg, 27)
3055
3056 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3057
3058 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3059
3060 self.assertRaises(ValueError, a.send_bytes, msg, -1)
3061
3062 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3063
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003064 @classmethod
3065 def _is_fd_assigned(cls, fd):
3066 try:
3067 os.fstat(fd)
3068 except OSError as e:
3069 if e.errno == errno.EBADF:
3070 return False
3071 raise
3072 else:
3073 return True
3074
3075 @classmethod
3076 def _writefd(cls, conn, data, create_dummy_fds=False):
3077 if create_dummy_fds:
3078 for i in range(0, 256):
3079 if not cls._is_fd_assigned(i):
3080 os.dup2(conn.fileno(), i)
3081 fd = reduction.recv_handle(conn)
3082 if msvcrt:
3083 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3084 os.write(fd, data)
3085 os.close(fd)
3086
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003087 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003088 def test_fd_transfer(self):
3089 if self.TYPE != 'processes':
3090 self.skipTest("only makes sense with processes")
3091 conn, child_conn = self.Pipe(duplex=True)
3092
3093 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02003094 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003095 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003096 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003097 with open(test.support.TESTFN, "wb") as f:
3098 fd = f.fileno()
3099 if msvcrt:
3100 fd = msvcrt.get_osfhandle(fd)
3101 reduction.send_handle(conn, fd, p.pid)
3102 p.join()
3103 with open(test.support.TESTFN, "rb") as f:
3104 self.assertEqual(f.read(), b"foo")
3105
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003106 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003107 @unittest.skipIf(sys.platform == "win32",
3108 "test semantics don't make sense on Windows")
3109 @unittest.skipIf(MAXFD <= 256,
3110 "largest assignable fd number is too small")
3111 @unittest.skipUnless(hasattr(os, "dup2"),
3112 "test needs os.dup2()")
3113 def test_large_fd_transfer(self):
3114 # With fd > 256 (issue #11657)
3115 if self.TYPE != 'processes':
3116 self.skipTest("only makes sense with processes")
3117 conn, child_conn = self.Pipe(duplex=True)
3118
3119 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02003120 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003121 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003122 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003123 with open(test.support.TESTFN, "wb") as f:
3124 fd = f.fileno()
3125 for newfd in range(256, MAXFD):
3126 if not self._is_fd_assigned(newfd):
3127 break
3128 else:
3129 self.fail("could not find an unassigned large file descriptor")
3130 os.dup2(fd, newfd)
3131 try:
3132 reduction.send_handle(conn, newfd, p.pid)
3133 finally:
3134 os.close(newfd)
3135 p.join()
3136 with open(test.support.TESTFN, "rb") as f:
3137 self.assertEqual(f.read(), b"bar")
3138
Jesus Cea4507e642011-09-21 03:53:25 +02003139 @classmethod
3140 def _send_data_without_fd(self, conn):
3141 os.write(conn.fileno(), b"\0")
3142
Charles-François Natalie51c8da2011-09-21 18:48:21 +02003143 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02003144 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3145 def test_missing_fd_transfer(self):
3146 # Check that exception is raised when received data is not
3147 # accompanied by a file descriptor in ancillary data.
3148 if self.TYPE != 'processes':
3149 self.skipTest("only makes sense with processes")
3150 conn, child_conn = self.Pipe(duplex=True)
3151
3152 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3153 p.daemon = True
3154 p.start()
3155 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3156 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003157
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003158 def test_context(self):
3159 a, b = self.Pipe()
3160
3161 with a, b:
3162 a.send(1729)
3163 self.assertEqual(b.recv(), 1729)
3164 if self.TYPE == 'processes':
3165 self.assertFalse(a.closed)
3166 self.assertFalse(b.closed)
3167
3168 if self.TYPE == 'processes':
3169 self.assertTrue(a.closed)
3170 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003171 self.assertRaises(OSError, a.recv)
3172 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003173
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003174class _TestListener(BaseTestCase):
3175
Richard Oudkerk91257752012-06-15 21:53:34 +01003176 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003177
3178 def test_multiple_bind(self):
3179 for family in self.connection.families:
3180 l = self.connection.Listener(family=family)
3181 self.addCleanup(l.close)
3182 self.assertRaises(OSError, self.connection.Listener,
3183 l.address, family)
3184
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003185 def test_context(self):
3186 with self.connection.Listener() as l:
3187 with self.connection.Client(l.address) as c:
3188 with l.accept() as d:
3189 c.send(1729)
3190 self.assertEqual(d.recv(), 1729)
3191
3192 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003193 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003194
Benjamin Petersone711caf2008-06-11 16:44:04 +00003195class _TestListenerClient(BaseTestCase):
3196
3197 ALLOWED_TYPES = ('processes', 'threads')
3198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003199 @classmethod
3200 def _test(cls, address):
3201 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003202 conn.send('hello')
3203 conn.close()
3204
3205 def test_listener_client(self):
3206 for family in self.connection.families:
3207 l = self.connection.Listener(family=family)
3208 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00003209 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003210 p.start()
3211 conn = l.accept()
3212 self.assertEqual(conn.recv(), 'hello')
3213 p.join()
3214 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003215
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01003216 def test_issue14725(self):
3217 l = self.connection.Listener()
3218 p = self.Process(target=self._test, args=(l.address,))
3219 p.daemon = True
3220 p.start()
3221 time.sleep(1)
3222 # On Windows the client process should by now have connected,
3223 # written data and closed the pipe handle by now. This causes
3224 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
3225 # 14725.
3226 conn = l.accept()
3227 self.assertEqual(conn.recv(), 'hello')
3228 conn.close()
3229 p.join()
3230 l.close()
3231
Richard Oudkerked9e06c2013-01-13 22:46:48 +00003232 def test_issue16955(self):
3233 for fam in self.connection.families:
3234 l = self.connection.Listener(family=fam)
3235 c = self.connection.Client(l.address)
3236 a = l.accept()
3237 a.send_bytes(b"hello")
3238 self.assertTrue(c.poll(1))
3239 a.close()
3240 c.close()
3241 l.close()
3242
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003243class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003244
3245 ALLOWED_TYPES = ('processes', 'threads')
3246
3247 def test_empty_string(self):
3248 a, b = self.Pipe()
3249 self.assertEqual(a.poll(), False)
3250 b.send_bytes(b'')
3251 self.assertEqual(a.poll(), True)
3252 self.assertEqual(a.poll(), True)
3253
3254 @classmethod
3255 def _child_strings(cls, conn, strings):
3256 for s in strings:
3257 time.sleep(0.1)
3258 conn.send_bytes(s)
3259 conn.close()
3260
3261 def test_strings(self):
3262 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3263 a, b = self.Pipe()
3264 p = self.Process(target=self._child_strings, args=(b, strings))
3265 p.start()
3266
3267 for s in strings:
3268 for i in range(200):
3269 if a.poll(0.01):
3270 break
3271 x = a.recv_bytes()
3272 self.assertEqual(s, x)
3273
3274 p.join()
3275
3276 @classmethod
3277 def _child_boundaries(cls, r):
3278 # Polling may "pull" a message in to the child process, but we
3279 # don't want it to pull only part of a message, as that would
3280 # corrupt the pipe for any other processes which might later
3281 # read from it.
3282 r.poll(5)
3283
3284 def test_boundaries(self):
3285 r, w = self.Pipe(False)
3286 p = self.Process(target=self._child_boundaries, args=(r,))
3287 p.start()
3288 time.sleep(2)
3289 L = [b"first", b"second"]
3290 for obj in L:
3291 w.send_bytes(obj)
3292 w.close()
3293 p.join()
3294 self.assertIn(r.recv_bytes(), L)
3295
3296 @classmethod
3297 def _child_dont_merge(cls, b):
3298 b.send_bytes(b'a')
3299 b.send_bytes(b'b')
3300 b.send_bytes(b'cd')
3301
3302 def test_dont_merge(self):
3303 a, b = self.Pipe()
3304 self.assertEqual(a.poll(0.0), False)
3305 self.assertEqual(a.poll(0.1), False)
3306
3307 p = self.Process(target=self._child_dont_merge, args=(b,))
3308 p.start()
3309
3310 self.assertEqual(a.recv_bytes(), b'a')
3311 self.assertEqual(a.poll(1.0), True)
3312 self.assertEqual(a.poll(1.0), True)
3313 self.assertEqual(a.recv_bytes(), b'b')
3314 self.assertEqual(a.poll(1.0), True)
3315 self.assertEqual(a.poll(1.0), True)
3316 self.assertEqual(a.poll(0.0), True)
3317 self.assertEqual(a.recv_bytes(), b'cd')
3318
3319 p.join()
3320
Benjamin Petersone711caf2008-06-11 16:44:04 +00003321#
3322# Test of sending connection and socket objects between processes
3323#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003324
3325@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00003326class _TestPicklingConnections(BaseTestCase):
3327
3328 ALLOWED_TYPES = ('processes',)
3329
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003330 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003331 def tearDownClass(cls):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003332 from multiprocessing import resource_sharer
Victor Stinner11f08072017-09-15 06:55:31 -07003333 resource_sharer.stop(timeout=TIMEOUT)
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003334
3335 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003336 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003337 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003338 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003339 conn.send(l.address)
3340 new_conn = l.accept()
3341 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003342 new_conn.close()
3343 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003344
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02003345 l = socket.create_server((test.support.HOST, 0))
Richard Oudkerk5d73c172012-05-08 22:24:47 +01003346 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003347 new_conn, addr = l.accept()
3348 conn.send(new_conn)
3349 new_conn.close()
3350 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003351
3352 conn.recv()
3353
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003354 @classmethod
3355 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003356 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003357 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003358 client.send(msg.upper())
3359 client.close()
3360
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003361 address, msg = conn.recv()
3362 client = socket.socket()
3363 client.connect(address)
3364 client.sendall(msg.upper())
3365 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003366
3367 conn.close()
3368
3369 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003370 families = self.connection.families
3371
3372 lconn, lconn0 = self.Pipe()
3373 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02003374 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003375 lp.start()
3376 lconn0.close()
3377
3378 rconn, rconn0 = self.Pipe()
3379 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003380 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003381 rp.start()
3382 rconn0.close()
3383
3384 for fam in families:
3385 msg = ('This connection uses family %s' % fam).encode('ascii')
3386 address = lconn.recv()
3387 rconn.send((address, msg))
3388 new_conn = lconn.recv()
3389 self.assertEqual(new_conn.recv(), msg.upper())
3390
3391 rconn.send(None)
3392
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003393 msg = latin('This connection uses a normal socket')
3394 address = lconn.recv()
3395 rconn.send((address, msg))
3396 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01003397 buf = []
3398 while True:
3399 s = new_conn.recv(100)
3400 if not s:
3401 break
3402 buf.append(s)
3403 buf = b''.join(buf)
3404 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003405 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003406
3407 lconn.send(None)
3408
3409 rconn.close()
3410 lconn.close()
3411
3412 lp.join()
3413 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003414
3415 @classmethod
3416 def child_access(cls, conn):
3417 w = conn.recv()
3418 w.send('all is well')
3419 w.close()
3420
3421 r = conn.recv()
3422 msg = r.recv()
3423 conn.send(msg*2)
3424
3425 conn.close()
3426
3427 def test_access(self):
3428 # On Windows, if we do not specify a destination pid when
3429 # using DupHandle then we need to be careful to use the
3430 # correct access flags for DuplicateHandle(), or else
3431 # DupHandle.detach() will raise PermissionError. For example,
3432 # for a read only pipe handle we should use
3433 # access=FILE_GENERIC_READ. (Unfortunately
3434 # DUPLICATE_SAME_ACCESS does not work.)
3435 conn, child_conn = self.Pipe()
3436 p = self.Process(target=self.child_access, args=(child_conn,))
3437 p.daemon = True
3438 p.start()
3439 child_conn.close()
3440
3441 r, w = self.Pipe(duplex=False)
3442 conn.send(w)
3443 w.close()
3444 self.assertEqual(r.recv(), 'all is well')
3445 r.close()
3446
3447 r, w = self.Pipe(duplex=False)
3448 conn.send(r)
3449 r.close()
3450 w.send('foobar')
3451 w.close()
3452 self.assertEqual(conn.recv(), 'foobar'*2)
3453
Victor Stinnerb4c52962017-07-25 02:40:55 +02003454 p.join()
3455
Benjamin Petersone711caf2008-06-11 16:44:04 +00003456#
3457#
3458#
3459
3460class _TestHeap(BaseTestCase):
3461
3462 ALLOWED_TYPES = ('processes',)
3463
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003464 def setUp(self):
3465 super().setUp()
3466 # Make pristine heap for these tests
3467 self.old_heap = multiprocessing.heap.BufferWrapper._heap
3468 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3469
3470 def tearDown(self):
3471 multiprocessing.heap.BufferWrapper._heap = self.old_heap
3472 super().tearDown()
3473
Benjamin Petersone711caf2008-06-11 16:44:04 +00003474 def test_heap(self):
3475 iterations = 5000
3476 maxblocks = 50
3477 blocks = []
3478
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003479 # get the heap object
3480 heap = multiprocessing.heap.BufferWrapper._heap
3481 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3482
Benjamin Petersone711caf2008-06-11 16:44:04 +00003483 # create and destroy lots of blocks of different sizes
3484 for i in range(iterations):
3485 size = int(random.lognormvariate(0, 1) * 1000)
3486 b = multiprocessing.heap.BufferWrapper(size)
3487 blocks.append(b)
3488 if len(blocks) > maxblocks:
3489 i = random.randrange(maxblocks)
3490 del blocks[i]
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003491 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00003492
3493 # verify the state of the heap
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003494 with heap._lock:
3495 all = []
3496 free = 0
3497 occupied = 0
3498 for L in list(heap._len_to_seq.values()):
3499 # count all free blocks in arenas
3500 for arena, start, stop in L:
3501 all.append((heap._arenas.index(arena), start, stop,
3502 stop-start, 'free'))
3503 free += (stop-start)
3504 for arena, arena_blocks in heap._allocated_blocks.items():
3505 # count all allocated blocks in arenas
3506 for start, stop in arena_blocks:
3507 all.append((heap._arenas.index(arena), start, stop,
3508 stop-start, 'occupied'))
3509 occupied += (stop-start)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003510
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003511 self.assertEqual(free + occupied,
3512 sum(arena.size for arena in heap._arenas))
Benjamin Petersone711caf2008-06-11 16:44:04 +00003513
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003514 all.sort()
3515
3516 for i in range(len(all)-1):
3517 (arena, start, stop) = all[i][:3]
3518 (narena, nstart, nstop) = all[i+1][:3]
3519 if arena != narena:
3520 # Two different arenas
3521 self.assertEqual(stop, heap._arenas[arena].size) # last block
3522 self.assertEqual(nstart, 0) # first block
3523 else:
3524 # Same arena: two adjacent blocks
3525 self.assertEqual(stop, nstart)
3526
3527 # test free'ing all blocks
3528 random.shuffle(blocks)
3529 while blocks:
3530 blocks.pop()
3531
3532 self.assertEqual(heap._n_frees, heap._n_mallocs)
3533 self.assertEqual(len(heap._pending_free_blocks), 0)
3534 self.assertEqual(len(heap._arenas), 0)
3535 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3536 self.assertEqual(len(heap._len_to_seq), 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003537
Charles-François Natali778db492011-07-02 14:35:49 +02003538 def test_free_from_gc(self):
3539 # Check that freeing of blocks by the garbage collector doesn't deadlock
3540 # (issue #12352).
3541 # Make sure the GC is enabled, and set lower collection thresholds to
3542 # make collections more frequent (and increase the probability of
3543 # deadlock).
3544 if not gc.isenabled():
3545 gc.enable()
3546 self.addCleanup(gc.disable)
3547 thresholds = gc.get_threshold()
3548 self.addCleanup(gc.set_threshold, *thresholds)
3549 gc.set_threshold(10)
3550
3551 # perform numerous block allocations, with cyclic references to make
3552 # sure objects are collected asynchronously by the gc
3553 for i in range(5000):
3554 a = multiprocessing.heap.BufferWrapper(1)
3555 b = multiprocessing.heap.BufferWrapper(1)
3556 # circular references
3557 a.buddy = b
3558 b.buddy = a
3559
Benjamin Petersone711caf2008-06-11 16:44:04 +00003560#
3561#
3562#
3563
Benjamin Petersone711caf2008-06-11 16:44:04 +00003564class _Foo(Structure):
3565 _fields_ = [
3566 ('x', c_int),
Gareth Rees3913bad2017-07-21 11:35:33 +01003567 ('y', c_double),
3568 ('z', c_longlong,)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003569 ]
3570
3571class _TestSharedCTypes(BaseTestCase):
3572
3573 ALLOWED_TYPES = ('processes',)
3574
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00003575 def setUp(self):
3576 if not HAS_SHAREDCTYPES:
3577 self.skipTest("requires multiprocessing.sharedctypes")
3578
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003579 @classmethod
Gareth Rees3913bad2017-07-21 11:35:33 +01003580 def _double(cls, x, y, z, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003581 x.value *= 2
3582 y.value *= 2
Gareth Rees3913bad2017-07-21 11:35:33 +01003583 z.value *= 2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003584 foo.x *= 2
3585 foo.y *= 2
3586 string.value *= 2
3587 for i in range(len(arr)):
3588 arr[i] *= 2
3589
3590 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003591 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003592 y = Value(c_double, 1.0/3.0, lock=lock)
Gareth Rees3913bad2017-07-21 11:35:33 +01003593 z = Value(c_longlong, 2 ** 33, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003594 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003595 arr = self.Array('d', list(range(10)), lock=lock)
3596 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00003597 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00003598
Gareth Rees3913bad2017-07-21 11:35:33 +01003599 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02003600 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003601 p.start()
3602 p.join()
3603
3604 self.assertEqual(x.value, 14)
3605 self.assertAlmostEqual(y.value, 2.0/3.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003606 self.assertEqual(z.value, 2 ** 34)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003607 self.assertEqual(foo.x, 6)
3608 self.assertAlmostEqual(foo.y, 4.0)
3609 for i in range(10):
3610 self.assertAlmostEqual(arr[i], i*2)
3611 self.assertEqual(string.value, latin('hellohello'))
3612
3613 def test_synchronize(self):
3614 self.test_sharedctypes(lock=True)
3615
3616 def test_copy(self):
Gareth Rees3913bad2017-07-21 11:35:33 +01003617 foo = _Foo(2, 5.0, 2 ** 33)
Brian Curtinafa88b52010-10-07 01:12:19 +00003618 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003619 foo.x = 0
3620 foo.y = 0
Gareth Rees3913bad2017-07-21 11:35:33 +01003621 foo.z = 0
Benjamin Petersone711caf2008-06-11 16:44:04 +00003622 self.assertEqual(bar.x, 2)
3623 self.assertAlmostEqual(bar.y, 5.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003624 self.assertEqual(bar.z, 2 ** 33)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003625
Davin Pottse895de32019-02-23 22:08:16 -06003626
3627@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3628class _TestSharedMemory(BaseTestCase):
3629
3630 ALLOWED_TYPES = ('processes',)
3631
3632 @staticmethod
3633 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3634 if isinstance(shmem_name_or_obj, str):
3635 local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3636 else:
3637 local_sms = shmem_name_or_obj
3638 local_sms.buf[:len(binary_data)] = binary_data
3639 local_sms.close()
3640
3641 def test_shared_memory_basics(self):
3642 sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
3643 self.addCleanup(sms.unlink)
3644
3645 # Verify attributes are readable.
3646 self.assertEqual(sms.name, 'test01_tsmb')
3647 self.assertGreaterEqual(sms.size, 512)
3648 self.assertGreaterEqual(len(sms.buf), sms.size)
3649
3650 # Modify contents of shared memory segment through memoryview.
3651 sms.buf[0] = 42
3652 self.assertEqual(sms.buf[0], 42)
3653
3654 # Attach to existing shared memory segment.
3655 also_sms = shared_memory.SharedMemory('test01_tsmb')
3656 self.assertEqual(also_sms.buf[0], 42)
3657 also_sms.close()
3658
3659 # Attach to existing shared memory segment but specify a new size.
3660 same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
3661 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored.
3662 same_sms.close()
3663
3664 if shared_memory._USE_POSIX:
3665 # Posix Shared Memory can only be unlinked once. Here we
3666 # test an implementation detail that is not observed across
3667 # all supported platforms (since WindowsNamedSharedMemory
3668 # manages unlinking on its own and unlink() does nothing).
3669 # True release of shared memory segment does not necessarily
3670 # happen until process exits, depending on the OS platform.
3671 with self.assertRaises(FileNotFoundError):
3672 sms_uno = shared_memory.SharedMemory(
3673 'test01_dblunlink',
3674 create=True,
3675 size=5000
3676 )
3677
3678 try:
3679 self.assertGreaterEqual(sms_uno.size, 5000)
3680
3681 sms_duo = shared_memory.SharedMemory('test01_dblunlink')
3682 sms_duo.unlink() # First shm_unlink() call.
3683 sms_duo.close()
3684 sms_uno.close()
3685
3686 finally:
3687 sms_uno.unlink() # A second shm_unlink() call is bad.
3688
3689 with self.assertRaises(FileExistsError):
3690 # Attempting to create a new shared memory segment with a
3691 # name that is already in use triggers an exception.
3692 there_can_only_be_one_sms = shared_memory.SharedMemory(
3693 'test01_tsmb',
3694 create=True,
3695 size=512
3696 )
3697
3698 if shared_memory._USE_POSIX:
3699 # Requesting creation of a shared memory segment with the option
3700 # to attach to an existing segment, if that name is currently in
3701 # use, should not trigger an exception.
3702 # Note: Using a smaller size could possibly cause truncation of
3703 # the existing segment but is OS platform dependent. In the
3704 # case of MacOS/darwin, requesting a smaller size is disallowed.
3705 class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3706 _flags = os.O_CREAT | os.O_RDWR
3707 ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
3708 self.assertEqual(ok_if_exists_sms.size, sms.size)
3709 ok_if_exists_sms.close()
3710
3711 # Attempting to attach to an existing shared memory segment when
3712 # no segment exists with the supplied name triggers an exception.
3713 with self.assertRaises(FileNotFoundError):
3714 nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3715 nonexisting_sms.unlink() # Error should occur on prior line.
3716
3717 sms.close()
3718
3719 def test_shared_memory_across_processes(self):
3720 sms = shared_memory.SharedMemory('test02_tsmap', True, size=512)
3721 self.addCleanup(sms.unlink)
3722
3723 # Verify remote attachment to existing block by name is working.
3724 p = self.Process(
3725 target=self._attach_existing_shmem_then_write,
3726 args=(sms.name, b'howdy')
3727 )
3728 p.daemon = True
3729 p.start()
3730 p.join()
3731 self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3732
3733 # Verify pickling of SharedMemory instance also works.
3734 p = self.Process(
3735 target=self._attach_existing_shmem_then_write,
3736 args=(sms, b'HELLO')
3737 )
3738 p.daemon = True
3739 p.start()
3740 p.join()
3741 self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
3742
3743 sms.close()
3744
Pierre Glaserd0d64ad2019-05-10 20:42:35 +02003745 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
3746 def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
3747 # bpo-36368: protect SharedMemoryManager server process from
3748 # KeyboardInterrupt signals.
3749 smm = multiprocessing.managers.SharedMemoryManager()
3750 smm.start()
3751
3752 # make sure the manager works properly at the beginning
3753 sl = smm.ShareableList(range(10))
3754
3755 # the manager's server should ignore KeyboardInterrupt signals, and
3756 # maintain its connection with the current process, and success when
3757 # asked to deliver memory segments.
3758 os.kill(smm._process.pid, signal.SIGINT)
3759
3760 sl2 = smm.ShareableList(range(10))
3761
3762 # test that the custom signal handler registered in the Manager does
3763 # not affect signal handling in the parent process.
3764 with self.assertRaises(KeyboardInterrupt):
3765 os.kill(os.getpid(), signal.SIGINT)
3766
3767 smm.shutdown()
3768
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003769 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
3770 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
3771 # bpo-36867: test that a SharedMemoryManager uses the
3772 # same resource_tracker process as its parent.
3773 cmd = '''if 1:
3774 from multiprocessing.managers import SharedMemoryManager
3775
3776
3777 smm = SharedMemoryManager()
3778 smm.start()
3779 sl = smm.ShareableList(range(10))
3780 smm.shutdown()
3781 '''
3782 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
3783
3784 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
3785 # resource_tracker process as its parent would make the parent's
3786 # tracker complain about sl being leaked even though smm.shutdown()
3787 # properly released sl.
3788 self.assertFalse(err)
3789
Davin Pottse895de32019-02-23 22:08:16 -06003790 def test_shared_memory_SharedMemoryManager_basics(self):
3791 smm1 = multiprocessing.managers.SharedMemoryManager()
3792 with self.assertRaises(ValueError):
3793 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started
3794 smm1.start()
3795 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
3796 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
3797 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
3798 self.assertEqual(len(doppleganger_list0), 5)
3799 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
3800 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
3801 held_name = lom[0].name
3802 smm1.shutdown()
3803 if sys.platform != "win32":
3804 # Calls to unlink() have no effect on Windows platform; shared
3805 # memory will only be released once final process exits.
3806 with self.assertRaises(FileNotFoundError):
3807 # No longer there to be attached to again.
3808 absent_shm = shared_memory.SharedMemory(name=held_name)
3809
3810 with multiprocessing.managers.SharedMemoryManager() as smm2:
3811 sl = smm2.ShareableList("howdy")
3812 shm = smm2.SharedMemory(size=128)
3813 held_name = sl.shm.name
3814 if sys.platform != "win32":
3815 with self.assertRaises(FileNotFoundError):
3816 # No longer there to be attached to again.
3817 absent_sl = shared_memory.ShareableList(name=held_name)
3818
3819
3820 def test_shared_memory_ShareableList_basics(self):
3821 sl = shared_memory.ShareableList(
3822 ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
3823 )
3824 self.addCleanup(sl.shm.unlink)
3825
3826 # Verify attributes are readable.
3827 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
3828
3829 # Exercise len().
3830 self.assertEqual(len(sl), 7)
3831
3832 # Exercise index().
3833 with warnings.catch_warnings():
3834 # Suppress BytesWarning when comparing against b'HoWdY'.
3835 warnings.simplefilter('ignore')
3836 with self.assertRaises(ValueError):
3837 sl.index('100')
3838 self.assertEqual(sl.index(100), 3)
3839
3840 # Exercise retrieving individual values.
3841 self.assertEqual(sl[0], 'howdy')
3842 self.assertEqual(sl[-2], True)
3843
3844 # Exercise iterability.
3845 self.assertEqual(
3846 tuple(sl),
3847 ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
3848 )
3849
3850 # Exercise modifying individual values.
3851 sl[3] = 42
3852 self.assertEqual(sl[3], 42)
3853 sl[4] = 'some' # Change type at a given position.
3854 self.assertEqual(sl[4], 'some')
3855 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
3856 with self.assertRaises(ValueError):
3857 sl[4] = 'far too many' # Exceeds available storage.
3858 self.assertEqual(sl[4], 'some')
3859
3860 # Exercise count().
3861 with warnings.catch_warnings():
3862 # Suppress BytesWarning when comparing against b'HoWdY'.
3863 warnings.simplefilter('ignore')
3864 self.assertEqual(sl.count(42), 2)
3865 self.assertEqual(sl.count(b'HoWdY'), 1)
3866 self.assertEqual(sl.count(b'adios'), 0)
3867
3868 # Exercise creating a duplicate.
3869 sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
3870 try:
3871 self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
3872 self.assertEqual('test03_duplicate', sl_copy.shm.name)
3873 self.assertEqual(list(sl), list(sl_copy))
3874 self.assertEqual(sl.format, sl_copy.format)
3875 sl_copy[-1] = 77
3876 self.assertEqual(sl_copy[-1], 77)
3877 self.assertNotEqual(sl[-1], 77)
3878 sl_copy.shm.close()
3879 finally:
3880 sl_copy.shm.unlink()
3881
3882 # Obtain a second handle on the same ShareableList.
3883 sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
3884 self.assertEqual(sl.shm.name, sl_tethered.shm.name)
3885 sl_tethered[-1] = 880
3886 self.assertEqual(sl[-1], 880)
3887 sl_tethered.shm.close()
3888
3889 sl.shm.close()
3890
3891 # Exercise creating an empty ShareableList.
3892 empty_sl = shared_memory.ShareableList()
3893 try:
3894 self.assertEqual(len(empty_sl), 0)
3895 self.assertEqual(empty_sl.format, '')
3896 self.assertEqual(empty_sl.count('any'), 0)
3897 with self.assertRaises(ValueError):
3898 empty_sl.index(None)
3899 empty_sl.shm.close()
3900 finally:
3901 empty_sl.shm.unlink()
3902
3903 def test_shared_memory_ShareableList_pickling(self):
3904 sl = shared_memory.ShareableList(range(10))
3905 self.addCleanup(sl.shm.unlink)
3906
3907 serialized_sl = pickle.dumps(sl)
3908 deserialized_sl = pickle.loads(serialized_sl)
3909 self.assertTrue(
3910 isinstance(deserialized_sl, shared_memory.ShareableList)
3911 )
3912 self.assertTrue(deserialized_sl[-1], 9)
3913 self.assertFalse(sl is deserialized_sl)
3914 deserialized_sl[4] = "changed"
3915 self.assertEqual(sl[4], "changed")
3916
3917 # Verify data is not being put into the pickled representation.
3918 name = 'a' * len(sl.shm.name)
3919 larger_sl = shared_memory.ShareableList(range(400))
3920 self.addCleanup(larger_sl.shm.unlink)
3921 serialized_larger_sl = pickle.dumps(larger_sl)
3922 self.assertTrue(len(serialized_sl) == len(serialized_larger_sl))
3923 larger_sl.shm.close()
3924
3925 deserialized_sl.shm.close()
3926 sl.shm.close()
3927
Pierre Glaserf22cc692019-05-10 22:59:08 +02003928 def test_shared_memory_cleaned_after_process_termination(self):
Pierre Glaserf22cc692019-05-10 22:59:08 +02003929 cmd = '''if 1:
3930 import os, time, sys
3931 from multiprocessing import shared_memory
3932
3933 # Create a shared_memory segment, and send the segment name
3934 sm = shared_memory.SharedMemory(create=True, size=10)
3935 sys.stdout.write(sm._name + '\\n')
3936 sys.stdout.flush()
3937 time.sleep(100)
3938 '''
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003939 with subprocess.Popen([sys.executable, '-E', '-c', cmd],
3940 stdout=subprocess.PIPE,
3941 stderr=subprocess.PIPE) as p:
3942 name = p.stdout.readline().strip().decode()
Pierre Glaserf22cc692019-05-10 22:59:08 +02003943
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003944 # killing abruptly processes holding reference to a shared memory
3945 # segment should not leak the given memory segment.
3946 p.terminate()
3947 p.wait()
Pierre Glaserf22cc692019-05-10 22:59:08 +02003948
Pierre Glasercbe72d82019-05-17 20:20:07 +02003949 deadline = time.monotonic() + 60
3950 t = 0.1
3951 while time.monotonic() < deadline:
3952 time.sleep(t)
3953 t = min(t*2, 5)
3954 try:
3955 smm = shared_memory.SharedMemory(name, create=False)
3956 except FileNotFoundError:
3957 break
3958 else:
3959 raise AssertionError("A SharedMemory segment was leaked after"
3960 " a process was abruptly terminated.")
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003961
3962 if os.name == 'posix':
3963 # A warning was emitted by the subprocess' own
3964 # resource_tracker (on Windows, shared memory segments
3965 # are released automatically by the OS).
3966 err = p.stderr.read().decode()
3967 self.assertIn(
3968 "resource_tracker: There appear to be 1 leaked "
3969 "shared_memory objects to clean up at shutdown", err)
Pierre Glaserf22cc692019-05-10 22:59:08 +02003970
Benjamin Petersone711caf2008-06-11 16:44:04 +00003971#
3972#
3973#
3974
3975class _TestFinalize(BaseTestCase):
3976
3977 ALLOWED_TYPES = ('processes',)
3978
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02003979 def setUp(self):
3980 self.registry_backup = util._finalizer_registry.copy()
3981 util._finalizer_registry.clear()
3982
3983 def tearDown(self):
3984 self.assertFalse(util._finalizer_registry)
3985 util._finalizer_registry.update(self.registry_backup)
3986
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003987 @classmethod
3988 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003989 class Foo(object):
3990 pass
3991
3992 a = Foo()
3993 util.Finalize(a, conn.send, args=('a',))
3994 del a # triggers callback for a
3995
3996 b = Foo()
3997 close_b = util.Finalize(b, conn.send, args=('b',))
3998 close_b() # triggers callback for b
3999 close_b() # does nothing because callback has already been called
4000 del b # does nothing because callback has already been called
4001
4002 c = Foo()
4003 util.Finalize(c, conn.send, args=('c',))
4004
4005 d10 = Foo()
4006 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4007
4008 d01 = Foo()
4009 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4010 d02 = Foo()
4011 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4012 d03 = Foo()
4013 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4014
4015 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4016
4017 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4018
Ezio Melotti13925002011-03-16 11:05:33 +02004019 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00004020 # garbage collecting locals
4021 util._exit_function()
4022 conn.close()
4023 os._exit(0)
4024
4025 def test_finalize(self):
4026 conn, child_conn = self.Pipe()
4027
4028 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004029 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00004030 p.start()
4031 p.join()
4032
4033 result = [obj for obj in iter(conn.recv, 'STOP')]
4034 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4035
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02004036 def test_thread_safety(self):
4037 # bpo-24484: _run_finalizers() should be thread-safe
4038 def cb():
4039 pass
4040
4041 class Foo(object):
4042 def __init__(self):
4043 self.ref = self # create reference cycle
4044 # insert finalizer at random key
4045 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4046
4047 finish = False
4048 exc = None
4049
4050 def run_finalizers():
4051 nonlocal exc
4052 while not finish:
4053 time.sleep(random.random() * 1e-1)
4054 try:
4055 # A GC run will eventually happen during this,
4056 # collecting stale Foo's and mutating the registry
4057 util._run_finalizers()
4058 except Exception as e:
4059 exc = e
4060
4061 def make_finalizers():
4062 nonlocal exc
4063 d = {}
4064 while not finish:
4065 try:
4066 # Old Foo's get gradually replaced and later
4067 # collected by the GC (because of the cyclic ref)
4068 d[random.getrandbits(5)] = {Foo() for i in range(10)}
4069 except Exception as e:
4070 exc = e
4071 d.clear()
4072
4073 old_interval = sys.getswitchinterval()
4074 old_threshold = gc.get_threshold()
4075 try:
4076 sys.setswitchinterval(1e-6)
4077 gc.set_threshold(5, 5, 5)
4078 threads = [threading.Thread(target=run_finalizers),
4079 threading.Thread(target=make_finalizers)]
4080 with test.support.start_threads(threads):
4081 time.sleep(4.0) # Wait a bit to trigger race condition
4082 finish = True
4083 if exc is not None:
4084 raise exc
4085 finally:
4086 sys.setswitchinterval(old_interval)
4087 gc.set_threshold(*old_threshold)
4088 gc.collect() # Collect remaining Foo's
4089
4090
Benjamin Petersone711caf2008-06-11 16:44:04 +00004091#
4092# Test that from ... import * works for each module
4093#
4094
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004095class _TestImportStar(unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00004096
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004097 def get_module_names(self):
4098 import glob
4099 folder = os.path.dirname(multiprocessing.__file__)
4100 pattern = os.path.join(folder, '*.py')
4101 files = glob.glob(pattern)
4102 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4103 modules = ['multiprocessing.' + m for m in modules]
4104 modules.remove('multiprocessing.__init__')
4105 modules.append('multiprocessing')
4106 return modules
Benjamin Petersone711caf2008-06-11 16:44:04 +00004107
4108 def test_import(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004109 modules = self.get_module_names()
4110 if sys.platform == 'win32':
4111 modules.remove('multiprocessing.popen_fork')
4112 modules.remove('multiprocessing.popen_forkserver')
4113 modules.remove('multiprocessing.popen_spawn_posix')
4114 else:
4115 modules.remove('multiprocessing.popen_spawn_win32')
4116 if not HAS_REDUCTION:
4117 modules.remove('multiprocessing.popen_forkserver')
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004118
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004119 if c_int is None:
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004120 # This module requires _ctypes
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004121 modules.remove('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00004122
4123 for name in modules:
4124 __import__(name)
4125 mod = sys.modules[name]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004126 self.assertTrue(hasattr(mod, '__all__'), name)
Benjamin Petersone711caf2008-06-11 16:44:04 +00004127
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004128 for attr in mod.__all__:
Benjamin Petersone711caf2008-06-11 16:44:04 +00004129 self.assertTrue(
4130 hasattr(mod, attr),
4131 '%r does not have attribute %r' % (mod, attr)
4132 )
4133
4134#
4135# Quick test that logging works -- does not test logging output
4136#
4137
4138class _TestLogging(BaseTestCase):
4139
4140 ALLOWED_TYPES = ('processes',)
4141
4142 def test_enable_logging(self):
4143 logger = multiprocessing.get_logger()
4144 logger.setLevel(util.SUBWARNING)
4145 self.assertTrue(logger is not None)
4146 logger.debug('this will not be printed')
4147 logger.info('nor will this')
4148 logger.setLevel(LOG_LEVEL)
4149
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00004150 @classmethod
4151 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00004152 logger = multiprocessing.get_logger()
4153 conn.send(logger.getEffectiveLevel())
4154
4155 def test_level(self):
4156 LEVEL1 = 32
4157 LEVEL2 = 37
4158
4159 logger = multiprocessing.get_logger()
4160 root_logger = logging.getLogger()
4161 root_level = root_logger.level
4162
4163 reader, writer = multiprocessing.Pipe(duplex=False)
4164
4165 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02004166 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004167 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004168 self.assertEqual(LEVEL1, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02004169 p.join()
4170 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004171
4172 logger.setLevel(logging.NOTSET)
4173 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02004174 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004175 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004176 self.assertEqual(LEVEL2, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02004177 p.join()
4178 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004179
4180 root_logger.setLevel(root_level)
4181 logger.setLevel(level=LOG_LEVEL)
4182
Jesse Nollerb9a49b72009-11-21 18:09:38 +00004183
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00004184# class _TestLoggingProcessName(BaseTestCase):
4185#
4186# def handle(self, record):
4187# assert record.processName == multiprocessing.current_process().name
4188# self.__handled = True
4189#
4190# def test_logging(self):
4191# handler = logging.Handler()
4192# handler.handle = self.handle
4193# self.__handled = False
4194# # Bypass getLogger() and side-effects
4195# logger = logging.getLoggerClass()(
4196# 'multiprocessing.test.TestLoggingProcessName')
4197# logger.addHandler(handler)
4198# logger.propagate = False
4199#
4200# logger.warn('foo')
4201# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00004202
Benjamin Petersone711caf2008-06-11 16:44:04 +00004203#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004204# Check that Process.join() retries if os.waitpid() fails with EINTR
4205#
4206
4207class _TestPollEintr(BaseTestCase):
4208
4209 ALLOWED_TYPES = ('processes',)
4210
4211 @classmethod
4212 def _killer(cls, pid):
Richard Oudkerk6a53af82013-08-28 13:50:19 +01004213 time.sleep(0.1)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004214 os.kill(pid, signal.SIGUSR1)
4215
4216 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4217 def test_poll_eintr(self):
4218 got_signal = [False]
4219 def record(*args):
4220 got_signal[0] = True
4221 pid = os.getpid()
4222 oldhandler = signal.signal(signal.SIGUSR1, record)
4223 try:
4224 killer = self.Process(target=self._killer, args=(pid,))
4225 killer.start()
Richard Oudkerk6a53af82013-08-28 13:50:19 +01004226 try:
4227 p = self.Process(target=time.sleep, args=(2,))
4228 p.start()
4229 p.join()
4230 finally:
4231 killer.join()
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004232 self.assertTrue(got_signal[0])
4233 self.assertEqual(p.exitcode, 0)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004234 finally:
4235 signal.signal(signal.SIGUSR1, oldhandler)
4236
4237#
Jesse Noller6214edd2009-01-19 16:23:53 +00004238# Test to verify handle verification, see issue 3321
4239#
4240
4241class TestInvalidHandle(unittest.TestCase):
4242
Victor Stinner937ee9e2018-06-26 02:11:06 +02004243 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00004244 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004245 conn = multiprocessing.connection.Connection(44977608)
Charles-François Natali6703bb42013-09-06 21:12:22 +02004246 # check that poll() doesn't crash
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004247 try:
Charles-François Natali6703bb42013-09-06 21:12:22 +02004248 conn.poll()
4249 except (ValueError, OSError):
4250 pass
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004251 finally:
4252 # Hack private attribute _handle to avoid printing an error
4253 # in conn.__del__
4254 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004255 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004256 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004257
Benjamin Petersone711caf2008-06-11 16:44:04 +00004258
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01004259
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004260class OtherTest(unittest.TestCase):
4261 # TODO: add more tests for deliver/answer challenge.
4262 def test_deliver_challenge_auth_failure(self):
4263 class _FakeConnection(object):
4264 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00004265 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004266 def send_bytes(self, data):
4267 pass
4268 self.assertRaises(multiprocessing.AuthenticationError,
4269 multiprocessing.connection.deliver_challenge,
4270 _FakeConnection(), b'abc')
4271
4272 def test_answer_challenge_auth_failure(self):
4273 class _FakeConnection(object):
4274 def __init__(self):
4275 self.count = 0
4276 def recv_bytes(self, size):
4277 self.count += 1
4278 if self.count == 1:
4279 return multiprocessing.connection.CHALLENGE
4280 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00004281 return b'something bogus'
4282 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004283 def send_bytes(self, data):
4284 pass
4285 self.assertRaises(multiprocessing.AuthenticationError,
4286 multiprocessing.connection.answer_challenge,
4287 _FakeConnection(), b'abc')
4288
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004289#
4290# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4291#
4292
4293def initializer(ns):
4294 ns.test += 1
4295
4296class TestInitializers(unittest.TestCase):
4297 def setUp(self):
4298 self.mgr = multiprocessing.Manager()
4299 self.ns = self.mgr.Namespace()
4300 self.ns.test = 0
4301
4302 def tearDown(self):
4303 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01004304 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004305
4306 def test_manager_initializer(self):
4307 m = multiprocessing.managers.SyncManager()
4308 self.assertRaises(TypeError, m.start, 1)
4309 m.start(initializer, (self.ns,))
4310 self.assertEqual(self.ns.test, 1)
4311 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01004312 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004313
4314 def test_pool_initializer(self):
4315 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4316 p = multiprocessing.Pool(1, initializer, (self.ns,))
4317 p.close()
4318 p.join()
4319 self.assertEqual(self.ns.test, 1)
4320
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004321#
4322# Issue 5155, 5313, 5331: Test process in processes
4323# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4324#
4325
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01004326def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004327 try:
4328 item = q.get(block=False)
4329 except pyqueue.Empty:
4330 pass
4331
Victor Stinnerb4c52962017-07-25 02:40:55 +02004332def _test_process():
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01004333 queue = multiprocessing.Queue()
4334 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4335 subProc.daemon = True
4336 subProc.start()
4337 subProc.join()
4338
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004339def _afunc(x):
4340 return x*x
4341
4342def pool_in_process():
4343 pool = multiprocessing.Pool(processes=4)
4344 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01004345 pool.close()
4346 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004347
4348class _file_like(object):
4349 def __init__(self, delegate):
4350 self._delegate = delegate
4351 self._pid = None
4352
4353 @property
4354 def cache(self):
4355 pid = os.getpid()
4356 # There are no race conditions since fork keeps only the running thread
4357 if pid != self._pid:
4358 self._pid = pid
4359 self._cache = []
4360 return self._cache
4361
4362 def write(self, data):
4363 self.cache.append(data)
4364
4365 def flush(self):
4366 self._delegate.write(''.join(self.cache))
4367 self._cache = []
4368
4369class TestStdinBadfiledescriptor(unittest.TestCase):
4370
4371 def test_queue_in_process(self):
Victor Stinnerb4c52962017-07-25 02:40:55 +02004372 proc = multiprocessing.Process(target=_test_process)
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004373 proc.start()
4374 proc.join()
4375
4376 def test_pool_in_process(self):
4377 p = multiprocessing.Process(target=pool_in_process)
4378 p.start()
4379 p.join()
4380
4381 def test_flushing(self):
4382 sio = io.StringIO()
4383 flike = _file_like(sio)
4384 flike.write('foo')
4385 proc = multiprocessing.Process(target=lambda: flike.flush())
4386 flike.flush()
4387 assert sio.getvalue() == 'foo'
4388
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004389
4390class TestWait(unittest.TestCase):
4391
4392 @classmethod
4393 def _child_test_wait(cls, w, slow):
4394 for i in range(10):
4395 if slow:
4396 time.sleep(random.random()*0.1)
4397 w.send((i, os.getpid()))
4398 w.close()
4399
4400 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004401 from multiprocessing.connection import wait
4402 readers = []
4403 procs = []
4404 messages = []
4405
4406 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004407 r, w = multiprocessing.Pipe(duplex=False)
4408 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004409 p.daemon = True
4410 p.start()
4411 w.close()
4412 readers.append(r)
4413 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004414 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004415
4416 while readers:
4417 for r in wait(readers):
4418 try:
4419 msg = r.recv()
4420 except EOFError:
4421 readers.remove(r)
4422 r.close()
4423 else:
4424 messages.append(msg)
4425
4426 messages.sort()
4427 expected = sorted((i, p.pid) for i in range(10) for p in procs)
4428 self.assertEqual(messages, expected)
4429
4430 @classmethod
4431 def _child_test_wait_socket(cls, address, slow):
4432 s = socket.socket()
4433 s.connect(address)
4434 for i in range(10):
4435 if slow:
4436 time.sleep(random.random()*0.1)
4437 s.sendall(('%s\n' % i).encode('ascii'))
4438 s.close()
4439
4440 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004441 from multiprocessing.connection import wait
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02004442 l = socket.create_server((test.support.HOST, 0))
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02004443 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004444 readers = []
4445 procs = []
4446 dic = {}
4447
4448 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004449 p = multiprocessing.Process(target=self._child_test_wait_socket,
4450 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004451 p.daemon = True
4452 p.start()
4453 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004454 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004455
4456 for i in range(4):
4457 r, _ = l.accept()
4458 readers.append(r)
4459 dic[r] = []
4460 l.close()
4461
4462 while readers:
4463 for r in wait(readers):
4464 msg = r.recv(32)
4465 if not msg:
4466 readers.remove(r)
4467 r.close()
4468 else:
4469 dic[r].append(msg)
4470
4471 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4472 for v in dic.values():
4473 self.assertEqual(b''.join(v), expected)
4474
4475 def test_wait_slow(self):
4476 self.test_wait(True)
4477
4478 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01004479 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004480
4481 def test_wait_timeout(self):
4482 from multiprocessing.connection import wait
4483
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004484 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004485 a, b = multiprocessing.Pipe()
4486
Victor Stinner2cf4c202018-12-17 09:36:36 +01004487 start = time.monotonic()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004488 res = wait([a, b], expected)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004489 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004490
4491 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01004492 self.assertLess(delta, expected * 2)
4493 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004494
4495 b.send(None)
4496
Victor Stinner2cf4c202018-12-17 09:36:36 +01004497 start = time.monotonic()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004498 res = wait([a, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004499 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004500
4501 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01004502 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004503
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004504 @classmethod
4505 def signal_and_sleep(cls, sem, period):
4506 sem.release()
4507 time.sleep(period)
4508
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004509 def test_wait_integer(self):
4510 from multiprocessing.connection import wait
4511
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004512 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01004513 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004514 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004515 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004516 p = multiprocessing.Process(target=self.signal_and_sleep,
4517 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004518
4519 p.start()
4520 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004521 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004522
Victor Stinner2cf4c202018-12-17 09:36:36 +01004523 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004524 res = wait([a, p.sentinel, b], expected + 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004525 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004526
4527 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01004528 self.assertLess(delta, expected + 2)
4529 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004530
4531 a.send(None)
4532
Victor Stinner2cf4c202018-12-17 09:36:36 +01004533 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004534 res = wait([a, p.sentinel, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004535 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004536
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004537 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004538 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004539
4540 b.send(None)
4541
Victor Stinner2cf4c202018-12-17 09:36:36 +01004542 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004543 res = wait([a, p.sentinel, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004544 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004545
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004546 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004547 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004548
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004549 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004550 p.join()
4551
Richard Oudkerk59d54042012-05-10 16:11:12 +01004552 def test_neg_timeout(self):
4553 from multiprocessing.connection import wait
4554 a, b = multiprocessing.Pipe()
Victor Stinner2cf4c202018-12-17 09:36:36 +01004555 t = time.monotonic()
Richard Oudkerk59d54042012-05-10 16:11:12 +01004556 res = wait([a], timeout=-1)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004557 t = time.monotonic() - t
Richard Oudkerk59d54042012-05-10 16:11:12 +01004558 self.assertEqual(res, [])
4559 self.assertLess(t, 1)
4560 a.close()
4561 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004562
Antoine Pitrou709176f2012-04-01 17:19:09 +02004563#
4564# Issue 14151: Test invalid family on invalid environment
4565#
4566
4567class TestInvalidFamily(unittest.TestCase):
4568
Victor Stinner937ee9e2018-06-26 02:11:06 +02004569 @unittest.skipIf(WIN32, "skipped on Windows")
Antoine Pitrou709176f2012-04-01 17:19:09 +02004570 def test_invalid_family(self):
4571 with self.assertRaises(ValueError):
4572 multiprocessing.connection.Listener(r'\\.\test')
4573
Victor Stinner937ee9e2018-06-26 02:11:06 +02004574 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02004575 def test_invalid_family_win32(self):
4576 with self.assertRaises(ValueError):
4577 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02004578
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004579#
4580# Issue 12098: check sys.flags of child matches that for parent
4581#
4582
4583class TestFlags(unittest.TestCase):
4584 @classmethod
4585 def run_in_grandchild(cls, conn):
4586 conn.send(tuple(sys.flags))
4587
4588 @classmethod
4589 def run_in_child(cls):
4590 import json
4591 r, w = multiprocessing.Pipe(duplex=False)
4592 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4593 p.start()
4594 grandchild_flags = r.recv()
4595 p.join()
4596 r.close()
4597 w.close()
4598 flags = (tuple(sys.flags), grandchild_flags)
4599 print(json.dumps(flags))
4600
4601 def test_flags(self):
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02004602 import json
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004603 # start child process using unusual flags
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004604 prog = ('from test._test_multiprocessing import TestFlags; ' +
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004605 'TestFlags.run_in_child()')
4606 data = subprocess.check_output(
4607 [sys.executable, '-E', '-S', '-O', '-c', prog])
4608 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4609 self.assertEqual(child_flags, grandchild_flags)
4610
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004611#
4612# Test interaction with socket timeouts - see Issue #6056
4613#
4614
4615class TestTimeouts(unittest.TestCase):
4616 @classmethod
4617 def _test_timeout(cls, child, address):
4618 time.sleep(1)
4619 child.send(123)
4620 child.close()
4621 conn = multiprocessing.connection.Client(address)
4622 conn.send(456)
4623 conn.close()
4624
4625 def test_timeout(self):
4626 old_timeout = socket.getdefaulttimeout()
4627 try:
4628 socket.setdefaulttimeout(0.1)
4629 parent, child = multiprocessing.Pipe(duplex=True)
4630 l = multiprocessing.connection.Listener(family='AF_INET')
4631 p = multiprocessing.Process(target=self._test_timeout,
4632 args=(child, l.address))
4633 p.start()
4634 child.close()
4635 self.assertEqual(parent.recv(), 123)
4636 parent.close()
4637 conn = l.accept()
4638 self.assertEqual(conn.recv(), 456)
4639 conn.close()
4640 l.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004641 join_process(p)
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004642 finally:
4643 socket.setdefaulttimeout(old_timeout)
4644
Richard Oudkerke88a2442012-08-14 11:41:32 +01004645#
4646# Test what happens with no "if __name__ == '__main__'"
4647#
4648
4649class TestNoForkBomb(unittest.TestCase):
4650 def test_noforkbomb(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004651 sm = multiprocessing.get_start_method()
Richard Oudkerke88a2442012-08-14 11:41:32 +01004652 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004653 if sm != 'fork':
Berker Peksag076dbd02015-05-06 07:01:52 +03004654 rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004655 self.assertEqual(out, b'')
4656 self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01004657 else:
Berker Peksag076dbd02015-05-06 07:01:52 +03004658 rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004659 self.assertEqual(out.rstrip(), b'123')
4660 self.assertEqual(err, b'')
Richard Oudkerke88a2442012-08-14 11:41:32 +01004661
4662#
Richard Oudkerk409c3132013-04-17 20:58:00 +01004663# Issue #17555: ForkAwareThreadLock
4664#
4665
4666class TestForkAwareThreadLock(unittest.TestCase):
Mike53f7a7c2017-12-14 14:04:53 +03004667 # We recursively start processes. Issue #17555 meant that the
Richard Oudkerk409c3132013-04-17 20:58:00 +01004668 # after fork registry would get duplicate entries for the same
4669 # lock. The size of the registry at generation n was ~2**n.
4670
4671 @classmethod
4672 def child(cls, n, conn):
4673 if n > 1:
4674 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4675 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004676 conn.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004677 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004678 else:
4679 conn.send(len(util._afterfork_registry))
4680 conn.close()
4681
4682 def test_lock(self):
4683 r, w = multiprocessing.Pipe(False)
4684 l = util.ForkAwareThreadLock()
4685 old_size = len(util._afterfork_registry)
4686 p = multiprocessing.Process(target=self.child, args=(5, w))
4687 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004688 w.close()
Richard Oudkerk409c3132013-04-17 20:58:00 +01004689 new_size = r.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004690 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004691 self.assertLessEqual(new_size, old_size)
4692
4693#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004694# Check that non-forked child processes do not inherit unneeded fds/handles
4695#
4696
4697class TestCloseFds(unittest.TestCase):
4698
4699 def get_high_socket_fd(self):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004700 if WIN32:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004701 # The child process will not have any socket handles, so
4702 # calling socket.fromfd() should produce WSAENOTSOCK even
4703 # if there is a handle of the same number.
4704 return socket.socket().detach()
4705 else:
4706 # We want to produce a socket with an fd high enough that a
4707 # freshly created child process will not have any fds as high.
4708 fd = socket.socket().detach()
4709 to_close = []
4710 while fd < 50:
4711 to_close.append(fd)
4712 fd = os.dup(fd)
4713 for x in to_close:
4714 os.close(x)
4715 return fd
4716
4717 def close(self, fd):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004718 if WIN32:
Christian Heimesb6e43af2018-01-29 22:37:58 +01004719 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004720 else:
4721 os.close(fd)
4722
4723 @classmethod
4724 def _test_closefds(cls, conn, fd):
4725 try:
4726 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4727 except Exception as e:
4728 conn.send(e)
4729 else:
4730 s.close()
4731 conn.send(None)
4732
4733 def test_closefd(self):
4734 if not HAS_REDUCTION:
4735 raise unittest.SkipTest('requires fd pickling')
4736
4737 reader, writer = multiprocessing.Pipe()
4738 fd = self.get_high_socket_fd()
4739 try:
4740 p = multiprocessing.Process(target=self._test_closefds,
4741 args=(writer, fd))
4742 p.start()
4743 writer.close()
4744 e = reader.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004745 join_process(p)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004746 finally:
4747 self.close(fd)
4748 writer.close()
4749 reader.close()
4750
4751 if multiprocessing.get_start_method() == 'fork':
4752 self.assertIs(e, None)
4753 else:
4754 WSAENOTSOCK = 10038
4755 self.assertIsInstance(e, OSError)
4756 self.assertTrue(e.errno == errno.EBADF or
4757 e.winerror == WSAENOTSOCK, e)
4758
4759#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004760# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4761#
4762
4763class TestIgnoreEINTR(unittest.TestCase):
4764
Victor Stinner252f6ab2018-06-01 16:48:34 +02004765 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
4766 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
4767
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004768 @classmethod
4769 def _test_ignore(cls, conn):
4770 def handler(signum, frame):
4771 pass
4772 signal.signal(signal.SIGUSR1, handler)
4773 conn.send('ready')
4774 x = conn.recv()
4775 conn.send(x)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004776 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004777
4778 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4779 def test_ignore(self):
4780 conn, child_conn = multiprocessing.Pipe()
4781 try:
4782 p = multiprocessing.Process(target=self._test_ignore,
4783 args=(child_conn,))
4784 p.daemon = True
4785 p.start()
4786 child_conn.close()
4787 self.assertEqual(conn.recv(), 'ready')
4788 time.sleep(0.1)
4789 os.kill(p.pid, signal.SIGUSR1)
4790 time.sleep(0.1)
4791 conn.send(1234)
4792 self.assertEqual(conn.recv(), 1234)
4793 time.sleep(0.1)
4794 os.kill(p.pid, signal.SIGUSR1)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004795 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004796 time.sleep(0.1)
4797 p.join()
4798 finally:
4799 conn.close()
4800
4801 @classmethod
4802 def _test_ignore_listener(cls, conn):
4803 def handler(signum, frame):
4804 pass
4805 signal.signal(signal.SIGUSR1, handler)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004806 with multiprocessing.connection.Listener() as l:
4807 conn.send(l.address)
4808 a = l.accept()
4809 a.send('welcome')
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004810
4811 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4812 def test_ignore_listener(self):
4813 conn, child_conn = multiprocessing.Pipe()
4814 try:
4815 p = multiprocessing.Process(target=self._test_ignore_listener,
4816 args=(child_conn,))
4817 p.daemon = True
4818 p.start()
4819 child_conn.close()
4820 address = conn.recv()
4821 time.sleep(0.1)
4822 os.kill(p.pid, signal.SIGUSR1)
4823 time.sleep(0.1)
4824 client = multiprocessing.connection.Client(address)
4825 self.assertEqual(client.recv(), 'welcome')
4826 p.join()
4827 finally:
4828 conn.close()
4829
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004830class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004831 @classmethod
4832 def _check_context(cls, conn):
4833 conn.send(multiprocessing.get_start_method())
4834
4835 def check_context(self, ctx):
4836 r, w = ctx.Pipe(duplex=False)
4837 p = ctx.Process(target=self._check_context, args=(w,))
4838 p.start()
4839 w.close()
4840 child_method = r.recv()
4841 r.close()
4842 p.join()
4843 self.assertEqual(child_method, ctx.get_start_method())
4844
4845 def test_context(self):
4846 for method in ('fork', 'spawn', 'forkserver'):
4847 try:
4848 ctx = multiprocessing.get_context(method)
4849 except ValueError:
4850 continue
4851 self.assertEqual(ctx.get_start_method(), method)
4852 self.assertIs(ctx.get_context(), ctx)
4853 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4854 self.assertRaises(ValueError, ctx.set_start_method, None)
4855 self.check_context(ctx)
4856
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004857 def test_set_get(self):
4858 multiprocessing.set_forkserver_preload(PRELOAD)
4859 count = 0
4860 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00004861 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004862 for method in ('fork', 'spawn', 'forkserver'):
4863 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004864 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004865 except ValueError:
4866 continue
4867 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004868 ctx = multiprocessing.get_context()
4869 self.assertEqual(ctx.get_start_method(), method)
4870 self.assertTrue(type(ctx).__name__.lower().startswith(method))
4871 self.assertTrue(
4872 ctx.Process.__name__.lower().startswith(method))
4873 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004874 count += 1
4875 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004876 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004877 self.assertGreaterEqual(count, 1)
4878
4879 def test_get_all(self):
4880 methods = multiprocessing.get_all_start_methods()
4881 if sys.platform == 'win32':
4882 self.assertEqual(methods, ['spawn'])
4883 else:
4884 self.assertTrue(methods == ['fork', 'spawn'] or
4885 methods == ['fork', 'spawn', 'forkserver'])
4886
Antoine Pitroucd2a2012016-12-10 17:13:16 +01004887 def test_preload_resources(self):
4888 if multiprocessing.get_start_method() != 'forkserver':
4889 self.skipTest("test only relevant for 'forkserver' method")
4890 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4891 rc, out, err = test.support.script_helper.assert_python_ok(name)
4892 out = out.decode()
4893 err = err.decode()
4894 if out.rstrip() != 'ok' or err != '':
4895 print(out)
4896 print(err)
4897 self.fail("failed spawning forkserver or grandchild")
4898
4899
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004900@unittest.skipIf(sys.platform == "win32",
4901 "test semantics don't make sense on Windows")
Pierre Glaserf22cc692019-05-10 22:59:08 +02004902class TestResourceTracker(unittest.TestCase):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004903
Pierre Glaserf22cc692019-05-10 22:59:08 +02004904 def test_resource_tracker(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004905 #
4906 # Check that killing process does not leak named semaphores
4907 #
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004908 cmd = '''if 1:
Pierre Glaserf22cc692019-05-10 22:59:08 +02004909 import time, os, tempfile
4910 import multiprocessing as mp
4911 from multiprocessing import resource_tracker
4912 from multiprocessing.shared_memory import SharedMemory
4913
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004914 mp.set_start_method("spawn")
Pierre Glaserf22cc692019-05-10 22:59:08 +02004915 rand = tempfile._RandomNameSequence()
4916
4917
4918 def create_and_register_resource(rtype):
4919 if rtype == "semaphore":
4920 lock = mp.Lock()
4921 return lock, lock._semlock.name
4922 elif rtype == "shared_memory":
4923 sm = SharedMemory(create=True, size=10)
4924 return sm, sm._name
4925 else:
4926 raise ValueError(
4927 "Resource type {{}} not understood".format(rtype))
4928
4929
4930 resource1, rname1 = create_and_register_resource("{rtype}")
4931 resource2, rname2 = create_and_register_resource("{rtype}")
4932
4933 os.write({w}, rname1.encode("ascii") + b"\\n")
4934 os.write({w}, rname2.encode("ascii") + b"\\n")
4935
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004936 time.sleep(10)
4937 '''
Pierre Glaserf22cc692019-05-10 22:59:08 +02004938 for rtype in resource_tracker._CLEANUP_FUNCS:
4939 with self.subTest(rtype=rtype):
4940 if rtype == "noop":
4941 # Artefact resource type used by the resource_tracker
4942 continue
4943 r, w = os.pipe()
4944 p = subprocess.Popen([sys.executable,
4945 '-E', '-c', cmd.format(w=w, rtype=rtype)],
4946 pass_fds=[w],
4947 stderr=subprocess.PIPE)
4948 os.close(w)
4949 with open(r, 'rb', closefd=True) as f:
4950 name1 = f.readline().rstrip().decode('ascii')
4951 name2 = f.readline().rstrip().decode('ascii')
4952 _resource_unlink(name1, rtype)
4953 p.terminate()
4954 p.wait()
4955 time.sleep(2.0)
4956 with self.assertRaises(OSError) as ctx:
4957 _resource_unlink(name2, rtype)
4958 # docs say it should be ENOENT, but OSX seems to give EINVAL
4959 self.assertIn(
4960 ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
4961 err = p.stderr.read().decode('utf-8')
4962 p.stderr.close()
4963 expected = ('resource_tracker: There appear to be 2 leaked {} '
4964 'objects'.format(
4965 rtype))
4966 self.assertRegex(err, expected)
4967 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004968
Pierre Glaserf22cc692019-05-10 22:59:08 +02004969 def check_resource_tracker_death(self, signum, should_die):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004970 # bpo-31310: if the semaphore tracker process has died, it should
4971 # be restarted implicitly.
Pierre Glaserf22cc692019-05-10 22:59:08 +02004972 from multiprocessing.resource_tracker import _resource_tracker
4973 pid = _resource_tracker._pid
Pablo Galindoec74d182018-09-04 09:53:54 +01004974 if pid is not None:
4975 os.kill(pid, signal.SIGKILL)
4976 os.waitpid(pid, 0)
Pablo Galindo3058b7d2018-10-10 08:40:14 +01004977 with warnings.catch_warnings():
4978 warnings.simplefilter("ignore")
Pierre Glaserf22cc692019-05-10 22:59:08 +02004979 _resource_tracker.ensure_running()
4980 pid = _resource_tracker._pid
Pablo Galindoec74d182018-09-04 09:53:54 +01004981
Antoine Pitroucbe17562017-11-03 14:31:38 +01004982 os.kill(pid, signum)
4983 time.sleep(1.0) # give it time to die
4984
4985 ctx = multiprocessing.get_context("spawn")
Pablo Galindoec74d182018-09-04 09:53:54 +01004986 with warnings.catch_warnings(record=True) as all_warn:
Pablo Galindo3058b7d2018-10-10 08:40:14 +01004987 warnings.simplefilter("always")
Antoine Pitroucbe17562017-11-03 14:31:38 +01004988 sem = ctx.Semaphore()
4989 sem.acquire()
4990 sem.release()
4991 wr = weakref.ref(sem)
4992 # ensure `sem` gets collected, which triggers communication with
4993 # the semaphore tracker
4994 del sem
4995 gc.collect()
4996 self.assertIsNone(wr())
Pablo Galindoec74d182018-09-04 09:53:54 +01004997 if should_die:
4998 self.assertEqual(len(all_warn), 1)
4999 the_warn = all_warn[0]
Pablo Galindo3058b7d2018-10-10 08:40:14 +01005000 self.assertTrue(issubclass(the_warn.category, UserWarning))
Pierre Glaserf22cc692019-05-10 22:59:08 +02005001 self.assertTrue("resource_tracker: process died"
Pablo Galindoec74d182018-09-04 09:53:54 +01005002 in str(the_warn.message))
5003 else:
5004 self.assertEqual(len(all_warn), 0)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005005
Pierre Glaserf22cc692019-05-10 22:59:08 +02005006 def test_resource_tracker_sigint(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01005007 # Catchable signal (ignored by semaphore tracker)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005008 self.check_resource_tracker_death(signal.SIGINT, False)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005009
Pierre Glaserf22cc692019-05-10 22:59:08 +02005010 def test_resource_tracker_sigterm(self):
Pablo Galindoec74d182018-09-04 09:53:54 +01005011 # Catchable signal (ignored by semaphore tracker)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005012 self.check_resource_tracker_death(signal.SIGTERM, False)
Pablo Galindoec74d182018-09-04 09:53:54 +01005013
Pierre Glaserf22cc692019-05-10 22:59:08 +02005014 def test_resource_tracker_sigkill(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01005015 # Uncatchable signal.
Pierre Glaserf22cc692019-05-10 22:59:08 +02005016 self.check_resource_tracker_death(signal.SIGKILL, True)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005017
Thomas Moreau004b93e2019-04-24 21:45:52 +02005018 @staticmethod
Pierre Glaserf22cc692019-05-10 22:59:08 +02005019 def _is_resource_tracker_reused(conn, pid):
5020 from multiprocessing.resource_tracker import _resource_tracker
5021 _resource_tracker.ensure_running()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005022 # The pid should be None in the child process, expect for the fork
5023 # context. It should not be a new value.
Pierre Glaserf22cc692019-05-10 22:59:08 +02005024 reused = _resource_tracker._pid in (None, pid)
5025 reused &= _resource_tracker._check_alive()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005026 conn.send(reused)
5027
Pierre Glaserf22cc692019-05-10 22:59:08 +02005028 def test_resource_tracker_reused(self):
5029 from multiprocessing.resource_tracker import _resource_tracker
5030 _resource_tracker.ensure_running()
5031 pid = _resource_tracker._pid
Thomas Moreau004b93e2019-04-24 21:45:52 +02005032
5033 r, w = multiprocessing.Pipe(duplex=False)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005034 p = multiprocessing.Process(target=self._is_resource_tracker_reused,
Thomas Moreau004b93e2019-04-24 21:45:52 +02005035 args=(w, pid))
5036 p.start()
Pierre Glaserf22cc692019-05-10 22:59:08 +02005037 is_resource_tracker_reused = r.recv()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005038
5039 # Clean up
5040 p.join()
5041 w.close()
5042 r.close()
5043
Pierre Glaserf22cc692019-05-10 22:59:08 +02005044 self.assertTrue(is_resource_tracker_reused)
Thomas Moreau004b93e2019-04-24 21:45:52 +02005045
Antoine Pitroucbe17562017-11-03 14:31:38 +01005046
Xiang Zhang6f75bc02017-05-17 21:04:00 +08005047class TestSimpleQueue(unittest.TestCase):
5048
5049 @classmethod
5050 def _test_empty(cls, queue, child_can_start, parent_can_continue):
5051 child_can_start.wait()
5052 # issue 30301, could fail under spawn and forkserver
5053 try:
5054 queue.put(queue.empty())
5055 queue.put(queue.empty())
5056 finally:
5057 parent_can_continue.set()
5058
5059 def test_empty(self):
5060 queue = multiprocessing.SimpleQueue()
5061 child_can_start = multiprocessing.Event()
5062 parent_can_continue = multiprocessing.Event()
5063
5064 proc = multiprocessing.Process(
5065 target=self._test_empty,
5066 args=(queue, child_can_start, parent_can_continue)
5067 )
5068 proc.daemon = True
5069 proc.start()
5070
5071 self.assertTrue(queue.empty())
5072
5073 child_can_start.set()
5074 parent_can_continue.wait()
5075
5076 self.assertFalse(queue.empty())
5077 self.assertEqual(queue.get(), True)
5078 self.assertEqual(queue.get(), False)
5079 self.assertTrue(queue.empty())
5080
5081 proc.join()
5082
Derek B. Kimc40278e2018-07-11 19:22:28 +09005083
Julien Palard5d236ca2018-11-04 23:40:32 +01005084class TestPoolNotLeakOnFailure(unittest.TestCase):
5085
5086 def test_release_unused_processes(self):
5087 # Issue #19675: During pool creation, if we can't create a process,
5088 # don't leak already created ones.
5089 will_fail_in = 3
5090 forked_processes = []
5091
5092 class FailingForkProcess:
5093 def __init__(self, **kwargs):
5094 self.name = 'Fake Process'
5095 self.exitcode = None
5096 self.state = None
5097 forked_processes.append(self)
5098
5099 def start(self):
5100 nonlocal will_fail_in
5101 if will_fail_in <= 0:
5102 raise OSError("Manually induced OSError")
5103 will_fail_in -= 1
5104 self.state = 'started'
5105
5106 def terminate(self):
5107 self.state = 'stopping'
5108
5109 def join(self):
5110 if self.state == 'stopping':
5111 self.state = 'stopped'
5112
5113 def is_alive(self):
5114 return self.state == 'started' or self.state == 'stopping'
5115
5116 with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
5117 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
5118 Process=FailingForkProcess))
5119 p.close()
5120 p.join()
5121 self.assertFalse(
5122 any(process.is_alive() for process in forked_processes))
5123
5124
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005125class TestSyncManagerTypes(unittest.TestCase):
5126 """Test all the types which can be shared between a parent and a
5127 child process by using a manager which acts as an intermediary
5128 between them.
5129
5130 In the following unit-tests the base type is created in the parent
5131 process, the @classmethod represents the worker process and the
5132 shared object is readable and editable between the two.
5133
5134 # The child.
5135 @classmethod
5136 def _test_list(cls, obj):
5137 assert obj[0] == 5
5138 assert obj.append(6)
5139
5140 # The parent.
5141 def test_list(self):
5142 o = self.manager.list()
5143 o.append(5)
5144 self.run_worker(self._test_list, o)
5145 assert o[1] == 6
5146 """
5147 manager_class = multiprocessing.managers.SyncManager
5148
5149 def setUp(self):
5150 self.manager = self.manager_class()
5151 self.manager.start()
5152 self.proc = None
5153
5154 def tearDown(self):
5155 if self.proc is not None and self.proc.is_alive():
5156 self.proc.terminate()
5157 self.proc.join()
5158 self.manager.shutdown()
Pablo Galindo613f7292019-02-09 17:08:49 +00005159 self.manager = None
5160 self.proc = None
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005161
5162 @classmethod
5163 def setUpClass(cls):
5164 support.reap_children()
5165
5166 tearDownClass = setUpClass
5167
5168 def wait_proc_exit(self):
5169 # Only the manager process should be returned by active_children()
5170 # but this can take a bit on slow machines, so wait a few seconds
5171 # if there are other children too (see #17395).
5172 join_process(self.proc)
5173 start_time = time.monotonic()
5174 t = 0.01
5175 while len(multiprocessing.active_children()) > 1:
5176 time.sleep(t)
5177 t *= 2
5178 dt = time.monotonic() - start_time
5179 if dt >= 5.0:
5180 test.support.environment_altered = True
5181 print("Warning -- multiprocessing.Manager still has %s active "
5182 "children after %s seconds"
5183 % (multiprocessing.active_children(), dt),
5184 file=sys.stderr)
5185 break
5186
5187 def run_worker(self, worker, obj):
5188 self.proc = multiprocessing.Process(target=worker, args=(obj, ))
5189 self.proc.daemon = True
5190 self.proc.start()
5191 self.wait_proc_exit()
5192 self.assertEqual(self.proc.exitcode, 0)
5193
5194 @classmethod
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005195 def _test_event(cls, obj):
5196 assert obj.is_set()
5197 obj.wait()
5198 obj.clear()
5199 obj.wait(0.001)
5200
5201 def test_event(self):
5202 o = self.manager.Event()
5203 o.set()
5204 self.run_worker(self._test_event, o)
5205 assert not o.is_set()
5206 o.wait(0.001)
5207
5208 @classmethod
5209 def _test_lock(cls, obj):
5210 obj.acquire()
5211
5212 def test_lock(self, lname="Lock"):
5213 o = getattr(self.manager, lname)()
5214 self.run_worker(self._test_lock, o)
5215 o.release()
5216 self.assertRaises(RuntimeError, o.release) # already released
5217
5218 @classmethod
5219 def _test_rlock(cls, obj):
5220 obj.acquire()
5221 obj.release()
5222
5223 def test_rlock(self, lname="Lock"):
5224 o = getattr(self.manager, lname)()
5225 self.run_worker(self._test_rlock, o)
5226
5227 @classmethod
5228 def _test_semaphore(cls, obj):
5229 obj.acquire()
5230
5231 def test_semaphore(self, sname="Semaphore"):
5232 o = getattr(self.manager, sname)()
5233 self.run_worker(self._test_semaphore, o)
5234 o.release()
5235
5236 def test_bounded_semaphore(self):
5237 self.test_semaphore(sname="BoundedSemaphore")
5238
5239 @classmethod
5240 def _test_condition(cls, obj):
5241 obj.acquire()
5242 obj.release()
5243
5244 def test_condition(self):
5245 o = self.manager.Condition()
5246 self.run_worker(self._test_condition, o)
5247
5248 @classmethod
5249 def _test_barrier(cls, obj):
5250 assert obj.parties == 5
5251 obj.reset()
5252
5253 def test_barrier(self):
5254 o = self.manager.Barrier(5)
5255 self.run_worker(self._test_barrier, o)
5256
5257 @classmethod
5258 def _test_pool(cls, obj):
5259 # TODO: fix https://bugs.python.org/issue35919
5260 with obj:
5261 pass
5262
5263 def test_pool(self):
5264 o = self.manager.Pool(processes=4)
5265 self.run_worker(self._test_pool, o)
5266
5267 @classmethod
Davin Pottse895de32019-02-23 22:08:16 -06005268 def _test_queue(cls, obj):
5269 assert obj.qsize() == 2
5270 assert obj.full()
5271 assert not obj.empty()
5272 assert obj.get() == 5
5273 assert not obj.empty()
5274 assert obj.get() == 6
5275 assert obj.empty()
5276
5277 def test_queue(self, qname="Queue"):
5278 o = getattr(self.manager, qname)(2)
5279 o.put(5)
5280 o.put(6)
5281 self.run_worker(self._test_queue, o)
5282 assert o.empty()
5283 assert not o.full()
5284
5285 def test_joinable_queue(self):
5286 self.test_queue("JoinableQueue")
5287
5288 @classmethod
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005289 def _test_list(cls, obj):
5290 assert obj[0] == 5
5291 assert obj.count(5) == 1
5292 assert obj.index(5) == 0
5293 obj.sort()
5294 obj.reverse()
5295 for x in obj:
5296 pass
5297 assert len(obj) == 1
5298 assert obj.pop(0) == 5
5299
5300 def test_list(self):
5301 o = self.manager.list()
5302 o.append(5)
5303 self.run_worker(self._test_list, o)
5304 assert not o
5305 self.assertEqual(len(o), 0)
5306
5307 @classmethod
5308 def _test_dict(cls, obj):
5309 assert len(obj) == 1
5310 assert obj['foo'] == 5
5311 assert obj.get('foo') == 5
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005312 assert list(obj.items()) == [('foo', 5)]
5313 assert list(obj.keys()) == ['foo']
5314 assert list(obj.values()) == [5]
5315 assert obj.copy() == {'foo': 5}
5316 assert obj.popitem() == ('foo', 5)
5317
5318 def test_dict(self):
5319 o = self.manager.dict()
5320 o['foo'] = 5
5321 self.run_worker(self._test_dict, o)
5322 assert not o
5323 self.assertEqual(len(o), 0)
5324
5325 @classmethod
5326 def _test_value(cls, obj):
5327 assert obj.value == 1
5328 assert obj.get() == 1
5329 obj.set(2)
5330
5331 def test_value(self):
5332 o = self.manager.Value('i', 1)
5333 self.run_worker(self._test_value, o)
5334 self.assertEqual(o.value, 2)
5335 self.assertEqual(o.get(), 2)
5336
5337 @classmethod
5338 def _test_array(cls, obj):
5339 assert obj[0] == 0
5340 assert obj[1] == 1
5341 assert len(obj) == 2
5342 assert list(obj) == [0, 1]
5343
5344 def test_array(self):
5345 o = self.manager.Array('i', [0, 1])
5346 self.run_worker(self._test_array, o)
5347
5348 @classmethod
5349 def _test_namespace(cls, obj):
5350 assert obj.x == 0
5351 assert obj.y == 1
5352
5353 def test_namespace(self):
5354 o = self.manager.Namespace()
5355 o.x = 0
5356 o.y = 1
5357 self.run_worker(self._test_namespace, o)
5358
5359
Derek B. Kimc40278e2018-07-11 19:22:28 +09005360class MiscTestCase(unittest.TestCase):
5361 def test__all__(self):
5362 # Just make sure names in blacklist are excluded
5363 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5364 blacklist=['SUBDEBUG', 'SUBWARNING'])
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005365#
5366# Mixins
5367#
5368
Victor Stinnerffb49402017-07-25 01:55:54 +02005369class BaseMixin(object):
5370 @classmethod
5371 def setUpClass(cls):
5372 cls.dangling = (multiprocessing.process._dangling.copy(),
5373 threading._dangling.copy())
5374
5375 @classmethod
5376 def tearDownClass(cls):
5377 # bpo-26762: Some multiprocessing objects like Pool create reference
5378 # cycles. Trigger a garbage collection to break these cycles.
5379 test.support.gc_collect()
5380
5381 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
5382 if processes:
Victor Stinner957d0e92017-08-10 17:36:50 +02005383 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005384 print('Warning -- Dangling processes: %s' % processes,
5385 file=sys.stderr)
5386 processes = None
5387
5388 threads = set(threading._dangling) - set(cls.dangling[1])
5389 if threads:
Victor Stinner957d0e92017-08-10 17:36:50 +02005390 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005391 print('Warning -- Dangling threads: %s' % threads,
5392 file=sys.stderr)
5393 threads = None
5394
5395
5396class ProcessesMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005397 TYPE = 'processes'
5398 Process = multiprocessing.Process
5399 connection = multiprocessing.connection
5400 current_process = staticmethod(multiprocessing.current_process)
5401 active_children = staticmethod(multiprocessing.active_children)
5402 Pool = staticmethod(multiprocessing.Pool)
5403 Pipe = staticmethod(multiprocessing.Pipe)
5404 Queue = staticmethod(multiprocessing.Queue)
5405 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
5406 Lock = staticmethod(multiprocessing.Lock)
5407 RLock = staticmethod(multiprocessing.RLock)
5408 Semaphore = staticmethod(multiprocessing.Semaphore)
5409 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
5410 Condition = staticmethod(multiprocessing.Condition)
5411 Event = staticmethod(multiprocessing.Event)
5412 Barrier = staticmethod(multiprocessing.Barrier)
5413 Value = staticmethod(multiprocessing.Value)
5414 Array = staticmethod(multiprocessing.Array)
5415 RawValue = staticmethod(multiprocessing.RawValue)
5416 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00005417
Benjamin Petersone711caf2008-06-11 16:44:04 +00005418
Victor Stinnerffb49402017-07-25 01:55:54 +02005419class ManagerMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005420 TYPE = 'manager'
5421 Process = multiprocessing.Process
5422 Queue = property(operator.attrgetter('manager.Queue'))
5423 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
5424 Lock = property(operator.attrgetter('manager.Lock'))
5425 RLock = property(operator.attrgetter('manager.RLock'))
5426 Semaphore = property(operator.attrgetter('manager.Semaphore'))
5427 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
5428 Condition = property(operator.attrgetter('manager.Condition'))
5429 Event = property(operator.attrgetter('manager.Event'))
5430 Barrier = property(operator.attrgetter('manager.Barrier'))
5431 Value = property(operator.attrgetter('manager.Value'))
5432 Array = property(operator.attrgetter('manager.Array'))
5433 list = property(operator.attrgetter('manager.list'))
5434 dict = property(operator.attrgetter('manager.dict'))
5435 Namespace = property(operator.attrgetter('manager.Namespace'))
5436
5437 @classmethod
5438 def Pool(cls, *args, **kwds):
5439 return cls.manager.Pool(*args, **kwds)
5440
5441 @classmethod
5442 def setUpClass(cls):
Victor Stinnerffb49402017-07-25 01:55:54 +02005443 super().setUpClass()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005444 cls.manager = multiprocessing.Manager()
5445
5446 @classmethod
5447 def tearDownClass(cls):
5448 # only the manager process should be returned by active_children()
5449 # but this can take a bit on slow machines, so wait a few seconds
5450 # if there are other children too (see #17395)
Victor Stinnerffb49402017-07-25 01:55:54 +02005451 start_time = time.monotonic()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005452 t = 0.01
Victor Stinnerffb49402017-07-25 01:55:54 +02005453 while len(multiprocessing.active_children()) > 1:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005454 time.sleep(t)
5455 t *= 2
Victor Stinnerffb49402017-07-25 01:55:54 +02005456 dt = time.monotonic() - start_time
5457 if dt >= 5.0:
Victor Stinner957d0e92017-08-10 17:36:50 +02005458 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005459 print("Warning -- multiprocessing.Manager still has %s active "
5460 "children after %s seconds"
5461 % (multiprocessing.active_children(), dt),
5462 file=sys.stderr)
5463 break
5464
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005465 gc.collect() # do garbage collection
5466 if cls.manager._number_of_objects() != 0:
5467 # This is not really an error since some tests do not
5468 # ensure that all processes which hold a reference to a
5469 # managed object have been joined.
Victor Stinner957d0e92017-08-10 17:36:50 +02005470 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005471 print('Warning -- Shared objects which still exist at manager '
5472 'shutdown:')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005473 print(cls.manager._debug_info())
5474 cls.manager.shutdown()
5475 cls.manager.join()
5476 cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01005477
Victor Stinnerffb49402017-07-25 01:55:54 +02005478 super().tearDownClass()
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01005479
Victor Stinnerffb49402017-07-25 01:55:54 +02005480
5481class ThreadsMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005482 TYPE = 'threads'
5483 Process = multiprocessing.dummy.Process
5484 connection = multiprocessing.dummy.connection
5485 current_process = staticmethod(multiprocessing.dummy.current_process)
5486 active_children = staticmethod(multiprocessing.dummy.active_children)
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01005487 Pool = staticmethod(multiprocessing.dummy.Pool)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005488 Pipe = staticmethod(multiprocessing.dummy.Pipe)
5489 Queue = staticmethod(multiprocessing.dummy.Queue)
5490 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
5491 Lock = staticmethod(multiprocessing.dummy.Lock)
5492 RLock = staticmethod(multiprocessing.dummy.RLock)
5493 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
5494 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
5495 Condition = staticmethod(multiprocessing.dummy.Condition)
5496 Event = staticmethod(multiprocessing.dummy.Event)
5497 Barrier = staticmethod(multiprocessing.dummy.Barrier)
5498 Value = staticmethod(multiprocessing.dummy.Value)
5499 Array = staticmethod(multiprocessing.dummy.Array)
5500
5501#
5502# Functions used to create test cases from the base ones in this module
5503#
5504
5505def install_tests_in_module_dict(remote_globs, start_method):
5506 __module__ = remote_globs['__name__']
5507 local_globs = globals()
5508 ALL_TYPES = {'processes', 'threads', 'manager'}
5509
5510 for name, base in local_globs.items():
5511 if not isinstance(base, type):
5512 continue
5513 if issubclass(base, BaseTestCase):
5514 if base is BaseTestCase:
5515 continue
5516 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
5517 for type_ in base.ALLOWED_TYPES:
5518 newname = 'With' + type_.capitalize() + name[1:]
5519 Mixin = local_globs[type_.capitalize() + 'Mixin']
5520 class Temp(base, Mixin, unittest.TestCase):
5521 pass
5522 Temp.__name__ = Temp.__qualname__ = newname
5523 Temp.__module__ = __module__
5524 remote_globs[newname] = Temp
5525 elif issubclass(base, unittest.TestCase):
5526 class Temp(base, object):
5527 pass
5528 Temp.__name__ = Temp.__qualname__ = name
5529 Temp.__module__ = __module__
5530 remote_globs[name] = Temp
5531
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005532 dangling = [None, None]
5533 old_start_method = [None]
5534
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005535 def setUpModule():
5536 multiprocessing.set_forkserver_preload(PRELOAD)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005537 multiprocessing.process._cleanup()
5538 dangling[0] = multiprocessing.process._dangling.copy()
5539 dangling[1] = threading._dangling.copy()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005540 old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005541 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005542 multiprocessing.set_start_method(start_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005543 except ValueError:
5544 raise unittest.SkipTest(start_method +
5545 ' start method not supported')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005546
5547 if sys.platform.startswith("linux"):
5548 try:
5549 lock = multiprocessing.RLock()
5550 except OSError:
5551 raise unittest.SkipTest("OSError raises on RLock creation, "
5552 "see issue 3111!")
5553 check_enough_semaphores()
5554 util.get_temp_dir() # creates temp directory
5555 multiprocessing.get_logger().setLevel(LOG_LEVEL)
5556
5557 def tearDownModule():
Victor Stinnerffb49402017-07-25 01:55:54 +02005558 need_sleep = False
5559
5560 # bpo-26762: Some multiprocessing objects like Pool create reference
5561 # cycles. Trigger a garbage collection to break these cycles.
5562 test.support.gc_collect()
5563
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005564 multiprocessing.set_start_method(old_start_method[0], force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005565 # pause a bit so we don't get warning about dangling threads/processes
Victor Stinnerffb49402017-07-25 01:55:54 +02005566 processes = set(multiprocessing.process._dangling) - set(dangling[0])
5567 if processes:
5568 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02005569 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005570 print('Warning -- Dangling processes: %s' % processes,
5571 file=sys.stderr)
5572 processes = None
5573
5574 threads = set(threading._dangling) - set(dangling[1])
5575 if threads:
5576 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02005577 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005578 print('Warning -- Dangling threads: %s' % threads,
5579 file=sys.stderr)
5580 threads = None
5581
5582 # Sleep 500 ms to give time to child processes to complete.
5583 if need_sleep:
5584 time.sleep(0.5)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005585 multiprocessing.process._cleanup()
Victor Stinnerffb49402017-07-25 01:55:54 +02005586 test.support.gc_collect()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005587
5588 remote_globs['setUpModule'] = setUpModule
5589 remote_globs['tearDownModule'] = tearDownModule