blob: 1474624ac5d5b9ba55ec80c24ef5803e690f0c5d [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Julien Palard5d236ca2018-11-04 23:40:32 +01006import unittest.mock
Benjamin Petersone711caf2008-06-11 16:44:04 +00007import queue as pyqueue
8import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00009import io
Antoine Pitroude911b22011-12-21 11:03:24 +010010import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
Pierre Glaserb1dfcad2019-05-13 21:15:32 +020020import subprocess
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
Davin Pottse895de32019-02-23 22:08:16 -060023import pickle
Antoine Pitrou89889452017-03-24 13:52:11 +010024import weakref
Pablo Galindoec74d182018-09-04 09:53:54 +010025import warnings
R. David Murraya21e4ca2009-03-31 23:16:50 +000026import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030027import test.support.script_helper
Victor Stinnerb9b69002017-09-14 14:40:56 -070028from test import support
Benjamin Petersone711caf2008-06-11 16:44:04 +000029
Benjamin Petersone5384b02008-10-04 22:00:42 +000030
R. David Murraya21e4ca2009-03-31 23:16:50 +000031# Skip tests if _multiprocessing wasn't built.
32_multiprocessing = test.support.import_module('_multiprocessing')
33# Skip tests if sem_open implementation is broken.
34test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000035import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000036
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.connection
Victor Stinnerd7e64d92017-07-25 00:33:56 +020038import multiprocessing.dummy
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.heap
Victor Stinnerd7e64d92017-07-25 00:33:56 +020040import multiprocessing.managers
Benjamin Petersone711caf2008-06-11 16:44:04 +000041import multiprocessing.pool
Victor Stinnerd7e64d92017-07-25 00:33:56 +020042import multiprocessing.queues
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Charles-François Natalibc8f0822011-09-20 20:36:51 +020044from multiprocessing import util
45
46try:
47 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010048 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020049except ImportError:
50 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000051
Brian Curtinafa88b52010-10-07 01:12:19 +000052try:
53 from multiprocessing.sharedctypes import Value, copy
54 HAS_SHAREDCTYPES = True
55except ImportError:
56 HAS_SHAREDCTYPES = False
57
Antoine Pitroubcb39d42011-08-23 19:46:22 +020058try:
Davin Pottse895de32019-02-23 22:08:16 -060059 from multiprocessing import shared_memory
60 HAS_SHMEM = True
61except ImportError:
62 HAS_SHMEM = False
63
64try:
Antoine Pitroubcb39d42011-08-23 19:46:22 +020065 import msvcrt
66except ImportError:
67 msvcrt = None
68
Benjamin Petersone711caf2008-06-11 16:44:04 +000069#
70#
71#
72
Victor Stinner11f08072017-09-15 06:55:31 -070073# Timeout to wait until a process completes
Pablo Galindo40b69072019-03-22 07:36:56 +000074TIMEOUT = 60.0 # seconds
Victor Stinner11f08072017-09-15 06:55:31 -070075
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000076def latin(s):
77 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000078
Victor Stinnerd7e64d92017-07-25 00:33:56 +020079
80def close_queue(queue):
81 if isinstance(queue, multiprocessing.queues.Queue):
82 queue.close()
83 queue.join_thread()
84
85
Victor Stinner11f08072017-09-15 06:55:31 -070086def join_process(process):
Victor Stinnerb9b69002017-09-14 14:40:56 -070087 # Since multiprocessing.Process has the same API than threading.Thread
88 # (join() and is_alive(), the support function can be reused
Victor Stinner11f08072017-09-15 06:55:31 -070089 support.join_thread(process, timeout=TIMEOUT)
Victor Stinnerb9b69002017-09-14 14:40:56 -070090
91
Pierre Glaserf22cc692019-05-10 22:59:08 +020092if os.name == "posix":
93 from multiprocessing import resource_tracker
94
95 def _resource_unlink(name, rtype):
96 resource_tracker._CLEANUP_FUNCS[rtype](name)
97
98
Benjamin Petersone711caf2008-06-11 16:44:04 +000099#
100# Constants
101#
102
103LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +0000104#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +0000105
106DELTA = 0.1
107CHECK_TIMINGS = False # making true makes tests take a lot longer
108 # and can sometimes cause some non-serious
109 # failures because some calls block a bit
110 # longer than expected
111if CHECK_TIMINGS:
112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
113else:
114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
115
116HAVE_GETVALUE = not getattr(_multiprocessing,
117 'HAVE_BROKEN_SEM_GETVALUE', False)
118
Victor Stinner937ee9e2018-06-26 02:11:06 +0200119WIN32 = (sys.platform == "win32")
120
Richard Oudkerk59d54042012-05-10 16:11:12 +0100121from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200122
Richard Oudkerk59d54042012-05-10 16:11:12 +0100123def wait_for_handle(handle, timeout):
124 if timeout is not None and timeout < 0.0:
125 timeout = None
126 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +0000127
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200128try:
129 MAXFD = os.sysconf("SC_OPEN_MAX")
130except:
131 MAXFD = 256
132
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100133# To speed up tests when using the forkserver, we can preload these:
134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
135
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000137# Some tests require ctypes
138#
139
140try:
Gareth Rees3913bad2017-07-21 11:35:33 +0100141 from ctypes import Structure, c_int, c_double, c_longlong
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000142except ImportError:
143 Structure = object
Antoine Pitrouff92ff52017-07-21 13:24:05 +0200144 c_int = c_double = c_longlong = None
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000145
Charles-François Natali221ef672011-11-22 18:55:22 +0100146
147def check_enough_semaphores():
148 """Check that the system supports enough semaphores to run the test."""
149 # minimum number of semaphores available according to POSIX
150 nsems_min = 256
151 try:
152 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
153 except (AttributeError, ValueError):
154 # sysconf not available or setting not available
155 return
156 if nsems == -1 or nsems >= nsems_min:
157 return
158 raise unittest.SkipTest("The OS doesn't support enough semaphores "
159 "to run the test (required: %d)." % nsems_min)
160
161
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000162#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000163# Creates a wrapper for a function which records the time it takes to finish
164#
165
166class TimingWrapper(object):
167
168 def __init__(self, func):
169 self.func = func
170 self.elapsed = None
171
172 def __call__(self, *args, **kwds):
Victor Stinner2cf4c202018-12-17 09:36:36 +0100173 t = time.monotonic()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000174 try:
175 return self.func(*args, **kwds)
176 finally:
Victor Stinner2cf4c202018-12-17 09:36:36 +0100177 self.elapsed = time.monotonic() - t
Benjamin Petersone711caf2008-06-11 16:44:04 +0000178
179#
180# Base class for test cases
181#
182
183class BaseTestCase(object):
184
185 ALLOWED_TYPES = ('processes', 'manager', 'threads')
186
187 def assertTimingAlmostEqual(self, a, b):
188 if CHECK_TIMINGS:
189 self.assertAlmostEqual(a, b, 1)
190
191 def assertReturnsIfImplemented(self, value, func, *args):
192 try:
193 res = func(*args)
194 except NotImplementedError:
195 pass
196 else:
197 return self.assertEqual(value, res)
198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000199 # For the sanity of Windows users, rather than crashing or freezing in
200 # multiple ways.
201 def __reduce__(self, *args):
202 raise NotImplementedError("shouldn't try to pickle a test case")
203
204 __reduce_ex__ = __reduce__
205
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206#
207# Return the value of a semaphore
208#
209
210def get_value(self):
211 try:
212 return self.get_value()
213 except AttributeError:
214 try:
215 return self._Semaphore__value
216 except AttributeError:
217 try:
218 return self._value
219 except AttributeError:
220 raise NotImplementedError
221
222#
223# Testcases
224#
225
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200226class DummyCallable:
227 def __call__(self, q, c):
228 assert isinstance(c, DummyCallable)
229 q.put(5)
230
231
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232class _TestProcess(BaseTestCase):
233
234 ALLOWED_TYPES = ('processes', 'threads')
235
236 def test_current(self):
237 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600238 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000239
240 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242
243 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000244 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000245 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000246 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000247 self.assertEqual(current.ident, os.getpid())
248 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000250 def test_daemon_argument(self):
251 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600252 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000253
254 # By default uses the current process's daemon flag.
255 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000256 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000257 proc1 = self.Process(target=self._test, daemon=True)
258 self.assertTrue(proc1.daemon)
259 proc2 = self.Process(target=self._test, daemon=False)
260 self.assertFalse(proc2.daemon)
261
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000262 @classmethod
263 def _test(cls, q, *args, **kwds):
264 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000265 q.put(args)
266 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000267 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000268 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000269 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270 q.put(current.pid)
271
Thomas Moreauc09a9f52019-05-20 21:37:05 +0200272 def test_parent_process_attributes(self):
273 if self.TYPE == "threads":
274 self.skipTest('test not appropriate for {}'.format(self.TYPE))
275
276 self.assertIsNone(self.parent_process())
277
278 rconn, wconn = self.Pipe(duplex=False)
279 p = self.Process(target=self._test_send_parent_process, args=(wconn,))
280 p.start()
281 p.join()
282 parent_pid, parent_name = rconn.recv()
283 self.assertEqual(parent_pid, self.current_process().pid)
284 self.assertEqual(parent_pid, os.getpid())
285 self.assertEqual(parent_name, self.current_process().name)
286
287 @classmethod
288 def _test_send_parent_process(cls, wconn):
289 from multiprocessing.process import parent_process
290 wconn.send([parent_process().pid, parent_process().name])
291
292 def test_parent_process(self):
293 if self.TYPE == "threads":
294 self.skipTest('test not appropriate for {}'.format(self.TYPE))
295
296 # Launch a child process. Make it launch a grandchild process. Kill the
297 # child process and make sure that the grandchild notices the death of
298 # its parent (a.k.a the child process).
299 rconn, wconn = self.Pipe(duplex=False)
300 p = self.Process(
301 target=self._test_create_grandchild_process, args=(wconn, ))
302 p.start()
303
Miss Islington (bot)4adc38e2019-06-25 14:12:47 -0700304 if not rconn.poll(timeout=60):
Thomas Moreauc09a9f52019-05-20 21:37:05 +0200305 raise AssertionError("Could not communicate with child process")
306 parent_process_status = rconn.recv()
307 self.assertEqual(parent_process_status, "alive")
308
309 p.terminate()
310 p.join()
311
Miss Islington (bot)4adc38e2019-06-25 14:12:47 -0700312 if not rconn.poll(timeout=60):
Thomas Moreauc09a9f52019-05-20 21:37:05 +0200313 raise AssertionError("Could not communicate with child process")
314 parent_process_status = rconn.recv()
315 self.assertEqual(parent_process_status, "not alive")
316
317 @classmethod
318 def _test_create_grandchild_process(cls, wconn):
319 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
320 p.start()
Miss Islington (bot)4adc38e2019-06-25 14:12:47 -0700321 time.sleep(300)
Thomas Moreauc09a9f52019-05-20 21:37:05 +0200322
323 @classmethod
324 def _test_report_parent_status(cls, wconn):
325 from multiprocessing.process import parent_process
326 wconn.send("alive" if parent_process().is_alive() else "not alive")
327 parent_process().join(timeout=5)
328 wconn.send("alive" if parent_process().is_alive() else "not alive")
329
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330 def test_process(self):
331 q = self.Queue(1)
332 e = self.Event()
333 args = (q, 1, 2)
334 kwargs = {'hello':23, 'bye':2.54}
335 name = 'SomeProcess'
336 p = self.Process(
337 target=self._test, args=args, kwargs=kwargs, name=name
338 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000339 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000340 current = self.current_process()
341
342 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000343 self.assertEqual(p.authkey, current.authkey)
344 self.assertEqual(p.is_alive(), False)
345 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000346 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000347 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000348 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000349
350 p.start()
351
Ezio Melottib3aedd42010-11-20 19:04:17 +0000352 self.assertEqual(p.exitcode, None)
353 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000354 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000355
Ezio Melottib3aedd42010-11-20 19:04:17 +0000356 self.assertEqual(q.get(), args[1:])
357 self.assertEqual(q.get(), kwargs)
358 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000359 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000360 self.assertEqual(q.get(), current.authkey)
361 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000362
363 p.join()
364
Ezio Melottib3aedd42010-11-20 19:04:17 +0000365 self.assertEqual(p.exitcode, 0)
366 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000367 self.assertNotIn(p, self.active_children())
Victor Stinnerb4c52962017-07-25 02:40:55 +0200368 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000369
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000370 @classmethod
Vitor Pereiraba75af72017-07-18 16:34:23 +0100371 def _sleep_some(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100372 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000373
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200374 @classmethod
375 def _test_sleep(cls, delay):
376 time.sleep(delay)
377
Vitor Pereiraba75af72017-07-18 16:34:23 +0100378 def _kill_process(self, meth):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000379 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600380 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381
Vitor Pereiraba75af72017-07-18 16:34:23 +0100382 p = self.Process(target=self._sleep_some)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000383 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000384 p.start()
385
386 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000387 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000388 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000389
Richard Oudkerk59d54042012-05-10 16:11:12 +0100390 join = TimingWrapper(p.join)
391
392 self.assertEqual(join(0), None)
393 self.assertTimingAlmostEqual(join.elapsed, 0.0)
394 self.assertEqual(p.is_alive(), True)
395
396 self.assertEqual(join(-1), None)
397 self.assertTimingAlmostEqual(join.elapsed, 0.0)
398 self.assertEqual(p.is_alive(), True)
399
Richard Oudkerk26f92682013-10-17 13:56:18 +0100400 # XXX maybe terminating too soon causes the problems on Gentoo...
401 time.sleep(1)
402
Vitor Pereiraba75af72017-07-18 16:34:23 +0100403 meth(p)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000404
Richard Oudkerk4f350792013-10-13 00:49:27 +0100405 if hasattr(signal, 'alarm'):
Richard Oudkerkd44500a2013-10-17 10:38:37 +0100406 # On the Gentoo buildbot waitpid() often seems to block forever.
Richard Oudkerk26f92682013-10-17 13:56:18 +0100407 # We use alarm() to interrupt it if it blocks for too long.
Richard Oudkerk4f350792013-10-13 00:49:27 +0100408 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100409 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100410 old_handler = signal.signal(signal.SIGALRM, handler)
411 try:
412 signal.alarm(10)
413 self.assertEqual(join(), None)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100414 finally:
Richard Oudkerk1e2f67c2013-10-17 14:24:06 +0100415 signal.alarm(0)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100416 signal.signal(signal.SIGALRM, old_handler)
417 else:
418 self.assertEqual(join(), None)
419
Benjamin Petersone711caf2008-06-11 16:44:04 +0000420 self.assertTimingAlmostEqual(join.elapsed, 0.0)
421
422 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000423 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000424
425 p.join()
426
Vitor Pereiraba75af72017-07-18 16:34:23 +0100427 return p.exitcode
428
429 def test_terminate(self):
430 exitcode = self._kill_process(multiprocessing.Process.terminate)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200431 if os.name != 'nt':
Vitor Pereiraba75af72017-07-18 16:34:23 +0100432 self.assertEqual(exitcode, -signal.SIGTERM)
433
434 def test_kill(self):
435 exitcode = self._kill_process(multiprocessing.Process.kill)
436 if os.name != 'nt':
437 self.assertEqual(exitcode, -signal.SIGKILL)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000438
439 def test_cpu_count(self):
440 try:
441 cpus = multiprocessing.cpu_count()
442 except NotImplementedError:
443 cpus = 1
444 self.assertTrue(type(cpus) is int)
445 self.assertTrue(cpus >= 1)
446
447 def test_active_children(self):
448 self.assertEqual(type(self.active_children()), list)
449
450 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000451 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000452
Jesus Cea94f964f2011-09-09 20:26:57 +0200453 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000454 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000455 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000456
457 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000458 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000459
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000460 @classmethod
461 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000462 wconn.send(id)
463 if len(id) < 2:
464 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000465 p = cls.Process(
466 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000467 )
468 p.start()
469 p.join()
470
471 def test_recursion(self):
472 rconn, wconn = self.Pipe(duplex=False)
473 self._test_recursion(wconn, [])
474
475 time.sleep(DELTA)
476 result = []
477 while rconn.poll():
478 result.append(rconn.recv())
479
480 expected = [
481 [],
482 [0],
483 [0, 0],
484 [0, 1],
485 [1],
486 [1, 0],
487 [1, 1]
488 ]
489 self.assertEqual(result, expected)
490
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200491 @classmethod
492 def _test_sentinel(cls, event):
493 event.wait(10.0)
494
495 def test_sentinel(self):
496 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600497 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200498 event = self.Event()
499 p = self.Process(target=self._test_sentinel, args=(event,))
500 with self.assertRaises(ValueError):
501 p.sentinel
502 p.start()
503 self.addCleanup(p.join)
504 sentinel = p.sentinel
505 self.assertIsInstance(sentinel, int)
506 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
507 event.set()
508 p.join()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100509 self.assertTrue(wait_for_handle(sentinel, timeout=1))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200510
Antoine Pitrou13e96cc2017-06-24 19:22:23 +0200511 @classmethod
512 def _test_close(cls, rc=0, q=None):
513 if q is not None:
514 q.get()
515 sys.exit(rc)
516
517 def test_close(self):
518 if self.TYPE == "threads":
519 self.skipTest('test not appropriate for {}'.format(self.TYPE))
520 q = self.Queue()
521 p = self.Process(target=self._test_close, kwargs={'q': q})
522 p.daemon = True
523 p.start()
524 self.assertEqual(p.is_alive(), True)
525 # Child is still alive, cannot close
526 with self.assertRaises(ValueError):
527 p.close()
528
529 q.put(None)
530 p.join()
531 self.assertEqual(p.is_alive(), False)
532 self.assertEqual(p.exitcode, 0)
533 p.close()
534 with self.assertRaises(ValueError):
535 p.is_alive()
536 with self.assertRaises(ValueError):
537 p.join()
538 with self.assertRaises(ValueError):
539 p.terminate()
540 p.close()
541
542 wr = weakref.ref(p)
543 del p
544 gc.collect()
545 self.assertIs(wr(), None)
546
Victor Stinnerb4c52962017-07-25 02:40:55 +0200547 close_queue(q)
548
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200549 def test_many_processes(self):
550 if self.TYPE == 'threads':
551 self.skipTest('test not appropriate for {}'.format(self.TYPE))
552
553 sm = multiprocessing.get_start_method()
554 N = 5 if sm == 'spawn' else 100
555
556 # Try to overwhelm the forkserver loop with events
557 procs = [self.Process(target=self._test_sleep, args=(0.01,))
558 for i in range(N)]
559 for p in procs:
560 p.start()
561 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700562 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200563 for p in procs:
564 self.assertEqual(p.exitcode, 0)
565
Vitor Pereiraba75af72017-07-18 16:34:23 +0100566 procs = [self.Process(target=self._sleep_some)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200567 for i in range(N)]
568 for p in procs:
569 p.start()
570 time.sleep(0.001) # let the children start...
571 for p in procs:
572 p.terminate()
573 for p in procs:
Victor Stinner11f08072017-09-15 06:55:31 -0700574 join_process(p)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200575 if os.name != 'nt':
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700576 exitcodes = [-signal.SIGTERM]
577 if sys.platform == 'darwin':
578 # bpo-31510: On macOS, killing a freshly started process with
579 # SIGTERM sometimes kills the process with SIGKILL.
580 exitcodes.append(-signal.SIGKILL)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200581 for p in procs:
Victor Stinnere6cfdef2017-10-02 08:27:34 -0700582 self.assertIn(p.exitcode, exitcodes)
Antoine Pitroudfd5f342017-06-12 15:28:19 +0200583
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200584 def test_lose_target_ref(self):
585 c = DummyCallable()
586 wr = weakref.ref(c)
587 q = self.Queue()
588 p = self.Process(target=c, args=(q, c))
589 del c
590 p.start()
591 p.join()
592 self.assertIs(wr(), None)
593 self.assertEqual(q.get(), 5)
Victor Stinnerb4c52962017-07-25 02:40:55 +0200594 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200595
Antoine Pitrou896145d2017-07-22 13:22:54 +0200596 @classmethod
597 def _test_child_fd_inflation(self, evt, q):
598 q.put(test.support.fd_count())
599 evt.wait()
600
601 def test_child_fd_inflation(self):
602 # Number of fds in child processes should not grow with the
603 # number of running children.
604 if self.TYPE == 'threads':
605 self.skipTest('test not appropriate for {}'.format(self.TYPE))
606
607 sm = multiprocessing.get_start_method()
608 if sm == 'fork':
609 # The fork method by design inherits all fds from the parent,
610 # trying to go against it is a lost battle
611 self.skipTest('test not appropriate for {}'.format(sm))
612
613 N = 5
614 evt = self.Event()
615 q = self.Queue()
616
617 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
618 for i in range(N)]
619 for p in procs:
620 p.start()
621
622 try:
623 fd_counts = [q.get() for i in range(N)]
624 self.assertEqual(len(set(fd_counts)), 1, fd_counts)
625
626 finally:
627 evt.set()
628 for p in procs:
629 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200630 close_queue(q)
Antoine Pitrou79d37ae2017-06-28 12:29:08 +0200631
Antoine Pitrouee84a602017-08-16 20:53:28 +0200632 @classmethod
633 def _test_wait_for_threads(self, evt):
634 def func1():
635 time.sleep(0.5)
636 evt.set()
637
638 def func2():
639 time.sleep(20)
640 evt.clear()
641
642 threading.Thread(target=func1).start()
643 threading.Thread(target=func2, daemon=True).start()
644
645 def test_wait_for_threads(self):
646 # A child process should wait for non-daemonic threads to end
647 # before exiting
648 if self.TYPE == 'threads':
649 self.skipTest('test not appropriate for {}'.format(self.TYPE))
650
651 evt = self.Event()
652 proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
653 proc.start()
654 proc.join()
655 self.assertTrue(evt.is_set())
656
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200657 @classmethod
Antoine Pitroue756f662018-03-11 19:21:38 +0100658 def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
659 for stream_name, action in break_std_streams.items():
660 if action == 'close':
661 stream = io.StringIO()
662 stream.close()
663 else:
664 assert action == 'remove'
665 stream = None
666 setattr(sys, stream_name, None)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200667 evt.set()
668
Antoine Pitroue756f662018-03-11 19:21:38 +0100669 def test_error_on_stdio_flush_1(self):
670 # Check that Process works with broken standard streams
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200671 streams = [io.StringIO(), None]
672 streams[0].close()
673 for stream_name in ('stdout', 'stderr'):
674 for stream in streams:
675 old_stream = getattr(sys, stream_name)
676 setattr(sys, stream_name, stream)
677 try:
678 evt = self.Event()
679 proc = self.Process(target=self._test_error_on_stdio_flush,
680 args=(evt,))
681 proc.start()
682 proc.join()
683 self.assertTrue(evt.is_set())
Antoine Pitroue756f662018-03-11 19:21:38 +0100684 self.assertEqual(proc.exitcode, 0)
685 finally:
686 setattr(sys, stream_name, old_stream)
687
688 def test_error_on_stdio_flush_2(self):
689 # Same as test_error_on_stdio_flush_1(), but standard streams are
690 # broken by the child process
691 for stream_name in ('stdout', 'stderr'):
692 for action in ('close', 'remove'):
693 old_stream = getattr(sys, stream_name)
694 try:
695 evt = self.Event()
696 proc = self.Process(target=self._test_error_on_stdio_flush,
697 args=(evt, {stream_name: action}))
698 proc.start()
699 proc.join()
700 self.assertTrue(evt.is_set())
701 self.assertEqual(proc.exitcode, 0)
Antoine Pitroudaeefd22017-10-22 11:40:31 +0200702 finally:
703 setattr(sys, stream_name, old_stream)
704
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100705 @classmethod
706 def _sleep_and_set_event(self, evt, delay=0.0):
707 time.sleep(delay)
708 evt.set()
709
710 def check_forkserver_death(self, signum):
711 # bpo-31308: if the forkserver process has died, we should still
712 # be able to create and run new Process instances (the forkserver
713 # is implicitly restarted).
714 if self.TYPE == 'threads':
715 self.skipTest('test not appropriate for {}'.format(self.TYPE))
716 sm = multiprocessing.get_start_method()
717 if sm != 'forkserver':
718 # The fork method by design inherits all fds from the parent,
719 # trying to go against it is a lost battle
720 self.skipTest('test not appropriate for {}'.format(sm))
721
722 from multiprocessing.forkserver import _forkserver
723 _forkserver.ensure_running()
724
Victor Stinner07888e12018-07-04 11:49:41 +0200725 # First process sleeps 500 ms
726 delay = 0.5
727
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100728 evt = self.Event()
Victor Stinner07888e12018-07-04 11:49:41 +0200729 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100730 proc.start()
731
732 pid = _forkserver._forkserver_pid
733 os.kill(pid, signum)
Victor Stinner07888e12018-07-04 11:49:41 +0200734 # give time to the fork server to die and time to proc to complete
735 time.sleep(delay * 2.0)
Antoine Pitroufc6b3482017-11-03 13:34:22 +0100736
737 evt2 = self.Event()
738 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
739 proc2.start()
740 proc2.join()
741 self.assertTrue(evt2.is_set())
742 self.assertEqual(proc2.exitcode, 0)
743
744 proc.join()
745 self.assertTrue(evt.is_set())
746 self.assertIn(proc.exitcode, (0, 255))
747
748 def test_forkserver_sigint(self):
749 # Catchable signal
750 self.check_forkserver_death(signal.SIGINT)
751
752 def test_forkserver_sigkill(self):
753 # Uncatchable signal
754 if os.name != 'nt':
755 self.check_forkserver_death(signal.SIGKILL)
756
Antoine Pitrouee84a602017-08-16 20:53:28 +0200757
Benjamin Petersone711caf2008-06-11 16:44:04 +0000758#
759#
760#
761
762class _UpperCaser(multiprocessing.Process):
763
764 def __init__(self):
765 multiprocessing.Process.__init__(self)
766 self.child_conn, self.parent_conn = multiprocessing.Pipe()
767
768 def run(self):
769 self.parent_conn.close()
770 for s in iter(self.child_conn.recv, None):
771 self.child_conn.send(s.upper())
772 self.child_conn.close()
773
774 def submit(self, s):
775 assert type(s) is str
776 self.parent_conn.send(s)
777 return self.parent_conn.recv()
778
779 def stop(self):
780 self.parent_conn.send(None)
781 self.parent_conn.close()
782 self.child_conn.close()
783
784class _TestSubclassingProcess(BaseTestCase):
785
786 ALLOWED_TYPES = ('processes',)
787
788 def test_subclassing(self):
789 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200790 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000791 uppercaser.start()
792 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
793 self.assertEqual(uppercaser.submit('world'), 'WORLD')
794 uppercaser.stop()
795 uppercaser.join()
796
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100797 def test_stderr_flush(self):
798 # sys.stderr is flushed at process shutdown (issue #13812)
799 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600800 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100801
802 testfn = test.support.TESTFN
803 self.addCleanup(test.support.unlink, testfn)
804 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
805 proc.start()
806 proc.join()
807 with open(testfn, 'r') as f:
808 err = f.read()
809 # The whole traceback was printed
810 self.assertIn("ZeroDivisionError", err)
811 self.assertIn("test_multiprocessing.py", err)
812 self.assertIn("1/0 # MARKER", err)
813
814 @classmethod
815 def _test_stderr_flush(cls, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100816 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
817 sys.stderr = open(fd, 'w', closefd=False)
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100818 1/0 # MARKER
819
820
Richard Oudkerk29471de2012-06-06 19:04:57 +0100821 @classmethod
822 def _test_sys_exit(cls, reason, testfn):
Victor Stinnera6d865c2016-03-25 09:29:50 +0100823 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
824 sys.stderr = open(fd, 'w', closefd=False)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100825 sys.exit(reason)
826
827 def test_sys_exit(self):
828 # See Issue 13854
829 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600830 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk29471de2012-06-06 19:04:57 +0100831
832 testfn = test.support.TESTFN
833 self.addCleanup(test.support.unlink, testfn)
834
Victor Stinnera6d865c2016-03-25 09:29:50 +0100835 for reason in (
836 [1, 2, 3],
837 'ignore this',
838 ):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100839 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
840 p.daemon = True
841 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700842 join_process(p)
Victor Stinnera6d865c2016-03-25 09:29:50 +0100843 self.assertEqual(p.exitcode, 1)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100844
845 with open(testfn, 'r') as f:
Victor Stinnera6d865c2016-03-25 09:29:50 +0100846 content = f.read()
847 self.assertEqual(content.rstrip(), str(reason))
848
849 os.unlink(testfn)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100850
851 for reason in (True, False, 8):
852 p = self.Process(target=sys.exit, args=(reason,))
853 p.daemon = True
854 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -0700855 join_process(p)
Richard Oudkerk29471de2012-06-06 19:04:57 +0100856 self.assertEqual(p.exitcode, reason)
857
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858#
859#
860#
861
862def queue_empty(q):
863 if hasattr(q, 'empty'):
864 return q.empty()
865 else:
866 return q.qsize() == 0
867
868def queue_full(q, maxsize):
869 if hasattr(q, 'full'):
870 return q.full()
871 else:
872 return q.qsize() == maxsize
873
874
875class _TestQueue(BaseTestCase):
876
877
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000878 @classmethod
879 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000880 child_can_start.wait()
881 for i in range(6):
882 queue.get()
883 parent_can_continue.set()
884
885 def test_put(self):
886 MAXSIZE = 6
887 queue = self.Queue(maxsize=MAXSIZE)
888 child_can_start = self.Event()
889 parent_can_continue = self.Event()
890
891 proc = self.Process(
892 target=self._test_put,
893 args=(queue, child_can_start, parent_can_continue)
894 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000895 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 proc.start()
897
898 self.assertEqual(queue_empty(queue), True)
899 self.assertEqual(queue_full(queue, MAXSIZE), False)
900
901 queue.put(1)
902 queue.put(2, True)
903 queue.put(3, True, None)
904 queue.put(4, False)
905 queue.put(5, False, None)
906 queue.put_nowait(6)
907
908 # the values may be in buffer but not yet in pipe so sleep a bit
909 time.sleep(DELTA)
910
911 self.assertEqual(queue_empty(queue), False)
912 self.assertEqual(queue_full(queue, MAXSIZE), True)
913
914 put = TimingWrapper(queue.put)
915 put_nowait = TimingWrapper(queue.put_nowait)
916
917 self.assertRaises(pyqueue.Full, put, 7, False)
918 self.assertTimingAlmostEqual(put.elapsed, 0)
919
920 self.assertRaises(pyqueue.Full, put, 7, False, None)
921 self.assertTimingAlmostEqual(put.elapsed, 0)
922
923 self.assertRaises(pyqueue.Full, put_nowait, 7)
924 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
925
926 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
927 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
928
929 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
930 self.assertTimingAlmostEqual(put.elapsed, 0)
931
932 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
933 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
934
935 child_can_start.set()
936 parent_can_continue.wait()
937
938 self.assertEqual(queue_empty(queue), True)
939 self.assertEqual(queue_full(queue, MAXSIZE), False)
940
941 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200942 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000943
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000944 @classmethod
945 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000946 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000947 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000948 queue.put(2)
949 queue.put(3)
950 queue.put(4)
951 queue.put(5)
952 parent_can_continue.set()
953
954 def test_get(self):
955 queue = self.Queue()
956 child_can_start = self.Event()
957 parent_can_continue = self.Event()
958
959 proc = self.Process(
960 target=self._test_get,
961 args=(queue, child_can_start, parent_can_continue)
962 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000963 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964 proc.start()
965
966 self.assertEqual(queue_empty(queue), True)
967
968 child_can_start.set()
969 parent_can_continue.wait()
970
971 time.sleep(DELTA)
972 self.assertEqual(queue_empty(queue), False)
973
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000974 # Hangs unexpectedly, remove for now
975 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976 self.assertEqual(queue.get(True, None), 2)
977 self.assertEqual(queue.get(True), 3)
978 self.assertEqual(queue.get(timeout=1), 4)
979 self.assertEqual(queue.get_nowait(), 5)
980
981 self.assertEqual(queue_empty(queue), True)
982
983 get = TimingWrapper(queue.get)
984 get_nowait = TimingWrapper(queue.get_nowait)
985
986 self.assertRaises(pyqueue.Empty, get, False)
987 self.assertTimingAlmostEqual(get.elapsed, 0)
988
989 self.assertRaises(pyqueue.Empty, get, False, None)
990 self.assertTimingAlmostEqual(get.elapsed, 0)
991
992 self.assertRaises(pyqueue.Empty, get_nowait)
993 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
994
995 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
996 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
997
998 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
999 self.assertTimingAlmostEqual(get.elapsed, 0)
1000
1001 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
1002 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
1003
1004 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +02001005 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001006
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001007 @classmethod
1008 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001009 for i in range(10, 20):
1010 queue.put(i)
1011 # note that at this point the items may only be buffered, so the
1012 # process cannot shutdown until the feeder thread has finished
1013 # pushing items onto the pipe.
1014
1015 def test_fork(self):
1016 # Old versions of Queue would fail to create a new feeder
1017 # thread for a forked process if the original process had its
1018 # own feeder thread. This test checks that this no longer
1019 # happens.
1020
1021 queue = self.Queue()
1022
1023 # put items on queue so that main process starts a feeder thread
1024 for i in range(10):
1025 queue.put(i)
1026
1027 # wait to make sure thread starts before we fork a new process
1028 time.sleep(DELTA)
1029
1030 # fork process
1031 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001032 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033 p.start()
1034
1035 # check that all expected items are in the queue
1036 for i in range(20):
1037 self.assertEqual(queue.get(), i)
1038 self.assertRaises(pyqueue.Empty, queue.get, False)
1039
1040 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +02001041 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001042
1043 def test_qsize(self):
1044 q = self.Queue()
1045 try:
1046 self.assertEqual(q.qsize(), 0)
1047 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -06001048 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +00001049 q.put(1)
1050 self.assertEqual(q.qsize(), 1)
1051 q.put(5)
1052 self.assertEqual(q.qsize(), 2)
1053 q.get()
1054 self.assertEqual(q.qsize(), 1)
1055 q.get()
1056 self.assertEqual(q.qsize(), 0)
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001057 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001059 @classmethod
1060 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061 for obj in iter(q.get, None):
1062 time.sleep(DELTA)
1063 q.task_done()
1064
1065 def test_task_done(self):
1066 queue = self.JoinableQueue()
1067
Benjamin Petersone711caf2008-06-11 16:44:04 +00001068 workers = [self.Process(target=self._test_task_done, args=(queue,))
1069 for i in range(4)]
1070
1071 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +02001072 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001073 p.start()
1074
1075 for i in range(10):
1076 queue.put(i)
1077
1078 queue.join()
1079
1080 for p in workers:
1081 queue.put(None)
1082
1083 for p in workers:
1084 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +02001085 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001086
Serhiy Storchakaf8904e92015-03-06 23:32:54 +02001087 def test_no_import_lock_contention(self):
1088 with test.support.temp_cwd():
1089 module_name = 'imported_by_an_imported_module'
1090 with open(module_name + '.py', 'w') as f:
1091 f.write("""if 1:
1092 import multiprocessing
1093
1094 q = multiprocessing.Queue()
1095 q.put('knock knock')
1096 q.get(timeout=3)
1097 q.close()
1098 del q
1099 """)
1100
1101 with test.support.DirsOnSysPath(os.getcwd()):
1102 try:
1103 __import__(module_name)
1104 except pyqueue.Empty:
1105 self.fail("Probable regression on import lock contention;"
1106 " see Issue #22853")
1107
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001108 def test_timeout(self):
1109 q = multiprocessing.Queue()
Victor Stinner2cf4c202018-12-17 09:36:36 +01001110 start = time.monotonic()
Victor Stinneraad7b2e2015-02-05 14:25:05 +01001111 self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
Victor Stinner2cf4c202018-12-17 09:36:36 +01001112 delta = time.monotonic() - start
Victor Stinner5640d032018-08-03 02:09:00 +02001113 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1114 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1115 # failed because the delta was only 135.8 ms.
1116 self.assertGreaterEqual(delta, 0.100)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001117 close_queue(q)
Giampaolo Rodola'30830712013-04-17 13:12:27 +02001118
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001119 def test_queue_feeder_donot_stop_onexc(self):
1120 # bpo-30414: verify feeder handles exceptions correctly
1121 if self.TYPE != 'processes':
1122 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1123
1124 class NotSerializable(object):
1125 def __reduce__(self):
1126 raise AttributeError
1127 with test.support.captured_stderr():
1128 q = self.Queue()
1129 q.put(NotSerializable())
1130 q.put(True)
Miss Islington (bot)fea9ca12019-09-24 04:07:28 -07001131 self.assertTrue(q.get(timeout=TIMEOUT))
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001132 close_queue(q)
grzgrzgrz3bc50f032017-05-25 16:22:57 +02001133
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001134 with test.support.captured_stderr():
1135 # bpo-33078: verify that the queue size is correctly handled
1136 # on errors.
1137 q = self.Queue(maxsize=1)
1138 q.put(NotSerializable())
1139 q.put(True)
Thomas Moreaudec1c772018-03-21 18:56:27 +01001140 try:
1141 self.assertEqual(q.qsize(), 1)
1142 except NotImplementedError:
1143 # qsize is not available on all platform as it
1144 # relies on sem_getvalue
1145 pass
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001146 # bpo-30595: use a timeout of 1 second for slow buildbots
1147 self.assertTrue(q.get(timeout=1.0))
1148 # Check that the size of the queue is correct
Thomas Moreaudec1c772018-03-21 18:56:27 +01001149 self.assertTrue(q.empty())
Thomas Moreaue2f33ad2018-03-21 16:50:28 +01001150 close_queue(q)
1151
Thomas Moreau94459fd2018-01-05 11:15:54 +01001152 def test_queue_feeder_on_queue_feeder_error(self):
1153 # bpo-30006: verify feeder handles exceptions using the
1154 # _on_queue_feeder_error hook.
1155 if self.TYPE != 'processes':
1156 self.skipTest('test not appropriate for {}'.format(self.TYPE))
1157
1158 class NotSerializable(object):
1159 """Mock unserializable object"""
1160 def __init__(self):
1161 self.reduce_was_called = False
1162 self.on_queue_feeder_error_was_called = False
1163
1164 def __reduce__(self):
1165 self.reduce_was_called = True
1166 raise AttributeError
1167
1168 class SafeQueue(multiprocessing.queues.Queue):
1169 """Queue with overloaded _on_queue_feeder_error hook"""
1170 @staticmethod
1171 def _on_queue_feeder_error(e, obj):
1172 if (isinstance(e, AttributeError) and
1173 isinstance(obj, NotSerializable)):
1174 obj.on_queue_feeder_error_was_called = True
1175
1176 not_serializable_obj = NotSerializable()
1177 # The captured_stderr reduces the noise in the test report
1178 with test.support.captured_stderr():
1179 q = SafeQueue(ctx=multiprocessing.get_context())
1180 q.put(not_serializable_obj)
1181
Ville Skyttä61f82e02018-04-20 23:08:45 +03001182 # Verify that q is still functioning correctly
Thomas Moreau94459fd2018-01-05 11:15:54 +01001183 q.put(True)
1184 self.assertTrue(q.get(timeout=1.0))
1185
1186 # Assert that the serialization and the hook have been called correctly
1187 self.assertTrue(not_serializable_obj.reduce_was_called)
1188 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
Zackery Spytz04617042018-10-13 03:26:09 -06001189
1190 def test_closed_queue_put_get_exceptions(self):
1191 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1192 q.close()
1193 with self.assertRaisesRegex(ValueError, 'is closed'):
1194 q.put('foo')
1195 with self.assertRaisesRegex(ValueError, 'is closed'):
1196 q.get()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001197#
1198#
1199#
1200
1201class _TestLock(BaseTestCase):
1202
1203 def test_lock(self):
1204 lock = self.Lock()
1205 self.assertEqual(lock.acquire(), True)
1206 self.assertEqual(lock.acquire(False), False)
1207 self.assertEqual(lock.release(), None)
1208 self.assertRaises((ValueError, threading.ThreadError), lock.release)
1209
1210 def test_rlock(self):
1211 lock = self.RLock()
1212 self.assertEqual(lock.acquire(), True)
1213 self.assertEqual(lock.acquire(), True)
1214 self.assertEqual(lock.acquire(), True)
1215 self.assertEqual(lock.release(), None)
1216 self.assertEqual(lock.release(), None)
1217 self.assertEqual(lock.release(), None)
1218 self.assertRaises((AssertionError, RuntimeError), lock.release)
1219
Jesse Nollerf8d00852009-03-31 03:25:07 +00001220 def test_lock_context(self):
1221 with self.Lock():
1222 pass
1223
Benjamin Petersone711caf2008-06-11 16:44:04 +00001224
1225class _TestSemaphore(BaseTestCase):
1226
1227 def _test_semaphore(self, sem):
1228 self.assertReturnsIfImplemented(2, get_value, sem)
1229 self.assertEqual(sem.acquire(), True)
1230 self.assertReturnsIfImplemented(1, get_value, sem)
1231 self.assertEqual(sem.acquire(), True)
1232 self.assertReturnsIfImplemented(0, get_value, sem)
1233 self.assertEqual(sem.acquire(False), False)
1234 self.assertReturnsIfImplemented(0, get_value, sem)
1235 self.assertEqual(sem.release(), None)
1236 self.assertReturnsIfImplemented(1, get_value, sem)
1237 self.assertEqual(sem.release(), None)
1238 self.assertReturnsIfImplemented(2, get_value, sem)
1239
1240 def test_semaphore(self):
1241 sem = self.Semaphore(2)
1242 self._test_semaphore(sem)
1243 self.assertEqual(sem.release(), None)
1244 self.assertReturnsIfImplemented(3, get_value, sem)
1245 self.assertEqual(sem.release(), None)
1246 self.assertReturnsIfImplemented(4, get_value, sem)
1247
1248 def test_bounded_semaphore(self):
1249 sem = self.BoundedSemaphore(2)
1250 self._test_semaphore(sem)
1251 # Currently fails on OS/X
1252 #if HAVE_GETVALUE:
1253 # self.assertRaises(ValueError, sem.release)
1254 # self.assertReturnsIfImplemented(2, get_value, sem)
1255
1256 def test_timeout(self):
1257 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001258 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001259
1260 sem = self.Semaphore(0)
1261 acquire = TimingWrapper(sem.acquire)
1262
1263 self.assertEqual(acquire(False), False)
1264 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1265
1266 self.assertEqual(acquire(False, None), False)
1267 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1268
1269 self.assertEqual(acquire(False, TIMEOUT1), False)
1270 self.assertTimingAlmostEqual(acquire.elapsed, 0)
1271
1272 self.assertEqual(acquire(True, TIMEOUT2), False)
1273 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1274
1275 self.assertEqual(acquire(timeout=TIMEOUT3), False)
1276 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1277
1278
1279class _TestCondition(BaseTestCase):
1280
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001281 @classmethod
1282 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001283 cond.acquire()
1284 sleeping.release()
1285 cond.wait(timeout)
1286 woken.release()
1287 cond.release()
1288
Antoine Pitrou48350412017-07-04 08:59:22 +02001289 def assertReachesEventually(self, func, value):
1290 for i in range(10):
1291 try:
1292 if func() == value:
1293 break
1294 except NotImplementedError:
1295 break
1296 time.sleep(DELTA)
1297 time.sleep(DELTA)
1298 self.assertReturnsIfImplemented(value, func)
1299
Benjamin Petersone711caf2008-06-11 16:44:04 +00001300 def check_invariant(self, cond):
1301 # this is only supposed to succeed when there are no sleepers
1302 if self.TYPE == 'processes':
1303 try:
1304 sleepers = (cond._sleeping_count.get_value() -
1305 cond._woken_count.get_value())
1306 self.assertEqual(sleepers, 0)
1307 self.assertEqual(cond._wait_semaphore.get_value(), 0)
1308 except NotImplementedError:
1309 pass
1310
1311 def test_notify(self):
1312 cond = self.Condition()
1313 sleeping = self.Semaphore(0)
1314 woken = self.Semaphore(0)
1315
1316 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001317 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001318 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001319 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001320
1321 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001323 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001324 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001325
1326 # wait for both children to start sleeping
1327 sleeping.acquire()
1328 sleeping.acquire()
1329
1330 # check no process/thread has woken up
1331 time.sleep(DELTA)
1332 self.assertReturnsIfImplemented(0, get_value, woken)
1333
1334 # wake up one process/thread
1335 cond.acquire()
1336 cond.notify()
1337 cond.release()
1338
1339 # check one process/thread has woken up
1340 time.sleep(DELTA)
1341 self.assertReturnsIfImplemented(1, get_value, woken)
1342
1343 # wake up another
1344 cond.acquire()
1345 cond.notify()
1346 cond.release()
1347
1348 # check other has woken up
1349 time.sleep(DELTA)
1350 self.assertReturnsIfImplemented(2, get_value, woken)
1351
1352 # check state is not mucked up
1353 self.check_invariant(cond)
1354 p.join()
1355
1356 def test_notify_all(self):
1357 cond = self.Condition()
1358 sleeping = self.Semaphore(0)
1359 woken = self.Semaphore(0)
1360
1361 # start some threads/processes which will timeout
1362 for i in range(3):
1363 p = self.Process(target=self.f,
1364 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001365 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001366 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001367 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001368
1369 t = threading.Thread(target=self.f,
1370 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +00001371 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001372 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001373 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001374
1375 # wait for them all to sleep
1376 for i in range(6):
1377 sleeping.acquire()
1378
1379 # check they have all timed out
1380 for i in range(6):
1381 woken.acquire()
1382 self.assertReturnsIfImplemented(0, get_value, woken)
1383
1384 # check state is not mucked up
1385 self.check_invariant(cond)
1386
1387 # start some more threads/processes
1388 for i in range(3):
1389 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001390 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001391 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001392 self.addCleanup(p.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001393
1394 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +00001395 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001396 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001397 self.addCleanup(t.join)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001398
1399 # wait for them to all sleep
1400 for i in range(6):
1401 sleeping.acquire()
1402
1403 # check no process/thread has woken up
1404 time.sleep(DELTA)
1405 self.assertReturnsIfImplemented(0, get_value, woken)
1406
1407 # wake them all up
1408 cond.acquire()
1409 cond.notify_all()
1410 cond.release()
1411
1412 # check they have all woken
Antoine Pitrou48350412017-07-04 08:59:22 +02001413 self.assertReachesEventually(lambda: get_value(woken), 6)
1414
1415 # check state is not mucked up
1416 self.check_invariant(cond)
1417
1418 def test_notify_n(self):
1419 cond = self.Condition()
1420 sleeping = self.Semaphore(0)
1421 woken = self.Semaphore(0)
1422
1423 # start some threads/processes
1424 for i in range(3):
1425 p = self.Process(target=self.f, args=(cond, sleeping, woken))
1426 p.daemon = True
1427 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001428 self.addCleanup(p.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001429
1430 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1431 t.daemon = True
1432 t.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001433 self.addCleanup(t.join)
Antoine Pitrou48350412017-07-04 08:59:22 +02001434
1435 # wait for them to all sleep
1436 for i in range(6):
1437 sleeping.acquire()
1438
1439 # check no process/thread has woken up
1440 time.sleep(DELTA)
1441 self.assertReturnsIfImplemented(0, get_value, woken)
1442
1443 # wake some of them up
1444 cond.acquire()
1445 cond.notify(n=2)
1446 cond.release()
1447
1448 # check 2 have woken
1449 self.assertReachesEventually(lambda: get_value(woken), 2)
1450
1451 # wake the rest of them
1452 cond.acquire()
1453 cond.notify(n=4)
1454 cond.release()
1455
1456 self.assertReachesEventually(lambda: get_value(woken), 6)
1457
1458 # doesn't do anything more
1459 cond.acquire()
1460 cond.notify(n=3)
1461 cond.release()
1462
Benjamin Petersone711caf2008-06-11 16:44:04 +00001463 self.assertReturnsIfImplemented(6, get_value, woken)
1464
1465 # check state is not mucked up
1466 self.check_invariant(cond)
1467
1468 def test_timeout(self):
1469 cond = self.Condition()
1470 wait = TimingWrapper(cond.wait)
1471 cond.acquire()
1472 res = wait(TIMEOUT1)
1473 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +00001474 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1476
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001477 @classmethod
1478 def _test_waitfor_f(cls, cond, state):
1479 with cond:
1480 state.value = 0
1481 cond.notify()
1482 result = cond.wait_for(lambda : state.value==4)
1483 if not result or state.value != 4:
1484 sys.exit(1)
1485
1486 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1487 def test_waitfor(self):
1488 # based on test in test/lock_tests.py
1489 cond = self.Condition()
1490 state = self.Value('i', -1)
1491
1492 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1493 p.daemon = True
1494 p.start()
1495
1496 with cond:
1497 result = cond.wait_for(lambda : state.value==0)
1498 self.assertTrue(result)
1499 self.assertEqual(state.value, 0)
1500
1501 for i in range(4):
1502 time.sleep(0.01)
1503 with cond:
1504 state.value += 1
1505 cond.notify()
1506
Victor Stinner11f08072017-09-15 06:55:31 -07001507 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001508 self.assertEqual(p.exitcode, 0)
1509
1510 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001511 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1512 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001513 with cond:
1514 expected = 0.1
Victor Stinner2cf4c202018-12-17 09:36:36 +01001515 dt = time.monotonic()
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001516 result = cond.wait_for(lambda : state.value==4, timeout=expected)
Victor Stinner2cf4c202018-12-17 09:36:36 +01001517 dt = time.monotonic() - dt
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001518 # borrow logic in assertTimeout() from test/lock_tests.py
1519 if not result and expected * 0.6 < dt < expected * 10.0:
1520 success.value = True
1521
1522 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1523 def test_waitfor_timeout(self):
1524 # based on test in test/lock_tests.py
1525 cond = self.Condition()
1526 state = self.Value('i', 0)
1527 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001528 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001529
1530 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +01001531 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001532 p.daemon = True
1533 p.start()
Victor Stinner11f08072017-09-15 06:55:31 -07001534 self.assertTrue(sem.acquire(timeout=TIMEOUT))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001535
1536 # Only increment 3 times, so state == 4 is never reached.
1537 for i in range(3):
1538 time.sleep(0.01)
1539 with cond:
1540 state.value += 1
1541 cond.notify()
1542
Victor Stinner11f08072017-09-15 06:55:31 -07001543 join_process(p)
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001544 self.assertTrue(success.value)
1545
Richard Oudkerk98449932012-06-05 13:15:29 +01001546 @classmethod
1547 def _test_wait_result(cls, c, pid):
1548 with c:
1549 c.notify()
1550 time.sleep(1)
1551 if pid is not None:
1552 os.kill(pid, signal.SIGINT)
1553
1554 def test_wait_result(self):
1555 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1556 pid = os.getpid()
1557 else:
1558 pid = None
1559
1560 c = self.Condition()
1561 with c:
1562 self.assertFalse(c.wait(0))
1563 self.assertFalse(c.wait(0.1))
1564
1565 p = self.Process(target=self._test_wait_result, args=(c, pid))
1566 p.start()
1567
Victor Stinner49257272018-06-27 22:24:02 +02001568 self.assertTrue(c.wait(60))
Richard Oudkerk98449932012-06-05 13:15:29 +01001569 if pid is not None:
Victor Stinner49257272018-06-27 22:24:02 +02001570 self.assertRaises(KeyboardInterrupt, c.wait, 60)
Richard Oudkerk98449932012-06-05 13:15:29 +01001571
1572 p.join()
1573
Benjamin Petersone711caf2008-06-11 16:44:04 +00001574
1575class _TestEvent(BaseTestCase):
1576
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001577 @classmethod
1578 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001579 time.sleep(TIMEOUT2)
1580 event.set()
1581
1582 def test_event(self):
1583 event = self.Event()
1584 wait = TimingWrapper(event.wait)
1585
Ezio Melotti13925002011-03-16 11:05:33 +02001586 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001587 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001588 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001589
Benjamin Peterson965ce872009-04-05 21:24:58 +00001590 # Removed, threading.Event.wait() will return the value of the __flag
1591 # instead of None. API Shear with the semaphore backed mp.Event
1592 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001593 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001594 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001595 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1596
1597 event.set()
1598
1599 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001600 self.assertEqual(event.is_set(), True)
1601 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001602 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001603 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001604 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1605 # self.assertEqual(event.is_set(), True)
1606
1607 event.clear()
1608
1609 #self.assertEqual(event.is_set(), False)
1610
Jesus Cea94f964f2011-09-09 20:26:57 +02001611 p = self.Process(target=self._test_event, args=(event,))
1612 p.daemon = True
1613 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001614 self.assertEqual(wait(), True)
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001615 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001616
1617#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001618# Tests for Barrier - adapted from tests in test/lock_tests.py
1619#
1620
1621# Many of the tests for threading.Barrier use a list as an atomic
1622# counter: a value is appended to increment the counter, and the
1623# length of the list gives the value. We use the class DummyList
1624# for the same purpose.
1625
1626class _DummyList(object):
1627
1628 def __init__(self):
1629 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1630 lock = multiprocessing.Lock()
1631 self.__setstate__((wrapper, lock))
1632 self._lengthbuf[0] = 0
1633
1634 def __setstate__(self, state):
1635 (self._wrapper, self._lock) = state
1636 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1637
1638 def __getstate__(self):
1639 return (self._wrapper, self._lock)
1640
1641 def append(self, _):
1642 with self._lock:
1643 self._lengthbuf[0] += 1
1644
1645 def __len__(self):
1646 with self._lock:
1647 return self._lengthbuf[0]
1648
1649def _wait():
1650 # A crude wait/yield function not relying on synchronization primitives.
1651 time.sleep(0.01)
1652
1653
1654class Bunch(object):
1655 """
1656 A bunch of threads.
1657 """
1658 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1659 """
1660 Construct a bunch of `n` threads running the same function `f`.
1661 If `wait_before_exit` is True, the threads won't terminate until
1662 do_finish() is called.
1663 """
1664 self.f = f
1665 self.args = args
1666 self.n = n
1667 self.started = namespace.DummyList()
1668 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001669 self._can_exit = namespace.Event()
1670 if not wait_before_exit:
1671 self._can_exit.set()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001672
1673 threads = []
Richard Oudkerk3730a172012-06-15 18:26:07 +01001674 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001675 p = namespace.Process(target=self.task)
1676 p.daemon = True
1677 p.start()
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001678 threads.append(p)
1679
1680 def finalize(threads):
1681 for p in threads:
1682 p.join()
1683
1684 self._finalizer = weakref.finalize(self, finalize, threads)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001685
1686 def task(self):
1687 pid = os.getpid()
1688 self.started.append(pid)
1689 try:
1690 self.f(*self.args)
1691 finally:
1692 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001693 self._can_exit.wait(30)
1694 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001695
1696 def wait_for_started(self):
1697 while len(self.started) < self.n:
1698 _wait()
1699
1700 def wait_for_finished(self):
1701 while len(self.finished) < self.n:
1702 _wait()
1703
1704 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001705 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001706
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001707 def close(self):
1708 self._finalizer()
1709
Richard Oudkerk3730a172012-06-15 18:26:07 +01001710
1711class AppendTrue(object):
1712 def __init__(self, obj):
1713 self.obj = obj
1714 def __call__(self):
1715 self.obj.append(True)
1716
1717
1718class _TestBarrier(BaseTestCase):
1719 """
1720 Tests for Barrier objects.
1721 """
1722 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001723 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001724
1725 def setUp(self):
1726 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1727
1728 def tearDown(self):
1729 self.barrier.abort()
1730 self.barrier = None
1731
1732 def DummyList(self):
1733 if self.TYPE == 'threads':
1734 return []
1735 elif self.TYPE == 'manager':
1736 return self.manager.list()
1737 else:
1738 return _DummyList()
1739
1740 def run_threads(self, f, args):
1741 b = Bunch(self, f, args, self.N-1)
Antoine Pitroua79f8fa2017-06-28 11:21:52 +02001742 try:
1743 f(*args)
1744 b.wait_for_finished()
1745 finally:
1746 b.close()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001747
1748 @classmethod
1749 def multipass(cls, barrier, results, n):
1750 m = barrier.parties
1751 assert m == cls.N
1752 for i in range(n):
1753 results[0].append(True)
1754 assert len(results[1]) == i * m
1755 barrier.wait()
1756 results[1].append(True)
1757 assert len(results[0]) == (i + 1) * m
1758 barrier.wait()
1759 try:
1760 assert barrier.n_waiting == 0
1761 except NotImplementedError:
1762 pass
1763 assert not barrier.broken
1764
1765 def test_barrier(self, passes=1):
1766 """
1767 Test that a barrier is passed in lockstep
1768 """
1769 results = [self.DummyList(), self.DummyList()]
1770 self.run_threads(self.multipass, (self.barrier, results, passes))
1771
1772 def test_barrier_10(self):
1773 """
1774 Test that a barrier works for 10 consecutive runs
1775 """
1776 return self.test_barrier(10)
1777
1778 @classmethod
1779 def _test_wait_return_f(cls, barrier, queue):
1780 res = barrier.wait()
1781 queue.put(res)
1782
1783 def test_wait_return(self):
1784 """
1785 test the return value from barrier.wait
1786 """
1787 queue = self.Queue()
1788 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1789 results = [queue.get() for i in range(self.N)]
1790 self.assertEqual(results.count(0), 1)
Victor Stinnerb4c52962017-07-25 02:40:55 +02001791 close_queue(queue)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001792
1793 @classmethod
1794 def _test_action_f(cls, barrier, results):
1795 barrier.wait()
1796 if len(results) != 1:
1797 raise RuntimeError
1798
1799 def test_action(self):
1800 """
1801 Test the 'action' callback
1802 """
1803 results = self.DummyList()
1804 barrier = self.Barrier(self.N, action=AppendTrue(results))
1805 self.run_threads(self._test_action_f, (barrier, results))
1806 self.assertEqual(len(results), 1)
1807
1808 @classmethod
1809 def _test_abort_f(cls, barrier, results1, results2):
1810 try:
1811 i = barrier.wait()
1812 if i == cls.N//2:
1813 raise RuntimeError
1814 barrier.wait()
1815 results1.append(True)
1816 except threading.BrokenBarrierError:
1817 results2.append(True)
1818 except RuntimeError:
1819 barrier.abort()
1820
1821 def test_abort(self):
1822 """
1823 Test that an abort will put the barrier in a broken state
1824 """
1825 results1 = self.DummyList()
1826 results2 = self.DummyList()
1827 self.run_threads(self._test_abort_f,
1828 (self.barrier, results1, results2))
1829 self.assertEqual(len(results1), 0)
1830 self.assertEqual(len(results2), self.N-1)
1831 self.assertTrue(self.barrier.broken)
1832
1833 @classmethod
1834 def _test_reset_f(cls, barrier, results1, results2, results3):
1835 i = barrier.wait()
1836 if i == cls.N//2:
1837 # Wait until the other threads are all in the barrier.
1838 while barrier.n_waiting < cls.N-1:
1839 time.sleep(0.001)
1840 barrier.reset()
1841 else:
1842 try:
1843 barrier.wait()
1844 results1.append(True)
1845 except threading.BrokenBarrierError:
1846 results2.append(True)
1847 # Now, pass the barrier again
1848 barrier.wait()
1849 results3.append(True)
1850
1851 def test_reset(self):
1852 """
1853 Test that a 'reset' on a barrier frees the waiting threads
1854 """
1855 results1 = self.DummyList()
1856 results2 = self.DummyList()
1857 results3 = self.DummyList()
1858 self.run_threads(self._test_reset_f,
1859 (self.barrier, results1, results2, results3))
1860 self.assertEqual(len(results1), 0)
1861 self.assertEqual(len(results2), self.N-1)
1862 self.assertEqual(len(results3), self.N)
1863
1864 @classmethod
1865 def _test_abort_and_reset_f(cls, barrier, barrier2,
1866 results1, results2, results3):
1867 try:
1868 i = barrier.wait()
1869 if i == cls.N//2:
1870 raise RuntimeError
1871 barrier.wait()
1872 results1.append(True)
1873 except threading.BrokenBarrierError:
1874 results2.append(True)
1875 except RuntimeError:
1876 barrier.abort()
1877 # Synchronize and reset the barrier. Must synchronize first so
1878 # that everyone has left it when we reset, and after so that no
1879 # one enters it before the reset.
1880 if barrier2.wait() == cls.N//2:
1881 barrier.reset()
1882 barrier2.wait()
1883 barrier.wait()
1884 results3.append(True)
1885
1886 def test_abort_and_reset(self):
1887 """
1888 Test that a barrier can be reset after being broken.
1889 """
1890 results1 = self.DummyList()
1891 results2 = self.DummyList()
1892 results3 = self.DummyList()
1893 barrier2 = self.Barrier(self.N)
1894
1895 self.run_threads(self._test_abort_and_reset_f,
1896 (self.barrier, barrier2, results1, results2, results3))
1897 self.assertEqual(len(results1), 0)
1898 self.assertEqual(len(results2), self.N-1)
1899 self.assertEqual(len(results3), self.N)
1900
1901 @classmethod
1902 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001903 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001904 if i == cls.N//2:
1905 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001906 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001907 try:
1908 barrier.wait(0.5)
1909 except threading.BrokenBarrierError:
1910 results.append(True)
1911
1912 def test_timeout(self):
1913 """
1914 Test wait(timeout)
1915 """
1916 results = self.DummyList()
1917 self.run_threads(self._test_timeout_f, (self.barrier, results))
1918 self.assertEqual(len(results), self.barrier.parties)
1919
1920 @classmethod
1921 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001922 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001923 if i == cls.N//2:
1924 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001925 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001926 try:
1927 barrier.wait()
1928 except threading.BrokenBarrierError:
1929 results.append(True)
1930
1931 def test_default_timeout(self):
1932 """
1933 Test the barrier's default timeout
1934 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001935 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001936 results = self.DummyList()
1937 self.run_threads(self._test_default_timeout_f, (barrier, results))
1938 self.assertEqual(len(results), barrier.parties)
1939
1940 def test_single_thread(self):
1941 b = self.Barrier(1)
1942 b.wait()
1943 b.wait()
1944
1945 @classmethod
1946 def _test_thousand_f(cls, barrier, passes, conn, lock):
1947 for i in range(passes):
1948 barrier.wait()
1949 with lock:
1950 conn.send(i)
1951
1952 def test_thousand(self):
1953 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001954 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001955 passes = 1000
1956 lock = self.Lock()
1957 conn, child_conn = self.Pipe(False)
1958 for j in range(self.N):
1959 p = self.Process(target=self._test_thousand_f,
1960 args=(self.barrier, passes, child_conn, lock))
1961 p.start()
Victor Stinnerd7e64d92017-07-25 00:33:56 +02001962 self.addCleanup(p.join)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001963
1964 for i in range(passes):
1965 for j in range(self.N):
1966 self.assertEqual(conn.recv(), i)
1967
1968#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001969#
1970#
1971
1972class _TestValue(BaseTestCase):
1973
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001974 ALLOWED_TYPES = ('processes',)
1975
Benjamin Petersone711caf2008-06-11 16:44:04 +00001976 codes_values = [
1977 ('i', 4343, 24234),
1978 ('d', 3.625, -4.25),
1979 ('h', -232, 234),
Gareth Rees3913bad2017-07-21 11:35:33 +01001980 ('q', 2 ** 33, 2 ** 34),
Benjamin Petersone711caf2008-06-11 16:44:04 +00001981 ('c', latin('x'), latin('y'))
1982 ]
1983
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001984 def setUp(self):
1985 if not HAS_SHAREDCTYPES:
1986 self.skipTest("requires multiprocessing.sharedctypes")
1987
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001988 @classmethod
1989 def _test(cls, values):
1990 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001991 sv.value = cv[2]
1992
1993
1994 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001995 if raw:
1996 values = [self.RawValue(code, value)
1997 for code, value, _ in self.codes_values]
1998 else:
1999 values = [self.Value(code, value)
2000 for code, value, _ in self.codes_values]
2001
2002 for sv, cv in zip(values, self.codes_values):
2003 self.assertEqual(sv.value, cv[1])
2004
2005 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002006 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002007 proc.start()
2008 proc.join()
2009
2010 for sv, cv in zip(values, self.codes_values):
2011 self.assertEqual(sv.value, cv[2])
2012
2013 def test_rawvalue(self):
2014 self.test_value(raw=True)
2015
2016 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002017 val1 = self.Value('i', 5)
2018 lock1 = val1.get_lock()
2019 obj1 = val1.get_obj()
2020
2021 val2 = self.Value('i', 5, lock=None)
2022 lock2 = val2.get_lock()
2023 obj2 = val2.get_obj()
2024
2025 lock = self.Lock()
2026 val3 = self.Value('i', 5, lock=lock)
2027 lock3 = val3.get_lock()
2028 obj3 = val3.get_obj()
2029 self.assertEqual(lock, lock3)
2030
Jesse Nollerb0516a62009-01-18 03:11:38 +00002031 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002032 self.assertFalse(hasattr(arr4, 'get_lock'))
2033 self.assertFalse(hasattr(arr4, 'get_obj'))
2034
Jesse Nollerb0516a62009-01-18 03:11:38 +00002035 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
2036
2037 arr5 = self.RawValue('i', 5)
2038 self.assertFalse(hasattr(arr5, 'get_lock'))
2039 self.assertFalse(hasattr(arr5, 'get_obj'))
2040
Benjamin Petersone711caf2008-06-11 16:44:04 +00002041
2042class _TestArray(BaseTestCase):
2043
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002044 ALLOWED_TYPES = ('processes',)
2045
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002046 @classmethod
2047 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002048 for i in range(1, len(seq)):
2049 seq[i] += seq[i-1]
2050
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002051 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002052 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002053 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
2054 if raw:
2055 arr = self.RawArray('i', seq)
2056 else:
2057 arr = self.Array('i', seq)
2058
2059 self.assertEqual(len(arr), len(seq))
2060 self.assertEqual(arr[3], seq[3])
2061 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2062
2063 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2064
2065 self.assertEqual(list(arr[:]), seq)
2066
2067 self.f(seq)
2068
2069 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002070 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002071 p.start()
2072 p.join()
2073
2074 self.assertEqual(list(arr[:]), seq)
2075
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002076 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00002077 def test_array_from_size(self):
2078 size = 10
2079 # Test for zeroing (see issue #11675).
2080 # The repetition below strengthens the test by increasing the chances
2081 # of previously allocated non-zero memory being used for the new array
2082 # on the 2nd and 3rd loops.
2083 for _ in range(3):
2084 arr = self.Array('i', size)
2085 self.assertEqual(len(arr), size)
2086 self.assertEqual(list(arr), [0] * size)
2087 arr[:] = range(10)
2088 self.assertEqual(list(arr), list(range(10)))
2089 del arr
2090
2091 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002092 def test_rawarray(self):
2093 self.test_array(raw=True)
2094
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002095 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002096 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002097 arr1 = self.Array('i', list(range(10)))
2098 lock1 = arr1.get_lock()
2099 obj1 = arr1.get_obj()
2100
2101 arr2 = self.Array('i', list(range(10)), lock=None)
2102 lock2 = arr2.get_lock()
2103 obj2 = arr2.get_obj()
2104
2105 lock = self.Lock()
2106 arr3 = self.Array('i', list(range(10)), lock=lock)
2107 lock3 = arr3.get_lock()
2108 obj3 = arr3.get_obj()
2109 self.assertEqual(lock, lock3)
2110
Jesse Nollerb0516a62009-01-18 03:11:38 +00002111 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002112 self.assertFalse(hasattr(arr4, 'get_lock'))
2113 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00002114 self.assertRaises(AttributeError,
2115 self.Array, 'i', range(10), lock='notalock')
2116
2117 arr5 = self.RawArray('i', range(10))
2118 self.assertFalse(hasattr(arr5, 'get_lock'))
2119 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002120
2121#
2122#
2123#
2124
2125class _TestContainers(BaseTestCase):
2126
2127 ALLOWED_TYPES = ('manager',)
2128
2129 def test_list(self):
2130 a = self.list(list(range(10)))
2131 self.assertEqual(a[:], list(range(10)))
2132
2133 b = self.list()
2134 self.assertEqual(b[:], [])
2135
2136 b.extend(list(range(5)))
2137 self.assertEqual(b[:], list(range(5)))
2138
2139 self.assertEqual(b[2], 2)
2140 self.assertEqual(b[2:10], [2,3,4])
2141
2142 b *= 2
2143 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2144
2145 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2146
2147 self.assertEqual(a[:], list(range(10)))
2148
2149 d = [a, b]
2150 e = self.list(d)
2151 self.assertEqual(
Davin Potts86a76682016-09-07 18:48:01 -05002152 [element[:] for element in e],
Benjamin Petersone711caf2008-06-11 16:44:04 +00002153 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2154 )
2155
2156 f = self.list([a])
2157 a.append('hello')
Davin Potts86a76682016-09-07 18:48:01 -05002158 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2159
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002160 def test_list_iter(self):
2161 a = self.list(list(range(10)))
2162 it = iter(a)
2163 self.assertEqual(list(it), list(range(10)))
2164 self.assertEqual(list(it), []) # exhausted
2165 # list modified during iteration
2166 it = iter(a)
2167 a[0] = 100
2168 self.assertEqual(next(it), 100)
2169
Davin Potts86a76682016-09-07 18:48:01 -05002170 def test_list_proxy_in_list(self):
2171 a = self.list([self.list(range(3)) for _i in range(3)])
2172 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2173
2174 a[0][-1] = 55
2175 self.assertEqual(a[0][:], [0, 1, 55])
2176 for i in range(1, 3):
2177 self.assertEqual(a[i][:], [0, 1, 2])
2178
2179 self.assertEqual(a[1].pop(), 2)
2180 self.assertEqual(len(a[1]), 2)
2181 for i in range(0, 3, 2):
2182 self.assertEqual(len(a[i]), 3)
2183
2184 del a
2185
2186 b = self.list()
2187 b.append(b)
2188 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00002189
2190 def test_dict(self):
2191 d = self.dict()
2192 indices = list(range(65, 70))
2193 for i in indices:
2194 d[i] = chr(i)
2195 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2196 self.assertEqual(sorted(d.keys()), indices)
2197 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2198 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2199
Serhiy Storchakae0e50652018-09-17 14:24:01 +03002200 def test_dict_iter(self):
2201 d = self.dict()
2202 indices = list(range(65, 70))
2203 for i in indices:
2204 d[i] = chr(i)
2205 it = iter(d)
2206 self.assertEqual(list(it), indices)
2207 self.assertEqual(list(it), []) # exhausted
2208 # dictionary changed size during iteration
2209 it = iter(d)
2210 d.clear()
2211 self.assertRaises(RuntimeError, next, it)
2212
Davin Potts86a76682016-09-07 18:48:01 -05002213 def test_dict_proxy_nested(self):
2214 pets = self.dict(ferrets=2, hamsters=4)
2215 supplies = self.dict(water=10, feed=3)
2216 d = self.dict(pets=pets, supplies=supplies)
2217
2218 self.assertEqual(supplies['water'], 10)
2219 self.assertEqual(d['supplies']['water'], 10)
2220
2221 d['supplies']['blankets'] = 5
2222 self.assertEqual(supplies['blankets'], 5)
2223 self.assertEqual(d['supplies']['blankets'], 5)
2224
2225 d['supplies']['water'] = 7
2226 self.assertEqual(supplies['water'], 7)
2227 self.assertEqual(d['supplies']['water'], 7)
2228
2229 del pets
2230 del supplies
2231 self.assertEqual(d['pets']['ferrets'], 2)
2232 d['supplies']['blankets'] = 11
2233 self.assertEqual(d['supplies']['blankets'], 11)
2234
2235 pets = d['pets']
2236 supplies = d['supplies']
2237 supplies['water'] = 7
2238 self.assertEqual(supplies['water'], 7)
2239 self.assertEqual(d['supplies']['water'], 7)
2240
2241 d.clear()
2242 self.assertEqual(len(d), 0)
2243 self.assertEqual(supplies['water'], 7)
2244 self.assertEqual(pets['hamsters'], 4)
2245
2246 l = self.list([pets, supplies])
2247 l[0]['marmots'] = 1
2248 self.assertEqual(pets['marmots'], 1)
2249 self.assertEqual(l[0]['marmots'], 1)
2250
2251 del pets
2252 del supplies
2253 self.assertEqual(l[0]['marmots'], 1)
2254
2255 outer = self.list([[88, 99], l])
2256 self.assertIsInstance(outer[0], list) # Not a ListProxy
2257 self.assertEqual(outer[-1][-1]['feed'], 3)
2258
Benjamin Petersone711caf2008-06-11 16:44:04 +00002259 def test_namespace(self):
2260 n = self.Namespace()
2261 n.name = 'Bob'
2262 n.job = 'Builder'
2263 n._hidden = 'hidden'
2264 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2265 del n.job
2266 self.assertEqual(str(n), "Namespace(name='Bob')")
2267 self.assertTrue(hasattr(n, 'name'))
2268 self.assertTrue(not hasattr(n, 'job'))
2269
2270#
2271#
2272#
2273
2274def sqr(x, wait=0.0):
2275 time.sleep(wait)
2276 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00002277
Antoine Pitroude911b22011-12-21 11:03:24 +01002278def mul(x, y):
2279 return x*y
2280
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002281def raise_large_valuerror(wait):
2282 time.sleep(wait)
2283 raise ValueError("x" * 1024**2)
2284
Antoine Pitrou89889452017-03-24 13:52:11 +01002285def identity(x):
2286 return x
2287
2288class CountedObject(object):
2289 n_instances = 0
2290
2291 def __new__(cls):
2292 cls.n_instances += 1
2293 return object.__new__(cls)
2294
2295 def __del__(self):
2296 type(self).n_instances -= 1
2297
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002298class SayWhenError(ValueError): pass
2299
2300def exception_throwing_generator(total, when):
Xiang Zhang794623b2017-03-29 11:58:54 +08002301 if when == -1:
2302 raise SayWhenError("Somebody said when")
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002303 for i in range(total):
2304 if i == when:
2305 raise SayWhenError("Somebody said when")
2306 yield i
2307
Antoine Pitrou89889452017-03-24 13:52:11 +01002308
Benjamin Petersone711caf2008-06-11 16:44:04 +00002309class _TestPool(BaseTestCase):
2310
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002311 @classmethod
2312 def setUpClass(cls):
2313 super().setUpClass()
2314 cls.pool = cls.Pool(4)
2315
2316 @classmethod
2317 def tearDownClass(cls):
2318 cls.pool.terminate()
2319 cls.pool.join()
2320 cls.pool = None
2321 super().tearDownClass()
2322
Benjamin Petersone711caf2008-06-11 16:44:04 +00002323 def test_apply(self):
2324 papply = self.pool.apply
2325 self.assertEqual(papply(sqr, (5,)), sqr(5))
2326 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2327
2328 def test_map(self):
2329 pmap = self.pool.map
2330 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2331 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2332 list(map(sqr, list(range(100)))))
2333
Antoine Pitroude911b22011-12-21 11:03:24 +01002334 def test_starmap(self):
2335 psmap = self.pool.starmap
2336 tuples = list(zip(range(10), range(9,-1, -1)))
2337 self.assertEqual(psmap(mul, tuples),
2338 list(itertools.starmap(mul, tuples)))
2339 tuples = list(zip(range(100), range(99,-1, -1)))
2340 self.assertEqual(psmap(mul, tuples, chunksize=20),
2341 list(itertools.starmap(mul, tuples)))
2342
2343 def test_starmap_async(self):
2344 tuples = list(zip(range(100), range(99,-1, -1)))
2345 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2346 list(itertools.starmap(mul, tuples)))
2347
Hynek Schlawack254af262012-10-27 12:53:02 +02002348 def test_map_async(self):
2349 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2350 list(map(sqr, list(range(10)))))
2351
2352 def test_map_async_callbacks(self):
2353 call_args = self.manager.list() if self.TYPE == 'manager' else []
2354 self.pool.map_async(int, ['1'],
2355 callback=call_args.append,
2356 error_callback=call_args.append).wait()
2357 self.assertEqual(1, len(call_args))
2358 self.assertEqual([1], call_args[0])
2359 self.pool.map_async(int, ['a'],
2360 callback=call_args.append,
2361 error_callback=call_args.append).wait()
2362 self.assertEqual(2, len(call_args))
2363 self.assertIsInstance(call_args[1], ValueError)
2364
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002365 def test_map_unplicklable(self):
2366 # Issue #19425 -- failure to pickle should not cause a hang
2367 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002368 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00002369 class A(object):
2370 def __reduce__(self):
2371 raise RuntimeError('cannot pickle')
2372 with self.assertRaises(RuntimeError):
2373 self.pool.map(sqr, [A()]*10)
2374
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00002375 def test_map_chunksize(self):
2376 try:
2377 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2378 except multiprocessing.TimeoutError:
2379 self.fail("pool.map_async with chunksize stalled on null list")
2380
Xiang Zhang794623b2017-03-29 11:58:54 +08002381 def test_map_handle_iterable_exception(self):
2382 if self.TYPE == 'manager':
2383 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2384
2385 # SayWhenError seen at the very first of the iterable
2386 with self.assertRaises(SayWhenError):
2387 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2388 # again, make sure it's reentrant
2389 with self.assertRaises(SayWhenError):
2390 self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2391
2392 with self.assertRaises(SayWhenError):
2393 self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2394
2395 class SpecialIterable:
2396 def __iter__(self):
2397 return self
2398 def __next__(self):
2399 raise SayWhenError
2400 def __len__(self):
2401 return 1
2402 with self.assertRaises(SayWhenError):
2403 self.pool.map(sqr, SpecialIterable(), 1)
2404 with self.assertRaises(SayWhenError):
2405 self.pool.map(sqr, SpecialIterable(), 1)
2406
Benjamin Petersone711caf2008-06-11 16:44:04 +00002407 def test_async(self):
2408 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2409 get = TimingWrapper(res.get)
2410 self.assertEqual(get(), 49)
2411 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2412
2413 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00002414 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002415 get = TimingWrapper(res.get)
2416 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2417 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2418
2419 def test_imap(self):
2420 it = self.pool.imap(sqr, list(range(10)))
2421 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2422
2423 it = self.pool.imap(sqr, list(range(10)))
2424 for i in range(10):
2425 self.assertEqual(next(it), i*i)
2426 self.assertRaises(StopIteration, it.__next__)
2427
2428 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2429 for i in range(1000):
2430 self.assertEqual(next(it), i*i)
2431 self.assertRaises(StopIteration, it.__next__)
2432
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002433 def test_imap_handle_iterable_exception(self):
2434 if self.TYPE == 'manager':
2435 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2436
Xiang Zhang794623b2017-03-29 11:58:54 +08002437 # SayWhenError seen at the very first of the iterable
2438 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2439 self.assertRaises(SayWhenError, it.__next__)
2440 # again, make sure it's reentrant
2441 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2442 self.assertRaises(SayWhenError, it.__next__)
2443
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002444 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2445 for i in range(3):
2446 self.assertEqual(next(it), i*i)
2447 self.assertRaises(SayWhenError, it.__next__)
2448
2449 # SayWhenError seen at start of problematic chunk's results
2450 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2451 for i in range(6):
2452 self.assertEqual(next(it), i*i)
2453 self.assertRaises(SayWhenError, it.__next__)
2454 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2455 for i in range(4):
2456 self.assertEqual(next(it), i*i)
2457 self.assertRaises(SayWhenError, it.__next__)
2458
Benjamin Petersone711caf2008-06-11 16:44:04 +00002459 def test_imap_unordered(self):
Victor Stinner23401fb2018-07-03 13:20:35 +02002460 it = self.pool.imap_unordered(sqr, list(range(10)))
2461 self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002462
Victor Stinner23401fb2018-07-03 13:20:35 +02002463 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002464 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2465
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002466 def test_imap_unordered_handle_iterable_exception(self):
2467 if self.TYPE == 'manager':
2468 self.skipTest('test not appropriate for {}'.format(self.TYPE))
2469
Xiang Zhang794623b2017-03-29 11:58:54 +08002470 # SayWhenError seen at the very first of the iterable
2471 it = self.pool.imap_unordered(sqr,
2472 exception_throwing_generator(1, -1),
2473 1)
2474 self.assertRaises(SayWhenError, it.__next__)
2475 # again, make sure it's reentrant
2476 it = self.pool.imap_unordered(sqr,
2477 exception_throwing_generator(1, -1),
2478 1)
2479 self.assertRaises(SayWhenError, it.__next__)
2480
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002481 it = self.pool.imap_unordered(sqr,
2482 exception_throwing_generator(10, 3),
2483 1)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002484 expected_values = list(map(sqr, list(range(10))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002485 with self.assertRaises(SayWhenError):
2486 # imap_unordered makes it difficult to anticipate the SayWhenError
2487 for i in range(10):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002488 value = next(it)
2489 self.assertIn(value, expected_values)
2490 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002491
2492 it = self.pool.imap_unordered(sqr,
2493 exception_throwing_generator(20, 7),
2494 2)
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002495 expected_values = list(map(sqr, list(range(20))))
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002496 with self.assertRaises(SayWhenError):
2497 for i in range(20):
Serhiy Storchaka71f73ca2015-04-23 11:35:59 +03002498 value = next(it)
2499 self.assertIn(value, expected_values)
2500 expected_values.remove(value)
Serhiy Storchaka79fbeee2015-03-13 08:25:26 +02002501
Benjamin Petersone711caf2008-06-11 16:44:04 +00002502 def test_make_pool(self):
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002503 expected_error = (RemoteError if self.TYPE == 'manager'
2504 else ValueError)
Victor Stinner2fae27b2011-06-20 17:53:35 +02002505
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002506 self.assertRaises(expected_error, self.Pool, -1)
2507 self.assertRaises(expected_error, self.Pool, 0)
2508
2509 if self.TYPE != 'manager':
2510 p = self.Pool(3)
2511 try:
2512 self.assertEqual(3, len(p._pool))
2513 finally:
2514 p.close()
2515 p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002516
2517 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002518 result = self.pool.map_async(
2519 time.sleep, [0.1 for i in range(10000)], chunksize=1
2520 )
2521 self.pool.terminate()
2522 join = TimingWrapper(self.pool.join)
2523 join()
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002524 # Sanity check the pool didn't wait for all tasks to finish
2525 self.assertLess(join.elapsed, 2.0)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002526
Richard Oudkerke41682b2012-06-06 19:04:57 +01002527 def test_empty_iterable(self):
2528 # See Issue 12157
2529 p = self.Pool(1)
2530
2531 self.assertEqual(p.map(sqr, []), [])
2532 self.assertEqual(list(p.imap(sqr, [])), [])
2533 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2534 self.assertEqual(p.map_async(sqr, []).get(), [])
2535
2536 p.close()
2537 p.join()
2538
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002539 def test_context(self):
2540 if self.TYPE == 'processes':
2541 L = list(range(10))
2542 expected = [sqr(i) for i in L]
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01002543 with self.Pool(2) as p:
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002544 r = p.map_async(sqr, L)
2545 self.assertEqual(r.get(), expected)
Victor Stinner388c8c22018-12-06 11:56:52 +01002546 p.join()
Benjamin Peterson3095f472012-09-25 12:45:42 -04002547 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002548
Richard Oudkerk85757832013-05-06 11:38:25 +01002549 @classmethod
2550 def _test_traceback(cls):
2551 raise RuntimeError(123) # some comment
2552
2553 def test_traceback(self):
2554 # We want ensure that the traceback from the child process is
2555 # contained in the traceback raised in the main process.
2556 if self.TYPE == 'processes':
2557 with self.Pool(1) as p:
2558 try:
2559 p.apply(self._test_traceback)
2560 except Exception as e:
2561 exc = e
2562 else:
Xiang Zhang794623b2017-03-29 11:58:54 +08002563 self.fail('expected RuntimeError')
Victor Stinner388c8c22018-12-06 11:56:52 +01002564 p.join()
Richard Oudkerk85757832013-05-06 11:38:25 +01002565 self.assertIs(type(exc), RuntimeError)
2566 self.assertEqual(exc.args, (123,))
2567 cause = exc.__cause__
2568 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2569 self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2570
2571 with test.support.captured_stderr() as f1:
2572 try:
2573 raise exc
2574 except RuntimeError:
2575 sys.excepthook(*sys.exc_info())
2576 self.assertIn('raise RuntimeError(123) # some comment',
2577 f1.getvalue())
Xiang Zhang794623b2017-03-29 11:58:54 +08002578 # _helper_reraises_exception should not make the error
2579 # a remote exception
2580 with self.Pool(1) as p:
2581 try:
2582 p.map(sqr, exception_throwing_generator(1, -1), 1)
2583 except Exception as e:
2584 exc = e
2585 else:
2586 self.fail('expected SayWhenError')
2587 self.assertIs(type(exc), SayWhenError)
2588 self.assertIs(exc.__cause__, None)
Victor Stinner388c8c22018-12-06 11:56:52 +01002589 p.join()
Richard Oudkerk85757832013-05-06 11:38:25 +01002590
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002591 @classmethod
2592 def _test_wrapped_exception(cls):
2593 raise RuntimeError('foo')
2594
2595 def test_wrapped_exception(self):
2596 # Issue #20980: Should not wrap exception when using thread pool
2597 with self.Pool(1) as p:
2598 with self.assertRaises(RuntimeError):
2599 p.apply(self._test_wrapped_exception)
Victor Stinnerb7278732018-11-28 01:14:31 +01002600 p.join()
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002601
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002602 def test_map_no_failfast(self):
2603 # Issue #23992: the fail-fast behaviour when an exception is raised
2604 # during map() would make Pool.join() deadlock, because a worker
2605 # process would fill the result queue (after the result handler thread
2606 # terminated, hence not draining it anymore).
2607
Victor Stinner2cf4c202018-12-17 09:36:36 +01002608 t_start = time.monotonic()
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002609
2610 with self.assertRaises(ValueError):
2611 with self.Pool(2) as p:
2612 try:
2613 p.map(raise_large_valuerror, [0, 1])
2614 finally:
2615 time.sleep(0.5)
2616 p.close()
2617 p.join()
2618
2619 # check that we indeed waited for all jobs
Victor Stinner2cf4c202018-12-17 09:36:36 +01002620 self.assertGreater(time.monotonic() - t_start, 0.9)
Charles-François Natali78f55ff2016-02-10 22:58:18 +00002621
Antoine Pitrou89889452017-03-24 13:52:11 +01002622 def test_release_task_refs(self):
2623 # Issue #29861: task arguments and results should not be kept
2624 # alive after we are done with them.
2625 objs = [CountedObject() for i in range(10)]
2626 refs = [weakref.ref(o) for o in objs]
2627 self.pool.map(identity, objs)
2628
2629 del objs
Antoine Pitrou685cdb92017-04-14 13:10:00 +02002630 time.sleep(DELTA) # let threaded cleanup code run
Antoine Pitrou89889452017-03-24 13:52:11 +01002631 self.assertEqual(set(wr() for wr in refs), {None})
2632 # With a process pool, copies of the objects are returned, check
2633 # they were released too.
2634 self.assertEqual(CountedObject.n_instances, 0)
2635
Victor Stinner08c2ba02018-12-13 02:15:30 +01002636 def test_enter(self):
2637 if self.TYPE == 'manager':
2638 self.skipTest("test not applicable to manager")
2639
2640 pool = self.Pool(1)
2641 with pool:
2642 pass
2643 # call pool.terminate()
2644 # pool is no longer running
2645
2646 with self.assertRaises(ValueError):
2647 # bpo-35477: pool.__enter__() fails if the pool is not running
2648 with pool:
2649 pass
2650 pool.join()
2651
Victor Stinner9a8d1d72018-12-20 20:33:51 +01002652 def test_resource_warning(self):
2653 if self.TYPE == 'manager':
2654 self.skipTest("test not applicable to manager")
2655
2656 pool = self.Pool(1)
2657 pool.terminate()
2658 pool.join()
2659
2660 # force state to RUN to emit ResourceWarning in __del__()
2661 pool._state = multiprocessing.pool.RUN
2662
2663 with support.check_warnings(('unclosed running multiprocessing pool',
2664 ResourceWarning)):
2665 pool = None
2666 support.gc_collect()
2667
Ask Solem2afcbf22010-11-09 20:55:52 +00002668def raising():
2669 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00002670
Ask Solem2afcbf22010-11-09 20:55:52 +00002671def unpickleable_result():
2672 return lambda: 42
2673
2674class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00002675 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00002676
2677 def test_async_error_callback(self):
2678 p = multiprocessing.Pool(2)
2679
2680 scratchpad = [None]
2681 def errback(exc):
2682 scratchpad[0] = exc
2683
2684 res = p.apply_async(raising, error_callback=errback)
2685 self.assertRaises(KeyError, res.get)
2686 self.assertTrue(scratchpad[0])
2687 self.assertIsInstance(scratchpad[0], KeyError)
2688
2689 p.close()
2690 p.join()
2691
2692 def test_unpickleable_result(self):
2693 from multiprocessing.pool import MaybeEncodingError
2694 p = multiprocessing.Pool(2)
2695
2696 # Make sure we don't lose pool processes because of encoding errors.
2697 for iteration in range(20):
2698
2699 scratchpad = [None]
2700 def errback(exc):
2701 scratchpad[0] = exc
2702
2703 res = p.apply_async(unpickleable_result, error_callback=errback)
2704 self.assertRaises(MaybeEncodingError, res.get)
2705 wrapped = scratchpad[0]
2706 self.assertTrue(wrapped)
2707 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2708 self.assertIsNotNone(wrapped.exc)
2709 self.assertIsNotNone(wrapped.value)
2710
2711 p.close()
2712 p.join()
2713
2714class _TestPoolWorkerLifetime(BaseTestCase):
2715 ALLOWED_TYPES = ('processes', )
2716
Jesse Noller1f0b6582010-01-27 03:36:01 +00002717 def test_pool_worker_lifetime(self):
2718 p = multiprocessing.Pool(3, maxtasksperchild=10)
2719 self.assertEqual(3, len(p._pool))
2720 origworkerpids = [w.pid for w in p._pool]
2721 # Run many tasks so each worker gets replaced (hopefully)
2722 results = []
2723 for i in range(100):
2724 results.append(p.apply_async(sqr, (i, )))
2725 # Fetch the results and verify we got the right answers,
2726 # also ensuring all the tasks have completed.
2727 for (j, res) in enumerate(results):
2728 self.assertEqual(res.get(), sqr(j))
2729 # Refill the pool
2730 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00002731 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02002732 # (countdown * DELTA = 5 seconds max startup process time)
2733 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00002734 while countdown and not all(w.is_alive() for w in p._pool):
2735 countdown -= 1
2736 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00002737 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00002738 # All pids should be assigned. See issue #7805.
2739 self.assertNotIn(None, origworkerpids)
2740 self.assertNotIn(None, finalworkerpids)
2741 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00002742 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2743 p.close()
2744 p.join()
2745
Charles-François Natalif8859e12011-10-24 18:45:29 +02002746 def test_pool_worker_lifetime_early_close(self):
2747 # Issue #10332: closing a pool whose workers have limited lifetimes
2748 # before all the tasks completed would make join() hang.
2749 p = multiprocessing.Pool(3, maxtasksperchild=1)
2750 results = []
2751 for i in range(6):
2752 results.append(p.apply_async(sqr, (i, 0.3)))
2753 p.close()
2754 p.join()
2755 # check the results
2756 for (j, res) in enumerate(results):
2757 self.assertEqual(res.get(), sqr(j))
2758
Benjamin Petersone711caf2008-06-11 16:44:04 +00002759#
2760# Test of creating a customized manager class
2761#
2762
2763from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2764
2765class FooBar(object):
2766 def f(self):
2767 return 'f()'
2768 def g(self):
2769 raise ValueError
2770 def _h(self):
2771 return '_h()'
2772
2773def baz():
2774 for i in range(10):
2775 yield i*i
2776
2777class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00002778 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002779 def __iter__(self):
2780 return self
2781 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002782 return self._callmethod('__next__')
2783
2784class MyManager(BaseManager):
2785 pass
2786
2787MyManager.register('Foo', callable=FooBar)
2788MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2789MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2790
2791
2792class _TestMyManager(BaseTestCase):
2793
2794 ALLOWED_TYPES = ('manager',)
2795
2796 def test_mymanager(self):
2797 manager = MyManager()
2798 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01002799 self.common(manager)
2800 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002801
Miss Islington (bot)081641f2019-09-24 05:39:47 -07002802 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2803 # to the manager process if it takes longer than 1 second to stop,
2804 # which happens on slow buildbots.
2805 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
Richard Oudkerkac385712012-06-18 21:29:30 +01002806
2807 def test_mymanager_context(self):
2808 with MyManager() as manager:
2809 self.common(manager)
Victor Stinnerfbd71722018-06-27 18:18:10 +02002810 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
Miss Islington (bot)081641f2019-09-24 05:39:47 -07002811 # to the manager process if it takes longer than 1 second to stop,
2812 # which happens on slow buildbots.
Victor Stinnerfbd71722018-06-27 18:18:10 +02002813 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
Richard Oudkerkac385712012-06-18 21:29:30 +01002814
2815 def test_mymanager_context_prestarted(self):
2816 manager = MyManager()
2817 manager.start()
2818 with manager:
2819 self.common(manager)
2820 self.assertEqual(manager._process.exitcode, 0)
2821
2822 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002823 foo = manager.Foo()
2824 bar = manager.Bar()
2825 baz = manager.baz()
2826
2827 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2828 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2829
2830 self.assertEqual(foo_methods, ['f', 'g'])
2831 self.assertEqual(bar_methods, ['f', '_h'])
2832
2833 self.assertEqual(foo.f(), 'f()')
2834 self.assertRaises(ValueError, foo.g)
2835 self.assertEqual(foo._callmethod('f'), 'f()')
2836 self.assertRaises(RemoteError, foo._callmethod, '_h')
2837
2838 self.assertEqual(bar.f(), 'f()')
2839 self.assertEqual(bar._h(), '_h()')
2840 self.assertEqual(bar._callmethod('f'), 'f()')
2841 self.assertEqual(bar._callmethod('_h'), '_h()')
2842
2843 self.assertEqual(list(baz), [i*i for i in range(10)])
2844
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002845
Benjamin Petersone711caf2008-06-11 16:44:04 +00002846#
2847# Test of connecting to a remote server and using xmlrpclib for serialization
2848#
2849
2850_queue = pyqueue.Queue()
2851def get_queue():
2852 return _queue
2853
2854class QueueManager(BaseManager):
2855 '''manager class used by server process'''
2856QueueManager.register('get_queue', callable=get_queue)
2857
2858class QueueManager2(BaseManager):
2859 '''manager class which specifies the same interface as QueueManager'''
2860QueueManager2.register('get_queue')
2861
2862
2863SERIALIZER = 'xmlrpclib'
2864
2865class _TestRemoteManager(BaseTestCase):
2866
2867 ALLOWED_TYPES = ('manager',)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002868 values = ['hello world', None, True, 2.25,
2869 'hall\xe5 v\xe4rlden',
2870 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2871 b'hall\xe5 v\xe4rlden',
2872 ]
2873 result = values[:]
Benjamin Petersone711caf2008-06-11 16:44:04 +00002874
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002875 @classmethod
2876 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002877 manager = QueueManager2(
2878 address=address, authkey=authkey, serializer=SERIALIZER
2879 )
2880 manager.connect()
2881 queue = manager.get_queue()
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002882 # Note that xmlrpclib will deserialize object as a list not a tuple
2883 queue.put(tuple(cls.values))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002884
2885 def test_remote(self):
2886 authkey = os.urandom(32)
2887
2888 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002889 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00002890 )
2891 manager.start()
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002892 self.addCleanup(manager.shutdown)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002893
2894 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002895 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002896 p.start()
2897
2898 manager2 = QueueManager2(
2899 address=manager.address, authkey=authkey, serializer=SERIALIZER
2900 )
2901 manager2.connect()
2902 queue = manager2.get_queue()
2903
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02002904 self.assertEqual(queue.get(), self.result)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002905
2906 # Because we are using xmlrpclib for serialization instead of
2907 # pickle this will cause a serialization error.
2908 self.assertRaises(Exception, queue.put, time.sleep)
2909
2910 # Make queue finalizer run before the server is stopped
2911 del queue
Benjamin Petersone711caf2008-06-11 16:44:04 +00002912
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002913class _TestManagerRestart(BaseTestCase):
2914
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002915 @classmethod
2916 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002917 manager = QueueManager(
2918 address=address, authkey=authkey, serializer=SERIALIZER)
2919 manager.connect()
2920 queue = manager.get_queue()
2921 queue.put('hello world')
2922
2923 def test_rapid_restart(self):
2924 authkey = os.urandom(32)
2925 manager = QueueManager(
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02002926 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002927 try:
2928 srvr = manager.get_server()
2929 addr = srvr.address
2930 # Close the connection.Listener socket which gets opened as a part
2931 # of manager.get_server(). It's not needed for the test.
2932 srvr.listener.close()
2933 manager.start()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002934
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002935 p = self.Process(target=self._putter, args=(manager.address, authkey))
2936 p.start()
2937 p.join()
2938 queue = manager.get_queue()
2939 self.assertEqual(queue.get(), 'hello world')
2940 del queue
2941 finally:
2942 if hasattr(manager, "shutdown"):
2943 manager.shutdown()
Victor Stinner17657bb2017-08-16 12:46:04 +02002944
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002945 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002946 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002947 try:
2948 manager.start()
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002949 self.addCleanup(manager.shutdown)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02002950 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002951 if e.errno != errno.EADDRINUSE:
2952 raise
2953 # Retry after some time, in case the old socket was lingering
2954 # (sporadic failure on buildbots)
2955 time.sleep(1.0)
2956 manager = QueueManager(
2957 address=addr, authkey=authkey, serializer=SERIALIZER)
Pablo Galindo7b2a37b2019-02-09 17:35:05 +00002958 if hasattr(manager, "shutdown"):
2959 self.addCleanup(manager.shutdown)
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002960
Benjamin Petersone711caf2008-06-11 16:44:04 +00002961#
2962#
2963#
2964
2965SENTINEL = latin('')
2966
2967class _TestConnection(BaseTestCase):
2968
2969 ALLOWED_TYPES = ('processes', 'threads')
2970
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002971 @classmethod
2972 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002973 for msg in iter(conn.recv_bytes, SENTINEL):
2974 conn.send_bytes(msg)
2975 conn.close()
2976
2977 def test_connection(self):
2978 conn, child_conn = self.Pipe()
2979
2980 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002981 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002982 p.start()
2983
2984 seq = [1, 2.25, None]
2985 msg = latin('hello world')
2986 longmsg = msg * 10
2987 arr = array.array('i', list(range(4)))
2988
2989 if self.TYPE == 'processes':
2990 self.assertEqual(type(conn.fileno()), int)
2991
2992 self.assertEqual(conn.send(seq), None)
2993 self.assertEqual(conn.recv(), seq)
2994
2995 self.assertEqual(conn.send_bytes(msg), None)
2996 self.assertEqual(conn.recv_bytes(), msg)
2997
2998 if self.TYPE == 'processes':
2999 buffer = array.array('i', [0]*10)
3000 expected = list(arr) + [0] * (10 - len(arr))
3001 self.assertEqual(conn.send_bytes(arr), None)
3002 self.assertEqual(conn.recv_bytes_into(buffer),
3003 len(arr) * buffer.itemsize)
3004 self.assertEqual(list(buffer), expected)
3005
3006 buffer = array.array('i', [0]*10)
3007 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
3008 self.assertEqual(conn.send_bytes(arr), None)
3009 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
3010 len(arr) * buffer.itemsize)
3011 self.assertEqual(list(buffer), expected)
3012
3013 buffer = bytearray(latin(' ' * 40))
3014 self.assertEqual(conn.send_bytes(longmsg), None)
3015 try:
3016 res = conn.recv_bytes_into(buffer)
3017 except multiprocessing.BufferTooShort as e:
3018 self.assertEqual(e.args, (longmsg,))
3019 else:
3020 self.fail('expected BufferTooShort, got %s' % res)
3021
3022 poll = TimingWrapper(conn.poll)
3023
3024 self.assertEqual(poll(), False)
3025 self.assertTimingAlmostEqual(poll.elapsed, 0)
3026
Richard Oudkerk59d54042012-05-10 16:11:12 +01003027 self.assertEqual(poll(-1), False)
3028 self.assertTimingAlmostEqual(poll.elapsed, 0)
3029
Benjamin Petersone711caf2008-06-11 16:44:04 +00003030 self.assertEqual(poll(TIMEOUT1), False)
3031 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
3032
3033 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01003034 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003035
3036 self.assertEqual(poll(TIMEOUT1), True)
3037 self.assertTimingAlmostEqual(poll.elapsed, 0)
3038
3039 self.assertEqual(conn.recv(), None)
3040
3041 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
3042 conn.send_bytes(really_big_msg)
3043 self.assertEqual(conn.recv_bytes(), really_big_msg)
3044
3045 conn.send_bytes(SENTINEL) # tell child to quit
3046 child_conn.close()
3047
3048 if self.TYPE == 'processes':
3049 self.assertEqual(conn.readable, True)
3050 self.assertEqual(conn.writable, True)
3051 self.assertRaises(EOFError, conn.recv)
3052 self.assertRaises(EOFError, conn.recv_bytes)
3053
3054 p.join()
3055
3056 def test_duplex_false(self):
3057 reader, writer = self.Pipe(duplex=False)
3058 self.assertEqual(writer.send(1), None)
3059 self.assertEqual(reader.recv(), 1)
3060 if self.TYPE == 'processes':
3061 self.assertEqual(reader.readable, True)
3062 self.assertEqual(reader.writable, False)
3063 self.assertEqual(writer.readable, False)
3064 self.assertEqual(writer.writable, True)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003065 self.assertRaises(OSError, reader.send, 2)
3066 self.assertRaises(OSError, writer.recv)
3067 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003068
3069 def test_spawn_close(self):
3070 # We test that a pipe connection can be closed by parent
3071 # process immediately after child is spawned. On Windows this
3072 # would have sometimes failed on old versions because
3073 # child_conn would be closed before the child got a chance to
3074 # duplicate it.
3075 conn, child_conn = self.Pipe()
3076
3077 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003078 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003079 p.start()
3080 child_conn.close() # this might complete before child initializes
3081
3082 msg = latin('hello')
3083 conn.send_bytes(msg)
3084 self.assertEqual(conn.recv_bytes(), msg)
3085
3086 conn.send_bytes(SENTINEL)
3087 conn.close()
3088 p.join()
3089
3090 def test_sendbytes(self):
3091 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06003092 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00003093
3094 msg = latin('abcdefghijklmnopqrstuvwxyz')
3095 a, b = self.Pipe()
3096
3097 a.send_bytes(msg)
3098 self.assertEqual(b.recv_bytes(), msg)
3099
3100 a.send_bytes(msg, 5)
3101 self.assertEqual(b.recv_bytes(), msg[5:])
3102
3103 a.send_bytes(msg, 7, 8)
3104 self.assertEqual(b.recv_bytes(), msg[7:7+8])
3105
3106 a.send_bytes(msg, 26)
3107 self.assertEqual(b.recv_bytes(), latin(''))
3108
3109 a.send_bytes(msg, 26, 0)
3110 self.assertEqual(b.recv_bytes(), latin(''))
3111
3112 self.assertRaises(ValueError, a.send_bytes, msg, 27)
3113
3114 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3115
3116 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3117
3118 self.assertRaises(ValueError, a.send_bytes, msg, -1)
3119
3120 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3121
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003122 @classmethod
3123 def _is_fd_assigned(cls, fd):
3124 try:
3125 os.fstat(fd)
3126 except OSError as e:
3127 if e.errno == errno.EBADF:
3128 return False
3129 raise
3130 else:
3131 return True
3132
3133 @classmethod
3134 def _writefd(cls, conn, data, create_dummy_fds=False):
3135 if create_dummy_fds:
3136 for i in range(0, 256):
3137 if not cls._is_fd_assigned(i):
3138 os.dup2(conn.fileno(), i)
3139 fd = reduction.recv_handle(conn)
3140 if msvcrt:
3141 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3142 os.write(fd, data)
3143 os.close(fd)
3144
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003145 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003146 def test_fd_transfer(self):
3147 if self.TYPE != 'processes':
3148 self.skipTest("only makes sense with processes")
3149 conn, child_conn = self.Pipe(duplex=True)
3150
3151 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02003152 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003153 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003154 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003155 with open(test.support.TESTFN, "wb") as f:
3156 fd = f.fileno()
3157 if msvcrt:
3158 fd = msvcrt.get_osfhandle(fd)
3159 reduction.send_handle(conn, fd, p.pid)
3160 p.join()
3161 with open(test.support.TESTFN, "rb") as f:
3162 self.assertEqual(f.read(), b"foo")
3163
Charles-François Natalibc8f0822011-09-20 20:36:51 +02003164 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003165 @unittest.skipIf(sys.platform == "win32",
3166 "test semantics don't make sense on Windows")
3167 @unittest.skipIf(MAXFD <= 256,
3168 "largest assignable fd number is too small")
3169 @unittest.skipUnless(hasattr(os, "dup2"),
3170 "test needs os.dup2()")
3171 def test_large_fd_transfer(self):
3172 # With fd > 256 (issue #11657)
3173 if self.TYPE != 'processes':
3174 self.skipTest("only makes sense with processes")
3175 conn, child_conn = self.Pipe(duplex=True)
3176
3177 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02003178 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003179 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02003180 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003181 with open(test.support.TESTFN, "wb") as f:
3182 fd = f.fileno()
3183 for newfd in range(256, MAXFD):
3184 if not self._is_fd_assigned(newfd):
3185 break
3186 else:
3187 self.fail("could not find an unassigned large file descriptor")
3188 os.dup2(fd, newfd)
3189 try:
3190 reduction.send_handle(conn, newfd, p.pid)
3191 finally:
3192 os.close(newfd)
3193 p.join()
3194 with open(test.support.TESTFN, "rb") as f:
3195 self.assertEqual(f.read(), b"bar")
3196
Jesus Cea4507e642011-09-21 03:53:25 +02003197 @classmethod
3198 def _send_data_without_fd(self, conn):
3199 os.write(conn.fileno(), b"\0")
3200
Charles-François Natalie51c8da2011-09-21 18:48:21 +02003201 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02003202 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3203 def test_missing_fd_transfer(self):
3204 # Check that exception is raised when received data is not
3205 # accompanied by a file descriptor in ancillary data.
3206 if self.TYPE != 'processes':
3207 self.skipTest("only makes sense with processes")
3208 conn, child_conn = self.Pipe(duplex=True)
3209
3210 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3211 p.daemon = True
3212 p.start()
3213 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3214 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02003215
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003216 def test_context(self):
3217 a, b = self.Pipe()
3218
3219 with a, b:
3220 a.send(1729)
3221 self.assertEqual(b.recv(), 1729)
3222 if self.TYPE == 'processes':
3223 self.assertFalse(a.closed)
3224 self.assertFalse(b.closed)
3225
3226 if self.TYPE == 'processes':
3227 self.assertTrue(a.closed)
3228 self.assertTrue(b.closed)
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003229 self.assertRaises(OSError, a.recv)
3230 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003231
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003232class _TestListener(BaseTestCase):
3233
Richard Oudkerk91257752012-06-15 21:53:34 +01003234 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003235
3236 def test_multiple_bind(self):
3237 for family in self.connection.families:
3238 l = self.connection.Listener(family=family)
3239 self.addCleanup(l.close)
3240 self.assertRaises(OSError, self.connection.Listener,
3241 l.address, family)
3242
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003243 def test_context(self):
3244 with self.connection.Listener() as l:
3245 with self.connection.Client(l.address) as c:
3246 with l.accept() as d:
3247 c.send(1729)
3248 self.assertEqual(d.recv(), 1729)
3249
3250 if self.TYPE == 'processes':
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02003251 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01003252
Benjamin Petersone711caf2008-06-11 16:44:04 +00003253class _TestListenerClient(BaseTestCase):
3254
3255 ALLOWED_TYPES = ('processes', 'threads')
3256
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003257 @classmethod
3258 def _test(cls, address):
3259 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003260 conn.send('hello')
3261 conn.close()
3262
3263 def test_listener_client(self):
3264 for family in self.connection.families:
3265 l = self.connection.Listener(family=family)
3266 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00003267 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003268 p.start()
3269 conn = l.accept()
3270 self.assertEqual(conn.recv(), 'hello')
3271 p.join()
3272 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01003273
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01003274 def test_issue14725(self):
3275 l = self.connection.Listener()
3276 p = self.Process(target=self._test, args=(l.address,))
3277 p.daemon = True
3278 p.start()
3279 time.sleep(1)
3280 # On Windows the client process should by now have connected,
3281 # written data and closed the pipe handle by now. This causes
3282 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
3283 # 14725.
3284 conn = l.accept()
3285 self.assertEqual(conn.recv(), 'hello')
3286 conn.close()
3287 p.join()
3288 l.close()
3289
Richard Oudkerked9e06c2013-01-13 22:46:48 +00003290 def test_issue16955(self):
3291 for fam in self.connection.families:
3292 l = self.connection.Listener(family=fam)
3293 c = self.connection.Client(l.address)
3294 a = l.accept()
3295 a.send_bytes(b"hello")
3296 self.assertTrue(c.poll(1))
3297 a.close()
3298 c.close()
3299 l.close()
3300
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003301class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003302
3303 ALLOWED_TYPES = ('processes', 'threads')
3304
3305 def test_empty_string(self):
3306 a, b = self.Pipe()
3307 self.assertEqual(a.poll(), False)
3308 b.send_bytes(b'')
3309 self.assertEqual(a.poll(), True)
3310 self.assertEqual(a.poll(), True)
3311
3312 @classmethod
3313 def _child_strings(cls, conn, strings):
3314 for s in strings:
3315 time.sleep(0.1)
3316 conn.send_bytes(s)
3317 conn.close()
3318
3319 def test_strings(self):
3320 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3321 a, b = self.Pipe()
3322 p = self.Process(target=self._child_strings, args=(b, strings))
3323 p.start()
3324
3325 for s in strings:
3326 for i in range(200):
3327 if a.poll(0.01):
3328 break
3329 x = a.recv_bytes()
3330 self.assertEqual(s, x)
3331
3332 p.join()
3333
3334 @classmethod
3335 def _child_boundaries(cls, r):
3336 # Polling may "pull" a message in to the child process, but we
3337 # don't want it to pull only part of a message, as that would
3338 # corrupt the pipe for any other processes which might later
3339 # read from it.
3340 r.poll(5)
3341
3342 def test_boundaries(self):
3343 r, w = self.Pipe(False)
3344 p = self.Process(target=self._child_boundaries, args=(r,))
3345 p.start()
3346 time.sleep(2)
3347 L = [b"first", b"second"]
3348 for obj in L:
3349 w.send_bytes(obj)
3350 w.close()
3351 p.join()
3352 self.assertIn(r.recv_bytes(), L)
3353
3354 @classmethod
3355 def _child_dont_merge(cls, b):
3356 b.send_bytes(b'a')
3357 b.send_bytes(b'b')
3358 b.send_bytes(b'cd')
3359
3360 def test_dont_merge(self):
3361 a, b = self.Pipe()
3362 self.assertEqual(a.poll(0.0), False)
3363 self.assertEqual(a.poll(0.1), False)
3364
3365 p = self.Process(target=self._child_dont_merge, args=(b,))
3366 p.start()
3367
3368 self.assertEqual(a.recv_bytes(), b'a')
3369 self.assertEqual(a.poll(1.0), True)
3370 self.assertEqual(a.poll(1.0), True)
3371 self.assertEqual(a.recv_bytes(), b'b')
3372 self.assertEqual(a.poll(1.0), True)
3373 self.assertEqual(a.poll(1.0), True)
3374 self.assertEqual(a.poll(0.0), True)
3375 self.assertEqual(a.recv_bytes(), b'cd')
3376
3377 p.join()
3378
Benjamin Petersone711caf2008-06-11 16:44:04 +00003379#
3380# Test of sending connection and socket objects between processes
3381#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003382
3383@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00003384class _TestPicklingConnections(BaseTestCase):
3385
3386 ALLOWED_TYPES = ('processes',)
3387
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003388 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003389 def tearDownClass(cls):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01003390 from multiprocessing import resource_sharer
Victor Stinner11f08072017-09-15 06:55:31 -07003391 resource_sharer.stop(timeout=TIMEOUT)
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02003392
3393 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003394 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003395 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003396 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003397 conn.send(l.address)
3398 new_conn = l.accept()
3399 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003400 new_conn.close()
3401 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003402
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02003403 l = socket.create_server((test.support.HOST, 0))
Richard Oudkerk5d73c172012-05-08 22:24:47 +01003404 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003405 new_conn, addr = l.accept()
3406 conn.send(new_conn)
3407 new_conn.close()
3408 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003409
3410 conn.recv()
3411
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003412 @classmethod
3413 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003414 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003415 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003416 client.send(msg.upper())
3417 client.close()
3418
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003419 address, msg = conn.recv()
3420 client = socket.socket()
3421 client.connect(address)
3422 client.sendall(msg.upper())
3423 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003424
3425 conn.close()
3426
3427 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003428 families = self.connection.families
3429
3430 lconn, lconn0 = self.Pipe()
3431 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02003432 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003433 lp.start()
3434 lconn0.close()
3435
3436 rconn, rconn0 = self.Pipe()
3437 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003438 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003439 rp.start()
3440 rconn0.close()
3441
3442 for fam in families:
3443 msg = ('This connection uses family %s' % fam).encode('ascii')
3444 address = lconn.recv()
3445 rconn.send((address, msg))
3446 new_conn = lconn.recv()
3447 self.assertEqual(new_conn.recv(), msg.upper())
3448
3449 rconn.send(None)
3450
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003451 msg = latin('This connection uses a normal socket')
3452 address = lconn.recv()
3453 rconn.send((address, msg))
3454 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01003455 buf = []
3456 while True:
3457 s = new_conn.recv(100)
3458 if not s:
3459 break
3460 buf.append(s)
3461 buf = b''.join(buf)
3462 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003463 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003464
3465 lconn.send(None)
3466
3467 rconn.close()
3468 lconn.close()
3469
3470 lp.join()
3471 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003472
3473 @classmethod
3474 def child_access(cls, conn):
3475 w = conn.recv()
3476 w.send('all is well')
3477 w.close()
3478
3479 r = conn.recv()
3480 msg = r.recv()
3481 conn.send(msg*2)
3482
3483 conn.close()
3484
3485 def test_access(self):
3486 # On Windows, if we do not specify a destination pid when
3487 # using DupHandle then we need to be careful to use the
3488 # correct access flags for DuplicateHandle(), or else
3489 # DupHandle.detach() will raise PermissionError. For example,
3490 # for a read only pipe handle we should use
3491 # access=FILE_GENERIC_READ. (Unfortunately
3492 # DUPLICATE_SAME_ACCESS does not work.)
3493 conn, child_conn = self.Pipe()
3494 p = self.Process(target=self.child_access, args=(child_conn,))
3495 p.daemon = True
3496 p.start()
3497 child_conn.close()
3498
3499 r, w = self.Pipe(duplex=False)
3500 conn.send(w)
3501 w.close()
3502 self.assertEqual(r.recv(), 'all is well')
3503 r.close()
3504
3505 r, w = self.Pipe(duplex=False)
3506 conn.send(r)
3507 r.close()
3508 w.send('foobar')
3509 w.close()
3510 self.assertEqual(conn.recv(), 'foobar'*2)
3511
Victor Stinnerb4c52962017-07-25 02:40:55 +02003512 p.join()
3513
Benjamin Petersone711caf2008-06-11 16:44:04 +00003514#
3515#
3516#
3517
3518class _TestHeap(BaseTestCase):
3519
3520 ALLOWED_TYPES = ('processes',)
3521
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003522 def setUp(self):
3523 super().setUp()
3524 # Make pristine heap for these tests
3525 self.old_heap = multiprocessing.heap.BufferWrapper._heap
3526 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3527
3528 def tearDown(self):
3529 multiprocessing.heap.BufferWrapper._heap = self.old_heap
3530 super().tearDown()
3531
Benjamin Petersone711caf2008-06-11 16:44:04 +00003532 def test_heap(self):
3533 iterations = 5000
3534 maxblocks = 50
3535 blocks = []
3536
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003537 # get the heap object
3538 heap = multiprocessing.heap.BufferWrapper._heap
3539 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3540
Benjamin Petersone711caf2008-06-11 16:44:04 +00003541 # create and destroy lots of blocks of different sizes
3542 for i in range(iterations):
3543 size = int(random.lognormvariate(0, 1) * 1000)
3544 b = multiprocessing.heap.BufferWrapper(size)
3545 blocks.append(b)
3546 if len(blocks) > maxblocks:
3547 i = random.randrange(maxblocks)
3548 del blocks[i]
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003549 del b
Benjamin Petersone711caf2008-06-11 16:44:04 +00003550
3551 # verify the state of the heap
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003552 with heap._lock:
3553 all = []
3554 free = 0
3555 occupied = 0
3556 for L in list(heap._len_to_seq.values()):
3557 # count all free blocks in arenas
3558 for arena, start, stop in L:
3559 all.append((heap._arenas.index(arena), start, stop,
3560 stop-start, 'free'))
3561 free += (stop-start)
3562 for arena, arena_blocks in heap._allocated_blocks.items():
3563 # count all allocated blocks in arenas
3564 for start, stop in arena_blocks:
3565 all.append((heap._arenas.index(arena), start, stop,
3566 stop-start, 'occupied'))
3567 occupied += (stop-start)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003568
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003569 self.assertEqual(free + occupied,
3570 sum(arena.size for arena in heap._arenas))
Benjamin Petersone711caf2008-06-11 16:44:04 +00003571
Antoine Pitroue4679cd2018-04-09 17:37:55 +02003572 all.sort()
3573
3574 for i in range(len(all)-1):
3575 (arena, start, stop) = all[i][:3]
3576 (narena, nstart, nstop) = all[i+1][:3]
3577 if arena != narena:
3578 # Two different arenas
3579 self.assertEqual(stop, heap._arenas[arena].size) # last block
3580 self.assertEqual(nstart, 0) # first block
3581 else:
3582 # Same arena: two adjacent blocks
3583 self.assertEqual(stop, nstart)
3584
3585 # test free'ing all blocks
3586 random.shuffle(blocks)
3587 while blocks:
3588 blocks.pop()
3589
3590 self.assertEqual(heap._n_frees, heap._n_mallocs)
3591 self.assertEqual(len(heap._pending_free_blocks), 0)
3592 self.assertEqual(len(heap._arenas), 0)
3593 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3594 self.assertEqual(len(heap._len_to_seq), 0)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003595
Charles-François Natali778db492011-07-02 14:35:49 +02003596 def test_free_from_gc(self):
3597 # Check that freeing of blocks by the garbage collector doesn't deadlock
3598 # (issue #12352).
3599 # Make sure the GC is enabled, and set lower collection thresholds to
3600 # make collections more frequent (and increase the probability of
3601 # deadlock).
3602 if not gc.isenabled():
3603 gc.enable()
3604 self.addCleanup(gc.disable)
3605 thresholds = gc.get_threshold()
3606 self.addCleanup(gc.set_threshold, *thresholds)
3607 gc.set_threshold(10)
3608
3609 # perform numerous block allocations, with cyclic references to make
3610 # sure objects are collected asynchronously by the gc
3611 for i in range(5000):
3612 a = multiprocessing.heap.BufferWrapper(1)
3613 b = multiprocessing.heap.BufferWrapper(1)
3614 # circular references
3615 a.buddy = b
3616 b.buddy = a
3617
Benjamin Petersone711caf2008-06-11 16:44:04 +00003618#
3619#
3620#
3621
Benjamin Petersone711caf2008-06-11 16:44:04 +00003622class _Foo(Structure):
3623 _fields_ = [
3624 ('x', c_int),
Gareth Rees3913bad2017-07-21 11:35:33 +01003625 ('y', c_double),
3626 ('z', c_longlong,)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003627 ]
3628
3629class _TestSharedCTypes(BaseTestCase):
3630
3631 ALLOWED_TYPES = ('processes',)
3632
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00003633 def setUp(self):
3634 if not HAS_SHAREDCTYPES:
3635 self.skipTest("requires multiprocessing.sharedctypes")
3636
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00003637 @classmethod
Gareth Rees3913bad2017-07-21 11:35:33 +01003638 def _double(cls, x, y, z, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003639 x.value *= 2
3640 y.value *= 2
Gareth Rees3913bad2017-07-21 11:35:33 +01003641 z.value *= 2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003642 foo.x *= 2
3643 foo.y *= 2
3644 string.value *= 2
3645 for i in range(len(arr)):
3646 arr[i] *= 2
3647
3648 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00003649 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003650 y = Value(c_double, 1.0/3.0, lock=lock)
Gareth Rees3913bad2017-07-21 11:35:33 +01003651 z = Value(c_longlong, 2 ** 33, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003652 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00003653 arr = self.Array('d', list(range(10)), lock=lock)
3654 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00003655 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00003656
Gareth Rees3913bad2017-07-21 11:35:33 +01003657 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02003658 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00003659 p.start()
3660 p.join()
3661
3662 self.assertEqual(x.value, 14)
3663 self.assertAlmostEqual(y.value, 2.0/3.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003664 self.assertEqual(z.value, 2 ** 34)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003665 self.assertEqual(foo.x, 6)
3666 self.assertAlmostEqual(foo.y, 4.0)
3667 for i in range(10):
3668 self.assertAlmostEqual(arr[i], i*2)
3669 self.assertEqual(string.value, latin('hellohello'))
3670
3671 def test_synchronize(self):
3672 self.test_sharedctypes(lock=True)
3673
3674 def test_copy(self):
Gareth Rees3913bad2017-07-21 11:35:33 +01003675 foo = _Foo(2, 5.0, 2 ** 33)
Brian Curtinafa88b52010-10-07 01:12:19 +00003676 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003677 foo.x = 0
3678 foo.y = 0
Gareth Rees3913bad2017-07-21 11:35:33 +01003679 foo.z = 0
Benjamin Petersone711caf2008-06-11 16:44:04 +00003680 self.assertEqual(bar.x, 2)
3681 self.assertAlmostEqual(bar.y, 5.0)
Gareth Rees3913bad2017-07-21 11:35:33 +01003682 self.assertEqual(bar.z, 2 ** 33)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003683
Davin Pottse895de32019-02-23 22:08:16 -06003684
3685@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3686class _TestSharedMemory(BaseTestCase):
3687
3688 ALLOWED_TYPES = ('processes',)
3689
3690 @staticmethod
3691 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3692 if isinstance(shmem_name_or_obj, str):
3693 local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3694 else:
3695 local_sms = shmem_name_or_obj
3696 local_sms.buf[:len(binary_data)] = binary_data
3697 local_sms.close()
3698
3699 def test_shared_memory_basics(self):
3700 sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
3701 self.addCleanup(sms.unlink)
3702
3703 # Verify attributes are readable.
3704 self.assertEqual(sms.name, 'test01_tsmb')
3705 self.assertGreaterEqual(sms.size, 512)
3706 self.assertGreaterEqual(len(sms.buf), sms.size)
3707
3708 # Modify contents of shared memory segment through memoryview.
3709 sms.buf[0] = 42
3710 self.assertEqual(sms.buf[0], 42)
3711
3712 # Attach to existing shared memory segment.
3713 also_sms = shared_memory.SharedMemory('test01_tsmb')
3714 self.assertEqual(also_sms.buf[0], 42)
3715 also_sms.close()
3716
3717 # Attach to existing shared memory segment but specify a new size.
3718 same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
3719 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored.
3720 same_sms.close()
3721
3722 if shared_memory._USE_POSIX:
3723 # Posix Shared Memory can only be unlinked once. Here we
3724 # test an implementation detail that is not observed across
3725 # all supported platforms (since WindowsNamedSharedMemory
3726 # manages unlinking on its own and unlink() does nothing).
3727 # True release of shared memory segment does not necessarily
3728 # happen until process exits, depending on the OS platform.
3729 with self.assertRaises(FileNotFoundError):
3730 sms_uno = shared_memory.SharedMemory(
3731 'test01_dblunlink',
3732 create=True,
3733 size=5000
3734 )
3735
3736 try:
3737 self.assertGreaterEqual(sms_uno.size, 5000)
3738
3739 sms_duo = shared_memory.SharedMemory('test01_dblunlink')
3740 sms_duo.unlink() # First shm_unlink() call.
3741 sms_duo.close()
3742 sms_uno.close()
3743
3744 finally:
3745 sms_uno.unlink() # A second shm_unlink() call is bad.
3746
3747 with self.assertRaises(FileExistsError):
3748 # Attempting to create a new shared memory segment with a
3749 # name that is already in use triggers an exception.
3750 there_can_only_be_one_sms = shared_memory.SharedMemory(
3751 'test01_tsmb',
3752 create=True,
3753 size=512
3754 )
3755
3756 if shared_memory._USE_POSIX:
3757 # Requesting creation of a shared memory segment with the option
3758 # to attach to an existing segment, if that name is currently in
3759 # use, should not trigger an exception.
3760 # Note: Using a smaller size could possibly cause truncation of
3761 # the existing segment but is OS platform dependent. In the
3762 # case of MacOS/darwin, requesting a smaller size is disallowed.
3763 class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3764 _flags = os.O_CREAT | os.O_RDWR
3765 ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
3766 self.assertEqual(ok_if_exists_sms.size, sms.size)
3767 ok_if_exists_sms.close()
3768
3769 # Attempting to attach to an existing shared memory segment when
3770 # no segment exists with the supplied name triggers an exception.
3771 with self.assertRaises(FileNotFoundError):
3772 nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3773 nonexisting_sms.unlink() # Error should occur on prior line.
3774
3775 sms.close()
3776
3777 def test_shared_memory_across_processes(self):
3778 sms = shared_memory.SharedMemory('test02_tsmap', True, size=512)
3779 self.addCleanup(sms.unlink)
3780
3781 # Verify remote attachment to existing block by name is working.
3782 p = self.Process(
3783 target=self._attach_existing_shmem_then_write,
3784 args=(sms.name, b'howdy')
3785 )
3786 p.daemon = True
3787 p.start()
3788 p.join()
3789 self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3790
3791 # Verify pickling of SharedMemory instance also works.
3792 p = self.Process(
3793 target=self._attach_existing_shmem_then_write,
3794 args=(sms, b'HELLO')
3795 )
3796 p.daemon = True
3797 p.start()
3798 p.join()
3799 self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
3800
3801 sms.close()
3802
Pierre Glaserd0d64ad2019-05-10 20:42:35 +02003803 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
3804 def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
3805 # bpo-36368: protect SharedMemoryManager server process from
3806 # KeyboardInterrupt signals.
3807 smm = multiprocessing.managers.SharedMemoryManager()
3808 smm.start()
3809
3810 # make sure the manager works properly at the beginning
3811 sl = smm.ShareableList(range(10))
3812
3813 # the manager's server should ignore KeyboardInterrupt signals, and
3814 # maintain its connection with the current process, and success when
3815 # asked to deliver memory segments.
3816 os.kill(smm._process.pid, signal.SIGINT)
3817
3818 sl2 = smm.ShareableList(range(10))
3819
3820 # test that the custom signal handler registered in the Manager does
3821 # not affect signal handling in the parent process.
3822 with self.assertRaises(KeyboardInterrupt):
3823 os.kill(os.getpid(), signal.SIGINT)
3824
3825 smm.shutdown()
3826
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003827 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
3828 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
3829 # bpo-36867: test that a SharedMemoryManager uses the
3830 # same resource_tracker process as its parent.
3831 cmd = '''if 1:
3832 from multiprocessing.managers import SharedMemoryManager
3833
3834
3835 smm = SharedMemoryManager()
3836 smm.start()
3837 sl = smm.ShareableList(range(10))
3838 smm.shutdown()
3839 '''
3840 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
3841
3842 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
3843 # resource_tracker process as its parent would make the parent's
3844 # tracker complain about sl being leaked even though smm.shutdown()
3845 # properly released sl.
3846 self.assertFalse(err)
3847
Davin Pottse895de32019-02-23 22:08:16 -06003848 def test_shared_memory_SharedMemoryManager_basics(self):
3849 smm1 = multiprocessing.managers.SharedMemoryManager()
3850 with self.assertRaises(ValueError):
3851 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started
3852 smm1.start()
3853 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
3854 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
3855 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
3856 self.assertEqual(len(doppleganger_list0), 5)
3857 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
3858 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
3859 held_name = lom[0].name
3860 smm1.shutdown()
3861 if sys.platform != "win32":
3862 # Calls to unlink() have no effect on Windows platform; shared
3863 # memory will only be released once final process exits.
3864 with self.assertRaises(FileNotFoundError):
3865 # No longer there to be attached to again.
3866 absent_shm = shared_memory.SharedMemory(name=held_name)
3867
3868 with multiprocessing.managers.SharedMemoryManager() as smm2:
3869 sl = smm2.ShareableList("howdy")
3870 shm = smm2.SharedMemory(size=128)
3871 held_name = sl.shm.name
3872 if sys.platform != "win32":
3873 with self.assertRaises(FileNotFoundError):
3874 # No longer there to be attached to again.
3875 absent_sl = shared_memory.ShareableList(name=held_name)
3876
3877
3878 def test_shared_memory_ShareableList_basics(self):
3879 sl = shared_memory.ShareableList(
3880 ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
3881 )
3882 self.addCleanup(sl.shm.unlink)
3883
3884 # Verify attributes are readable.
3885 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
3886
3887 # Exercise len().
3888 self.assertEqual(len(sl), 7)
3889
3890 # Exercise index().
3891 with warnings.catch_warnings():
3892 # Suppress BytesWarning when comparing against b'HoWdY'.
3893 warnings.simplefilter('ignore')
3894 with self.assertRaises(ValueError):
3895 sl.index('100')
3896 self.assertEqual(sl.index(100), 3)
3897
3898 # Exercise retrieving individual values.
3899 self.assertEqual(sl[0], 'howdy')
3900 self.assertEqual(sl[-2], True)
3901
3902 # Exercise iterability.
3903 self.assertEqual(
3904 tuple(sl),
3905 ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
3906 )
3907
3908 # Exercise modifying individual values.
3909 sl[3] = 42
3910 self.assertEqual(sl[3], 42)
3911 sl[4] = 'some' # Change type at a given position.
3912 self.assertEqual(sl[4], 'some')
3913 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
3914 with self.assertRaises(ValueError):
3915 sl[4] = 'far too many' # Exceeds available storage.
3916 self.assertEqual(sl[4], 'some')
3917
3918 # Exercise count().
3919 with warnings.catch_warnings():
3920 # Suppress BytesWarning when comparing against b'HoWdY'.
3921 warnings.simplefilter('ignore')
3922 self.assertEqual(sl.count(42), 2)
3923 self.assertEqual(sl.count(b'HoWdY'), 1)
3924 self.assertEqual(sl.count(b'adios'), 0)
3925
3926 # Exercise creating a duplicate.
3927 sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
3928 try:
3929 self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
3930 self.assertEqual('test03_duplicate', sl_copy.shm.name)
3931 self.assertEqual(list(sl), list(sl_copy))
3932 self.assertEqual(sl.format, sl_copy.format)
3933 sl_copy[-1] = 77
3934 self.assertEqual(sl_copy[-1], 77)
3935 self.assertNotEqual(sl[-1], 77)
3936 sl_copy.shm.close()
3937 finally:
3938 sl_copy.shm.unlink()
3939
3940 # Obtain a second handle on the same ShareableList.
3941 sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
3942 self.assertEqual(sl.shm.name, sl_tethered.shm.name)
3943 sl_tethered[-1] = 880
3944 self.assertEqual(sl[-1], 880)
3945 sl_tethered.shm.close()
3946
3947 sl.shm.close()
3948
3949 # Exercise creating an empty ShareableList.
3950 empty_sl = shared_memory.ShareableList()
3951 try:
3952 self.assertEqual(len(empty_sl), 0)
3953 self.assertEqual(empty_sl.format, '')
3954 self.assertEqual(empty_sl.count('any'), 0)
3955 with self.assertRaises(ValueError):
3956 empty_sl.index(None)
3957 empty_sl.shm.close()
3958 finally:
3959 empty_sl.shm.unlink()
3960
3961 def test_shared_memory_ShareableList_pickling(self):
3962 sl = shared_memory.ShareableList(range(10))
3963 self.addCleanup(sl.shm.unlink)
3964
3965 serialized_sl = pickle.dumps(sl)
3966 deserialized_sl = pickle.loads(serialized_sl)
3967 self.assertTrue(
3968 isinstance(deserialized_sl, shared_memory.ShareableList)
3969 )
3970 self.assertTrue(deserialized_sl[-1], 9)
3971 self.assertFalse(sl is deserialized_sl)
3972 deserialized_sl[4] = "changed"
3973 self.assertEqual(sl[4], "changed")
3974
3975 # Verify data is not being put into the pickled representation.
3976 name = 'a' * len(sl.shm.name)
3977 larger_sl = shared_memory.ShareableList(range(400))
3978 self.addCleanup(larger_sl.shm.unlink)
3979 serialized_larger_sl = pickle.dumps(larger_sl)
3980 self.assertTrue(len(serialized_sl) == len(serialized_larger_sl))
3981 larger_sl.shm.close()
3982
3983 deserialized_sl.shm.close()
3984 sl.shm.close()
3985
Pierre Glaserf22cc692019-05-10 22:59:08 +02003986 def test_shared_memory_cleaned_after_process_termination(self):
Pierre Glaserf22cc692019-05-10 22:59:08 +02003987 cmd = '''if 1:
3988 import os, time, sys
3989 from multiprocessing import shared_memory
3990
3991 # Create a shared_memory segment, and send the segment name
3992 sm = shared_memory.SharedMemory(create=True, size=10)
Miss Islington (bot)3d58b782019-07-11 11:38:37 -07003993 sys.stdout.write(sm.name + '\\n')
Pierre Glaserf22cc692019-05-10 22:59:08 +02003994 sys.stdout.flush()
3995 time.sleep(100)
3996 '''
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02003997 with subprocess.Popen([sys.executable, '-E', '-c', cmd],
3998 stdout=subprocess.PIPE,
3999 stderr=subprocess.PIPE) as p:
4000 name = p.stdout.readline().strip().decode()
Pierre Glaserf22cc692019-05-10 22:59:08 +02004001
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02004002 # killing abruptly processes holding reference to a shared memory
4003 # segment should not leak the given memory segment.
4004 p.terminate()
4005 p.wait()
Pierre Glaserf22cc692019-05-10 22:59:08 +02004006
Pierre Glasercbe72d82019-05-17 20:20:07 +02004007 deadline = time.monotonic() + 60
4008 t = 0.1
4009 while time.monotonic() < deadline:
4010 time.sleep(t)
4011 t = min(t*2, 5)
4012 try:
4013 smm = shared_memory.SharedMemory(name, create=False)
4014 except FileNotFoundError:
4015 break
4016 else:
4017 raise AssertionError("A SharedMemory segment was leaked after"
4018 " a process was abruptly terminated.")
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02004019
4020 if os.name == 'posix':
4021 # A warning was emitted by the subprocess' own
4022 # resource_tracker (on Windows, shared memory segments
4023 # are released automatically by the OS).
4024 err = p.stderr.read().decode()
4025 self.assertIn(
4026 "resource_tracker: There appear to be 1 leaked "
4027 "shared_memory objects to clean up at shutdown", err)
Pierre Glaserf22cc692019-05-10 22:59:08 +02004028
Benjamin Petersone711caf2008-06-11 16:44:04 +00004029#
4030#
4031#
4032
4033class _TestFinalize(BaseTestCase):
4034
4035 ALLOWED_TYPES = ('processes',)
4036
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02004037 def setUp(self):
4038 self.registry_backup = util._finalizer_registry.copy()
4039 util._finalizer_registry.clear()
4040
4041 def tearDown(self):
4042 self.assertFalse(util._finalizer_registry)
4043 util._finalizer_registry.update(self.registry_backup)
4044
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00004045 @classmethod
4046 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00004047 class Foo(object):
4048 pass
4049
4050 a = Foo()
4051 util.Finalize(a, conn.send, args=('a',))
4052 del a # triggers callback for a
4053
4054 b = Foo()
4055 close_b = util.Finalize(b, conn.send, args=('b',))
4056 close_b() # triggers callback for b
4057 close_b() # does nothing because callback has already been called
4058 del b # does nothing because callback has already been called
4059
4060 c = Foo()
4061 util.Finalize(c, conn.send, args=('c',))
4062
4063 d10 = Foo()
4064 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4065
4066 d01 = Foo()
4067 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4068 d02 = Foo()
4069 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4070 d03 = Foo()
4071 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4072
4073 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4074
4075 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4076
Ezio Melotti13925002011-03-16 11:05:33 +02004077 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00004078 # garbage collecting locals
4079 util._exit_function()
4080 conn.close()
4081 os._exit(0)
4082
4083 def test_finalize(self):
4084 conn, child_conn = self.Pipe()
4085
4086 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004087 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00004088 p.start()
4089 p.join()
4090
4091 result = [obj for obj in iter(conn.recv, 'STOP')]
4092 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4093
Antoine Pitrou1eb6c002017-06-13 17:10:39 +02004094 def test_thread_safety(self):
4095 # bpo-24484: _run_finalizers() should be thread-safe
4096 def cb():
4097 pass
4098
4099 class Foo(object):
4100 def __init__(self):
4101 self.ref = self # create reference cycle
4102 # insert finalizer at random key
4103 util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4104
4105 finish = False
4106 exc = None
4107
4108 def run_finalizers():
4109 nonlocal exc
4110 while not finish:
4111 time.sleep(random.random() * 1e-1)
4112 try:
4113 # A GC run will eventually happen during this,
4114 # collecting stale Foo's and mutating the registry
4115 util._run_finalizers()
4116 except Exception as e:
4117 exc = e
4118
4119 def make_finalizers():
4120 nonlocal exc
4121 d = {}
4122 while not finish:
4123 try:
4124 # Old Foo's get gradually replaced and later
4125 # collected by the GC (because of the cyclic ref)
4126 d[random.getrandbits(5)] = {Foo() for i in range(10)}
4127 except Exception as e:
4128 exc = e
4129 d.clear()
4130
4131 old_interval = sys.getswitchinterval()
4132 old_threshold = gc.get_threshold()
4133 try:
4134 sys.setswitchinterval(1e-6)
4135 gc.set_threshold(5, 5, 5)
4136 threads = [threading.Thread(target=run_finalizers),
4137 threading.Thread(target=make_finalizers)]
4138 with test.support.start_threads(threads):
4139 time.sleep(4.0) # Wait a bit to trigger race condition
4140 finish = True
4141 if exc is not None:
4142 raise exc
4143 finally:
4144 sys.setswitchinterval(old_interval)
4145 gc.set_threshold(*old_threshold)
4146 gc.collect() # Collect remaining Foo's
4147
4148
Benjamin Petersone711caf2008-06-11 16:44:04 +00004149#
4150# Test that from ... import * works for each module
4151#
4152
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004153class _TestImportStar(unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00004154
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004155 def get_module_names(self):
4156 import glob
4157 folder = os.path.dirname(multiprocessing.__file__)
4158 pattern = os.path.join(folder, '*.py')
4159 files = glob.glob(pattern)
4160 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4161 modules = ['multiprocessing.' + m for m in modules]
4162 modules.remove('multiprocessing.__init__')
4163 modules.append('multiprocessing')
4164 return modules
Benjamin Petersone711caf2008-06-11 16:44:04 +00004165
4166 def test_import(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004167 modules = self.get_module_names()
4168 if sys.platform == 'win32':
4169 modules.remove('multiprocessing.popen_fork')
4170 modules.remove('multiprocessing.popen_forkserver')
4171 modules.remove('multiprocessing.popen_spawn_posix')
4172 else:
4173 modules.remove('multiprocessing.popen_spawn_win32')
4174 if not HAS_REDUCTION:
4175 modules.remove('multiprocessing.popen_forkserver')
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004176
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004177 if c_int is None:
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004178 # This module requires _ctypes
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004179 modules.remove('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00004180
4181 for name in modules:
4182 __import__(name)
4183 mod = sys.modules[name]
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004184 self.assertTrue(hasattr(mod, '__all__'), name)
Benjamin Petersone711caf2008-06-11 16:44:04 +00004185
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004186 for attr in mod.__all__:
Benjamin Petersone711caf2008-06-11 16:44:04 +00004187 self.assertTrue(
4188 hasattr(mod, attr),
4189 '%r does not have attribute %r' % (mod, attr)
4190 )
4191
4192#
4193# Quick test that logging works -- does not test logging output
4194#
4195
4196class _TestLogging(BaseTestCase):
4197
4198 ALLOWED_TYPES = ('processes',)
4199
4200 def test_enable_logging(self):
4201 logger = multiprocessing.get_logger()
4202 logger.setLevel(util.SUBWARNING)
4203 self.assertTrue(logger is not None)
4204 logger.debug('this will not be printed')
4205 logger.info('nor will this')
4206 logger.setLevel(LOG_LEVEL)
4207
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00004208 @classmethod
4209 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00004210 logger = multiprocessing.get_logger()
4211 conn.send(logger.getEffectiveLevel())
4212
4213 def test_level(self):
4214 LEVEL1 = 32
4215 LEVEL2 = 37
4216
4217 logger = multiprocessing.get_logger()
4218 root_logger = logging.getLogger()
4219 root_level = root_logger.level
4220
4221 reader, writer = multiprocessing.Pipe(duplex=False)
4222
4223 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02004224 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004225 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004226 self.assertEqual(LEVEL1, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02004227 p.join()
4228 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004229
4230 logger.setLevel(logging.NOTSET)
4231 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02004232 p = self.Process(target=self._test_level, args=(writer,))
Jesus Cea94f964f2011-09-09 20:26:57 +02004233 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004234 self.assertEqual(LEVEL2, reader.recv())
Victor Stinner06634952017-07-24 13:02:20 +02004235 p.join()
4236 p.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00004237
4238 root_logger.setLevel(root_level)
4239 logger.setLevel(level=LOG_LEVEL)
4240
Jesse Nollerb9a49b72009-11-21 18:09:38 +00004241
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00004242# class _TestLoggingProcessName(BaseTestCase):
4243#
4244# def handle(self, record):
4245# assert record.processName == multiprocessing.current_process().name
4246# self.__handled = True
4247#
4248# def test_logging(self):
4249# handler = logging.Handler()
4250# handler.handle = self.handle
4251# self.__handled = False
4252# # Bypass getLogger() and side-effects
4253# logger = logging.getLoggerClass()(
4254# 'multiprocessing.test.TestLoggingProcessName')
4255# logger.addHandler(handler)
4256# logger.propagate = False
4257#
4258# logger.warn('foo')
4259# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00004260
Benjamin Petersone711caf2008-06-11 16:44:04 +00004261#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004262# Check that Process.join() retries if os.waitpid() fails with EINTR
4263#
4264
4265class _TestPollEintr(BaseTestCase):
4266
4267 ALLOWED_TYPES = ('processes',)
4268
4269 @classmethod
4270 def _killer(cls, pid):
Richard Oudkerk6a53af82013-08-28 13:50:19 +01004271 time.sleep(0.1)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004272 os.kill(pid, signal.SIGUSR1)
4273
4274 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4275 def test_poll_eintr(self):
4276 got_signal = [False]
4277 def record(*args):
4278 got_signal[0] = True
4279 pid = os.getpid()
4280 oldhandler = signal.signal(signal.SIGUSR1, record)
4281 try:
4282 killer = self.Process(target=self._killer, args=(pid,))
4283 killer.start()
Richard Oudkerk6a53af82013-08-28 13:50:19 +01004284 try:
4285 p = self.Process(target=time.sleep, args=(2,))
4286 p.start()
4287 p.join()
4288 finally:
4289 killer.join()
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004290 self.assertTrue(got_signal[0])
4291 self.assertEqual(p.exitcode, 0)
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00004292 finally:
4293 signal.signal(signal.SIGUSR1, oldhandler)
4294
4295#
Jesse Noller6214edd2009-01-19 16:23:53 +00004296# Test to verify handle verification, see issue 3321
4297#
4298
4299class TestInvalidHandle(unittest.TestCase):
4300
Victor Stinner937ee9e2018-06-26 02:11:06 +02004301 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00004302 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004303 conn = multiprocessing.connection.Connection(44977608)
Charles-François Natali6703bb42013-09-06 21:12:22 +02004304 # check that poll() doesn't crash
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004305 try:
Charles-François Natali6703bb42013-09-06 21:12:22 +02004306 conn.poll()
4307 except (ValueError, OSError):
4308 pass
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004309 finally:
4310 # Hack private attribute _handle to avoid printing an error
4311 # in conn.__del__
4312 conn._handle = None
Andrew Svetlovf7a17b42012-12-25 16:47:37 +02004313 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02004314 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00004315
Benjamin Petersone711caf2008-06-11 16:44:04 +00004316
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01004317
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004318class OtherTest(unittest.TestCase):
4319 # TODO: add more tests for deliver/answer challenge.
4320 def test_deliver_challenge_auth_failure(self):
4321 class _FakeConnection(object):
4322 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00004323 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004324 def send_bytes(self, data):
4325 pass
4326 self.assertRaises(multiprocessing.AuthenticationError,
4327 multiprocessing.connection.deliver_challenge,
4328 _FakeConnection(), b'abc')
4329
4330 def test_answer_challenge_auth_failure(self):
4331 class _FakeConnection(object):
4332 def __init__(self):
4333 self.count = 0
4334 def recv_bytes(self, size):
4335 self.count += 1
4336 if self.count == 1:
4337 return multiprocessing.connection.CHALLENGE
4338 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00004339 return b'something bogus'
4340 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00004341 def send_bytes(self, data):
4342 pass
4343 self.assertRaises(multiprocessing.AuthenticationError,
4344 multiprocessing.connection.answer_challenge,
4345 _FakeConnection(), b'abc')
4346
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004347#
4348# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4349#
4350
4351def initializer(ns):
4352 ns.test += 1
4353
4354class TestInitializers(unittest.TestCase):
4355 def setUp(self):
4356 self.mgr = multiprocessing.Manager()
4357 self.ns = self.mgr.Namespace()
4358 self.ns.test = 0
4359
4360 def tearDown(self):
4361 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01004362 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004363
4364 def test_manager_initializer(self):
4365 m = multiprocessing.managers.SyncManager()
4366 self.assertRaises(TypeError, m.start, 1)
4367 m.start(initializer, (self.ns,))
4368 self.assertEqual(self.ns.test, 1)
4369 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01004370 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00004371
4372 def test_pool_initializer(self):
4373 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4374 p = multiprocessing.Pool(1, initializer, (self.ns,))
4375 p.close()
4376 p.join()
4377 self.assertEqual(self.ns.test, 1)
4378
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004379#
4380# Issue 5155, 5313, 5331: Test process in processes
4381# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4382#
4383
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01004384def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004385 try:
4386 item = q.get(block=False)
4387 except pyqueue.Empty:
4388 pass
4389
Victor Stinnerb4c52962017-07-25 02:40:55 +02004390def _test_process():
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01004391 queue = multiprocessing.Queue()
4392 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4393 subProc.daemon = True
4394 subProc.start()
4395 subProc.join()
4396
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004397def _afunc(x):
4398 return x*x
4399
4400def pool_in_process():
4401 pool = multiprocessing.Pool(processes=4)
4402 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01004403 pool.close()
4404 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004405
4406class _file_like(object):
4407 def __init__(self, delegate):
4408 self._delegate = delegate
4409 self._pid = None
4410
4411 @property
4412 def cache(self):
4413 pid = os.getpid()
4414 # There are no race conditions since fork keeps only the running thread
4415 if pid != self._pid:
4416 self._pid = pid
4417 self._cache = []
4418 return self._cache
4419
4420 def write(self, data):
4421 self.cache.append(data)
4422
4423 def flush(self):
4424 self._delegate.write(''.join(self.cache))
4425 self._cache = []
4426
4427class TestStdinBadfiledescriptor(unittest.TestCase):
4428
4429 def test_queue_in_process(self):
Victor Stinnerb4c52962017-07-25 02:40:55 +02004430 proc = multiprocessing.Process(target=_test_process)
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00004431 proc.start()
4432 proc.join()
4433
4434 def test_pool_in_process(self):
4435 p = multiprocessing.Process(target=pool_in_process)
4436 p.start()
4437 p.join()
4438
4439 def test_flushing(self):
4440 sio = io.StringIO()
4441 flike = _file_like(sio)
4442 flike.write('foo')
4443 proc = multiprocessing.Process(target=lambda: flike.flush())
4444 flike.flush()
4445 assert sio.getvalue() == 'foo'
4446
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004447
4448class TestWait(unittest.TestCase):
4449
4450 @classmethod
4451 def _child_test_wait(cls, w, slow):
4452 for i in range(10):
4453 if slow:
4454 time.sleep(random.random()*0.1)
4455 w.send((i, os.getpid()))
4456 w.close()
4457
4458 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004459 from multiprocessing.connection import wait
4460 readers = []
4461 procs = []
4462 messages = []
4463
4464 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004465 r, w = multiprocessing.Pipe(duplex=False)
4466 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004467 p.daemon = True
4468 p.start()
4469 w.close()
4470 readers.append(r)
4471 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004472 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004473
4474 while readers:
4475 for r in wait(readers):
4476 try:
4477 msg = r.recv()
4478 except EOFError:
4479 readers.remove(r)
4480 r.close()
4481 else:
4482 messages.append(msg)
4483
4484 messages.sort()
4485 expected = sorted((i, p.pid) for i in range(10) for p in procs)
4486 self.assertEqual(messages, expected)
4487
4488 @classmethod
4489 def _child_test_wait_socket(cls, address, slow):
4490 s = socket.socket()
4491 s.connect(address)
4492 for i in range(10):
4493 if slow:
4494 time.sleep(random.random()*0.1)
4495 s.sendall(('%s\n' % i).encode('ascii'))
4496 s.close()
4497
4498 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004499 from multiprocessing.connection import wait
Giampaolo Rodolaeb7e29f2019-04-09 00:34:02 +02004500 l = socket.create_server((test.support.HOST, 0))
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02004501 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004502 readers = []
4503 procs = []
4504 dic = {}
4505
4506 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01004507 p = multiprocessing.Process(target=self._child_test_wait_socket,
4508 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004509 p.daemon = True
4510 p.start()
4511 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01004512 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004513
4514 for i in range(4):
4515 r, _ = l.accept()
4516 readers.append(r)
4517 dic[r] = []
4518 l.close()
4519
4520 while readers:
4521 for r in wait(readers):
4522 msg = r.recv(32)
4523 if not msg:
4524 readers.remove(r)
4525 r.close()
4526 else:
4527 dic[r].append(msg)
4528
4529 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4530 for v in dic.values():
4531 self.assertEqual(b''.join(v), expected)
4532
4533 def test_wait_slow(self):
4534 self.test_wait(True)
4535
4536 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01004537 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004538
4539 def test_wait_timeout(self):
4540 from multiprocessing.connection import wait
4541
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004542 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004543 a, b = multiprocessing.Pipe()
4544
Victor Stinner2cf4c202018-12-17 09:36:36 +01004545 start = time.monotonic()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004546 res = wait([a, b], expected)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004547 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004548
4549 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01004550 self.assertLess(delta, expected * 2)
4551 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004552
4553 b.send(None)
4554
Victor Stinner2cf4c202018-12-17 09:36:36 +01004555 start = time.monotonic()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004556 res = wait([a, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004557 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004558
4559 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01004560 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004561
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004562 @classmethod
4563 def signal_and_sleep(cls, sem, period):
4564 sem.release()
4565 time.sleep(period)
4566
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004567 def test_wait_integer(self):
4568 from multiprocessing.connection import wait
4569
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004570 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01004571 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004572 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004573 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004574 p = multiprocessing.Process(target=self.signal_and_sleep,
4575 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004576
4577 p.start()
4578 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004579 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004580
Victor Stinner2cf4c202018-12-17 09:36:36 +01004581 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004582 res = wait([a, p.sentinel, b], expected + 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004583 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004584
4585 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01004586 self.assertLess(delta, expected + 2)
4587 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004588
4589 a.send(None)
4590
Victor Stinner2cf4c202018-12-17 09:36:36 +01004591 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004592 res = wait([a, p.sentinel, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004593 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004594
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004595 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004596 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004597
4598 b.send(None)
4599
Victor Stinner2cf4c202018-12-17 09:36:36 +01004600 start = time.monotonic()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004601 res = wait([a, p.sentinel, b], 20)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004602 delta = time.monotonic() - start
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004603
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01004604 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01004605 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004606
Richard Oudkerk009b15e2012-05-04 09:44:39 +01004607 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004608 p.join()
4609
Richard Oudkerk59d54042012-05-10 16:11:12 +01004610 def test_neg_timeout(self):
4611 from multiprocessing.connection import wait
4612 a, b = multiprocessing.Pipe()
Victor Stinner2cf4c202018-12-17 09:36:36 +01004613 t = time.monotonic()
Richard Oudkerk59d54042012-05-10 16:11:12 +01004614 res = wait([a], timeout=-1)
Victor Stinner2cf4c202018-12-17 09:36:36 +01004615 t = time.monotonic() - t
Richard Oudkerk59d54042012-05-10 16:11:12 +01004616 self.assertEqual(res, [])
4617 self.assertLess(t, 1)
4618 a.close()
4619 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01004620
Antoine Pitrou709176f2012-04-01 17:19:09 +02004621#
4622# Issue 14151: Test invalid family on invalid environment
4623#
4624
4625class TestInvalidFamily(unittest.TestCase):
4626
Victor Stinner937ee9e2018-06-26 02:11:06 +02004627 @unittest.skipIf(WIN32, "skipped on Windows")
Antoine Pitrou709176f2012-04-01 17:19:09 +02004628 def test_invalid_family(self):
4629 with self.assertRaises(ValueError):
4630 multiprocessing.connection.Listener(r'\\.\test')
4631
Victor Stinner937ee9e2018-06-26 02:11:06 +02004632 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02004633 def test_invalid_family_win32(self):
4634 with self.assertRaises(ValueError):
4635 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02004636
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004637#
4638# Issue 12098: check sys.flags of child matches that for parent
4639#
4640
4641class TestFlags(unittest.TestCase):
4642 @classmethod
4643 def run_in_grandchild(cls, conn):
4644 conn.send(tuple(sys.flags))
4645
4646 @classmethod
4647 def run_in_child(cls):
4648 import json
4649 r, w = multiprocessing.Pipe(duplex=False)
4650 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4651 p.start()
4652 grandchild_flags = r.recv()
4653 p.join()
4654 r.close()
4655 w.close()
4656 flags = (tuple(sys.flags), grandchild_flags)
4657 print(json.dumps(flags))
4658
4659 def test_flags(self):
Pierre Glaserb1dfcad2019-05-13 21:15:32 +02004660 import json
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004661 # start child process using unusual flags
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004662 prog = ('from test._test_multiprocessing import TestFlags; ' +
Richard Oudkerk77c84f22012-05-18 14:28:02 +01004663 'TestFlags.run_in_child()')
4664 data = subprocess.check_output(
4665 [sys.executable, '-E', '-S', '-O', '-c', prog])
4666 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4667 self.assertEqual(child_flags, grandchild_flags)
4668
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004669#
4670# Test interaction with socket timeouts - see Issue #6056
4671#
4672
4673class TestTimeouts(unittest.TestCase):
4674 @classmethod
4675 def _test_timeout(cls, child, address):
4676 time.sleep(1)
4677 child.send(123)
4678 child.close()
4679 conn = multiprocessing.connection.Client(address)
4680 conn.send(456)
4681 conn.close()
4682
4683 def test_timeout(self):
4684 old_timeout = socket.getdefaulttimeout()
4685 try:
4686 socket.setdefaulttimeout(0.1)
4687 parent, child = multiprocessing.Pipe(duplex=True)
4688 l = multiprocessing.connection.Listener(family='AF_INET')
4689 p = multiprocessing.Process(target=self._test_timeout,
4690 args=(child, l.address))
4691 p.start()
4692 child.close()
4693 self.assertEqual(parent.recv(), 123)
4694 parent.close()
4695 conn = l.accept()
4696 self.assertEqual(conn.recv(), 456)
4697 conn.close()
4698 l.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004699 join_process(p)
Richard Oudkerkb15e6222012-07-27 14:19:00 +01004700 finally:
4701 socket.setdefaulttimeout(old_timeout)
4702
Richard Oudkerke88a2442012-08-14 11:41:32 +01004703#
4704# Test what happens with no "if __name__ == '__main__'"
4705#
4706
4707class TestNoForkBomb(unittest.TestCase):
4708 def test_noforkbomb(self):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004709 sm = multiprocessing.get_start_method()
Richard Oudkerke88a2442012-08-14 11:41:32 +01004710 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004711 if sm != 'fork':
Berker Peksag076dbd02015-05-06 07:01:52 +03004712 rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004713 self.assertEqual(out, b'')
4714 self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01004715 else:
Berker Peksag076dbd02015-05-06 07:01:52 +03004716 rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
Serhiy Storchakaa25c5422015-02-13 15:13:33 +02004717 self.assertEqual(out.rstrip(), b'123')
4718 self.assertEqual(err, b'')
Richard Oudkerke88a2442012-08-14 11:41:32 +01004719
4720#
Richard Oudkerk409c3132013-04-17 20:58:00 +01004721# Issue #17555: ForkAwareThreadLock
4722#
4723
4724class TestForkAwareThreadLock(unittest.TestCase):
Mike53f7a7c2017-12-14 14:04:53 +03004725 # We recursively start processes. Issue #17555 meant that the
Richard Oudkerk409c3132013-04-17 20:58:00 +01004726 # after fork registry would get duplicate entries for the same
4727 # lock. The size of the registry at generation n was ~2**n.
4728
4729 @classmethod
4730 def child(cls, n, conn):
4731 if n > 1:
4732 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4733 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004734 conn.close()
Victor Stinner11f08072017-09-15 06:55:31 -07004735 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004736 else:
4737 conn.send(len(util._afterfork_registry))
4738 conn.close()
4739
4740 def test_lock(self):
4741 r, w = multiprocessing.Pipe(False)
4742 l = util.ForkAwareThreadLock()
4743 old_size = len(util._afterfork_registry)
4744 p = multiprocessing.Process(target=self.child, args=(5, w))
4745 p.start()
Richard Oudkerka01fb392013-08-21 19:45:19 +01004746 w.close()
Richard Oudkerk409c3132013-04-17 20:58:00 +01004747 new_size = r.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004748 join_process(p)
Richard Oudkerk409c3132013-04-17 20:58:00 +01004749 self.assertLessEqual(new_size, old_size)
4750
4751#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004752# Check that non-forked child processes do not inherit unneeded fds/handles
4753#
4754
4755class TestCloseFds(unittest.TestCase):
4756
4757 def get_high_socket_fd(self):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004758 if WIN32:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004759 # The child process will not have any socket handles, so
4760 # calling socket.fromfd() should produce WSAENOTSOCK even
4761 # if there is a handle of the same number.
4762 return socket.socket().detach()
4763 else:
4764 # We want to produce a socket with an fd high enough that a
4765 # freshly created child process will not have any fds as high.
4766 fd = socket.socket().detach()
4767 to_close = []
4768 while fd < 50:
4769 to_close.append(fd)
4770 fd = os.dup(fd)
4771 for x in to_close:
4772 os.close(x)
4773 return fd
4774
4775 def close(self, fd):
Victor Stinner937ee9e2018-06-26 02:11:06 +02004776 if WIN32:
Christian Heimesb6e43af2018-01-29 22:37:58 +01004777 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004778 else:
4779 os.close(fd)
4780
4781 @classmethod
4782 def _test_closefds(cls, conn, fd):
4783 try:
4784 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4785 except Exception as e:
4786 conn.send(e)
4787 else:
4788 s.close()
4789 conn.send(None)
4790
4791 def test_closefd(self):
4792 if not HAS_REDUCTION:
4793 raise unittest.SkipTest('requires fd pickling')
4794
4795 reader, writer = multiprocessing.Pipe()
4796 fd = self.get_high_socket_fd()
4797 try:
4798 p = multiprocessing.Process(target=self._test_closefds,
4799 args=(writer, fd))
4800 p.start()
4801 writer.close()
4802 e = reader.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004803 join_process(p)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004804 finally:
4805 self.close(fd)
4806 writer.close()
4807 reader.close()
4808
4809 if multiprocessing.get_start_method() == 'fork':
4810 self.assertIs(e, None)
4811 else:
4812 WSAENOTSOCK = 10038
4813 self.assertIsInstance(e, OSError)
4814 self.assertTrue(e.errno == errno.EBADF or
4815 e.winerror == WSAENOTSOCK, e)
4816
4817#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004818# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4819#
4820
4821class TestIgnoreEINTR(unittest.TestCase):
4822
Victor Stinner252f6ab2018-06-01 16:48:34 +02004823 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
4824 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
4825
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004826 @classmethod
4827 def _test_ignore(cls, conn):
4828 def handler(signum, frame):
4829 pass
4830 signal.signal(signal.SIGUSR1, handler)
4831 conn.send('ready')
4832 x = conn.recv()
4833 conn.send(x)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004834 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004835
4836 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4837 def test_ignore(self):
4838 conn, child_conn = multiprocessing.Pipe()
4839 try:
4840 p = multiprocessing.Process(target=self._test_ignore,
4841 args=(child_conn,))
4842 p.daemon = True
4843 p.start()
4844 child_conn.close()
4845 self.assertEqual(conn.recv(), 'ready')
4846 time.sleep(0.1)
4847 os.kill(p.pid, signal.SIGUSR1)
4848 time.sleep(0.1)
4849 conn.send(1234)
4850 self.assertEqual(conn.recv(), 1234)
4851 time.sleep(0.1)
4852 os.kill(p.pid, signal.SIGUSR1)
Victor Stinner252f6ab2018-06-01 16:48:34 +02004853 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004854 time.sleep(0.1)
4855 p.join()
4856 finally:
4857 conn.close()
4858
4859 @classmethod
4860 def _test_ignore_listener(cls, conn):
4861 def handler(signum, frame):
4862 pass
4863 signal.signal(signal.SIGUSR1, handler)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004864 with multiprocessing.connection.Listener() as l:
4865 conn.send(l.address)
4866 a = l.accept()
4867 a.send('welcome')
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004868
4869 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4870 def test_ignore_listener(self):
4871 conn, child_conn = multiprocessing.Pipe()
4872 try:
4873 p = multiprocessing.Process(target=self._test_ignore_listener,
4874 args=(child_conn,))
4875 p.daemon = True
4876 p.start()
4877 child_conn.close()
4878 address = conn.recv()
4879 time.sleep(0.1)
4880 os.kill(p.pid, signal.SIGUSR1)
4881 time.sleep(0.1)
4882 client = multiprocessing.connection.Client(address)
4883 self.assertEqual(client.recv(), 'welcome')
4884 p.join()
4885 finally:
4886 conn.close()
4887
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004888class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004889 @classmethod
4890 def _check_context(cls, conn):
4891 conn.send(multiprocessing.get_start_method())
4892
4893 def check_context(self, ctx):
4894 r, w = ctx.Pipe(duplex=False)
4895 p = ctx.Process(target=self._check_context, args=(w,))
4896 p.start()
4897 w.close()
4898 child_method = r.recv()
4899 r.close()
4900 p.join()
4901 self.assertEqual(child_method, ctx.get_start_method())
4902
4903 def test_context(self):
4904 for method in ('fork', 'spawn', 'forkserver'):
4905 try:
4906 ctx = multiprocessing.get_context(method)
4907 except ValueError:
4908 continue
4909 self.assertEqual(ctx.get_start_method(), method)
4910 self.assertIs(ctx.get_context(), ctx)
4911 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4912 self.assertRaises(ValueError, ctx.set_start_method, None)
4913 self.check_context(ctx)
4914
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004915 def test_set_get(self):
4916 multiprocessing.set_forkserver_preload(PRELOAD)
4917 count = 0
4918 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00004919 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004920 for method in ('fork', 'spawn', 'forkserver'):
4921 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004922 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004923 except ValueError:
4924 continue
4925 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004926 ctx = multiprocessing.get_context()
4927 self.assertEqual(ctx.get_start_method(), method)
4928 self.assertTrue(type(ctx).__name__.lower().startswith(method))
4929 self.assertTrue(
4930 ctx.Process.__name__.lower().startswith(method))
4931 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004932 count += 1
4933 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004934 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004935 self.assertGreaterEqual(count, 1)
4936
4937 def test_get_all(self):
4938 methods = multiprocessing.get_all_start_methods()
4939 if sys.platform == 'win32':
4940 self.assertEqual(methods, ['spawn'])
4941 else:
4942 self.assertTrue(methods == ['fork', 'spawn'] or
4943 methods == ['fork', 'spawn', 'forkserver'])
4944
Antoine Pitroucd2a2012016-12-10 17:13:16 +01004945 def test_preload_resources(self):
4946 if multiprocessing.get_start_method() != 'forkserver':
4947 self.skipTest("test only relevant for 'forkserver' method")
4948 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4949 rc, out, err = test.support.script_helper.assert_python_ok(name)
4950 out = out.decode()
4951 err = err.decode()
4952 if out.rstrip() != 'ok' or err != '':
4953 print(out)
4954 print(err)
4955 self.fail("failed spawning forkserver or grandchild")
4956
4957
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004958@unittest.skipIf(sys.platform == "win32",
4959 "test semantics don't make sense on Windows")
Pierre Glaserf22cc692019-05-10 22:59:08 +02004960class TestResourceTracker(unittest.TestCase):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004961
Pierre Glaserf22cc692019-05-10 22:59:08 +02004962 def test_resource_tracker(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01004963 #
4964 # Check that killing process does not leak named semaphores
4965 #
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004966 cmd = '''if 1:
Pierre Glaserf22cc692019-05-10 22:59:08 +02004967 import time, os, tempfile
4968 import multiprocessing as mp
4969 from multiprocessing import resource_tracker
4970 from multiprocessing.shared_memory import SharedMemory
4971
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004972 mp.set_start_method("spawn")
Pierre Glaserf22cc692019-05-10 22:59:08 +02004973 rand = tempfile._RandomNameSequence()
4974
4975
4976 def create_and_register_resource(rtype):
4977 if rtype == "semaphore":
4978 lock = mp.Lock()
4979 return lock, lock._semlock.name
4980 elif rtype == "shared_memory":
4981 sm = SharedMemory(create=True, size=10)
4982 return sm, sm._name
4983 else:
4984 raise ValueError(
4985 "Resource type {{}} not understood".format(rtype))
4986
4987
4988 resource1, rname1 = create_and_register_resource("{rtype}")
4989 resource2, rname2 = create_and_register_resource("{rtype}")
4990
4991 os.write({w}, rname1.encode("ascii") + b"\\n")
4992 os.write({w}, rname2.encode("ascii") + b"\\n")
4993
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004994 time.sleep(10)
4995 '''
Pierre Glaserf22cc692019-05-10 22:59:08 +02004996 for rtype in resource_tracker._CLEANUP_FUNCS:
4997 with self.subTest(rtype=rtype):
4998 if rtype == "noop":
4999 # Artefact resource type used by the resource_tracker
5000 continue
5001 r, w = os.pipe()
5002 p = subprocess.Popen([sys.executable,
5003 '-E', '-c', cmd.format(w=w, rtype=rtype)],
5004 pass_fds=[w],
5005 stderr=subprocess.PIPE)
5006 os.close(w)
5007 with open(r, 'rb', closefd=True) as f:
5008 name1 = f.readline().rstrip().decode('ascii')
5009 name2 = f.readline().rstrip().decode('ascii')
5010 _resource_unlink(name1, rtype)
5011 p.terminate()
5012 p.wait()
Miss Islington (bot)dd4edbc2019-06-25 15:49:31 -07005013
5014 deadline = time.monotonic() + 60
5015 while time.monotonic() < deadline:
5016 time.sleep(.5)
5017 try:
5018 _resource_unlink(name2, rtype)
5019 except OSError as e:
5020 # docs say it should be ENOENT, but OSX seems to give
5021 # EINVAL
5022 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
5023 break
5024 else:
5025 raise AssertionError(
5026 f"A {rtype} resource was leaked after a process was "
5027 f"abruptly terminated.")
Pierre Glaserf22cc692019-05-10 22:59:08 +02005028 err = p.stderr.read().decode('utf-8')
5029 p.stderr.close()
5030 expected = ('resource_tracker: There appear to be 2 leaked {} '
5031 'objects'.format(
5032 rtype))
5033 self.assertRegex(err, expected)
5034 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005035
Pierre Glaserf22cc692019-05-10 22:59:08 +02005036 def check_resource_tracker_death(self, signum, should_die):
Antoine Pitroucbe17562017-11-03 14:31:38 +01005037 # bpo-31310: if the semaphore tracker process has died, it should
5038 # be restarted implicitly.
Pierre Glaserf22cc692019-05-10 22:59:08 +02005039 from multiprocessing.resource_tracker import _resource_tracker
5040 pid = _resource_tracker._pid
Pablo Galindoec74d182018-09-04 09:53:54 +01005041 if pid is not None:
5042 os.kill(pid, signal.SIGKILL)
5043 os.waitpid(pid, 0)
Pablo Galindo3058b7d2018-10-10 08:40:14 +01005044 with warnings.catch_warnings():
5045 warnings.simplefilter("ignore")
Pierre Glaserf22cc692019-05-10 22:59:08 +02005046 _resource_tracker.ensure_running()
5047 pid = _resource_tracker._pid
Pablo Galindoec74d182018-09-04 09:53:54 +01005048
Antoine Pitroucbe17562017-11-03 14:31:38 +01005049 os.kill(pid, signum)
5050 time.sleep(1.0) # give it time to die
5051
5052 ctx = multiprocessing.get_context("spawn")
Pablo Galindoec74d182018-09-04 09:53:54 +01005053 with warnings.catch_warnings(record=True) as all_warn:
Pablo Galindo3058b7d2018-10-10 08:40:14 +01005054 warnings.simplefilter("always")
Antoine Pitroucbe17562017-11-03 14:31:38 +01005055 sem = ctx.Semaphore()
5056 sem.acquire()
5057 sem.release()
5058 wr = weakref.ref(sem)
5059 # ensure `sem` gets collected, which triggers communication with
5060 # the semaphore tracker
5061 del sem
5062 gc.collect()
5063 self.assertIsNone(wr())
Pablo Galindoec74d182018-09-04 09:53:54 +01005064 if should_die:
5065 self.assertEqual(len(all_warn), 1)
5066 the_warn = all_warn[0]
Pablo Galindo3058b7d2018-10-10 08:40:14 +01005067 self.assertTrue(issubclass(the_warn.category, UserWarning))
Pierre Glaserf22cc692019-05-10 22:59:08 +02005068 self.assertTrue("resource_tracker: process died"
Pablo Galindoec74d182018-09-04 09:53:54 +01005069 in str(the_warn.message))
5070 else:
5071 self.assertEqual(len(all_warn), 0)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005072
Pierre Glaserf22cc692019-05-10 22:59:08 +02005073 def test_resource_tracker_sigint(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01005074 # Catchable signal (ignored by semaphore tracker)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005075 self.check_resource_tracker_death(signal.SIGINT, False)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005076
Pierre Glaserf22cc692019-05-10 22:59:08 +02005077 def test_resource_tracker_sigterm(self):
Pablo Galindoec74d182018-09-04 09:53:54 +01005078 # Catchable signal (ignored by semaphore tracker)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005079 self.check_resource_tracker_death(signal.SIGTERM, False)
Pablo Galindoec74d182018-09-04 09:53:54 +01005080
Pierre Glaserf22cc692019-05-10 22:59:08 +02005081 def test_resource_tracker_sigkill(self):
Antoine Pitroucbe17562017-11-03 14:31:38 +01005082 # Uncatchable signal.
Pierre Glaserf22cc692019-05-10 22:59:08 +02005083 self.check_resource_tracker_death(signal.SIGKILL, True)
Antoine Pitroucbe17562017-11-03 14:31:38 +01005084
Thomas Moreau004b93e2019-04-24 21:45:52 +02005085 @staticmethod
Pierre Glaserf22cc692019-05-10 22:59:08 +02005086 def _is_resource_tracker_reused(conn, pid):
5087 from multiprocessing.resource_tracker import _resource_tracker
5088 _resource_tracker.ensure_running()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005089 # The pid should be None in the child process, expect for the fork
5090 # context. It should not be a new value.
Pierre Glaserf22cc692019-05-10 22:59:08 +02005091 reused = _resource_tracker._pid in (None, pid)
5092 reused &= _resource_tracker._check_alive()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005093 conn.send(reused)
5094
Pierre Glaserf22cc692019-05-10 22:59:08 +02005095 def test_resource_tracker_reused(self):
5096 from multiprocessing.resource_tracker import _resource_tracker
5097 _resource_tracker.ensure_running()
5098 pid = _resource_tracker._pid
Thomas Moreau004b93e2019-04-24 21:45:52 +02005099
5100 r, w = multiprocessing.Pipe(duplex=False)
Pierre Glaserf22cc692019-05-10 22:59:08 +02005101 p = multiprocessing.Process(target=self._is_resource_tracker_reused,
Thomas Moreau004b93e2019-04-24 21:45:52 +02005102 args=(w, pid))
5103 p.start()
Pierre Glaserf22cc692019-05-10 22:59:08 +02005104 is_resource_tracker_reused = r.recv()
Thomas Moreau004b93e2019-04-24 21:45:52 +02005105
5106 # Clean up
5107 p.join()
5108 w.close()
5109 r.close()
5110
Pierre Glaserf22cc692019-05-10 22:59:08 +02005111 self.assertTrue(is_resource_tracker_reused)
Thomas Moreau004b93e2019-04-24 21:45:52 +02005112
Antoine Pitroucbe17562017-11-03 14:31:38 +01005113
Xiang Zhang6f75bc02017-05-17 21:04:00 +08005114class TestSimpleQueue(unittest.TestCase):
5115
5116 @classmethod
5117 def _test_empty(cls, queue, child_can_start, parent_can_continue):
5118 child_can_start.wait()
5119 # issue 30301, could fail under spawn and forkserver
5120 try:
5121 queue.put(queue.empty())
5122 queue.put(queue.empty())
5123 finally:
5124 parent_can_continue.set()
5125
5126 def test_empty(self):
5127 queue = multiprocessing.SimpleQueue()
5128 child_can_start = multiprocessing.Event()
5129 parent_can_continue = multiprocessing.Event()
5130
5131 proc = multiprocessing.Process(
5132 target=self._test_empty,
5133 args=(queue, child_can_start, parent_can_continue)
5134 )
5135 proc.daemon = True
5136 proc.start()
5137
5138 self.assertTrue(queue.empty())
5139
5140 child_can_start.set()
5141 parent_can_continue.wait()
5142
5143 self.assertFalse(queue.empty())
5144 self.assertEqual(queue.get(), True)
5145 self.assertEqual(queue.get(), False)
5146 self.assertTrue(queue.empty())
5147
5148 proc.join()
5149
Derek B. Kimc40278e2018-07-11 19:22:28 +09005150
Julien Palard5d236ca2018-11-04 23:40:32 +01005151class TestPoolNotLeakOnFailure(unittest.TestCase):
5152
5153 def test_release_unused_processes(self):
5154 # Issue #19675: During pool creation, if we can't create a process,
5155 # don't leak already created ones.
5156 will_fail_in = 3
5157 forked_processes = []
5158
5159 class FailingForkProcess:
5160 def __init__(self, **kwargs):
5161 self.name = 'Fake Process'
5162 self.exitcode = None
5163 self.state = None
5164 forked_processes.append(self)
5165
5166 def start(self):
5167 nonlocal will_fail_in
5168 if will_fail_in <= 0:
5169 raise OSError("Manually induced OSError")
5170 will_fail_in -= 1
5171 self.state = 'started'
5172
5173 def terminate(self):
5174 self.state = 'stopping'
5175
5176 def join(self):
5177 if self.state == 'stopping':
5178 self.state = 'stopped'
5179
5180 def is_alive(self):
5181 return self.state == 'started' or self.state == 'stopping'
5182
5183 with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
5184 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
5185 Process=FailingForkProcess))
5186 p.close()
5187 p.join()
5188 self.assertFalse(
5189 any(process.is_alive() for process in forked_processes))
5190
5191
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005192class TestSyncManagerTypes(unittest.TestCase):
5193 """Test all the types which can be shared between a parent and a
5194 child process by using a manager which acts as an intermediary
5195 between them.
5196
5197 In the following unit-tests the base type is created in the parent
5198 process, the @classmethod represents the worker process and the
5199 shared object is readable and editable between the two.
5200
5201 # The child.
5202 @classmethod
5203 def _test_list(cls, obj):
5204 assert obj[0] == 5
5205 assert obj.append(6)
5206
5207 # The parent.
5208 def test_list(self):
5209 o = self.manager.list()
5210 o.append(5)
5211 self.run_worker(self._test_list, o)
5212 assert o[1] == 6
5213 """
5214 manager_class = multiprocessing.managers.SyncManager
5215
5216 def setUp(self):
5217 self.manager = self.manager_class()
5218 self.manager.start()
5219 self.proc = None
5220
5221 def tearDown(self):
5222 if self.proc is not None and self.proc.is_alive():
5223 self.proc.terminate()
5224 self.proc.join()
5225 self.manager.shutdown()
Pablo Galindo613f7292019-02-09 17:08:49 +00005226 self.manager = None
5227 self.proc = None
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005228
5229 @classmethod
5230 def setUpClass(cls):
5231 support.reap_children()
5232
5233 tearDownClass = setUpClass
5234
5235 def wait_proc_exit(self):
5236 # Only the manager process should be returned by active_children()
5237 # but this can take a bit on slow machines, so wait a few seconds
5238 # if there are other children too (see #17395).
5239 join_process(self.proc)
5240 start_time = time.monotonic()
5241 t = 0.01
5242 while len(multiprocessing.active_children()) > 1:
5243 time.sleep(t)
5244 t *= 2
5245 dt = time.monotonic() - start_time
5246 if dt >= 5.0:
5247 test.support.environment_altered = True
5248 print("Warning -- multiprocessing.Manager still has %s active "
5249 "children after %s seconds"
5250 % (multiprocessing.active_children(), dt),
5251 file=sys.stderr)
5252 break
5253
5254 def run_worker(self, worker, obj):
5255 self.proc = multiprocessing.Process(target=worker, args=(obj, ))
5256 self.proc.daemon = True
5257 self.proc.start()
5258 self.wait_proc_exit()
5259 self.assertEqual(self.proc.exitcode, 0)
5260
5261 @classmethod
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005262 def _test_event(cls, obj):
5263 assert obj.is_set()
5264 obj.wait()
5265 obj.clear()
5266 obj.wait(0.001)
5267
5268 def test_event(self):
5269 o = self.manager.Event()
5270 o.set()
5271 self.run_worker(self._test_event, o)
5272 assert not o.is_set()
5273 o.wait(0.001)
5274
5275 @classmethod
5276 def _test_lock(cls, obj):
5277 obj.acquire()
5278
5279 def test_lock(self, lname="Lock"):
5280 o = getattr(self.manager, lname)()
5281 self.run_worker(self._test_lock, o)
5282 o.release()
5283 self.assertRaises(RuntimeError, o.release) # already released
5284
5285 @classmethod
5286 def _test_rlock(cls, obj):
5287 obj.acquire()
5288 obj.release()
5289
5290 def test_rlock(self, lname="Lock"):
5291 o = getattr(self.manager, lname)()
5292 self.run_worker(self._test_rlock, o)
5293
5294 @classmethod
5295 def _test_semaphore(cls, obj):
5296 obj.acquire()
5297
5298 def test_semaphore(self, sname="Semaphore"):
5299 o = getattr(self.manager, sname)()
5300 self.run_worker(self._test_semaphore, o)
5301 o.release()
5302
5303 def test_bounded_semaphore(self):
5304 self.test_semaphore(sname="BoundedSemaphore")
5305
5306 @classmethod
5307 def _test_condition(cls, obj):
5308 obj.acquire()
5309 obj.release()
5310
5311 def test_condition(self):
5312 o = self.manager.Condition()
5313 self.run_worker(self._test_condition, o)
5314
5315 @classmethod
5316 def _test_barrier(cls, obj):
5317 assert obj.parties == 5
5318 obj.reset()
5319
5320 def test_barrier(self):
5321 o = self.manager.Barrier(5)
5322 self.run_worker(self._test_barrier, o)
5323
5324 @classmethod
5325 def _test_pool(cls, obj):
5326 # TODO: fix https://bugs.python.org/issue35919
5327 with obj:
5328 pass
5329
5330 def test_pool(self):
5331 o = self.manager.Pool(processes=4)
5332 self.run_worker(self._test_pool, o)
5333
5334 @classmethod
Davin Pottse895de32019-02-23 22:08:16 -06005335 def _test_queue(cls, obj):
5336 assert obj.qsize() == 2
5337 assert obj.full()
5338 assert not obj.empty()
5339 assert obj.get() == 5
5340 assert not obj.empty()
5341 assert obj.get() == 6
5342 assert obj.empty()
5343
5344 def test_queue(self, qname="Queue"):
5345 o = getattr(self.manager, qname)(2)
5346 o.put(5)
5347 o.put(6)
5348 self.run_worker(self._test_queue, o)
5349 assert o.empty()
5350 assert not o.full()
5351
5352 def test_joinable_queue(self):
5353 self.test_queue("JoinableQueue")
5354
5355 @classmethod
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005356 def _test_list(cls, obj):
5357 assert obj[0] == 5
5358 assert obj.count(5) == 1
5359 assert obj.index(5) == 0
5360 obj.sort()
5361 obj.reverse()
5362 for x in obj:
5363 pass
5364 assert len(obj) == 1
5365 assert obj.pop(0) == 5
5366
5367 def test_list(self):
5368 o = self.manager.list()
5369 o.append(5)
5370 self.run_worker(self._test_list, o)
5371 assert not o
5372 self.assertEqual(len(o), 0)
5373
5374 @classmethod
5375 def _test_dict(cls, obj):
5376 assert len(obj) == 1
5377 assert obj['foo'] == 5
5378 assert obj.get('foo') == 5
Giampaolo Rodola2848d9d2019-02-07 03:03:11 -08005379 assert list(obj.items()) == [('foo', 5)]
5380 assert list(obj.keys()) == ['foo']
5381 assert list(obj.values()) == [5]
5382 assert obj.copy() == {'foo': 5}
5383 assert obj.popitem() == ('foo', 5)
5384
5385 def test_dict(self):
5386 o = self.manager.dict()
5387 o['foo'] = 5
5388 self.run_worker(self._test_dict, o)
5389 assert not o
5390 self.assertEqual(len(o), 0)
5391
5392 @classmethod
5393 def _test_value(cls, obj):
5394 assert obj.value == 1
5395 assert obj.get() == 1
5396 obj.set(2)
5397
5398 def test_value(self):
5399 o = self.manager.Value('i', 1)
5400 self.run_worker(self._test_value, o)
5401 self.assertEqual(o.value, 2)
5402 self.assertEqual(o.get(), 2)
5403
5404 @classmethod
5405 def _test_array(cls, obj):
5406 assert obj[0] == 0
5407 assert obj[1] == 1
5408 assert len(obj) == 2
5409 assert list(obj) == [0, 1]
5410
5411 def test_array(self):
5412 o = self.manager.Array('i', [0, 1])
5413 self.run_worker(self._test_array, o)
5414
5415 @classmethod
5416 def _test_namespace(cls, obj):
5417 assert obj.x == 0
5418 assert obj.y == 1
5419
5420 def test_namespace(self):
5421 o = self.manager.Namespace()
5422 o.x = 0
5423 o.y = 1
5424 self.run_worker(self._test_namespace, o)
5425
5426
Derek B. Kimc40278e2018-07-11 19:22:28 +09005427class MiscTestCase(unittest.TestCase):
5428 def test__all__(self):
5429 # Just make sure names in blacklist are excluded
5430 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5431 blacklist=['SUBDEBUG', 'SUBWARNING'])
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005432#
5433# Mixins
5434#
5435
Victor Stinnerffb49402017-07-25 01:55:54 +02005436class BaseMixin(object):
5437 @classmethod
5438 def setUpClass(cls):
5439 cls.dangling = (multiprocessing.process._dangling.copy(),
5440 threading._dangling.copy())
5441
5442 @classmethod
5443 def tearDownClass(cls):
5444 # bpo-26762: Some multiprocessing objects like Pool create reference
5445 # cycles. Trigger a garbage collection to break these cycles.
5446 test.support.gc_collect()
5447
5448 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
5449 if processes:
Victor Stinner957d0e92017-08-10 17:36:50 +02005450 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005451 print('Warning -- Dangling processes: %s' % processes,
5452 file=sys.stderr)
5453 processes = None
5454
5455 threads = set(threading._dangling) - set(cls.dangling[1])
5456 if threads:
Victor Stinner957d0e92017-08-10 17:36:50 +02005457 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005458 print('Warning -- Dangling threads: %s' % threads,
5459 file=sys.stderr)
5460 threads = None
5461
5462
5463class ProcessesMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005464 TYPE = 'processes'
5465 Process = multiprocessing.Process
5466 connection = multiprocessing.connection
5467 current_process = staticmethod(multiprocessing.current_process)
Thomas Moreauc09a9f52019-05-20 21:37:05 +02005468 parent_process = staticmethod(multiprocessing.parent_process)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005469 active_children = staticmethod(multiprocessing.active_children)
5470 Pool = staticmethod(multiprocessing.Pool)
5471 Pipe = staticmethod(multiprocessing.Pipe)
5472 Queue = staticmethod(multiprocessing.Queue)
5473 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
5474 Lock = staticmethod(multiprocessing.Lock)
5475 RLock = staticmethod(multiprocessing.RLock)
5476 Semaphore = staticmethod(multiprocessing.Semaphore)
5477 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
5478 Condition = staticmethod(multiprocessing.Condition)
5479 Event = staticmethod(multiprocessing.Event)
5480 Barrier = staticmethod(multiprocessing.Barrier)
5481 Value = staticmethod(multiprocessing.Value)
5482 Array = staticmethod(multiprocessing.Array)
5483 RawValue = staticmethod(multiprocessing.RawValue)
5484 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00005485
Benjamin Petersone711caf2008-06-11 16:44:04 +00005486
Victor Stinnerffb49402017-07-25 01:55:54 +02005487class ManagerMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005488 TYPE = 'manager'
5489 Process = multiprocessing.Process
5490 Queue = property(operator.attrgetter('manager.Queue'))
5491 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
5492 Lock = property(operator.attrgetter('manager.Lock'))
5493 RLock = property(operator.attrgetter('manager.RLock'))
5494 Semaphore = property(operator.attrgetter('manager.Semaphore'))
5495 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
5496 Condition = property(operator.attrgetter('manager.Condition'))
5497 Event = property(operator.attrgetter('manager.Event'))
5498 Barrier = property(operator.attrgetter('manager.Barrier'))
5499 Value = property(operator.attrgetter('manager.Value'))
5500 Array = property(operator.attrgetter('manager.Array'))
5501 list = property(operator.attrgetter('manager.list'))
5502 dict = property(operator.attrgetter('manager.dict'))
5503 Namespace = property(operator.attrgetter('manager.Namespace'))
5504
5505 @classmethod
5506 def Pool(cls, *args, **kwds):
5507 return cls.manager.Pool(*args, **kwds)
5508
5509 @classmethod
5510 def setUpClass(cls):
Victor Stinnerffb49402017-07-25 01:55:54 +02005511 super().setUpClass()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005512 cls.manager = multiprocessing.Manager()
5513
5514 @classmethod
5515 def tearDownClass(cls):
5516 # only the manager process should be returned by active_children()
5517 # but this can take a bit on slow machines, so wait a few seconds
5518 # if there are other children too (see #17395)
Victor Stinnerffb49402017-07-25 01:55:54 +02005519 start_time = time.monotonic()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005520 t = 0.01
Victor Stinnerffb49402017-07-25 01:55:54 +02005521 while len(multiprocessing.active_children()) > 1:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005522 time.sleep(t)
5523 t *= 2
Victor Stinnerffb49402017-07-25 01:55:54 +02005524 dt = time.monotonic() - start_time
5525 if dt >= 5.0:
Victor Stinner957d0e92017-08-10 17:36:50 +02005526 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005527 print("Warning -- multiprocessing.Manager still has %s active "
5528 "children after %s seconds"
5529 % (multiprocessing.active_children(), dt),
5530 file=sys.stderr)
5531 break
5532
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005533 gc.collect() # do garbage collection
5534 if cls.manager._number_of_objects() != 0:
5535 # This is not really an error since some tests do not
5536 # ensure that all processes which hold a reference to a
5537 # managed object have been joined.
Victor Stinner957d0e92017-08-10 17:36:50 +02005538 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005539 print('Warning -- Shared objects which still exist at manager '
5540 'shutdown:')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005541 print(cls.manager._debug_info())
5542 cls.manager.shutdown()
5543 cls.manager.join()
5544 cls.manager = None
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01005545
Victor Stinnerffb49402017-07-25 01:55:54 +02005546 super().tearDownClass()
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01005547
Victor Stinnerffb49402017-07-25 01:55:54 +02005548
5549class ThreadsMixin(BaseMixin):
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005550 TYPE = 'threads'
5551 Process = multiprocessing.dummy.Process
5552 connection = multiprocessing.dummy.connection
5553 current_process = staticmethod(multiprocessing.dummy.current_process)
5554 active_children = staticmethod(multiprocessing.dummy.active_children)
Antoine Pitrou62b6a0d2016-03-15 10:48:28 +01005555 Pool = staticmethod(multiprocessing.dummy.Pool)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005556 Pipe = staticmethod(multiprocessing.dummy.Pipe)
5557 Queue = staticmethod(multiprocessing.dummy.Queue)
5558 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
5559 Lock = staticmethod(multiprocessing.dummy.Lock)
5560 RLock = staticmethod(multiprocessing.dummy.RLock)
5561 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
5562 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
5563 Condition = staticmethod(multiprocessing.dummy.Condition)
5564 Event = staticmethod(multiprocessing.dummy.Event)
5565 Barrier = staticmethod(multiprocessing.dummy.Barrier)
5566 Value = staticmethod(multiprocessing.dummy.Value)
5567 Array = staticmethod(multiprocessing.dummy.Array)
5568
5569#
5570# Functions used to create test cases from the base ones in this module
5571#
5572
5573def install_tests_in_module_dict(remote_globs, start_method):
5574 __module__ = remote_globs['__name__']
5575 local_globs = globals()
5576 ALL_TYPES = {'processes', 'threads', 'manager'}
5577
5578 for name, base in local_globs.items():
5579 if not isinstance(base, type):
5580 continue
5581 if issubclass(base, BaseTestCase):
5582 if base is BaseTestCase:
5583 continue
5584 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
5585 for type_ in base.ALLOWED_TYPES:
5586 newname = 'With' + type_.capitalize() + name[1:]
5587 Mixin = local_globs[type_.capitalize() + 'Mixin']
5588 class Temp(base, Mixin, unittest.TestCase):
5589 pass
5590 Temp.__name__ = Temp.__qualname__ = newname
5591 Temp.__module__ = __module__
5592 remote_globs[newname] = Temp
5593 elif issubclass(base, unittest.TestCase):
5594 class Temp(base, object):
5595 pass
5596 Temp.__name__ = Temp.__qualname__ = name
5597 Temp.__module__ = __module__
5598 remote_globs[name] = Temp
5599
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005600 dangling = [None, None]
5601 old_start_method = [None]
5602
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005603 def setUpModule():
5604 multiprocessing.set_forkserver_preload(PRELOAD)
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005605 multiprocessing.process._cleanup()
5606 dangling[0] = multiprocessing.process._dangling.copy()
5607 dangling[1] = threading._dangling.copy()
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005608 old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005609 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005610 multiprocessing.set_start_method(start_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005611 except ValueError:
5612 raise unittest.SkipTest(start_method +
5613 ' start method not supported')
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005614
5615 if sys.platform.startswith("linux"):
5616 try:
5617 lock = multiprocessing.RLock()
5618 except OSError:
5619 raise unittest.SkipTest("OSError raises on RLock creation, "
5620 "see issue 3111!")
5621 check_enough_semaphores()
5622 util.get_temp_dir() # creates temp directory
5623 multiprocessing.get_logger().setLevel(LOG_LEVEL)
5624
5625 def tearDownModule():
Victor Stinnerffb49402017-07-25 01:55:54 +02005626 need_sleep = False
5627
5628 # bpo-26762: Some multiprocessing objects like Pool create reference
5629 # cycles. Trigger a garbage collection to break these cycles.
5630 test.support.gc_collect()
5631
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01005632 multiprocessing.set_start_method(old_start_method[0], force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005633 # pause a bit so we don't get warning about dangling threads/processes
Victor Stinnerffb49402017-07-25 01:55:54 +02005634 processes = set(multiprocessing.process._dangling) - set(dangling[0])
5635 if processes:
5636 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02005637 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005638 print('Warning -- Dangling processes: %s' % processes,
5639 file=sys.stderr)
5640 processes = None
5641
5642 threads = set(threading._dangling) - set(dangling[1])
5643 if threads:
5644 need_sleep = True
Victor Stinner957d0e92017-08-10 17:36:50 +02005645 test.support.environment_altered = True
Victor Stinnerffb49402017-07-25 01:55:54 +02005646 print('Warning -- Dangling threads: %s' % threads,
5647 file=sys.stderr)
5648 threads = None
5649
5650 # Sleep 500 ms to give time to child processes to complete.
5651 if need_sleep:
5652 time.sleep(0.5)
Miss Islington (bot)229f6e82019-07-05 07:35:38 -07005653
Richard Oudkerke0d25ce2013-08-29 14:37:47 +01005654 multiprocessing.process._cleanup()
Miss Islington (bot)229f6e82019-07-05 07:35:38 -07005655
5656 # Stop the ForkServer process if it's running
5657 from multiprocessing import forkserver
5658 forkserver._forkserver._stop()
5659
Miss Islington (bot)632cb362019-07-02 04:58:05 -07005660 # bpo-37421: Explicitly call _run_finalizers() to remove immediately
5661 # temporary directories created by multiprocessing.util.get_temp_dir().
5662 multiprocessing.util._run_finalizers()
Victor Stinnerffb49402017-07-25 01:55:54 +02005663 test.support.gc_collect()
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01005664
5665 remote_globs['setUpModule'] = setUpModule
5666 remote_globs['tearDownModule'] = tearDownModule