blob: 27db029b3c65f663c55dff1959caf4d6a6913eb8 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkd15642e2013-07-16 15:33:41 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000031# import threading after _multiprocessing to raise a more revelant error
32# message: "No module named _multiprocessing". _multiprocessing is not compiled
33# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000065def latin(s):
66 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
72LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000073#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
75DELTA = 0.1
76CHECK_TIMINGS = False # making true makes tests take a lot longer
77 # and can sometimes cause some non-serious
78 # failures because some calls block a bit
79 # longer than expected
80if CHECK_TIMINGS:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
82else:
83 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
84
85HAVE_GETVALUE = not getattr(_multiprocessing,
86 'HAVE_BROKEN_SEM_GETVALUE', False)
87
Jesse Noller6214edd2009-01-19 16:23:53 +000088WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
Richard Oudkerk59d54042012-05-10 16:11:12 +010092def wait_for_handle(handle, timeout):
93 if timeout is not None and timeout < 0.0:
94 timeout = None
95 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
Antoine Pitroubcb39d42011-08-23 19:46:22 +020097try:
98 MAXFD = os.sysconf("SC_OPEN_MAX")
99except:
100 MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
113def check_enough_semaphores():
114 """Check that the system supports enough semaphores to run the test."""
115 # minimum number of semaphores available according to POSIX
116 nsems_min = 256
117 try:
118 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
119 except (AttributeError, ValueError):
120 # sysconf not available or setting not available
121 return
122 if nsems == -1 or nsems >= nsems_min:
123 return
124 raise unittest.SkipTest("The OS doesn't support enough semaphores "
125 "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
132class TimingWrapper(object):
133
134 def __init__(self, func):
135 self.func = func
136 self.elapsed = None
137
138 def __call__(self, *args, **kwds):
139 t = time.time()
140 try:
141 return self.func(*args, **kwds)
142 finally:
143 self.elapsed = time.time() - t
144
145#
146# Base class for test cases
147#
148
149class BaseTestCase(object):
150
151 ALLOWED_TYPES = ('processes', 'manager', 'threads')
152
153 def assertTimingAlmostEqual(self, a, b):
154 if CHECK_TIMINGS:
155 self.assertAlmostEqual(a, b, 1)
156
157 def assertReturnsIfImplemented(self, value, func, *args):
158 try:
159 res = func(*args)
160 except NotImplementedError:
161 pass
162 else:
163 return self.assertEqual(value, res)
164
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000165 # For the sanity of Windows users, rather than crashing or freezing in
166 # multiple ways.
167 def __reduce__(self, *args):
168 raise NotImplementedError("shouldn't try to pickle a test case")
169
170 __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
176def get_value(self):
177 try:
178 return self.get_value()
179 except AttributeError:
180 try:
181 return self._Semaphore__value
182 except AttributeError:
183 try:
184 return self._value
185 except AttributeError:
186 raise NotImplementedError
187
188#
189# Testcases
190#
191
192class _TestProcess(BaseTestCase):
193
194 ALLOWED_TYPES = ('processes', 'threads')
195
196 def test_current(self):
197 if self.TYPE == 'threads':
198 return
199
200 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
203 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000205 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEqual(current.ident, os.getpid())
208 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000210 def test_daemon_argument(self):
211 if self.TYPE == "threads":
212 return
213
214 # By default uses the current process's daemon flag.
215 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000216 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 proc1 = self.Process(target=self._test, daemon=True)
218 self.assertTrue(proc1.daemon)
219 proc2 = self.Process(target=self._test, daemon=False)
220 self.assertFalse(proc2.daemon)
221
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000222 @classmethod
223 def _test(cls, q, *args, **kwds):
224 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225 q.put(args)
226 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000228 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000229 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230 q.put(current.pid)
231
232 def test_process(self):
233 q = self.Queue(1)
234 e = self.Event()
235 args = (q, 1, 2)
236 kwargs = {'hello':23, 'bye':2.54}
237 name = 'SomeProcess'
238 p = self.Process(
239 target=self._test, args=args, kwargs=kwargs, name=name
240 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242 current = self.current_process()
243
244 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(p.authkey, current.authkey)
246 self.assertEqual(p.is_alive(), False)
247 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000248 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000250 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251
252 p.start()
253
Ezio Melottib3aedd42010-11-20 19:04:17 +0000254 self.assertEqual(p.exitcode, None)
255 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000256 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(q.get(), args[1:])
259 self.assertEqual(q.get(), kwargs)
260 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000262 self.assertEqual(q.get(), current.authkey)
263 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
265 p.join()
266
Ezio Melottib3aedd42010-11-20 19:04:17 +0000267 self.assertEqual(p.exitcode, 0)
268 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000271 @classmethod
272 def _test_terminate(cls):
Richard Oudkerk4f350792013-10-13 00:49:27 +0100273 time.sleep(100)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000274
275 def test_terminate(self):
276 if self.TYPE == 'threads':
277 return
278
279 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000280 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000281 p.start()
282
283 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000285 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286
Richard Oudkerk59d54042012-05-10 16:11:12 +0100287 join = TimingWrapper(p.join)
288
289 self.assertEqual(join(0), None)
290 self.assertTimingAlmostEqual(join.elapsed, 0.0)
291 self.assertEqual(p.is_alive(), True)
292
293 self.assertEqual(join(-1), None)
294 self.assertTimingAlmostEqual(join.elapsed, 0.0)
295 self.assertEqual(p.is_alive(), True)
296
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 p.terminate()
298
Richard Oudkerk4f350792013-10-13 00:49:27 +0100299 if hasattr(signal, 'alarm'):
300 def handler(*args):
Richard Oudkerkb46fe792013-10-15 16:48:51 +0100301 raise RuntimeError('join took too long: %s' % p)
Richard Oudkerk4f350792013-10-13 00:49:27 +0100302 old_handler = signal.signal(signal.SIGALRM, handler)
303 try:
304 signal.alarm(10)
305 self.assertEqual(join(), None)
306 signal.alarm(0)
307 finally:
308 signal.signal(signal.SIGALRM, old_handler)
309 else:
310 self.assertEqual(join(), None)
311
Benjamin Petersone711caf2008-06-11 16:44:04 +0000312 self.assertTimingAlmostEqual(join.elapsed, 0.0)
313
314 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000315 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000316
317 p.join()
318
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000319 # XXX sometimes get p.exitcode == 0 on Windows ...
320 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
322 def test_cpu_count(self):
323 try:
324 cpus = multiprocessing.cpu_count()
325 except NotImplementedError:
326 cpus = 1
327 self.assertTrue(type(cpus) is int)
328 self.assertTrue(cpus >= 1)
329
330 def test_active_children(self):
331 self.assertEqual(type(self.active_children()), list)
332
333 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000334 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000335
Jesus Cea94f964f2011-09-09 20:26:57 +0200336 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000338 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339
340 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000341 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000342
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000343 @classmethod
344 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000345 from multiprocessing import forking
346 wconn.send(id)
347 if len(id) < 2:
348 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000349 p = cls.Process(
350 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000351 )
352 p.start()
353 p.join()
354
355 def test_recursion(self):
356 rconn, wconn = self.Pipe(duplex=False)
357 self._test_recursion(wconn, [])
358
359 time.sleep(DELTA)
360 result = []
361 while rconn.poll():
362 result.append(rconn.recv())
363
364 expected = [
365 [],
366 [0],
367 [0, 0],
368 [0, 1],
369 [1],
370 [1, 0],
371 [1, 1]
372 ]
373 self.assertEqual(result, expected)
374
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200375 @classmethod
376 def _test_sentinel(cls, event):
377 event.wait(10.0)
378
379 def test_sentinel(self):
380 if self.TYPE == "threads":
381 return
382 event = self.Event()
383 p = self.Process(target=self._test_sentinel, args=(event,))
384 with self.assertRaises(ValueError):
385 p.sentinel
386 p.start()
387 self.addCleanup(p.join)
388 sentinel = p.sentinel
389 self.assertIsInstance(sentinel, int)
390 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
391 event.set()
392 p.join()
393 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
394
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395#
396#
397#
398
399class _UpperCaser(multiprocessing.Process):
400
401 def __init__(self):
402 multiprocessing.Process.__init__(self)
403 self.child_conn, self.parent_conn = multiprocessing.Pipe()
404
405 def run(self):
406 self.parent_conn.close()
407 for s in iter(self.child_conn.recv, None):
408 self.child_conn.send(s.upper())
409 self.child_conn.close()
410
411 def submit(self, s):
412 assert type(s) is str
413 self.parent_conn.send(s)
414 return self.parent_conn.recv()
415
416 def stop(self):
417 self.parent_conn.send(None)
418 self.parent_conn.close()
419 self.child_conn.close()
420
421class _TestSubclassingProcess(BaseTestCase):
422
423 ALLOWED_TYPES = ('processes',)
424
425 def test_subclassing(self):
426 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200427 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000428 uppercaser.start()
429 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
430 self.assertEqual(uppercaser.submit('world'), 'WORLD')
431 uppercaser.stop()
432 uppercaser.join()
433
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100434 def test_stderr_flush(self):
435 # sys.stderr is flushed at process shutdown (issue #13812)
436 if self.TYPE == "threads":
437 return
438
439 testfn = test.support.TESTFN
440 self.addCleanup(test.support.unlink, testfn)
441 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
442 proc.start()
443 proc.join()
444 with open(testfn, 'r') as f:
445 err = f.read()
446 # The whole traceback was printed
447 self.assertIn("ZeroDivisionError", err)
448 self.assertIn("test_multiprocessing.py", err)
449 self.assertIn("1/0 # MARKER", err)
450
451 @classmethod
452 def _test_stderr_flush(cls, testfn):
453 sys.stderr = open(testfn, 'w')
454 1/0 # MARKER
455
456
Richard Oudkerk29471de2012-06-06 19:04:57 +0100457 @classmethod
458 def _test_sys_exit(cls, reason, testfn):
459 sys.stderr = open(testfn, 'w')
460 sys.exit(reason)
461
462 def test_sys_exit(self):
463 # See Issue 13854
464 if self.TYPE == 'threads':
465 return
466
467 testfn = test.support.TESTFN
468 self.addCleanup(test.support.unlink, testfn)
469
Richard Oudkerk8731d7b2013-11-17 17:24:11 +0000470 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
Richard Oudkerk29471de2012-06-06 19:04:57 +0100471 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
472 p.daemon = True
473 p.start()
474 p.join(5)
475 self.assertEqual(p.exitcode, code)
476
477 with open(testfn, 'r') as f:
478 self.assertEqual(f.read().rstrip(), str(reason))
479
480 for reason in (True, False, 8):
481 p = self.Process(target=sys.exit, args=(reason,))
482 p.daemon = True
483 p.start()
484 p.join(5)
485 self.assertEqual(p.exitcode, reason)
486
Benjamin Petersone711caf2008-06-11 16:44:04 +0000487#
488#
489#
490
491def queue_empty(q):
492 if hasattr(q, 'empty'):
493 return q.empty()
494 else:
495 return q.qsize() == 0
496
497def queue_full(q, maxsize):
498 if hasattr(q, 'full'):
499 return q.full()
500 else:
501 return q.qsize() == maxsize
502
503
504class _TestQueue(BaseTestCase):
505
506
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000507 @classmethod
508 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000509 child_can_start.wait()
510 for i in range(6):
511 queue.get()
512 parent_can_continue.set()
513
514 def test_put(self):
515 MAXSIZE = 6
516 queue = self.Queue(maxsize=MAXSIZE)
517 child_can_start = self.Event()
518 parent_can_continue = self.Event()
519
520 proc = self.Process(
521 target=self._test_put,
522 args=(queue, child_can_start, parent_can_continue)
523 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000524 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000525 proc.start()
526
527 self.assertEqual(queue_empty(queue), True)
528 self.assertEqual(queue_full(queue, MAXSIZE), False)
529
530 queue.put(1)
531 queue.put(2, True)
532 queue.put(3, True, None)
533 queue.put(4, False)
534 queue.put(5, False, None)
535 queue.put_nowait(6)
536
537 # the values may be in buffer but not yet in pipe so sleep a bit
538 time.sleep(DELTA)
539
540 self.assertEqual(queue_empty(queue), False)
541 self.assertEqual(queue_full(queue, MAXSIZE), True)
542
543 put = TimingWrapper(queue.put)
544 put_nowait = TimingWrapper(queue.put_nowait)
545
546 self.assertRaises(pyqueue.Full, put, 7, False)
547 self.assertTimingAlmostEqual(put.elapsed, 0)
548
549 self.assertRaises(pyqueue.Full, put, 7, False, None)
550 self.assertTimingAlmostEqual(put.elapsed, 0)
551
552 self.assertRaises(pyqueue.Full, put_nowait, 7)
553 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
554
555 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
556 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
557
558 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
559 self.assertTimingAlmostEqual(put.elapsed, 0)
560
561 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
562 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
563
564 child_can_start.set()
565 parent_can_continue.wait()
566
567 self.assertEqual(queue_empty(queue), True)
568 self.assertEqual(queue_full(queue, MAXSIZE), False)
569
570 proc.join()
571
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000572 @classmethod
573 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000574 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000575 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000576 queue.put(2)
577 queue.put(3)
578 queue.put(4)
579 queue.put(5)
580 parent_can_continue.set()
581
582 def test_get(self):
583 queue = self.Queue()
584 child_can_start = self.Event()
585 parent_can_continue = self.Event()
586
587 proc = self.Process(
588 target=self._test_get,
589 args=(queue, child_can_start, parent_can_continue)
590 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000591 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 proc.start()
593
594 self.assertEqual(queue_empty(queue), True)
595
596 child_can_start.set()
597 parent_can_continue.wait()
598
599 time.sleep(DELTA)
600 self.assertEqual(queue_empty(queue), False)
601
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000602 # Hangs unexpectedly, remove for now
603 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000604 self.assertEqual(queue.get(True, None), 2)
605 self.assertEqual(queue.get(True), 3)
606 self.assertEqual(queue.get(timeout=1), 4)
607 self.assertEqual(queue.get_nowait(), 5)
608
609 self.assertEqual(queue_empty(queue), True)
610
611 get = TimingWrapper(queue.get)
612 get_nowait = TimingWrapper(queue.get_nowait)
613
614 self.assertRaises(pyqueue.Empty, get, False)
615 self.assertTimingAlmostEqual(get.elapsed, 0)
616
617 self.assertRaises(pyqueue.Empty, get, False, None)
618 self.assertTimingAlmostEqual(get.elapsed, 0)
619
620 self.assertRaises(pyqueue.Empty, get_nowait)
621 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
622
623 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
624 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
625
626 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
627 self.assertTimingAlmostEqual(get.elapsed, 0)
628
629 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
630 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
631
632 proc.join()
633
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000634 @classmethod
635 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000636 for i in range(10, 20):
637 queue.put(i)
638 # note that at this point the items may only be buffered, so the
639 # process cannot shutdown until the feeder thread has finished
640 # pushing items onto the pipe.
641
642 def test_fork(self):
643 # Old versions of Queue would fail to create a new feeder
644 # thread for a forked process if the original process had its
645 # own feeder thread. This test checks that this no longer
646 # happens.
647
648 queue = self.Queue()
649
650 # put items on queue so that main process starts a feeder thread
651 for i in range(10):
652 queue.put(i)
653
654 # wait to make sure thread starts before we fork a new process
655 time.sleep(DELTA)
656
657 # fork process
658 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200659 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000660 p.start()
661
662 # check that all expected items are in the queue
663 for i in range(20):
664 self.assertEqual(queue.get(), i)
665 self.assertRaises(pyqueue.Empty, queue.get, False)
666
667 p.join()
668
669 def test_qsize(self):
670 q = self.Queue()
671 try:
672 self.assertEqual(q.qsize(), 0)
673 except NotImplementedError:
674 return
675 q.put(1)
676 self.assertEqual(q.qsize(), 1)
677 q.put(5)
678 self.assertEqual(q.qsize(), 2)
679 q.get()
680 self.assertEqual(q.qsize(), 1)
681 q.get()
682 self.assertEqual(q.qsize(), 0)
683
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000684 @classmethod
685 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000686 for obj in iter(q.get, None):
687 time.sleep(DELTA)
688 q.task_done()
689
690 def test_task_done(self):
691 queue = self.JoinableQueue()
692
693 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000694 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000695
696 workers = [self.Process(target=self._test_task_done, args=(queue,))
697 for i in range(4)]
698
699 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200700 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000701 p.start()
702
703 for i in range(10):
704 queue.put(i)
705
706 queue.join()
707
708 for p in workers:
709 queue.put(None)
710
711 for p in workers:
712 p.join()
713
Giampaolo Rodola'b38897f2013-04-17 13:08:59 +0200714 def test_timeout(self):
715 q = multiprocessing.Queue()
716 start = time.time()
717 self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
718 delta = time.time() - start
Richard Oudkerkb8ec1e32013-11-02 16:46:32 +0000719 self.assertGreaterEqual(delta, 0.18)
Giampaolo Rodola'b38897f2013-04-17 13:08:59 +0200720
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721#
722#
723#
724
725class _TestLock(BaseTestCase):
726
727 def test_lock(self):
728 lock = self.Lock()
729 self.assertEqual(lock.acquire(), True)
730 self.assertEqual(lock.acquire(False), False)
731 self.assertEqual(lock.release(), None)
732 self.assertRaises((ValueError, threading.ThreadError), lock.release)
733
734 def test_rlock(self):
735 lock = self.RLock()
736 self.assertEqual(lock.acquire(), True)
737 self.assertEqual(lock.acquire(), True)
738 self.assertEqual(lock.acquire(), True)
739 self.assertEqual(lock.release(), None)
740 self.assertEqual(lock.release(), None)
741 self.assertEqual(lock.release(), None)
742 self.assertRaises((AssertionError, RuntimeError), lock.release)
743
Jesse Nollerf8d00852009-03-31 03:25:07 +0000744 def test_lock_context(self):
745 with self.Lock():
746 pass
747
Benjamin Petersone711caf2008-06-11 16:44:04 +0000748
749class _TestSemaphore(BaseTestCase):
750
751 def _test_semaphore(self, sem):
752 self.assertReturnsIfImplemented(2, get_value, sem)
753 self.assertEqual(sem.acquire(), True)
754 self.assertReturnsIfImplemented(1, get_value, sem)
755 self.assertEqual(sem.acquire(), True)
756 self.assertReturnsIfImplemented(0, get_value, sem)
757 self.assertEqual(sem.acquire(False), False)
758 self.assertReturnsIfImplemented(0, get_value, sem)
759 self.assertEqual(sem.release(), None)
760 self.assertReturnsIfImplemented(1, get_value, sem)
761 self.assertEqual(sem.release(), None)
762 self.assertReturnsIfImplemented(2, get_value, sem)
763
764 def test_semaphore(self):
765 sem = self.Semaphore(2)
766 self._test_semaphore(sem)
767 self.assertEqual(sem.release(), None)
768 self.assertReturnsIfImplemented(3, get_value, sem)
769 self.assertEqual(sem.release(), None)
770 self.assertReturnsIfImplemented(4, get_value, sem)
771
772 def test_bounded_semaphore(self):
773 sem = self.BoundedSemaphore(2)
774 self._test_semaphore(sem)
775 # Currently fails on OS/X
776 #if HAVE_GETVALUE:
777 # self.assertRaises(ValueError, sem.release)
778 # self.assertReturnsIfImplemented(2, get_value, sem)
779
780 def test_timeout(self):
781 if self.TYPE != 'processes':
782 return
783
784 sem = self.Semaphore(0)
785 acquire = TimingWrapper(sem.acquire)
786
787 self.assertEqual(acquire(False), False)
788 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
789
790 self.assertEqual(acquire(False, None), False)
791 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
792
793 self.assertEqual(acquire(False, TIMEOUT1), False)
794 self.assertTimingAlmostEqual(acquire.elapsed, 0)
795
796 self.assertEqual(acquire(True, TIMEOUT2), False)
797 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
798
799 self.assertEqual(acquire(timeout=TIMEOUT3), False)
800 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
801
802
803class _TestCondition(BaseTestCase):
804
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000805 @classmethod
806 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807 cond.acquire()
808 sleeping.release()
809 cond.wait(timeout)
810 woken.release()
811 cond.release()
812
813 def check_invariant(self, cond):
814 # this is only supposed to succeed when there are no sleepers
815 if self.TYPE == 'processes':
816 try:
817 sleepers = (cond._sleeping_count.get_value() -
818 cond._woken_count.get_value())
819 self.assertEqual(sleepers, 0)
820 self.assertEqual(cond._wait_semaphore.get_value(), 0)
821 except NotImplementedError:
822 pass
823
824 def test_notify(self):
825 cond = self.Condition()
826 sleeping = self.Semaphore(0)
827 woken = self.Semaphore(0)
828
829 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000830 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 p.start()
832
833 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000834 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 p.start()
836
837 # wait for both children to start sleeping
838 sleeping.acquire()
839 sleeping.acquire()
840
841 # check no process/thread has woken up
842 time.sleep(DELTA)
843 self.assertReturnsIfImplemented(0, get_value, woken)
844
845 # wake up one process/thread
846 cond.acquire()
847 cond.notify()
848 cond.release()
849
850 # check one process/thread has woken up
851 time.sleep(DELTA)
852 self.assertReturnsIfImplemented(1, get_value, woken)
853
854 # wake up another
855 cond.acquire()
856 cond.notify()
857 cond.release()
858
859 # check other has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(2, get_value, woken)
862
863 # check state is not mucked up
864 self.check_invariant(cond)
865 p.join()
866
867 def test_notify_all(self):
868 cond = self.Condition()
869 sleeping = self.Semaphore(0)
870 woken = self.Semaphore(0)
871
872 # start some threads/processes which will timeout
873 for i in range(3):
874 p = self.Process(target=self.f,
875 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000876 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000877 p.start()
878
879 t = threading.Thread(target=self.f,
880 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000881 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 t.start()
883
884 # wait for them all to sleep
885 for i in range(6):
886 sleeping.acquire()
887
888 # check they have all timed out
889 for i in range(6):
890 woken.acquire()
891 self.assertReturnsIfImplemented(0, get_value, woken)
892
893 # check state is not mucked up
894 self.check_invariant(cond)
895
896 # start some more threads/processes
897 for i in range(3):
898 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000899 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 p.start()
901
902 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000903 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 t.start()
905
906 # wait for them to all sleep
907 for i in range(6):
908 sleeping.acquire()
909
910 # check no process/thread has woken up
911 time.sleep(DELTA)
912 self.assertReturnsIfImplemented(0, get_value, woken)
913
914 # wake them all up
915 cond.acquire()
916 cond.notify_all()
917 cond.release()
918
919 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200920 for i in range(10):
921 try:
922 if get_value(woken) == 6:
923 break
924 except NotImplementedError:
925 break
926 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 self.assertReturnsIfImplemented(6, get_value, woken)
928
929 # check state is not mucked up
930 self.check_invariant(cond)
931
932 def test_timeout(self):
933 cond = self.Condition()
934 wait = TimingWrapper(cond.wait)
935 cond.acquire()
936 res = wait(TIMEOUT1)
937 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000938 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000939 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
940
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200941 @classmethod
942 def _test_waitfor_f(cls, cond, state):
943 with cond:
944 state.value = 0
945 cond.notify()
946 result = cond.wait_for(lambda : state.value==4)
947 if not result or state.value != 4:
948 sys.exit(1)
949
950 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
951 def test_waitfor(self):
952 # based on test in test/lock_tests.py
953 cond = self.Condition()
954 state = self.Value('i', -1)
955
956 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
957 p.daemon = True
958 p.start()
959
960 with cond:
961 result = cond.wait_for(lambda : state.value==0)
962 self.assertTrue(result)
963 self.assertEqual(state.value, 0)
964
965 for i in range(4):
966 time.sleep(0.01)
967 with cond:
968 state.value += 1
969 cond.notify()
970
971 p.join(5)
972 self.assertFalse(p.is_alive())
973 self.assertEqual(p.exitcode, 0)
974
975 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100976 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
977 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978 with cond:
979 expected = 0.1
980 dt = time.time()
981 result = cond.wait_for(lambda : state.value==4, timeout=expected)
982 dt = time.time() - dt
983 # borrow logic in assertTimeout() from test/lock_tests.py
984 if not result and expected * 0.6 < dt < expected * 10.0:
985 success.value = True
986
987 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
988 def test_waitfor_timeout(self):
989 # based on test in test/lock_tests.py
990 cond = self.Condition()
991 state = self.Value('i', 0)
992 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100993 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200994
995 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100996 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200997 p.daemon = True
998 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100999 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +02001000
1001 # Only increment 3 times, so state == 4 is never reached.
1002 for i in range(3):
1003 time.sleep(0.01)
1004 with cond:
1005 state.value += 1
1006 cond.notify()
1007
1008 p.join(5)
1009 self.assertTrue(success.value)
1010
Richard Oudkerk98449932012-06-05 13:15:29 +01001011 @classmethod
1012 def _test_wait_result(cls, c, pid):
1013 with c:
1014 c.notify()
1015 time.sleep(1)
1016 if pid is not None:
1017 os.kill(pid, signal.SIGINT)
1018
1019 def test_wait_result(self):
1020 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1021 pid = os.getpid()
1022 else:
1023 pid = None
1024
1025 c = self.Condition()
1026 with c:
1027 self.assertFalse(c.wait(0))
1028 self.assertFalse(c.wait(0.1))
1029
1030 p = self.Process(target=self._test_wait_result, args=(c, pid))
1031 p.start()
1032
1033 self.assertTrue(c.wait(10))
1034 if pid is not None:
1035 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1036
1037 p.join()
1038
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039
1040class _TestEvent(BaseTestCase):
1041
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001042 @classmethod
1043 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001044 time.sleep(TIMEOUT2)
1045 event.set()
1046
1047 def test_event(self):
1048 event = self.Event()
1049 wait = TimingWrapper(event.wait)
1050
Ezio Melotti13925002011-03-16 11:05:33 +02001051 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001052 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001053 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001054
Benjamin Peterson965ce872009-04-05 21:24:58 +00001055 # Removed, threading.Event.wait() will return the value of the __flag
1056 # instead of None. API Shear with the semaphore backed mp.Event
1057 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001058 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001059 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1061
1062 event.set()
1063
1064 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001065 self.assertEqual(event.is_set(), True)
1066 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001067 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001068 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001069 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1070 # self.assertEqual(event.is_set(), True)
1071
1072 event.clear()
1073
1074 #self.assertEqual(event.is_set(), False)
1075
Jesus Cea94f964f2011-09-09 20:26:57 +02001076 p = self.Process(target=self._test_event, args=(event,))
1077 p.daemon = True
1078 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001079 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001080
1081#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001082# Tests for Barrier - adapted from tests in test/lock_tests.py
1083#
1084
1085# Many of the tests for threading.Barrier use a list as an atomic
1086# counter: a value is appended to increment the counter, and the
1087# length of the list gives the value. We use the class DummyList
1088# for the same purpose.
1089
1090class _DummyList(object):
1091
1092 def __init__(self):
1093 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1094 lock = multiprocessing.Lock()
1095 self.__setstate__((wrapper, lock))
1096 self._lengthbuf[0] = 0
1097
1098 def __setstate__(self, state):
1099 (self._wrapper, self._lock) = state
1100 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1101
1102 def __getstate__(self):
1103 return (self._wrapper, self._lock)
1104
1105 def append(self, _):
1106 with self._lock:
1107 self._lengthbuf[0] += 1
1108
1109 def __len__(self):
1110 with self._lock:
1111 return self._lengthbuf[0]
1112
1113def _wait():
1114 # A crude wait/yield function not relying on synchronization primitives.
1115 time.sleep(0.01)
1116
1117
1118class Bunch(object):
1119 """
1120 A bunch of threads.
1121 """
1122 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1123 """
1124 Construct a bunch of `n` threads running the same function `f`.
1125 If `wait_before_exit` is True, the threads won't terminate until
1126 do_finish() is called.
1127 """
1128 self.f = f
1129 self.args = args
1130 self.n = n
1131 self.started = namespace.DummyList()
1132 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001133 self._can_exit = namespace.Event()
1134 if not wait_before_exit:
1135 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001136 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001137 p = namespace.Process(target=self.task)
1138 p.daemon = True
1139 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001140
1141 def task(self):
1142 pid = os.getpid()
1143 self.started.append(pid)
1144 try:
1145 self.f(*self.args)
1146 finally:
1147 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001148 self._can_exit.wait(30)
1149 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001150
1151 def wait_for_started(self):
1152 while len(self.started) < self.n:
1153 _wait()
1154
1155 def wait_for_finished(self):
1156 while len(self.finished) < self.n:
1157 _wait()
1158
1159 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001160 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001161
1162
1163class AppendTrue(object):
1164 def __init__(self, obj):
1165 self.obj = obj
1166 def __call__(self):
1167 self.obj.append(True)
1168
1169
1170class _TestBarrier(BaseTestCase):
1171 """
1172 Tests for Barrier objects.
1173 """
1174 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001175 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001176
1177 def setUp(self):
1178 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1179
1180 def tearDown(self):
1181 self.barrier.abort()
1182 self.barrier = None
1183
1184 def DummyList(self):
1185 if self.TYPE == 'threads':
1186 return []
1187 elif self.TYPE == 'manager':
1188 return self.manager.list()
1189 else:
1190 return _DummyList()
1191
1192 def run_threads(self, f, args):
1193 b = Bunch(self, f, args, self.N-1)
1194 f(*args)
1195 b.wait_for_finished()
1196
1197 @classmethod
1198 def multipass(cls, barrier, results, n):
1199 m = barrier.parties
1200 assert m == cls.N
1201 for i in range(n):
1202 results[0].append(True)
1203 assert len(results[1]) == i * m
1204 barrier.wait()
1205 results[1].append(True)
1206 assert len(results[0]) == (i + 1) * m
1207 barrier.wait()
1208 try:
1209 assert barrier.n_waiting == 0
1210 except NotImplementedError:
1211 pass
1212 assert not barrier.broken
1213
1214 def test_barrier(self, passes=1):
1215 """
1216 Test that a barrier is passed in lockstep
1217 """
1218 results = [self.DummyList(), self.DummyList()]
1219 self.run_threads(self.multipass, (self.barrier, results, passes))
1220
1221 def test_barrier_10(self):
1222 """
1223 Test that a barrier works for 10 consecutive runs
1224 """
1225 return self.test_barrier(10)
1226
1227 @classmethod
1228 def _test_wait_return_f(cls, barrier, queue):
1229 res = barrier.wait()
1230 queue.put(res)
1231
1232 def test_wait_return(self):
1233 """
1234 test the return value from barrier.wait
1235 """
1236 queue = self.Queue()
1237 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1238 results = [queue.get() for i in range(self.N)]
1239 self.assertEqual(results.count(0), 1)
1240
1241 @classmethod
1242 def _test_action_f(cls, barrier, results):
1243 barrier.wait()
1244 if len(results) != 1:
1245 raise RuntimeError
1246
1247 def test_action(self):
1248 """
1249 Test the 'action' callback
1250 """
1251 results = self.DummyList()
1252 barrier = self.Barrier(self.N, action=AppendTrue(results))
1253 self.run_threads(self._test_action_f, (barrier, results))
1254 self.assertEqual(len(results), 1)
1255
1256 @classmethod
1257 def _test_abort_f(cls, barrier, results1, results2):
1258 try:
1259 i = barrier.wait()
1260 if i == cls.N//2:
1261 raise RuntimeError
1262 barrier.wait()
1263 results1.append(True)
1264 except threading.BrokenBarrierError:
1265 results2.append(True)
1266 except RuntimeError:
1267 barrier.abort()
1268
1269 def test_abort(self):
1270 """
1271 Test that an abort will put the barrier in a broken state
1272 """
1273 results1 = self.DummyList()
1274 results2 = self.DummyList()
1275 self.run_threads(self._test_abort_f,
1276 (self.barrier, results1, results2))
1277 self.assertEqual(len(results1), 0)
1278 self.assertEqual(len(results2), self.N-1)
1279 self.assertTrue(self.barrier.broken)
1280
1281 @classmethod
1282 def _test_reset_f(cls, barrier, results1, results2, results3):
1283 i = barrier.wait()
1284 if i == cls.N//2:
1285 # Wait until the other threads are all in the barrier.
1286 while barrier.n_waiting < cls.N-1:
1287 time.sleep(0.001)
1288 barrier.reset()
1289 else:
1290 try:
1291 barrier.wait()
1292 results1.append(True)
1293 except threading.BrokenBarrierError:
1294 results2.append(True)
1295 # Now, pass the barrier again
1296 barrier.wait()
1297 results3.append(True)
1298
1299 def test_reset(self):
1300 """
1301 Test that a 'reset' on a barrier frees the waiting threads
1302 """
1303 results1 = self.DummyList()
1304 results2 = self.DummyList()
1305 results3 = self.DummyList()
1306 self.run_threads(self._test_reset_f,
1307 (self.barrier, results1, results2, results3))
1308 self.assertEqual(len(results1), 0)
1309 self.assertEqual(len(results2), self.N-1)
1310 self.assertEqual(len(results3), self.N)
1311
1312 @classmethod
1313 def _test_abort_and_reset_f(cls, barrier, barrier2,
1314 results1, results2, results3):
1315 try:
1316 i = barrier.wait()
1317 if i == cls.N//2:
1318 raise RuntimeError
1319 barrier.wait()
1320 results1.append(True)
1321 except threading.BrokenBarrierError:
1322 results2.append(True)
1323 except RuntimeError:
1324 barrier.abort()
1325 # Synchronize and reset the barrier. Must synchronize first so
1326 # that everyone has left it when we reset, and after so that no
1327 # one enters it before the reset.
1328 if barrier2.wait() == cls.N//2:
1329 barrier.reset()
1330 barrier2.wait()
1331 barrier.wait()
1332 results3.append(True)
1333
1334 def test_abort_and_reset(self):
1335 """
1336 Test that a barrier can be reset after being broken.
1337 """
1338 results1 = self.DummyList()
1339 results2 = self.DummyList()
1340 results3 = self.DummyList()
1341 barrier2 = self.Barrier(self.N)
1342
1343 self.run_threads(self._test_abort_and_reset_f,
1344 (self.barrier, barrier2, results1, results2, results3))
1345 self.assertEqual(len(results1), 0)
1346 self.assertEqual(len(results2), self.N-1)
1347 self.assertEqual(len(results3), self.N)
1348
1349 @classmethod
1350 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001351 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001352 if i == cls.N//2:
1353 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001354 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001355 try:
1356 barrier.wait(0.5)
1357 except threading.BrokenBarrierError:
1358 results.append(True)
1359
1360 def test_timeout(self):
1361 """
1362 Test wait(timeout)
1363 """
1364 results = self.DummyList()
1365 self.run_threads(self._test_timeout_f, (self.barrier, results))
1366 self.assertEqual(len(results), self.barrier.parties)
1367
1368 @classmethod
1369 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001370 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001371 if i == cls.N//2:
1372 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001373 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001374 try:
1375 barrier.wait()
1376 except threading.BrokenBarrierError:
1377 results.append(True)
1378
1379 def test_default_timeout(self):
1380 """
1381 Test the barrier's default timeout
1382 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001383 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001384 results = self.DummyList()
1385 self.run_threads(self._test_default_timeout_f, (barrier, results))
1386 self.assertEqual(len(results), barrier.parties)
1387
1388 def test_single_thread(self):
1389 b = self.Barrier(1)
1390 b.wait()
1391 b.wait()
1392
1393 @classmethod
1394 def _test_thousand_f(cls, barrier, passes, conn, lock):
1395 for i in range(passes):
1396 barrier.wait()
1397 with lock:
1398 conn.send(i)
1399
1400 def test_thousand(self):
1401 if self.TYPE == 'manager':
1402 return
1403 passes = 1000
1404 lock = self.Lock()
1405 conn, child_conn = self.Pipe(False)
1406 for j in range(self.N):
1407 p = self.Process(target=self._test_thousand_f,
1408 args=(self.barrier, passes, child_conn, lock))
1409 p.start()
1410
1411 for i in range(passes):
1412 for j in range(self.N):
1413 self.assertEqual(conn.recv(), i)
1414
1415#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001416#
1417#
1418
1419class _TestValue(BaseTestCase):
1420
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001421 ALLOWED_TYPES = ('processes',)
1422
Benjamin Petersone711caf2008-06-11 16:44:04 +00001423 codes_values = [
1424 ('i', 4343, 24234),
1425 ('d', 3.625, -4.25),
1426 ('h', -232, 234),
1427 ('c', latin('x'), latin('y'))
1428 ]
1429
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001430 def setUp(self):
1431 if not HAS_SHAREDCTYPES:
1432 self.skipTest("requires multiprocessing.sharedctypes")
1433
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001434 @classmethod
1435 def _test(cls, values):
1436 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001437 sv.value = cv[2]
1438
1439
1440 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001441 if raw:
1442 values = [self.RawValue(code, value)
1443 for code, value, _ in self.codes_values]
1444 else:
1445 values = [self.Value(code, value)
1446 for code, value, _ in self.codes_values]
1447
1448 for sv, cv in zip(values, self.codes_values):
1449 self.assertEqual(sv.value, cv[1])
1450
1451 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001452 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 proc.start()
1454 proc.join()
1455
1456 for sv, cv in zip(values, self.codes_values):
1457 self.assertEqual(sv.value, cv[2])
1458
1459 def test_rawvalue(self):
1460 self.test_value(raw=True)
1461
1462 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001463 val1 = self.Value('i', 5)
1464 lock1 = val1.get_lock()
1465 obj1 = val1.get_obj()
1466
1467 val2 = self.Value('i', 5, lock=None)
1468 lock2 = val2.get_lock()
1469 obj2 = val2.get_obj()
1470
1471 lock = self.Lock()
1472 val3 = self.Value('i', 5, lock=lock)
1473 lock3 = val3.get_lock()
1474 obj3 = val3.get_obj()
1475 self.assertEqual(lock, lock3)
1476
Jesse Nollerb0516a62009-01-18 03:11:38 +00001477 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001478 self.assertFalse(hasattr(arr4, 'get_lock'))
1479 self.assertFalse(hasattr(arr4, 'get_obj'))
1480
Jesse Nollerb0516a62009-01-18 03:11:38 +00001481 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1482
1483 arr5 = self.RawValue('i', 5)
1484 self.assertFalse(hasattr(arr5, 'get_lock'))
1485 self.assertFalse(hasattr(arr5, 'get_obj'))
1486
Benjamin Petersone711caf2008-06-11 16:44:04 +00001487
1488class _TestArray(BaseTestCase):
1489
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001490 ALLOWED_TYPES = ('processes',)
1491
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001492 @classmethod
1493 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001494 for i in range(1, len(seq)):
1495 seq[i] += seq[i-1]
1496
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001497 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001498 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001499 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1500 if raw:
1501 arr = self.RawArray('i', seq)
1502 else:
1503 arr = self.Array('i', seq)
1504
1505 self.assertEqual(len(arr), len(seq))
1506 self.assertEqual(arr[3], seq[3])
1507 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1508
1509 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1510
1511 self.assertEqual(list(arr[:]), seq)
1512
1513 self.f(seq)
1514
1515 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001516 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001517 p.start()
1518 p.join()
1519
1520 self.assertEqual(list(arr[:]), seq)
1521
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001522 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001523 def test_array_from_size(self):
1524 size = 10
1525 # Test for zeroing (see issue #11675).
1526 # The repetition below strengthens the test by increasing the chances
1527 # of previously allocated non-zero memory being used for the new array
1528 # on the 2nd and 3rd loops.
1529 for _ in range(3):
1530 arr = self.Array('i', size)
1531 self.assertEqual(len(arr), size)
1532 self.assertEqual(list(arr), [0] * size)
1533 arr[:] = range(10)
1534 self.assertEqual(list(arr), list(range(10)))
1535 del arr
1536
1537 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001538 def test_rawarray(self):
1539 self.test_array(raw=True)
1540
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001541 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001542 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001543 arr1 = self.Array('i', list(range(10)))
1544 lock1 = arr1.get_lock()
1545 obj1 = arr1.get_obj()
1546
1547 arr2 = self.Array('i', list(range(10)), lock=None)
1548 lock2 = arr2.get_lock()
1549 obj2 = arr2.get_obj()
1550
1551 lock = self.Lock()
1552 arr3 = self.Array('i', list(range(10)), lock=lock)
1553 lock3 = arr3.get_lock()
1554 obj3 = arr3.get_obj()
1555 self.assertEqual(lock, lock3)
1556
Jesse Nollerb0516a62009-01-18 03:11:38 +00001557 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001558 self.assertFalse(hasattr(arr4, 'get_lock'))
1559 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001560 self.assertRaises(AttributeError,
1561 self.Array, 'i', range(10), lock='notalock')
1562
1563 arr5 = self.RawArray('i', range(10))
1564 self.assertFalse(hasattr(arr5, 'get_lock'))
1565 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001566
1567#
1568#
1569#
1570
1571class _TestContainers(BaseTestCase):
1572
1573 ALLOWED_TYPES = ('manager',)
1574
1575 def test_list(self):
1576 a = self.list(list(range(10)))
1577 self.assertEqual(a[:], list(range(10)))
1578
1579 b = self.list()
1580 self.assertEqual(b[:], [])
1581
1582 b.extend(list(range(5)))
1583 self.assertEqual(b[:], list(range(5)))
1584
1585 self.assertEqual(b[2], 2)
1586 self.assertEqual(b[2:10], [2,3,4])
1587
1588 b *= 2
1589 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1590
1591 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1592
1593 self.assertEqual(a[:], list(range(10)))
1594
1595 d = [a, b]
1596 e = self.list(d)
1597 self.assertEqual(
1598 e[:],
1599 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1600 )
1601
1602 f = self.list([a])
1603 a.append('hello')
1604 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1605
1606 def test_dict(self):
1607 d = self.dict()
1608 indices = list(range(65, 70))
1609 for i in indices:
1610 d[i] = chr(i)
1611 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1612 self.assertEqual(sorted(d.keys()), indices)
1613 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1614 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1615
1616 def test_namespace(self):
1617 n = self.Namespace()
1618 n.name = 'Bob'
1619 n.job = 'Builder'
1620 n._hidden = 'hidden'
1621 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1622 del n.job
1623 self.assertEqual(str(n), "Namespace(name='Bob')")
1624 self.assertTrue(hasattr(n, 'name'))
1625 self.assertTrue(not hasattr(n, 'job'))
1626
1627#
1628#
1629#
1630
1631def sqr(x, wait=0.0):
1632 time.sleep(wait)
1633 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001634
Antoine Pitroude911b22011-12-21 11:03:24 +01001635def mul(x, y):
1636 return x*y
1637
Benjamin Petersone711caf2008-06-11 16:44:04 +00001638class _TestPool(BaseTestCase):
1639
Richard Oudkerkd15642e2013-07-16 15:33:41 +01001640 @classmethod
1641 def setUpClass(cls):
1642 super().setUpClass()
1643 cls.pool = cls.Pool(4)
1644
1645 @classmethod
1646 def tearDownClass(cls):
1647 cls.pool.terminate()
1648 cls.pool.join()
1649 cls.pool = None
1650 super().tearDownClass()
1651
Benjamin Petersone711caf2008-06-11 16:44:04 +00001652 def test_apply(self):
1653 papply = self.pool.apply
1654 self.assertEqual(papply(sqr, (5,)), sqr(5))
1655 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1656
1657 def test_map(self):
1658 pmap = self.pool.map
1659 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1660 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1661 list(map(sqr, list(range(100)))))
1662
Antoine Pitroude911b22011-12-21 11:03:24 +01001663 def test_starmap(self):
1664 psmap = self.pool.starmap
1665 tuples = list(zip(range(10), range(9,-1, -1)))
1666 self.assertEqual(psmap(mul, tuples),
1667 list(itertools.starmap(mul, tuples)))
1668 tuples = list(zip(range(100), range(99,-1, -1)))
1669 self.assertEqual(psmap(mul, tuples, chunksize=20),
1670 list(itertools.starmap(mul, tuples)))
1671
1672 def test_starmap_async(self):
1673 tuples = list(zip(range(100), range(99,-1, -1)))
1674 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1675 list(itertools.starmap(mul, tuples)))
1676
Hynek Schlawack254af262012-10-27 12:53:02 +02001677 def test_map_async(self):
1678 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1679 list(map(sqr, list(range(10)))))
1680
1681 def test_map_async_callbacks(self):
1682 call_args = self.manager.list() if self.TYPE == 'manager' else []
1683 self.pool.map_async(int, ['1'],
1684 callback=call_args.append,
1685 error_callback=call_args.append).wait()
1686 self.assertEqual(1, len(call_args))
1687 self.assertEqual([1], call_args[0])
1688 self.pool.map_async(int, ['a'],
1689 callback=call_args.append,
1690 error_callback=call_args.append).wait()
1691 self.assertEqual(2, len(call_args))
1692 self.assertIsInstance(call_args[1], ValueError)
1693
Richard Oudkerke90cedb2013-10-28 23:11:58 +00001694 def test_map_unplicklable(self):
1695 # Issue #19425 -- failure to pickle should not cause a hang
1696 if self.TYPE == 'threads':
1697 return
1698 class A(object):
1699 def __reduce__(self):
1700 raise RuntimeError('cannot pickle')
1701 with self.assertRaises(RuntimeError):
1702 self.pool.map(sqr, [A()]*10)
1703
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001704 def test_map_chunksize(self):
1705 try:
1706 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1707 except multiprocessing.TimeoutError:
1708 self.fail("pool.map_async with chunksize stalled on null list")
1709
Benjamin Petersone711caf2008-06-11 16:44:04 +00001710 def test_async(self):
1711 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1712 get = TimingWrapper(res.get)
1713 self.assertEqual(get(), 49)
1714 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1715
1716 def test_async_timeout(self):
Richard Oudkerk46b4a5e2013-11-17 17:45:16 +00001717 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 get = TimingWrapper(res.get)
1719 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1720 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1721
1722 def test_imap(self):
1723 it = self.pool.imap(sqr, list(range(10)))
1724 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1725
1726 it = self.pool.imap(sqr, list(range(10)))
1727 for i in range(10):
1728 self.assertEqual(next(it), i*i)
1729 self.assertRaises(StopIteration, it.__next__)
1730
1731 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1732 for i in range(1000):
1733 self.assertEqual(next(it), i*i)
1734 self.assertRaises(StopIteration, it.__next__)
1735
1736 def test_imap_unordered(self):
1737 it = self.pool.imap_unordered(sqr, list(range(1000)))
1738 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1739
1740 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1741 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1742
1743 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001744 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1745 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1746
Benjamin Petersone711caf2008-06-11 16:44:04 +00001747 p = multiprocessing.Pool(3)
1748 self.assertEqual(3, len(p._pool))
1749 p.close()
1750 p.join()
1751
1752 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001753 result = self.pool.map_async(
1754 time.sleep, [0.1 for i in range(10000)], chunksize=1
1755 )
1756 self.pool.terminate()
1757 join = TimingWrapper(self.pool.join)
1758 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001759 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001760
Richard Oudkerke41682b2012-06-06 19:04:57 +01001761 def test_empty_iterable(self):
1762 # See Issue 12157
1763 p = self.Pool(1)
1764
1765 self.assertEqual(p.map(sqr, []), [])
1766 self.assertEqual(list(p.imap(sqr, [])), [])
1767 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1768 self.assertEqual(p.map_async(sqr, []).get(), [])
1769
1770 p.close()
1771 p.join()
1772
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001773 def test_context(self):
1774 if self.TYPE == 'processes':
1775 L = list(range(10))
1776 expected = [sqr(i) for i in L]
1777 with multiprocessing.Pool(2) as p:
1778 r = p.map_async(sqr, L)
1779 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001780 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001781
Ask Solem2afcbf22010-11-09 20:55:52 +00001782def raising():
1783 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001784
Ask Solem2afcbf22010-11-09 20:55:52 +00001785def unpickleable_result():
1786 return lambda: 42
1787
1788class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001789 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001790
1791 def test_async_error_callback(self):
1792 p = multiprocessing.Pool(2)
1793
1794 scratchpad = [None]
1795 def errback(exc):
1796 scratchpad[0] = exc
1797
1798 res = p.apply_async(raising, error_callback=errback)
1799 self.assertRaises(KeyError, res.get)
1800 self.assertTrue(scratchpad[0])
1801 self.assertIsInstance(scratchpad[0], KeyError)
1802
1803 p.close()
1804 p.join()
1805
1806 def test_unpickleable_result(self):
1807 from multiprocessing.pool import MaybeEncodingError
1808 p = multiprocessing.Pool(2)
1809
1810 # Make sure we don't lose pool processes because of encoding errors.
1811 for iteration in range(20):
1812
1813 scratchpad = [None]
1814 def errback(exc):
1815 scratchpad[0] = exc
1816
1817 res = p.apply_async(unpickleable_result, error_callback=errback)
1818 self.assertRaises(MaybeEncodingError, res.get)
1819 wrapped = scratchpad[0]
1820 self.assertTrue(wrapped)
1821 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1822 self.assertIsNotNone(wrapped.exc)
1823 self.assertIsNotNone(wrapped.value)
1824
1825 p.close()
1826 p.join()
1827
1828class _TestPoolWorkerLifetime(BaseTestCase):
1829 ALLOWED_TYPES = ('processes', )
1830
Jesse Noller1f0b6582010-01-27 03:36:01 +00001831 def test_pool_worker_lifetime(self):
1832 p = multiprocessing.Pool(3, maxtasksperchild=10)
1833 self.assertEqual(3, len(p._pool))
1834 origworkerpids = [w.pid for w in p._pool]
1835 # Run many tasks so each worker gets replaced (hopefully)
1836 results = []
1837 for i in range(100):
1838 results.append(p.apply_async(sqr, (i, )))
1839 # Fetch the results and verify we got the right answers,
1840 # also ensuring all the tasks have completed.
1841 for (j, res) in enumerate(results):
1842 self.assertEqual(res.get(), sqr(j))
1843 # Refill the pool
1844 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001845 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001846 # (countdown * DELTA = 5 seconds max startup process time)
1847 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001848 while countdown and not all(w.is_alive() for w in p._pool):
1849 countdown -= 1
1850 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001851 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001852 # All pids should be assigned. See issue #7805.
1853 self.assertNotIn(None, origworkerpids)
1854 self.assertNotIn(None, finalworkerpids)
1855 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001856 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1857 p.close()
1858 p.join()
1859
Charles-François Natalif8859e12011-10-24 18:45:29 +02001860 def test_pool_worker_lifetime_early_close(self):
1861 # Issue #10332: closing a pool whose workers have limited lifetimes
1862 # before all the tasks completed would make join() hang.
1863 p = multiprocessing.Pool(3, maxtasksperchild=1)
1864 results = []
1865 for i in range(6):
1866 results.append(p.apply_async(sqr, (i, 0.3)))
1867 p.close()
1868 p.join()
1869 # check the results
1870 for (j, res) in enumerate(results):
1871 self.assertEqual(res.get(), sqr(j))
1872
Benjamin Petersone711caf2008-06-11 16:44:04 +00001873#
1874# Test of creating a customized manager class
1875#
1876
1877from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1878
1879class FooBar(object):
1880 def f(self):
1881 return 'f()'
1882 def g(self):
1883 raise ValueError
1884 def _h(self):
1885 return '_h()'
1886
1887def baz():
1888 for i in range(10):
1889 yield i*i
1890
1891class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001892 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001893 def __iter__(self):
1894 return self
1895 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001896 return self._callmethod('__next__')
1897
1898class MyManager(BaseManager):
1899 pass
1900
1901MyManager.register('Foo', callable=FooBar)
1902MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1903MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1904
1905
1906class _TestMyManager(BaseTestCase):
1907
1908 ALLOWED_TYPES = ('manager',)
1909
1910 def test_mymanager(self):
1911 manager = MyManager()
1912 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001913 self.common(manager)
1914 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001915
Richard Oudkerkac385712012-06-18 21:29:30 +01001916 # If the manager process exited cleanly then the exitcode
1917 # will be zero. Otherwise (after a short timeout)
1918 # terminate() is used, resulting in an exitcode of -SIGTERM.
1919 self.assertEqual(manager._process.exitcode, 0)
1920
1921 def test_mymanager_context(self):
1922 with MyManager() as manager:
1923 self.common(manager)
1924 self.assertEqual(manager._process.exitcode, 0)
1925
1926 def test_mymanager_context_prestarted(self):
1927 manager = MyManager()
1928 manager.start()
1929 with manager:
1930 self.common(manager)
1931 self.assertEqual(manager._process.exitcode, 0)
1932
1933 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001934 foo = manager.Foo()
1935 bar = manager.Bar()
1936 baz = manager.baz()
1937
1938 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1939 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1940
1941 self.assertEqual(foo_methods, ['f', 'g'])
1942 self.assertEqual(bar_methods, ['f', '_h'])
1943
1944 self.assertEqual(foo.f(), 'f()')
1945 self.assertRaises(ValueError, foo.g)
1946 self.assertEqual(foo._callmethod('f'), 'f()')
1947 self.assertRaises(RemoteError, foo._callmethod, '_h')
1948
1949 self.assertEqual(bar.f(), 'f()')
1950 self.assertEqual(bar._h(), '_h()')
1951 self.assertEqual(bar._callmethod('f'), 'f()')
1952 self.assertEqual(bar._callmethod('_h'), '_h()')
1953
1954 self.assertEqual(list(baz), [i*i for i in range(10)])
1955
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001956
Benjamin Petersone711caf2008-06-11 16:44:04 +00001957#
1958# Test of connecting to a remote server and using xmlrpclib for serialization
1959#
1960
1961_queue = pyqueue.Queue()
1962def get_queue():
1963 return _queue
1964
1965class QueueManager(BaseManager):
1966 '''manager class used by server process'''
1967QueueManager.register('get_queue', callable=get_queue)
1968
1969class QueueManager2(BaseManager):
1970 '''manager class which specifies the same interface as QueueManager'''
1971QueueManager2.register('get_queue')
1972
1973
1974SERIALIZER = 'xmlrpclib'
1975
1976class _TestRemoteManager(BaseTestCase):
1977
1978 ALLOWED_TYPES = ('manager',)
1979
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001980 @classmethod
1981 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001982 manager = QueueManager2(
1983 address=address, authkey=authkey, serializer=SERIALIZER
1984 )
1985 manager.connect()
1986 queue = manager.get_queue()
1987 queue.put(('hello world', None, True, 2.25))
1988
1989 def test_remote(self):
1990 authkey = os.urandom(32)
1991
1992 manager = QueueManager(
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02001993 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
Benjamin Petersone711caf2008-06-11 16:44:04 +00001994 )
1995 manager.start()
1996
1997 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001998 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001999 p.start()
2000
2001 manager2 = QueueManager2(
2002 address=manager.address, authkey=authkey, serializer=SERIALIZER
2003 )
2004 manager2.connect()
2005 queue = manager2.get_queue()
2006
2007 # Note that xmlrpclib will deserialize object as a list not a tuple
2008 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
2009
2010 # Because we are using xmlrpclib for serialization instead of
2011 # pickle this will cause a serialization error.
2012 self.assertRaises(Exception, queue.put, time.sleep)
2013
2014 # Make queue finalizer run before the server is stopped
2015 del queue
2016 manager.shutdown()
2017
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002018class _TestManagerRestart(BaseTestCase):
2019
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002020 @classmethod
2021 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002022 manager = QueueManager(
2023 address=address, authkey=authkey, serializer=SERIALIZER)
2024 manager.connect()
2025 queue = manager.get_queue()
2026 queue.put('hello world')
2027
2028 def test_rapid_restart(self):
2029 authkey = os.urandom(32)
2030 manager = QueueManager(
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02002031 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002032 srvr = manager.get_server()
2033 addr = srvr.address
2034 # Close the connection.Listener socket which gets opened as a part
2035 # of manager.get_server(). It's not needed for the test.
2036 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002037 manager.start()
2038
2039 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002040 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002041 p.start()
2042 queue = manager.get_queue()
2043 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002044 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002045 manager.shutdown()
2046 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002047 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002048 try:
2049 manager.start()
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002050 except OSError as e:
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002051 if e.errno != errno.EADDRINUSE:
2052 raise
2053 # Retry after some time, in case the old socket was lingering
2054 # (sporadic failure on buildbots)
2055 time.sleep(1.0)
2056 manager = QueueManager(
2057 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002058 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002059
Benjamin Petersone711caf2008-06-11 16:44:04 +00002060#
2061#
2062#
2063
2064SENTINEL = latin('')
2065
2066class _TestConnection(BaseTestCase):
2067
2068 ALLOWED_TYPES = ('processes', 'threads')
2069
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002070 @classmethod
2071 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002072 for msg in iter(conn.recv_bytes, SENTINEL):
2073 conn.send_bytes(msg)
2074 conn.close()
2075
2076 def test_connection(self):
2077 conn, child_conn = self.Pipe()
2078
2079 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002080 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002081 p.start()
2082
2083 seq = [1, 2.25, None]
2084 msg = latin('hello world')
2085 longmsg = msg * 10
2086 arr = array.array('i', list(range(4)))
2087
2088 if self.TYPE == 'processes':
2089 self.assertEqual(type(conn.fileno()), int)
2090
2091 self.assertEqual(conn.send(seq), None)
2092 self.assertEqual(conn.recv(), seq)
2093
2094 self.assertEqual(conn.send_bytes(msg), None)
2095 self.assertEqual(conn.recv_bytes(), msg)
2096
2097 if self.TYPE == 'processes':
2098 buffer = array.array('i', [0]*10)
2099 expected = list(arr) + [0] * (10 - len(arr))
2100 self.assertEqual(conn.send_bytes(arr), None)
2101 self.assertEqual(conn.recv_bytes_into(buffer),
2102 len(arr) * buffer.itemsize)
2103 self.assertEqual(list(buffer), expected)
2104
2105 buffer = array.array('i', [0]*10)
2106 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2107 self.assertEqual(conn.send_bytes(arr), None)
2108 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2109 len(arr) * buffer.itemsize)
2110 self.assertEqual(list(buffer), expected)
2111
2112 buffer = bytearray(latin(' ' * 40))
2113 self.assertEqual(conn.send_bytes(longmsg), None)
2114 try:
2115 res = conn.recv_bytes_into(buffer)
2116 except multiprocessing.BufferTooShort as e:
2117 self.assertEqual(e.args, (longmsg,))
2118 else:
2119 self.fail('expected BufferTooShort, got %s' % res)
2120
2121 poll = TimingWrapper(conn.poll)
2122
2123 self.assertEqual(poll(), False)
2124 self.assertTimingAlmostEqual(poll.elapsed, 0)
2125
Richard Oudkerk59d54042012-05-10 16:11:12 +01002126 self.assertEqual(poll(-1), False)
2127 self.assertTimingAlmostEqual(poll.elapsed, 0)
2128
Benjamin Petersone711caf2008-06-11 16:44:04 +00002129 self.assertEqual(poll(TIMEOUT1), False)
2130 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2131
2132 conn.send(None)
Giampaolo Rodola'5e844c82012-12-31 17:23:09 +01002133 time.sleep(.1)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002134
2135 self.assertEqual(poll(TIMEOUT1), True)
2136 self.assertTimingAlmostEqual(poll.elapsed, 0)
2137
2138 self.assertEqual(conn.recv(), None)
2139
2140 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2141 conn.send_bytes(really_big_msg)
2142 self.assertEqual(conn.recv_bytes(), really_big_msg)
2143
2144 conn.send_bytes(SENTINEL) # tell child to quit
2145 child_conn.close()
2146
2147 if self.TYPE == 'processes':
2148 self.assertEqual(conn.readable, True)
2149 self.assertEqual(conn.writable, True)
2150 self.assertRaises(EOFError, conn.recv)
2151 self.assertRaises(EOFError, conn.recv_bytes)
2152
2153 p.join()
2154
2155 def test_duplex_false(self):
2156 reader, writer = self.Pipe(duplex=False)
2157 self.assertEqual(writer.send(1), None)
2158 self.assertEqual(reader.recv(), 1)
2159 if self.TYPE == 'processes':
2160 self.assertEqual(reader.readable, True)
2161 self.assertEqual(reader.writable, False)
2162 self.assertEqual(writer.readable, False)
2163 self.assertEqual(writer.writable, True)
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002164 self.assertRaises(OSError, reader.send, 2)
2165 self.assertRaises(OSError, writer.recv)
2166 self.assertRaises(OSError, writer.poll)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002167
2168 def test_spawn_close(self):
2169 # We test that a pipe connection can be closed by parent
2170 # process immediately after child is spawned. On Windows this
2171 # would have sometimes failed on old versions because
2172 # child_conn would be closed before the child got a chance to
2173 # duplicate it.
2174 conn, child_conn = self.Pipe()
2175
2176 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002177 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002178 p.start()
2179 child_conn.close() # this might complete before child initializes
2180
2181 msg = latin('hello')
2182 conn.send_bytes(msg)
2183 self.assertEqual(conn.recv_bytes(), msg)
2184
2185 conn.send_bytes(SENTINEL)
2186 conn.close()
2187 p.join()
2188
2189 def test_sendbytes(self):
2190 if self.TYPE != 'processes':
2191 return
2192
2193 msg = latin('abcdefghijklmnopqrstuvwxyz')
2194 a, b = self.Pipe()
2195
2196 a.send_bytes(msg)
2197 self.assertEqual(b.recv_bytes(), msg)
2198
2199 a.send_bytes(msg, 5)
2200 self.assertEqual(b.recv_bytes(), msg[5:])
2201
2202 a.send_bytes(msg, 7, 8)
2203 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2204
2205 a.send_bytes(msg, 26)
2206 self.assertEqual(b.recv_bytes(), latin(''))
2207
2208 a.send_bytes(msg, 26, 0)
2209 self.assertEqual(b.recv_bytes(), latin(''))
2210
2211 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2212
2213 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2214
2215 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2216
2217 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2218
2219 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2220
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002221 @classmethod
2222 def _is_fd_assigned(cls, fd):
2223 try:
2224 os.fstat(fd)
2225 except OSError as e:
2226 if e.errno == errno.EBADF:
2227 return False
2228 raise
2229 else:
2230 return True
2231
2232 @classmethod
2233 def _writefd(cls, conn, data, create_dummy_fds=False):
2234 if create_dummy_fds:
2235 for i in range(0, 256):
2236 if not cls._is_fd_assigned(i):
2237 os.dup2(conn.fileno(), i)
2238 fd = reduction.recv_handle(conn)
2239 if msvcrt:
2240 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2241 os.write(fd, data)
2242 os.close(fd)
2243
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002244 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002245 def test_fd_transfer(self):
2246 if self.TYPE != 'processes':
2247 self.skipTest("only makes sense with processes")
2248 conn, child_conn = self.Pipe(duplex=True)
2249
2250 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002251 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002252 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002253 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002254 with open(test.support.TESTFN, "wb") as f:
2255 fd = f.fileno()
2256 if msvcrt:
2257 fd = msvcrt.get_osfhandle(fd)
2258 reduction.send_handle(conn, fd, p.pid)
2259 p.join()
2260 with open(test.support.TESTFN, "rb") as f:
2261 self.assertEqual(f.read(), b"foo")
2262
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002263 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002264 @unittest.skipIf(sys.platform == "win32",
2265 "test semantics don't make sense on Windows")
2266 @unittest.skipIf(MAXFD <= 256,
2267 "largest assignable fd number is too small")
2268 @unittest.skipUnless(hasattr(os, "dup2"),
2269 "test needs os.dup2()")
2270 def test_large_fd_transfer(self):
2271 # With fd > 256 (issue #11657)
2272 if self.TYPE != 'processes':
2273 self.skipTest("only makes sense with processes")
2274 conn, child_conn = self.Pipe(duplex=True)
2275
2276 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002277 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002278 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002279 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002280 with open(test.support.TESTFN, "wb") as f:
2281 fd = f.fileno()
2282 for newfd in range(256, MAXFD):
2283 if not self._is_fd_assigned(newfd):
2284 break
2285 else:
2286 self.fail("could not find an unassigned large file descriptor")
2287 os.dup2(fd, newfd)
2288 try:
2289 reduction.send_handle(conn, newfd, p.pid)
2290 finally:
2291 os.close(newfd)
2292 p.join()
2293 with open(test.support.TESTFN, "rb") as f:
2294 self.assertEqual(f.read(), b"bar")
2295
Jesus Cea4507e642011-09-21 03:53:25 +02002296 @classmethod
2297 def _send_data_without_fd(self, conn):
2298 os.write(conn.fileno(), b"\0")
2299
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002300 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002301 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2302 def test_missing_fd_transfer(self):
2303 # Check that exception is raised when received data is not
2304 # accompanied by a file descriptor in ancillary data.
2305 if self.TYPE != 'processes':
2306 self.skipTest("only makes sense with processes")
2307 conn, child_conn = self.Pipe(duplex=True)
2308
2309 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2310 p.daemon = True
2311 p.start()
2312 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2313 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002314
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002315 def test_context(self):
2316 a, b = self.Pipe()
2317
2318 with a, b:
2319 a.send(1729)
2320 self.assertEqual(b.recv(), 1729)
2321 if self.TYPE == 'processes':
2322 self.assertFalse(a.closed)
2323 self.assertFalse(b.closed)
2324
2325 if self.TYPE == 'processes':
2326 self.assertTrue(a.closed)
2327 self.assertTrue(b.closed)
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002328 self.assertRaises(OSError, a.recv)
2329 self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002330
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002331class _TestListener(BaseTestCase):
2332
Richard Oudkerk91257752012-06-15 21:53:34 +01002333 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002334
2335 def test_multiple_bind(self):
2336 for family in self.connection.families:
2337 l = self.connection.Listener(family=family)
2338 self.addCleanup(l.close)
2339 self.assertRaises(OSError, self.connection.Listener,
2340 l.address, family)
2341
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002342 def test_context(self):
2343 with self.connection.Listener() as l:
2344 with self.connection.Client(l.address) as c:
2345 with l.accept() as d:
2346 c.send(1729)
2347 self.assertEqual(d.recv(), 1729)
2348
2349 if self.TYPE == 'processes':
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002350 self.assertRaises(OSError, l.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002351
Benjamin Petersone711caf2008-06-11 16:44:04 +00002352class _TestListenerClient(BaseTestCase):
2353
2354 ALLOWED_TYPES = ('processes', 'threads')
2355
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002356 @classmethod
2357 def _test(cls, address):
2358 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002359 conn.send('hello')
2360 conn.close()
2361
2362 def test_listener_client(self):
2363 for family in self.connection.families:
2364 l = self.connection.Listener(family=family)
2365 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002366 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002367 p.start()
2368 conn = l.accept()
2369 self.assertEqual(conn.recv(), 'hello')
2370 p.join()
2371 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002372
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002373 def test_issue14725(self):
2374 l = self.connection.Listener()
2375 p = self.Process(target=self._test, args=(l.address,))
2376 p.daemon = True
2377 p.start()
2378 time.sleep(1)
2379 # On Windows the client process should by now have connected,
2380 # written data and closed the pipe handle by now. This causes
2381 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2382 # 14725.
2383 conn = l.accept()
2384 self.assertEqual(conn.recv(), 'hello')
2385 conn.close()
2386 p.join()
2387 l.close()
2388
Richard Oudkerked9e06c2013-01-13 22:46:48 +00002389 def test_issue16955(self):
2390 for fam in self.connection.families:
2391 l = self.connection.Listener(family=fam)
2392 c = self.connection.Client(l.address)
2393 a = l.accept()
2394 a.send_bytes(b"hello")
2395 self.assertTrue(c.poll(1))
2396 a.close()
2397 c.close()
2398 l.close()
2399
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002400class _TestPoll(BaseTestCase):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002401
2402 ALLOWED_TYPES = ('processes', 'threads')
2403
2404 def test_empty_string(self):
2405 a, b = self.Pipe()
2406 self.assertEqual(a.poll(), False)
2407 b.send_bytes(b'')
2408 self.assertEqual(a.poll(), True)
2409 self.assertEqual(a.poll(), True)
2410
2411 @classmethod
2412 def _child_strings(cls, conn, strings):
2413 for s in strings:
2414 time.sleep(0.1)
2415 conn.send_bytes(s)
2416 conn.close()
2417
2418 def test_strings(self):
2419 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2420 a, b = self.Pipe()
2421 p = self.Process(target=self._child_strings, args=(b, strings))
2422 p.start()
2423
2424 for s in strings:
2425 for i in range(200):
2426 if a.poll(0.01):
2427 break
2428 x = a.recv_bytes()
2429 self.assertEqual(s, x)
2430
2431 p.join()
2432
2433 @classmethod
2434 def _child_boundaries(cls, r):
2435 # Polling may "pull" a message in to the child process, but we
2436 # don't want it to pull only part of a message, as that would
2437 # corrupt the pipe for any other processes which might later
2438 # read from it.
2439 r.poll(5)
2440
2441 def test_boundaries(self):
2442 r, w = self.Pipe(False)
2443 p = self.Process(target=self._child_boundaries, args=(r,))
2444 p.start()
2445 time.sleep(2)
2446 L = [b"first", b"second"]
2447 for obj in L:
2448 w.send_bytes(obj)
2449 w.close()
2450 p.join()
2451 self.assertIn(r.recv_bytes(), L)
2452
2453 @classmethod
2454 def _child_dont_merge(cls, b):
2455 b.send_bytes(b'a')
2456 b.send_bytes(b'b')
2457 b.send_bytes(b'cd')
2458
2459 def test_dont_merge(self):
2460 a, b = self.Pipe()
2461 self.assertEqual(a.poll(0.0), False)
2462 self.assertEqual(a.poll(0.1), False)
2463
2464 p = self.Process(target=self._child_dont_merge, args=(b,))
2465 p.start()
2466
2467 self.assertEqual(a.recv_bytes(), b'a')
2468 self.assertEqual(a.poll(1.0), True)
2469 self.assertEqual(a.poll(1.0), True)
2470 self.assertEqual(a.recv_bytes(), b'b')
2471 self.assertEqual(a.poll(1.0), True)
2472 self.assertEqual(a.poll(1.0), True)
2473 self.assertEqual(a.poll(0.0), True)
2474 self.assertEqual(a.recv_bytes(), b'cd')
2475
2476 p.join()
2477
Benjamin Petersone711caf2008-06-11 16:44:04 +00002478#
2479# Test of sending connection and socket objects between processes
2480#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002481
2482@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002483class _TestPicklingConnections(BaseTestCase):
2484
2485 ALLOWED_TYPES = ('processes',)
2486
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002487 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002488 def tearDownClass(cls):
2489 from multiprocessing.reduction import resource_sharer
2490 resource_sharer.stop(timeout=5)
2491
2492 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002493 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002494 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002495 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002496 conn.send(l.address)
2497 new_conn = l.accept()
2498 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002499 new_conn.close()
2500 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002501
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002502 l = socket.socket()
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02002503 l.bind((test.support.HOST, 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002504 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002505 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002506 new_conn, addr = l.accept()
2507 conn.send(new_conn)
2508 new_conn.close()
2509 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002510
2511 conn.recv()
2512
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002513 @classmethod
2514 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002515 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002516 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002517 client.send(msg.upper())
2518 client.close()
2519
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002520 address, msg = conn.recv()
2521 client = socket.socket()
2522 client.connect(address)
2523 client.sendall(msg.upper())
2524 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002525
2526 conn.close()
2527
2528 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002529 families = self.connection.families
2530
2531 lconn, lconn0 = self.Pipe()
2532 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002533 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002534 lp.start()
2535 lconn0.close()
2536
2537 rconn, rconn0 = self.Pipe()
2538 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002539 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002540 rp.start()
2541 rconn0.close()
2542
2543 for fam in families:
2544 msg = ('This connection uses family %s' % fam).encode('ascii')
2545 address = lconn.recv()
2546 rconn.send((address, msg))
2547 new_conn = lconn.recv()
2548 self.assertEqual(new_conn.recv(), msg.upper())
2549
2550 rconn.send(None)
2551
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002552 msg = latin('This connection uses a normal socket')
2553 address = lconn.recv()
2554 rconn.send((address, msg))
2555 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002556 buf = []
2557 while True:
2558 s = new_conn.recv(100)
2559 if not s:
2560 break
2561 buf.append(s)
2562 buf = b''.join(buf)
2563 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002564 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002565
2566 lconn.send(None)
2567
2568 rconn.close()
2569 lconn.close()
2570
2571 lp.join()
2572 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002573
2574 @classmethod
2575 def child_access(cls, conn):
2576 w = conn.recv()
2577 w.send('all is well')
2578 w.close()
2579
2580 r = conn.recv()
2581 msg = r.recv()
2582 conn.send(msg*2)
2583
2584 conn.close()
2585
2586 def test_access(self):
2587 # On Windows, if we do not specify a destination pid when
2588 # using DupHandle then we need to be careful to use the
2589 # correct access flags for DuplicateHandle(), or else
2590 # DupHandle.detach() will raise PermissionError. For example,
2591 # for a read only pipe handle we should use
2592 # access=FILE_GENERIC_READ. (Unfortunately
2593 # DUPLICATE_SAME_ACCESS does not work.)
2594 conn, child_conn = self.Pipe()
2595 p = self.Process(target=self.child_access, args=(child_conn,))
2596 p.daemon = True
2597 p.start()
2598 child_conn.close()
2599
2600 r, w = self.Pipe(duplex=False)
2601 conn.send(w)
2602 w.close()
2603 self.assertEqual(r.recv(), 'all is well')
2604 r.close()
2605
2606 r, w = self.Pipe(duplex=False)
2607 conn.send(r)
2608 r.close()
2609 w.send('foobar')
2610 w.close()
2611 self.assertEqual(conn.recv(), 'foobar'*2)
2612
Benjamin Petersone711caf2008-06-11 16:44:04 +00002613#
2614#
2615#
2616
2617class _TestHeap(BaseTestCase):
2618
2619 ALLOWED_TYPES = ('processes',)
2620
2621 def test_heap(self):
2622 iterations = 5000
2623 maxblocks = 50
2624 blocks = []
2625
2626 # create and destroy lots of blocks of different sizes
2627 for i in range(iterations):
2628 size = int(random.lognormvariate(0, 1) * 1000)
2629 b = multiprocessing.heap.BufferWrapper(size)
2630 blocks.append(b)
2631 if len(blocks) > maxblocks:
2632 i = random.randrange(maxblocks)
2633 del blocks[i]
2634
2635 # get the heap object
2636 heap = multiprocessing.heap.BufferWrapper._heap
2637
2638 # verify the state of the heap
2639 all = []
2640 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002641 heap._lock.acquire()
2642 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002643 for L in list(heap._len_to_seq.values()):
2644 for arena, start, stop in L:
2645 all.append((heap._arenas.index(arena), start, stop,
2646 stop-start, 'free'))
2647 for arena, start, stop in heap._allocated_blocks:
2648 all.append((heap._arenas.index(arena), start, stop,
2649 stop-start, 'occupied'))
2650 occupied += (stop-start)
2651
2652 all.sort()
2653
2654 for i in range(len(all)-1):
2655 (arena, start, stop) = all[i][:3]
2656 (narena, nstart, nstop) = all[i+1][:3]
2657 self.assertTrue((arena != narena and nstart == 0) or
2658 (stop == nstart))
2659
Charles-François Natali778db492011-07-02 14:35:49 +02002660 def test_free_from_gc(self):
2661 # Check that freeing of blocks by the garbage collector doesn't deadlock
2662 # (issue #12352).
2663 # Make sure the GC is enabled, and set lower collection thresholds to
2664 # make collections more frequent (and increase the probability of
2665 # deadlock).
2666 if not gc.isenabled():
2667 gc.enable()
2668 self.addCleanup(gc.disable)
2669 thresholds = gc.get_threshold()
2670 self.addCleanup(gc.set_threshold, *thresholds)
2671 gc.set_threshold(10)
2672
2673 # perform numerous block allocations, with cyclic references to make
2674 # sure objects are collected asynchronously by the gc
2675 for i in range(5000):
2676 a = multiprocessing.heap.BufferWrapper(1)
2677 b = multiprocessing.heap.BufferWrapper(1)
2678 # circular references
2679 a.buddy = b
2680 b.buddy = a
2681
Benjamin Petersone711caf2008-06-11 16:44:04 +00002682#
2683#
2684#
2685
Benjamin Petersone711caf2008-06-11 16:44:04 +00002686class _Foo(Structure):
2687 _fields_ = [
2688 ('x', c_int),
2689 ('y', c_double)
2690 ]
2691
2692class _TestSharedCTypes(BaseTestCase):
2693
2694 ALLOWED_TYPES = ('processes',)
2695
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002696 def setUp(self):
2697 if not HAS_SHAREDCTYPES:
2698 self.skipTest("requires multiprocessing.sharedctypes")
2699
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002700 @classmethod
2701 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002702 x.value *= 2
2703 y.value *= 2
2704 foo.x *= 2
2705 foo.y *= 2
2706 string.value *= 2
2707 for i in range(len(arr)):
2708 arr[i] *= 2
2709
2710 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002711 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002712 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002713 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002714 arr = self.Array('d', list(range(10)), lock=lock)
2715 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002716 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002717
2718 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002719 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002720 p.start()
2721 p.join()
2722
2723 self.assertEqual(x.value, 14)
2724 self.assertAlmostEqual(y.value, 2.0/3.0)
2725 self.assertEqual(foo.x, 6)
2726 self.assertAlmostEqual(foo.y, 4.0)
2727 for i in range(10):
2728 self.assertAlmostEqual(arr[i], i*2)
2729 self.assertEqual(string.value, latin('hellohello'))
2730
2731 def test_synchronize(self):
2732 self.test_sharedctypes(lock=True)
2733
2734 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002735 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002736 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002737 foo.x = 0
2738 foo.y = 0
2739 self.assertEqual(bar.x, 2)
2740 self.assertAlmostEqual(bar.y, 5.0)
2741
2742#
2743#
2744#
2745
2746class _TestFinalize(BaseTestCase):
2747
2748 ALLOWED_TYPES = ('processes',)
2749
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002750 @classmethod
2751 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002752 class Foo(object):
2753 pass
2754
2755 a = Foo()
2756 util.Finalize(a, conn.send, args=('a',))
2757 del a # triggers callback for a
2758
2759 b = Foo()
2760 close_b = util.Finalize(b, conn.send, args=('b',))
2761 close_b() # triggers callback for b
2762 close_b() # does nothing because callback has already been called
2763 del b # does nothing because callback has already been called
2764
2765 c = Foo()
2766 util.Finalize(c, conn.send, args=('c',))
2767
2768 d10 = Foo()
2769 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2770
2771 d01 = Foo()
2772 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2773 d02 = Foo()
2774 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2775 d03 = Foo()
2776 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2777
2778 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2779
2780 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2781
Ezio Melotti13925002011-03-16 11:05:33 +02002782 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002783 # garbage collecting locals
2784 util._exit_function()
2785 conn.close()
2786 os._exit(0)
2787
2788 def test_finalize(self):
2789 conn, child_conn = self.Pipe()
2790
2791 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002792 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002793 p.start()
2794 p.join()
2795
2796 result = [obj for obj in iter(conn.recv, 'STOP')]
2797 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2798
2799#
2800# Test that from ... import * works for each module
2801#
2802
2803class _TestImportStar(BaseTestCase):
2804
2805 ALLOWED_TYPES = ('processes',)
2806
2807 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002808 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002809 'multiprocessing', 'multiprocessing.connection',
2810 'multiprocessing.heap', 'multiprocessing.managers',
2811 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002812 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002813 ]
2814
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002815 if HAS_REDUCTION:
2816 modules.append('multiprocessing.reduction')
2817
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002818 if c_int is not None:
2819 # This module requires _ctypes
2820 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002821
2822 for name in modules:
2823 __import__(name)
2824 mod = sys.modules[name]
2825
2826 for attr in getattr(mod, '__all__', ()):
2827 self.assertTrue(
2828 hasattr(mod, attr),
2829 '%r does not have attribute %r' % (mod, attr)
2830 )
2831
2832#
2833# Quick test that logging works -- does not test logging output
2834#
2835
2836class _TestLogging(BaseTestCase):
2837
2838 ALLOWED_TYPES = ('processes',)
2839
2840 def test_enable_logging(self):
2841 logger = multiprocessing.get_logger()
2842 logger.setLevel(util.SUBWARNING)
2843 self.assertTrue(logger is not None)
2844 logger.debug('this will not be printed')
2845 logger.info('nor will this')
2846 logger.setLevel(LOG_LEVEL)
2847
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002848 @classmethod
2849 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002850 logger = multiprocessing.get_logger()
2851 conn.send(logger.getEffectiveLevel())
2852
2853 def test_level(self):
2854 LEVEL1 = 32
2855 LEVEL2 = 37
2856
2857 logger = multiprocessing.get_logger()
2858 root_logger = logging.getLogger()
2859 root_level = root_logger.level
2860
2861 reader, writer = multiprocessing.Pipe(duplex=False)
2862
2863 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002864 p = self.Process(target=self._test_level, args=(writer,))
2865 p.daemon = True
2866 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002867 self.assertEqual(LEVEL1, reader.recv())
2868
2869 logger.setLevel(logging.NOTSET)
2870 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002871 p = self.Process(target=self._test_level, args=(writer,))
2872 p.daemon = True
2873 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002874 self.assertEqual(LEVEL2, reader.recv())
2875
2876 root_logger.setLevel(root_level)
2877 logger.setLevel(level=LOG_LEVEL)
2878
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002879
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002880# class _TestLoggingProcessName(BaseTestCase):
2881#
2882# def handle(self, record):
2883# assert record.processName == multiprocessing.current_process().name
2884# self.__handled = True
2885#
2886# def test_logging(self):
2887# handler = logging.Handler()
2888# handler.handle = self.handle
2889# self.__handled = False
2890# # Bypass getLogger() and side-effects
2891# logger = logging.getLoggerClass()(
2892# 'multiprocessing.test.TestLoggingProcessName')
2893# logger.addHandler(handler)
2894# logger.propagate = False
2895#
2896# logger.warn('foo')
2897# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002898
Benjamin Petersone711caf2008-06-11 16:44:04 +00002899#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002900# Check that Process.join() retries if os.waitpid() fails with EINTR
2901#
2902
2903class _TestPollEintr(BaseTestCase):
2904
2905 ALLOWED_TYPES = ('processes',)
2906
2907 @classmethod
2908 def _killer(cls, pid):
2909 time.sleep(0.5)
2910 os.kill(pid, signal.SIGUSR1)
2911
2912 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2913 def test_poll_eintr(self):
2914 got_signal = [False]
2915 def record(*args):
2916 got_signal[0] = True
2917 pid = os.getpid()
2918 oldhandler = signal.signal(signal.SIGUSR1, record)
2919 try:
2920 killer = self.Process(target=self._killer, args=(pid,))
2921 killer.start()
2922 p = self.Process(target=time.sleep, args=(1,))
2923 p.start()
2924 p.join()
2925 self.assertTrue(got_signal[0])
2926 self.assertEqual(p.exitcode, 0)
2927 killer.join()
2928 finally:
2929 signal.signal(signal.SIGUSR1, oldhandler)
2930
2931#
Jesse Noller6214edd2009-01-19 16:23:53 +00002932# Test to verify handle verification, see issue 3321
2933#
2934
2935class TestInvalidHandle(unittest.TestCase):
2936
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002937 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002938 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002939 conn = multiprocessing.connection.Connection(44977608)
2940 try:
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002941 self.assertRaises((ValueError, OSError), conn.poll)
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002942 finally:
2943 # Hack private attribute _handle to avoid printing an error
2944 # in conn.__del__
2945 conn._handle = None
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002946 self.assertRaises((ValueError, OSError),
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002947 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002948
Jesse Noller6214edd2009-01-19 16:23:53 +00002949#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002950# Functions used to create test cases from the base ones in this module
2951#
2952
Benjamin Petersone711caf2008-06-11 16:44:04 +00002953def create_test_cases(Mixin, type):
2954 result = {}
2955 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002956 Type = type.capitalize()
Richard Oudkerk91257752012-06-15 21:53:34 +01002957 ALL_TYPES = {'processes', 'threads', 'manager'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00002958
2959 for name in list(glob.keys()):
2960 if name.startswith('_Test'):
2961 base = glob[name]
Richard Oudkerk91257752012-06-15 21:53:34 +01002962 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002963 if type in base.ALLOWED_TYPES:
2964 newname = 'With' + Type + name[1:]
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002965 class Temp(base, Mixin, unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002966 pass
2967 result[newname] = Temp
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002968 Temp.__name__ = Temp.__qualname__ = newname
Benjamin Petersone711caf2008-06-11 16:44:04 +00002969 Temp.__module__ = Mixin.__module__
2970 return result
2971
2972#
2973# Create test cases
2974#
2975
2976class ProcessesMixin(object):
2977 TYPE = 'processes'
2978 Process = multiprocessing.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01002979 connection = multiprocessing.connection
2980 current_process = staticmethod(multiprocessing.current_process)
2981 active_children = staticmethod(multiprocessing.active_children)
2982 Pool = staticmethod(multiprocessing.Pool)
2983 Pipe = staticmethod(multiprocessing.Pipe)
2984 Queue = staticmethod(multiprocessing.Queue)
2985 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2986 Lock = staticmethod(multiprocessing.Lock)
2987 RLock = staticmethod(multiprocessing.RLock)
2988 Semaphore = staticmethod(multiprocessing.Semaphore)
2989 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2990 Condition = staticmethod(multiprocessing.Condition)
2991 Event = staticmethod(multiprocessing.Event)
2992 Barrier = staticmethod(multiprocessing.Barrier)
2993 Value = staticmethod(multiprocessing.Value)
2994 Array = staticmethod(multiprocessing.Array)
2995 RawValue = staticmethod(multiprocessing.RawValue)
2996 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002997
2998testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2999globals().update(testcases_processes)
3000
3001
3002class ManagerMixin(object):
3003 TYPE = 'manager'
3004 Process = multiprocessing.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003005 Queue = property(operator.attrgetter('manager.Queue'))
3006 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3007 Lock = property(operator.attrgetter('manager.Lock'))
3008 RLock = property(operator.attrgetter('manager.RLock'))
3009 Semaphore = property(operator.attrgetter('manager.Semaphore'))
3010 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3011 Condition = property(operator.attrgetter('manager.Condition'))
3012 Event = property(operator.attrgetter('manager.Event'))
3013 Barrier = property(operator.attrgetter('manager.Barrier'))
3014 Value = property(operator.attrgetter('manager.Value'))
3015 Array = property(operator.attrgetter('manager.Array'))
3016 list = property(operator.attrgetter('manager.list'))
3017 dict = property(operator.attrgetter('manager.dict'))
3018 Namespace = property(operator.attrgetter('manager.Namespace'))
3019
3020 @classmethod
3021 def Pool(cls, *args, **kwds):
3022 return cls.manager.Pool(*args, **kwds)
3023
3024 @classmethod
3025 def setUpClass(cls):
3026 cls.manager = multiprocessing.Manager()
3027
3028 @classmethod
3029 def tearDownClass(cls):
3030 # only the manager process should be returned by active_children()
3031 # but this can take a bit on slow machines, so wait a few seconds
3032 # if there are other children too (see #17395)
3033 t = 0.01
3034 while len(multiprocessing.active_children()) > 1 and t < 5:
3035 time.sleep(t)
3036 t *= 2
3037 gc.collect() # do garbage collection
3038 if cls.manager._number_of_objects() != 0:
3039 # This is not really an error since some tests do not
3040 # ensure that all processes which hold a reference to a
3041 # managed object have been joined.
3042 print('Shared objects which still exist at manager shutdown:')
3043 print(cls.manager._debug_info())
3044 cls.manager.shutdown()
3045 cls.manager.join()
3046 cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003047
3048testcases_manager = create_test_cases(ManagerMixin, type='manager')
3049globals().update(testcases_manager)
3050
3051
3052class ThreadsMixin(object):
3053 TYPE = 'threads'
3054 Process = multiprocessing.dummy.Process
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003055 connection = multiprocessing.dummy.connection
3056 current_process = staticmethod(multiprocessing.dummy.current_process)
3057 active_children = staticmethod(multiprocessing.dummy.active_children)
3058 Pool = staticmethod(multiprocessing.Pool)
3059 Pipe = staticmethod(multiprocessing.dummy.Pipe)
3060 Queue = staticmethod(multiprocessing.dummy.Queue)
3061 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3062 Lock = staticmethod(multiprocessing.dummy.Lock)
3063 RLock = staticmethod(multiprocessing.dummy.RLock)
3064 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3065 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3066 Condition = staticmethod(multiprocessing.dummy.Condition)
3067 Event = staticmethod(multiprocessing.dummy.Event)
3068 Barrier = staticmethod(multiprocessing.dummy.Barrier)
3069 Value = staticmethod(multiprocessing.dummy.Value)
3070 Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003071
3072testcases_threads = create_test_cases(ThreadsMixin, type='threads')
3073globals().update(testcases_threads)
3074
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003075
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003076class OtherTest(unittest.TestCase):
3077 # TODO: add more tests for deliver/answer challenge.
3078 def test_deliver_challenge_auth_failure(self):
3079 class _FakeConnection(object):
3080 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003081 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003082 def send_bytes(self, data):
3083 pass
3084 self.assertRaises(multiprocessing.AuthenticationError,
3085 multiprocessing.connection.deliver_challenge,
3086 _FakeConnection(), b'abc')
3087
3088 def test_answer_challenge_auth_failure(self):
3089 class _FakeConnection(object):
3090 def __init__(self):
3091 self.count = 0
3092 def recv_bytes(self, size):
3093 self.count += 1
3094 if self.count == 1:
3095 return multiprocessing.connection.CHALLENGE
3096 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003097 return b'something bogus'
3098 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003099 def send_bytes(self, data):
3100 pass
3101 self.assertRaises(multiprocessing.AuthenticationError,
3102 multiprocessing.connection.answer_challenge,
3103 _FakeConnection(), b'abc')
3104
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003105#
3106# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3107#
3108
3109def initializer(ns):
3110 ns.test += 1
3111
3112class TestInitializers(unittest.TestCase):
3113 def setUp(self):
3114 self.mgr = multiprocessing.Manager()
3115 self.ns = self.mgr.Namespace()
3116 self.ns.test = 0
3117
3118 def tearDown(self):
3119 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003120 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003121
3122 def test_manager_initializer(self):
3123 m = multiprocessing.managers.SyncManager()
3124 self.assertRaises(TypeError, m.start, 1)
3125 m.start(initializer, (self.ns,))
3126 self.assertEqual(self.ns.test, 1)
3127 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003128 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003129
3130 def test_pool_initializer(self):
3131 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3132 p = multiprocessing.Pool(1, initializer, (self.ns,))
3133 p.close()
3134 p.join()
3135 self.assertEqual(self.ns.test, 1)
3136
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003137#
3138# Issue 5155, 5313, 5331: Test process in processes
3139# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3140#
3141
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003142def _this_sub_process(q):
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003143 try:
3144 item = q.get(block=False)
3145 except pyqueue.Empty:
3146 pass
3147
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003148def _test_process(q):
3149 queue = multiprocessing.Queue()
3150 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3151 subProc.daemon = True
3152 subProc.start()
3153 subProc.join()
3154
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003155def _afunc(x):
3156 return x*x
3157
3158def pool_in_process():
3159 pool = multiprocessing.Pool(processes=4)
3160 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003161 pool.close()
3162 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003163
3164class _file_like(object):
3165 def __init__(self, delegate):
3166 self._delegate = delegate
3167 self._pid = None
3168
3169 @property
3170 def cache(self):
3171 pid = os.getpid()
3172 # There are no race conditions since fork keeps only the running thread
3173 if pid != self._pid:
3174 self._pid = pid
3175 self._cache = []
3176 return self._cache
3177
3178 def write(self, data):
3179 self.cache.append(data)
3180
3181 def flush(self):
3182 self._delegate.write(''.join(self.cache))
3183 self._cache = []
3184
3185class TestStdinBadfiledescriptor(unittest.TestCase):
3186
3187 def test_queue_in_process(self):
3188 queue = multiprocessing.Queue()
Richard Oudkerk8b3f5aa2013-09-29 17:29:56 +01003189 proc = multiprocessing.Process(target=_test_process, args=(queue,))
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003190 proc.start()
3191 proc.join()
3192
3193 def test_pool_in_process(self):
3194 p = multiprocessing.Process(target=pool_in_process)
3195 p.start()
3196 p.join()
3197
3198 def test_flushing(self):
3199 sio = io.StringIO()
3200 flike = _file_like(sio)
3201 flike.write('foo')
3202 proc = multiprocessing.Process(target=lambda: flike.flush())
3203 flike.flush()
3204 assert sio.getvalue() == 'foo'
3205
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003206
3207class TestWait(unittest.TestCase):
3208
3209 @classmethod
3210 def _child_test_wait(cls, w, slow):
3211 for i in range(10):
3212 if slow:
3213 time.sleep(random.random()*0.1)
3214 w.send((i, os.getpid()))
3215 w.close()
3216
3217 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003218 from multiprocessing.connection import wait
3219 readers = []
3220 procs = []
3221 messages = []
3222
3223 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003224 r, w = multiprocessing.Pipe(duplex=False)
3225 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003226 p.daemon = True
3227 p.start()
3228 w.close()
3229 readers.append(r)
3230 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003231 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003232
3233 while readers:
3234 for r in wait(readers):
3235 try:
3236 msg = r.recv()
3237 except EOFError:
3238 readers.remove(r)
3239 r.close()
3240 else:
3241 messages.append(msg)
3242
3243 messages.sort()
3244 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3245 self.assertEqual(messages, expected)
3246
3247 @classmethod
3248 def _child_test_wait_socket(cls, address, slow):
3249 s = socket.socket()
3250 s.connect(address)
3251 for i in range(10):
3252 if slow:
3253 time.sleep(random.random()*0.1)
3254 s.sendall(('%s\n' % i).encode('ascii'))
3255 s.close()
3256
3257 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003258 from multiprocessing.connection import wait
3259 l = socket.socket()
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02003260 l.bind((test.support.HOST, 0))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003261 l.listen(4)
Antoine Pitrou1e440cf2013-08-22 00:39:46 +02003262 addr = l.getsockname()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003263 readers = []
3264 procs = []
3265 dic = {}
3266
3267 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003268 p = multiprocessing.Process(target=self._child_test_wait_socket,
3269 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003270 p.daemon = True
3271 p.start()
3272 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003273 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003274
3275 for i in range(4):
3276 r, _ = l.accept()
3277 readers.append(r)
3278 dic[r] = []
3279 l.close()
3280
3281 while readers:
3282 for r in wait(readers):
3283 msg = r.recv(32)
3284 if not msg:
3285 readers.remove(r)
3286 r.close()
3287 else:
3288 dic[r].append(msg)
3289
3290 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3291 for v in dic.values():
3292 self.assertEqual(b''.join(v), expected)
3293
3294 def test_wait_slow(self):
3295 self.test_wait(True)
3296
3297 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003298 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003299
3300 def test_wait_timeout(self):
3301 from multiprocessing.connection import wait
3302
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003303 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003304 a, b = multiprocessing.Pipe()
3305
3306 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003307 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003308 delta = time.time() - start
3309
3310 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003311 self.assertLess(delta, expected * 2)
3312 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003313
3314 b.send(None)
3315
3316 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003317 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003318 delta = time.time() - start
3319
3320 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003321 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003322
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003323 @classmethod
3324 def signal_and_sleep(cls, sem, period):
3325 sem.release()
3326 time.sleep(period)
3327
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003328 def test_wait_integer(self):
3329 from multiprocessing.connection import wait
3330
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003331 expected = 3
Giampaolo Rodola'67da8942013-01-14 02:24:25 +01003332 sorted_ = lambda l: sorted(l, key=lambda x: id(x))
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003333 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003334 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003335 p = multiprocessing.Process(target=self.signal_and_sleep,
3336 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003337
3338 p.start()
3339 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003340 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003341
3342 start = time.time()
3343 res = wait([a, p.sentinel, b], expected + 20)
3344 delta = time.time() - start
3345
3346 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003347 self.assertLess(delta, expected + 2)
3348 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003349
3350 a.send(None)
3351
3352 start = time.time()
3353 res = wait([a, p.sentinel, b], 20)
3354 delta = time.time() - start
3355
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003356 self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003357 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003358
3359 b.send(None)
3360
3361 start = time.time()
3362 res = wait([a, p.sentinel, b], 20)
3363 delta = time.time() - start
3364
Giampaolo Rodola'5051ca82012-12-31 17:38:17 +01003365 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
Antoine Pitrou37749772012-03-09 18:40:15 +01003366 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003367
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003368 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003369 p.join()
3370
Richard Oudkerk59d54042012-05-10 16:11:12 +01003371 def test_neg_timeout(self):
3372 from multiprocessing.connection import wait
3373 a, b = multiprocessing.Pipe()
3374 t = time.time()
3375 res = wait([a], timeout=-1)
3376 t = time.time() - t
3377 self.assertEqual(res, [])
3378 self.assertLess(t, 1)
3379 a.close()
3380 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003381
Antoine Pitrou709176f2012-04-01 17:19:09 +02003382#
3383# Issue 14151: Test invalid family on invalid environment
3384#
3385
3386class TestInvalidFamily(unittest.TestCase):
3387
3388 @unittest.skipIf(WIN32, "skipped on Windows")
3389 def test_invalid_family(self):
3390 with self.assertRaises(ValueError):
3391 multiprocessing.connection.Listener(r'\\.\test')
3392
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003393 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3394 def test_invalid_family_win32(self):
3395 with self.assertRaises(ValueError):
3396 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003397
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003398#
3399# Issue 12098: check sys.flags of child matches that for parent
3400#
3401
3402class TestFlags(unittest.TestCase):
3403 @classmethod
3404 def run_in_grandchild(cls, conn):
3405 conn.send(tuple(sys.flags))
3406
3407 @classmethod
3408 def run_in_child(cls):
3409 import json
3410 r, w = multiprocessing.Pipe(duplex=False)
3411 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3412 p.start()
3413 grandchild_flags = r.recv()
3414 p.join()
3415 r.close()
3416 w.close()
3417 flags = (tuple(sys.flags), grandchild_flags)
3418 print(json.dumps(flags))
3419
3420 def test_flags(self):
3421 import json, subprocess
3422 # start child process using unusual flags
3423 prog = ('from test.test_multiprocessing import TestFlags; ' +
3424 'TestFlags.run_in_child()')
3425 data = subprocess.check_output(
3426 [sys.executable, '-E', '-S', '-O', '-c', prog])
3427 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3428 self.assertEqual(child_flags, grandchild_flags)
3429
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003430#
3431# Test interaction with socket timeouts - see Issue #6056
3432#
3433
3434class TestTimeouts(unittest.TestCase):
3435 @classmethod
3436 def _test_timeout(cls, child, address):
3437 time.sleep(1)
3438 child.send(123)
3439 child.close()
3440 conn = multiprocessing.connection.Client(address)
3441 conn.send(456)
3442 conn.close()
3443
3444 def test_timeout(self):
3445 old_timeout = socket.getdefaulttimeout()
3446 try:
3447 socket.setdefaulttimeout(0.1)
3448 parent, child = multiprocessing.Pipe(duplex=True)
3449 l = multiprocessing.connection.Listener(family='AF_INET')
3450 p = multiprocessing.Process(target=self._test_timeout,
3451 args=(child, l.address))
3452 p.start()
3453 child.close()
3454 self.assertEqual(parent.recv(), 123)
3455 parent.close()
3456 conn = l.accept()
3457 self.assertEqual(conn.recv(), 456)
3458 conn.close()
3459 l.close()
3460 p.join(10)
3461 finally:
3462 socket.setdefaulttimeout(old_timeout)
3463
Richard Oudkerke88a2442012-08-14 11:41:32 +01003464#
3465# Test what happens with no "if __name__ == '__main__'"
3466#
3467
3468class TestNoForkBomb(unittest.TestCase):
3469 def test_noforkbomb(self):
3470 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3471 if WIN32:
3472 rc, out, err = test.script_helper.assert_python_failure(name)
3473 self.assertEqual('', out.decode('ascii'))
3474 self.assertIn('RuntimeError', err.decode('ascii'))
3475 else:
3476 rc, out, err = test.script_helper.assert_python_ok(name)
3477 self.assertEqual('123', out.decode('ascii').rstrip())
3478 self.assertEqual('', err.decode('ascii'))
3479
3480#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003481# Issue #17555: ForkAwareThreadLock
3482#
3483
3484class TestForkAwareThreadLock(unittest.TestCase):
3485 # We recurisvely start processes. Issue #17555 meant that the
3486 # after fork registry would get duplicate entries for the same
3487 # lock. The size of the registry at generation n was ~2**n.
3488
3489 @classmethod
3490 def child(cls, n, conn):
3491 if n > 1:
3492 p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
3493 p.start()
3494 p.join()
3495 else:
3496 conn.send(len(util._afterfork_registry))
3497 conn.close()
3498
3499 def test_lock(self):
3500 r, w = multiprocessing.Pipe(False)
3501 l = util.ForkAwareThreadLock()
3502 old_size = len(util._afterfork_registry)
3503 p = multiprocessing.Process(target=self.child, args=(5, w))
3504 p.start()
3505 new_size = r.recv()
3506 p.join()
3507 self.assertLessEqual(new_size, old_size)
3508
3509#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003510# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3511#
3512
3513class TestIgnoreEINTR(unittest.TestCase):
3514
3515 @classmethod
3516 def _test_ignore(cls, conn):
3517 def handler(signum, frame):
3518 pass
3519 signal.signal(signal.SIGUSR1, handler)
3520 conn.send('ready')
3521 x = conn.recv()
3522 conn.send(x)
3523 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block
3524
3525 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3526 def test_ignore(self):
3527 conn, child_conn = multiprocessing.Pipe()
3528 try:
3529 p = multiprocessing.Process(target=self._test_ignore,
3530 args=(child_conn,))
3531 p.daemon = True
3532 p.start()
3533 child_conn.close()
3534 self.assertEqual(conn.recv(), 'ready')
3535 time.sleep(0.1)
3536 os.kill(p.pid, signal.SIGUSR1)
3537 time.sleep(0.1)
3538 conn.send(1234)
3539 self.assertEqual(conn.recv(), 1234)
3540 time.sleep(0.1)
3541 os.kill(p.pid, signal.SIGUSR1)
3542 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
3543 time.sleep(0.1)
3544 p.join()
3545 finally:
3546 conn.close()
3547
3548 @classmethod
3549 def _test_ignore_listener(cls, conn):
3550 def handler(signum, frame):
3551 pass
3552 signal.signal(signal.SIGUSR1, handler)
3553 l = multiprocessing.connection.Listener()
3554 conn.send(l.address)
3555 a = l.accept()
3556 a.send('welcome')
3557
3558 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
3559 def test_ignore_listener(self):
3560 conn, child_conn = multiprocessing.Pipe()
3561 try:
3562 p = multiprocessing.Process(target=self._test_ignore_listener,
3563 args=(child_conn,))
3564 p.daemon = True
3565 p.start()
3566 child_conn.close()
3567 address = conn.recv()
3568 time.sleep(0.1)
3569 os.kill(p.pid, signal.SIGUSR1)
3570 time.sleep(0.1)
3571 client = multiprocessing.connection.Client(address)
3572 self.assertEqual(client.recv(), 'welcome')
3573 p.join()
3574 finally:
3575 conn.close()
3576
3577#
Richard Oudkerke88a2442012-08-14 11:41:32 +01003578#
3579#
3580
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003581def setUpModule():
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003582 if sys.platform.startswith("linux"):
3583 try:
3584 lock = multiprocessing.RLock()
3585 except OSError:
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003586 raise unittest.SkipTest("OSError raises on RLock creation, "
3587 "see issue 3111!")
Charles-François Natali221ef672011-11-22 18:55:22 +01003588 check_enough_semaphores()
Benjamin Petersone711caf2008-06-11 16:44:04 +00003589 util.get_temp_dir() # creates temp directory for use by all processes
Benjamin Petersone711caf2008-06-11 16:44:04 +00003590 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3591
Benjamin Petersone711caf2008-06-11 16:44:04 +00003592
Richard Oudkerk14f5ee02013-07-19 22:53:42 +01003593def tearDownModule():
3594 # pause a bit so we don't get warning about dangling threads/processes
3595 time.sleep(0.5)
3596
3597
Benjamin Petersone711caf2008-06-11 16:44:04 +00003598if __name__ == '__main__':
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003599 unittest.main()