blob: 8e004e21a4fd5ad7841bfc2fe2580cfe70e3c4e3 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010020import operator
Antoine Pitrou89889452017-03-24 13:52:11 +010021import weakref
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Berker Peksag076dbd02015-05-06 07:01:52 +030023import test.support.script_helper
Victor Stinnerb9b69002017-09-14 14:40:56 -070024from test import support
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
Raymond Hettinger15f44ab2016-08-30 10:47:49 -070031# import threading after _multiprocessing to raise a more relevant error
Victor Stinner45df8202010-04-28 22:31:17 +000032# message: "No module named _multiprocessing". _multiprocessing is not compiled
33# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.connection
Victor Stinnerd7e64d92017-07-25 00:33:56 +020037import multiprocessing.dummy
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.heap
Victor Stinnerd7e64d92017-07-25 00:33:56 +020039import multiprocessing.managers
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Victor Stinnerd7e64d92017-07-25 00:33:56 +020041import multiprocessing.queues
Benjamin Petersone711caf2008-06-11 16:44:04 +000042
Charles-François Natalibc8f0822011-09-20 20:36:51 +020043from multiprocessing import util
44
45try:
46 from multiprocessing import reduction
Richard Oudkerk84ed9a62013-08-14 15:35:41 +010047 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
Charles-François Natalibc8f0822011-09-20 20:36:51 +020048except ImportError:
49 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000050
Brian Curtinafa88b52010-10-07 01:12:19 +000051try:
52 from multiprocessing.sharedctypes import Value, copy
53 HAS_SHAREDCTYPES = True
54except ImportError:
55 HAS_SHAREDCTYPES = False
56
Antoine Pitroubcb39d42011-08-23 19:46:22 +020057try:
58 import msvcrt
59except ImportError:
60 msvcrt = None
61
Benjamin Petersone711caf2008-06-11 16:44:04 +000062#
63#
64#
65
Victor Stinner11f08072017-09-15 06:55:31 -070066# Timeout to wait until a process completes
67TIMEOUT = 30.0 # seconds
68
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
Victor Stinnerd7e64d92017-07-25 00:33:56 +020072
def close_queue(queue):
    """Close *queue* and call its join_thread() if it is a multiprocessing Queue.

    Objects of any other type are left untouched.
    """
    if not isinstance(queue, multiprocessing.queues.Queue):
        return
    queue.close()
    queue.join_thread()
77
78
def join_process(process):
    """Join *process* through support.join_thread() using the TIMEOUT timeout."""
    # multiprocessing.Process has the same API as threading.Thread
    # (join() and is_alive()), so the thread helper can be reused as is.
    support.join_thread(process, timeout=TIMEOUT)
Victor Stinnerb9b69002017-09-14 14:40:56 -070083
84
Benjamin Petersone711caf2008-06-11 16:44:04 +000085#
86# Constants
87#
88
# Logging level constant taken from multiprocessing.util.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay, in seconds, used by the tests for "sleep a bit" pauses.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts passed to blocking calls; distinct values are only used when
# timings are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when _multiprocessing reports HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200106
Richard Oudkerk59d54042012-05-10 16:11:12 +0100107from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200108
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* using multiprocessing.connection.wait().

    A negative *timeout* is replaced by None before being passed on.
    Returns whatever wait() returns for the one-element list.
    """
    is_negative = timeout is not None and timeout < 0.0
    effective_timeout = None if is_negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +0000113
# Upper bound on file descriptor numbers, as reported by the OS.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is missing on this platform, or it cannot report
    # SC_OPEN_MAX; fall back to a fixed default.  Only these errors are
    # caught so that KeyboardInterrupt/SystemExit are not swallowed.
    MAXFD = 256
118
Richard Oudkerk84ed9a62013-08-14 15:35:41 +0100119# To speed up tests when using the forkserver, we can preload these:
120PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
121
Benjamin Petersone711caf2008-06-11 16:44:04 +0000122#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000123# Some tests require ctypes
124#
125
126try:
Gareth Rees3913bad2017-07-21 11:35:33 +0100127 from ctypes import Structure, c_int, c_double, c_longlong
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128except ImportError:
129 Structure = object
Antoine Pitrouff92ff52017-07-21 13:24:05 +0200130 c_int = c_double = c_longlong = None
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000131
Charles-François Natali221ef672011-11-22 18:55:22 +0100132
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Raises unittest.SkipTest when the OS reports a limit below the minimum;
    returns None otherwise (including when the limit cannot be queried).
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
146
147
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000148#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000149# Creates a wrapper for a function which records the time it takes to finish
150#
151
class TimingWrapper(object):
    """Wrap a callable and record how long its most recent call took.

    After each call, ``elapsed`` holds the duration in seconds (it is None
    until the first call).  The duration is recorded even when the wrapped
    callable raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use the monotonic clock: the wall clock (time.time()) can be
        # adjusted while the call is running and yield bogus durations.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
164
165#
166# Base class for test cases
167#
168
class BaseTestCase(object):
    """Mixin providing assertion helpers shared by the test cases below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place when CHECK_TIMINGS is on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value, unless it raises NotImplementedError."""
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
191
Benjamin Petersone711caf2008-06-11 16:44:04 +0000192#
193# Return the value of a semaphore
194#
195
def get_value(self):
    """Return the value of a semaphore-like object.

    Tries, in order, the get_value() method, the ``_Semaphore__value``
    attribute and the ``_value`` attribute.  Raises NotImplementedError
    when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
207
208#
209# Testcases
210#
211
class DummyCallable:
    """Callable process target used by test_lose_target_ref.

    When called it checks that *c* is a DummyCallable and puts 5 on *q*.
    """

    def __call__(self, q, c):
        is_dummy = isinstance(c, DummyCallable)
        assert is_dummy
        q.put(5)
216
217
class _TestProcess(BaseTestCase):
    """Tests for the Process API, run for both processes and threads.

    The attributes used through ``self``/``cls`` (TYPE, Process, Queue,
    Event, Pipe, current_process, active_children) are not defined in this
    class; they are expected to be supplied by the concrete test classes.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the basic attributes of the current process object."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertIsNone(current.exitcode)

    def test_daemon_argument(self):
        """Check the ``daemon`` constructor argument and its default."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report arguments and process details through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a process through creation, start, run and join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertIsNone(p.exitcode)

        p.start()

        self.assertIsNone(p.exitcode)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child receives everything after the queue itself.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())
        close_queue(q)

    @classmethod
    def _sleep_some(cls):
        """Child target: sleep long enough that the parent must kill it."""
        time.sleep(100)

    @classmethod
    def _test_sleep(cls, delay):
        """Child target: sleep for *delay* seconds."""
        time.sleep(delay)

    def _kill_process(self, meth):
        """Start a sleeping child, stop it with *meth*, return its exit code.

        *meth* is called with the process object as its only argument.
        """
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._sleep_some)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertIsNone(p.exitcode)

        join = TimingWrapper(p.join)

        self.assertIsNone(join(0))
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertIsNone(join(-1))
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        meth(p)

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertIsNone(join())
            finally:
                # Always cancel the alarm and restore the previous handler.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertIsNone(join())

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        return p.exitcode

    def test_terminate(self):
        """terminate() stops the child; off Windows it exits with -SIGTERM."""
        exitcode = self._kill_process(multiprocessing.Process.terminate)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGTERM)

    def test_kill(self):
        """kill() stops the child; off Windows it exits with -SIGKILL."""
        exitcode = self._kill_process(multiprocessing.Process.kill)
        if os.name != 'nt':
            self.assertEqual(exitcode, -signal.SIGKILL)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        """A process is listed in active_children() only while running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id*, then recurse in two child processes up to depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within processes report in order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait (up to 10 seconds) for *event*."""
        event.wait(10.0)

    def test_sentinel(self):
        """The sentinel is only ready once the process has finished."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel does not exist before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))

    @classmethod
    def _test_close(cls, rc=0, q=None):
        """Child target: optionally block on *q*, then exit with *rc*."""
        if q is not None:
            q.get()
        sys.exit(rc)

    def test_close(self):
        """close() is refused while running and invalidates the object."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        q = self.Queue()
        p = self.Process(target=self._test_close, kwargs={'q': q})
        p.daemon = True
        p.start()
        self.assertEqual(p.is_alive(), True)
        # Child is still alive, cannot close
        with self.assertRaises(ValueError):
            p.close()

        q.put(None)
        p.join()
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.exitcode, 0)
        p.close()
        with self.assertRaises(ValueError):
            p.is_alive()
        with self.assertRaises(ValueError):
            p.join()
        with self.assertRaises(ValueError):
            p.terminate()
        # Closing a second time must not raise.
        p.close()

        # Nothing else may keep the closed process object alive.
        wr = weakref.ref(p)
        del p
        gc.collect()
        self.assertIsNone(wr())

        close_queue(q)

    def test_many_processes(self):
        """Start, join and terminate many processes at once."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        N = 5 if sm == 'spawn' else 100

        # Try to overwhelm the forkserver loop with events
        procs = [self.Process(target=self._test_sleep, args=(0.01,))
                 for _ in range(N)]
        for p in procs:
            p.start()
        for p in procs:
            join_process(p)
        for p in procs:
            self.assertEqual(p.exitcode, 0)

        procs = [self.Process(target=self._sleep_some)
                 for _ in range(N)]
        for p in procs:
            p.start()
        time.sleep(0.001)  # let the children start...
        for p in procs:
            p.terminate()
        for p in procs:
            join_process(p)
        if os.name != 'nt':
            exitcodes = [-signal.SIGTERM]
            if sys.platform == 'darwin':
                # bpo-31510: On macOS, killing a freshly started process with
                # SIGTERM sometimes kills the process with SIGKILL.
                exitcodes.append(-signal.SIGKILL)
            for p in procs:
                self.assertIn(p.exitcode, exitcodes)

    def test_lose_target_ref(self):
        """The parent drops its reference to the target once it has run."""
        c = DummyCallable()
        wr = weakref.ref(c)
        q = self.Queue()
        p = self.Process(target=c, args=(q, c))
        del c
        p.start()
        p.join()
        self.assertIsNone(wr())
        self.assertEqual(q.get(), 5)
        close_queue(q)

    @classmethod
    def _test_child_fd_inflation(cls, evt, q):
        """Child target: report the fd count, then wait to be released."""
        q.put(test.support.fd_count())
        evt.wait()

    def test_child_fd_inflation(self):
        # Number of fds in child processes should not grow with the
        # number of running children.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sm = multiprocessing.get_start_method()
        if sm == 'fork':
            # The fork method by design inherits all fds from the parent,
            # trying to go against it is a lost battle
            self.skipTest('test not appropriate for {}'.format(sm))

        N = 5
        evt = self.Event()
        q = self.Queue()

        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
                 for _ in range(N)]
        for p in procs:
            p.start()

        try:
            fd_counts = [q.get() for _ in range(N)]
            # Every child must report the same number of open fds.
            self.assertEqual(len(set(fd_counts)), 1, fd_counts)

        finally:
            # Release the children even if the assertion failed.
            evt.set()
            for p in procs:
                p.join()
            close_queue(q)

    @classmethod
    def _test_wait_for_threads(cls, evt):
        """Child target: start one regular and one daemon thread.

        The regular thread sets *evt* after 0.5 seconds; the daemon thread
        would clear it again after 20 seconds.
        """
        def func1():
            time.sleep(0.5)
            evt.set()

        def func2():
            time.sleep(20)
            evt.clear()

        threading.Thread(target=func1).start()
        threading.Thread(target=func2, daemon=True).start()

    def test_wait_for_threads(self):
        # A child process should wait for non-daemonic threads to end
        # before exiting
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        evt = self.Event()
        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
        proc.start()
        proc.join()
        self.assertTrue(evt.is_set())
