blob: d5582aab82047412f2f34951c0f6b40c08b16a0c [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkd15642e2013-07-16 15:33:41 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
72LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000073#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
# Short interval (in seconds) that the tests sleep for or use as a timeout.
DELTA = 0.1

# Making this true makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Timeouts handed to blocking calls; the distinct values are only used
# when the timings are actually being checked.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)
84
85HAVE_GETVALUE = not getattr(_multiprocessing,
86 'HAVE_BROKEN_SEM_GETVALUE', False)
87
Jesse Noller6214edd2009-01-19 16:23:53 +000088WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* using multiprocessing.connection.wait().

    A negative *timeout* is replaced by None before being passed on;
    any other value (including None) is forwarded unchanged.  The result
    is whatever wait() returns for the one-element list ``[handle]``.
    """
    negative = timeout is not None and timeout < 0.0
    effective_timeout = None if negative else timeout
    return wait([handle], effective_timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
# Upper bound on file descriptor numbers, falling back to 256 when the
# limit cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf does not exist on this platform.
    # ValueError: the configuration name is not known.
    # OSError: the name is known but not supported by the host.
    # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
    MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when os.sysconf or the "SC_SEM_NSEMS_MAX" setting is
    unavailable, when the reported value is -1, or when it is at least
    the required minimum.  Otherwise raises unittest.SkipTest.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration in seconds of the most recent call, whether that
    call returned normally or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function, returning its result (or re-raising)."""
        # Use the monotonic clock: unlike time.time() it cannot go
        # backwards or jump when the system clock is adjusted, so the
        # measured interval is never negative or wildly off.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
144
145#
146# Base class for test cases
147#
148
class BaseTestCase(object):
    """Assertion helpers shared by the test case classes in this file.

    The helpers call ``assertAlmostEqual`` / ``assertEqual`` on ``self``,
    so this class is meant to be combined with something that provides
    them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError.

        A NotImplementedError from *func* is silently accepted.
        """
        try:
            result = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, result)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order: calling ``self.get_value()``, reading the
    ``_Semaphore__value`` attribute, and reading the ``_value``
    attribute.  Raises NotImplementedError if all three raise
    AttributeError.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, first match wins.
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
187
188#
189# Testcases
190#
191
class _TestProcess(BaseTestCase):
    """Tests of the Process API.

    The tests use ``self.TYPE``, ``self.Process``, ``self.Queue``,
    ``self.Event``, ``self.Pipe``, ``self.current_process`` and
    ``self.active_children``; these are not defined here and are assumed
    to be supplied by the class this one is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """The current process reports sane attributes."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        """The ``daemon`` constructor argument sets the daemon flag."""
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwargs and identity through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a process through creation, start and join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
        )
        p.daemon = True
        current = self.current_process()

        # Before start(): not alive, not listed, no exit code.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back what it was given, in the order _test puts it.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(100)

    def test_terminate(self):
        """terminate() stops a sleeping child and join() then returns."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # Guard against join() hanging: SIGALRM turns a hang into an error.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                # Always cancel the alarm before restoring the previous
                # handler; otherwise a failure above would leave a pending
                # SIGALRM to be delivered to the old handler later.
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A process is listed by active_children() only while running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id*, then run two children one level deeper (depth < 2)."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started by child processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1],
        ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait (up to 10 seconds) for the parent's signal."""
        event.wait(10.0)

    def test_sentinel(self):
        """The sentinel becomes ready only once the process has exited."""
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # Reading the sentinel before start() is an error.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
394
Benjamin Petersone711caf2008-06-11 16:44:04 +0000395#
396#
397#
398
399class _UpperCaser(multiprocessing.Process):
400
401 def __init__(self):
402 multiprocessing.Process.__init__(self)
403 self.child_conn, self.parent_conn = multiprocessing.Pipe()
404
405 def run(self):
406 self.parent_conn.close()
407 for s in iter(self.child_conn.recv, None):
408 self.child_conn.send(s.upper())
409 self.child_conn.close()
410
411 def submit(self, s):
412 assert type(s) is str
413 self.parent_conn.send(s)
414 return self.parent_conn.recv()
415
416 def stop(self):
417 self.parent_conn.send(None)
418 self.parent_conn.close()
419 self.child_conn.close()
420
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for how a child's exit is reported.

    ``self.TYPE`` and ``self.Process`` are not defined here and are
    assumed to be supplied by the class this one is combined with.
    """

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """A Process subclass can serve requests from its parent."""
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(uppercaser.submit(text), expected)
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
        # The whole traceback was printed
        for fragment in ("ZeroDivisionError",
                         "test_multiprocessing.py",
                         "1/0 # MARKER"):
            self.assertIn(fragment, err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child target: redirect stderr to *testfn*, then fail."""
        sys.stderr = open(testfn, 'w')
        # The text of the next line is matched by test_stderr_flush.
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: redirect stderr to *testfn*, then exit with *reason*."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def _run_child(self, target, args):
        """Run *target* in a daemonic child, wait up to 5 seconds, return it."""
        p = self.Process(target=target, args=args)
        p.daemon = True
        p.start()
        p.join(5)
        return p

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-integer reasons: checked against the expected exit code and
        # against the text the child wrote to its redirected stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self._run_child(self._test_sys_exit, (reason, testfn))
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                written = f.read()
            self.assertEqual(written.rstrip(), str(reason))

        # Bool/int reasons: the exit code must compare equal to the reason.
        for reason in (True, False, 8):
            p = self._run_child(sys.exit, (reason,))
            self.assertEqual(p.exitcode, reason)
