blob: aa66db4b7e6e3608e64909d7675b96dec27b43de [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00006import queue as pyqueue
7import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00008import io
Antoine Pitroude911b22011-12-21 11:03:24 +01009import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000010import sys
11import os
12import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020013import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000014import signal
15import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import socket
17import random
18import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010019import struct
Richard Oudkerkd15642e2013-07-16 15:33:41 +010020import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010022import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000023
Benjamin Petersone5384b02008-10-04 22:00:42 +000024
R. David Murraya21e4ca2009-03-31 23:16:50 +000025# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000029# import threading after _multiprocessing to raise a more revelant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000033
Benjamin Petersone711caf2008-06-11 16:44:04 +000034import multiprocessing.dummy
35import multiprocessing.connection
36import multiprocessing.managers
37import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000038import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000039
Charles-François Natalibc8f0822011-09-20 20:36:51 +020040from multiprocessing import util
41
42try:
43 from multiprocessing import reduction
44 HAS_REDUCTION = True
45except ImportError:
46 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000047
Brian Curtinafa88b52010-10-07 01:12:19 +000048try:
49 from multiprocessing.sharedctypes import Value, copy
50 HAS_SHAREDCTYPES = True
51except ImportError:
52 HAS_SHAREDCTYPES = False
53
Antoine Pitroubcb39d42011-08-23 19:46:22 +020054try:
55 import msvcrt
56except ImportError:
57 msvcrt = None
58
Benjamin Petersone711caf2008-06-11 16:44:04 +000059#
60#
61#
62
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000063def latin(s):
64 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000065
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Constants
68#
69
70LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000071#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000072
73DELTA = 0.1
74CHECK_TIMINGS = False # making true makes tests take a lot longer
75 # and can sometimes cause some non-serious
76 # failures because some calls block a bit
77 # longer than expected
78if CHECK_TIMINGS:
79 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
80else:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
82
83HAVE_GETVALUE = not getattr(_multiprocessing,
84 'HAVE_BROKEN_SEM_GETVALUE', False)
85
Jesse Noller6214edd2009-01-19 16:23:53 +000086WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020087
Richard Oudkerk59d54042012-05-10 16:11:12 +010088from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090def wait_for_handle(handle, timeout):
91 if timeout is not None and timeout < 0.0:
92 timeout = None
93 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000094
Antoine Pitroubcb39d42011-08-23 19:46:22 +020095try:
96 MAXFD = os.sysconf("SC_OPEN_MAX")
97except:
98 MAXFD = 256
99
Benjamin Petersone711caf2008-06-11 16:44:04 +0000100#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000101# Some tests require ctypes
102#
103
104try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000105 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000106except ImportError:
107 Structure = object
108 c_int = c_double = None
109
Charles-François Natali221ef672011-11-22 18:55:22 +0100110
111def check_enough_semaphores():
112 """Check that the system supports enough semaphores to run the test."""
113 # minimum number of semaphores available according to POSIX
114 nsems_min = 256
115 try:
116 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
117 except (AttributeError, ValueError):
118 # sysconf not available or setting not available
119 return
120 if nsems == -1 or nsems >= nsems_min:
121 return
122 raise unittest.SkipTest("The OS doesn't support enough semaphores "
123 "to run the test (required: %d)." % nsems_min)
124
125
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000126#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000127# Creates a wrapper for a function which records the time it takes to finish
128#
129
130class TimingWrapper(object):
131
132 def __init__(self, func):
133 self.func = func
134 self.elapsed = None
135
136 def __call__(self, *args, **kwds):
137 t = time.time()
138 try:
139 return self.func(*args, **kwds)
140 finally:
141 self.elapsed = time.time() - t
142
143#
144# Base class for test cases
145#
146
147class BaseTestCase(object):
148
149 ALLOWED_TYPES = ('processes', 'manager', 'threads')
150
151 def assertTimingAlmostEqual(self, a, b):
152 if CHECK_TIMINGS:
153 self.assertAlmostEqual(a, b, 1)
154
155 def assertReturnsIfImplemented(self, value, func, *args):
156 try:
157 res = func(*args)
158 except NotImplementedError:
159 pass
160 else:
161 return self.assertEqual(value, res)
162
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000163 # For the sanity of Windows users, rather than crashing or freezing in
164 # multiple ways.
165 def __reduce__(self, *args):
166 raise NotImplementedError("shouldn't try to pickle a test case")
167
168 __reduce_ex__ = __reduce__
169
Benjamin Petersone711caf2008-06-11 16:44:04 +0000170#
171# Return the value of a semaphore
172#
173
174def get_value(self):
175 try:
176 return self.get_value()
177 except AttributeError:
178 try:
179 return self._Semaphore__value
180 except AttributeError:
181 try:
182 return self._value
183 except AttributeError:
184 raise NotImplementedError
185
186#
187# Testcases
188#
189
190class _TestProcess(BaseTestCase):
191
192 ALLOWED_TYPES = ('processes', 'threads')
193
194 def test_current(self):
195 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600196 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000197
198 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000199 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000200
201 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000202 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000203 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000204 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000205 self.assertEqual(current.ident, os.getpid())
206 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000207
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000208 def test_daemon_argument(self):
209 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600210 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000211
212 # By default uses the current process's daemon flag.
213 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000214 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000215 proc1 = self.Process(target=self._test, daemon=True)
216 self.assertTrue(proc1.daemon)
217 proc2 = self.Process(target=self._test, daemon=False)
218 self.assertFalse(proc2.daemon)
219
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000220 @classmethod
221 def _test(cls, q, *args, **kwds):
222 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000223 q.put(args)
224 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000225 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000226 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000228 q.put(current.pid)
229
230 def test_process(self):
231 q = self.Queue(1)
232 e = self.Event()
233 args = (q, 1, 2)
234 kwargs = {'hello':23, 'bye':2.54}
235 name = 'SomeProcess'
236 p = self.Process(
237 target=self._test, args=args, kwargs=kwargs, name=name
238 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000239 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000240 current = self.current_process()
241
242 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000243 self.assertEqual(p.authkey, current.authkey)
244 self.assertEqual(p.is_alive(), False)
245 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000246 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000247 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249
250 p.start()
251
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.exitcode, None)
253 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000254 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000255
Ezio Melottib3aedd42010-11-20 19:04:17 +0000256 self.assertEqual(q.get(), args[1:])
257 self.assertEqual(q.get(), kwargs)
258 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000259 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000260 self.assertEqual(q.get(), current.authkey)
261 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000262
263 p.join()
264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(p.exitcode, 0)
266 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000267 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000269 @classmethod
270 def _test_terminate(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100271 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000272
273 def test_terminate(self):
274 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600275 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000276
277 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000278 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000279 p.start()
280
281 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000282 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000283 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000284
Richard Oudkerk59d54042012-05-10 16:11:12 +0100285 join = TimingWrapper(p.join)
286
287 self.assertEqual(join(0), None)
288 self.assertTimingAlmostEqual(join.elapsed, 0.0)
289 self.assertEqual(p.is_alive(), True)
290
291 self.assertEqual(join(-1), None)
292 self.assertTimingAlmostEqual(join.elapsed, 0.0)
293 self.assertEqual(p.is_alive(), True)
294
Benjamin Petersone711caf2008-06-11 16:44:04 +0000295 p.terminate()
296
Richard Oudkerk4f350792013-10-13 00:49:27 +0100297 if hasattr(signal, 'alarm'):
298 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100299 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100300 old_handler = signal.signal(signal.SIGALRM, handler)
301 try:
302 signal.alarm(10)
303 self.assertEqual(join(), None)
304 signal.alarm(0)
305 finally:
306 signal.signal(signal.SIGALRM, old_handler)
307 else:
308 self.assertEqual(join(), None)
309
Benjamin Petersone711caf2008-06-11 16:44:04 +0000310 self.assertTimingAlmostEqual(join.elapsed, 0.0)
311
312 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000313 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000314
315 p.join()
316
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000317 # XXX sometimes get p.exitcode == 0 on Windows ...
318 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000319
320 def test_cpu_count(self):
321 try:
322 cpus = multiprocessing.cpu_count()
323 except NotImplementedError:
324 cpus = 1
325 self.assertTrue(type(cpus) is int)
326 self.assertTrue(cpus >= 1)
327
328 def test_active_children(self):
329 self.assertEqual(type(self.active_children()), list)
330
331 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000332 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333
Jesus Cea94f964f2011-09-09 20:26:57 +0200334 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000335 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000336 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337
338 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000339 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000340
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000341 @classmethod
342 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000343 from multiprocessing import forking
344 wconn.send(id)
345 if len(id) < 2:
346 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000347 p = cls.Process(
348 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000349 )
350 p.start()
351 p.join()
352
353 def test_recursion(self):
354 rconn, wconn = self.Pipe(duplex=False)
355 self._test_recursion(wconn, [])
356
357 time.sleep(DELTA)
358 result = []
359 while rconn.poll():
360 result.append(rconn.recv())
361
362 expected = [
363 [],
364 [0],
365 [0, 0],
366 [0, 1],
367 [1],
368 [1, 0],
369 [1, 1]
370 ]
371 self.assertEqual(result, expected)
372
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200373 @classmethod
374 def _test_sentinel(cls, event):
375 event.wait(10.0)
376
377 def test_sentinel(self):
378 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600379 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200380 event = self.Event()
381 p = self.Process(target=self._test_sentinel, args=(event,))
382 with self.assertRaises(ValueError):
383 p.sentinel
384 p.start()
385 self.addCleanup(p.join)
386 sentinel = p.sentinel
387 self.assertIsInstance(sentinel, int)
388 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
389 event.set()
390 p.join()
391 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
392
Benjamin Petersone711caf2008-06-11 16:44:04 +0000393#
394#
395#
396
397class _UpperCaser(multiprocessing.Process):
398
399 def __init__(self):
400 multiprocessing.Process.__init__(self)
401 self.child_conn, self.parent_conn = multiprocessing.Pipe()
402
403 def run(self):
404 self.parent_conn.close()
405 for s in iter(self.child_conn.recv, None):
406 self.child_conn.send(s.upper())
407 self.child_conn.close()
408
409 def submit(self, s):
410 assert type(s) is str
411 self.parent_conn.send(s)
412 return self.parent_conn.recv()
413
414 def stop(self):
415 self.parent_conn.send(None)
416 self.parent_conn.close()
417 self.child_conn.close()
418
419class _TestSubclassingProcess(BaseTestCase):
420
421 ALLOWED_TYPES = ('processes',)
422
423 def test_subclassing(self):
424 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200425 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000426 uppercaser.start()
427 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
428 self.assertEqual(uppercaser.submit('world'), 'WORLD')
429 uppercaser.stop()
430 uppercaser.join()
431
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100432 def test_stderr_flush(self):
433 # sys.stderr is flushed at process shutdown (issue #13812)
434 if self.TYPE == "threads":
Zachary Ware9fe6d862013-12-08 00:20:35 -0600435 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100436
437 testfn = test.support.TESTFN
438 self.addCleanup(test.support.unlink, testfn)
439 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
440 proc.start()
441 proc.join()
442 with open(testfn, 'r') as f:
443 err = f.read()
444 # The whole traceback was printed
445 self.assertIn("ZeroDivisionError", err)
446 self.assertIn("test_multiprocessing.py", err)
447 self.assertIn("1/0 # MARKER", err)
448
449 @classmethod
450 def _test_stderr_flush(cls, testfn):
451 sys.stderr = open(testfn, 'w')
452 1/0 # MARKER
453
454
Richard Oudkerk29471de2012-06-06 19:04:57 +0100455 @classmethod
456 def _test_sys_exit(cls, reason, testfn):
457 sys.stderr = open(testfn, 'w')
458 sys.exit(reason)
459
460 def test_sys_exit(self):
461 # See Issue 13854
462 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600463 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk29471de2012-06-06 19:04:57 +0100464
465 testfn = test.support.TESTFN
466 self.addCleanup(test.support.unlink, testfn)
467
Richard Oudkerk8731d7b2013-11-17 17:24:11 +0000468 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100469 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
470 p.daemon = True
471 p.start()
472 p.join(5)
473 self.assertEqual(p.exitcode, code)
474
475 with open(testfn, 'r') as f:
476 self.assertEqual(f.read().rstrip(), str(reason))
477
478 for reason in (True, False, 8):
479 p = self.Process(target=sys.exit, args=(reason,))
480 p.daemon = True
481 p.start()
482 p.join(5)
483 self.assertEqual(p.exitcode, reason)
484
Benjamin Petersone711caf2008-06-11 16:44:04 +0000485#
486#
487#
488
489def queue_empty(q):
490 if hasattr(q, 'empty'):
491 return q.empty()
492 else:
493 return q.qsize() == 0
494
495def queue_full(q, maxsize):
496 if hasattr(q, 'full'):
497 return q.full()
498 else:
499 return q.qsize() == maxsize
500
501
502class _TestQueue(BaseTestCase):
503
504
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000505 @classmethod
506 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000507 child_can_start.wait()
508 for i in range(6):
509 queue.get()
510 parent_can_continue.set()
511
512 def test_put(self):
513 MAXSIZE = 6
514 queue = self.Queue(maxsize=MAXSIZE)
515 child_can_start = self.Event()
516 parent_can_continue = self.Event()
517
518 proc = self.Process(
519 target=self._test_put,
520 args=(queue, child_can_start, parent_can_continue)
521 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000522 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000523 proc.start()
524
525 self.assertEqual(queue_empty(queue), True)
526 self.assertEqual(queue_full(queue, MAXSIZE), False)
527
528 queue.put(1)
529 queue.put(2, True)
530 queue.put(3, True, None)
531 queue.put(4, False)
532 queue.put(5, False, None)
533 queue.put_nowait(6)
534
535 # the values may be in buffer but not yet in pipe so sleep a bit
536 time.sleep(DELTA)
537
538 self.assertEqual(queue_empty(queue), False)
539 self.assertEqual(queue_full(queue, MAXSIZE), True)
540
541 put = TimingWrapper(queue.put)
542 put_nowait = TimingWrapper(queue.put_nowait)
543
544 self.assertRaises(pyqueue.Full, put, 7, False)
545 self.assertTimingAlmostEqual(put.elapsed, 0)
546
547 self.assertRaises(pyqueue.Full, put, 7, False, None)
548 self.assertTimingAlmostEqual(put.elapsed, 0)
549
550 self.assertRaises(pyqueue.Full, put_nowait, 7)
551 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
552
553 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
554 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
555
556 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
557 self.assertTimingAlmostEqual(put.elapsed, 0)
558
559 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
560 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
561
562 child_can_start.set()
563 parent_can_continue.wait()
564
565 self.assertEqual(queue_empty(queue), True)
566 self.assertEqual(queue_full(queue, MAXSIZE), False)
567
568 proc.join()
569
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000570 @classmethod
571 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000572 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000573 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000574 queue.put(2)
575 queue.put(3)
576 queue.put(4)
577 queue.put(5)
578 parent_can_continue.set()
579
580 def test_get(self):
581 queue = self.Queue()
582 child_can_start = self.Event()
583 parent_can_continue = self.Event()
584
585 proc = self.Process(
586 target=self._test_get,
587 args=(queue, child_can_start, parent_can_continue)
588 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000589 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000590 proc.start()
591
592 self.assertEqual(queue_empty(queue), True)
593
594 child_can_start.set()
595 parent_can_continue.wait()
596
597 time.sleep(DELTA)
598 self.assertEqual(queue_empty(queue), False)
599
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000600 # Hangs unexpectedly, remove for now
601 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000602 self.assertEqual(queue.get(True, None), 2)
603 self.assertEqual(queue.get(True), 3)
604 self.assertEqual(queue.get(timeout=1), 4)
605 self.assertEqual(queue.get_nowait(), 5)
606
607 self.assertEqual(queue_empty(queue), True)
608
609 get = TimingWrapper(queue.get)
610 get_nowait = TimingWrapper(queue.get_nowait)
611
612 self.assertRaises(pyqueue.Empty, get, False)
613 self.assertTimingAlmostEqual(get.elapsed, 0)
614
615 self.assertRaises(pyqueue.Empty, get, False, None)
616 self.assertTimingAlmostEqual(get.elapsed, 0)
617
618 self.assertRaises(pyqueue.Empty, get_nowait)
619 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
620
621 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
622 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
623
624 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
625 self.assertTimingAlmostEqual(get.elapsed, 0)
626
627 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
628 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
629
630 proc.join()
631
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000632 @classmethod
633 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000634 for i in range(10, 20):
635 queue.put(i)
636 # note that at this point the items may only be buffered, so the
637 # process cannot shutdown until the feeder thread has finished
638 # pushing items onto the pipe.
639
640 def test_fork(self):
641 # Old versions of Queue would fail to create a new feeder
642 # thread for a forked process if the original process had its
643 # own feeder thread. This test checks that this no longer
644 # happens.
645
646 queue = self.Queue()
647
648 # put items on queue so that main process starts a feeder thread
649 for i in range(10):
650 queue.put(i)
651
652 # wait to make sure thread starts before we fork a new process
653 time.sleep(DELTA)
654
655 # fork process
656 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200657 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000658 p.start()
659
660 # check that all expected items are in the queue
661 for i in range(20):
662 self.assertEqual(queue.get(), i)
663 self.assertRaises(pyqueue.Empty, queue.get, False)
664
665 p.join()
666
667 def test_qsize(self):
668 q = self.Queue()
669 try:
670 self.assertEqual(q.qsize(), 0)
671 except NotImplementedError:
Zachary Ware9fe6d862013-12-08 00:20:35 -0600672 self.skipTest('qsize method not implemented')
Benjamin Petersone711caf2008-06-11 16:44:04 +0000673 q.put(1)
674 self.assertEqual(q.qsize(), 1)
675 q.put(5)
676 self.assertEqual(q.qsize(), 2)
677 q.get()
678 self.assertEqual(q.qsize(), 1)
679 q.get()
680 self.assertEqual(q.qsize(), 0)
681
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000682 @classmethod
683 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000684 for obj in iter(q.get, None):
685 time.sleep(DELTA)
686 q.task_done()
687
688 def test_task_done(self):
689 queue = self.JoinableQueue()
690
691 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000692 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000693
694 workers = [self.Process(target=self._test_task_done, args=(queue,))
695 for i in range(4)]
696
697 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200698 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000699 p.start()
700
701 for i in range(10):
702 queue.put(i)
703
704 queue.join()
705
706 for p in workers:
707 queue.put(None)
708
709 for p in workers:
710 p.join()
711
Giampaolo Rodola'b38897f2013-04-17 13:08:59 +0200712 def test_timeout(self):
713 q = multiprocessing.Queue()
714 start = time.time()
715 self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
716 delta = time.time() - start
Richard Oudkerkb8ec1e32013-11-02 16:46:32 +0000717 self.assertGreaterEqual(delta, 0.18)
Giampaolo Rodola'b38897f2013-04-17 13:08:59 +0200718
Benjamin Petersone711caf2008-06-11 16:44:04 +0000719#
720#
721#
722
723class _TestLock(BaseTestCase):
724
725 def test_lock(self):
726 lock = self.Lock()
727 self.assertEqual(lock.acquire(), True)
728 self.assertEqual(lock.acquire(False), False)
729 self.assertEqual(lock.release(), None)
730 self.assertRaises((ValueError, threading.ThreadError), lock.release)
731
732 def test_rlock(self):
733 lock = self.RLock()
734 self.assertEqual(lock.acquire(), True)
735 self.assertEqual(lock.acquire(), True)
736 self.assertEqual(lock.acquire(), True)
737 self.assertEqual(lock.release(), None)
738 self.assertEqual(lock.release(), None)
739 self.assertEqual(lock.release(), None)
740 self.assertRaises((AssertionError, RuntimeError), lock.release)
741
Jesse Nollerf8d00852009-03-31 03:25:07 +0000742 def test_lock_context(self):
743 with self.Lock():
744 pass
745
Benjamin Petersone711caf2008-06-11 16:44:04 +0000746
747class _TestSemaphore(BaseTestCase):
748
749 def _test_semaphore(self, sem):
750 self.assertReturnsIfImplemented(2, get_value, sem)
751 self.assertEqual(sem.acquire(), True)
752 self.assertReturnsIfImplemented(1, get_value, sem)
753 self.assertEqual(sem.acquire(), True)
754 self.assertReturnsIfImplemented(0, get_value, sem)
755 self.assertEqual(sem.acquire(False), False)
756 self.assertReturnsIfImplemented(0, get_value, sem)
757 self.assertEqual(sem.release(), None)
758 self.assertReturnsIfImplemented(1, get_value, sem)
759 self.assertEqual(sem.release(), None)
760 self.assertReturnsIfImplemented(2, get_value, sem)
761
762 def test_semaphore(self):
763 sem = self.Semaphore(2)
764 self._test_semaphore(sem)
765 self.assertEqual(sem.release(), None)
766 self.assertReturnsIfImplemented(3, get_value, sem)
767 self.assertEqual(sem.release(), None)
768 self.assertReturnsIfImplemented(4, get_value, sem)
769
770 def test_bounded_semaphore(self):
771 sem = self.BoundedSemaphore(2)
772 self._test_semaphore(sem)
773 # Currently fails on OS/X
774 #if HAVE_GETVALUE:
775 # self.assertRaises(ValueError, sem.release)
776 # self.assertReturnsIfImplemented(2, get_value, sem)
777
778 def test_timeout(self):
779 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -0600780 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000781
782 sem = self.Semaphore(0)
783 acquire = TimingWrapper(sem.acquire)
784
785 self.assertEqual(acquire(False), False)
786 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
787
788 self.assertEqual(acquire(False, None), False)
789 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
790
791 self.assertEqual(acquire(False, TIMEOUT1), False)
792 self.assertTimingAlmostEqual(acquire.elapsed, 0)
793
794 self.assertEqual(acquire(True, TIMEOUT2), False)
795 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
796
797 self.assertEqual(acquire(timeout=TIMEOUT3), False)
798 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
799
800
801class _TestCondition(BaseTestCase):
802
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000803 @classmethod
804 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000805 cond.acquire()
806 sleeping.release()
807 cond.wait(timeout)
808 woken.release()
809 cond.release()
810
811 def check_invariant(self, cond):
812 # this is only supposed to succeed when there are no sleepers
813 if self.TYPE == 'processes':
814 try:
815 sleepers = (cond._sleeping_count.get_value() -
816 cond._woken_count.get_value())
817 self.assertEqual(sleepers, 0)
818 self.assertEqual(cond._wait_semaphore.get_value(), 0)
819 except NotImplementedError:
820 pass
821
822 def test_notify(self):
823 cond = self.Condition()
824 sleeping = self.Semaphore(0)
825 woken = self.Semaphore(0)
826
827 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000828 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000829 p.start()
830
831 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000832 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000833 p.start()
834
835 # wait for both children to start sleeping
836 sleeping.acquire()
837 sleeping.acquire()
838
839 # check no process/thread has woken up
840 time.sleep(DELTA)
841 self.assertReturnsIfImplemented(0, get_value, woken)
842
843 # wake up one process/thread
844 cond.acquire()
845 cond.notify()
846 cond.release()
847
848 # check one process/thread has woken up
849 time.sleep(DELTA)
850 self.assertReturnsIfImplemented(1, get_value, woken)
851
852 # wake up another
853 cond.acquire()
854 cond.notify()
855 cond.release()
856
857 # check other has woken up
858 time.sleep(DELTA)
859 self.assertReturnsIfImplemented(2, get_value, woken)
860
861 # check state is not mucked up
862 self.check_invariant(cond)
863 p.join()
864
865 def test_notify_all(self):
866 cond = self.Condition()
867 sleeping = self.Semaphore(0)
868 woken = self.Semaphore(0)
869
870 # start some threads/processes which will timeout
871 for i in range(3):
872 p = self.Process(target=self.f,
873 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000874 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000875 p.start()
876
877 t = threading.Thread(target=self.f,
878 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000879 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000880 t.start()
881
882 # wait for them all to sleep
883 for i in range(6):
884 sleeping.acquire()
885
886 # check they have all timed out
887 for i in range(6):
888 woken.acquire()
889 self.assertReturnsIfImplemented(0, get_value, woken)
890
891 # check state is not mucked up
892 self.check_invariant(cond)
893
894 # start some more threads/processes
895 for i in range(3):
896 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000897 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000898 p.start()
899
900 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000901 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000902 t.start()
903
904 # wait for them to all sleep
905 for i in range(6):
906 sleeping.acquire()
907
908 # check no process/thread has woken up
909 time.sleep(DELTA)
910 self.assertReturnsIfImplemented(0, get_value, woken)
911
912 # wake them all up
913 cond.acquire()
914 cond.notify_all()
915 cond.release()
916
917 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200918 for i in range(10):
919 try:
920 if get_value(woken) == 6:
921 break
922 except NotImplementedError:
923 break
924 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000925 self.assertReturnsIfImplemented(6, get_value, woken)
926
927 # check state is not mucked up
928 self.check_invariant(cond)
929
930 def test_timeout(self):
931 cond = self.Condition()
932 wait = TimingWrapper(cond.wait)
933 cond.acquire()
934 res = wait(TIMEOUT1)
935 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000936 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000937 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
938
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200939 @classmethod
940 def _test_waitfor_f(cls, cond, state):
941 with cond:
942 state.value = 0
943 cond.notify()
944 result = cond.wait_for(lambda : state.value==4)
945 if not result or state.value != 4:
946 sys.exit(1)
947
948 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
949 def test_waitfor(self):
950 # based on test in test/lock_tests.py
951 cond = self.Condition()
952 state = self.Value('i', -1)
953
954 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
955 p.daemon = True
956 p.start()
957
958 with cond:
959 result = cond.wait_for(lambda : state.value==0)
960 self.assertTrue(result)
961 self.assertEqual(state.value, 0)
962
963 for i in range(4):
964 time.sleep(0.01)
965 with cond:
966 state.value += 1
967 cond.notify()
968
969 p.join(5)
970 self.assertFalse(p.is_alive())
971 self.assertEqual(p.exitcode, 0)
972
973 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
975 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200976 with cond:
977 expected = 0.1
978 dt = time.time()
979 result = cond.wait_for(lambda : state.value==4, timeout=expected)
980 dt = time.time() - dt
981 # borrow logic in assertTimeout() from test/lock_tests.py
982 if not result and expected * 0.6 < dt < expected * 10.0:
983 success.value = True
984
985 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
986 def test_waitfor_timeout(self):
987 # based on test in test/lock_tests.py
988 cond = self.Condition()
989 state = self.Value('i', 0)
990 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100991 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200992
993 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100994 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200995 p.daemon = True
996 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100997 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200998
999 # Only increment 3 times, so state == 4 is never reached.
1000 for i in range(3):
1001 time.sleep(0.01)
1002 with cond:
1003 state.value += 1
1004 cond.notify()
1005
1006 p.join(5)
1007 self.assertTrue(success.value)
1008
Richard Oudkerk98449932012-06-05 13:15:29 +01001009 @classmethod
1010 def _test_wait_result(cls, c, pid):
1011 with c:
1012 c.notify()
1013 time.sleep(1)
1014 if pid is not None:
1015 os.kill(pid, signal.SIGINT)
1016
1017 def test_wait_result(self):
1018 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1019 pid = os.getpid()
1020 else:
1021 pid = None
1022
1023 c = self.Condition()
1024 with c:
1025 self.assertFalse(c.wait(0))
1026 self.assertFalse(c.wait(0.1))
1027
1028 p = self.Process(target=self._test_wait_result, args=(c, pid))
1029 p.start()
1030
1031 self.assertTrue(c.wait(10))
1032 if pid is not None:
1033 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1034
1035 p.join()
1036
Benjamin Petersone711caf2008-06-11 16:44:04 +00001037
1038class _TestEvent(BaseTestCase):
1039
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001040 @classmethod
1041 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001042 time.sleep(TIMEOUT2)
1043 event.set()
1044
1045 def test_event(self):
1046 event = self.Event()
1047 wait = TimingWrapper(event.wait)
1048
Ezio Melotti13925002011-03-16 11:05:33 +02001049 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001051 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001052
Benjamin Peterson965ce872009-04-05 21:24:58 +00001053 # Removed, threading.Event.wait() will return the value of the __flag
1054 # instead of None. API Shear with the semaphore backed mp.Event
1055 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001056 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001057 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1059
1060 event.set()
1061
1062 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001063 self.assertEqual(event.is_set(), True)
1064 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001065 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001066 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001067 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1068 # self.assertEqual(event.is_set(), True)
1069
1070 event.clear()
1071
1072 #self.assertEqual(event.is_set(), False)
1073
Jesus Cea94f964f2011-09-09 20:26:57 +02001074 p = self.Process(target=self._test_event, args=(event,))
1075 p.daemon = True
1076 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001077 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001078
1079#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001080# Tests for Barrier - adapted from tests in test/lock_tests.py
1081#
1082
1083# Many of the tests for threading.Barrier use a list as an atomic
1084# counter: a value is appended to increment the counter, and the
1085# length of the list gives the value. We use the class DummyList
1086# for the same purpose.
1087
1088class _DummyList(object):
1089
1090 def __init__(self):
1091 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1092 lock = multiprocessing.Lock()
1093 self.__setstate__((wrapper, lock))
1094 self._lengthbuf[0] = 0
1095
1096 def __setstate__(self, state):
1097 (self._wrapper, self._lock) = state
1098 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1099
1100 def __getstate__(self):
1101 return (self._wrapper, self._lock)
1102
1103 def append(self, _):
1104 with self._lock:
1105 self._lengthbuf[0] += 1
1106
1107 def __len__(self):
1108 with self._lock:
1109 return self._lengthbuf[0]
1110
1111def _wait():
1112 # A crude wait/yield function not relying on synchronization primitives.
1113 time.sleep(0.01)
1114
1115
1116class Bunch(object):
1117 """
1118 A bunch of threads.
1119 """
1120 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1121 """
1122 Construct a bunch of `n` threads running the same function `f`.
1123 If `wait_before_exit` is True, the threads won't terminate until
1124 do_finish() is called.
1125 """
1126 self.f = f
1127 self.args = args
1128 self.n = n
1129 self.started = namespace.DummyList()
1130 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001131 self._can_exit = namespace.Event()
1132 if not wait_before_exit:
1133 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001134 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001135 p = namespace.Process(target=self.task)
1136 p.daemon = True
1137 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001138
1139 def task(self):
1140 pid = os.getpid()
1141 self.started.append(pid)
1142 try:
1143 self.f(*self.args)
1144 finally:
1145 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001146 self._can_exit.wait(30)
1147 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001148
1149 def wait_for_started(self):
1150 while len(self.started) < self.n:
1151 _wait()
1152
1153 def wait_for_finished(self):
1154 while len(self.finished) < self.n:
1155 _wait()
1156
1157 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001158 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001159
1160
1161class AppendTrue(object):
1162 def __init__(self, obj):
1163 self.obj = obj
1164 def __call__(self):
1165 self.obj.append(True)
1166
1167
1168class _TestBarrier(BaseTestCase):
1169 """
1170 Tests for Barrier objects.
1171 """
1172 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001173 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001174
1175 def setUp(self):
1176 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1177
1178 def tearDown(self):
1179 self.barrier.abort()
1180 self.barrier = None
1181
1182 def DummyList(self):
1183 if self.TYPE == 'threads':
1184 return []
1185 elif self.TYPE == 'manager':
1186 return self.manager.list()
1187 else:
1188 return _DummyList()
1189
1190 def run_threads(self, f, args):
1191 b = Bunch(self, f, args, self.N-1)
1192 f(*args)
1193 b.wait_for_finished()
1194
1195 @classmethod
1196 def multipass(cls, barrier, results, n):
1197 m = barrier.parties
1198 assert m == cls.N
1199 for i in range(n):
1200 results[0].append(True)
1201 assert len(results[1]) == i * m
1202 barrier.wait()
1203 results[1].append(True)
1204 assert len(results[0]) == (i + 1) * m
1205 barrier.wait()
1206 try:
1207 assert barrier.n_waiting == 0
1208 except NotImplementedError:
1209 pass
1210 assert not barrier.broken
1211
1212 def test_barrier(self, passes=1):
1213 """
1214 Test that a barrier is passed in lockstep
1215 """
1216 results = [self.DummyList(), self.DummyList()]
1217 self.run_threads(self.multipass, (self.barrier, results, passes))
1218
1219 def test_barrier_10(self):
1220 """
1221 Test that a barrier works for 10 consecutive runs
1222 """
1223 return self.test_barrier(10)
1224
1225 @classmethod
1226 def _test_wait_return_f(cls, barrier, queue):
1227 res = barrier.wait()
1228 queue.put(res)
1229
1230 def test_wait_return(self):
1231 """
1232 test the return value from barrier.wait
1233 """
1234 queue = self.Queue()
1235 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1236 results = [queue.get() for i in range(self.N)]
1237 self.assertEqual(results.count(0), 1)
1238
1239 @classmethod
1240 def _test_action_f(cls, barrier, results):
1241 barrier.wait()
1242 if len(results) != 1:
1243 raise RuntimeError
1244
1245 def test_action(self):
1246 """
1247 Test the 'action' callback
1248 """
1249 results = self.DummyList()
1250 barrier = self.Barrier(self.N, action=AppendTrue(results))
1251 self.run_threads(self._test_action_f, (barrier, results))
1252 self.assertEqual(len(results), 1)
1253
1254 @classmethod
1255 def _test_abort_f(cls, barrier, results1, results2):
1256 try:
1257 i = barrier.wait()
1258 if i == cls.N//2:
1259 raise RuntimeError
1260 barrier.wait()
1261 results1.append(True)
1262 except threading.BrokenBarrierError:
1263 results2.append(True)
1264 except RuntimeError:
1265 barrier.abort()
1266
1267 def test_abort(self):
1268 """
1269 Test that an abort will put the barrier in a broken state
1270 """
1271 results1 = self.DummyList()
1272 results2 = self.DummyList()
1273 self.run_threads(self._test_abort_f,
1274 (self.barrier, results1, results2))
1275 self.assertEqual(len(results1), 0)
1276 self.assertEqual(len(results2), self.N-1)
1277 self.assertTrue(self.barrier.broken)
1278
1279 @classmethod
1280 def _test_reset_f(cls, barrier, results1, results2, results3):
1281 i = barrier.wait()
1282 if i == cls.N//2:
1283 # Wait until the other threads are all in the barrier.
1284 while barrier.n_waiting < cls.N-1:
1285 time.sleep(0.001)
1286 barrier.reset()
1287 else:
1288 try:
1289 barrier.wait()
1290 results1.append(True)
1291 except threading.BrokenBarrierError:
1292 results2.append(True)
1293 # Now, pass the barrier again
1294 barrier.wait()
1295 results3.append(True)
1296
1297 def test_reset(self):
1298 """
1299 Test that a 'reset' on a barrier frees the waiting threads
1300 """
1301 results1 = self.DummyList()
1302 results2 = self.DummyList()
1303 results3 = self.DummyList()
1304 self.run_threads(self._test_reset_f,
1305 (self.barrier, results1, results2, results3))
1306 self.assertEqual(len(results1), 0)
1307 self.assertEqual(len(results2), self.N-1)
1308 self.assertEqual(len(results3), self.N)
1309
1310 @classmethod
1311 def _test_abort_and_reset_f(cls, barrier, barrier2,
1312 results1, results2, results3):
1313 try:
1314 i = barrier.wait()
1315 if i == cls.N//2:
1316 raise RuntimeError
1317 barrier.wait()
1318 results1.append(True)
1319 except threading.BrokenBarrierError:
1320 results2.append(True)
1321 except RuntimeError:
1322 barrier.abort()
1323 # Synchronize and reset the barrier. Must synchronize first so
1324 # that everyone has left it when we reset, and after so that no
1325 # one enters it before the reset.
1326 if barrier2.wait() == cls.N//2:
1327 barrier.reset()
1328 barrier2.wait()
1329 barrier.wait()
1330 results3.append(True)
1331
1332 def test_abort_and_reset(self):
1333 """
1334 Test that a barrier can be reset after being broken.
1335 """
1336 results1 = self.DummyList()
1337 results2 = self.DummyList()
1338 results3 = self.DummyList()
1339 barrier2 = self.Barrier(self.N)
1340
1341 self.run_threads(self._test_abort_and_reset_f,
1342 (self.barrier, barrier2, results1, results2, results3))
1343 self.assertEqual(len(results1), 0)
1344 self.assertEqual(len(results2), self.N-1)
1345 self.assertEqual(len(results3), self.N)
1346
1347 @classmethod
1348 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001349 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001350 if i == cls.N//2:
1351 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001352 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001353 try:
1354 barrier.wait(0.5)
1355 except threading.BrokenBarrierError:
1356 results.append(True)
1357
1358 def test_timeout(self):
1359 """
1360 Test wait(timeout)
1361 """
1362 results = self.DummyList()
1363 self.run_threads(self._test_timeout_f, (self.barrier, results))
1364 self.assertEqual(len(results), self.barrier.parties)
1365
1366 @classmethod
1367 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001368 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001369 if i == cls.N//2:
1370 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001371 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001372 try:
1373 barrier.wait()
1374 except threading.BrokenBarrierError:
1375 results.append(True)
1376
1377 def test_default_timeout(self):
1378 """
1379 Test the barrier's default timeout
1380 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001381 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001382 results = self.DummyList()
1383 self.run_threads(self._test_default_timeout_f, (barrier, results))
1384 self.assertEqual(len(results), barrier.parties)
1385
1386 def test_single_thread(self):
1387 b = self.Barrier(1)
1388 b.wait()
1389 b.wait()
1390
1391 @classmethod
1392 def _test_thousand_f(cls, barrier, passes, conn, lock):
1393 for i in range(passes):
1394 barrier.wait()
1395 with lock:
1396 conn.send(i)
1397
1398 def test_thousand(self):
1399 if self.TYPE == 'manager':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001400 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerk3730a172012-06-15 18:26:07 +01001401 passes = 1000
1402 lock = self.Lock()
1403 conn, child_conn = self.Pipe(False)
1404 for j in range(self.N):
1405 p = self.Process(target=self._test_thousand_f,
1406 args=(self.barrier, passes, child_conn, lock))
1407 p.start()
1408
1409 for i in range(passes):
1410 for j in range(self.N):
1411 self.assertEqual(conn.recv(), i)
1412
1413#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001414#
1415#
1416
1417class _TestValue(BaseTestCase):
1418
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001419 ALLOWED_TYPES = ('processes',)
1420
Benjamin Petersone711caf2008-06-11 16:44:04 +00001421 codes_values = [
1422 ('i', 4343, 24234),
1423 ('d', 3.625, -4.25),
1424 ('h', -232, 234),
1425 ('c', latin('x'), latin('y'))
1426 ]
1427
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001428 def setUp(self):
1429 if not HAS_SHAREDCTYPES:
1430 self.skipTest("requires multiprocessing.sharedctypes")
1431
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001432 @classmethod
1433 def _test(cls, values):
1434 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001435 sv.value = cv[2]
1436
1437
1438 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001439 if raw:
1440 values = [self.RawValue(code, value)
1441 for code, value, _ in self.codes_values]
1442 else:
1443 values = [self.Value(code, value)
1444 for code, value, _ in self.codes_values]
1445
1446 for sv, cv in zip(values, self.codes_values):
1447 self.assertEqual(sv.value, cv[1])
1448
1449 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001450 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001451 proc.start()
1452 proc.join()
1453
1454 for sv, cv in zip(values, self.codes_values):
1455 self.assertEqual(sv.value, cv[2])
1456
1457 def test_rawvalue(self):
1458 self.test_value(raw=True)
1459
1460 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001461 val1 = self.Value('i', 5)
1462 lock1 = val1.get_lock()
1463 obj1 = val1.get_obj()
1464
1465 val2 = self.Value('i', 5, lock=None)
1466 lock2 = val2.get_lock()
1467 obj2 = val2.get_obj()
1468
1469 lock = self.Lock()
1470 val3 = self.Value('i', 5, lock=lock)
1471 lock3 = val3.get_lock()
1472 obj3 = val3.get_obj()
1473 self.assertEqual(lock, lock3)
1474
Jesse Nollerb0516a62009-01-18 03:11:38 +00001475 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001476 self.assertFalse(hasattr(arr4, 'get_lock'))
1477 self.assertFalse(hasattr(arr4, 'get_obj'))
1478
Jesse Nollerb0516a62009-01-18 03:11:38 +00001479 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1480
1481 arr5 = self.RawValue('i', 5)
1482 self.assertFalse(hasattr(arr5, 'get_lock'))
1483 self.assertFalse(hasattr(arr5, 'get_obj'))
1484
Benjamin Petersone711caf2008-06-11 16:44:04 +00001485
1486class _TestArray(BaseTestCase):
1487
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001488 ALLOWED_TYPES = ('processes',)
1489
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001490 @classmethod
1491 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001492 for i in range(1, len(seq)):
1493 seq[i] += seq[i-1]
1494
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001495 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001496 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001497 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1498 if raw:
1499 arr = self.RawArray('i', seq)
1500 else:
1501 arr = self.Array('i', seq)
1502
1503 self.assertEqual(len(arr), len(seq))
1504 self.assertEqual(arr[3], seq[3])
1505 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1506
1507 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1508
1509 self.assertEqual(list(arr[:]), seq)
1510
1511 self.f(seq)
1512
1513 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001514 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001515 p.start()
1516 p.join()
1517
1518 self.assertEqual(list(arr[:]), seq)
1519
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001520 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001521 def test_array_from_size(self):
1522 size = 10
1523 # Test for zeroing (see issue #11675).
1524 # The repetition below strengthens the test by increasing the chances
1525 # of previously allocated non-zero memory being used for the new array
1526 # on the 2nd and 3rd loops.
1527 for _ in range(3):
1528 arr = self.Array('i', size)
1529 self.assertEqual(len(arr), size)
1530 self.assertEqual(list(arr), [0] * size)
1531 arr[:] = range(10)
1532 self.assertEqual(list(arr), list(range(10)))
1533 del arr
1534
1535 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001536 def test_rawarray(self):
1537 self.test_array(raw=True)
1538
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001539 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001540 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001541 arr1 = self.Array('i', list(range(10)))
1542 lock1 = arr1.get_lock()
1543 obj1 = arr1.get_obj()
1544
1545 arr2 = self.Array('i', list(range(10)), lock=None)
1546 lock2 = arr2.get_lock()
1547 obj2 = arr2.get_obj()
1548
1549 lock = self.Lock()
1550 arr3 = self.Array('i', list(range(10)), lock=lock)
1551 lock3 = arr3.get_lock()
1552 obj3 = arr3.get_obj()
1553 self.assertEqual(lock, lock3)
1554
Jesse Nollerb0516a62009-01-18 03:11:38 +00001555 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001556 self.assertFalse(hasattr(arr4, 'get_lock'))
1557 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001558 self.assertRaises(AttributeError,
1559 self.Array, 'i', range(10), lock='notalock')
1560
1561 arr5 = self.RawArray('i', range(10))
1562 self.assertFalse(hasattr(arr5, 'get_lock'))
1563 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001564
1565#
1566#
1567#
1568
1569class _TestContainers(BaseTestCase):
1570
1571 ALLOWED_TYPES = ('manager',)
1572
1573 def test_list(self):
1574 a = self.list(list(range(10)))
1575 self.assertEqual(a[:], list(range(10)))
1576
1577 b = self.list()
1578 self.assertEqual(b[:], [])
1579
1580 b.extend(list(range(5)))
1581 self.assertEqual(b[:], list(range(5)))
1582
1583 self.assertEqual(b[2], 2)
1584 self.assertEqual(b[2:10], [2,3,4])
1585
1586 b *= 2
1587 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1588
1589 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1590
1591 self.assertEqual(a[:], list(range(10)))
1592
1593 d = [a, b]
1594 e = self.list(d)
1595 self.assertEqual(
1596 e[:],
1597 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1598 )
1599
1600 f = self.list([a])
1601 a.append('hello')
1602 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1603
1604 def test_dict(self):
1605 d = self.dict()
1606 indices = list(range(65, 70))
1607 for i in indices:
1608 d[i] = chr(i)
1609 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1610 self.assertEqual(sorted(d.keys()), indices)
1611 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1612 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1613
1614 def test_namespace(self):
1615 n = self.Namespace()
1616 n.name = 'Bob'
1617 n.job = 'Builder'
1618 n._hidden = 'hidden'
1619 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1620 del n.job
1621 self.assertEqual(str(n), "Namespace(name='Bob')")
1622 self.assertTrue(hasattr(n, 'name'))
1623 self.assertTrue(not hasattr(n, 'job'))
1624
1625#
1626#
1627#
1628
1629def sqr(x, wait=0.0):
1630 time.sleep(wait)
1631 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001632
Antoine Pitroude911b22011-12-21 11:03:24 +01001633def mul(x, y):
1634 return x*y
1635
Benjamin Petersone711caf2008-06-11 16:44:04 +00001636class _TestPool(BaseTestCase):
1637
Richard Oudkerkd15642e2013-07-16 15:33:41 +01001638 @classmethod
1639 def setUpClass(cls):
1640 super().setUpClass()
1641 cls.pool = cls.Pool(4)
1642
1643 @classmethod
1644 def tearDownClass(cls):
1645 cls.pool.terminate()
1646 cls.pool.join()
1647 cls.pool = None
1648 super().tearDownClass()
1649
Benjamin Petersone711caf2008-06-11 16:44:04 +00001650 def test_apply(self):
1651 papply = self.pool.apply
1652 self.assertEqual(papply(sqr, (5,)), sqr(5))
1653 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1654
1655 def test_map(self):
1656 pmap = self.pool.map
1657 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1658 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1659 list(map(sqr, list(range(100)))))
1660
Antoine Pitroude911b22011-12-21 11:03:24 +01001661 def test_starmap(self):
1662 psmap = self.pool.starmap
1663 tuples = list(zip(range(10), range(9,-1, -1)))
1664 self.assertEqual(psmap(mul, tuples),
1665 list(itertools.starmap(mul, tuples)))
1666 tuples = list(zip(range(100), range(99,-1, -1)))
1667 self.assertEqual(psmap(mul, tuples, chunksize=20),
1668 list(itertools.starmap(mul, tuples)))
1669
1670 def test_starmap_async(self):
1671 tuples = list(zip(range(100), range(99,-1, -1)))
1672 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1673 list(itertools.starmap(mul, tuples)))
1674
Hynek Schlawack254af262012-10-27 12:53:02 +02001675 def test_map_async(self):
1676 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1677 list(map(sqr, list(range(10)))))
1678
1679 def test_map_async_callbacks(self):
1680 call_args = self.manager.list() if self.TYPE == 'manager' else []
1681 self.pool.map_async(int, ['1'],
1682 callback=call_args.append,
1683 error_callback=call_args.append).wait()
1684 self.assertEqual(1, len(call_args))
1685 self.assertEqual([1], call_args[0])
1686 self.pool.map_async(int, ['a'],
1687 callback=call_args.append,
1688 error_callback=call_args.append).wait()
1689 self.assertEqual(2, len(call_args))
1690 self.assertIsInstance(call_args[1], ValueError)
1691
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001692 def test_map_unplicklable(self):
1693 # Issue #19425 -- failure to pickle should not cause a hang
1694 if self.TYPE == 'threads':
Zachary Ware9fe6d862013-12-08 00:20:35 -06001695 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001696 class A(object):
1697 def __reduce__(self):
1698 raise RuntimeError('cannot pickle')
1699 with self.assertRaises(RuntimeError):
1700 self.pool.map(sqr, [A()]*10)
1701
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001702 def test_map_chunksize(self):
1703 try:
1704 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1705 except multiprocessing.TimeoutError:
1706 self.fail("pool.map_async with chunksize stalled on null list")
1707
Benjamin Petersone711caf2008-06-11 16:44:04 +00001708 def test_async(self):
1709 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1710 get = TimingWrapper(res.get)
1711 self.assertEqual(get(), 49)
1712 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1713
1714 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00001715 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001716 get = TimingWrapper(res.get)
1717 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1718 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1719
1720 def test_imap(self):
1721 it = self.pool.imap(sqr, list(range(10)))
1722 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1723
1724 it = self.pool.imap(sqr, list(range(10)))
1725 for i in range(10):
1726 self.assertEqual(next(it), i*i)
1727 self.assertRaises(StopIteration, it.__next__)
1728
1729 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1730 for i in range(1000):
1731 self.assertEqual(next(it), i*i)
1732 self.assertRaises(StopIteration, it.__next__)
1733
1734 def test_imap_unordered(self):
1735 it = self.pool.imap_unordered(sqr, list(range(1000)))
1736 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1737
1738 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1739 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1740
1741 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001742 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1743 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1744
Benjamin Petersone711caf2008-06-11 16:44:04 +00001745 p = multiprocessing.Pool(3)
1746 self.assertEqual(3, len(p._pool))
1747 p.close()
1748 p.join()
1749
1750 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001751 result = self.pool.map_async(
1752 time.sleep, [0.1 for i in range(10000)], chunksize=1
1753 )
1754 self.pool.terminate()
1755 join = TimingWrapper(self.pool.join)
1756 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001757 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001758
Richard Oudkerke41682b2012-06-06 19:04:57 +01001759 def test_empty_iterable(self):
1760 # See Issue 12157
1761 p = self.Pool(1)
1762
1763 self.assertEqual(p.map(sqr, []), [])
1764 self.assertEqual(list(p.imap(sqr, [])), [])
1765 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1766 self.assertEqual(p.map_async(sqr, []).get(), [])
1767
1768 p.close()
1769 p.join()
1770
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001771 def test_context(self):
1772 if self.TYPE == 'processes':
1773 L = list(range(10))
1774 expected = [sqr(i) for i in L]
1775 with multiprocessing.Pool(2) as p:
1776 r = p.map_async(sqr, L)
1777 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001778 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001779
Ask Solem2afcbf22010-11-09 20:55:52 +00001780def raising():
1781 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001782
Ask Solem2afcbf22010-11-09 20:55:52 +00001783def unpickleable_result():
1784 return lambda: 42
1785
1786class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001787 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001788
1789 def test_async_error_callback(self):
1790 p = multiprocessing.Pool(2)
1791
1792 scratchpad = [None]
1793 def errback(exc):
1794 scratchpad[0] = exc
1795
1796 res = p.apply_async(raising, error_callback=errback)
1797 self.assertRaises(KeyError, res.get)
1798 self.assertTrue(scratchpad[0])
1799 self.assertIsInstance(scratchpad[0], KeyError)
1800
1801 p.close()
1802 p.join()
1803
1804 def test_unpickleable_result(self):
1805 from multiprocessing.pool import MaybeEncodingError
1806 p = multiprocessing.Pool(2)
1807
1808 # Make sure we don't lose pool processes because of encoding errors.
1809 for iteration in range(20):
1810
1811 scratchpad = [None]
1812 def errback(exc):
1813 scratchpad[0] = exc
1814
1815 res = p.apply_async(unpickleable_result, error_callback=errback)
1816 self.assertRaises(MaybeEncodingError, res.get)
1817 wrapped = scratchpad[0]
1818 self.assertTrue(wrapped)
1819 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1820 self.assertIsNotNone(wrapped.exc)
1821 self.assertIsNotNone(wrapped.value)
1822
1823 p.close()
1824 p.join()
1825
1826class _TestPoolWorkerLifetime(BaseTestCase):
1827 ALLOWED_TYPES = ('processes', )
1828
Jesse Noller1f0b6582010-01-27 03:36:01 +00001829 def test_pool_worker_lifetime(self):
1830 p = multiprocessing.Pool(3, maxtasksperchild=10)
1831 self.assertEqual(3, len(p._pool))
1832 origworkerpids = [w.pid for w in p._pool]
1833 # Run many tasks so each worker gets replaced (hopefully)
1834 results = []
1835 for i in range(100):
1836 results.append(p.apply_async(sqr, (i, )))
1837 # Fetch the results and verify we got the right answers,
1838 # also ensuring all the tasks have completed.
1839 for (j, res) in enumerate(results):
1840 self.assertEqual(res.get(), sqr(j))
1841 # Refill the pool
1842 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001843 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001844 # (countdown * DELTA = 5 seconds max startup process time)
1845 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001846 while countdown and not all(w.is_alive() for w in p._pool):
1847 countdown -= 1
1848 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001849 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001850 # All pids should be assigned. See issue #7805.
1851 self.assertNotIn(None, origworkerpids)
1852 self.assertNotIn(None, finalworkerpids)
1853 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001854 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1855 p.close()
1856 p.join()
1857
Charles-François Natalif8859e12011-10-24 18:45:29 +02001858 def test_pool_worker_lifetime_early_close(self):
1859 # Issue #10332: closing a pool whose workers have limited lifetimes
1860 # before all the tasks completed would make join() hang.
1861 p = multiprocessing.Pool(3, maxtasksperchild=1)
1862 results = []
1863 for i in range(6):
1864 results.append(p.apply_async(sqr, (i, 0.3)))
1865 p.close()
1866 p.join()
1867 # check the results
1868 for (j, res) in enumerate(results):
1869 self.assertEqual(res.get(), sqr(j))
1870
Benjamin Petersone711caf2008-06-11 16:44:04 +00001871#
1872# Test of creating a customized manager class
1873#
1874
1875from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1876
1877class FooBar(object):
1878 def f(self):
1879 return 'f()'
1880 def g(self):
1881 raise ValueError
1882 def _h(self):
1883 return '_h()'
1884
1885def baz():
1886 for i in range(10):
1887 yield i*i
1888
1889class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001890 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001891 def __iter__(self):
1892 return self
1893 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001894 return self._callmethod('__next__')
1895
1896class MyManager(BaseManager):
1897 pass
1898
1899MyManager.register('Foo', callable=FooBar)
1900MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1901MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1902
1903
1904class _TestMyManager(BaseTestCase):
1905
1906 ALLOWED_TYPES = ('manager',)
1907
1908 def test_mymanager(self):
1909 manager = MyManager()
1910 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001911 self.common(manager)
1912 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001913
Richard Oudkerkac385712012-06-18 21:29:30 +01001914 # If the manager process exited cleanly then the exitcode
1915 # will be zero. Otherwise (after a short timeout)
1916 # terminate() is used, resulting in an exitcode of -SIGTERM.
1917 self.assertEqual(manager._process.exitcode, 0)
1918
1919 def test_mymanager_context(self):
1920 with MyManager() as manager:
1921 self.common(manager)
1922 self.assertEqual(manager._process.exitcode, 0)
1923
1924 def test_mymanager_context_prestarted(self):
1925 manager = MyManager()
1926 manager.start()
1927 with manager:
1928 self.common(manager)
1929 self.assertEqual(manager._process.exitcode, 0)
1930
1931 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001932 foo = manager.Foo()
1933 bar = manager.Bar()
1934 baz = manager.baz()
1935
1936 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1937 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1938
1939 self.assertEqual(foo_methods, ['f', 'g'])
1940 self.assertEqual(bar_methods, ['f', '_h'])
1941
1942 self.assertEqual(foo.f(), 'f()')
1943 self.assertRaises(ValueError, foo.g)
1944 self.assertEqual(foo._callmethod('f'), 'f()')
1945 self.assertRaises(RemoteError, foo._callmethod, '_h')
1946
1947 self.assertEqual(bar.f(), 'f()')
1948 self.assertEqual(bar._h(), '_h()')
1949 self.assertEqual(bar._callmethod('f'), 'f()')
1950 self.assertEqual(bar._callmethod('_h'), '_h()')
1951
1952 self.assertEqual(list(baz), [i*i for i in range(10)])
1953
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001954
Benjamin Petersone711caf2008-06-11 16:44:04 +00001955#
1956# Test of connecting to a remote server and using xmlrpclib for serialization
1957#
1958
1959_queue = pyqueue.Queue()
1960def get_queue():
1961 return _queue
1962
1963class QueueManager(BaseManager):
1964 '''manager class used by server process'''
1965QueueManager.register('get_queue', callable=get_queue)
1966
1967class QueueManager2(BaseManager):
1968 '''manager class which specifies the same interface as QueueManager'''
1969QueueManager2.register('get_queue')
1970
1971
1972SERIALIZER = 'xmlrpclib'
1973
1974class _TestRemoteManager(BaseTestCase):
1975
1976 ALLOWED_TYPES = ('manager',)
1977
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001978 @classmethod
1979 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001980 manager = QueueManager2(
1981 address=address, authkey=authkey, serializer=SERIALIZER
1982 )
1983 manager.connect()
1984 queue = manager.get_queue()
1985 queue.put(('hello world', None, True, 2.25))
1986
1987 def test_remote(self):
1988 authkey = os.urandom(32)
1989
1990 manager = QueueManager(
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02001991 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00001992 )
1993 manager.start()
1994
1995 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001996 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001997 p.start()
1998
1999 manager2 = QueueManager2(
2000 address=manager.address, authkey=authkey, serializer=SERIALIZER
2001 )
2002 manager2.connect()
2003 queue = manager2.get_queue()
2004
2005 # Note that xmlrpclib will deserialize object as a list not a tuple
2006 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
2007
2008 # Because we are using xmlrpclib for serialization instead of
2009 # pickle this will cause a serialization error.
2010 self.assertRaises(Exception, queue.put, time.sleep)
2011
2012 # Make queue finalizer run before the server is stopped
2013 del queue
2014 manager.shutdown()
2015
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002016class _TestManagerRestart(BaseTestCase):
2017
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002018 @classmethod
2019 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002020 manager = QueueManager(
2021 address=address, authkey=authkey, serializer=SERIALIZER)
2022 manager.connect()
2023 queue = manager.get_queue()
2024 queue.put('hello world')
2025
2026 def test_rapid_restart(self):
2027 authkey = os.urandom(32)
2028 manager = QueueManager(
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02002029 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002030 srvr = manager.get_server()
2031 addr = srvr.address
2032 # Close the connection.Listener socket which gets opened as a part
2033 # of manager.get_server(). It's not needed for the test.
2034 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002035 manager.start()
2036
2037 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002038 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002039 p.start()
2040 queue = manager.get_queue()
2041 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002042 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002043 manager.shutdown()
2044 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002045 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002046 try:
2047 manager.start()
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002048 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002049 if e.errno != errno.EADDRINUSE:
2050 raise
2051 # Retry after some time, in case the old socket was lingering
2052 # (sporadic failure on buildbots)
2053 time.sleep(1.0)
2054 manager = QueueManager(
2055 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002056 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002057
Benjamin Petersone711caf2008-06-11 16:44:04 +00002058#
2059#
2060#
2061
2062SENTINEL = latin('')
2063
2064class _TestConnection(BaseTestCase):
2065
2066 ALLOWED_TYPES = ('processes', 'threads')
2067
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002068 @classmethod
2069 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002070 for msg in iter(conn.recv_bytes, SENTINEL):
2071 conn.send_bytes(msg)
2072 conn.close()
2073
2074 def test_connection(self):
2075 conn, child_conn = self.Pipe()
2076
2077 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002078 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002079 p.start()
2080
2081 seq = [1, 2.25, None]
2082 msg = latin('hello world')
2083 longmsg = msg * 10
2084 arr = array.array('i', list(range(4)))
2085
2086 if self.TYPE == 'processes':
2087 self.assertEqual(type(conn.fileno()), int)
2088
2089 self.assertEqual(conn.send(seq), None)
2090 self.assertEqual(conn.recv(), seq)
2091
2092 self.assertEqual(conn.send_bytes(msg), None)
2093 self.assertEqual(conn.recv_bytes(), msg)
2094
2095 if self.TYPE == 'processes':
2096 buffer = array.array('i', [0]*10)
2097 expected = list(arr) + [0] * (10 - len(arr))
2098 self.assertEqual(conn.send_bytes(arr), None)
2099 self.assertEqual(conn.recv_bytes_into(buffer),
2100 len(arr) * buffer.itemsize)
2101 self.assertEqual(list(buffer), expected)
2102
2103 buffer = array.array('i', [0]*10)
2104 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2105 self.assertEqual(conn.send_bytes(arr), None)
2106 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2107 len(arr) * buffer.itemsize)
2108 self.assertEqual(list(buffer), expected)
2109
2110 buffer = bytearray(latin(' ' * 40))
2111 self.assertEqual(conn.send_bytes(longmsg), None)
2112 try:
2113 res = conn.recv_bytes_into(buffer)
2114 except multiprocessing.BufferTooShort as e:
2115 self.assertEqual(e.args, (longmsg,))
2116 else:
2117 self.fail('expected BufferTooShort, got %s' % res)
2118
2119 poll = TimingWrapper(conn.poll)
2120
2121 self.assertEqual(poll(), False)
2122 self.assertTimingAlmostEqual(poll.elapsed, 0)
2123
Richard Oudkerk59d54042012-05-10 16:11:12 +01002124 self.assertEqual(poll(-1), False)
2125 self.assertTimingAlmostEqual(poll.elapsed, 0)
2126
Benjamin Petersone711caf2008-06-11 16:44:04 +00002127 self.assertEqual(poll(TIMEOUT1), False)
2128 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2129
2130 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002131 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002132
2133 self.assertEqual(poll(TIMEOUT1), True)
2134 self.assertTimingAlmostEqual(poll.elapsed, 0)
2135
2136 self.assertEqual(conn.recv(), None)
2137
2138 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2139 conn.send_bytes(really_big_msg)
2140 self.assertEqual(conn.recv_bytes(), really_big_msg)
2141
2142 conn.send_bytes(SENTINEL) # tell child to quit
2143 child_conn.close()
2144
2145 if self.TYPE == 'processes':
2146 self.assertEqual(conn.readable, True)
2147 self.assertEqual(conn.writable, True)
2148 self.assertRaises(EOFError, conn.recv)
2149 self.assertRaises(EOFError, conn.recv_bytes)
2150
2151 p.join()
2152
2153 def test_duplex_false(self):
2154 reader, writer = self.Pipe(duplex=False)
2155 self.assertEqual(writer.send(1), None)
2156 self.assertEqual(reader.recv(), 1)
2157 if self.TYPE == 'processes':
2158 self.assertEqual(reader.readable, True)
2159 self.assertEqual(reader.writable, False)
2160 self.assertEqual(writer.readable, False)
2161 self.assertEqual(writer.writable, True)
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002162 self.assertRaises(OSError, reader.send, 2)
2163 self.assertRaises(OSError, writer.recv)
2164 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002165
2166 def test_spawn_close(self):
2167 # We test that a pipe connection can be closed by parent
2168 # process immediately after child is spawned. On Windows this
2169 # would have sometimes failed on old versions because
2170 # child_conn would be closed before the child got a chance to
2171 # duplicate it.
2172 conn, child_conn = self.Pipe()
2173
2174 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002175 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002176 p.start()
2177 child_conn.close() # this might complete before child initializes
2178
2179 msg = latin('hello')
2180 conn.send_bytes(msg)
2181 self.assertEqual(conn.recv_bytes(), msg)
2182
2183 conn.send_bytes(SENTINEL)
2184 conn.close()
2185 p.join()
2186
2187 def test_sendbytes(self):
2188 if self.TYPE != 'processes':
Zachary Ware9fe6d862013-12-08 00:20:35 -06002189 self.skipTest('test not appropriate for {}'.format(self.TYPE))
Benjamin Petersone711caf2008-06-11 16:44:04 +00002190
2191 msg = latin('abcdefghijklmnopqrstuvwxyz')
2192 a, b = self.Pipe()
2193
2194 a.send_bytes(msg)
2195 self.assertEqual(b.recv_bytes(), msg)
2196
2197 a.send_bytes(msg, 5)
2198 self.assertEqual(b.recv_bytes(), msg[5:])
2199
2200 a.send_bytes(msg, 7, 8)
2201 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2202
2203 a.send_bytes(msg, 26)
2204 self.assertEqual(b.recv_bytes(), latin(''))
2205
2206 a.send_bytes(msg, 26, 0)
2207 self.assertEqual(b.recv_bytes(), latin(''))
2208
2209 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2210
2211 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2212
2213 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2214
2215 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2216
2217 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2218
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002219 @classmethod
2220 def _is_fd_assigned(cls, fd):
2221 try:
2222 os.fstat(fd)
2223 except OSError as e:
2224 if e.errno == errno.EBADF:
2225 return False
2226 raise
2227 else:
2228 return True
2229
2230 @classmethod
2231 def _writefd(cls, conn, data, create_dummy_fds=False):
2232 if create_dummy_fds:
2233 for i in range(0, 256):
2234 if not cls._is_fd_assigned(i):
2235 os.dup2(conn.fileno(), i)
2236 fd = reduction.recv_handle(conn)
2237 if msvcrt:
2238 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2239 os.write(fd, data)
2240 os.close(fd)
2241
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002242 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002243 def test_fd_transfer(self):
2244 if self.TYPE != 'processes':
2245 self.skipTest("only makes sense with processes")
2246 conn, child_conn = self.Pipe(duplex=True)
2247
2248 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002249 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002250 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002251 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002252 with open(test.support.TESTFN, "wb") as f:
2253 fd = f.fileno()
2254 if msvcrt:
2255 fd = msvcrt.get_osfhandle(fd)
2256 reduction.send_handle(conn, fd, p.pid)
2257 p.join()
2258 with open(test.support.TESTFN, "rb") as f:
2259 self.assertEqual(f.read(), b"foo")
2260
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002261 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002262 @unittest.skipIf(sys.platform == "win32",
2263 "test semantics don't make sense on Windows")
2264 @unittest.skipIf(MAXFD <= 256,
2265 "largest assignable fd number is too small")
2266 @unittest.skipUnless(hasattr(os, "dup2"),
2267 "test needs os.dup2()")
2268 def test_large_fd_transfer(self):
2269 # With fd > 256 (issue #11657)
2270 if self.TYPE != 'processes':
2271 self.skipTest("only makes sense with processes")
2272 conn, child_conn = self.Pipe(duplex=True)
2273
2274 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002275 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002276 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002277 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002278 with open(test.support.TESTFN, "wb") as f:
2279 fd = f.fileno()
2280 for newfd in range(256, MAXFD):
2281 if not self._is_fd_assigned(newfd):
2282 break
2283 else:
2284 self.fail("could not find an unassigned large file descriptor")
2285 os.dup2(fd, newfd)
2286 try:
2287 reduction.send_handle(conn, newfd, p.pid)
2288 finally:
2289 os.close(newfd)
2290 p.join()
2291 with open(test.support.TESTFN, "rb") as f:
2292 self.assertEqual(f.read(), b"bar")
2293
Jesus Cea4507e642011-09-21 03:53:25 +02002294 @classmethod
2295 def _send_data_without_fd(self, conn):
2296 os.write(conn.fileno(), b"\0")
2297
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002298 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002299 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2300 def test_missing_fd_transfer(self):
2301 # Check that exception is raised when received data is not
2302 # accompanied by a file descriptor in ancillary data.
2303 if self.TYPE != 'processes':
2304 self.skipTest("only makes sense with processes")
2305 conn, child_conn = self.Pipe(duplex=True)
2306
2307 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2308 p.daemon = True
2309 p.start()
2310 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2311 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002312
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002313 def test_context(self):
2314 a, b = self.Pipe()
2315
2316 with a, b:
2317 a.send(1729)
2318 self.assertEqual(b.recv(), 1729)
2319 if self.TYPE == 'processes':
2320 self.assertFalse(a.closed)
2321 self.assertFalse(b.closed)
2322
2323 if self.TYPE == 'processes':
2324 self.assertTrue(a.closed)
2325 self.assertTrue(b.closed)
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002326 self.assertRaises(OSError, a.recv)
2327 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002328
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002329class _TestListener(BaseTestCase):
2330
Richard Oudkerk91257752012-06-15 21:53:34 +01002331 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002332
2333 def test_multiple_bind(self):
2334 for family in self.connection.families:
2335 l = self.connection.Listener(family=family)
2336 self.addCleanup(l.close)
2337 self.assertRaises(OSError, self.connection.Listener,
2338 l.address, family)
2339
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002340 def test_context(self):
2341 with self.connection.Listener() as l:
2342 with self.connection.Client(l.address) as c:
2343 with l.accept() as d:
2344 c.send(1729)
2345 self.assertEqual(d.recv(), 1729)
2346
2347 if self.TYPE == 'processes':
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002348 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002349
Benjamin Petersone711caf2008-06-11 16:44:04 +00002350class _TestListenerClient(BaseTestCase):
2351
2352 ALLOWED_TYPES = ('processes', 'threads')
2353
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002354 @classmethod
2355 def _test(cls, address):
2356 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002357 conn.send('hello')
2358 conn.close()
2359
2360 def test_listener_client(self):
2361 for family in self.connection.families:
2362 l = self.connection.Listener(family=family)
2363 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002364 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002365 p.start()
2366 conn = l.accept()
2367 self.assertEqual(conn.recv(), 'hello')
2368 p.join()
2369 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002370
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002371 def test_issue14725(self):
2372 l = self.connection.Listener()
2373 p = self.Process(target=self._test, args=(l.address,))
2374 p.daemon = True
2375 p.start()
2376 time.sleep(1)
2377 # On Windows the client process should by now have connected,
2378 # written data and closed the pipe handle by now. This causes
2379 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2380 # 14725.
2381 conn = l.accept()
2382 self.assertEqual(conn.recv(), 'hello')
2383 conn.close()
2384 p.join()
2385 l.close()
2386
Richard Oudkerked9e06c2013-01-13 22:46:48 +00002387 def test_issue16955(self):
2388 for fam in self.connection.families:
2389 l = self.connection.Listener(family=fam)
2390 c = self.connection.Client(l.address)
2391 a = l.accept()
2392 a.send_bytes(b"hello")
2393 self.assertTrue(c.poll(1))
2394 a.close()
2395 c.close()
2396 l.close()
2397
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002398class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002399
2400 ALLOWED_TYPES = ('processes', 'threads')
2401
2402 def test_empty_string(self):
2403 a, b = self.Pipe()
2404 self.assertEqual(a.poll(), False)
2405 b.send_bytes(b'')
2406 self.assertEqual(a.poll(), True)
2407 self.assertEqual(a.poll(), True)
2408
2409 @classmethod
2410 def _child_strings(cls, conn, strings):
2411 for s in strings:
2412 time.sleep(0.1)
2413 conn.send_bytes(s)
2414 conn.close()
2415
2416 def test_strings(self):
2417 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2418 a, b = self.Pipe()
2419 p = self.Process(target=self._child_strings, args=(b, strings))
2420 p.start()
2421
2422 for s in strings:
2423 for i in range(200):
2424 if a.poll(0.01):
2425 break
2426 x = a.recv_bytes()
2427 self.assertEqual(s, x)
2428
2429 p.join()
2430
2431 @classmethod
2432 def _child_boundaries(cls, r):
2433 # Polling may "pull" a message in to the child process, but we
2434 # don't want it to pull only part of a message, as that would
2435 # corrupt the pipe for any other processes which might later
2436 # read from it.
2437 r.poll(5)
2438
2439 def test_boundaries(self):
2440 r, w = self.Pipe(False)
2441 p = self.Process(target=self._child_boundaries, args=(r,))
2442 p.start()
2443 time.sleep(2)
2444 L = [b"first", b"second"]
2445 for obj in L:
2446 w.send_bytes(obj)
2447 w.close()
2448 p.join()
2449 self.assertIn(r.recv_bytes(), L)
2450
2451 @classmethod
2452 def _child_dont_merge(cls, b):
2453 b.send_bytes(b'a')
2454 b.send_bytes(b'b')
2455 b.send_bytes(b'cd')
2456
2457 def test_dont_merge(self):
2458 a, b = self.Pipe()
2459 self.assertEqual(a.poll(0.0), False)
2460 self.assertEqual(a.poll(0.1), False)
2461
2462 p = self.Process(target=self._child_dont_merge, args=(b,))
2463 p.start()
2464
2465 self.assertEqual(a.recv_bytes(), b'a')
2466 self.assertEqual(a.poll(1.0), True)
2467 self.assertEqual(a.poll(1.0), True)
2468 self.assertEqual(a.recv_bytes(), b'b')
2469 self.assertEqual(a.poll(1.0), True)
2470 self.assertEqual(a.poll(1.0), True)
2471 self.assertEqual(a.poll(0.0), True)
2472 self.assertEqual(a.recv_bytes(), b'cd')
2473
2474 p.join()
2475
Benjamin Petersone711caf2008-06-11 16:44:04 +00002476#
2477# Test of sending connection and socket objects between processes
2478#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002479
2480@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002481class _TestPicklingConnections(BaseTestCase):
2482
2483 ALLOWED_TYPES = ('processes',)
2484
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002485 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002486 def tearDownClass(cls):
2487 from multiprocessing.reduction import resource_sharer
2488 resource_sharer.stop(timeout=5)
2489
2490 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002491 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002492 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002493 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002494 conn.send(l.address)
2495 new_conn = l.accept()
2496 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002497 new_conn.close()
2498 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002499
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002500 l = socket.socket()
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02002501 l.bind((test.support.HOST, 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002502 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002503 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002504 new_conn, addr = l.accept()
2505 conn.send(new_conn)
2506 new_conn.close()
2507 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002508
2509 conn.recv()
2510
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002511 @classmethod
2512 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002513 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002514 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002515 client.send(msg.upper())
2516 client.close()
2517
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002518 address, msg = conn.recv()
2519 client = socket.socket()
2520 client.connect(address)
2521 client.sendall(msg.upper())
2522 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002523
2524 conn.close()
2525
2526 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002527 families = self.connection.families
2528
2529 lconn, lconn0 = self.Pipe()
2530 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002531 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002532 lp.start()
2533 lconn0.close()
2534
2535 rconn, rconn0 = self.Pipe()
2536 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002537 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002538 rp.start()
2539 rconn0.close()
2540
2541 for fam in families:
2542 msg = ('This connection uses family %s' % fam).encode('ascii')
2543 address = lconn.recv()
2544 rconn.send((address, msg))
2545 new_conn = lconn.recv()
2546 self.assertEqual(new_conn.recv(), msg.upper())
2547
2548 rconn.send(None)
2549
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002550 msg = latin('This connection uses a normal socket')
2551 address = lconn.recv()
2552 rconn.send((address, msg))
2553 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002554 buf = []
2555 while True:
2556 s = new_conn.recv(100)
2557 if not s:
2558 break
2559 buf.append(s)
2560 buf = b''.join(buf)
2561 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002562 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002563
2564 lconn.send(None)
2565
2566 rconn.close()
2567 lconn.close()
2568
2569 lp.join()
2570 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002571
2572 @classmethod
2573 def child_access(cls, conn):
2574 w = conn.recv()
2575 w.send('all is well')
2576 w.close()
2577
2578 r = conn.recv()
2579 msg = r.recv()
2580 conn.send(msg*2)
2581
2582 conn.close()
2583
2584 def test_access(self):
2585 # On Windows, if we do not specify a destination pid when
2586 # using DupHandle then we need to be careful to use the
2587 # correct access flags for DuplicateHandle(), or else
2588 # DupHandle.detach() will raise PermissionError. For example,
2589 # for a read only pipe handle we should use
2590 # access=FILE_GENERIC_READ. (Unfortunately
2591 # DUPLICATE_SAME_ACCESS does not work.)
2592 conn, child_conn = self.Pipe()
2593 p = self.Process(target=self.child_access, args=(child_conn,))
2594 p.daemon = True
2595 p.start()
2596 child_conn.close()
2597
2598 r, w = self.Pipe(duplex=False)
2599 conn.send(w)
2600 w.close()
2601 self.assertEqual(r.recv(), 'all is well')
2602 r.close()
2603
2604 r, w = self.Pipe(duplex=False)
2605 conn.send(r)
2606 r.close()
2607 w.send('foobar')
2608 w.close()
2609 self.assertEqual(conn.recv(), 'foobar'*2)
2610
Benjamin Petersone711caf2008-06-11 16:44:04 +00002611#
2612#
2613#
2614
2615class _TestHeap(BaseTestCase):
2616
2617 ALLOWED_TYPES = ('processes',)
2618
2619 def test_heap(self):
2620 iterations = 5000
2621 maxblocks = 50
2622 blocks = []
2623
2624 # create and destroy lots of blocks of different sizes
2625 for i in range(iterations):
2626 size = int(random.lognormvariate(0, 1) * 1000)
2627 b = multiprocessing.heap.BufferWrapper(size)
2628 blocks.append(b)
2629 if len(blocks) > maxblocks:
2630 i = random.randrange(maxblocks)
2631 del blocks[i]
2632
2633 # get the heap object
2634 heap = multiprocessing.heap.BufferWrapper._heap
2635
2636 # verify the state of the heap
2637 all = []
2638 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002639 heap._lock.acquire()
2640 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002641 for L in list(heap._len_to_seq.values()):
2642 for arena, start, stop in L:
2643 all.append((heap._arenas.index(arena), start, stop,
2644 stop-start, 'free'))
2645 for arena, start, stop in heap._allocated_blocks:
2646 all.append((heap._arenas.index(arena), start, stop,
2647 stop-start, 'occupied'))
2648 occupied += (stop-start)
2649
2650 all.sort()
2651
2652 for i in range(len(all)-1):
2653 (arena, start, stop) = all[i][:3]
2654 (narena, nstart, nstop) = all[i+1][:3]
2655 self.assertTrue((arena != narena and nstart == 0) or
2656 (stop == nstart))
2657
Charles-François Natali778db492011-07-02 14:35:49 +02002658 def test_free_from_gc(self):
2659 # Check that freeing of blocks by the garbage collector doesn't deadlock
2660 # (issue #12352).
2661 # Make sure the GC is enabled, and set lower collection thresholds to
2662 # make collections more frequent (and increase the probability of
2663 # deadlock).
2664 if not gc.isenabled():
2665 gc.enable()
2666 self.addCleanup(gc.disable)
2667 thresholds = gc.get_threshold()
2668 self.addCleanup(gc.set_threshold, *thresholds)
2669 gc.set_threshold(10)
2670
2671 # perform numerous block allocations, with cyclic references to make
2672 # sure objects are collected asynchronously by the gc
2673 for i in range(5000):
2674 a = multiprocessing.heap.BufferWrapper(1)
2675 b = multiprocessing.heap.BufferWrapper(1)
2676 # circular references
2677 a.buddy = b
2678 b.buddy = a
2679
Benjamin Petersone711caf2008-06-11 16:44:04 +00002680#
2681#
2682#
2683
Benjamin Petersone711caf2008-06-11 16:44:04 +00002684class _Foo(Structure):
2685 _fields_ = [
2686 ('x', c_int),
2687 ('y', c_double)
2688 ]
2689
2690class _TestSharedCTypes(BaseTestCase):
2691
2692 ALLOWED_TYPES = ('processes',)
2693
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002694 def setUp(self):
2695 if not HAS_SHAREDCTYPES:
2696 self.skipTest("requires multiprocessing.sharedctypes")
2697
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002698 @classmethod
2699 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002700 x.value *= 2
2701 y.value *= 2
2702 foo.x *= 2
2703 foo.y *= 2
2704 string.value *= 2
2705 for i in range(len(arr)):
2706 arr[i] *= 2
2707
2708 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002709 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002710 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002711 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002712 arr = self.Array('d', list(range(10)), lock=lock)
2713 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002714 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002715
2716 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002717 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002718 p.start()
2719 p.join()
2720
2721 self.assertEqual(x.value, 14)
2722 self.assertAlmostEqual(y.value, 2.0/3.0)
2723 self.assertEqual(foo.x, 6)
2724 self.assertAlmostEqual(foo.y, 4.0)
2725 for i in range(10):
2726 self.assertAlmostEqual(arr[i], i*2)
2727 self.assertEqual(string.value, latin('hellohello'))
2728
2729 def test_synchronize(self):
2730 self.test_sharedctypes(lock=True)
2731
2732 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002733 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002734 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002735 foo.x = 0
2736 foo.y = 0
2737 self.assertEqual(bar.x, 2)
2738 self.assertAlmostEqual(bar.y, 5.0)
2739
2740#
2741#
2742#
2743
2744class _TestFinalize(BaseTestCase):
2745
2746 ALLOWED_TYPES = ('processes',)
2747
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002748 @classmethod
2749 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002750 class Foo(object):
2751 pass
2752
2753 a = Foo()
2754 util.Finalize(a, conn.send, args=('a',))
2755 del a # triggers callback for a
2756
2757 b = Foo()
2758 close_b = util.Finalize(b, conn.send, args=('b',))
2759 close_b() # triggers callback for b
2760 close_b() # does nothing because callback has already been called
2761 del b # does nothing because callback has already been called
2762
2763 c = Foo()
2764 util.Finalize(c, conn.send, args=('c',))
2765
2766 d10 = Foo()
2767 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2768
2769 d01 = Foo()
2770 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2771 d02 = Foo()
2772 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2773 d03 = Foo()
2774 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2775
2776 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2777
2778 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2779
Ezio Melotti13925002011-03-16 11:05:33 +02002780 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002781 # garbage collecting locals
2782 util._exit_function()
2783 conn.close()
2784 os._exit(0)
2785
2786 def test_finalize(self):
2787 conn, child_conn = self.Pipe()
2788
2789 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002790 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002791 p.start()
2792 p.join()
2793
2794 result = [obj for obj in iter(conn.recv, 'STOP')]
2795 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2796
2797#
2798# Test that from ... import * works for each module
2799#
2800
2801class _TestImportStar(BaseTestCase):
2802
2803 ALLOWED_TYPES = ('processes',)
2804
2805 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002806 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002807 'multiprocessing', 'multiprocessing.connection',
2808 'multiprocessing.heap', 'multiprocessing.managers',
2809 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002810 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002811 ]
2812
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002813 if HAS_REDUCTION:
2814 modules.append('multiprocessing.reduction')
2815
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002816 if c_int is not None:
2817 # This module requires _ctypes
2818 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002819
2820 for name in modules:
2821 __import__(name)
2822 mod = sys.modules[name]
2823
2824 for attr in getattr(mod, '__all__', ()):
2825 self.assertTrue(
2826 hasattr(mod, attr),
2827 '%r does not have attribute %r' % (mod, attr)
2828 )
2829
2830#
2831# Quick test that logging works -- does not test logging output
2832#
2833
2834class _TestLogging(BaseTestCase):
2835
2836 ALLOWED_TYPES = ('processes',)
2837
2838 def test_enable_logging(self):
2839 logger = multiprocessing.get_logger()
2840 logger.setLevel(util.SUBWARNING)
2841 self.assertTrue(logger is not None)
2842 logger.debug('this will not be printed')
2843 logger.info('nor will this')
2844 logger.setLevel(LOG_LEVEL)
2845
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002846 @classmethod
2847 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002848 logger = multiprocessing.get_logger()
2849 conn.send(logger.getEffectiveLevel())
2850
2851 def test_level(self):
2852 LEVEL1 = 32
2853 LEVEL2 = 37
2854
2855 logger = multiprocessing.get_logger()
2856 root_logger = logging.getLogger()
2857 root_level = root_logger.level
2858
2859 reader, writer = multiprocessing.Pipe(duplex=False)
2860
2861 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002862 p = self.Process(target=self._test_level, args=(writer,))
2863 p.daemon = True
2864 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002865 self.assertEqual(LEVEL1, reader.recv())
2866
2867 logger.setLevel(logging.NOTSET)
2868 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002869 p = self.Process(target=self._test_level, args=(writer,))
2870 p.daemon = True
2871 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002872 self.assertEqual(LEVEL2, reader.recv())
2873
2874 root_logger.setLevel(root_level)
2875 logger.setLevel(level=LOG_LEVEL)
2876
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002877
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002878# class _TestLoggingProcessName(BaseTestCase):
2879#
2880# def handle(self, record):
2881# assert record.processName == multiprocessing.current_process().name
2882# self.__handled = True
2883#
2884# def test_logging(self):
2885# handler = logging.Handler()
2886# handler.handle = self.handle
2887# self.__handled = False
2888# # Bypass getLogger() and side-effects
2889# logger = logging.getLoggerClass()(
2890# 'multiprocessing.test.TestLoggingProcessName')
2891# logger.addHandler(handler)
2892# logger.propagate = False
2893#
2894# logger.warn('foo')
2895# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002896
Benjamin Petersone711caf2008-06-11 16:44:04 +00002897#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002898# Check that Process.join() retries if os.waitpid() fails with EINTR
2899#
2900
2901class _TestPollEintr(BaseTestCase):
2902
2903 ALLOWED_TYPES = ('processes',)
2904
2905 @classmethod
2906 def _killer(cls, pid):
2907 time.sleep(0.5)
2908 os.kill(pid, signal.SIGUSR1)
2909
2910 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2911 def test_poll_eintr(self):
2912 got_signal = [False]
2913 def record(*args):
2914 got_signal[0] = True
2915 pid = os.getpid()
2916 oldhandler = signal.signal(signal.SIGUSR1, record)
2917 try:
2918 killer = self.Process(target=self._killer, args=(pid,))
2919 killer.start()
2920 p = self.Process(target=time.sleep, args=(1,))
2921 p.start()
2922 p.join()
2923 self.assertTrue(got_signal[0])
2924 self.assertEqual(p.exitcode, 0)
2925 killer.join()
2926 finally:
2927 signal.signal(signal.SIGUSR1, oldhandler)
2928
2929#
Jesse Noller6214edd2009-01-19 16:23:53 +00002930# Test to verify handle verification, see issue 3321
2931#
2932
2933class TestInvalidHandle(unittest.TestCase):
2934
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002935 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002936 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002937 conn = multiprocessing.connection.Connection(44977608)
2938 try:
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002939 self.assertRaises((ValueError, OSError), conn.poll)
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002940 finally:
2941 # Hack private attribute _handle to avoid printing an error
2942 # in conn.__del__
2943 conn._handle = None
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002944 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002945 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002946
Jesse Noller6214edd2009-01-19 16:23:53 +00002947#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002948# Functions used to create test cases from the base ones in this module
2949#
2950
Benjamin Petersone711caf2008-06-11 16:44:04 +00002951def create_test_cases(Mixin, type):
2952 result = {}
2953 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002954 Type = type.capitalize()
Richard Oudkerk91257752012-06-15 21:53:34 +01002955 ALL_TYPES = {'processes', 'threads', 'manager'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00002956
2957 for name in list(glob.keys()):
2958 if name.startswith('_Test'):
2959 base = glob[name]
Richard Oudkerk91257752012-06-15 21:53:34 +01002960 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002961 if type in base.ALLOWED_TYPES:
2962 newname = 'With' + Type + name[1:]
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002963 class Temp(base, Mixin, unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002964 pass
2965 result[newname] = Temp
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002966 Temp.__name__ = Temp.__qualname__ = newname
Benjamin Petersone711caf2008-06-11 16:44:04 +00002967 Temp.__module__ = Mixin.__module__
2968 return result
2969
2970#
2971# Create test cases
2972#
2973
2974class ProcessesMixin(object):
2975 TYPE = 'processes'
2976 Process = multiprocessing.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002977 connection = multiprocessing.connection
2978 current_process = staticmethod(multiprocessing.current_process)
2979 active_children = staticmethod(multiprocessing.active_children)
2980 Pool = staticmethod(multiprocessing.Pool)
2981 Pipe = staticmethod(multiprocessing.Pipe)
2982 Queue = staticmethod(multiprocessing.Queue)
2983 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2984 Lock = staticmethod(multiprocessing.Lock)
2985 RLock = staticmethod(multiprocessing.RLock)
2986 Semaphore = staticmethod(multiprocessing.Semaphore)
2987 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2988 Condition = staticmethod(multiprocessing.Condition)
2989 Event = staticmethod(multiprocessing.Event)
2990 Barrier = staticmethod(multiprocessing.Barrier)
2991 Value = staticmethod(multiprocessing.Value)
2992 Array = staticmethod(multiprocessing.Array)
2993 RawValue = staticmethod(multiprocessing.RawValue)
2994 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002995
2996testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2997globals().update(testcases_processes)
2998
2999
3000class ManagerMixin(object):
3001 TYPE = 'manager'
3002 Process = multiprocessing.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003003 Queue = property(operator.attrgetter('manager.Queue'))
3004 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3005 Lock = property(operator.attrgetter('manager.Lock'))
3006 RLock = property(operator.attrgetter('manager.RLock'))
3007 Semaphore = property(operator.attrgetter('manager.Semaphore'))
3008 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3009 Condition = property(operator.attrgetter('manager.Condition'))
3010 Event = property(operator.attrgetter('manager.Event'))
3011 Barrier = property(operator.attrgetter('manager.Barrier'))
3012 Value = property(operator.attrgetter('manager.Value'))
3013 Array = property(operator.attrgetter('manager.Array'))
3014 list = property(operator.attrgetter('manager.list'))
3015 dict = property(operator.attrgetter('manager.dict'))
3016 Namespace = property(operator.attrgetter('manager.Namespace'))
3017
3018 @classmethod
3019 def Pool(cls, *args, **kwds):
3020 return cls.manager.Pool(*args, **kwds)
3021
3022 @classmethod
3023 def setUpClass(cls):
3024 cls.manager = multiprocessing.Manager()
3025
3026 @classmethod
3027 def tearDownClass(cls):
3028 # only the manager process should be returned by active_children()
3029 # but this can take a bit on slow machines, so wait a few seconds
3030 # if there are other children too (see #17395)
3031 t = 0.01
3032 while len(multiprocessing.active_children()) > 1 and t < 5:
3033 time.sleep(t)
3034 t *= 2
3035 gc.collect() # do garbage collection
3036 if cls.manager._number_of_objects() != 0:
3037 # This is not really an error since some tests do not
3038 # ensure that all processes which hold a reference to a
3039 # managed object have been joined.
3040 print('Shared objects which still exist at manager shutdown:')
3041 print(cls.manager._debug_info())
3042 cls.manager.shutdown()
3043 cls.manager.join()
3044 cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003045
3046testcases_manager = create_test_cases(ManagerMixin, type='manager')
3047globals().update(testcases_manager)
3048
3049
3050class ThreadsMixin(object):
3051 TYPE = 'threads'
3052 Process = multiprocessing.dummy.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003053 connection = multiprocessing.dummy.connection
3054 current_process = staticmethod(multiprocessing.dummy.current_process)
3055 active_children = staticmethod(multiprocessing.dummy.active_children)
3056 Pool = staticmethod(multiprocessing.Pool)
3057 Pipe = staticmethod(multiprocessing.dummy.Pipe)
3058 Queue = staticmethod(multiprocessing.dummy.Queue)
3059 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3060 Lock = staticmethod(multiprocessing.dummy.Lock)
3061 RLock = staticmethod(multiprocessing.dummy.RLock)
3062 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3063 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3064 Condition = staticmethod(multiprocessing.dummy.Condition)
3065 Event = staticmethod(multiprocessing.dummy.Event)
3066 Barrier = staticmethod(multiprocessing.dummy.Barrier)
3067 Value = staticmethod(multiprocessing.dummy.Value)
3068 Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003069
3070testcases_threads = create_test_cases(ThreadsMixin, type='threads')
3071globals().update(testcases_threads)
3072
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003073
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003074class OtherTest(unittest.TestCase):
3075 # TODO: add more tests for deliver/answer challenge.
3076 def test_deliver_challenge_auth_failure(self):
3077 class _FakeConnection(object):
3078 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003079 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003080 def send_bytes(self, data):
3081 pass
3082 self.assertRaises(multiprocessing.AuthenticationError,
3083 multiprocessing.connection.deliver_challenge,
3084 _FakeConnection(), b'abc')
3085
3086 def test_answer_challenge_auth_failure(self):
3087 class _FakeConnection(object):
3088 def __init__(self):
3089 self.count = 0
3090 def recv_bytes(self, size):
3091 self.count += 1
3092 if self.count == 1:
3093 return multiprocessing.connection.CHALLENGE
3094 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003095 return b'something bogus'
3096 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003097 def send_bytes(self, data):
3098 pass
3099 self.assertRaises(multiprocessing.AuthenticationError,
3100 multiprocessing.connection.answer_challenge,
3101 _FakeConnection(), b'abc')
3102
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003103#
3104# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3105#
3106
3107def initializer(ns):
3108 ns.test += 1
3109
3110class TestInitializers(unittest.TestCase):
3111 def setUp(self):
3112 self.mgr = multiprocessing.Manager()
3113 self.ns = self.mgr.Namespace()
3114 self.ns.test = 0
3115
3116 def tearDown(self):
3117 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003118 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003119
3120 def test_manager_initializer(self):
3121 m = multiprocessing.managers.SyncManager()
3122 self.assertRaises(TypeError, m.start, 1)
3123 m.start(initializer, (self.ns,))
3124 self.assertEqual(self.ns.test, 1)
3125 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003126 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003127
3128 def test_pool_initializer(self):
3129 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3130 p = multiprocessing.Pool(1, initializer, (self.ns,))
3131 p.close()
3132 p.join()
3133 self.assertEqual(self.ns.test, 1)
3134
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003135#
3136# Issue 5155, 5313, 5331: Test process in processes
3137# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3138#
3139
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003140def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003141 try:
3142 item = q.get(block=False)
3143 except pyqueue.Empty:
3144 pass
3145
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003146def _test_process(q):
3147 queue = multiprocessing.Queue()
3148 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3149 subProc.daemon = True
3150 subProc.start()
3151 subProc.join()
3152
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003153def _afunc(x):
3154 return x*x
3155
3156def pool_in_process():
3157 pool = multiprocessing.Pool(processes=4)
3158 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003159 pool.close()
3160 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003161
3162class _file_like(object):
3163 def __init__(self, delegate):
3164 self._delegate = delegate
3165 self._pid = None
3166
3167 @property
3168 def cache(self):
3169 pid = os.getpid()
3170 # There are no race conditions since fork keeps only the running thread
3171 if pid != self._pid:
3172 self._pid = pid
3173 self._cache = []
3174 return self._cache
3175
3176 def write(self, data):
3177 self.cache.append(data)
3178
3179 def flush(self):
3180 self._delegate.write(''.join(self.cache))
3181 self._cache = []
3182
3183class TestStdinBadfiledescriptor(unittest.TestCase):
3184
3185 def test_queue_in_process(self):
3186 queue = multiprocessing.Queue()
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003187 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003188 proc.start()
3189 proc.join()
3190
3191 def test_pool_in_process(self):
3192 p = multiprocessing.Process(target=pool_in_process)
3193 p.start()
3194 p.join()
3195
3196 def test_flushing(self):
3197 sio = io.StringIO()
3198 flike = _file_like(sio)
3199 flike.write('foo')
3200 proc = multiprocessing.Process(target=lambda: flike.flush())
3201 flike.flush()
3202 assert sio.getvalue() == 'foo'
3203
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003204
3205class TestWait(unittest.TestCase):
3206
3207 @classmethod
3208 def _child_test_wait(cls, w, slow):
3209 for i in range(10):
3210 if slow:
3211 time.sleep(random.random()*0.1)
3212 w.send((i, os.getpid()))
3213 w.close()
3214
3215 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003216 from multiprocessing.connection import wait
3217 readers = []
3218 procs = []
3219 messages = []
3220
3221 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003222 r, w = multiprocessing.Pipe(duplex=False)
3223 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003224 p.daemon = True
3225 p.start()
3226 w.close()
3227 readers.append(r)
3228 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003229 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003230
3231 while readers:
3232 for r in wait(readers):
3233 try:
3234 msg = r.recv()
3235 except EOFError:
3236 readers.remove(r)
3237 r.close()
3238 else:
3239 messages.append(msg)
3240
3241 messages.sort()
3242 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3243 self.assertEqual(messages, expected)
3244
3245 @classmethod
3246 def _child_test_wait_socket(cls, address, slow):
3247 s = socket.socket()
3248 s.connect(address)
3249 for i in range(10):
3250 if slow:
3251 time.sleep(random.random()*0.1)
3252 s.sendall(('%s\n' % i).encode('ascii'))
3253 s.close()
3254
3255 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003256 from multiprocessing.connection import wait
3257 l = socket.socket()
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02003258 l.bind((test.support.HOST, 0))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003259 l.listen(4)
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02003260 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003261 readers = []
3262 procs = []
3263 dic = {}
3264
3265 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003266 p = multiprocessing.Process(target=self._child_test_wait_socket,
3267 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003268 p.daemon = True
3269 p.start()
3270 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003271 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003272
3273 for i in range(4):
3274 r, _ = l.accept()
3275 readers.append(r)
3276 dic[r] = []
3277 l.close()
3278
3279 while readers:
3280 for r in wait(readers):
3281 msg = r.recv(32)
3282 if not msg:
3283 readers.remove(r)
3284 r.close()
3285 else:
3286 dic[r].append(msg)
3287
3288 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3289 for v in dic.values():
3290 self.assertEqual(b''.join(v), expected)
3291
3292 def test_wait_slow(self):
3293 self.test_wait(True)
3294
3295 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003296 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003297
3298 def test_wait_timeout(self):
3299 from multiprocessing.connection import wait
3300
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003301 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003302 a, b = multiprocessing.Pipe()
3303
3304 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003305 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003306 delta = time.time() - start
3307
3308 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003309 self.assertLess(delta, expected * 2)
3310 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003311
3312 b.send(None)
3313
3314 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003315 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003316 delta = time.time() - start
3317
3318 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003319 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003320
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003321 @classmethod
3322 def signal_and_sleep(cls, sem, period):
3323 sem.release()
3324 time.sleep(period)
3325
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003326 def test_wait_integer(self):
3327 from multiprocessing.connection import wait
3328
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003329 expected = 3
Giampaolo Rodola'67da8942013-01-14 02:24:25 +01003330 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003331 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003332 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003333 p = multiprocessing.Process(target=self.signal_and_sleep,
3334 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003335
3336 p.start()
3337 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003338 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003339
3340 start = time.time()
3341 res = wait([a, p.sentinel, b], expected + 20)
3342 delta = time.time() - start
3343
3344 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003345 self.assertLess(delta, expected + 2)
3346 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003347
3348 a.send(None)
3349
3350 start = time.time()
3351 res = wait([a, p.sentinel, b], 20)
3352 delta = time.time() - start
3353
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003354 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003355 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003356
3357 b.send(None)
3358
3359 start = time.time()
3360 res = wait([a, p.sentinel, b], 20)
3361 delta = time.time() - start
3362
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003363 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003364 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003365
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003366 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003367 p.join()
3368
Richard Oudkerk59d54042012-05-10 16:11:12 +01003369 def test_neg_timeout(self):
3370 from multiprocessing.connection import wait
3371 a, b = multiprocessing.Pipe()
3372 t = time.time()
3373 res = wait([a], timeout=-1)
3374 t = time.time() - t
3375 self.assertEqual(res, [])
3376 self.assertLess(t, 1)
3377 a.close()
3378 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003379
Antoine Pitrou709176f2012-04-01 17:19:09 +02003380#
3381# Issue 14151: Test invalid family on invalid environment
3382#
3383
3384class TestInvalidFamily(unittest.TestCase):
3385
3386 @unittest.skipIf(WIN32, "skipped on Windows")
3387 def test_invalid_family(self):
3388 with self.assertRaises(ValueError):
3389 multiprocessing.connection.Listener(r'\\.\test')
3390
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003391 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3392 def test_invalid_family_win32(self):
3393 with self.assertRaises(ValueError):
3394 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003395
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003396#
3397# Issue 12098: check sys.flags of child matches that for parent
3398#
3399
3400class TestFlags(unittest.TestCase):
3401 @classmethod
3402 def run_in_grandchild(cls, conn):
3403 conn.send(tuple(sys.flags))
3404
3405 @classmethod
3406 def run_in_child(cls):
3407 import json
3408 r, w = multiprocessing.Pipe(duplex=False)
3409 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3410 p.start()
3411 grandchild_flags = r.recv()
3412 p.join()
3413 r.close()
3414 w.close()
3415 flags = (tuple(sys.flags), grandchild_flags)
3416 print(json.dumps(flags))
3417
3418 def test_flags(self):
3419 import json, subprocess
3420 # start child process using unusual flags
3421 prog = ('from test.test_multiprocessing import TestFlags; ' +
3422 'TestFlags.run_in_child()')
3423 data = subprocess.check_output(
3424 [sys.executable, '-E', '-S', '-O', '-c', prog])
3425 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3426 self.assertEqual(child_flags, grandchild_flags)
3427
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003428#
3429# Test interaction with socket timeouts - see Issue #6056
3430#
3431
3432class TestTimeouts(unittest.TestCase):
3433 @classmethod
3434 def _test_timeout(cls, child, address):
3435 time.sleep(1)
3436 child.send(123)
3437 child.close()
3438 conn = multiprocessing.connection.Client(address)
3439 conn.send(456)
3440 conn.close()
3441
3442 def test_timeout(self):
3443 old_timeout = socket.getdefaulttimeout()
3444 try:
3445 socket.setdefaulttimeout(0.1)
3446 parent, child = multiprocessing.Pipe(duplex=True)
3447 l = multiprocessing.connection.Listener(family='AF_INET')
3448 p = multiprocessing.Process(target=self._test_timeout,
3449 args=(child, l.address))
3450 p.start()
3451 child.close()
3452 self.assertEqual(parent.recv(), 123)
3453 parent.close()
3454 conn = l.accept()
3455 self.assertEqual(conn.recv(), 456)
3456 conn.close()
3457 l.close()
3458 p.join(10)
3459 finally:
3460 socket.setdefaulttimeout(old_timeout)
3461
Richard Oudkerke88a2442012-08-14 11:41:32 +01003462#
3463# Test what happens with no "if __name__ == '__main__'"
3464#
3465
3466class TestNoForkBomb(unittest.TestCase):
3467 def test_noforkbomb(self):
3468 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3469 if WIN32:
3470 rc, out, err = test.script_helper.assert_python_failure(name)
3471 self.assertEqual('', out.decode('ascii'))
3472 self.assertIn('RuntimeError', err.decode('ascii'))
3473 else:
3474 rc, out, err = test.script_helper.assert_python_ok(name)
3475 self.assertEqual('123', out.decode('ascii').rstrip())
3476 self.assertEqual('', err.decode('ascii'))
3477
3478#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003479# Issue #17555: ForkAwareThreadLock
3480#
3481
3482class TestForkAwareThreadLock(unittest.TestCase):
3483 # We recurisvely start processes. Issue #17555 meant that the
3484 # after fork registry would get duplicate entries for the same
3485 # lock. The size of the registry at generation n was ~2**n.
3486
3487 @classmethod
3488 def child(cls, n, conn):
3489 if n > 1:
3490 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
3491 p.start()
3492 p.join()
3493 else:
3494 conn.send(len(util._afterfork_registry))
3495 conn.close()
3496
3497 def test_lock(self):
3498 r, w = multiprocessing.Pipe(False)
3499 l = util.ForkAwareThreadLock()
3500 old_size = len(util._afterfork_registry)
3501 p = multiprocessing.Process(target=self.child, args=(5, w))
3502 p.start()
3503 new_size = r.recv()
3504 p.join()
3505 self.assertLessEqual(new_size, old_size)
3506
3507#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003508# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3509#
3510
3511class TestIgnoreEINTR(unittest.TestCase):
3512
3513 @classmethod
3514 def _test_ignore(cls, conn):
3515 def handler(signum, frame):
3516 pass
3517 signal.signal(signal.SIGUSR1, handler)
3518 conn.send('ready')
3519 x = conn.recv()
3520 conn.send(x)
3521 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
3522
3523 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3524 def test_ignore(self):
3525 conn, child_conn = multiprocessing.Pipe()
3526 try:
3527 p = multiprocessing.Process(target=self._test_ignore,
3528 args=(child_conn,))
3529 p.daemon = True
3530 p.start()
3531 child_conn.close()
3532 self.assertEqual(conn.recv(), 'ready')
3533 time.sleep(0.1)
3534 os.kill(p.pid, signal.SIGUSR1)
3535 time.sleep(0.1)
3536 conn.send(1234)
3537 self.assertEqual(conn.recv(), 1234)
3538 time.sleep(0.1)
3539 os.kill(p.pid, signal.SIGUSR1)
3540 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
3541 time.sleep(0.1)
3542 p.join()
3543 finally:
3544 conn.close()
3545
3546 @classmethod
3547 def _test_ignore_listener(cls, conn):
3548 def handler(signum, frame):
3549 pass
3550 signal.signal(signal.SIGUSR1, handler)
3551 l = multiprocessing.connection.Listener()
3552 conn.send(l.address)
3553 a = l.accept()
3554 a.send('welcome')
3555
3556 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3557 def test_ignore_listener(self):
3558 conn, child_conn = multiprocessing.Pipe()
3559 try:
3560 p = multiprocessing.Process(target=self._test_ignore_listener,
3561 args=(child_conn,))
3562 p.daemon = True
3563 p.start()
3564 child_conn.close()
3565 address = conn.recv()
3566 time.sleep(0.1)
3567 os.kill(p.pid, signal.SIGUSR1)
3568 time.sleep(0.1)
3569 client = multiprocessing.connection.Client(address)
3570 self.assertEqual(client.recv(), 'welcome')
3571 p.join()
3572 finally:
3573 conn.close()
3574
3575#
Richard Oudkerke88a2442012-08-14 11:41:32 +01003576#
3577#
3578
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003579def setUpModule():
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003580 if sys.platform.startswith("linux"):
3581 try:
3582 lock = multiprocessing.RLock()
3583 except OSError:
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003584 raise unittest.SkipTest("OSError raises on RLock creation, "
3585 "see issue 3111!")
Charles-François Natali221ef672011-11-22 18:55:22 +01003586 check_enough_semaphores()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003587 util.get_temp_dir() # creates temp directory for use by all processes
Benjamin Petersone711caf2008-06-11 16:44:04 +00003588 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3589
Benjamin Petersone711caf2008-06-11 16:44:04 +00003590
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003591def tearDownModule():
3592 # pause a bit so we don't get warning about dangling threads/processes
3593 time.sleep(0.5)
3594
3595
Benjamin Petersone711caf2008-06-11 16:44:04 +00003596if __name__ == '__main__':
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003597 unittest.main()