584
585
Benjamin Petersone711caf2008-06-11 16:44:04 +0000586#
587#
588#
589
590class _UpperCaser(multiprocessing.Process):
591
592 def __init__(self):
593 multiprocessing.Process.__init__(self)
594 self.child_conn, self.parent_conn = multiprocessing.Pipe()
595
596 def run(self):
597 self.parent_conn.close()
598 for s in iter(self.child_conn.recv, None):
599 self.child_conn.send(s.upper())
600 self.child_conn.close()
601
602 def submit(self, s):
603 assert type(s) is str
604 self.parent_conn.send(s)
605 return self.parent_conn.recv()
606
607 def stop(self):
608 self.parent_conn.send(None)
609 self.parent_conn.close()
610 self.child_conn.close()
611
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for child exit/stderr handling."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """A Process subclass with its own run() works end to end."""
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            err = stream.read()
            # The whole traceback was printed
            for fragment in ("ZeroDivisionError",
                             "test_multiprocessing.py",
                             "1/0 # MARKER"):
                self.assertIn(fragment, err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child target: point sys.stderr at *testfn*, then fail."""
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', closefd=False)
        # The source text of the next line is what test_stderr_flush
        # looks for in the traceback; keep it exactly as written.
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: point sys.stderr at *testfn*, then sys.exit(reason)."""
        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
        sys.stderr = open(fd, 'w', closefd=False)
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer reasons: exit code 1 and the reason written to stderr.
        printed_reasons = (
            [1, 2, 3],
            'ignore this',
        )
        for reason in printed_reasons:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            join_process(child)
            self.assertEqual(child.exitcode, 1)

            with open(path, 'r') as stream:
                content = stream.read()
            self.assertEqual(content.rstrip(), str(reason))

            # The child opens the file with O_EXCL, so remove it before
            # the next iteration.
            os.unlink(path)

        # Boolean/integer reasons: the exit code compares equal to the reason.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            join_process(child)
            self.assertEqual(child.exitcode, reason)
685
Benjamin Petersone711caf2008-06-11 16:44:04 +0000686#
687#
688#
689
def queue_empty(q):
    """Return whether *q* is empty, using qsize() if it has no empty()."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
695
def queue_full(q, maxsize):
    """Return whether *q* is full, comparing qsize() to *maxsize* if it has no full()."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
701
702
703class _TestQueue(BaseTestCase):
704
705
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000706 @classmethod
707 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000708 child_can_start.wait()
709 for i in range(6):
710 queue.get()
711 parent_can_continue.set()
712
713 def test_put(self):
714 MAXSIZE = 6
715 queue = self.Queue(maxsize=MAXSIZE)
716 child_can_start = self.Event()
717 parent_can_continue = self.Event()
718
719 proc = self.Process(
720 target=self._test_put,
721 args=(queue, child_can_start, parent_can_continue)
722 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000723 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000724 proc.start()
725
726 self.assertEqual(queue_empty(queue), True)
727 self.assertEqual(queue_full(queue, MAXSIZE), False)
728
729 queue.put(1)
730 queue.put(2, True)
731 queue.put(3, True, None)
732 queue.put(4, False)
733 queue.put(5, False, None)
734 queue.put_nowait(6)
735
736 # the values may be in buffer but not yet in pipe so sleep a bit
737 time.sleep(DELTA)
738
739 self.assertEqual(queue_empty(queue), False)
740 self.assertEqual(queue_full(queue, MAXSIZE), True)
741
742 put = TimingWrapper(queue.put)
743 put_nowait = TimingWrapper(queue.put_nowait)
744
745 self.assertRaises(pyqueue.Full, put, 7, False)
746 self.assertTimingAlmostEqual(put.elapsed, 0)
747
748 self.assertRaises(pyqueue.Full, put, 7, False, None)
749 self.assertTimingAlmostEqual(put.elapsed, 0)
750
751 self.assertRaises(pyqueue.Full, put_nowait, 7)
752 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
753
754 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
755 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
756
757 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
758 self.assertTimingAlmostEqual(put.elapsed, 0)
759
760 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
761 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
762
763 child_can_start.set()
764 parent_can_continue.wait()
765
766 self.assertEqual(queue_empty(queue), True)
767 self.assertEqual(queue_full(queue, MAXSIZE), False)
768
769 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200770 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000771
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000772 @classmethod
773 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000774 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000775 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000776 queue.put(2)
777 queue.put(3)
778 queue.put(4)
779 queue.put(5)
780 parent_can_continue.set()
781
782 def test_get(self):
783 queue = self.Queue()
784 child_can_start = self.Event()
785 parent_can_continue = self.Event()
786
787 proc = self.Process(
788 target=self._test_get,
789 args=(queue, child_can_start, parent_can_continue)
790 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000791 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000792 proc.start()
793
794 self.assertEqual(queue_empty(queue), True)
795
796 child_can_start.set()
797 parent_can_continue.wait()
798
799 time.sleep(DELTA)
800 self.assertEqual(queue_empty(queue), False)
801
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000802 # Hangs unexpectedly, remove for now
803 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000804 self.assertEqual(queue.get(True, None), 2)
805 self.assertEqual(queue.get(True), 3)
806 self.assertEqual(queue.get(timeout=1), 4)
807 self.assertEqual(queue.get_nowait(), 5)
808
809 self.assertEqual(queue_empty(queue), True)
810
811 get = TimingWrapper(queue.get)
812 get_nowait = TimingWrapper(queue.get_nowait)
813
814 self.assertRaises(pyqueue.Empty, get, False)
815 self.assertTimingAlmostEqual(get.elapsed, 0)
816
817 self.assertRaises(pyqueue.Empty, get, False, None)
818 self.assertTimingAlmostEqual(get.elapsed, 0)
819
820 self.assertRaises(pyqueue.Empty, get_nowait)
821 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
822
823 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
824 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
825
826 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
827 self.assertTimingAlmostEqual(get.elapsed, 0)
828
829 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
830 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
831
832 proc.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200833 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000834
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000835 @classmethod
836 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000837 for i in range(10, 20):
838 queue.put(i)
839 # note that at this point the items may only be buffered, so the
840 # process cannot shutdown until the feeder thread has finished
841 # pushing items onto the pipe.
842
843 def test_fork(self):
844 # Old versions of Queue would fail to create a new feeder
845 # thread for a forked process if the original process had its
846 # own feeder thread. This test checks that this no longer
847 # happens.
848
849 queue = self.Queue()
850
851 # put items on queue so that main process starts a feeder thread
852 for i in range(10):
853 queue.put(i)
854
855 # wait to make sure thread starts before we fork a new process
856 time.sleep(DELTA)
857
858 # fork process
859 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200860 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000861 p.start()
862
863 # check that all expected items are in the queue
864 for i in range(20):
865 self.assertEqual(queue.get(), i)
866 self.assertRaises(pyqueue.Empty, queue.get, False)
867
868 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200869 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000870
871 def test_qsize(self):
872 q = self.Queue()
873 try:
874 self.assertEqual(q.qsize(), 0)
875 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600876 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000877 q.put(1)
878 self.assertEqual(q.qsize(), 1)
879 q.put(5)
880 self.assertEqual(q.qsize(), 2)
881 q.get()
882 self.assertEqual(q.qsize(), 1)
883 q.get()
884 self.assertEqual(q.qsize(), 0)
Victor Stinnerd7e64d92017-07-25 00:33:56 +0200885 close_queue(q)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000886
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000887 @classmethod
888 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000889 for obj in iter(q.get, None):
890 time.sleep(DELTA)
891 q.task_done()
892
893 def test_task_done(self):
894 queue = self.JoinableQueue()
895
Benjamin Petersone711caf2008-06-11 16:44:04 +0000896 workers = [self.Process(target=self._test_task_done, args=(queue,))
897 for i in range(4)]
898
899 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200900 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000901 p.start()
902
903 for i in range(10):
904 queue.put(i)
905
906 queue.join()
907
908 for p in workers:
909 queue.put(None)
910
911 for p in workers:
912 p.join()
Victor Stinnerb4c52962017-07-25 02:40:55 +0200913 close_queue(queue)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000914
    def test_no_import_lock_contention(self):
        """Import a module that round-trips an item through a Queue.

        The generated module puts one item on a ``multiprocessing.Queue``
        and reads it back with a 3 second timeout while it is being
        imported.  If that read times out (``queue.Empty``) the test fails
        and points at Issue #22853.
        """
        # Work in a temporary current directory so the generated module
        # file is written there.
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                # The leading "if 1:" lets the body below stay indented.
                f.write("""if 1:
                    import multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            # Make the current directory importable for the __import__ call.
            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")
