blob: 3fb07f6a540b5d12eb9f50ef20be06d892aeb659 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
R. David Murraya21e4ca2009-03-31 23:16:50 +000022import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010023import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000024
Benjamin Petersone5384b02008-10-04 22:00:42 +000025
R. David Murraya21e4ca2009-03-31 23:16:50 +000026# Skip tests if _multiprocessing wasn't built.
27_multiprocessing = test.support.import_module('_multiprocessing')
28# Skip tests if sem_open implementation is broken.
29test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
31# message: "No module named _multiprocessing". _multiprocessing is not compiled
32# without thread support.
33import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000034
Benjamin Petersone711caf2008-06-11 16:44:04 +000035import multiprocessing.dummy
36import multiprocessing.connection
37import multiprocessing.managers
38import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000039import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000040
Charles-François Natalibc8f0822011-09-20 20:36:51 +020041from multiprocessing import util
42
43try:
44 from multiprocessing import reduction
45 HAS_REDUCTION = True
46except ImportError:
47 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000048
Brian Curtinafa88b52010-10-07 01:12:19 +000049try:
50 from multiprocessing.sharedctypes import Value, copy
51 HAS_SHAREDCTYPES = True
52except ImportError:
53 HAS_SHAREDCTYPES = False
54
Antoine Pitroubcb39d42011-08-23 19:46:22 +020055try:
56 import msvcrt
57except ImportError:
58 msvcrt = None
59
Benjamin Petersone711caf2008-06-11 16:44:04 +000060#
61#
62#
63
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000066
Benjamin Petersone711caf2008-06-11 16:44:04 +000067#
68# Constants
69#
70
# Logging level constant taken from multiprocessing.util.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short delay (seconds) used by the tests when sleeping to let other
# processes/threads make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
# Timeouts (seconds) passed to blocking calls; distinct values are only used
# when timings are actually being checked.
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# True unless the C extension module advertises HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

# True when running on Windows.
WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
Richard Oudkerk59d54042012-05-10 16:11:12 +010089from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020090
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* via ``multiprocessing.connection.wait``.

    A negative *timeout* is translated to ``None`` before being passed on;
    ``None`` and non-negative values are passed through unchanged.
    Returns whatever ``wait([handle], ...)`` returns.
    """
    is_negative = timeout is not None and timeout < 0.0
    return wait([handle], None if is_negative else timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000095
# Upper bound on file descriptor numbers, with a fixed fallback of 256.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # AttributeError: os.sysconf is not available on this platform.
    # ValueError/OSError: the configuration name is unknown or unsupported.
    # A bare "except:" here would also have hidden KeyboardInterrupt and
    # SystemExit, so only the expected failures are caught.
    MAXFD = 256
100
Benjamin Petersone711caf2008-06-11 16:44:04 +0000101#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000102# Some tests require ctypes
103#
104
105try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000106 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000107except ImportError:
108 Structure = object
109 c_int = c_double = None
110
Charles-François Natali221ef672011-11-22 18:55:22 +0100111
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Raises unittest.SkipTest when ``os.sysconf("SC_SEM_NSEMS_MAX")`` reports
    a limit below 256.  Returns None when the limit cannot be queried, is
    reported as -1, or is large enough.
    """
    # minimum number of semaphores available according to POSIX
    required = 256
    try:
        available = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if available != -1 and available < required:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % required)
125
126
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000127#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000128# Creates a wrapper for a function which records the time it takes to finish
129#
130
class TimingWrapper(object):
    """Callable wrapper that records how long each call to *func* takes.

    After every call, ``elapsed`` holds the duration in seconds of the most
    recent call, whether it returned or raised.  It is ``None`` until the
    first call.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return its result unchanged."""
        # Use the monotonic clock: unlike time.time() it cannot go backwards
        # or jump when the system clock is adjusted during the call.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
143
144#
145# Base class for test cases
146#
147
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes below.

    It relies on ``assertAlmostEqual`` and ``assertEqual`` being available
    on ``self``; they are not defined here.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless func is not implemented.

        A NotImplementedError raised by the call itself is silently
        accepted; the comparison is done outside of that handler.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
170
Benjamin Petersone711caf2008-06-11 16:44:04 +0000171#
172# Return the value of a semaphore
173#
174
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute; an
    AttributeError at any step moves on to the next one.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
186
187#
188# Testcases
189#
190
class _TestProcess(BaseTestCase):
    """Tests for the basic Process API: attributes, start, join, terminate.

    ``TYPE``, ``Process``, ``Queue``, ``Pipe``, ``Event``,
    ``current_process`` and ``active_children`` are read from ``self``/``cls``
    but are not defined in this class; presumably they are injected by the
    code that builds the concrete test classes (TODO confirm).
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Check the attributes of the current process object.
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            return

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report args, kwds, name, authkey and pid through *q*.

        The authkey is only reported when ``cls.TYPE`` is not 'threads'.
        """
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
        q.put(current.pid)

    def test_process(self):
        # Full life cycle: state before start(), while running, after join().
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child reports in the same order _test() puts the values.
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
        self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must terminate it."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # join() with a zero or negative timeout must return without the
        # child having finished.
        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* over *wconn*, then recurse into two children.

        Children are started and joined one after the other, and recursion
        stops once *id* holds two elements.
        """
        # The former "from multiprocessing import forking" was unused here
        # and only added a dependency on a private module.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order, because each child is joined before the next
        # one is started.
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child target: wait up to 10 seconds for *event* to be set."""
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            return
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # The sentinel is not available before the process is started.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
381
Benjamin Petersone711caf2008-06-11 16:44:04 +0000382#
383#
384#
385
class _UpperCaser(multiprocessing.Process):
    """Process subclass that upper-cases the strings sent to it over a pipe.

    The parent talks through ``parent_conn``; the child serves requests on
    ``child_conn`` until it receives ``None``.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        pipe_ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = pipe_ends

    def run(self):
        """Child side: answer every received string with its upper-case form."""
        self.parent_conn.close()
        # iter() with a sentinel stops as soon as None is received.
        requests = iter(self.child_conn.recv, None)
        for text in requests:
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Parent side: send the string *s* and return the child's reply."""
        assert type(s) is str
        self.parent_conn.send(s)
        reply = self.parent_conn.recv()
        return reply

    def stop(self):
        """Parent side: send the ``None`` stop value, then close both ends."""
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()
407
class _TestSubclassingProcess(BaseTestCase):
    """Tests for Process subclasses and for child process shutdown."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        # Round-trip two strings through a daemonic _UpperCaser child.
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            return

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)
        child = self.Process(target=self._test_stderr_flush, args=(path,))
        child.start()
        child.join()
        with open(path, 'r') as stream:
            captured = stream.read()
        # The whole traceback was printed
        self.assertIn("ZeroDivisionError", captured)
        self.assertIn("test_multiprocessing.py", captured)
        self.assertIn("1/0 # MARKER", captured)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child target: redirect stderr to *testfn*, then fail with 1/0.

        The file is deliberately left open; the parent checks that the
        traceback still reaches it.
        """
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child target: redirect stderr to *testfn*, then sys.exit(reason)."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            return

        path = test.support.TESTFN
        self.addCleanup(test.support.unlink, path)

        # Non-integer exit arguments: check both the exit code and the text
        # written to the redirected stderr.
        printed_cases = (([1, 2, 3], 1), ('ignore this', 0))
        for reason, expected_code in printed_cases:
            child = self.Process(target=self._test_sys_exit,
                                 args=(reason, path))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, expected_code)

            with open(path, 'r') as stream:
                self.assertEqual(stream.read().rstrip(), str(reason))

        # Boolean/integer exit arguments are compared directly with the
        # reported exit code.
        for reason in (True, False, 8):
            child = self.Process(target=sys.exit, args=(reason,))
            child.daemon = True
            child.start()
            child.join(5)
            self.assertEqual(child.exitcode, reason)
473
Benjamin Petersone711caf2008-06-11 16:44:04 +0000474#
475#
476#
477
def queue_empty(q):
    """Return whether queue *q* is empty.

    Uses ``q.empty()`` when that attribute exists, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
483
def queue_full(q, maxsize):
    """Return whether queue *q* is full.

    Uses ``q.full()`` when that attribute exists, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
489
490
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue: put/get variants, qsize, task_done.

    ``Queue``, ``JoinableQueue``, ``Event`` and ``Process`` are read from
    ``self`` but are not defined in this class; presumably they are injected
    by the code that builds the concrete test classes (TODO confirm).
    """

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, drain six items and signal the parent."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using every calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Every further put must fail with Full: immediately when
        # non-blocking, after the timeout when blocking.
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once released, put 2..5 and signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Every further get must fail with Empty: immediately when
        # non-blocking, after the timeout when blocking.
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is not implemented here; nothing more to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge every item from *q* until None arrives."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        # The former "sys.version_info < (2, 5)" skip guard could never be
        # true in this Python 3 file and has been removed.

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None per worker, so that each of them leaves its loop.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
700
701#
702#
703#
704
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        # A plain lock cannot be acquired twice nor released twice.
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        # A recursive lock is acquired three times, released three times,
        # and one extra release must fail.
        depth = 3
        rlock = self.RLock()
        for _ in range(depth):
            self.assertEqual(rlock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(rlock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), rlock.release)

    def test_lock_context(self):
        # A lock can be used as a context manager.
        with self.Lock():
            pass
727
Benjamin Petersone711caf2008-06-11 16:44:04 +0000728
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Drive *sem* (initial value 2) down to 0 and back up to 2.

        The value is checked after every step where the platform allows
        reading it.
        """
        check_value = self.assertReturnsIfImplemented
        check_value(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check_value(remaining, get_value, sem)
        # Exhausted: a non-blocking acquire fails and changes nothing.
        self.assertEqual(sem.acquire(False), False)
        check_value(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            check_value(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for value in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(value, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time); every
        # attempt must fail because the semaphore value is zero.
        attempts = (
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        )
        for args, kwds, duration in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, duration)
781
782
783class _TestCondition(BaseTestCase):
784
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000785 @classmethod
786 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000787 cond.acquire()
788 sleeping.release()
789 cond.wait(timeout)
790 woken.release()
791 cond.release()
792
793 def check_invariant(self, cond):
794 # this is only supposed to succeed when there are no sleepers
795 if self.TYPE == 'processes':
796 try:
797 sleepers = (cond._sleeping_count.get_value() -
798 cond._woken_count.get_value())
799 self.assertEqual(sleepers, 0)
800 self.assertEqual(cond._wait_semaphore.get_value(), 0)
801 except NotImplementedError:
802 pass
803
804 def test_notify(self):
805 cond = self.Condition()
806 sleeping = self.Semaphore(0)
807 woken = self.Semaphore(0)
808
809 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000810 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000811 p.start()
812
813 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000814 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000815 p.start()
816
817 # wait for both children to start sleeping
818 sleeping.acquire()
819 sleeping.acquire()
820
821 # check no process/thread has woken up
822 time.sleep(DELTA)
823 self.assertReturnsIfImplemented(0, get_value, woken)
824
825 # wake up one process/thread
826 cond.acquire()
827 cond.notify()
828 cond.release()
829
830 # check one process/thread has woken up
831 time.sleep(DELTA)
832 self.assertReturnsIfImplemented(1, get_value, woken)
833
834 # wake up another
835 cond.acquire()
836 cond.notify()
837 cond.release()
838
839 # check other has woken up
840 time.sleep(DELTA)
841 self.assertReturnsIfImplemented(2, get_value, woken)
842
843 # check state is not mucked up
844 self.check_invariant(cond)
845 p.join()
846
847 def test_notify_all(self):
848 cond = self.Condition()
849 sleeping = self.Semaphore(0)
850 woken = self.Semaphore(0)
851
852 # start some threads/processes which will timeout
853 for i in range(3):
854 p = self.Process(target=self.f,
855 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000856 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000857 p.start()
858
859 t = threading.Thread(target=self.f,
860 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000861 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000862 t.start()
863
864 # wait for them all to sleep
865 for i in range(6):
866 sleeping.acquire()
867
868 # check they have all timed out
869 for i in range(6):
870 woken.acquire()
871 self.assertReturnsIfImplemented(0, get_value, woken)
872
873 # check state is not mucked up
874 self.check_invariant(cond)
875
876 # start some more threads/processes
877 for i in range(3):
878 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000879 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000880 p.start()
881
882 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000883 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000884 t.start()
885
886 # wait for them to all sleep
887 for i in range(6):
888 sleeping.acquire()
889
890 # check no process/thread has woken up
891 time.sleep(DELTA)
892 self.assertReturnsIfImplemented(0, get_value, woken)
893
894 # wake them all up
895 cond.acquire()
896 cond.notify_all()
897 cond.release()
898
899 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200900 for i in range(10):
901 try:
902 if get_value(woken) == 6:
903 break
904 except NotImplementedError:
905 break
906 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000907 self.assertReturnsIfImplemented(6, get_value, woken)
908
909 # check state is not mucked up
910 self.check_invariant(cond)
911
912 def test_timeout(self):
913 cond = self.Condition()
914 wait = TimingWrapper(cond.wait)
915 cond.acquire()
916 res = wait(TIMEOUT1)
917 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000918 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000919 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
920
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200921 @classmethod
922 def _test_waitfor_f(cls, cond, state):
923 with cond:
924 state.value = 0
925 cond.notify()
926 result = cond.wait_for(lambda : state.value==4)
927 if not result or state.value != 4:
928 sys.exit(1)
929
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor(self):
    """Check Condition.wait_for() when the predicate eventually holds."""
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', -1)

    child = self.Process(target=self._test_waitfor_f, args=(cond, state))
    child.daemon = True
    child.start()

    # The worker sets the value to 0 and notifies us.
    with cond:
        self.assertTrue(cond.wait_for(lambda: state.value == 0))
        self.assertEqual(state.value, 0)

    # Bump the value four times so that the worker sees it reach 4.
    for _ in range(4):
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()

    child.join(5)
    self.assertFalse(child.is_alive())
    self.assertEqual(child.exitcode, 0)
954
@classmethod
def _test_waitfor_timeout_f(cls, cond, state, success, sem):
    """Worker side of test_waitfor_timeout.

    Releases *sem* on entry, then waits on *cond* for ``state.value`` to
    become 4 with a 0.1 second timeout.  ``success.value`` is set to True
    only if the wait reported failure and the elapsed time is between
    0.6 and 10 times the requested timeout.
    """
    sem.release()
    with cond:
        expected = 0.1
        # Use the monotonic clock: the elapsed time must not be distorted
        # by wall-clock adjustments made while we are waiting.
        started = time.monotonic()
        result = cond.wait_for(lambda: state.value == 4, timeout=expected)
        dt = time.monotonic() - started
        # borrow logic in assertTimeout() from test/lock_tests.py
        if not result and expected * 0.6 < dt < expected * 10.0:
            success.value = True
966
@unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
def test_waitfor_timeout(self):
    """Check Condition.wait_for() when the predicate never holds."""
    # based on test in test/lock_tests.py
    cond = self.Condition()
    state = self.Value('i', 0)
    success = self.Value('i', False)
    sem = self.Semaphore(0)

    worker_args = (cond, state, success, sem)
    child = self.Process(target=self._test_waitfor_timeout_f,
                         args=worker_args)
    child.daemon = True
    child.start()
    # The worker releases the semaphore as soon as it starts running.
    self.assertTrue(sem.acquire(timeout=10))

    # Only increment 3 times, so state == 4 is never reached.
    bumps = 0
    while bumps < 3:
        time.sleep(0.01)
        with cond:
            state.value += 1
            cond.notify()
        bumps += 1

    child.join(5)
    self.assertTrue(success.value)
990
@classmethod
def _test_wait_result(cls, c, pid):
    """Worker side of test_wait_result.

    Notifies *c*, sleeps for one second and then, when *pid* is not
    None, sends SIGINT to that process.
    """
    with c:
        c.notify()
    time.sleep(1)
    if pid is None:
        return
    os.kill(pid, signal.SIGINT)
998
def test_wait_result(self):
    """Check the return value of Condition.wait().

    wait() must return False on timeout and True when notified.  For the
    process-based variant on non-Windows platforms the worker also sends
    SIGINT to this process, and a second wait() is expected to raise
    KeyboardInterrupt.
    """
    can_interrupt = (isinstance(self, ProcessesMixin)
                     and sys.platform != 'win32')
    pid = os.getpid() if can_interrupt else None

    c = self.Condition()
    with c:
        # Nobody notifies yet, so both waits time out.
        self.assertFalse(c.wait(0))
        self.assertFalse(c.wait(0.1))

        worker = self.Process(target=self._test_wait_result, args=(c, pid))
        worker.start()

        self.assertTrue(c.wait(10))
        if pid is not None:
            self.assertRaises(KeyboardInterrupt, c.wait, 10)

        worker.join()
1018
Benjamin Petersone711caf2008-06-11 16:44:04 +00001019
class _TestEvent(BaseTestCase):
    """Tests for Event objects."""

    @classmethod
    def _test_event(cls, event):
        """Set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Exercise is_set(), wait(), set() and clear() on an Event."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A freshly created event is not set.
        self.assertEqual(event.is_set(), False)

        # wait() reports the state of the flag, so waiting on an unset
        # event returns False once the timeout has expired.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # After clear(), wait() must block until the worker sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the worker so that it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001060
1061#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001062# Tests for Barrier - adapted from tests in test/lock_tests.py
1063#
1064
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1069
class _DummyList(object):
    """Counter offering only the ``append()``/``len()`` part of a list.

    The count is a single C int stored in a multiprocessing.heap buffer
    and every access is guarded by a multiprocessing.Lock.  The pickled
    state is the (buffer wrapper, lock) pair.
    """

    def __init__(self):
        buf = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
        guard = multiprocessing.Lock()
        self.__setstate__((buf, guard))
        self._lengthbuf[0] = 0

    def __getstate__(self):
        return (self._wrapper, self._lock)

    def __setstate__(self, state):
        self._wrapper, self._lock = state
        # View the raw buffer as an array of C ints.
        view = self._wrapper.create_memoryview()
        self._lengthbuf = view.cast('i')

    def append(self, _):
        """Increment the counter; the appended value itself is ignored."""
        with self._lock:
            self._lengthbuf[0] = self._lengthbuf[0] + 1

    def __len__(self):
        with self._lock:
            count = self._lengthbuf[0]
        return count
1092
def _wait():
    """Crude wait/yield function not relying on synchronization primitives."""
    pause = 0.01
    time.sleep(pause)
1096
1097
class Bunch(object):
    """
    A bunch of workers, each created with ``namespace.Process``.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemonic workers which each call ``f(*args)``.
        If `wait_before_exit` is True, the exit event is left unset
        until do_finish() is called.
        """
        self.f, self.args, self.n = f, args, n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        """Body of each worker: record start, run `f`, record finish."""
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
        # Wait (for at most 30 seconds) for permission to exit.
        self._can_exit.wait(30)
        assert self._can_exit.is_set()

    def wait_for_started(self):
        """Poll until `n` workers have recorded that they started."""
        while not len(self.started) >= self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until `n` workers have recorded that they finished."""
        while not len(self.finished) >= self.n:
            _wait()

    def do_finish(self):
        """Set the exit event that the workers wait on."""
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001141
1142
class AppendTrue(object):
    """Callable which appends True to the wrapped object on every call."""

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1148
1149
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        """Return a list-like counter suitable for the current TYPE."""
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        """Run ``f(*args)`` in N-1 workers and in the caller, then wait
        until all the workers have finished."""
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        """Pass the barrier 2*n times, checking the lockstep counters."""
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        """A barrier with a single party never blocks."""
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        """Pass the barrier `passes` times, sending each pass index."""
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        """
        Test that N workers stay in lockstep for 1000 barrier passes
        """
        if self.TYPE == 'manager':
            # Report the test as skipped instead of silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # The workers are not daemonic, so make sure each of them is
            # joined once the test is over.
            self.addCleanup(p.join)

        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1394
1395#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001396#
1397#
1398
class _TestValue(BaseTestCase):
    """Tests for shared Value and RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Assign the third element of each spec to the matching value."""
        for shared, (_code, _initial, final) in zip(values, cls.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """Check that a child process can modify shared values."""
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check get_lock()/get_obj() for the various `lock` arguments."""
        # Default lock argument.
        plain = self.Value('i', 5)
        plain.get_lock()
        plain.get_obj()

        # Explicit lock=None.
        with_none = self.Value('i', 5, lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A caller-supplied lock must be the one handed back.
        lock = self.Lock()
        with_lock = self.Value('i', 5, lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
1466
Benjamin Petersone711caf2008-06-11 16:44:04 +00001467
class _TestArray(BaseTestCase):
    """Tests for shared Array and RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] = seq[idx] + seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        """Check indexing, slicing and modification by a child process."""
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to both sequences.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then in a child process.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        zeros = [0] * size
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), zeros)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check get_lock()/get_obj() for the various `lock` arguments."""
        # Default lock argument.
        plain = self.Array('i', list(range(10)))
        plain.get_lock()
        plain.get_obj()

        # Explicit lock=None.
        with_none = self.Array('i', list(range(10)), lock=None)
        with_none.get_lock()
        with_none.get_obj()

        # A caller-supplied lock must be the one handed back.
        lock = self.Lock()
        with_lock = self.Array('i', list(range(10)), lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False yields an object without the accessors.
        unlocked = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001546
1547#
1548#
1549#
1550
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        ten = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], ten)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], ten)

        # A shared list built from other shared lists.
        e = self.list([a, b])
        self.assertEqual(e[:], [ten, doubled])

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), {code: chr(code) for code in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), list(zip(indices, letters)))

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows only the remaining public attribute.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
1605 self.assertTrue(not hasattr(n, 'job'))
1606
1607#
1608#
1609#
1610
def sqr(x, wait=0.0):
    """Return ``x*x`` after sleeping for *wait* seconds."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001614
def mul(x, y):
    """Return the product ``x*y``."""
    product = x * y
    return product
1617
class _TestPool(BaseTestCase):
    """Tests for the Pool API (apply, map, starmap, imap and friends)."""

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    def test_map_async_callbacks(self):
        """Check that exactly one of callback/error_callback is invoked."""
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # Successful call: `callback` receives the list of results.
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # Failing call: `error_callback` receives the exception.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    def test_map_chunksize(self):
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        """Check that a Pool can be used as a context manager."""
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            # Once the ``with`` block has exited, the pool must refuse
            # new work.
            self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001749
def raising():
    """Always raise ``KeyError("key")``."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001752
def unpickleable_result():
    """Return a lambda (which returns 42) as the result value."""
    answer = lambda: 42
    return answer
1755
class _TestPoolWorkerErrors(BaseTestCase):
    """Tests for error reporting from pool workers."""
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """The error callback receives the exception raised by the task."""
        pool = multiprocessing.Pool(2)

        seen = [None]
        def errback(exc):
            seen[0] = exc

        res = pool.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        """A result that fails to be sent back is reported as an error."""
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        seen = [None]
        def errback(exc):
            seen[0] = exc

        # Make sure we don't lose pool processes because of encoding errors.
        for _ in range(20):
            # Start every round with an empty slot.
            seen[0] = None

            res = pool.apply_async(unpickleable_result,
                                   error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1795
class _TestPoolWorkerLifetime(BaseTestCase):
    """Tests for pools created with `maxtasksperchild`."""
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1840
1841
Benjamin Petersone711caf2008-06-11 16:44:04 +00001842#
1843# Test that manager has expected number of shared objects left
1844#
1845
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().

    # If some other test using ManagerMixin.manager fails, then the
    # raised exception may keep alive a frame which holds a reference
    # to a managed object.  This will cause test_number_of_objects to
    # also fail.
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Check how many shared objects the manager still holds."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot taken right after the count, once; a
            # second _debug_info() call would only duplicate the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1869
1870#
1871# Test of creating a customized manager class
1872#
1873
1874from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1875
class FooBar(object):
    """Simple target object for the customized-manager tests.

    It offers one public method that succeeds, one public method that
    raises, and one underscore-prefixed method.
    """

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1883
def baz():
    """Generate the squares of the integers 0 through 9, in order."""
    for n in range(10):
        yield n ** 2
1887
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated over locally.

    Each step of the iteration is forwarded to the referent's
    ``__next__`` method through ``_callmethod``.
    """

    # The only referent method this proxy forwards.
    _exposed_ = ('__next__',)

    def __iter__(self):
        # The proxy acts as its own iterator.
        return self

    def __next__(self):
        # Ask the referent for its next item.
        method_name = '__next__'
        return self._callmethod(method_name)
1894
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager; adds no behaviour of
    its own beyond the typeids registered below."""


# 'Foo': FooBar with the default choice of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar': FooBar again, but with an explicit list of exposed methods.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz': the generator function, accessed through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1901
1902
class _TestMyManager(BaseTestCase):
    # Exercises MyManager three ways: started and shut down by hand,
    # used as a context manager, and used as a context manager after an
    # explicit start().  Each variant runs the same checks (common()) and
    # then verifies that the server process exited cleanly.

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()
        try:
            self.common(manager)
        finally:
            # Always stop the server process, even when a check in
            # common() fails; otherwise the process would be left running.
            manager.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        # Shared checks: create one proxy per registered typeid and verify
        # which methods each proxy exposes and what they return.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        # 'Foo' was registered without an explicit ``exposed`` list,
        # 'Bar' with exposed=('f', '_h').
        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # '_h' is not exposed on 'Foo', so calling it by name is refused.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1952
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001953
Benjamin Petersone711caf2008-06-11 16:44:04 +00001954#
1955# Test of connecting to a remote server and using xmlrpclib for serialization
1956#
1957
# Single module-level queue handed out by get_queue().
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1961
class QueueManager(BaseManager):
    '''manager class used by the server process'''


# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1965
class QueueManager2(BaseManager):
    '''manager class declaring the same interface as QueueManager'''


# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1969
1970
# Serializer name passed as ``serializer=`` to the QueueManager and
# QueueManager2 instances created by the remote-manager tests below.
SERIALIZER = 'xmlrpclib'
1972
class _TestRemoteManager(BaseTestCase):
    # A server manager is started, a child process connects to it and
    # puts an item on the shared queue, and the test reads that item
    # back through a second, independent client connection.

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect as a client and enqueue one tuple.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        expected_item = ['hello world', None, True, 2.25]
        self.assertEqual(remote_queue.get(), expected_item)

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, remote_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del remote_queue
        server.shutdown()
2014
class _TestManagerRestart(BaseTestCase):
    # Checks that a manager can be shut down and a new one started again
    # straight away on a previously used address.

    @classmethod
    def _putter(cls, address, authkey):
        # Child side: connect to the running manager and enqueue one item.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before stopping the server it refers to.
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager must actually be started: this is
            # the retry itself, and shutdown() below needs a started
            # manager.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002056
Benjamin Petersone711caf2008-06-11 16:44:04 +00002057#
2058#
2059#
2060
# Empty message used as the stop marker for _TestConnection._echo(), which
# echoes messages until recv_bytes() returns this value.
# NOTE(review): latin() is defined elsewhere in this file; presumably it
# encodes a str to bytes -- confirm.
SENTINEL = latin('')
2062
class _TestConnection(BaseTestCase):
    # Tests of the connection objects returned by self.Pipe(): sending and
    # receiving objects and raw bytes, polling, closing, and (for real
    # processes) passing file descriptors/handles over a connection.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Echo every message back until SENTINEL arrives, then close.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message must raise BufferTooShort,
            # carrying the complete message in its args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        # Give the child time to echo the message back.
        time.sleep(.1)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction must fail.
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report the test as skipped rather than silently passing,
            # in line with the other processes-only tests in this class.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # send_bytes(buf, offset[, size])
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range offsets and sizes must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if fstat() succeeds on fd, False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child side: receive a handle over conn, write data to it and
        # close it.  With create_dummy_fds, every unassigned descriptor
        # below 256 is first filled with a duplicate of conn's descriptor.
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number of at least 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child side: write a single raw byte to the connection's
        # descriptor, with no handle attached.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        # Leaving the with block must have closed both ends.
        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(IOError, a.recv)
            self.assertRaises(IOError, b.recv)
2327
class _TestListener(BaseTestCase):
    # Tests of self.connection.Listener on its own.

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        # Binding a second listener to an address already in use must fail.
        make_listener = self.connection.Listener
        for family in self.connection.families:
            first = make_listener(family=family)
            self.addCleanup(first.close)
            taken_address = first.address
            with self.assertRaises(OSError):
                make_listener(taken_address, family)

    def test_context(self):
        # Listener, client and accepted connection all work as context
        # managers.
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        # Once the with block is left, the listener can no longer accept.
        if self.TYPE == 'processes':
            self.assertRaises(IOError, listener.accept)
2348
class _TestListenerClient(BaseTestCase):
    # Tests of a Listener together with a Client connecting to it.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child side: connect, send one greeting, disconnect.
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as the other tests in this
            # class do, instead of leaving one open per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()

    def test_issue16955(self):
        # poll() on the client must report data sent by the accepted
        # connection, for every connection family.
        for fam in self.connection.families:
            l = self.connection.Listener(family=fam)
            c = self.connection.Client(l.address)
            a = l.accept()
            a.send_bytes(b"hello")
            self.assertTrue(c.poll(1))
            a.close()
            c.close()
            l.close()
2396
class _TestPoll(BaseTestCase):
    # Tests of Connection.poll().
    #
    # Derives from BaseTestCase like every other _Test* template class in
    # this file: the template itself has no self.Pipe / self.Process, so
    # it must not be a unittest.TestCase that test loaders would collect
    # and run directly.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        # An empty message still counts as available data, and polling
        # does not consume it.
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        # Child side: send each string after a short pause, then close.
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # Poll in short slices until a message is available (or the
            # attempts run out), then read it.
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        # Give the child time to be blocked in poll() before writing.
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # Whatever the child's poll() did, the parent must still read a
        # complete message.
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        # Child side: send three separate messages back to back.
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        # Repeated polls between reads must not merge or drop messages.
        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
2473 p.join()
2474
Benjamin Petersone711caf2008-06-11 16:44:04 +00002475#
2476# Test of sending connection and socket objects between processes
2477#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002478
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Tests of sending connection and socket objects to another process
    # over a pipe.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        # Listener process: for each family, publish a listener address,
        # accept one connection and send the accepted connection object
        # back over conn; then do the same with a plain socket.
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind(('localhost', 0))
        l.listen(1)
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Wait for the parent's go-ahead before exiting.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        # Remote process: for each (address, msg) received, connect to the
        # address and send msg upper-cased.  None ends the Connection
        # phase; one more pair is then served over a plain socket.
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())
            # Release the received connection instead of leaving one
            # open per family.
            new_conn.close()

        rconn.send(None)

        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        # Read from the received socket until the peer closes it.
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        # Child side of test_access: first receive a write end and use
        # it, then receive a read end and report back what was read.
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)

        # The child has sent its last message; release our end of the
        # pipe and wait for the child to exit.
        conn.close()
        p.join()
2609
Benjamin Petersone711caf2008-06-11 16:44:04 +00002610#
2611#
2612#
2613
class _TestHeap(BaseTestCase):
    # Tests of multiprocessing.heap.BufferWrapper and the heap behind it.

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap: gather every free and every
        # occupied block as (arena index, start, stop, length, state)
        segments = []
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Consecutive blocks must either be adjacent within one arena or
        # mark the start (offset 0) of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena_index, start, stop) = current[:3]
            (next_arena_index, next_start, _) = following[:3]
            self.assertTrue(
                (arena_index != next_arena_index and next_start == 0) or
                (stop == next_start))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2678
Benjamin Petersone711caf2008-06-11 16:44:04 +00002679#
2680#
2681#
2682
class _Foo(Structure):
    """Two-field structure (int ``x``, double ``y``) used by the shared
    ctypes tests."""
    _fields_ = [('x', c_int), ('y', c_double)]
2688
class _TestSharedCTypes(BaseTestCase):
    # Tests of shared ctypes values and arrays modified by a child
    # process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Child side: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared_objects = (x, y, foo, arr, string)
        doubler = self.Process(target=self._double, args=shared_objects)
        doubler.daemon = True
        doubler.start()
        doubler.join()

        # The child's modifications must be visible in this process.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for index in range(10):
            self.assertAlmostEqual(arr[index], index*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original afterwards must not affect the copy.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2738
2739#
2740#
2741#
2742
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks fire."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child target: register finalizers that each send a tag on *conn*."""
        class Token(object):
            pass

        a = Token()
        util.Finalize(a, conn.send, args=('a',))
        del a           # dropping the object fires its callback

        b = Token()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # calling the finalizer fires the callback
        close_b()       # no effect: the callback has already run
        del b           # no effect: the callback has already run

        # 'c' is registered without an exitpriority; test_finalize does not
        # expect to receive it.
        c = Token()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Token()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Token()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Token()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Token()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # Run multiprocessing's cleanup function, then leave the process
        # without garbage collecting the locals above.
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Drain everything the child reported, up to the 'STOP' sentinel.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2795
2796#
2797# Test that from ... import * works for each module
2798#
2799
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist on that module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util',
        ]

        # Optional modules are only checked when their prerequisites exist.
        if HAS_REDUCTION:
            modules.append('multiprocessing.reduction')
        if c_int is not None:
            # This module requires _ctypes
            modules.append('multiprocessing.sharedctypes')

        for name in modules:
            __import__(name)
            mod = sys.modules[name]
            exported = getattr(mod, '__all__', ())

            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2828
2829#
2830# Quick test that logging works -- does not test logging output
2831#
2832
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger (output is not verified)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertIsNotNone(logger)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if something above fails.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        """Child target: report the effective multiprocessing log level."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _level_seen_by_child(self, reader, writer):
        """Run _test_level in a child and return the level it sends back.

        The child is always joined before returning so that no process
        outlives the test.
        """
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        try:
            # A level set on the multiprocessing logger is seen by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._level_seen_by_child(reader, writer))

            # With NOTSET, the child reports the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._level_seen_by_child(reader, writer))
        finally:
            # Restore global logging state whatever the outcome.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
2875
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002876
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002877# class _TestLoggingProcessName(BaseTestCase):
2878#
2879# def handle(self, record):
2880# assert record.processName == multiprocessing.current_process().name
2881# self.__handled = True
2882#
2883# def test_logging(self):
2884# handler = logging.Handler()
2885# handler.handle = self.handle
2886# self.__handled = False
2887# # Bypass getLogger() and side-effects
2888# logger = logging.getLoggerClass()(
2889# 'multiprocessing.test.TestLoggingProcessName')
2890# logger.addHandler(handler)
2891# logger.propagate = False
2892#
2893# logger.warn('foo')
2894# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002895
Benjamin Petersone711caf2008-06-11 16:44:04 +00002896#
Richard Oudkerk7aaa1ef2013-02-26 12:39:57 +00002897# Check that Process.join() retries if os.waitpid() fails with EINTR
2898#
2899
class _TestPollEintr(BaseTestCase):
    """Process.join() must survive a signal arriving while it waits."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        """Child target: after half a second, send SIGUSR1 to *pid*."""
        time.sleep(0.5)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        got_signal = [False]
        def record(*args):
            got_signal[0] = True
        pid = os.getpid()
        oldhandler = signal.signal(signal.SIGUSR1, record)
        try:
            killer = self.Process(target=self._killer, args=(pid,))
            killer.start()
            try:
                # The signal is sent while this join() is still waiting.
                p = self.Process(target=time.sleep, args=(1,))
                p.start()
                p.join()
            finally:
                # Reap the helper even when the code above fails, so it can
                # never signal this process after the handler is restored.
                killer.join()
            self.assertTrue(got_signal[0])
            self.assertEqual(p.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, oldhandler)
2927
2928#
Jesse Noller6214edd2009-01-19 16:23:53 +00002929# Test to verify handle verification, see issue 3321
2930#
2931
class TestInvalidHandle(unittest.TestCase):
    """Connection must reject handles that are not valid (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        rejected = (ValueError, IOError)

        bogus = multiprocessing.connection.Connection(44977608)
        try:
            with self.assertRaises(rejected):
                bogus.poll()
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None

        with self.assertRaises(rejected):
            multiprocessing.connection.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002945
Jesse Noller6214edd2009-01-19 16:23:53 +00002946#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002947# Functions used to create test cases from the base ones in this module
2948#
2949
def get_attributes(Source, names):
    """Collect the attributes *names* of *Source* into a dict.

    Plain Python functions are wrapped in ``staticmethod`` so that, once the
    dict is merged into a class namespace, they are not turned into bound
    methods.  Every other kind of object is stored unchanged.
    """
    function_type = type(get_attributes)
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        d[name] = obj
    return d
2958
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the module's ``_Test*`` bases.

    For every module-level name starting with ``_Test`` whose
    ``ALLOWED_TYPES`` contains *type*, a subclass combining that base,
    ``unittest.TestCase`` and *Mixin* is created.  It is named
    ``'With' + type.capitalize() + <base name without the underscore>``.

    Returns a dict mapping each new class name to the class.
    """
    result = {}
    glob = globals()
    Type = type.capitalize()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(glob.items()):
        if not name.startswith('_Test'):
            continue
        assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
        if type not in base.ALLOWED_TYPES:
            continue
        newname = 'With' + Type + name[1:]
        class Temp(base, unittest.TestCase, Mixin):
            pass
        result[newname] = Temp
        # Set the qualified name as well; otherwise it stays
        # 'create_test_cases.<locals>.Temp' for every generated class.
        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
    return result
2977
2978#
2979# Create test cases
2980#
2981
# Mixin supplying the 'processes' flavour of the primitives to the generated
# test cases: each listed name is taken from the multiprocessing package.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes into the class namespace; get_attributes()
    # wraps plain functions in staticmethod so they are not bound to the
    # test instance.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue', 'Pool'
        )))
2991
# Generate the With Processes* test classes and publish them as module
# globals under their generated names.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2994
2995
# Mixin supplying the 'manager' flavour of the primitives: each listed name
# is taken from a shared SyncManager instance.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Allocated without running __init__; test_main() calls __init__() and
    # start() on this object before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue', 'Pool'
        )))
3005
# Generate the WithManager* test classes and publish them as module globals
# under their generated names.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
3008
3009
# Mixin supplying the 'threads' flavour of the primitives: each listed name
# is taken from multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes into the class namespace; get_attributes()
    # wraps plain functions in staticmethod so they are not bound to the
    # test instance.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Barrier', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue', 'Pool'
        )))
3019
# Generate the WithThreads* test classes and publish them as module globals
# under their generated names.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
3022
class OtherTest(unittest.TestCase):
    """Authentication failures in the connection challenge handshake."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The peer answers the challenge with garbage.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer sends a challenge, then garbage, then empty bytes.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3051
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003052#
3053# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3054#
3055
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    ns.test = ns.test + 1
3058
class TestInitializers(unittest.TestCase):
    """Initializer support in Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A shared namespace lets the initializer, which runs in another
        # process, report back that it was called.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # Stop the extra manager even if the assertion fails.
            m.shutdown()
            m.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
3083
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003084#
3085# Issue 5155, 5313, 5331: Test process in processes
3086# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3087#
3088
def _ThisSubProcess(q):
    """Try a non-blocking get on *q*; an empty queue is not an error."""
    try:
        q.get(block=False)
    except pyqueue.Empty:
        pass
3094
def _TestProcess(q):
    """Start a daemon grandchild that polls a fresh queue, then wait for it.

    The *q* argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    grandchild = multiprocessing.Process(
        target=_ThisSubProcess, args=(inner_queue,))
    grandchild.daemon = True
    grandchild.start()
    grandchild.join()
3101
def _afunc(x):
    """Return *x* multiplied by itself."""
    squared = x * x
    return squared
3104
def pool_in_process():
    """Create a four-worker pool, map ``_afunc`` over a few ints, shut down."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Always release the workers, even if map() raises.
        pool.close()
        pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003110
class _file_like(object):
    """Write buffer in front of *delegate*, kept separately per process.

    Written data is collected in a list and only handed to the delegate's
    ``write`` when ``flush`` is called.
    """

    def __init__(self, delegate):
        self._delegate = delegate
        self._pid = None

    @property
    def cache(self):
        """The pending chunks; reset whenever the current pid changes."""
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if current == self._pid:
            return self._cache
        self._pid = current
        self._cache = []
        return self._cache

    def write(self, data):
        """Queue *data* for the next flush."""
        self.cache.append(data)

    def flush(self):
        """Send all pending chunks to the delegate and empty the buffer."""
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
3131
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process object is created but never started.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert: it is not stripped under -O
        # and reports both values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3152
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003153
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait().

    Pipe ends and sockets are registered with addCleanup() as soon as they
    exist so they are released even when an assertion fails, and elapsed
    time is measured with time.monotonic() so wall-clock adjustments cannot
    disturb the timing checks.
    """

    @classmethod
    def _child_test_wait(cls, w, slow):
        """Child target: send ten (index, pid) tuples on *w*, then close it.

        If *slow* is true, sleep for a short random time before each send.
        """
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            self.addCleanup(r.close)
            p = multiprocessing.Process(target=self._child_test_wait,
                                        args=(w, slow))
            p.daemon = True
            p.start()
            # The parent must drop its copy of the write end, otherwise the
            # reader would never see EOF.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        """Child target: connect to *address* and send '0\\n' .. '9\\n'.

        If *slow* is true, sleep for a short random time before each send.
        """
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        self.addCleanup(l.close)
        l.bind(('', 0))
        l.listen(4)
        addr = ('localhost', l.getsockname()[1])
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            self.addCleanup(r.close)
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # Empty read: the peer closed its end.
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() must give up after about `expected`
        # seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Now `a` has data pending: wait() must return promptly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        """Child target: release *sem*, then sleep for *period* seconds."""
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        def sorted_(objs):
            # Order by identity so that lists mixing ints and connections
            # can be compared.
            return sorted(objs, key=id)

        expected = 3
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        try:
            self.assertIsInstance(p.sentinel, int)
            # Fetch the sentinel once so every list below holds the very
            # same object.
            sentinel = p.sentinel
            self.assertTrue(sem.acquire(timeout=20))

            # Only the sentinel becomes ready, once the child has exited.
            start = time.monotonic()
            res = wait([a, sentinel, b], expected + 20)
            delta = time.monotonic() - start

            self.assertEqual(res, [sentinel])
            self.assertLess(delta, expected + 2)
            self.assertGreater(delta, expected - 2)

            a.send(None)

            start = time.monotonic()
            res = wait([a, sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(sorted_(res), sorted_([sentinel, b]))
            self.assertLess(delta, 0.4)

            b.send(None)

            start = time.monotonic()
            res = wait([a, sentinel, b], 20)
            delta = time.monotonic() - start

            self.assertEqual(sorted_(res), sorted_([a, sentinel, b]))
            self.assertLess(delta, 0.4)
        finally:
            # Never leave the child behind, even after a failed assertion.
            p.terminate()
            p.join()

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return at once with nothing ready.
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003328
Antoine Pitrou709176f2012-04-01 17:19:09 +02003329#
3330# Issue 14151: Test invalid family on invalid environment
3331#
3332
class TestInvalidFamily(unittest.TestCase):
    """Listener must reject addresses of another platform (issue 14151)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows pipe name passed on a non-Windows platform.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem path passed on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener,
                          '/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003344
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003345#
3346# Issue 12098: check sys.flags of child matches that for parent
3347#
3348
class TestFlags(unittest.TestCase):
    """sys.flags of a child process must match its parent's (issue 12098)."""

    @classmethod
    def run_in_grandchild(cls, conn):
        """Grandchild target: send this interpreter's sys.flags on *conn*."""
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        """Print, as JSON, this process's flags and those of a child of it."""
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; '
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        output = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(output.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3376
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003377#
3378# Test interaction with socket timeouts - see Issue #6056
3379#
3380
class TestTimeouts(unittest.TestCase):
    """Interaction with the default socket timeout (issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        """Child target: after one second, send 123 on *child*, then connect
        to *address* and send 456."""
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # The default timeout is much shorter than the child's delay.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            proc = multiprocessing.Process(target=self._test_timeout,
                                           args=(child, listener.address))
            proc.start()
            child.close()

            self.assertEqual(parent.recv(), 123)
            parent.close()

            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()

            listener.close()
            proc.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
3410
Richard Oudkerke88a2442012-08-14 11:41:32 +01003411#
3412# Test what happens with no "if __name__ == '__main__'"
3413#
3414
class TestNoForkBomb(unittest.TestCase):
    """Run a script that has no ``if __name__ == '__main__'`` guard."""

    def test_noforkbomb(self):
        here = os.path.dirname(__file__)
        name = os.path.join(here, 'mp_fork_bomb.py')
        if not WIN32:
            # Elsewhere the script succeeds and prints 123.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
        else:
            # On Windows the script fails with a RuntimeError on stderr.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
3426
3427#
3428#
3429#
3430
# Test cases that are run as they are, without going through
# create_test_cases().
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestWait,
    TestInvalidFamily,
    TestFlags,
    TestTimeouts,
    TestNoForkBomb,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003434
Benjamin Petersone711caf2008-06-11 16:44:04 +00003435#
3436#
3437#
3438
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, tear down.

    *run* is a callable that receives the assembled TestSuite; when it is
    None, ``test.support.run_unittest`` is used.  Raises unittest.SkipTest
    on Linux when an RLock cannot be created.
    """
    on_linux = sys.platform.startswith("linux")
    if on_linux:
        try:
            probe_lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Shared resources used by the mixin-based test cases.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    def by_name(tc):
        return tc.__name__

    # Generated cases first (processes, threads, manager), each group
    # sorted by class name, followed by the standalone cases.
    testcases = []
    for group in (testcases_processes, testcases_threads, testcases_manager):
        testcases.extend(sorted(group.values(), key=by_name))
    testcases.extend(testcases_other)

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    try:
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.pool.join()
        ManagerMixin.manager.shutdown()
        ManagerMixin.manager.join()
        ThreadsMixin.pool.join()
        ProcessesMixin.pool.join()
        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00003482
def main():
    """Run the whole suite through a verbose TextTestRunner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)
3485
# Run the suite only when this file is executed as a script.
if __name__ == '__main__':
    main()