486
Benjamin Petersone711caf2008-06-11 16:44:04 +0000487#
488#
489#
490
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that attribute, otherwise
    compares ``q.qsize()`` with zero.
    """
    return q.empty() if hasattr(q, 'empty') else (q.qsize() == 0)
496
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that attribute, otherwise
    compares ``q.qsize()`` with *maxsize*.
    """
    return q.full() if hasattr(q, 'full') else (q.qsize() == maxsize)
502
503
class _TestQueue(BaseTestCase):
    """Tests of the Queue and JoinableQueue APIs.

    ``self.Queue``, ``self.JoinableQueue``, ``self.Event`` and
    ``self.Process`` are not defined here and are assumed to be supplied
    by the class this one is combined with.
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, take six items off *queue*."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put()/put_nowait() fill the queue and raise Full afterwards."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every supported calling convention.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts fail at once; blocking ones after the timeout.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, put the items 2..5 on *queue*."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get()/get_nowait() return items in order and raise Empty after."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
        )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets fail at once; blocking ones after the timeout.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks puts and gets (where it is implemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge each item from *q* until None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() returns once the workers have acknowledged every item."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None sentinel per worker makes each worker's loop end.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_timeout(self):
        """A blocking get() on an empty queue waits for about the timeout."""
        q = multiprocessing.Queue()
        start = time.time()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
        delta = time.time() - start
        self.assertGreaterEqual(delta, 0.19)
720
Benjamin Petersone711caf2008-06-11 16:44:04 +0000721#
722#
723#
724
class _TestLock(BaseTestCase):
    """Tests of Lock and RLock.

    ``self.Lock`` and ``self.RLock`` are not defined here and are
    assumed to be supplied by the class this one is combined with.
    """

    def test_lock(self):
        """A lock cannot be acquired twice nor released when not held."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        with self.assertRaises((ValueError, threading.ThreadError)):
            lock.release()

    def test_rlock(self):
        """An RLock may be re-acquired and needs one release per acquire."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        with self.assertRaises((AssertionError, RuntimeError)):
            lock.release()

    def test_lock_context(self):
        """A lock can be used as a context manager."""
        with self.Lock():
            pass
747
Benjamin Petersone711caf2008-06-11 16:44:04 +0000748
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore.

    ``self.TYPE``, ``self.Semaphore`` and ``self.BoundedSemaphore`` are
    not defined here and are assumed to be supplied by the class this
    one is combined with.
    """

    def _test_semaphore(self, sem):
        """Take *sem* from 2 down to 0 and back, checking its value each step."""
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check_value(0, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain semaphore can be released past its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """A bounded semaphore passes the basic acquire/release checks."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore honours block and timeout."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking attempts return at once, whatever timeout is given.
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # Blocking attempts give up after the timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
801
802
803class _TestCondition(BaseTestCase):
804
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000805 @classmethod
806 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000807 cond.acquire()
808 sleeping.release()
809 cond.wait(timeout)
810 woken.release()
811 cond.release()
812
813 def check_invariant(self, cond):
814 # this is only supposed to succeed when there are no sleepers
815 if self.TYPE == 'processes':
816 try:
817 sleepers = (cond._sleeping_count.get_value() -
818 cond._woken_count.get_value())
819 self.assertEqual(sleepers, 0)
820 self.assertEqual(cond._wait_semaphore.get_value(), 0)
821 except NotImplementedError:
822 pass
823
824 def test_notify(self):
825 cond = self.Condition()
826 sleeping = self.Semaphore(0)
827 woken = self.Semaphore(0)
828
829 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000830 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 p.start()
832
833 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000834 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000835 p.start()
836
837 # wait for both children to start sleeping
838 sleeping.acquire()
839 sleeping.acquire()
840
841 # check no process/thread has woken up
842 time.sleep(DELTA)
843 self.assertReturnsIfImplemented(0, get_value, woken)
844
845 # wake up one process/thread
846 cond.acquire()
847 cond.notify()
848 cond.release()
849
850 # check one process/thread has woken up
851 time.sleep(DELTA)
852 self.assertReturnsIfImplemented(1, get_value, woken)
853
854 # wake up another
855 cond.acquire()
856 cond.notify()
857 cond.release()
858
859 # check other has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(2, get_value, woken)
862
863 # check state is not mucked up
864 self.check_invariant(cond)
865 p.join()
866
867 def test_notify_all(self):
868 cond = self.Condition()
869 sleeping = self.Semaphore(0)
870 woken = self.Semaphore(0)
871
872 # start some threads/processes which will timeout
873 for i in range(3):
874 p = self.Process(target=self.f,
875 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000876 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000877 p.start()
878
879 t = threading.Thread(target=self.f,
880 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000881 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000882 t.start()
883
884 # wait for them all to sleep
885 for i in range(6):
886 sleeping.acquire()
887
888 # check they have all timed out
889 for i in range(6):
890 woken.acquire()
891 self.assertReturnsIfImplemented(0, get_value, woken)
892
893 # check state is not mucked up
894 self.check_invariant(cond)
895
896 # start some more threads/processes
897 for i in range(3):
898 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000899 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000900 p.start()
901
902 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000903 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000904 t.start()
905
906 # wait for them to all sleep
907 for i in range(6):
908 sleeping.acquire()
909
910 # check no process/thread has woken up
911 time.sleep(DELTA)
912 self.assertReturnsIfImplemented(0, get_value, woken)
913
914 # wake them all up
915 cond.acquire()
916 cond.notify_all()
917 cond.release()
918
919 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200920 for i in range(10):
921 try:
922 if get_value(woken) == 6:
923 break
924 except NotImplementedError:
925 break
926 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000927 self.assertReturnsIfImplemented(6, get_value, woken)
928
929 # check state is not mucked up
930 self.check_invariant(cond)
931
def test_timeout(self):
    # Nobody notifies the condition, so wait(TIMEOUT1) is expected to
    # report False after roughly TIMEOUT1 seconds.
    condition = self.Condition()
    timed_wait = TimingWrapper(condition.wait)

    condition.acquire()
    notified = timed_wait(TIMEOUT1)
    condition.release()

    self.assertEqual(notified, False)
    self.assertTimingAlmostEqual(timed_wait.elapsed, TIMEOUT1)
940
@classmethod
def _test_waitfor_f(cls, cond, state):
    """
    Child side of test_waitfor.

    Sets ``state.value`` to 0, notifies `cond`, then waits on `cond`
    until ``state.value`` equals 4.  Exits with status 1 if the wait
    reports failure or the value is not 4 afterwards.
    """
    def reached_four():
        return state.value == 4

    with cond:
        state.value = 0
        cond.notify()
        satisfied = cond.wait_for(reached_four)
        if not (satisfied and state.value == 4):
            sys.exit(1)
949
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    # based on test in test/lock_tests.py
    #
    # The helper resets the shared counter to 0 and waits for it to reach
    # 4; this side waits for the reset, then bumps the counter four times,
    # notifying after every bump.
    condition = self.Condition()
    counter = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f,
                         args=(condition, counter))
    child.daemon = True
    child.start()

    def counter_is_zero():
        return counter.value == 0

    with condition:
        satisfied = condition.wait_for(counter_is_zero)
        self.assertTrue(satisfied)
        self.assertEqual(counter.value, 0)

    for _ in range(4):
        time.sleep(0.01)
        with condition:
            counter.value += 1
            condition.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
974
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    """
    Child side of test_waitfor_timeout.

    Releases `sem` to signal that it is running, then waits on `cond`
    (with a 0.1 second timeout) for ``state.value`` to become 4.
    ``success.value`` is set to True only if the wait reported failure
    and the time spent waiting falls inside the accepted window.
    """
    sem.release()
    with cond:
        expected = 0.1
        # Use a monotonic clock: the measured interval must not be
        # distorted by adjustments to the system wall clock.
        start = time.monotonic()
        result = cond.wait_for(lambda: state.value == 4, timeout=expected)
        elapsed = time.monotonic() - start
        # borrow logic in assertTimeout() from test/lock_tests.py
        if not result and expected * 0.6 < elapsed < expected * 10.0:
            success.value = True
986
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    # based on test in test/lock_tests.py
    #
    # The helper waits (with a timeout) for the counter to reach 4 and
    # records in `outcome` whether that wait timed out as expected.
    condition = self.Condition()
    counter = self.Value('i', 0)
    outcome = self.Value('i', False)
    child_running = self.Semaphore(0)

    child = self.Process(
        target=self._test_waitfor_timeout_f,
        args=(condition, counter, outcome, child_running),
    )
    child.daemon = True
    child.start()
    self.assertTrue(child_running.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    for _ in range(3):
        time.sleep(0.01)
        with condition:
            counter.value += 1
            condition.notify()

    child.join(5)
    self.assertTrue(outcome.value)
1010
@classmethod
def _test_wait_result(cls, c, pid):
    """
    Helper for test_wait_result.

    Notifies condition `c`, sleeps for one second and then, when `pid`
    is not None, sends SIGINT to that process.
    """
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
1018
def test_wait_result(self):
    # Check the return value of Condition.wait(): False when it times
    # out, True once notified.  When running real processes off Windows
    # the helper also sends SIGINT to this process, which must surface
    # from wait() as KeyboardInterrupt.
    send_sigint = (isinstance(self, ProcessesMixin)
                   and sys.platform != 'win32')
    pid = os.getpid() if send_sigint else None

    cond = self.Condition()
    with cond:
        for timeout in (0, 0.1):
            self.assertFalse(cond.wait(timeout))

        helper = self.Process(target=self._test_wait_result,
                              args=(cond, pid))
        helper.start()

        self.assertTrue(cond.wait(10))
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, cond.wait, 10)

    helper.join()
1038
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039
class _TestEvent(BaseTestCase):
    """
    Tests for Event objects.
    """

    @classmethod
    def _test_event(cls, event):
        """Set `event` after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A new event starts out cleared.
        self.assertEqual(event.is_set(), False)

        # Waiting on a cleared event reports False once the timeout
        # (possibly zero) has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without a
        # timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()
        self.assertEqual(event.is_set(), False)

        # Let another process/thread set the event while we block on it.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the helper so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001080
1081#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001082# Tests for Barrier - adapted from tests in test/lock_tests.py
1083#
1084
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  The class _DummyList below
# serves the same purpose.
1089
class _DummyList(object):
    """
    Counter with a list-like face: append() increments, len() reads.

    The count is a single C int stored in a multiprocessing.heap
    BufferWrapper and is guarded by a multiprocessing Lock.  Pickling
    state is the (wrapper, lock) pair.
    """

    def __init__(self):
        int_size = struct.calcsize('i')
        state = (multiprocessing.heap.BufferWrapper(int_size),
                 multiprocessing.Lock())
        self.__setstate__(state)
        # Start counting from zero.
        self._lengthbuf[0] = 0

    def __setstate__(self, state):
        wrapper, lock = state
        self._wrapper = wrapper
        self._lock = lock
        # View the wrapped buffer as an array of C ints.
        raw_view = self._wrapper.create_memoryview()
        self._lengthbuf = raw_view.cast('i')

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def append(self, _):
        """Increment the counter; the appended value itself is ignored."""
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1112
def _wait():
    """Crude wait/yield: sleep briefly without any synchronization primitive."""
    pause = 0.01
    time.sleep(pause)
1116
1117
class Bunch(object):
    """
    A bunch of workers created through ``namespace.Process``.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemon workers that each run ``f(*args)``.

        `namespace` supplies the ``DummyList``, ``Event`` and ``Process``
        factories.  If `wait_before_exit` is True, the workers won't
        terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            # Nothing to wait for: workers may exit as soon as f returns.
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Body of each worker: record start, run f, record finish."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            # Stay alive until released (or for 30 seconds at most).
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until all `n` workers have recorded that they started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until all `n` workers have recorded that f finished."""
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        """Allow workers that are waiting before exit to terminate."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001161
1162
class AppendTrue(object):
    """Callable that appends True to the wrapped object each time it is called."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1168
1169
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a shared append/len counter suited to the current TYPE."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run ``f(*args)`` in N-1 workers plus the caller, then wait for all."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Cross `barrier` 2*n times, checking that all parties stay in lockstep."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        """Put the value returned by ``barrier.wait()`` on `queue`."""
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        """Pass `barrier`, then require that exactly one result was recorded."""
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        """One party aborts `barrier`; the others record the broken barrier."""
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        """One party resets `barrier` while the others are waiting in it."""
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        """Abort `barrier`, reset it under cover of `barrier2`, then pass it."""
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        """Record a broken barrier when one party arrives after the 0.5s timeout."""
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        """Like _test_timeout_f, but the second wait uses the barrier's own timeout."""
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        """Pass `barrier` `passes` times, sending the pass number after each one."""
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            # Report the test as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Make sure every worker is reaped once the test is over.
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1414
1415#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001416#
1417#
1418
class _TestValue(BaseTestCase):
    """
    Tests for Value and RawValue objects.
    """

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Overwrite each shared value with the third entry of its row."""
        for shared, row in zip(values, cls.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # Every object starts out holding its initial value ...
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # ... and shows the child's writes once the child has finished.
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default lock and explicit lock=None: both accessors are callable.
        for kwargs in ({}, {'lock': None}):
            wrapped = self.Value('i', 5, **kwargs)
            wrapped.get_lock()
            wrapped.get_obj()

        # A caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        with_lock = self.Value('i', 5, lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue objects have no accessors either.
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
1486
Benjamin Petersone711caf2008-06-11 16:44:04 +00001487
class _TestArray(BaseTestCase):
    """
    Tests for Array and RawArray objects.
    """

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Turn `seq` into its running sum, in place."""
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make_array = self.RawArray if raw else self.Array
        arr = make_array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Transform the list locally and the shared array in a child.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        zeros = [0] * size
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default lock and explicit lock=None: both accessors are callable.
        for kwargs in ({}, {'lock': None}):
            wrapped = self.Array('i', list(range(10)), **kwargs)
            wrapped.get_lock()
            wrapped.get_obj()

        # A caller-supplied lock is handed back by get_lock().
        lock = self.Lock()
        with_lock = self.Array('i', list(range(10)), lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have no accessors either.
        raw = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001566
1567#
1568#
1569#
1570
class _TestContainers(BaseTestCase):
    """
    Tests for the list, dict and Namespace objects created via self.
    """

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = list(range(10))
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2, 3, 4])

        b *= 2
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], digits)

        # A list built from the two existing lists.
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # A list built from `a` alone, inspected after `a` has grown.
        holder = self.list([a])
        a.append('hello')
        self.assertEqual(holder[:], [digits + ['hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        expected = {}
        for i in indices:
            d[i] = chr(i)
            expected[i] = chr(i)
        self.assertEqual(d.copy(), expected)
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1626
1627#
1628#
1629#
1630
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for `wait` seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001634
def mul(x, y):
    """Return the product ``x*y``."""
    product = x * y
    return product
1637
class _TestPool(BaseTestCase):
    """
    Tests for Pool objects.

    Most tests share a single four-worker pool that is created once per
    class in setUpClass() and torn down in tearDownClass().
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # A successful map reports its result list through `callback` ...
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # ... and a failing one reports the exception via `error_callback`.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # Use a private pool: terminating the class-wide pool would break
        # any test that runs after this one.
        pool = self.Pool(4)
        pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        pool.terminate()
        join = TimingWrapper(pool.join)
        join()
        # join() must not wait for the queued tasks to be processed.
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # After the with-block the pool no longer accepts work.
            self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001771
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001774
def unpickleable_result():
    """Return a lambda (which itself returns 42) as the result value."""
    return (lambda: 42)
1777
class _TestPoolWorkerErrors(BaseTestCase):
    """
    Tests for how a Pool reports errors raised while running a task.
    """
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        reported = None
        def remember(exc):
            nonlocal reported
            reported = exc

        res = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, res.get)
        # The same exception type must also have reached the callback.
        self.assertTrue(reported)
        self.assertIsInstance(reported, KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            reported = None
            def remember(exc):
                nonlocal reported
                reported = exc

            res = pool.apply_async(unpickleable_result,
                                   error_callback=remember)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = reported
            self.assertTrue(wrapped)
            self.assertIsInstance(reported, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1817
class _TestPoolWorkerLifetime(BaseTestCase):
    """
    Tests for pools created with the maxtasksperchild argument.
    """
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (at most 50 polls, DELTA seconds apart)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        pending = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for arg, res in enumerate(pending):
            self.assertEqual(res.get(), sqr(arg))
1862
Benjamin Petersone711caf2008-06-11 16:44:04 +00001863#
1864# Test of creating a customized manager class
1865#
1866
1867from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1868
class FooBar(object):
    # Plain class served through MyManager: one public method that returns,
    # one public method that raises, and one underscore-prefixed method.

    def f(self):
        # Public method with a recognisable return value.
        return 'f()'

    def g(self):
        # Public method that always fails.
        raise ValueError

    def _h(self):
        # Underscore-prefixed method with a recognisable return value.
        return '_h()'
1876
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    yield from (n * n for n in range(10))
1880
class IteratorProxy(BaseProxy):
    """Proxy that behaves as an iterator by forwarding ``__next__`` calls."""

    # Only __next__ is forwarded to the referent.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        # Each step is delegated through the proxy's _callmethod().
        return self._callmethod('__next__')
1887
class MyManager(BaseManager):
    """Customized manager carrying the three registrations made below."""


# 'Foo' relies on the default exposure rules for FooBar.
MyManager.register('Foo', callable=FooBar)
# 'Bar' serves the same class but explicitly exposes f and _h only.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands back the generator wrapped in an IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1894
1895
class _TestMyManager(BaseTestCase):
    # Exercises MyManager through explicit start()/shutdown() and through
    # the context-manager protocol, checking proxy exposure in each case.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        # Always shut the server down, even when common() fails, so that a
        # failing assertion does not leave the manager process running.
        try:
            self.common(manager)
        finally:
            manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        # Entering the manager starts it; leaving the block shuts it down.
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # A manager that was started by hand can still be used in a
        # ``with`` statement.
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared checks run against a started manager.
        foo = manager.Foo()
        bar = manager.Bar()
        # Not named ``baz`` so the module-level generator is not shadowed.
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so calling it remotely must fail.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])
1945
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001946
Benjamin Petersone711caf2008-06-11 16:44:04 +00001947#
1948# Test of connecting to a remote server and using xmlrpclib for serialization
1949#
1950
# Single queue instance handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue instance."""
    return _queue


class QueueManager(BaseManager):
    """Manager class used by the server process."""


QueueManager.register('get_queue', callable=get_queue)


class QueueManager2(BaseManager):
    """Manager class that specifies the same interface as QueueManager."""


# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to the managers in the tests below.
SERIALIZER = 'xmlrpclib'
1965
class _TestRemoteManager(BaseTestCase):
    # Connects to a manager server by address and exchanges data using the
    # xmlrpclib serializer.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and enqueue one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        # Server side: the manager that actually owns the queue.
        server = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        # Client side in this process, reaching the same queue by address.
        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(remote_queue.get(),
                         ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
2007
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be restarted on an address right after a
    # previous manager has been shut down.

    @classmethod
    def _putter(cls, address, authkey):
        # Runs in the child: connect to the server and enqueue one string.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the child is done; reap it while the
        # server it is connected to is still running.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must be started too: shutdown() only
            # exists on a manager once start() has run.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002049
Benjamin Petersone711caf2008-06-11 16:44:04 +00002050#
2051#
2052#
2053
# Empty message that tells the _echo() child to stop echoing.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    # Tests for Connection objects returned by Pipe(): object and bytes
    # transfer, polling, closing, and file descriptor passing.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Send every received message straight back until SENTINEL arrives.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A message longer than the buffer must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child time to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        # Report a skip, as the other process-only tests in this class do,
        # rather than returning silently and being counted as a pass.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # send_bytes(buf, offset[, size]) sends a slice of the buffer.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the 26-byte message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # True if fstat() succeeds on fd, False if it fails with EBADF;
        # any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Runs in the child: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the transferred handle.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first unassigned descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Runs in the child: write one raw byte on the connection's
        # descriptor, with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the ``with`` block closes both ends.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002320
class _TestListener(BaseTestCase):
    # Tests for connection.Listener itself: address reuse and use as a
    # context manager.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # A second Listener on an address that is already in use must fail.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            with self.assertRaises(OSError):
                self.connection.Listener(listener.address, family)

    def test_context(self):
        # Listener, client and accepted connection all work in ``with``.
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        # Once the block has exited the listener can no longer accept.
        if self.TYPE == 'processes':
            self.assertRaises(OSError, listener.accept)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002341
class _TestListenerClient(BaseTestCase):
    # Tests in which a client connects to a Listener and data is exchanged.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Runs in the child: connect, send one greeting, disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does, so
            # that one is not left open for every family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # Data sent by the accepting side must be visible to poll() on the
        # client side, for every connection family.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2389
class _TestPoll(BaseTestCase):
    # Tests for Connection.poll(): empty messages, message boundaries and
    # keeping consecutive messages separate.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        # An empty message still counts as data for poll(), and polling
        # again does not consume it.
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        for _ in range(2):
            self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Runs in the child: send each string after a short pause.
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        p = self.Process(target=self._child_strings, args=(sender, strings))
        p.start()

        for expected in strings:
            # Poll in short slices (at most 200 times) until data shows up;
            # any() stops at the first successful poll.
            any(receiver.poll(0.01) for _ in range(200))
            received = receiver.recv_bytes()
            self.assertEqual(expected, received)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for message in messages:
            w.send_bytes(message)
        w.close()
        p.join()
        # Whatever the child's poll() did, a whole message must come out.
        self.assertIn(r.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        # Runs in the child: three separate messages in a row.
        for chunk in (b'a', b'b', b'cd'):
            b.send_bytes(chunk)

    def test_dont_merge(self):
        a, b = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(a.poll(timeout), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        # Repeated polling between reads must never merge or drop messages.
        self.assertEqual(a.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(a.poll(timeout), True)
        self.assertEqual(a.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(a.poll(timeout), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
2467
Benjamin Petersone711caf2008-06-11 16:44:04 +00002468#
2469# Test of sending connection and socket objects between processes
2470#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002471
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Sends Connection and socket objects from one process to another.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the resource sharer once this class has finished.
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Runs in the listener child.  For every family: publish a listener
        # address, accept one connection and send that connection to the
        # parent over ``conn``.
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        # Same again with a plain socket.
        server_sock = socket.socket()
        server_sock.bind((test.support.HOST, 0))
        server_sock.listen(1)
        conn.send(server_sock.getsockname())
        accepted, addr = server_sock.accept()
        conn.send(accepted)
        accepted.close()
        server_sock.close()

        # Wait for the parent's final message before returning.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Runs in the remote child.  Until None arrives: connect to each
        # address received and send the upper-cased message there.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        # One more request, this time served over a plain socket.
        address, msg = conn.recv()
        sock = socket.socket()
        sock.connect(address)
        sock.sendall(msg.upper())
        sock.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            # Relay the listener's address to the remote child, then read
            # from the connection object the listener child sends back.
            address = lconn.recv()
            rconn.send((address, msg))
            transferred = lconn.recv()
            self.assertEqual(transferred.recv(), msg.upper())

        # End the remote child's Connection loop.
        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        transferred = lconn.recv()
        # Read in chunks of up to 100 bytes until the peer closes.
        payload = b''.join(iter(lambda: transferred.recv(100), b''))
        self.assertEqual(payload, msg.upper())
        transferred.close()

        # Let the listener child finish.
        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Runs in the child: first receive a write end and use it, then
        # receive a read end and report what was read from it, doubled.
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        msg = reader.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Hand over the write end of a one-way pipe.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Hand over the read end of a second one-way pipe.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2602
Benjamin Petersone711caf2008-06-11 16:44:04 +00002603#
2604#
2605#
2606
class _TestHeap(BaseTestCase):
    # Tests for the allocator behind multiprocessing.heap.BufferWrapper.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop one randomly chosen older block.
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each block must end exactly where the next one begins, unless the
        # next block is the first one of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2670 b.buddy = a
2671
Benjamin Petersone711caf2008-06-11 16:44:04 +00002672#
2673#
2674#
2675
class _Foo(Structure):
    # Two-field structure (an int ``x`` and a double ``y``) used by the
    # shared ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
2681
class _TestSharedCTypes(BaseTestCase):
    # Tests that shared ctypes values, structures and arrays modified in a
    # child are seen as modified by the parent.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Nothing in this class can run without sharedctypes support.
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        worker = self.Process(target=self._double,
                              args=(x, y, foo, arr, string))
        worker.daemon = True
        worker.start()
        worker.join()

        # Every value must now be twice what it was set to above.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True for every shared object.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # The copy must keep its values after the original is changed.
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2731
2732#
2733#
2734#
2735
class _TestFinalize(BaseTestCase):
    """Check which util.Finalize callbacks run, and in which order."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Process target: register finalizers that report their tag on *conn*."""
        class Foo(object):
            pass

        def register(obj, tag, **kwds):
            # Every finalizer in this test just sends its tag to the parent.
            return util.Finalize(obj, conn.send, args=(tag,), **kwds)

        a = Foo()
        register(a, 'a')
        del a           # triggers callback for a

        b = Foo()
        close_b = register(b, 'b')
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        register(c, 'c')

        d10 = Foo()
        register(d10, 'd10', exitpriority=1)

        # Three finalizers sharing exitpriority 0, registered in this order.
        # The objects are kept referenced so they are not collected early.
        keep_alive = []
        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            keep_alive.append(obj)
            register(obj, tag, exitpriority=0)

        register(None, 'e', exitpriority=-10)

        register(None, 'STOP', exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Run the child and compare the tags received before 'STOP'."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2788
2789#
2790# Test that from ... import * works for each module
2791#
2792
class _TestImportStar(BaseTestCase):
    """Every name a multiprocessing module lists in __all__ must exist."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        optional = (
            (HAS_REDUCTION, 'multiprocessing.reduction'),
            # This module requires _ctypes
            (c_int is not None, 'multiprocessing.sharedctypes'),
        )
        modules.extend(name for wanted, name in optional if wanted)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2821
2822#
2823# Quick test that logging works -- does not test logging output
2824#
2825
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing logging works (output is not tested)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The multiprocessing logger exists and accepts low-level messages."""
        logger = multiprocessing.get_logger()
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the module-wide level even if logging raised.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Process target: send the logger's effective level over *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Start a child running _test_level and return the level it reports.

        The child is always joined, so no process outlives the test.
        """
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        """A child sees the level set on the mp logger, or else on the root."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        try:
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            # With no level of its own, the logger falls back to the root's.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Put both loggers back even when an assertion failed.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2868
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002869
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002870# class _TestLoggingProcessName(BaseTestCase):
2871#
2872# def handle(self, record):
2873# assert record.processName == multiprocessing.current_process().name
2874# self.__handled = True
2875#
2876# def test_logging(self):
2877# handler = logging.Handler()
2878# handler.handle = self.handle
2879# self.__handled = False
2880# # Bypass getLogger() and side-effects
2881# logger = logging.getLoggerClass()(
2882# 'multiprocessing.test.TestLoggingProcessName')
2883# logger.addHandler(handler)
2884# logger.propagate = False
2885#
2886# logger.warn('foo')
2887# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002888
Benjamin Petersone711caf2008-06-11 16:44:04 +00002889#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002890# Check that Process.join() retries if os.waitpid() fails with EINTR
2891#
2892
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive being interrupted by a signal."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Process target: wait half a second, then send SIGUSR1 to *pid*."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Always reap the killer *before* the old handler is put
                # back, so a late SIGUSR1 cannot reach the restored handler.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2920
2921#
Jesse Noller6214edd2009-01-19 16:23:53 +00002922# Test to verify handle verification, see issue 3321
2923#
2924
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject handles that are not valid."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        rejected = (ValueError, OSError)

        conn = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(rejected):
                conn.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None

        with self.assertRaises(rejected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002938
Jesse Noller6214edd2009-01-19 16:23:53 +00002939#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002940# Functions used to create test cases from the base ones in this module
2941#
2942
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from this module's ``_Test*`` bases.

    Every module global whose name starts with ``_Test`` and whose
    ALLOWED_TYPES contains *type* is combined with *Mixin* and
    unittest.TestCase.  The new class is named ``With<Type><name minus the
    leading underscore>`` and reports *Mixin*'s module as its own.

    Returns a dict mapping the new class names to the new classes.
    """
    known_types = {'processes', 'threads', 'manager'}
    prefix = 'With' + type.capitalize()
    cases = {}

    # Work on a snapshot of the globals, as the original loop did.
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        allowed = set(base.ALLOWED_TYPES)
        assert allowed <= known_types, allowed
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, Mixin, unittest.TestCase):
            pass

        Temp.__name__ = Temp.__qualname__ = prefix + name[1:]
        Temp.__module__ = Mixin.__module__
        cases[Temp.__name__] = Temp
    return cases
2961
2962#
2963# Create test cases
2964#
2965
class ProcessesMixin(object):
    """Mixin that maps the test primitives onto real processes.

    Every attribute is taken straight from the ``multiprocessing`` package.
    """

    TYPE = 'processes'

    # Process handling
    Process = multiprocessing.Process
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)

    # Communication
    connection = multiprocessing.connection
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared memory
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002987
# Build the process-flavoured test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2990
2991
class ManagerMixin(object):
    """Mixin that maps the test primitives onto a shared manager.

    ``setUpClass`` starts one ``multiprocessing.Manager()`` per test class;
    the properties below look each factory up on that manager at access
    time.  ``Process`` is still the plain ``multiprocessing.Process``.
    """

    TYPE = 'manager'
    Process = multiprocessing.Process

    # Queues
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))

    # Synchronization primitives
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))

    # Shared data
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-wide manager."""
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # only the manager process should be returned by active_children()
        # but this can take a bit on slow machines, so wait a few seconds
        # if there are other children too (see #17395)
        delay = 0.01
        while True:
            if len(multiprocessing.active_children()) <= 1 or delay >= 5:
                break
            time.sleep(delay)
            delay *= 2
        gc.collect()                       # do garbage collection
        leftovers = cls.manager._number_of_objects()
        if leftovers != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00003037
# Build the manager-flavoured test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
3040
3041
class ThreadsMixin(object):
    """Mixin that maps the test primitives onto threads.

    Apart from ``Pool``, every attribute comes from ``multiprocessing.dummy``.
    """

    TYPE = 'threads'

    # Process handling
    Process = multiprocessing.dummy.Process
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    # NOTE(review): this is multiprocessing.Pool, not multiprocessing.dummy.Pool,
    # unlike every other attribute of this mixin -- confirm this is intended.
    Pool = staticmethod(multiprocessing.Pool)

    # Communication
    connection = multiprocessing.dummy.connection
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared data
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003061
# Build the thread-flavoured test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
3064
Richard Oudkerkd15642e2013-07-16 15:33:41 +01003065
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that answers every challenge with garbage."""
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(peer, b'abc')

    def test_answer_challenge_auth_failure(self):
        class _FakeConnection(object):
            """Peer that sends a challenge, then garbage, then nothing."""
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        peer = _FakeConnection()
        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(peer, b'abc')
3094
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003095#
3096# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3097#
3098
def initializer(ns):
    """Add one to the ``test`` attribute of *ns*."""
    ns.test = ns.test + 1
3101
class TestInitializers(unittest.TestCase):
    """Manager.start() and Pool() must run the initializer they are given."""

    def setUp(self):
        # A manager-backed namespace lets the initializer, which runs in
        # another process, report back through ``ns.test``.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the second manager even when the assertion fails.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3126
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003127#
3128# Issue 5155, 5313, 5331: Test process in processes
3129# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3130#
3131
def _this_sub_process(q):
    """Attempt one non-blocking ``get`` on *q*; an empty queue is ignored."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        return
3137
def _test_process(q):
    """Run _this_sub_process in a daemonic child on a fresh queue and wait.

    The *q* argument is accepted but not used.
    """
    fresh_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_this_sub_process,
                                    args=(fresh_queue,))
    child.daemon = True
    child.start()
    child.join()
3144
def _afunc(x):
    """Return *x* multiplied by itself."""
    product = x * x
    return product
3147
def pool_in_process():
    """Map _afunc over 1..7 on a four-worker pool, then close and join it."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003153
class _file_like(object):
    """Buffer writes per process and hand them to *delegate* on flush()."""

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """Pending writes of the current process.

        The list is replaced by an empty one whenever the pid differs from
        the one seen on the previous access.
        """
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid, self._cache = current, []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3173 self._cache = []
3174
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes, queues and pools started from inside another process."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; it is
        # kept as in the original -- confirm whether it was meant to run.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert, which is stripped under -O.
        self.assertEqual(sio.getvalue(), 'foo')
3195
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003196
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait().

    Elapsed times are measured with time.monotonic() so that wall-clock
    adjustments cannot disturb the timing assertions, and every pipe end,
    socket and child created here is released through addCleanup().
    """

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Process target: send ten ``(i, pid)`` messages on *w*, then close it."""
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # The parent's copy of the write end must go, or EOF never comes.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Process target: connect to *address* and send the lines 0..9."""
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        # Closing a socket twice is harmless; this covers early failures.
        self.addCleanup(l.close)
        l.bind((test.support.HOST, 0))
        l.listen(4)
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must time out after ~expected seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Now ``a`` has data pending, so wait() must return promptly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Process target: release *sem*, then sleep for *period* seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        sorted_ = lambda l: sorted(l, key=id)
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        # Cleanups run last-in first-out: terminate the child, then join it.
        self.addCleanup(p.join)
        self.addCleanup(p.terminate)
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child stops sleeping.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return at once with nothing ready.
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003371
Antoine Pitrou709176f2012-04-01 17:19:09 +02003372#
3373# Issue 14151: Test invalid family on invalid environment
3374#
3375
class TestInvalidFamily(unittest.TestCase):
    """A Listener address of the other platform's kind raises ValueError."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003387
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003388#
3389# Issue 12098: check sys.flags of child matches that for parent
3390#
3391
class TestFlags(unittest.TestCase):
    """sys.flags seen by a child process must match those of its parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Process target: send this interpreter's sys.flags over *conn*."""
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child of it."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3419
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003420#
3421# Test interaction with socket timeouts - see Issue #6056
3422#
3423
class TestTimeouts(unittest.TestCase):
    """Connections must work while a short default socket timeout is set."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Process target: send 123 on the pipe, then 456 to the listener."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        # Put the previous default timeout back once the test is over.
        self.addCleanup(socket.setdefaulttimeout, socket.getdefaulttimeout())
        socket.setdefaulttimeout(0.1)

        parent, child = multiprocessing.Pipe(duplex=True)
        listener = multiprocessing.connection.Listener(family='AF_INET')
        proc = multiprocessing.Process(target=self._test_timeout,
                                       args=(child, listener.address))
        proc.start()
        child.close()

        self.assertEqual(parent.recv(), 123)
        parent.close()

        accepted = listener.accept()
        self.assertEqual(accepted.recv(), 456)
        accepted.close()
        listener.close()
        proc.join(10)
3453
Richard Oudkerke88a2442012-08-14 11:41:32 +01003454#
3455# Test what happens with no "if __name__ == '__main__'"
3456#
3457
class TestNoForkBomb(unittest.TestCase):
    """Run mp_fork_bomb.py and check how it ends on each platform."""

    def test_noforkbomb(self):
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script is expected to succeed and print 123.
            rc, out, err = test.script_helper.assert_python_ok(script)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
            return
        # On Windows the script is expected to fail with a RuntimeError.
        rc, out, err = test.script_helper.assert_python_failure(script)
        self.assertEqual('', out.decode('ascii'))
        self.assertIn('RuntimeError', err.decode('ascii'))
3469
3470#
Richard Oudkerk409c3132013-04-17 20:58:00 +01003471# Issue #17555: ForkAwareThreadLock
3472#
3473
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        """Recurse through *n* generations; the last one reports the
        size of the after-fork registry on *conn*."""
        if n <= 1:
            conn.send(len(util._afterfork_registry))
        else:
            descendant = multiprocessing.Process(target=cls.child,
                                                 args=(n-1, conn))
            descendant.start()
            descendant.join()
        conn.close()

    def test_lock(self):
        reader, writer = multiprocessing.Pipe(False)
        # Created before the registry is measured and kept referenced
        # for the rest of the test.
        lock = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        proc = multiprocessing.Process(target=self.child, args=(5, writer))
        proc.start()
        new_size = reader.recv()
        proc.join()
        self.assertLessEqual(new_size, old_size)
3498
3499#
Richard Oudkerkcca8c532013-07-01 18:59:26 +01003500# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3501#
3502
class TestIgnoreEINTR(unittest.TestCase):
    """Blocking connection calls must carry on after a handled signal."""

    @classmethod
    def _test_ignore(cls, conn):
        """Process target: install a no-op SIGUSR1 handler, then echo on *conn*."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            # Signal the child while it is blocked in recv().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            # Signal it again while it is blocked in send_bytes().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        """Process target: report a listener's address, then greet one client."""
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        l = multiprocessing.connection.Listener()
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            # Signal the child while it is blocked in accept().
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            try:
                self.assertEqual(client.recv(), 'welcome')
            finally:
                # Do not leave the client connection open after the test.
                client.close()
            p.join()
        finally:
            conn.close()
3566
3567#
Richard Oudkerke88a2442012-08-14 11:41:32 +01003568#
3569#
3570
def setUpModule():
    """Skip the module where RLock creation fails, then prepare shared state."""
    on_linux = sys.platform.startswith("linux")
    if on_linux:
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, "
                                    "see issue 3111!")
    check_enough_semaphores()
    util.get_temp_dir()     # creates temp directory for use by all processes
    mp_logger = multiprocessing.get_logger()
    mp_logger.setLevel(LOG_LEVEL)
3581
Benjamin Petersone711caf2008-06-11 16:44:04 +00003582
def tearDownModule():
    """Sleep for half a second once all tests of the module have run."""
    # pause a bit so we don't get warning about dangling threads/processes
    time.sleep(0.5)
3586
3587
# Run the tests through unittest when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()