935
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200936 def test_timeout(self):
937 q = multiprocessing.Queue()
938 start = time.time()
Victor Stinneraad7b2e2015-02-05 14:25:05 +0100939 self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200940 delta = time.time() - start
Victor Stinneraad7b2e2015-02-05 14:25:05 +0100941 # Tolerate a delta of 30 ms because of the bad clock resolution on
942 # Windows (usually 15.6 ms)
943 self.assertGreaterEqual(delta, 0.170)
Victor Stinnerb4c52962017-07-25 02:40:55 +0200944 close_queue(q)
Giampaolo Rodola'30830712013-04-17 13:12:27 +0200945
grzgrzgrz3bc50f032017-05-25 16:22:57 +0200946 def test_queue_feeder_donot_stop_onexc(self):
947 # bpo-30414: verify feeder handles exceptions correctly
948 if self.TYPE != 'processes':
949 self.skipTest('test not appropriate for {}'.format(self.TYPE))
950
951 class NotSerializable(object):
952 def __reduce__(self):
953 raise AttributeError
954 with test.support.captured_stderr():
955 q = self.Queue()
956 q.put(NotSerializable())
957 q.put(True)
Victor Stinner8f6eeaf2017-06-13 23:48:47 +0200958 # bpo-30595: use a timeout of 1 second for slow buildbots
959 self.assertTrue(q.get(timeout=1.0))
Victor Stinnerd7e64d92017-07-25 00:33:56 +0200960 close_queue(q)
grzgrzgrz3bc50f032017-05-25 16:22:57 +0200961
Benjamin Petersone711caf2008-06-11 16:44:04 +0000962#
963#
964#
965
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock objects."""

    def test_lock(self):
        """A Lock can be taken once; releasing it unlocked is an error."""
        lk = self.Lock()
        self.assertEqual(lk.acquire(), True)
        # A second, non-blocking acquire must fail rather than block.
        self.assertEqual(lk.acquire(False), False)
        self.assertEqual(lk.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lk.release()

    def test_rlock(self):
        """An RLock can be re-acquired and needs one release per acquire."""
        rlk = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(rlk.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlk.release(), None)
        # One release too many must be rejected.
        with self.assertRaises((AssertionError, RuntimeError)):
            rlk.release()

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        lk = self.Lock()
        with lk:
            pass
988
Benjamin Petersone711caf2008-06-11 16:44:04 +0000989
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore objects."""

    def _test_semaphore(self, sem):
        """Drain *sem* from 2 down to 0 and refill it, checking each step."""
        expect_value = self.assertReturnsIfImplemented

        expect_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and leaves the value alone.
        self.assertEqual(sem.acquire(False), False)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released above its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the shared acquire/release sequence on a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # The over-release check (ValueError from sem.release(), value still
        # 2) is disabled: it currently fails on OS/X.

    def test_timeout(self):
        """acquire() honours its block/timeout arguments on an empty semaphore."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwargs, expected_elapsed in cases:
            self.assertEqual(acquire(*args, **kwargs), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_elapsed)
1042
1043
class _TestCondition(BaseTestCase):
    """Tests for Condition objects (wait, notify, notify_all, wait_for)."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Worker body: wait on *cond* and report progress via semaphores.

        Releases *sleeping* once the condition is held (just before
        waiting) and *woken* once the wait has returned, whether through a
        notification or through *timeout* expiring.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def assertReachesEventually(self, func, value):
        """Poll ``func()`` up to 10 times until it equals *value*, then assert.

        Polling stops early if *func* raises NotImplementedError.  The
        final check goes through assertReturnsIfImplemented.
        """
        for i in range(10):
            try:
                if func() == value:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(value, func)

    def check_invariant(self, cond):
        """Check the internal counters of *cond* show no remaining sleepers."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """notify() wakes exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep the two workers under distinct names so that both can be
        # joined explicitly at the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()
        self.addCleanup(proc.join)

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()
        self.addCleanup(thread.join)

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        thread.join()
        proc.join()

    def test_notify_all(self):
        """Waiters time out on their own, and notify_all() wakes all waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        self.assertReachesEventually(lambda: get_value(woken), 6)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_notify_n(self):
        """notify(n=k) wakes at most k waiters and is harmless with none left."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()
            self.addCleanup(p.join)

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()
            self.addCleanup(t.join)

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake some of them up
        cond.acquire()
        cond.notify(n=2)
        cond.release()

        # check 2 have woken
        self.assertReachesEventually(lambda: get_value(woken), 2)

        # wake the rest of them
        cond.acquire()
        cond.notify(n=4)
        cond.release()

        self.assertReachesEventually(lambda: get_value(woken), 6)

        # doesn't do anything more
        cond.acquire()
        cond.notify(n=3)
        cond.release()

        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait(timeout) without a notifier returns False after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child body: set state to 0, then wait until it reaches 4.

        Exits with status 1 if wait_for() reports failure or the value is
        not 4 afterwards.
        """
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda: state.value == 4)
            if not result or state.value != 4:
                sys.exit(1)

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        """wait_for() returns once its predicate becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        # Wait for the child to signal that it has started (state == 0).
        with cond:
            result = cond.wait_for(lambda: state.value == 0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Step the state up to 4, notifying the child after each increment.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertEqual(p.exitcode, 0)

    @classmethod
    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
        """Child body: wait_for() a state that never arrives and time it.

        Sets ``success.value`` to True when wait_for() returned a false
        result and the elapsed time lies within (0.6, 10) times the
        requested timeout.
        """
        sem.release()
        with cond:
            expected = 0.1
            # Time the wait with the monotonic clock so that wall-clock
            # adjustments cannot distort the measured duration.
            started = time.monotonic()
            result = cond.wait_for(lambda: state.value == 4, timeout=expected)
            elapsed = time.monotonic() - started
            # borrow logic in assertTimeout() from test/lock_tests.py
            if not result and expected * 0.6 < elapsed < expected * 10.0:
                success.value = True

    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """wait_for(timeout=...) gives up when the predicate stays false."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        # The child releases sem as soon as it starts running.
        self.assertTrue(sem.acquire(timeout=TIMEOUT))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        join_process(p)
        self.assertTrue(success.value)

    @classmethod
    def _test_wait_result(cls, c, pid):
        """Child body: notify *c*, sleep one second, then SIGINT *pid* if given."""
        with c:
            c.notify()
        time.sleep(1)
        if pid is not None:
            os.kill(pid, signal.SIGINT)

    def test_wait_result(self):
        """wait() returns False on timeout and True when notified.

        For process-based runs off Windows it also checks that a SIGINT
        delivered during wait() surfaces as KeyboardInterrupt.
        """
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(10))
            if pid is not None:
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1338
Benjamin Petersone711caf2008-06-11 16:44:04 +00001339
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Child body: set *event* after sleeping for TIMEOUT2 seconds."""
        delay = TIMEOUT2
        time.sleep(delay)
        event.set()

    def test_event(self):
        """Check is_set()/wait() before and after set(), and across a child."""
        evt = self.Event()
        timed_wait = TimingWrapper(evt.wait)

        # A new event is not set.
        self.assertEqual(evt.is_set(), False)

        # While unset, wait() returns False: at once for a zero timeout,
        # and after roughly the timeout otherwise.
        self.assertEqual(timed_wait(0.0), False)
        self.assertTimingAlmostEqual(timed_wait.elapsed, 0.0)
        self.assertEqual(timed_wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)

        evt.set()

        # Once set, wait() returns True immediately, with or without a
        # timeout.
        self.assertEqual(evt.is_set(), True)
        self.assertEqual(timed_wait(), True)
        self.assertTimingAlmostEqual(timed_wait.elapsed, 0.0)
        self.assertEqual(timed_wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_wait.elapsed, 0.0)

        # NOTE: is_set() is not re-checked around clear().
        evt.clear()

        # A child sets the event after a delay; wait() must then succeed.
        setter = self.Process(target=self._test_event, args=(evt,))
        setter.daemon = True
        setter.start()
        self.assertEqual(timed_wait(), True)
        setter.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001381
1382#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001383# Tests for Barrier - adapted from tests in test/lock_tests.py
1384#
1385
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1390
1391class _DummyList(object):
1392
1393 def __init__(self):
1394 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1395 lock = multiprocessing.Lock()
1396 self.__setstate__((wrapper, lock))
1397 self._lengthbuf[0] = 0
1398
1399 def __setstate__(self, state):
1400 (self._wrapper, self._lock) = state
1401 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1402
1403 def __getstate__(self):
1404 return (self._wrapper, self._lock)
1405
1406 def append(self, _):
1407 with self._lock:
1408 self._lengthbuf[0] += 1
1409
1410 def __len__(self):
1411 with self._lock:
1412 return self._lengthbuf[0]
1413
1414def _wait():
1415 # A crude wait/yield function not relying on synchronization primitives.
1416 time.sleep(0.01)
1417
1418
class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.

        `namespace` supplies the DummyList, Event and Process factories
        used to build the counters, the exit flag and the workers.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()

        workers = []
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()
            workers.append(worker)

        def join_all(pending):
            for worker in pending:
                worker.join()

        # close() runs this finalizer, which joins every worker.
        self._finalizer = weakref.finalize(self, join_all, workers)

    def task(self):
        """Worker body: record start, run `f`, record finish, await exit."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
        # Wait up to 30 seconds for permission to exit.
        self._can_exit.wait(30)
        assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while not len(self.started) >= self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that `f` finished."""
        while not len(self.finished) >= self.n:
            _wait()

    def do_finish(self):
        """Allow workers blocked at the end of task() to exit."""
        self._can_exit.set()

    def close(self):
        """Join all workers by invoking the finalizer."""
        self._finalizer()
1474
Richard Oudkerk3730a172012-06-15 18:26:07 +01001475
class AppendTrue(object):
    """Callable that appends True to the wrapped object on every call."""

    def __init__(self, obj):
        # Target object; it only needs an append() method.
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1481
1482
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        """Create the shared N-party barrier used by most tests."""
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        """Abort the shared barrier and drop the reference to it."""
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a list-like counter suited to the current TYPE."""
        kind = self.TYPE
        if kind == 'threads':
            return []
        if kind == 'manager':
            return self.manager.list()
        return _DummyList()

    def run_threads(self, f, args):
        """Run ``f(*args)`` in N-1 workers plus the calling thread."""
        bunch = Bunch(self, f, args, self.N - 1)
        try:
            f(*args)
            bunch.wait_for_finished()
        finally:
            bunch.close()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking the counters stay in step."""
        parties = barrier.parties
        assert parties == cls.N
        first, second = results[0], results[1]
        for round_no in range(n):
            first.append(True)
            assert len(second) == round_no * parties
            barrier.wait()
            second.append(True)
            assert len(first) == (round_no + 1) * parties
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        counters = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, counters, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        """Worker body: put the value returned by barrier.wait() on *queue*."""
        index = barrier.wait()
        queue.put(index)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        q = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, q))
        indices = []
        for _ in range(self.N):
            indices.append(q.get())
        # Exactly one party must have been handed index 0.
        self.assertEqual(indices.count(0), 1)
        close_queue(q)

    @classmethod
    def _test_action_f(cls, barrier, results):
        """Worker body: after the barrier, the action must have run once."""
        barrier.wait()
        if not len(results) == 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        hits = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(hits))
        self.run_threads(self._test_action_f, (barrier, hits))
        self.assertEqual(len(hits), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        """Worker body: the party with index N//2 aborts the barrier."""
        try:
            index = barrier.wait()
            if index == cls.N // 2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        passed = self.DummyList()
        broken = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, passed, broken))
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        """Worker body: the party with index N//2 resets the barrier."""
        index = barrier.wait()
        if index != cls.N // 2:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        else:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N - 1:
                time.sleep(0.001)
            barrier.reset()
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        passed = self.DummyList()
        broken = self.DummyList()
        final = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, passed, broken, final))
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertEqual(len(final), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        """Worker body: abort *barrier*, reset it, then pass it again."""
        try:
            index = barrier.wait()
            if index == cls.N // 2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        sync_index = barrier2.wait()
        if sync_index == cls.N // 2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        passed = self.DummyList()
        broken = self.DummyList()
        final = self.DummyList()
        sync_barrier = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, sync_barrier, passed, broken, final))
        self.assertEqual(len(passed), 0)
        self.assertEqual(len(broken), self.N - 1)
        self.assertEqual(len(final), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        """Worker body: one party arrives late so wait(0.5) breaks."""
        index = barrier.wait()
        if index == cls.N // 2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        broken = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, broken))
        self.assertEqual(len(broken), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        """Worker body: one party arrives later than the barrier's timeout."""
        index = barrier.wait(cls.defaultTimeout)
        if index == cls.N // 2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        short_barrier = self.Barrier(self.N, timeout=0.5)
        broken = self.DummyList()
        self.run_threads(self._test_default_timeout_f,
                         (short_barrier, broken))
        self.assertEqual(len(broken), short_barrier.parties)

    def test_single_thread(self):
        """A one-party barrier can be passed repeatedly without blocking."""
        solo = self.Barrier(1)
        solo.wait()
        solo.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        """Worker body: pass the barrier *passes* times, sending each index."""
        for pass_no in range(passes):
            barrier.wait()
            with lock:
                conn.send(pass_no)

    def test_thousand(self):
        """Pass the barrier 1000 times, checking the reported pass numbers."""
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        send_lock = self.Lock()
        reader, writer = self.Pipe(False)
        for _ in range(self.N):
            worker = self.Process(target=self._test_thousand_f,
                                  args=(self.barrier, passes, writer,
                                        send_lock))
            worker.start()
            self.addCleanup(worker.join)

        # Every pass number must be received N times before the next one.
        for pass_no in range(passes):
            for _ in range(self.N):
                self.assertEqual(reader.recv(), pass_no)
1732
1733#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001734#
1735#
1736
class _TestValue(BaseTestCase):
    # Tests for shared scalar objects created with Value() / RawValue().

    ALLOWED_TYPES = ('processes',)

    # Each entry is (typecode, initial value, value stored by the child).
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('q', 2 ** 33, 2 ** 34),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Runs in the child process: overwrite every shared value with the
        # third element of the matching codes_values entry.
        for shared, (_code, _initial, replacement) in zip(values,
                                                          cls.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        # Build one shared value per typecode, synchronized or raw.
        if raw:
            values = [self.RawValue(typecode, initial)
                      for typecode, initial, _ in self.codes_values]
        else:
            values = [self.Value(typecode, initial)
                      for typecode, initial, _ in self.codes_values]

        # The parent sees the initial values ...
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # ... and, once the child has exited, the values it stored.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock: both accessors must be callable.
        default_locked = self.Value('i', 5)
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Value('i', 5, lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit_locked = self.Value('i', 5, lock=lock)
        returned_lock = explicit_locked.get_lock()
        explicit_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw_value = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_value, 'get_lock'))
        self.assertFalse(hasattr(raw_value, 'get_obj'))
1805
Benjamin Petersone711caf2008-06-11 16:44:04 +00001806
class _TestArray(BaseTestCase):
    # Tests for shared arrays created with Array() / RawArray().

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn `seq` into its running sum, in place.
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        arr = self.RawArray('i', seq) if raw else self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array, mirrored on the plain list.
        replacement = array.array('i', [1, 2, 3, 4])
        arr[4:8] = replacement
        seq[4:8] = replacement

        self.assertEqual(list(arr[:]), seq)

        # Apply the running sum locally to the list ...
        self.f(seq)

        # ... and in a child process to the shared array.
        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _attempt in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock: both accessors must be callable.
        default_locked = self.Array('i', list(range(10)))
        default_locked.get_lock()
        default_locked.get_obj()

        # lock=None: both accessors must be callable as well.
        none_locked = self.Array('i', list(range(10)), lock=None)
        none_locked.get_lock()
        none_locked.get_obj()

        # An explicit lock is handed back by get_lock().
        lock = self.Lock()
        explicit_locked = self.Array('i', list(range(10)), lock=lock)
        returned_lock = explicit_locked.get_lock()
        explicit_locked.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        # Anything that is not a usable lock is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw_array = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_array, 'get_lock'))
        self.assertFalse(hasattr(raw_array, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001885
1886#
1887#
1888#
1889
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace proxies handed out by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        # Slicing past the end is clipped.
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # `a` has not been touched by any of the operations on `b`.
        self.assertEqual(a[:], ten)

        # Proxies can be stored inside another proxied list.
        nested = self.list([a, b])
        self.assertEqual(
            [element[:] for element in nested],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made through `a` is visible through the stored proxy.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[0][:],
                         [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])

    def test_list_proxy_in_list(self):
        a = self.list([self.list(range(3)) for _i in range(3)])
        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)

        # Changing one inner list leaves the other two alone.
        a[0][-1] = 55
        self.assertEqual(a[0][:], [0, 1, 55])
        for index in (1, 2):
            self.assertEqual(a[index][:], [0, 1, 2])

        self.assertEqual(a[1].pop(), 2)
        self.assertEqual(len(a[1]), 2)
        for index in (0, 2):
            self.assertEqual(len(a[index]), 3)

        del a

        # A proxied list that contains itself must be deletable too.
        b = self.list()
        b.append(b)
        del b

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(code) for code in indices])
        self.assertEqual(sorted(d.items()),
                         [(code, chr(code)) for code in indices])

    def test_dict_proxy_nested(self):
        pets = self.dict(ferrets=2, hamsters=4)
        supplies = self.dict(water=10, feed=3)
        d = self.dict(pets=pets, supplies=supplies)

        self.assertEqual(supplies['water'], 10)
        self.assertEqual(d['supplies']['water'], 10)

        # A key added through the outer dict shows up in the inner proxy.
        d['supplies']['blankets'] = 5
        self.assertEqual(supplies['blankets'], 5)
        self.assertEqual(d['supplies']['blankets'], 5)

        # So does an update of an existing key.
        d['supplies']['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Dropping the local proxies must not invalidate the nested ones.
        del pets
        del supplies
        self.assertEqual(d['pets']['ferrets'], 2)
        d['supplies']['blankets'] = 11
        self.assertEqual(d['supplies']['blankets'], 11)

        # Proxies fetched back out of the outer dict are usable.
        pets = d['pets']
        supplies = d['supplies']
        supplies['water'] = 7
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(d['supplies']['water'], 7)

        # Clearing the outer dict leaves the fetched proxies usable.
        d.clear()
        self.assertEqual(len(d), 0)
        self.assertEqual(supplies['water'], 7)
        self.assertEqual(pets['hamsters'], 4)

        # Dict proxies stored inside a list proxy.
        stock = self.list([pets, supplies])
        stock[0]['marmots'] = 1
        self.assertEqual(pets['marmots'], 1)
        self.assertEqual(stock[0]['marmots'], 1)

        del pets
        del supplies
        self.assertEqual(stock[0]['marmots'], 1)

        outer = self.list([[88, 99], stock])
        self.assertIsInstance(outer[0], list)  # Not a ListProxy
        self.assertEqual(outer[-1][-1]['feed'], 3)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # Neither the deleted `job` nor `_hidden` appears in the str() form.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertFalse(hasattr(ns, 'job'))
2011
2012#
2013#
2014#
2015
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return `x` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00002019
def mul(x, y):
    """Return the product `x*y`; used as a two-argument pool task."""
    product = x * y
    return product
2022
def raise_large_valuerror(wait):
    """Sleep for `wait` seconds, then raise a ValueError with a 1 MiB message."""
    time.sleep(wait)
    oversized_message = "x" * 1024**2
    raise ValueError(oversized_message)
2026
def identity(x):
    """Return the argument unchanged."""
    result = x
    return result
2029
class CountedObject(object):
    """Object that keeps a class-level count of its live instances."""

    # Incremented in __new__, decremented in __del__.
    n_instances = 0

    def __new__(cls):
        instance = object.__new__(cls)
        cls.n_instances += 1
        return instance

    def __del__(self):
        klass = type(self)
        klass.n_instances -= 1
2039
class SayWhenError(ValueError):
    """Raised by exception_throwing_generator() at the requested position."""


def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError in place of item `when`.

    A `when` of -1 makes the generator raise as soon as it is first
    advanced, before anything has been yielded.  Any other `when` outside
    range(total) never triggers the error.
    """
    if when == -1:
        raise SayWhenError("Somebody said when")
    for position in range(total):
        if position == when:
            raise SayWhenError("Somebody said when")
        yield position
2049
Antoine Pitrou89889452017-03-24 13:52:11 +01002050
class _TestPool(BaseTestCase):
    # Tests for Pool.  Most tests share the four-worker pool created in
    # setUpClass(); tests that need to close or terminate a pool create a
    # private one and are responsible for closing *and joining* it.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        # Both callbacks append to the same list, so its length tells how
        # many callbacks ran and its items tell which ones.
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_map_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
        # again, make sure it's reentrant
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)

        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)

        # An iterable that reports a length but fails on the first item.
        class SpecialIterable:
            def __iter__(self):
                return self
            def __next__(self):
                raise SayWhenError
            def __len__(self):
                return 1
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)
        with self.assertRaises(SayWhenError):
            self.pool.map(sqr, SpecialIterable(), 1)

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps one second longer than the get() timeout.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # SayWhenError seen at the very first of the iterable
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)
        # again, make sure it's reentrant
        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(1, -1),
                                      1)
        self.assertRaises(SayWhenError, it.__next__)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        expected_error = (RemoteError if self.TYPE == 'manager'
                          else ValueError)

        self.assertRaises(expected_error, self.Pool, -1)
        self.assertRaises(expected_error, self.Pool, 0)

        if self.TYPE != 'manager':
            p = self.Pool(3)
            try:
                self.assertEqual(3, len(p._pool))
            finally:
                p.close()
                p.join()

    def test_terminate(self):
        # NOTE: this terminates the pool shared by the whole class.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        # Sanity check the pool didn't wait for all tasks to finish
        self.assertLess(join.elapsed, 2.0)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)
        # Close and join in a finally block (as test_make_pool does) so a
        # failing assertion cannot leave the private pool's worker behind.
        try:
            self.assertEqual(p.map(sqr, []), [])
            self.assertEqual(list(p.imap(sqr, [])), [])
            self.assertEqual(list(p.imap_unordered(sqr, [])), [])
            self.assertEqual(p.map_async(sqr, []).get(), [])
        finally:
            p.close()
            p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with self.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # Leaving the with block only terminates the pool; join it so
            # that no worker outlives the test.
            p.join()
            # A terminated pool refuses new work.
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        # test_traceback() looks for the text of the next line, comment
        # included, in the remote traceback: do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected RuntimeError')
            # The with block only terminated the pool; reap its worker.
            p.join()
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # The remote traceback must also show up when the exception is
            # reported through sys.excepthook.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())
            # _helper_reraises_exception should not make the error
            # a remote exception
            with self.Pool(1) as p:
                try:
                    p.map(sqr, exception_throwing_generator(1, -1), 1)
                except Exception as e:
                    exc = e
                else:
                    self.fail('expected SayWhenError')
                self.assertIs(type(exc), SayWhenError)
                self.assertIs(exc.__cause__, None)
            p.join()

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
        # The with block only terminated the pool; reap its worker.
        p.join()

    def test_map_no_failfast(self):
        # Issue #23992: the fail-fast behaviour when an exception is raised
        # during map() would make Pool.join() deadlock, because a worker
        # process would fill the result queue (after the result handler thread
        # terminated, hence not draining it anymore).

        t_start = time.time()

        with self.assertRaises(ValueError):
            with self.Pool(2) as p:
                try:
                    p.map(raise_large_valuerror, [0, 1])
                finally:
                    time.sleep(0.5)
                    p.close()
                    p.join()

        # check that we indeed waited for all jobs
        self.assertGreater(time.time() - t_start, 0.9)

    def test_release_task_refs(self):
        # Issue #29861: task arguments and results should not be kept
        # alive after we are done with them.
        objs = [CountedObject() for i in range(10)]
        refs = [weakref.ref(o) for o in objs]
        self.pool.map(identity, objs)

        del objs
        time.sleep(DELTA)  # let threaded cleanup code run
        self.assertEqual(set(wr() for wr in refs), {None})
        # With a process pool, copies of the objects are returned, check
        # they were released too.
        self.assertEqual(CountedObject.n_instances, 0)
2373
Richard Oudkerk80a5be12014-03-23 12:30:54 +00002374
def raising():
    """Pool task that always fails with KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00002377
def unpickleable_result():
    """Pool task whose return value, a lambda, cannot be pickled."""
    not_picklable = lambda: 42
    return not_picklable
2380
class _TestPoolWorkerErrors(BaseTestCase):
    # Tests for how a process pool reports failures of individual tasks.
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        p = multiprocessing.Pool(2)
        # Close and join in a finally block so a failing assertion does not
        # leave the pool's worker processes behind.
        try:
            # One-element list so the callback can store into it.
            scratchpad = [None]
            def errback(exc):
                scratchpad[0] = exc

            res = p.apply_async(raising, error_callback=errback)
            # The error is raised by get() and handed to the callback.
            self.assertRaises(KeyError, res.get)
            self.assertTrue(scratchpad[0])
            self.assertIsInstance(scratchpad[0], KeyError)
        finally:
            p.close()
            p.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        p = multiprocessing.Pool(2)
        # Close and join in a finally block so a failing assertion does not
        # leave the pool's worker processes behind.
        try:
            # Make sure we don't lose pool processes because of encoding
            # errors.
            for iteration in range(20):

                scratchpad = [None]
                def errback(exc):
                    scratchpad[0] = exc

                res = p.apply_async(unpickleable_result,
                                    error_callback=errback)
                self.assertRaises(MaybeEncodingError, res.get)
                wrapped = scratchpad[0]
                self.assertTrue(wrapped)
                self.assertIsInstance(scratchpad[0], MaybeEncodingError)
                self.assertIsNotNone(wrapped.exc)
                self.assertIsNotNone(wrapped.value)
        finally:
            p.close()
            p.join()
2420
class _TestPoolWorkerLifetime(BaseTestCase):
    # Tests for pools created with maxtasksperchild.
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        p = multiprocessing.Pool(3, maxtasksperchild=10)
        # Close and join in a finally block so a failing assertion does not
        # leave the pool's worker processes behind.
        try:
            self.assertEqual(3, len(p._pool))
            origworkerpids = [w.pid for w in p._pool]
            # Run many tasks so each worker gets replaced (hopefully)
            results = []
            for i in range(100):
                results.append(p.apply_async(sqr, (i, )))
            # Fetch the results and verify we got the right answers,
            # also ensuring all the tasks have completed.
            for (j, res) in enumerate(results):
                self.assertEqual(res.get(), sqr(j))
            # Refill the pool
            p._repopulate_pool()
            # Wait until all workers are alive
            # (countdown * DELTA = 5 seconds max startup process time)
            countdown = 50
            while countdown and not all(w.is_alive() for w in p._pool):
                countdown -= 1
                time.sleep(DELTA)
            finalworkerpids = [w.pid for w in p._pool]
            # All pids should be assigned.  See issue #7805.
            self.assertNotIn(None, origworkerpids)
            self.assertNotIn(None, finalworkerpids)
            # Finally, check that the worker pids have changed
            self.assertNotEqual(sorted(origworkerpids),
                                sorted(finalworkerpids))
        finally:
            p.close()
            p.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        p = multiprocessing.Pool(3, maxtasksperchild=1)
        results = []
        for i in range(6):
            results.append(p.apply_async(sqr, (i, 0.3)))
        # close() and join() are the operations under test here, so they
        # deliberately come before the results are fetched.
        p.close()
        p.join()
        # check the results
        for (j, res) in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
2465
Benjamin Petersone711caf2008-06-11 16:44:04 +00002466#
2467# Test of creating a customized manager class
2468#
2469
2470from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2471
class FooBar(object):
    """Plain class registered with MyManager under two typeids."""

    def f(self):
        # Public method that returns normally.
        result = 'f()'
        return result

    def g(self):
        # Public method that always fails.
        raise ValueError

    def _h(self):
        # Underscore-prefixed method.
        result = '_h()'
        return result
2479
def baz():
    """Generate the squares of 0 through 9."""
    for number in range(10):
        square = number * number
        yield square
2483
class IteratorProxy(BaseProxy):
    """Proxy that makes a remote iterator usable as a local iterator."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Fetch the next item by calling __next__ on the referent.
        item = self._callmethod('__next__')
        return item
2490
class MyManager(BaseManager):
    """Customized manager class; its typeids are registered right below."""
2493
# 'Foo': FooBar with no explicit `exposed` list.
MyManager.register('Foo', callable=FooBar)
# 'Bar': FooBar again, but only f() and _h() are exposed.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': the generator function, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2497
2498
class _TestMyManager(BaseTestCase):
    # Tests for the customized manager class MyManager: the same checks
    # (see common()) are run against a manager started and stopped in
    # three different ways.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Shut down in a finally block so that a failing check in common()
        # does not leave the manager process running.
        try:
            self.common(manager)
        finally:
            manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # Entering the context of an already started manager must work too.
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Checks shared by all tests of this class; `manager` must be a
        # started MyManager.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on 'Foo', so calling it by name is refused.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
2548
Richard Oudkerk73d9a292012-06-14 15:30:10 +01002549
Benjamin Petersone711caf2008-06-11 16:44:04 +00002550#
2551# Test of connecting to a remote server and using xmlrpclib for serialization
2552#
2553
# Module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    shared_queue = _queue
    return shared_queue
2557
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# The server side supplies the callable behind the 'get_queue' typeid.
QueueManager.register('get_queue', callable=get_queue)
2561
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only: no callable is given on this side.
QueueManager2.register('get_queue')
2565
2566
# Serializer name passed to the QueueManager / QueueManager2 constructors
# by the manager tests below.
SERIALIZER = 'xmlrpclib'
2568
class _TestRemoteManager(BaseTestCase):
    # Test of connecting to a manager server over a network address, with
    # SERIALIZER used instead of pickle.

    ALLOWED_TYPES = ('manager',)
    # Values sent through the remote queue by the child process.
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    # What the parent expects to receive: a list, see _putter().
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child process: connect to the server as a client and
        # put the test values on the remote queue.
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        # Note that xmlrpclib will deserialize object as a list not a tuple
        queue.put(tuple(cls.values))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        # Safety net: make sure the server process is stopped even if an
        # assertion below fails.  The explicit shutdown() at the end of the
        # test still runs first on the success path.
        self.addCleanup(manager.shutdown)

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        # Reap the child when the test is over, as test_thousand does.
        self.addCleanup(p.join)

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        self.assertEqual(queue.get(), self.result)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
2616
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on the address it just released.

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process entry point: connect and put one string on the queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        p.join()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        # Second manager on the very same address.
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must actually be started: the
            # shutdown attribute only exists on a started manager, so
            # without this the call below raises AttributeError.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002659
Benjamin Petersone711caf2008-06-11 16:44:04 +00002660#
2661#
2662#
2663
# Empty byte string used to tell the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Exercises Connection objects returned by self.Pipe().

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Send back every byte message received until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # recv_bytes_into() at offset 0
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # recv_bytes_into() at an offset of three items
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # a message longer than the buffer must raise BufferTooShort
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range offset/size combinations must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if os.fstat(fd) succeeds, False if it fails with EBADF."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Receive a handle over *conn*, write *data* to it and close it.

        With *create_dummy_fds* true, every unassigned descriptor below
        256 is first filled with a duplicate of the connection's fd.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Write a single NUL byte straight to the connection's descriptor."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the with-block must have closed both ends.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002930
class _TestListener(BaseTestCase):
    # Listener-only checks; these run for the 'processes' flavour only.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second Listener to an address already in use must fail.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            with self.assertRaises(OSError):
                self.connection.Listener(listener.address, family)

    def test_context(self):
        # Listener, Client and the accepted connection all work as
        # context managers.
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        if self.TYPE == 'processes':
            with self.assertRaises(OSError):
                listener.accept()
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002951
class _TestListenerClient(BaseTestCase):
    # Round trips between a Listener in the test and a Client in a child.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child entry point: connect to *address*, send 'hello' and close."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # instead of leaving one open per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2999
class _TestPoll(BaseTestCase):
    # Behaviour of Connection.poll() around message boundaries.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as data for poll().
        reader, writer = self.Pipe()
        self.assertEqual(reader.poll(), False)
        writer.send_bytes(b'')
        self.assertEqual(reader.poll(), True)
        self.assertEqual(reader.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Send each item of *strings* after a 0.1s pause, then close *conn*."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        parent_end, child_end = self.Pipe()
        p = self.Process(target=self._child_strings,
                         args=(child_end, strings))
        p.start()

        for expected in strings:
            # Poll up to 200 times, 0.01s each, for the next message.
            for _ in range(200):
                if parent_end.poll(0.01):
                    break
            received = parent_end.recv_bytes()
            self.assertEqual(expected, received)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for payload in messages:
            w.send_bytes(payload)
        w.close()
        p.join()
        self.assertIn(r.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        """Send three separate messages: b'a', b'b' and b'cd'."""
        for payload in (b'a', b'b', b'cd'):
            b.send_bytes(payload)

    def test_dont_merge(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        # Repeated polls between reads must not change what is received.
        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
3077
Benjamin Petersone711caf2008-06-11 16:44:04 +00003078#
3079# Test of sending connection and socket objects between processes
3080#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02003081
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Sends Connection and socket objects from one process to another.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing import resource_sharer
        resource_sharer.stop(timeout=TIMEOUT)

    @classmethod
    def _listener(cls, conn, families):
        """Accept one client per family, then one on a plain socket.

        For each listener its address is sent over *conn*, followed by
        the accepted connection object itself.
        """
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        sock = socket.socket()
        sock.bind((test.support.HOST, 0))
        sock.listen()
        conn.send(sock.getsockname())
        accepted, addr = sock.accept()
        conn.send(accepted)
        accepted.close()
        sock.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Connect to each address received on *conn* and send msg.upper().

        (address, msg) pairs are handled with a Client until None is
        received; one further pair is then handled with a plain socket.
        """
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        sock = socket.socket()
        sock.connect(address)
        sock.sendall(msg.upper())
        sock.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read 100 bytes at a time until the peer closes the socket.
        received = b''.join(iter(lambda: new_conn.recv(100), b''))
        self.assertEqual(received, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        """Child side of test_access: use the two pipe ends sent over *conn*."""
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        payload = reader.recv()
        conn.send(payload*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Hand the write end to the child.
        reader, writer = self.Pipe(duplex=False)
        conn.send(writer)
        writer.close()
        self.assertEqual(reader.recv(), 'all is well')
        reader.close()

        # Hand the read end to the child.
        reader, writer = self.Pipe(duplex=False)
        conn.send(reader)
        reader.close()
        writer.send('foobar')
        writer.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        p.join()
3214
Benjamin Petersone711caf2008-06-11 16:44:04 +00003215#
3216#
3217#
3218
class _TestHeap(BaseTestCase):
    # Consistency checks for the multiprocessing.heap allocator.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # drop a random block (no longer clobbers the loop index)
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap; 'segments' used to be named 'all',
        # which shadowed the builtin
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or the
        # next one must be the start of a different arena.
        for current, following in zip(segments, segments[1:]):
            arena, _, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
Benjamin Petersone711caf2008-06-11 16:44:04 +00003284#
3285#
3286#
3287
Benjamin Petersone711caf2008-06-11 16:44:04 +00003288class _Foo(Structure):
3289 _fields_ = [
3290 ('x', c_int),
Gareth Rees3913bad2017-07-21 11:35:33 +01003291 ('y', c_double),
3292 ('z', c_longlong,)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003293 ]
3294
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values and arrays modified by a child process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, z, foo, arr, string):
        """Child entry point: double every shared object passed in."""
        for scalar in (x, y, z):
            scalar.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        z = Value(c_longlong, 2 ** 33, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_args = (x, y, z, foo, arr, string)
        p = self.Process(target=self._double, args=shared_args)
        p.daemon = True
        p.start()
        p.join()

        # Everything must now hold twice its initial value.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(z.value, 2 ** 34)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Zeroing the original must leave the copy untouched.
        original = _Foo(2, 5.0, 2 ** 33)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        original.z = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
        self.assertEqual(duplicate.z, 2 ** 33)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003349
3350#
3351#
3352#
3353
class _TestFinalize(BaseTestCase):
    # Tests for util.Finalize and the finalizer registry.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Start each test with an empty registry; the previous content
        # is put back by tearDown().
        self.registry_backup = util._finalizer_registry.copy()
        util._finalizer_registry.clear()

    def tearDown(self):
        self.assertFalse(util._finalizer_registry)
        util._finalizer_registry.update(self.registry_backup)

    @classmethod
    def _test_finalize(cls, conn):
        """Child entry point: register finalizers that report over *conn*."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # No exitpriority is given for this one.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers sharing exitpriority 0.
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Read everything the child reported, up to the 'STOP' marker.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

    def test_thread_safety(self):
        # bpo-24484: _run_finalizers() should be thread-safe
        def cb():
            pass

        class Foo(object):
            def __init__(self):
                self.ref = self  # create reference cycle
                # insert finalizer at random key
                util.Finalize(self, cb, exitpriority=random.randint(1, 100))

        stop_requested = False
        failure = None

        def run_finalizers():
            nonlocal failure
            while not stop_requested:
                time.sleep(random.random() * 1e-1)
                try:
                    # A GC run will eventually happen during this,
                    # collecting stale Foo's and mutating the registry
                    util._run_finalizers()
                except Exception as e:
                    failure = e

        def make_finalizers():
            nonlocal failure
            holder = {}
            while not stop_requested:
                try:
                    # Old Foo's get gradually replaced and later
                    # collected by the GC (because of the cyclic ref)
                    holder[random.getrandbits(5)] = {Foo() for i in range(10)}
                except Exception as e:
                    failure = e
                    holder.clear()

        saved_interval = sys.getswitchinterval()
        saved_threshold = gc.get_threshold()
        try:
            sys.setswitchinterval(1e-6)
            gc.set_threshold(5, 5, 5)
            workers = [threading.Thread(target=run_finalizers),
                       threading.Thread(target=make_finalizers)]
            with test.support.start_threads(workers):
                time.sleep(4.0)  # Wait a bit to trigger race condition
                stop_requested = True
            if failure is not None:
                raise failure
        finally:
            sys.setswitchinterval(saved_interval)
            gc.set_threshold(*saved_threshold)
            gc.collect()  # Collect remaining Foo's
3468
3469
Benjamin Petersone711caf2008-06-11 16:44:04 +00003470#
3471# Test that from ... import * works for each module
3472#
3473
class _TestImportStar(unittest.TestCase):
    """Check that every name listed in a module's ``__all__`` exists."""

    def get_module_names(self):
        """Return dotted names of the multiprocessing modules.

        One entry per ``*.py`` file next to ``multiprocessing/__init__.py``
        (the ``__init__`` entry itself is dropped), followed by the
        package name ``'multiprocessing'``.
        """
        import glob
        package_dir = os.path.dirname(multiprocessing.__file__)
        names = []
        for path in glob.glob(os.path.join(package_dir, '*.py')):
            stem = os.path.splitext(os.path.split(path)[1])[0]
            names.append('multiprocessing.' + stem)
        names.remove('multiprocessing.__init__')
        names.append('multiprocessing')
        return names

    def test_import(self):
        modules = self.get_module_names()

        # Modules that cannot be imported on the current platform/build.
        if sys.platform == 'win32':
            unavailable = ['multiprocessing.popen_fork',
                           'multiprocessing.popen_forkserver',
                           'multiprocessing.popen_spawn_posix']
        else:
            unavailable = ['multiprocessing.popen_spawn_win32']
            if not HAS_REDUCTION:
                unavailable.append('multiprocessing.popen_forkserver')

        if c_int is None:
            # This module requires _ctypes
            unavailable.append('multiprocessing.sharedctypes')

        for name in unavailable:
            modules.remove(name)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            self.assertTrue(hasattr(mod, '__all__'), name)

            for attr in mod.__all__:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
3512
3513#
3514# Quick test that logging works -- does not test logging output
3515#
3516
class _TestLogging(BaseTestCase):
    """Quick checks of multiprocessing's logger; log output is not tested."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before it is used, not after.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the level even if a logging call raises.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child: send the effective level of multiprocessing's logger."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Return the level reported by a child running ``_test_level``.

        The child is always joined and closed, even if receiving fails.
        """
        p = self.Process(target=self._test_level, args=(writer,))
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()
            p.close()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # Level set directly on multiprocessing's logger.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._child_effective_level(reader, writer))

            # With NOTSET, the level comes from the root logger.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._child_effective_level(reader, writer))
        finally:
            # Undo the global logging changes and release the pipe even
            # when an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
3561
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003562
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00003563# class _TestLoggingProcessName(BaseTestCase):
3564#
3565# def handle(self, record):
3566# assert record.processName == multiprocessing.current_process().name
3567# self.__handled = True
3568#
3569# def test_logging(self):
3570# handler = logging.Handler()
3571# handler.handle = self.handle
3572# self.__handled = False
3573# # Bypass getLogger() and side-effects
3574# logger = logging.getLoggerClass()(
3575# 'multiprocessing.test.TestLoggingProcessName')
3576# logger.addHandler(handler)
3577# logger.propagate = False
3578#
3579# logger.warn('foo')
3580# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00003581
Benjamin Petersone711caf2008-06-11 16:44:04 +00003582#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00003583# Check that Process.join() retries if os.waitpid() fails with EINTR
3584#
3585
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Child: after a short delay, send SIGUSR1 to process *pid*."""
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        signal_seen = False

        def record(*args):
            nonlocal signal_seen
            signal_seen = True

        my_pid = os.getpid()
        previous_handler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(my_pid,))
            killer.start()
            try:
                # The signal arrives while join() is waiting on the sleeper.
                sleeper = self.Process(target=time.sleep, args=(2,))
                sleeper.start()
                sleeper.join()
            finally:
                killer.join()
            self.assertTrue(signal_seen)
            self.assertEqual(sleeper.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, previous_handler)
3615
3616#
Jesse Noller6214edd2009-01-19 16:23:53 +00003617# Test to verify handle verification, see issue 3321
3618#
3619
class TestInvalidHandle(unittest.TestCase):
    """Handle verification for Connection objects (see issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        bogus = multiprocessing.connection.Connection(44977608)
        # poll() on the bogus handle may raise, but must not crash
        try:
            bogus.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None

        with self.assertRaises((ValueError, OSError)):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00003636
Benjamin Petersone711caf2008-06-11 16:44:04 +00003637
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003638
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that answers any challenge with garbage."""

            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that sends a challenge, then garbage, then nothing."""

            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3667
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003668#
3669# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3670#
3671
def initializer(ns):
    """Increase the ``test`` attribute of namespace *ns* by one."""
    ns.test = ns.test + 1
3674
class TestInitializers(unittest.TestCase):
    """Initializer support of Manager.start() and Pool() (issue 5585).

    Each test runs ``initializer`` once against a shared namespace whose
    ``test`` counter starts at 0, and expects the counter to reach 1.
    """

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails, so
            # its server process is not leaked.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3699
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003700#
3701# Issue 5155, 5313, 5331: Test process in processes
3702# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3703#
3704
def _this_sub_process(q):
    """Attempt one non-blocking ``get`` on *q*; an empty queue is ignored."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
3710
def _test_process():
    """Start a daemon child running ``_this_sub_process`` and wait for it."""
    shared_queue = multiprocessing.Queue()
    child = multiprocessing.Process(
        target=_this_sub_process, args=(shared_queue,), daemon=True)
    child.start()
    child.join()
3717
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
3720
def pool_in_process():
    """Map ``_afunc`` over a short list with a 4-worker pool, then shut down.

    The mapped results are discarded; only completion matters.
    """
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003726
class _file_like(object):
    """Write buffer that forwards to *delegate* only on ``flush()``.

    The pending chunks are stored per process id: when the buffer is
    accessed from a different pid than last time, it starts out empty.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """List of pending chunks for the current process."""
        current_pid = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if current_pid != self._pid:
            self._pid = current_pid
            self._cache = []
        return self._cache

    def write(self, data):
        pending = self.cache
        pending.append(data)

    def flush(self):
        joined = ''.join(self.cache)
        self._delegate.write(joined)
        self._cache = []
3747
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155, 5313, 5331: starting processes from within processes."""

    def test_queue_in_process(self):
        proc = multiprocessing.Process(target=_test_process)
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE: this process object is created but never started; it is
        # kept as in the original test.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare ``assert`` is removed under
        # ``python -O`` and gives no diagnostic on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3767
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003768
3769class TestWait(unittest.TestCase):
3770
3771 @classmethod
3772 def _child_test_wait(cls, w, slow):
3773 for i in range(10):
3774 if slow:
3775 time.sleep(random.random()*0.1)
3776 w.send((i, os.getpid()))
3777 w.close()
3778
3779 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003780 from multiprocessing.connection import wait
3781 readers = []
3782 procs = []
3783 messages = []
3784
3785 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003786 r, w = multiprocessing.Pipe(duplex=False)
3787 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003788 p.daemon = True
3789 p.start()
3790 w.close()
3791 readers.append(r)
3792 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003793 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003794
3795 while readers:
3796 for r in wait(readers):
3797 try:
3798 msg = r.recv()
3799 except EOFError:
3800 readers.remove(r)
3801 r.close()
3802 else:
3803 messages.append(msg)
3804
3805 messages.sort()
3806 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3807 self.assertEqual(messages, expected)
3808
3809 @classmethod
3810 def _child_test_wait_socket(cls, address, slow):
3811 s = socket.socket()
3812 s.connect(address)
3813 for i in range(10):
3814 if slow:
3815 time.sleep(random.random()*0.1)
3816 s.sendall(('%s\n' % i).encode('ascii'))
3817 s.close()
3818
3819 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003820 from multiprocessing.connection import wait
3821 l = socket.socket()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02003822 l.bind((test.support.HOST, 0))
Charles-François Natali6e204602014-07-23 19:28:13 +01003823 l.listen()
Antoine Pitrouf6fbf562013-08-22 00:39:46 +02003824 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003825 readers = []
3826 procs = []
3827 dic = {}
3828
3829 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003830 p = multiprocessing.Process(target=self._child_test_wait_socket,
3831 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003832 p.daemon = True
3833 p.start()
3834 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003835 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003836
3837 for i in range(4):
3838 r, _ = l.accept()
3839 readers.append(r)
3840 dic[r] = []
3841 l.close()
3842
3843 while readers:
3844 for r in wait(readers):
3845 msg = r.recv(32)
3846 if not msg:
3847 readers.remove(r)
3848 r.close()
3849 else:
3850 dic[r].append(msg)
3851
3852 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3853 for v in dic.values():
3854 self.assertEqual(b''.join(v), expected)
3855
3856 def test_wait_slow(self):
3857 self.test_wait(True)
3858
3859 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003860 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003861
3862 def test_wait_timeout(self):
3863 from multiprocessing.connection import wait
3864
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003865 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003866 a, b = multiprocessing.Pipe()
3867
3868 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003869 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003870 delta = time.time() - start
3871
3872 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003873 self.assertLess(delta, expected * 2)
3874 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003875
3876 b.send(None)
3877
3878 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003879 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003880 delta = time.time() - start
3881
3882 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003883 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003884
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003885 @classmethod
3886 def signal_and_sleep(cls, sem, period):
3887 sem.release()
3888 time.sleep(period)
3889
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003890 def test_wait_integer(self):
3891 from multiprocessing.connection import wait
3892
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003893 expected = 3
Giampaolo Rodola'0c8ad612013-01-14 02:24:05 +01003894 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003895 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003896 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003897 p = multiprocessing.Process(target=self.signal_and_sleep,
3898 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003899
3900 p.start()
3901 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003902 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003903
3904 start = time.time()
3905 res = wait([a, p.sentinel, b], expected + 20)
3906 delta = time.time() - start
3907
3908 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003909 self.assertLess(delta, expected + 2)
3910 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003911
3912 a.send(None)
3913
3914 start = time.time()
3915 res = wait([a, p.sentinel, b], 20)
3916 delta = time.time() - start
3917
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003918 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003919 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003920
3921 b.send(None)
3922
3923 start = time.time()
3924 res = wait([a, p.sentinel, b], 20)
3925 delta = time.time() - start
3926
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003927 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003928 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003929
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003930 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003931 p.join()
3932
Richard Oudkerk59d54042012-05-10 16:11:12 +01003933 def test_neg_timeout(self):
3934 from multiprocessing.connection import wait
3935 a, b = multiprocessing.Pipe()
3936 t = time.time()
3937 res = wait([a], timeout=-1)
3938 t = time.time() - t
3939 self.assertEqual(res, [])
3940 self.assertLess(t, 1)
3941 a.close()
3942 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003943
Antoine Pitrou709176f2012-04-01 17:19:09 +02003944#
3945# Issue 14151: Test invalid family on invalid environment
3946#
3947
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: a Listener address of the wrong family raises ValueError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe address
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem-path address
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003959
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003960#
3961# Issue 12098: check sys.flags of child matches that for parent
3962#
3963
class TestFlags(unittest.TestCase):
    """Issue 12098: ``sys.flags`` of a child must match its parent's."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Send this process's ``sys.flags`` as a tuple over *conn*."""
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        """Print a JSON pair: own flags and the flags of a spawned child."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test._test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        argv = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(argv)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3991
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003992#
3993# Test interaction with socket timeouts - see Issue #6056
3994#
3995
class TestTimeouts(unittest.TestCase):
    """Interaction with the default socket timeout (see Issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child: send 123 over the pipe after 1s, then 456 to *address*."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # Default timeout is much shorter than the child's 1s delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child, listener.address))
            proc.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            join_process(proc)
        finally:
            socket.setdefaulttimeout(saved_timeout)
4025
Richard Oudkerke88a2442012-08-14 11:41:32 +01004026#
4027# Test what happens with no "if __name__ == '__main__'"
4028#
4029
class TestNoForkBomb(unittest.TestCase):
    """Run ``mp_fork_bomb.py``, which lacks an ``if __name__`` guard."""

    def test_noforkbomb(self):
        start_method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        helper = test.support.script_helper
        if start_method == 'fork':
            # With 'fork' the script is expected to succeed.
            _, out, err = helper.assert_python_ok(script, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # Other start methods are expected to fail with RuntimeError.
            _, out, err = helper.assert_python_failure(script, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
Richard Oudkerke88a2442012-08-14 11:41:32 +01004042
4043#
Richard Oudkerk409c3132013-04-17 20:58:00 +01004044# Issue #17555: ForkAwareThreadLock
4045#
4046
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Spawn *n* - 1 nested processes; the last one reports the registry size."""
        if n <= 1:
            # Last generation: report the size of the after-fork registry.
            conn.send(len(util._afterfork_registry))
        else:
            nested = multiprocessing.Process(target=cls.child,
                                             args=(n-1, conn))
            nested.start()
            conn.close()
            join_process(nested)
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Create a lock before measuring the registry size; the object
        # itself is not used afterwards.
        l = util.ForkAwareThreadLock()
        size_before = len(util._afterfork_registry)
        proc = multiprocessing.Process(target=self.child, args=(5, writer))
        proc.start()
        writer.close()
        size_after = reader.recv()
        join_process(proc)
        self.assertLessEqual(size_after, size_before)
4073
4074#
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004075# Check that non-forked child processes do not inherit unneeded fds/handles
4076#
4077
4078class TestCloseFds(unittest.TestCase):
4079
4080 def get_high_socket_fd(self):
4081 if WIN32:
4082 # The child process will not have any socket handles, so
4083 # calling socket.fromfd() should produce WSAENOTSOCK even
4084 # if there is a handle of the same number.
4085 return socket.socket().detach()
4086 else:
4087 # We want to produce a socket with an fd high enough that a
4088 # freshly created child process will not have any fds as high.
4089 fd = socket.socket().detach()
4090 to_close = []
4091 while fd < 50:
4092 to_close.append(fd)
4093 fd = os.dup(fd)
4094 for x in to_close:
4095 os.close(x)
4096 return fd
4097
4098 def close(self, fd):
4099 if WIN32:
4100 socket.socket(fileno=fd).close()
4101 else:
4102 os.close(fd)
4103
4104 @classmethod
4105 def _test_closefds(cls, conn, fd):
4106 try:
4107 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4108 except Exception as e:
4109 conn.send(e)
4110 else:
4111 s.close()
4112 conn.send(None)
4113
4114 def test_closefd(self):
4115 if not HAS_REDUCTION:
4116 raise unittest.SkipTest('requires fd pickling')
4117
4118 reader, writer = multiprocessing.Pipe()
4119 fd = self.get_high_socket_fd()
4120 try:
4121 p = multiprocessing.Process(target=self._test_closefds,
4122 args=(writer, fd))
4123 p.start()
4124 writer.close()
4125 e = reader.recv()
Victor Stinner11f08072017-09-15 06:55:31 -07004126 join_process(p)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004127 finally:
4128 self.close(fd)
4129 writer.close()
4130 reader.close()
4131
4132 if multiprocessing.get_start_method() == 'fork':
4133 self.assertIs(e, None)
4134 else:
4135 WSAENOTSOCK = 10038
4136 self.assertIsInstance(e, OSError)
4137 self.assertTrue(e.errno == errno.EBADF or
4138 e.winerror == WSAENOTSOCK, e)
4139
4140#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01004141# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4142#
4143
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept()."""

    @classmethod
    def _test_ignore(cls, conn):
        """Child: install a no-op SIGUSR1 handler, then echo and send 1 MB."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        echoed = conn.recv()
        conn.send(echoed)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        parent_conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(target=self._test_ignore,
                                            args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            self.assertEqual(parent_conn.recv(), 'ready')
            # Signal the child after giving it time to reach recv().
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            parent_conn.send(1234)
            self.assertEqual(parent_conn.recv(), 1234)
            # Signal the child after giving it time to reach send_bytes().
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            payload = b'x'*(1024*1024)
            self.assertEqual(parent_conn.recv_bytes(), payload)
            time.sleep(0.1)
            child.join()
        finally:
            parent_conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Child: install a no-op SIGUSR1 handler, accept once, greet."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as listener:
            conn.send(listener.address)
            accepted = listener.accept()
            accepted.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        parent_conn, child_conn = multiprocessing.Pipe()
        try:
            child = multiprocessing.Process(
                target=self._test_ignore_listener, args=(child_conn,))
            child.daemon = True
            child.start()
            child_conn.close()
            address = parent_conn.recv()
            # Signal the child after giving it time to reach accept().
            time.sleep(0.1)
            os.kill(child.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            child.join()
        finally:
            parent_conn.close()
4207
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004208class TestStartMethod(unittest.TestCase):
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004209 @classmethod
4210 def _check_context(cls, conn):
4211 conn.send(multiprocessing.get_start_method())
4212
4213 def check_context(self, ctx):
4214 r, w = ctx.Pipe(duplex=False)
4215 p = ctx.Process(target=self._check_context, args=(w,))
4216 p.start()
4217 w.close()
4218 child_method = r.recv()
4219 r.close()
4220 p.join()
4221 self.assertEqual(child_method, ctx.get_start_method())
4222
4223 def test_context(self):
4224 for method in ('fork', 'spawn', 'forkserver'):
4225 try:
4226 ctx = multiprocessing.get_context(method)
4227 except ValueError:
4228 continue
4229 self.assertEqual(ctx.get_start_method(), method)
4230 self.assertIs(ctx.get_context(), ctx)
4231 self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4232 self.assertRaises(ValueError, ctx.set_start_method, None)
4233 self.check_context(ctx)
4234
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004235 def test_set_get(self):
4236 multiprocessing.set_forkserver_preload(PRELOAD)
4237 count = 0
4238 old_method = multiprocessing.get_start_method()
Jesse Nollerd00df3c2008-06-18 14:22:48 +00004239 try:
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004240 for method in ('fork', 'spawn', 'forkserver'):
4241 try:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004242 multiprocessing.set_start_method(method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004243 except ValueError:
4244 continue
4245 self.assertEqual(multiprocessing.get_start_method(), method)
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004246 ctx = multiprocessing.get_context()
4247 self.assertEqual(ctx.get_start_method(), method)
4248 self.assertTrue(type(ctx).__name__.lower().startswith(method))
4249 self.assertTrue(
4250 ctx.Process.__name__.lower().startswith(method))
4251 self.check_context(multiprocessing)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004252 count += 1
4253 finally:
Richard Oudkerkb1694cf2013-10-16 16:41:56 +01004254 multiprocessing.set_start_method(old_method, force=True)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004255 self.assertGreaterEqual(count, 1)
4256
4257 def test_get_all(self):
4258 methods = multiprocessing.get_all_start_methods()
4259 if sys.platform == 'win32':
4260 self.assertEqual(methods, ['spawn'])
4261 else:
4262 self.assertTrue(methods == ['fork', 'spawn'] or
4263 methods == ['fork', 'spawn', 'forkserver'])
4264
Antoine Pitroucd2a2012016-12-10 17:13:16 +01004265 def test_preload_resources(self):
4266 if multiprocessing.get_start_method() != 'forkserver':
4267 self.skipTest("test only relevant for 'forkserver' method")
4268 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
4269 rc, out, err = test.support.script_helper.assert_python_ok(name)
4270 out = out.decode()
4271 err = err.decode()
4272 if out.rstrip() != 'ok' or err != '':
4273 print(out)
4274 print(err)
4275 self.fail("failed spawning forkserver or grandchild")
4276
4277
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004278#
4279# Check that killing process does not leak named semaphores
4280#
4281
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
    """Check that killing a process does not leak named semaphores."""

    def test_semaphore_tracker(self):
        import subprocess
        # The child creates two locks, writes their semaphore names to
        # the inherited pipe fd and then sleeps until it is terminated.
        cmd = '''if 1:
            import multiprocessing as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        read_fd, write_fd = os.pipe()
        child = subprocess.Popen(
            [sys.executable, '-c', cmd % (write_fd, write_fd)],
            pass_fds=[write_fd],
            stderr=subprocess.PIPE)
        os.close(write_fd)
        with open(read_fd, 'rb', closefd=True) as stream:
            name1 = stream.readline().rstrip().decode('ascii')
            name2 = stream.readline().rstrip().decode('ascii')
        # Unlink the first semaphore ourselves before terminating the child.
        _multiprocessing.sem_unlink(name1)
        child.terminate()
        child.wait()
        time.sleep(2.0)
        # The second semaphore is expected to be gone by now.
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = child.stderr.read().decode('utf-8')
        child.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004318
class TestSimpleQueue(unittest.TestCase):
    """Exercise SimpleQueue.empty() from both a parent and a child process."""

    @classmethod
    def _test_empty(cls, queue, child_can_start, parent_can_continue):
        # Child side: wait for the go-ahead, then push the result of
        # empty() onto the queue twice.
        child_can_start.wait()
        # issue 30301, could fail under spawn and forkserver
        try:
            for _ in range(2):
                queue.put(queue.empty())
        finally:
            # Always release the parent, even if the puts above failed.
            parent_can_continue.set()

    def test_empty(self):
        queue = multiprocessing.SimpleQueue()
        child_can_start = multiprocessing.Event()
        parent_can_continue = multiprocessing.Event()

        worker = multiprocessing.Process(
            target=self._test_empty,
            args=(queue, child_can_start, parent_can_continue),
            daemon=True,
        )
        worker.start()

        # Nothing has been written yet.
        self.assertTrue(queue.empty())

        child_can_start.set()
        parent_can_continue.wait()

        # The child saw an empty queue first, then a non-empty one.
        self.assertFalse(queue.empty())
        self.assertEqual(queue.get(), True)
        self.assertEqual(queue.get(), False)
        self.assertTrue(queue.empty())

        worker.join()
4354
Richard Oudkerk84ed9a62013-08-14 15:35:41 +01004355#
4356# Mixins
4357#
4358
class BaseMixin(object):
    """Class-level fixtures that report processes/threads left dangling."""

    @classmethod
    def setUpClass(cls):
        # Snapshot the currently tracked processes and threads so that
        # tearDownClass can tell which ones appeared during the tests.
        process_snapshot = multiprocessing.process._dangling.copy()
        thread_snapshot = threading._dangling.copy()
        cls.dangling = (process_snapshot, thread_snapshot)

    @classmethod
    def tearDownClass(cls):
        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        checks = (
            ('Warning -- Dangling processes: %s',
             multiprocessing.process._dangling, cls.dangling[0]),
            ('Warning -- Dangling threads: %s',
             threading._dangling, cls.dangling[1]),
        )
        for template, live, baseline in checks:
            leaked = set(live) - set(baseline)
            if leaked:
                test.support.environment_altered = True
                print(template % leaked, file=sys.stderr)
            # Drop the strong references held by the temporary set.
            leaked = None
4384
4385
class ProcessesMixin(BaseMixin):
    """Bind the names used by the test cases to the multiprocessing API.

    Callables are wrapped in staticmethod so that looking them up on a
    test instance never passes the instance as an argument.
    """
    TYPE = 'processes'

    # Process handling and connections
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)

    # Pools, pipes and queues
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared values and arrays
    Value = staticmethod(multiprocessing.Value)
    RawValue = staticmethod(multiprocessing.RawValue)
    Array = staticmethod(multiprocessing.Array)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00004407
Benjamin Petersone711caf2008-06-11 16:44:04 +00004408
class ManagerMixin(BaseMixin):
    """Bind the names used by the test cases to a shared manager.

    One manager is created per test class in setUpClass and shut down in
    tearDownClass.  Each property below resolves the factory of the same
    name on that manager when it is read from a test instance.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-level manager."""
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        """Take the dangling snapshot, then start the shared manager."""
        super().setUpClass()
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        """Wait for stray children, report leftovers and stop the manager."""
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        start_time = time.monotonic()
        t = 0.01
        while len(multiprocessing.active_children()) > 1:
            time.sleep(t)
            # Back off exponentially between polls.
            t *= 2
            dt = time.monotonic() - start_time
            if dt >= 5.0:
                test.support.environment_altered = True
                print("Warning -- multiprocessing.Manager still has %s active "
                      "children after %s seconds"
                      % (multiprocessing.active_children(), dt),
                      file=sys.stderr)
                break

        gc.collect()                       # do garbage collection
        if cls.manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            test.support.environment_altered = True
            # Send this diagnostic to stderr like the other "Warning --"
            # messages, so all warnings end up on the same stream.
            print('Warning -- Shared objects which still exist at manager '
                  'shutdown:',
                  file=sys.stderr)
            print(cls.manager._debug_info(), file=sys.stderr)
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None

        super().tearDownClass()
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01004469
Victor Stinnerffb49402017-07-25 01:55:54 +02004470
class ThreadsMixin(BaseMixin):
    """Bind the names used by the test cases to multiprocessing.dummy.

    Callables are wrapped in staticmethod so that looking them up on a
    test instance never passes the instance as an argument.
    """
    TYPE = 'threads'

    # Process handling and connections
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)

    # Pools, pipes and queues
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared values and arrays
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
4490
4491#
4492# Functions used to create test cases from the base ones in this module
4493#
4494
def install_tests_in_module_dict(remote_globs, start_method):
    """Populate *remote_globs* with concrete test classes and fixtures.

    Every BaseTestCase subclass found in this module's globals is combined
    with the mixin of each of its ALLOWED_TYPES and stored under the name
    'With<Type><name without its first character>'.  Other
    unittest.TestCase subclasses are re-exported under their own name.
    Module-level setUpModule/tearDownModule functions that switch to
    *start_method* and restore the previous one are installed as well.
    """
    module_name = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    def publish(test_class, public_name):
        # Make the generated class look as if it was defined in the
        # target module, then expose it there.
        test_class.__name__ = test_class.__qualname__ = public_name
        test_class.__module__ = module_name
        remote_globs[public_name] = test_class

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is not BaseTestCase:
                assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
                for type_ in base.ALLOWED_TYPES:
                    prefix = type_.capitalize()
                    Mixin = local_globs[prefix + 'Mixin']

                    class Temp(base, Mixin, unittest.TestCase):
                        pass

                    publish(Temp, 'With' + prefix + name[1:])
        elif issubclass(base, unittest.TestCase):

            class Temp(base, object):
                pass

            publish(Temp, name)

    # State shared between the two module fixtures below.
    saved = {'processes': None, 'threads': None, 'start_method': None}

    def setUpModule():
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        # Remember what exists now so tearDownModule can report only what
        # appeared while the tests ran.
        saved['processes'] = multiprocessing.process._dangling.copy()
        saved['threads'] = threading._dangling.copy()
        saved['start_method'] = multiprocessing.get_start_method(
            allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                probe = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        need_sleep = False

        # bpo-26762: Some multiprocessing objects like Pool create reference
        # cycles. Trigger a garbage collection to break these cycles.
        test.support.gc_collect()

        multiprocessing.set_start_method(saved['start_method'], force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        checks = (
            ('Warning -- Dangling processes: %s',
             multiprocessing.process._dangling, 'processes'),
            ('Warning -- Dangling threads: %s',
             threading._dangling, 'threads'),
        )
        for template, live, key in checks:
            leaked = set(live) - set(saved[key])
            if leaked:
                need_sleep = True
                test.support.environment_altered = True
                print(template % leaked, file=sys.stderr)
            # Drop the strong references held by the temporary set.
            leaked = None

        # Sleep 500 ms to give time to child processes to complete.
        if need_sleep:
            time.sleep(0.5)
        multiprocessing.process._cleanup()
        test.support.gc_collect()

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule