blob: 9c7a202d96cf7980271bfade84d379afc32569a0 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000031# import threading after _multiprocessing to raise a more revelant error
32# message: "No module named _multiprocessing". _multiprocessing is not compiled
33# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000065def latin(s):
66 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
72LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000073#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000074
75DELTA = 0.1
76CHECK_TIMINGS = False # making true makes tests take a lot longer
77 # and can sometimes cause some non-serious
78 # failures because some calls block a bit
79 # longer than expected
80if CHECK_TIMINGS:
81 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
82else:
83 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
84
85HAVE_GETVALUE = not getattr(_multiprocessing,
86 'HAVE_BROKEN_SEM_GETVALUE', False)
87
Jesse Noller6214edd2009-01-19 16:23:53 +000088WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
Richard Oudkerk59d54042012-05-10 16:11:12 +010092def wait_for_handle(handle, timeout):
93 if timeout is not None and timeout < 0.0:
94 timeout = None
95 return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
Antoine Pitroubcb39d42011-08-23 19:46:22 +020097try:
98 MAXFD = os.sysconf("SC_OPEN_MAX")
99except:
100 MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
113def check_enough_semaphores():
114 """Check that the system supports enough semaphores to run the test."""
115 # minimum number of semaphores available according to POSIX
116 nsems_min = 256
117 try:
118 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
119 except (AttributeError, ValueError):
120 # sysconf not available or setting not available
121 return
122 if nsems == -1 or nsems >= nsems_min:
123 return
124 raise unittest.SkipTest("The OS doesn't support enough semaphores "
125 "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
132class TimingWrapper(object):
133
134 def __init__(self, func):
135 self.func = func
136 self.elapsed = None
137
138 def __call__(self, *args, **kwds):
139 t = time.time()
140 try:
141 return self.func(*args, **kwds)
142 finally:
143 self.elapsed = time.time() - t
144
145#
146# Base class for test cases
147#
148
149class BaseTestCase(object):
150
151 ALLOWED_TYPES = ('processes', 'manager', 'threads')
152
153 def assertTimingAlmostEqual(self, a, b):
154 if CHECK_TIMINGS:
155 self.assertAlmostEqual(a, b, 1)
156
157 def assertReturnsIfImplemented(self, value, func, *args):
158 try:
159 res = func(*args)
160 except NotImplementedError:
161 pass
162 else:
163 return self.assertEqual(value, res)
164
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000165 # For the sanity of Windows users, rather than crashing or freezing in
166 # multiple ways.
167 def __reduce__(self, *args):
168 raise NotImplementedError("shouldn't try to pickle a test case")
169
170 __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
176def get_value(self):
177 try:
178 return self.get_value()
179 except AttributeError:
180 try:
181 return self._Semaphore__value
182 except AttributeError:
183 try:
184 return self._value
185 except AttributeError:
186 raise NotImplementedError
187
188#
189# Testcases
190#
191
192class _TestProcess(BaseTestCase):
193
194 ALLOWED_TYPES = ('processes', 'threads')
195
196 def test_current(self):
197 if self.TYPE == 'threads':
198 return
199
200 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000201 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000202
203 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000204 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000205 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000206 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000207 self.assertEqual(current.ident, os.getpid())
208 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000210 def test_daemon_argument(self):
211 if self.TYPE == "threads":
212 return
213
214 # By default uses the current process's daemon flag.
215 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000216 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 proc1 = self.Process(target=self._test, daemon=True)
218 self.assertTrue(proc1.daemon)
219 proc2 = self.Process(target=self._test, daemon=False)
220 self.assertFalse(proc2.daemon)
221
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000222 @classmethod
223 def _test(cls, q, *args, **kwds):
224 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000225 q.put(args)
226 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000227 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000228 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000229 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000230 q.put(current.pid)
231
232 def test_process(self):
233 q = self.Queue(1)
234 e = self.Event()
235 args = (q, 1, 2)
236 kwargs = {'hello':23, 'bye':2.54}
237 name = 'SomeProcess'
238 p = self.Process(
239 target=self._test, args=args, kwargs=kwargs, name=name
240 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000241 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000242 current = self.current_process()
243
244 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000245 self.assertEqual(p.authkey, current.authkey)
246 self.assertEqual(p.is_alive(), False)
247 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000248 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000250 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000251
252 p.start()
253
Ezio Melottib3aedd42010-11-20 19:04:17 +0000254 self.assertEqual(p.exitcode, None)
255 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000256 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000257
Ezio Melottib3aedd42010-11-20 19:04:17 +0000258 self.assertEqual(q.get(), args[1:])
259 self.assertEqual(q.get(), kwargs)
260 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000261 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000262 self.assertEqual(q.get(), current.authkey)
263 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
265 p.join()
266
Ezio Melottib3aedd42010-11-20 19:04:17 +0000267 self.assertEqual(p.exitcode, 0)
268 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000269 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000270
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000271 @classmethod
272 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000273 time.sleep(1000)
274
275 def test_terminate(self):
276 if self.TYPE == 'threads':
277 return
278
279 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000280 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000281 p.start()
282
283 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000284 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000285 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000286
Richard Oudkerk59d54042012-05-10 16:11:12 +0100287 join = TimingWrapper(p.join)
288
289 self.assertEqual(join(0), None)
290 self.assertTimingAlmostEqual(join.elapsed, 0.0)
291 self.assertEqual(p.is_alive(), True)
292
293 self.assertEqual(join(-1), None)
294 self.assertTimingAlmostEqual(join.elapsed, 0.0)
295 self.assertEqual(p.is_alive(), True)
296
Benjamin Petersone711caf2008-06-11 16:44:04 +0000297 p.terminate()
298
Benjamin Petersone711caf2008-06-11 16:44:04 +0000299 self.assertEqual(join(), None)
300 self.assertTimingAlmostEqual(join.elapsed, 0.0)
301
302 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000303 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000304
305 p.join()
306
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000307 # XXX sometimes get p.exitcode == 0 on Windows ...
308 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000309
310 def test_cpu_count(self):
311 try:
312 cpus = multiprocessing.cpu_count()
313 except NotImplementedError:
314 cpus = 1
315 self.assertTrue(type(cpus) is int)
316 self.assertTrue(cpus >= 1)
317
318 def test_active_children(self):
319 self.assertEqual(type(self.active_children()), list)
320
321 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000322 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323
Jesus Cea94f964f2011-09-09 20:26:57 +0200324 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000326 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000327
328 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000329 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000330
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000331 @classmethod
332 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000333 from multiprocessing import forking
334 wconn.send(id)
335 if len(id) < 2:
336 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000337 p = cls.Process(
338 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000339 )
340 p.start()
341 p.join()
342
343 def test_recursion(self):
344 rconn, wconn = self.Pipe(duplex=False)
345 self._test_recursion(wconn, [])
346
347 time.sleep(DELTA)
348 result = []
349 while rconn.poll():
350 result.append(rconn.recv())
351
352 expected = [
353 [],
354 [0],
355 [0, 0],
356 [0, 1],
357 [1],
358 [1, 0],
359 [1, 1]
360 ]
361 self.assertEqual(result, expected)
362
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200363 @classmethod
364 def _test_sentinel(cls, event):
365 event.wait(10.0)
366
367 def test_sentinel(self):
368 if self.TYPE == "threads":
369 return
370 event = self.Event()
371 p = self.Process(target=self._test_sentinel, args=(event,))
372 with self.assertRaises(ValueError):
373 p.sentinel
374 p.start()
375 self.addCleanup(p.join)
376 sentinel = p.sentinel
377 self.assertIsInstance(sentinel, int)
378 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
379 event.set()
380 p.join()
381 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
409class _TestSubclassingProcess(BaseTestCase):
410
411 ALLOWED_TYPES = ('processes',)
412
413 def test_subclassing(self):
414 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200415 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000416 uppercaser.start()
417 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
418 self.assertEqual(uppercaser.submit('world'), 'WORLD')
419 uppercaser.stop()
420 uppercaser.join()
421
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100422 def test_stderr_flush(self):
423 # sys.stderr is flushed at process shutdown (issue #13812)
424 if self.TYPE == "threads":
425 return
426
427 testfn = test.support.TESTFN
428 self.addCleanup(test.support.unlink, testfn)
429 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
430 proc.start()
431 proc.join()
432 with open(testfn, 'r') as f:
433 err = f.read()
434 # The whole traceback was printed
435 self.assertIn("ZeroDivisionError", err)
436 self.assertIn("test_multiprocessing.py", err)
437 self.assertIn("1/0 # MARKER", err)
438
439 @classmethod
440 def _test_stderr_flush(cls, testfn):
441 sys.stderr = open(testfn, 'w')
442 1/0 # MARKER
443
444
Richard Oudkerk29471de2012-06-06 19:04:57 +0100445 @classmethod
446 def _test_sys_exit(cls, reason, testfn):
447 sys.stderr = open(testfn, 'w')
448 sys.exit(reason)
449
450 def test_sys_exit(self):
451 # See Issue 13854
452 if self.TYPE == 'threads':
453 return
454
455 testfn = test.support.TESTFN
456 self.addCleanup(test.support.unlink, testfn)
457
458 for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
459 p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
460 p.daemon = True
461 p.start()
462 p.join(5)
463 self.assertEqual(p.exitcode, code)
464
465 with open(testfn, 'r') as f:
466 self.assertEqual(f.read().rstrip(), str(reason))
467
468 for reason in (True, False, 8):
469 p = self.Process(target=sys.exit, args=(reason,))
470 p.daemon = True
471 p.start()
472 p.join(5)
473 self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
479def queue_empty(q):
480 if hasattr(q, 'empty'):
481 return q.empty()
482 else:
483 return q.qsize() == 0
484
485def queue_full(q, maxsize):
486 if hasattr(q, 'full'):
487 return q.full()
488 else:
489 return q.qsize() == maxsize
490
491
492class _TestQueue(BaseTestCase):
493
494
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000495 @classmethod
496 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000497 child_can_start.wait()
498 for i in range(6):
499 queue.get()
500 parent_can_continue.set()
501
502 def test_put(self):
503 MAXSIZE = 6
504 queue = self.Queue(maxsize=MAXSIZE)
505 child_can_start = self.Event()
506 parent_can_continue = self.Event()
507
508 proc = self.Process(
509 target=self._test_put,
510 args=(queue, child_can_start, parent_can_continue)
511 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000512 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000513 proc.start()
514
515 self.assertEqual(queue_empty(queue), True)
516 self.assertEqual(queue_full(queue, MAXSIZE), False)
517
518 queue.put(1)
519 queue.put(2, True)
520 queue.put(3, True, None)
521 queue.put(4, False)
522 queue.put(5, False, None)
523 queue.put_nowait(6)
524
525 # the values may be in buffer but not yet in pipe so sleep a bit
526 time.sleep(DELTA)
527
528 self.assertEqual(queue_empty(queue), False)
529 self.assertEqual(queue_full(queue, MAXSIZE), True)
530
531 put = TimingWrapper(queue.put)
532 put_nowait = TimingWrapper(queue.put_nowait)
533
534 self.assertRaises(pyqueue.Full, put, 7, False)
535 self.assertTimingAlmostEqual(put.elapsed, 0)
536
537 self.assertRaises(pyqueue.Full, put, 7, False, None)
538 self.assertTimingAlmostEqual(put.elapsed, 0)
539
540 self.assertRaises(pyqueue.Full, put_nowait, 7)
541 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
542
543 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
544 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
545
546 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
547 self.assertTimingAlmostEqual(put.elapsed, 0)
548
549 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
550 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 self.assertEqual(queue_empty(queue), True)
556 self.assertEqual(queue_full(queue, MAXSIZE), False)
557
558 proc.join()
559
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000560 @classmethod
561 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000562 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000563 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000564 queue.put(2)
565 queue.put(3)
566 queue.put(4)
567 queue.put(5)
568 parent_can_continue.set()
569
570 def test_get(self):
571 queue = self.Queue()
572 child_can_start = self.Event()
573 parent_can_continue = self.Event()
574
575 proc = self.Process(
576 target=self._test_get,
577 args=(queue, child_can_start, parent_can_continue)
578 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000579 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000580 proc.start()
581
582 self.assertEqual(queue_empty(queue), True)
583
584 child_can_start.set()
585 parent_can_continue.wait()
586
587 time.sleep(DELTA)
588 self.assertEqual(queue_empty(queue), False)
589
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000590 # Hangs unexpectedly, remove for now
591 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 self.assertEqual(queue.get(True, None), 2)
593 self.assertEqual(queue.get(True), 3)
594 self.assertEqual(queue.get(timeout=1), 4)
595 self.assertEqual(queue.get_nowait(), 5)
596
597 self.assertEqual(queue_empty(queue), True)
598
599 get = TimingWrapper(queue.get)
600 get_nowait = TimingWrapper(queue.get_nowait)
601
602 self.assertRaises(pyqueue.Empty, get, False)
603 self.assertTimingAlmostEqual(get.elapsed, 0)
604
605 self.assertRaises(pyqueue.Empty, get, False, None)
606 self.assertTimingAlmostEqual(get.elapsed, 0)
607
608 self.assertRaises(pyqueue.Empty, get_nowait)
609 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
610
611 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
612 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
613
614 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
615 self.assertTimingAlmostEqual(get.elapsed, 0)
616
617 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
618 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
619
620 proc.join()
621
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000622 @classmethod
623 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000624 for i in range(10, 20):
625 queue.put(i)
626 # note that at this point the items may only be buffered, so the
627 # process cannot shutdown until the feeder thread has finished
628 # pushing items onto the pipe.
629
630 def test_fork(self):
631 # Old versions of Queue would fail to create a new feeder
632 # thread for a forked process if the original process had its
633 # own feeder thread. This test checks that this no longer
634 # happens.
635
636 queue = self.Queue()
637
638 # put items on queue so that main process starts a feeder thread
639 for i in range(10):
640 queue.put(i)
641
642 # wait to make sure thread starts before we fork a new process
643 time.sleep(DELTA)
644
645 # fork process
646 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200647 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000648 p.start()
649
650 # check that all expected items are in the queue
651 for i in range(20):
652 self.assertEqual(queue.get(), i)
653 self.assertRaises(pyqueue.Empty, queue.get, False)
654
655 p.join()
656
657 def test_qsize(self):
658 q = self.Queue()
659 try:
660 self.assertEqual(q.qsize(), 0)
661 except NotImplementedError:
662 return
663 q.put(1)
664 self.assertEqual(q.qsize(), 1)
665 q.put(5)
666 self.assertEqual(q.qsize(), 2)
667 q.get()
668 self.assertEqual(q.qsize(), 1)
669 q.get()
670 self.assertEqual(q.qsize(), 0)
671
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000672 @classmethod
673 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000674 for obj in iter(q.get, None):
675 time.sleep(DELTA)
676 q.task_done()
677
678 def test_task_done(self):
679 queue = self.JoinableQueue()
680
681 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000682 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000683
684 workers = [self.Process(target=self._test_task_done, args=(queue,))
685 for i in range(4)]
686
687 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200688 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000689 p.start()
690
691 for i in range(10):
692 queue.put(i)
693
694 queue.join()
695
696 for p in workers:
697 queue.put(None)
698
699 for p in workers:
700 p.join()
701
702#
703#
704#
705
706class _TestLock(BaseTestCase):
707
708 def test_lock(self):
709 lock = self.Lock()
710 self.assertEqual(lock.acquire(), True)
711 self.assertEqual(lock.acquire(False), False)
712 self.assertEqual(lock.release(), None)
713 self.assertRaises((ValueError, threading.ThreadError), lock.release)
714
715 def test_rlock(self):
716 lock = self.RLock()
717 self.assertEqual(lock.acquire(), True)
718 self.assertEqual(lock.acquire(), True)
719 self.assertEqual(lock.acquire(), True)
720 self.assertEqual(lock.release(), None)
721 self.assertEqual(lock.release(), None)
722 self.assertEqual(lock.release(), None)
723 self.assertRaises((AssertionError, RuntimeError), lock.release)
724
Jesse Nollerf8d00852009-03-31 03:25:07 +0000725 def test_lock_context(self):
726 with self.Lock():
727 pass
728
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729
730class _TestSemaphore(BaseTestCase):
731
732 def _test_semaphore(self, sem):
733 self.assertReturnsIfImplemented(2, get_value, sem)
734 self.assertEqual(sem.acquire(), True)
735 self.assertReturnsIfImplemented(1, get_value, sem)
736 self.assertEqual(sem.acquire(), True)
737 self.assertReturnsIfImplemented(0, get_value, sem)
738 self.assertEqual(sem.acquire(False), False)
739 self.assertReturnsIfImplemented(0, get_value, sem)
740 self.assertEqual(sem.release(), None)
741 self.assertReturnsIfImplemented(1, get_value, sem)
742 self.assertEqual(sem.release(), None)
743 self.assertReturnsIfImplemented(2, get_value, sem)
744
745 def test_semaphore(self):
746 sem = self.Semaphore(2)
747 self._test_semaphore(sem)
748 self.assertEqual(sem.release(), None)
749 self.assertReturnsIfImplemented(3, get_value, sem)
750 self.assertEqual(sem.release(), None)
751 self.assertReturnsIfImplemented(4, get_value, sem)
752
753 def test_bounded_semaphore(self):
754 sem = self.BoundedSemaphore(2)
755 self._test_semaphore(sem)
756 # Currently fails on OS/X
757 #if HAVE_GETVALUE:
758 # self.assertRaises(ValueError, sem.release)
759 # self.assertReturnsIfImplemented(2, get_value, sem)
760
761 def test_timeout(self):
762 if self.TYPE != 'processes':
763 return
764
765 sem = self.Semaphore(0)
766 acquire = TimingWrapper(sem.acquire)
767
768 self.assertEqual(acquire(False), False)
769 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
770
771 self.assertEqual(acquire(False, None), False)
772 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
773
774 self.assertEqual(acquire(False, TIMEOUT1), False)
775 self.assertTimingAlmostEqual(acquire.elapsed, 0)
776
777 self.assertEqual(acquire(True, TIMEOUT2), False)
778 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
779
780 self.assertEqual(acquire(timeout=TIMEOUT3), False)
781 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
782
783
784class _TestCondition(BaseTestCase):
785
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000786 @classmethod
787 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 cond.acquire()
789 sleeping.release()
790 cond.wait(timeout)
791 woken.release()
792 cond.release()
793
794 def check_invariant(self, cond):
795 # this is only supposed to succeed when there are no sleepers
796 if self.TYPE == 'processes':
797 try:
798 sleepers = (cond._sleeping_count.get_value() -
799 cond._woken_count.get_value())
800 self.assertEqual(sleepers, 0)
801 self.assertEqual(cond._wait_semaphore.get_value(), 0)
802 except NotImplementedError:
803 pass
804
805 def test_notify(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000811 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 p.start()
813
814 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000815 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 p.start()
817
818 # wait for both children to start sleeping
819 sleeping.acquire()
820 sleeping.acquire()
821
822 # check no process/thread has woken up
823 time.sleep(DELTA)
824 self.assertReturnsIfImplemented(0, get_value, woken)
825
826 # wake up one process/thread
827 cond.acquire()
828 cond.notify()
829 cond.release()
830
831 # check one process/thread has woken up
832 time.sleep(DELTA)
833 self.assertReturnsIfImplemented(1, get_value, woken)
834
835 # wake up another
836 cond.acquire()
837 cond.notify()
838 cond.release()
839
840 # check other has woken up
841 time.sleep(DELTA)
842 self.assertReturnsIfImplemented(2, get_value, woken)
843
844 # check state is not mucked up
845 self.check_invariant(cond)
846 p.join()
847
848 def test_notify_all(self):
849 cond = self.Condition()
850 sleeping = self.Semaphore(0)
851 woken = self.Semaphore(0)
852
853 # start some threads/processes which will timeout
854 for i in range(3):
855 p = self.Process(target=self.f,
856 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000857 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 p.start()
859
860 t = threading.Thread(target=self.f,
861 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000862 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863 t.start()
864
865 # wait for them all to sleep
866 for i in range(6):
867 sleeping.acquire()
868
869 # check they have all timed out
870 for i in range(6):
871 woken.acquire()
872 self.assertReturnsIfImplemented(0, get_value, woken)
873
874 # check state is not mucked up
875 self.check_invariant(cond)
876
877 # start some more threads/processes
878 for i in range(3):
879 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000880 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 p.start()
882
883 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000884 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885 t.start()
886
887 # wait for them to all sleep
888 for i in range(6):
889 sleeping.acquire()
890
891 # check no process/thread has woken up
892 time.sleep(DELTA)
893 self.assertReturnsIfImplemented(0, get_value, woken)
894
895 # wake them all up
896 cond.acquire()
897 cond.notify_all()
898 cond.release()
899
900 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200901 for i in range(10):
902 try:
903 if get_value(woken) == 6:
904 break
905 except NotImplementedError:
906 break
907 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 self.assertReturnsIfImplemented(6, get_value, woken)
909
910 # check state is not mucked up
911 self.check_invariant(cond)
912
913 def test_timeout(self):
914 cond = self.Condition()
915 wait = TimingWrapper(cond.wait)
916 cond.acquire()
917 res = wait(TIMEOUT1)
918 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000919 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
921
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200922 @classmethod
923 def _test_waitfor_f(cls, cond, state):
924 with cond:
925 state.value = 0
926 cond.notify()
927 result = cond.wait_for(lambda : state.value==4)
928 if not result or state.value != 4:
929 sys.exit(1)
930
931 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
932 def test_waitfor(self):
933 # based on test in test/lock_tests.py
934 cond = self.Condition()
935 state = self.Value('i', -1)
936
937 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
938 p.daemon = True
939 p.start()
940
941 with cond:
942 result = cond.wait_for(lambda : state.value==0)
943 self.assertTrue(result)
944 self.assertEqual(state.value, 0)
945
946 for i in range(4):
947 time.sleep(0.01)
948 with cond:
949 state.value += 1
950 cond.notify()
951
952 p.join(5)
953 self.assertFalse(p.is_alive())
954 self.assertEqual(p.exitcode, 0)
955
956 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100957 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
958 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200959 with cond:
960 expected = 0.1
961 dt = time.time()
962 result = cond.wait_for(lambda : state.value==4, timeout=expected)
963 dt = time.time() - dt
964 # borrow logic in assertTimeout() from test/lock_tests.py
965 if not result and expected * 0.6 < dt < expected * 10.0:
966 success.value = True
967
968 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
969 def test_waitfor_timeout(self):
970 # based on test in test/lock_tests.py
971 cond = self.Condition()
972 state = self.Value('i', 0)
973 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975
976 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100977 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978 p.daemon = True
979 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100980 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200981
982 # Only increment 3 times, so state == 4 is never reached.
983 for i in range(3):
984 time.sleep(0.01)
985 with cond:
986 state.value += 1
987 cond.notify()
988
989 p.join(5)
990 self.assertTrue(success.value)
991
Richard Oudkerk98449932012-06-05 13:15:29 +0100992 @classmethod
993 def _test_wait_result(cls, c, pid):
994 with c:
995 c.notify()
996 time.sleep(1)
997 if pid is not None:
998 os.kill(pid, signal.SIGINT)
999
1000 def test_wait_result(self):
1001 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1002 pid = os.getpid()
1003 else:
1004 pid = None
1005
1006 c = self.Condition()
1007 with c:
1008 self.assertFalse(c.wait(0))
1009 self.assertFalse(c.wait(0.1))
1010
1011 p = self.Process(target=self._test_wait_result, args=(c, pid))
1012 p.start()
1013
1014 self.assertTrue(c.wait(10))
1015 if pid is not None:
1016 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1017
1018 p.join()
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020
1021class _TestEvent(BaseTestCase):
1022
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001023 @classmethod
1024 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 time.sleep(TIMEOUT2)
1026 event.set()
1027
1028 def test_event(self):
1029 event = self.Event()
1030 wait = TimingWrapper(event.wait)
1031
Ezio Melotti13925002011-03-16 11:05:33 +02001032 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +00001033 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +00001034 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001035
Benjamin Peterson965ce872009-04-05 21:24:58 +00001036 # Removed, threading.Event.wait() will return the value of the __flag
1037 # instead of None. API Shear with the semaphore backed mp.Event
1038 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001039 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001040 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001041 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1042
1043 event.set()
1044
1045 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +00001046 self.assertEqual(event.is_set(), True)
1047 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001048 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +00001049 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1051 # self.assertEqual(event.is_set(), True)
1052
1053 event.clear()
1054
1055 #self.assertEqual(event.is_set(), False)
1056
Jesus Cea94f964f2011-09-09 20:26:57 +02001057 p = self.Process(target=self._test_event, args=(event,))
1058 p.daemon = True
1059 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +00001060 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061
1062#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001063# Tests for Barrier - adapted from tests in test/lock_tests.py
1064#
1065
1066# Many of the tests for threading.Barrier use a list as an atomic
1067# counter: a value is appended to increment the counter, and the
1068# length of the list gives the value. We use the class DummyList
1069# for the same purpose.
1070
1071class _DummyList(object):
1072
1073 def __init__(self):
1074 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1075 lock = multiprocessing.Lock()
1076 self.__setstate__((wrapper, lock))
1077 self._lengthbuf[0] = 0
1078
1079 def __setstate__(self, state):
1080 (self._wrapper, self._lock) = state
1081 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1082
1083 def __getstate__(self):
1084 return (self._wrapper, self._lock)
1085
1086 def append(self, _):
1087 with self._lock:
1088 self._lengthbuf[0] += 1
1089
1090 def __len__(self):
1091 with self._lock:
1092 return self._lengthbuf[0]
1093
1094def _wait():
1095 # A crude wait/yield function not relying on synchronization primitives.
1096 time.sleep(0.01)
1097
1098
1099class Bunch(object):
1100 """
1101 A bunch of threads.
1102 """
1103 def __init__(self, namespace, f, args, n, wait_before_exit=False):
1104 """
1105 Construct a bunch of `n` threads running the same function `f`.
1106 If `wait_before_exit` is True, the threads won't terminate until
1107 do_finish() is called.
1108 """
1109 self.f = f
1110 self.args = args
1111 self.n = n
1112 self.started = namespace.DummyList()
1113 self.finished = namespace.DummyList()
Richard Oudkerk0f523462012-06-15 19:18:30 +01001114 self._can_exit = namespace.Event()
1115 if not wait_before_exit:
1116 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001117 for i in range(n):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001118 p = namespace.Process(target=self.task)
1119 p.daemon = True
1120 p.start()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001121
1122 def task(self):
1123 pid = os.getpid()
1124 self.started.append(pid)
1125 try:
1126 self.f(*self.args)
1127 finally:
1128 self.finished.append(pid)
Richard Oudkerk0f523462012-06-15 19:18:30 +01001129 self._can_exit.wait(30)
1130 assert self._can_exit.is_set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001131
1132 def wait_for_started(self):
1133 while len(self.started) < self.n:
1134 _wait()
1135
1136 def wait_for_finished(self):
1137 while len(self.finished) < self.n:
1138 _wait()
1139
1140 def do_finish(self):
Richard Oudkerk0f523462012-06-15 19:18:30 +01001141 self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001142
1143
1144class AppendTrue(object):
1145 def __init__(self, obj):
1146 self.obj = obj
1147 def __call__(self):
1148 self.obj.append(True)
1149
1150
1151class _TestBarrier(BaseTestCase):
1152 """
1153 Tests for Barrier objects.
1154 """
1155 N = 5
Richard Oudkerk13758842012-06-18 14:11:10 +01001156 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout
Richard Oudkerk3730a172012-06-15 18:26:07 +01001157
1158 def setUp(self):
1159 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1160
1161 def tearDown(self):
1162 self.barrier.abort()
1163 self.barrier = None
1164
1165 def DummyList(self):
1166 if self.TYPE == 'threads':
1167 return []
1168 elif self.TYPE == 'manager':
1169 return self.manager.list()
1170 else:
1171 return _DummyList()
1172
1173 def run_threads(self, f, args):
1174 b = Bunch(self, f, args, self.N-1)
1175 f(*args)
1176 b.wait_for_finished()
1177
1178 @classmethod
1179 def multipass(cls, barrier, results, n):
1180 m = barrier.parties
1181 assert m == cls.N
1182 for i in range(n):
1183 results[0].append(True)
1184 assert len(results[1]) == i * m
1185 barrier.wait()
1186 results[1].append(True)
1187 assert len(results[0]) == (i + 1) * m
1188 barrier.wait()
1189 try:
1190 assert barrier.n_waiting == 0
1191 except NotImplementedError:
1192 pass
1193 assert not barrier.broken
1194
1195 def test_barrier(self, passes=1):
1196 """
1197 Test that a barrier is passed in lockstep
1198 """
1199 results = [self.DummyList(), self.DummyList()]
1200 self.run_threads(self.multipass, (self.barrier, results, passes))
1201
1202 def test_barrier_10(self):
1203 """
1204 Test that a barrier works for 10 consecutive runs
1205 """
1206 return self.test_barrier(10)
1207
1208 @classmethod
1209 def _test_wait_return_f(cls, barrier, queue):
1210 res = barrier.wait()
1211 queue.put(res)
1212
1213 def test_wait_return(self):
1214 """
1215 test the return value from barrier.wait
1216 """
1217 queue = self.Queue()
1218 self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1219 results = [queue.get() for i in range(self.N)]
1220 self.assertEqual(results.count(0), 1)
1221
1222 @classmethod
1223 def _test_action_f(cls, barrier, results):
1224 barrier.wait()
1225 if len(results) != 1:
1226 raise RuntimeError
1227
1228 def test_action(self):
1229 """
1230 Test the 'action' callback
1231 """
1232 results = self.DummyList()
1233 barrier = self.Barrier(self.N, action=AppendTrue(results))
1234 self.run_threads(self._test_action_f, (barrier, results))
1235 self.assertEqual(len(results), 1)
1236
1237 @classmethod
1238 def _test_abort_f(cls, barrier, results1, results2):
1239 try:
1240 i = barrier.wait()
1241 if i == cls.N//2:
1242 raise RuntimeError
1243 barrier.wait()
1244 results1.append(True)
1245 except threading.BrokenBarrierError:
1246 results2.append(True)
1247 except RuntimeError:
1248 barrier.abort()
1249
1250 def test_abort(self):
1251 """
1252 Test that an abort will put the barrier in a broken state
1253 """
1254 results1 = self.DummyList()
1255 results2 = self.DummyList()
1256 self.run_threads(self._test_abort_f,
1257 (self.barrier, results1, results2))
1258 self.assertEqual(len(results1), 0)
1259 self.assertEqual(len(results2), self.N-1)
1260 self.assertTrue(self.barrier.broken)
1261
1262 @classmethod
1263 def _test_reset_f(cls, barrier, results1, results2, results3):
1264 i = barrier.wait()
1265 if i == cls.N//2:
1266 # Wait until the other threads are all in the barrier.
1267 while barrier.n_waiting < cls.N-1:
1268 time.sleep(0.001)
1269 barrier.reset()
1270 else:
1271 try:
1272 barrier.wait()
1273 results1.append(True)
1274 except threading.BrokenBarrierError:
1275 results2.append(True)
1276 # Now, pass the barrier again
1277 barrier.wait()
1278 results3.append(True)
1279
1280 def test_reset(self):
1281 """
1282 Test that a 'reset' on a barrier frees the waiting threads
1283 """
1284 results1 = self.DummyList()
1285 results2 = self.DummyList()
1286 results3 = self.DummyList()
1287 self.run_threads(self._test_reset_f,
1288 (self.barrier, results1, results2, results3))
1289 self.assertEqual(len(results1), 0)
1290 self.assertEqual(len(results2), self.N-1)
1291 self.assertEqual(len(results3), self.N)
1292
1293 @classmethod
1294 def _test_abort_and_reset_f(cls, barrier, barrier2,
1295 results1, results2, results3):
1296 try:
1297 i = barrier.wait()
1298 if i == cls.N//2:
1299 raise RuntimeError
1300 barrier.wait()
1301 results1.append(True)
1302 except threading.BrokenBarrierError:
1303 results2.append(True)
1304 except RuntimeError:
1305 barrier.abort()
1306 # Synchronize and reset the barrier. Must synchronize first so
1307 # that everyone has left it when we reset, and after so that no
1308 # one enters it before the reset.
1309 if barrier2.wait() == cls.N//2:
1310 barrier.reset()
1311 barrier2.wait()
1312 barrier.wait()
1313 results3.append(True)
1314
1315 def test_abort_and_reset(self):
1316 """
1317 Test that a barrier can be reset after being broken.
1318 """
1319 results1 = self.DummyList()
1320 results2 = self.DummyList()
1321 results3 = self.DummyList()
1322 barrier2 = self.Barrier(self.N)
1323
1324 self.run_threads(self._test_abort_and_reset_f,
1325 (self.barrier, barrier2, results1, results2, results3))
1326 self.assertEqual(len(results1), 0)
1327 self.assertEqual(len(results2), self.N-1)
1328 self.assertEqual(len(results3), self.N)
1329
1330 @classmethod
1331 def _test_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001332 i = barrier.wait()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001333 if i == cls.N//2:
1334 # One thread is late!
Richard Oudkerk13758842012-06-18 14:11:10 +01001335 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001336 try:
1337 barrier.wait(0.5)
1338 except threading.BrokenBarrierError:
1339 results.append(True)
1340
1341 def test_timeout(self):
1342 """
1343 Test wait(timeout)
1344 """
1345 results = self.DummyList()
1346 self.run_threads(self._test_timeout_f, (self.barrier, results))
1347 self.assertEqual(len(results), self.barrier.parties)
1348
1349 @classmethod
1350 def _test_default_timeout_f(cls, barrier, results):
Richard Oudkerk13758842012-06-18 14:11:10 +01001351 i = barrier.wait(cls.defaultTimeout)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001352 if i == cls.N//2:
1353 # One thread is later than the default timeout
Richard Oudkerk13758842012-06-18 14:11:10 +01001354 time.sleep(1.0)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001355 try:
1356 barrier.wait()
1357 except threading.BrokenBarrierError:
1358 results.append(True)
1359
1360 def test_default_timeout(self):
1361 """
1362 Test the barrier's default timeout
1363 """
Richard Oudkerk13758842012-06-18 14:11:10 +01001364 barrier = self.Barrier(self.N, timeout=0.5)
Richard Oudkerk3730a172012-06-15 18:26:07 +01001365 results = self.DummyList()
1366 self.run_threads(self._test_default_timeout_f, (barrier, results))
1367 self.assertEqual(len(results), barrier.parties)
1368
1369 def test_single_thread(self):
1370 b = self.Barrier(1)
1371 b.wait()
1372 b.wait()
1373
1374 @classmethod
1375 def _test_thousand_f(cls, barrier, passes, conn, lock):
1376 for i in range(passes):
1377 barrier.wait()
1378 with lock:
1379 conn.send(i)
1380
1381 def test_thousand(self):
1382 if self.TYPE == 'manager':
1383 return
1384 passes = 1000
1385 lock = self.Lock()
1386 conn, child_conn = self.Pipe(False)
1387 for j in range(self.N):
1388 p = self.Process(target=self._test_thousand_f,
1389 args=(self.barrier, passes, child_conn, lock))
1390 p.start()
1391
1392 for i in range(passes):
1393 for j in range(self.N):
1394 self.assertEqual(conn.recv(), i)
1395
1396#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397#
1398#
1399
1400class _TestValue(BaseTestCase):
1401
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001402 ALLOWED_TYPES = ('processes',)
1403
Benjamin Petersone711caf2008-06-11 16:44:04 +00001404 codes_values = [
1405 ('i', 4343, 24234),
1406 ('d', 3.625, -4.25),
1407 ('h', -232, 234),
1408 ('c', latin('x'), latin('y'))
1409 ]
1410
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001411 def setUp(self):
1412 if not HAS_SHAREDCTYPES:
1413 self.skipTest("requires multiprocessing.sharedctypes")
1414
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001415 @classmethod
1416 def _test(cls, values):
1417 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001418 sv.value = cv[2]
1419
1420
1421 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001422 if raw:
1423 values = [self.RawValue(code, value)
1424 for code, value, _ in self.codes_values]
1425 else:
1426 values = [self.Value(code, value)
1427 for code, value, _ in self.codes_values]
1428
1429 for sv, cv in zip(values, self.codes_values):
1430 self.assertEqual(sv.value, cv[1])
1431
1432 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001433 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001434 proc.start()
1435 proc.join()
1436
1437 for sv, cv in zip(values, self.codes_values):
1438 self.assertEqual(sv.value, cv[2])
1439
1440 def test_rawvalue(self):
1441 self.test_value(raw=True)
1442
1443 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001444 val1 = self.Value('i', 5)
1445 lock1 = val1.get_lock()
1446 obj1 = val1.get_obj()
1447
1448 val2 = self.Value('i', 5, lock=None)
1449 lock2 = val2.get_lock()
1450 obj2 = val2.get_obj()
1451
1452 lock = self.Lock()
1453 val3 = self.Value('i', 5, lock=lock)
1454 lock3 = val3.get_lock()
1455 obj3 = val3.get_obj()
1456 self.assertEqual(lock, lock3)
1457
Jesse Nollerb0516a62009-01-18 03:11:38 +00001458 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001459 self.assertFalse(hasattr(arr4, 'get_lock'))
1460 self.assertFalse(hasattr(arr4, 'get_obj'))
1461
Jesse Nollerb0516a62009-01-18 03:11:38 +00001462 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1463
1464 arr5 = self.RawValue('i', 5)
1465 self.assertFalse(hasattr(arr5, 'get_lock'))
1466 self.assertFalse(hasattr(arr5, 'get_obj'))
1467
Benjamin Petersone711caf2008-06-11 16:44:04 +00001468
1469class _TestArray(BaseTestCase):
1470
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001471 ALLOWED_TYPES = ('processes',)
1472
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001473 @classmethod
1474 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001475 for i in range(1, len(seq)):
1476 seq[i] += seq[i-1]
1477
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001478 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001479 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001480 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1481 if raw:
1482 arr = self.RawArray('i', seq)
1483 else:
1484 arr = self.Array('i', seq)
1485
1486 self.assertEqual(len(arr), len(seq))
1487 self.assertEqual(arr[3], seq[3])
1488 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1489
1490 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1491
1492 self.assertEqual(list(arr[:]), seq)
1493
1494 self.f(seq)
1495
1496 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001497 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001498 p.start()
1499 p.join()
1500
1501 self.assertEqual(list(arr[:]), seq)
1502
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001503 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001504 def test_array_from_size(self):
1505 size = 10
1506 # Test for zeroing (see issue #11675).
1507 # The repetition below strengthens the test by increasing the chances
1508 # of previously allocated non-zero memory being used for the new array
1509 # on the 2nd and 3rd loops.
1510 for _ in range(3):
1511 arr = self.Array('i', size)
1512 self.assertEqual(len(arr), size)
1513 self.assertEqual(list(arr), [0] * size)
1514 arr[:] = range(10)
1515 self.assertEqual(list(arr), list(range(10)))
1516 del arr
1517
1518 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001519 def test_rawarray(self):
1520 self.test_array(raw=True)
1521
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001522 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001523 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001524 arr1 = self.Array('i', list(range(10)))
1525 lock1 = arr1.get_lock()
1526 obj1 = arr1.get_obj()
1527
1528 arr2 = self.Array('i', list(range(10)), lock=None)
1529 lock2 = arr2.get_lock()
1530 obj2 = arr2.get_obj()
1531
1532 lock = self.Lock()
1533 arr3 = self.Array('i', list(range(10)), lock=lock)
1534 lock3 = arr3.get_lock()
1535 obj3 = arr3.get_obj()
1536 self.assertEqual(lock, lock3)
1537
Jesse Nollerb0516a62009-01-18 03:11:38 +00001538 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001539 self.assertFalse(hasattr(arr4, 'get_lock'))
1540 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001541 self.assertRaises(AttributeError,
1542 self.Array, 'i', range(10), lock='notalock')
1543
1544 arr5 = self.RawArray('i', range(10))
1545 self.assertFalse(hasattr(arr5, 'get_lock'))
1546 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001547
1548#
1549#
1550#
1551
1552class _TestContainers(BaseTestCase):
1553
1554 ALLOWED_TYPES = ('manager',)
1555
1556 def test_list(self):
1557 a = self.list(list(range(10)))
1558 self.assertEqual(a[:], list(range(10)))
1559
1560 b = self.list()
1561 self.assertEqual(b[:], [])
1562
1563 b.extend(list(range(5)))
1564 self.assertEqual(b[:], list(range(5)))
1565
1566 self.assertEqual(b[2], 2)
1567 self.assertEqual(b[2:10], [2,3,4])
1568
1569 b *= 2
1570 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1571
1572 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1573
1574 self.assertEqual(a[:], list(range(10)))
1575
1576 d = [a, b]
1577 e = self.list(d)
1578 self.assertEqual(
1579 e[:],
1580 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1581 )
1582
1583 f = self.list([a])
1584 a.append('hello')
1585 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1586
1587 def test_dict(self):
1588 d = self.dict()
1589 indices = list(range(65, 70))
1590 for i in indices:
1591 d[i] = chr(i)
1592 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1593 self.assertEqual(sorted(d.keys()), indices)
1594 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1595 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1596
1597 def test_namespace(self):
1598 n = self.Namespace()
1599 n.name = 'Bob'
1600 n.job = 'Builder'
1601 n._hidden = 'hidden'
1602 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1603 del n.job
1604 self.assertEqual(str(n), "Namespace(name='Bob')")
1605 self.assertTrue(hasattr(n, 'name'))
1606 self.assertTrue(not hasattr(n, 'job'))
1607
1608#
1609#
1610#
1611
1612def sqr(x, wait=0.0):
1613 time.sleep(wait)
1614 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001615
Antoine Pitroude911b22011-12-21 11:03:24 +01001616def mul(x, y):
1617 return x*y
1618
Benjamin Petersone711caf2008-06-11 16:44:04 +00001619class _TestPool(BaseTestCase):
1620
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01001621 @classmethod
1622 def setUpClass(cls):
1623 super().setUpClass()
1624 cls.pool = cls.Pool(4)
1625
1626 @classmethod
1627 def tearDownClass(cls):
1628 cls.pool.terminate()
1629 cls.pool.join()
1630 cls.pool = None
1631 super().tearDownClass()
1632
Benjamin Petersone711caf2008-06-11 16:44:04 +00001633 def test_apply(self):
1634 papply = self.pool.apply
1635 self.assertEqual(papply(sqr, (5,)), sqr(5))
1636 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1637
1638 def test_map(self):
1639 pmap = self.pool.map
1640 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1641 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1642 list(map(sqr, list(range(100)))))
1643
Antoine Pitroude911b22011-12-21 11:03:24 +01001644 def test_starmap(self):
1645 psmap = self.pool.starmap
1646 tuples = list(zip(range(10), range(9,-1, -1)))
1647 self.assertEqual(psmap(mul, tuples),
1648 list(itertools.starmap(mul, tuples)))
1649 tuples = list(zip(range(100), range(99,-1, -1)))
1650 self.assertEqual(psmap(mul, tuples, chunksize=20),
1651 list(itertools.starmap(mul, tuples)))
1652
1653 def test_starmap_async(self):
1654 tuples = list(zip(range(100), range(99,-1, -1)))
1655 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1656 list(itertools.starmap(mul, tuples)))
1657
Hynek Schlawack254af262012-10-27 12:53:02 +02001658 def test_map_async(self):
1659 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1660 list(map(sqr, list(range(10)))))
1661
1662 def test_map_async_callbacks(self):
1663 call_args = self.manager.list() if self.TYPE == 'manager' else []
1664 self.pool.map_async(int, ['1'],
1665 callback=call_args.append,
1666 error_callback=call_args.append).wait()
1667 self.assertEqual(1, len(call_args))
1668 self.assertEqual([1], call_args[0])
1669 self.pool.map_async(int, ['a'],
1670 callback=call_args.append,
1671 error_callback=call_args.append).wait()
1672 self.assertEqual(2, len(call_args))
1673 self.assertIsInstance(call_args[1], ValueError)
1674
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001675 def test_map_chunksize(self):
1676 try:
1677 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1678 except multiprocessing.TimeoutError:
1679 self.fail("pool.map_async with chunksize stalled on null list")
1680
Benjamin Petersone711caf2008-06-11 16:44:04 +00001681 def test_async(self):
1682 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1683 get = TimingWrapper(res.get)
1684 self.assertEqual(get(), 49)
1685 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1686
1687 def test_async_timeout(self):
1688 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1689 get = TimingWrapper(res.get)
1690 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1691 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1692
1693 def test_imap(self):
1694 it = self.pool.imap(sqr, list(range(10)))
1695 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1696
1697 it = self.pool.imap(sqr, list(range(10)))
1698 for i in range(10):
1699 self.assertEqual(next(it), i*i)
1700 self.assertRaises(StopIteration, it.__next__)
1701
1702 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1703 for i in range(1000):
1704 self.assertEqual(next(it), i*i)
1705 self.assertRaises(StopIteration, it.__next__)
1706
1707 def test_imap_unordered(self):
1708 it = self.pool.imap_unordered(sqr, list(range(1000)))
1709 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1710
1711 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1712 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1713
1714 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001715 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1716 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1717
Benjamin Petersone711caf2008-06-11 16:44:04 +00001718 p = multiprocessing.Pool(3)
1719 self.assertEqual(3, len(p._pool))
1720 p.close()
1721 p.join()
1722
1723 def test_terminate(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001724 result = self.pool.map_async(
1725 time.sleep, [0.1 for i in range(10000)], chunksize=1
1726 )
1727 self.pool.terminate()
1728 join = TimingWrapper(self.pool.join)
1729 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001730 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001731
Richard Oudkerke41682b2012-06-06 19:04:57 +01001732 def test_empty_iterable(self):
1733 # See Issue 12157
1734 p = self.Pool(1)
1735
1736 self.assertEqual(p.map(sqr, []), [])
1737 self.assertEqual(list(p.imap(sqr, [])), [])
1738 self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1739 self.assertEqual(p.map_async(sqr, []).get(), [])
1740
1741 p.close()
1742 p.join()
1743
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001744 def test_context(self):
1745 if self.TYPE == 'processes':
1746 L = list(range(10))
1747 expected = [sqr(i) for i in L]
1748 with multiprocessing.Pool(2) as p:
1749 r = p.map_async(sqr, L)
1750 self.assertEqual(r.get(), expected)
Benjamin Peterson3095f472012-09-25 12:45:42 -04001751 print(p._state)
1752 self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001753
Ask Solem2afcbf22010-11-09 20:55:52 +00001754def raising():
1755 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001756
Ask Solem2afcbf22010-11-09 20:55:52 +00001757def unpickleable_result():
1758 return lambda: 42
1759
1760class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001761 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001762
1763 def test_async_error_callback(self):
1764 p = multiprocessing.Pool(2)
1765
1766 scratchpad = [None]
1767 def errback(exc):
1768 scratchpad[0] = exc
1769
1770 res = p.apply_async(raising, error_callback=errback)
1771 self.assertRaises(KeyError, res.get)
1772 self.assertTrue(scratchpad[0])
1773 self.assertIsInstance(scratchpad[0], KeyError)
1774
1775 p.close()
1776 p.join()
1777
1778 def test_unpickleable_result(self):
1779 from multiprocessing.pool import MaybeEncodingError
1780 p = multiprocessing.Pool(2)
1781
1782 # Make sure we don't lose pool processes because of encoding errors.
1783 for iteration in range(20):
1784
1785 scratchpad = [None]
1786 def errback(exc):
1787 scratchpad[0] = exc
1788
1789 res = p.apply_async(unpickleable_result, error_callback=errback)
1790 self.assertRaises(MaybeEncodingError, res.get)
1791 wrapped = scratchpad[0]
1792 self.assertTrue(wrapped)
1793 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1794 self.assertIsNotNone(wrapped.exc)
1795 self.assertIsNotNone(wrapped.value)
1796
1797 p.close()
1798 p.join()
1799
1800class _TestPoolWorkerLifetime(BaseTestCase):
1801 ALLOWED_TYPES = ('processes', )
1802
Jesse Noller1f0b6582010-01-27 03:36:01 +00001803 def test_pool_worker_lifetime(self):
1804 p = multiprocessing.Pool(3, maxtasksperchild=10)
1805 self.assertEqual(3, len(p._pool))
1806 origworkerpids = [w.pid for w in p._pool]
1807 # Run many tasks so each worker gets replaced (hopefully)
1808 results = []
1809 for i in range(100):
1810 results.append(p.apply_async(sqr, (i, )))
1811 # Fetch the results and verify we got the right answers,
1812 # also ensuring all the tasks have completed.
1813 for (j, res) in enumerate(results):
1814 self.assertEqual(res.get(), sqr(j))
1815 # Refill the pool
1816 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001817 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001818 # (countdown * DELTA = 5 seconds max startup process time)
1819 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001820 while countdown and not all(w.is_alive() for w in p._pool):
1821 countdown -= 1
1822 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001823 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001824 # All pids should be assigned. See issue #7805.
1825 self.assertNotIn(None, origworkerpids)
1826 self.assertNotIn(None, finalworkerpids)
1827 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001828 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1829 p.close()
1830 p.join()
1831
Charles-François Natalif8859e12011-10-24 18:45:29 +02001832 def test_pool_worker_lifetime_early_close(self):
1833 # Issue #10332: closing a pool whose workers have limited lifetimes
1834 # before all the tasks completed would make join() hang.
1835 p = multiprocessing.Pool(3, maxtasksperchild=1)
1836 results = []
1837 for i in range(6):
1838 results.append(p.apply_async(sqr, (i, 0.3)))
1839 p.close()
1840 p.join()
1841 # check the results
1842 for (j, res) in enumerate(results):
1843 self.assertEqual(res.get(), sqr(j))
1844
Benjamin Petersone711caf2008-06-11 16:44:04 +00001845#
1846# Test of creating a customized manager class
1847#
1848
1849from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1850
1851class FooBar(object):
1852 def f(self):
1853 return 'f()'
1854 def g(self):
1855 raise ValueError
1856 def _h(self):
1857 return '_h()'
1858
1859def baz():
1860 for i in range(10):
1861 yield i*i
1862
1863class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001864 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001865 def __iter__(self):
1866 return self
1867 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001868 return self._callmethod('__next__')
1869
1870class MyManager(BaseManager):
1871 pass
1872
1873MyManager.register('Foo', callable=FooBar)
1874MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1875MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1876
1877
1878class _TestMyManager(BaseTestCase):
1879
1880 ALLOWED_TYPES = ('manager',)
1881
1882 def test_mymanager(self):
1883 manager = MyManager()
1884 manager.start()
Richard Oudkerkac385712012-06-18 21:29:30 +01001885 self.common(manager)
1886 manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001887
Richard Oudkerkac385712012-06-18 21:29:30 +01001888 # If the manager process exited cleanly then the exitcode
1889 # will be zero. Otherwise (after a short timeout)
1890 # terminate() is used, resulting in an exitcode of -SIGTERM.
1891 self.assertEqual(manager._process.exitcode, 0)
1892
1893 def test_mymanager_context(self):
1894 with MyManager() as manager:
1895 self.common(manager)
1896 self.assertEqual(manager._process.exitcode, 0)
1897
1898 def test_mymanager_context_prestarted(self):
1899 manager = MyManager()
1900 manager.start()
1901 with manager:
1902 self.common(manager)
1903 self.assertEqual(manager._process.exitcode, 0)
1904
1905 def common(self, manager):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001906 foo = manager.Foo()
1907 bar = manager.Bar()
1908 baz = manager.baz()
1909
1910 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1911 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1912
1913 self.assertEqual(foo_methods, ['f', 'g'])
1914 self.assertEqual(bar_methods, ['f', '_h'])
1915
1916 self.assertEqual(foo.f(), 'f()')
1917 self.assertRaises(ValueError, foo.g)
1918 self.assertEqual(foo._callmethod('f'), 'f()')
1919 self.assertRaises(RemoteError, foo._callmethod, '_h')
1920
1921 self.assertEqual(bar.f(), 'f()')
1922 self.assertEqual(bar._h(), '_h()')
1923 self.assertEqual(bar._callmethod('f'), 'f()')
1924 self.assertEqual(bar._callmethod('_h'), '_h()')
1925
1926 self.assertEqual(list(baz), [i*i for i in range(10)])
1927
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001928
Benjamin Petersone711caf2008-06-11 16:44:04 +00001929#
1930# Test of connecting to a remote server and using xmlrpclib for serialization
1931#
1932
1933_queue = pyqueue.Queue()
1934def get_queue():
1935 return _queue
1936
1937class QueueManager(BaseManager):
1938 '''manager class used by server process'''
1939QueueManager.register('get_queue', callable=get_queue)
1940
1941class QueueManager2(BaseManager):
1942 '''manager class which specifies the same interface as QueueManager'''
1943QueueManager2.register('get_queue')
1944
1945
1946SERIALIZER = 'xmlrpclib'
1947
1948class _TestRemoteManager(BaseTestCase):
1949
1950 ALLOWED_TYPES = ('manager',)
1951
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001952 @classmethod
1953 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001954 manager = QueueManager2(
1955 address=address, authkey=authkey, serializer=SERIALIZER
1956 )
1957 manager.connect()
1958 queue = manager.get_queue()
1959 queue.put(('hello world', None, True, 2.25))
1960
1961 def test_remote(self):
1962 authkey = os.urandom(32)
1963
1964 manager = QueueManager(
1965 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1966 )
1967 manager.start()
1968
1969 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001970 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001971 p.start()
1972
1973 manager2 = QueueManager2(
1974 address=manager.address, authkey=authkey, serializer=SERIALIZER
1975 )
1976 manager2.connect()
1977 queue = manager2.get_queue()
1978
1979 # Note that xmlrpclib will deserialize object as a list not a tuple
1980 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1981
1982 # Because we are using xmlrpclib for serialization instead of
1983 # pickle this will cause a serialization error.
1984 self.assertRaises(Exception, queue.put, time.sleep)
1985
1986 # Make queue finalizer run before the server is stopped
1987 del queue
1988 manager.shutdown()
1989
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001990class _TestManagerRestart(BaseTestCase):
1991
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001992 @classmethod
1993 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001994 manager = QueueManager(
1995 address=address, authkey=authkey, serializer=SERIALIZER)
1996 manager.connect()
1997 queue = manager.get_queue()
1998 queue.put('hello world')
1999
2000 def test_rapid_restart(self):
2001 authkey = os.urandom(32)
2002 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002003 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00002004 srvr = manager.get_server()
2005 addr = srvr.address
2006 # Close the connection.Listener socket which gets opened as a part
2007 # of manager.get_server(). It's not needed for the test.
2008 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002009 manager.start()
2010
2011 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02002012 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002013 p.start()
2014 queue = manager.get_queue()
2015 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00002016 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002017 manager.shutdown()
2018 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00002019 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02002020 try:
2021 manager.start()
2022 except IOError as e:
2023 if e.errno != errno.EADDRINUSE:
2024 raise
2025 # Retry after some time, in case the old socket was lingering
2026 # (sporadic failure on buildbots)
2027 time.sleep(1.0)
2028 manager = QueueManager(
2029 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00002030 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002031
Benjamin Petersone711caf2008-06-11 16:44:04 +00002032#
2033#
2034#
2035
2036SENTINEL = latin('')
2037
2038class _TestConnection(BaseTestCase):
2039
2040 ALLOWED_TYPES = ('processes', 'threads')
2041
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002042 @classmethod
2043 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002044 for msg in iter(conn.recv_bytes, SENTINEL):
2045 conn.send_bytes(msg)
2046 conn.close()
2047
2048 def test_connection(self):
2049 conn, child_conn = self.Pipe()
2050
2051 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002052 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002053 p.start()
2054
2055 seq = [1, 2.25, None]
2056 msg = latin('hello world')
2057 longmsg = msg * 10
2058 arr = array.array('i', list(range(4)))
2059
2060 if self.TYPE == 'processes':
2061 self.assertEqual(type(conn.fileno()), int)
2062
2063 self.assertEqual(conn.send(seq), None)
2064 self.assertEqual(conn.recv(), seq)
2065
2066 self.assertEqual(conn.send_bytes(msg), None)
2067 self.assertEqual(conn.recv_bytes(), msg)
2068
2069 if self.TYPE == 'processes':
2070 buffer = array.array('i', [0]*10)
2071 expected = list(arr) + [0] * (10 - len(arr))
2072 self.assertEqual(conn.send_bytes(arr), None)
2073 self.assertEqual(conn.recv_bytes_into(buffer),
2074 len(arr) * buffer.itemsize)
2075 self.assertEqual(list(buffer), expected)
2076
2077 buffer = array.array('i', [0]*10)
2078 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2079 self.assertEqual(conn.send_bytes(arr), None)
2080 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2081 len(arr) * buffer.itemsize)
2082 self.assertEqual(list(buffer), expected)
2083
2084 buffer = bytearray(latin(' ' * 40))
2085 self.assertEqual(conn.send_bytes(longmsg), None)
2086 try:
2087 res = conn.recv_bytes_into(buffer)
2088 except multiprocessing.BufferTooShort as e:
2089 self.assertEqual(e.args, (longmsg,))
2090 else:
2091 self.fail('expected BufferTooShort, got %s' % res)
2092
2093 poll = TimingWrapper(conn.poll)
2094
2095 self.assertEqual(poll(), False)
2096 self.assertTimingAlmostEqual(poll.elapsed, 0)
2097
Richard Oudkerk59d54042012-05-10 16:11:12 +01002098 self.assertEqual(poll(-1), False)
2099 self.assertTimingAlmostEqual(poll.elapsed, 0)
2100
Benjamin Petersone711caf2008-06-11 16:44:04 +00002101 self.assertEqual(poll(TIMEOUT1), False)
2102 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2103
2104 conn.send(None)
2105
2106 self.assertEqual(poll(TIMEOUT1), True)
2107 self.assertTimingAlmostEqual(poll.elapsed, 0)
2108
2109 self.assertEqual(conn.recv(), None)
2110
2111 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
2112 conn.send_bytes(really_big_msg)
2113 self.assertEqual(conn.recv_bytes(), really_big_msg)
2114
2115 conn.send_bytes(SENTINEL) # tell child to quit
2116 child_conn.close()
2117
2118 if self.TYPE == 'processes':
2119 self.assertEqual(conn.readable, True)
2120 self.assertEqual(conn.writable, True)
2121 self.assertRaises(EOFError, conn.recv)
2122 self.assertRaises(EOFError, conn.recv_bytes)
2123
2124 p.join()
2125
2126 def test_duplex_false(self):
2127 reader, writer = self.Pipe(duplex=False)
2128 self.assertEqual(writer.send(1), None)
2129 self.assertEqual(reader.recv(), 1)
2130 if self.TYPE == 'processes':
2131 self.assertEqual(reader.readable, True)
2132 self.assertEqual(reader.writable, False)
2133 self.assertEqual(writer.readable, False)
2134 self.assertEqual(writer.writable, True)
2135 self.assertRaises(IOError, reader.send, 2)
2136 self.assertRaises(IOError, writer.recv)
2137 self.assertRaises(IOError, writer.poll)
2138
2139 def test_spawn_close(self):
2140 # We test that a pipe connection can be closed by parent
2141 # process immediately after child is spawned. On Windows this
2142 # would have sometimes failed on old versions because
2143 # child_conn would be closed before the child got a chance to
2144 # duplicate it.
2145 conn, child_conn = self.Pipe()
2146
2147 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002148 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002149 p.start()
2150 child_conn.close() # this might complete before child initializes
2151
2152 msg = latin('hello')
2153 conn.send_bytes(msg)
2154 self.assertEqual(conn.recv_bytes(), msg)
2155
2156 conn.send_bytes(SENTINEL)
2157 conn.close()
2158 p.join()
2159
2160 def test_sendbytes(self):
2161 if self.TYPE != 'processes':
2162 return
2163
2164 msg = latin('abcdefghijklmnopqrstuvwxyz')
2165 a, b = self.Pipe()
2166
2167 a.send_bytes(msg)
2168 self.assertEqual(b.recv_bytes(), msg)
2169
2170 a.send_bytes(msg, 5)
2171 self.assertEqual(b.recv_bytes(), msg[5:])
2172
2173 a.send_bytes(msg, 7, 8)
2174 self.assertEqual(b.recv_bytes(), msg[7:7+8])
2175
2176 a.send_bytes(msg, 26)
2177 self.assertEqual(b.recv_bytes(), latin(''))
2178
2179 a.send_bytes(msg, 26, 0)
2180 self.assertEqual(b.recv_bytes(), latin(''))
2181
2182 self.assertRaises(ValueError, a.send_bytes, msg, 27)
2183
2184 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2185
2186 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2187
2188 self.assertRaises(ValueError, a.send_bytes, msg, -1)
2189
2190 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2191
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002192 @classmethod
2193 def _is_fd_assigned(cls, fd):
2194 try:
2195 os.fstat(fd)
2196 except OSError as e:
2197 if e.errno == errno.EBADF:
2198 return False
2199 raise
2200 else:
2201 return True
2202
2203 @classmethod
2204 def _writefd(cls, conn, data, create_dummy_fds=False):
2205 if create_dummy_fds:
2206 for i in range(0, 256):
2207 if not cls._is_fd_assigned(i):
2208 os.dup2(conn.fileno(), i)
2209 fd = reduction.recv_handle(conn)
2210 if msvcrt:
2211 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2212 os.write(fd, data)
2213 os.close(fd)
2214
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002215 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002216 def test_fd_transfer(self):
2217 if self.TYPE != 'processes':
2218 self.skipTest("only makes sense with processes")
2219 conn, child_conn = self.Pipe(duplex=True)
2220
2221 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02002222 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002223 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002224 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002225 with open(test.support.TESTFN, "wb") as f:
2226 fd = f.fileno()
2227 if msvcrt:
2228 fd = msvcrt.get_osfhandle(fd)
2229 reduction.send_handle(conn, fd, p.pid)
2230 p.join()
2231 with open(test.support.TESTFN, "rb") as f:
2232 self.assertEqual(f.read(), b"foo")
2233
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002234 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002235 @unittest.skipIf(sys.platform == "win32",
2236 "test semantics don't make sense on Windows")
2237 @unittest.skipIf(MAXFD <= 256,
2238 "largest assignable fd number is too small")
2239 @unittest.skipUnless(hasattr(os, "dup2"),
2240 "test needs os.dup2()")
2241 def test_large_fd_transfer(self):
2242 # With fd > 256 (issue #11657)
2243 if self.TYPE != 'processes':
2244 self.skipTest("only makes sense with processes")
2245 conn, child_conn = self.Pipe(duplex=True)
2246
2247 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02002248 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002249 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02002250 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002251 with open(test.support.TESTFN, "wb") as f:
2252 fd = f.fileno()
2253 for newfd in range(256, MAXFD):
2254 if not self._is_fd_assigned(newfd):
2255 break
2256 else:
2257 self.fail("could not find an unassigned large file descriptor")
2258 os.dup2(fd, newfd)
2259 try:
2260 reduction.send_handle(conn, newfd, p.pid)
2261 finally:
2262 os.close(newfd)
2263 p.join()
2264 with open(test.support.TESTFN, "rb") as f:
2265 self.assertEqual(f.read(), b"bar")
2266
Jesus Cea4507e642011-09-21 03:53:25 +02002267 @classmethod
2268 def _send_data_without_fd(self, conn):
2269 os.write(conn.fileno(), b"\0")
2270
Charles-François Natalie51c8da2011-09-21 18:48:21 +02002271 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02002272 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2273 def test_missing_fd_transfer(self):
2274 # Check that exception is raised when received data is not
2275 # accompanied by a file descriptor in ancillary data.
2276 if self.TYPE != 'processes':
2277 self.skipTest("only makes sense with processes")
2278 conn, child_conn = self.Pipe(duplex=True)
2279
2280 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2281 p.daemon = True
2282 p.start()
2283 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2284 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02002285
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002286 def test_context(self):
2287 a, b = self.Pipe()
2288
2289 with a, b:
2290 a.send(1729)
2291 self.assertEqual(b.recv(), 1729)
2292 if self.TYPE == 'processes':
2293 self.assertFalse(a.closed)
2294 self.assertFalse(b.closed)
2295
2296 if self.TYPE == 'processes':
2297 self.assertTrue(a.closed)
2298 self.assertTrue(b.closed)
2299 self.assertRaises(IOError, a.recv)
2300 self.assertRaises(IOError, b.recv)
2301
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002302class _TestListener(BaseTestCase):
2303
Richard Oudkerk91257752012-06-15 21:53:34 +01002304 ALLOWED_TYPES = ('processes',)
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002305
2306 def test_multiple_bind(self):
2307 for family in self.connection.families:
2308 l = self.connection.Listener(family=family)
2309 self.addCleanup(l.close)
2310 self.assertRaises(OSError, self.connection.Listener,
2311 l.address, family)
2312
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01002313 def test_context(self):
2314 with self.connection.Listener() as l:
2315 with self.connection.Client(l.address) as c:
2316 with l.accept() as d:
2317 c.send(1729)
2318 self.assertEqual(d.recv(), 1729)
2319
2320 if self.TYPE == 'processes':
2321 self.assertRaises(IOError, l.accept)
2322
Benjamin Petersone711caf2008-06-11 16:44:04 +00002323class _TestListenerClient(BaseTestCase):
2324
2325 ALLOWED_TYPES = ('processes', 'threads')
2326
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002327 @classmethod
2328 def _test(cls, address):
2329 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002330 conn.send('hello')
2331 conn.close()
2332
2333 def test_listener_client(self):
2334 for family in self.connection.families:
2335 l = self.connection.Listener(family=family)
2336 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00002337 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002338 p.start()
2339 conn = l.accept()
2340 self.assertEqual(conn.recv(), 'hello')
2341 p.join()
2342 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01002343
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01002344 def test_issue14725(self):
2345 l = self.connection.Listener()
2346 p = self.Process(target=self._test, args=(l.address,))
2347 p.daemon = True
2348 p.start()
2349 time.sleep(1)
2350 # On Windows the client process should by now have connected,
2351 # written data and closed the pipe handle by now. This causes
2352 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
2353 # 14725.
2354 conn = l.accept()
2355 self.assertEqual(conn.recv(), 'hello')
2356 conn.close()
2357 p.join()
2358 l.close()
2359
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002360class _TestPoll(unittest.TestCase):
2361
2362 ALLOWED_TYPES = ('processes', 'threads')
2363
2364 def test_empty_string(self):
2365 a, b = self.Pipe()
2366 self.assertEqual(a.poll(), False)
2367 b.send_bytes(b'')
2368 self.assertEqual(a.poll(), True)
2369 self.assertEqual(a.poll(), True)
2370
2371 @classmethod
2372 def _child_strings(cls, conn, strings):
2373 for s in strings:
2374 time.sleep(0.1)
2375 conn.send_bytes(s)
2376 conn.close()
2377
2378 def test_strings(self):
2379 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2380 a, b = self.Pipe()
2381 p = self.Process(target=self._child_strings, args=(b, strings))
2382 p.start()
2383
2384 for s in strings:
2385 for i in range(200):
2386 if a.poll(0.01):
2387 break
2388 x = a.recv_bytes()
2389 self.assertEqual(s, x)
2390
2391 p.join()
2392
2393 @classmethod
2394 def _child_boundaries(cls, r):
2395 # Polling may "pull" a message in to the child process, but we
2396 # don't want it to pull only part of a message, as that would
2397 # corrupt the pipe for any other processes which might later
2398 # read from it.
2399 r.poll(5)
2400
2401 def test_boundaries(self):
2402 r, w = self.Pipe(False)
2403 p = self.Process(target=self._child_boundaries, args=(r,))
2404 p.start()
2405 time.sleep(2)
2406 L = [b"first", b"second"]
2407 for obj in L:
2408 w.send_bytes(obj)
2409 w.close()
2410 p.join()
2411 self.assertIn(r.recv_bytes(), L)
2412
2413 @classmethod
2414 def _child_dont_merge(cls, b):
2415 b.send_bytes(b'a')
2416 b.send_bytes(b'b')
2417 b.send_bytes(b'cd')
2418
2419 def test_dont_merge(self):
2420 a, b = self.Pipe()
2421 self.assertEqual(a.poll(0.0), False)
2422 self.assertEqual(a.poll(0.1), False)
2423
2424 p = self.Process(target=self._child_dont_merge, args=(b,))
2425 p.start()
2426
2427 self.assertEqual(a.recv_bytes(), b'a')
2428 self.assertEqual(a.poll(1.0), True)
2429 self.assertEqual(a.poll(1.0), True)
2430 self.assertEqual(a.recv_bytes(), b'b')
2431 self.assertEqual(a.poll(1.0), True)
2432 self.assertEqual(a.poll(1.0), True)
2433 self.assertEqual(a.poll(0.0), True)
2434 self.assertEqual(a.recv_bytes(), b'cd')
2435
2436 p.join()
2437
Benjamin Petersone711caf2008-06-11 16:44:04 +00002438#
2439# Test of sending connection and socket objects between processes
2440#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002441
2442@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00002443class _TestPicklingConnections(BaseTestCase):
2444
2445 ALLOWED_TYPES = ('processes',)
2446
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002447 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02002448 def tearDownClass(cls):
2449 from multiprocessing.reduction import resource_sharer
2450 resource_sharer.stop(timeout=5)
2451
2452 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002453 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002454 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002455 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002456 conn.send(l.address)
2457 new_conn = l.accept()
2458 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002459 new_conn.close()
2460 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002461
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002462 l = socket.socket()
2463 l.bind(('localhost', 0))
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002464 l.listen(1)
Richard Oudkerk5d73c172012-05-08 22:24:47 +01002465 conn.send(l.getsockname())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002466 new_conn, addr = l.accept()
2467 conn.send(new_conn)
2468 new_conn.close()
2469 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002470
2471 conn.recv()
2472
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002473 @classmethod
2474 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002475 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002476 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002477 client.send(msg.upper())
2478 client.close()
2479
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002480 address, msg = conn.recv()
2481 client = socket.socket()
2482 client.connect(address)
2483 client.sendall(msg.upper())
2484 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002485
2486 conn.close()
2487
2488 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002489 families = self.connection.families
2490
2491 lconn, lconn0 = self.Pipe()
2492 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002493 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002494 lp.start()
2495 lconn0.close()
2496
2497 rconn, rconn0 = self.Pipe()
2498 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002499 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002500 rp.start()
2501 rconn0.close()
2502
2503 for fam in families:
2504 msg = ('This connection uses family %s' % fam).encode('ascii')
2505 address = lconn.recv()
2506 rconn.send((address, msg))
2507 new_conn = lconn.recv()
2508 self.assertEqual(new_conn.recv(), msg.upper())
2509
2510 rconn.send(None)
2511
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002512 msg = latin('This connection uses a normal socket')
2513 address = lconn.recv()
2514 rconn.send((address, msg))
2515 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002516 buf = []
2517 while True:
2518 s = new_conn.recv(100)
2519 if not s:
2520 break
2521 buf.append(s)
2522 buf = b''.join(buf)
2523 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002524 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002525
2526 lconn.send(None)
2527
2528 rconn.close()
2529 lconn.close()
2530
2531 lp.join()
2532 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002533
2534 @classmethod
2535 def child_access(cls, conn):
2536 w = conn.recv()
2537 w.send('all is well')
2538 w.close()
2539
2540 r = conn.recv()
2541 msg = r.recv()
2542 conn.send(msg*2)
2543
2544 conn.close()
2545
2546 def test_access(self):
2547 # On Windows, if we do not specify a destination pid when
2548 # using DupHandle then we need to be careful to use the
2549 # correct access flags for DuplicateHandle(), or else
2550 # DupHandle.detach() will raise PermissionError. For example,
2551 # for a read only pipe handle we should use
2552 # access=FILE_GENERIC_READ. (Unfortunately
2553 # DUPLICATE_SAME_ACCESS does not work.)
2554 conn, child_conn = self.Pipe()
2555 p = self.Process(target=self.child_access, args=(child_conn,))
2556 p.daemon = True
2557 p.start()
2558 child_conn.close()
2559
2560 r, w = self.Pipe(duplex=False)
2561 conn.send(w)
2562 w.close()
2563 self.assertEqual(r.recv(), 'all is well')
2564 r.close()
2565
2566 r, w = self.Pipe(duplex=False)
2567 conn.send(r)
2568 r.close()
2569 w.send('foobar')
2570 w.close()
2571 self.assertEqual(conn.recv(), 'foobar'*2)
2572
Benjamin Petersone711caf2008-06-11 16:44:04 +00002573#
2574#
2575#
2576
2577class _TestHeap(BaseTestCase):
2578
2579 ALLOWED_TYPES = ('processes',)
2580
2581 def test_heap(self):
2582 iterations = 5000
2583 maxblocks = 50
2584 blocks = []
2585
2586 # create and destroy lots of blocks of different sizes
2587 for i in range(iterations):
2588 size = int(random.lognormvariate(0, 1) * 1000)
2589 b = multiprocessing.heap.BufferWrapper(size)
2590 blocks.append(b)
2591 if len(blocks) > maxblocks:
2592 i = random.randrange(maxblocks)
2593 del blocks[i]
2594
2595 # get the heap object
2596 heap = multiprocessing.heap.BufferWrapper._heap
2597
2598 # verify the state of the heap
2599 all = []
2600 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002601 heap._lock.acquire()
2602 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002603 for L in list(heap._len_to_seq.values()):
2604 for arena, start, stop in L:
2605 all.append((heap._arenas.index(arena), start, stop,
2606 stop-start, 'free'))
2607 for arena, start, stop in heap._allocated_blocks:
2608 all.append((heap._arenas.index(arena), start, stop,
2609 stop-start, 'occupied'))
2610 occupied += (stop-start)
2611
2612 all.sort()
2613
2614 for i in range(len(all)-1):
2615 (arena, start, stop) = all[i][:3]
2616 (narena, nstart, nstop) = all[i+1][:3]
2617 self.assertTrue((arena != narena and nstart == 0) or
2618 (stop == nstart))
2619
Charles-François Natali778db492011-07-02 14:35:49 +02002620 def test_free_from_gc(self):
2621 # Check that freeing of blocks by the garbage collector doesn't deadlock
2622 # (issue #12352).
2623 # Make sure the GC is enabled, and set lower collection thresholds to
2624 # make collections more frequent (and increase the probability of
2625 # deadlock).
2626 if not gc.isenabled():
2627 gc.enable()
2628 self.addCleanup(gc.disable)
2629 thresholds = gc.get_threshold()
2630 self.addCleanup(gc.set_threshold, *thresholds)
2631 gc.set_threshold(10)
2632
2633 # perform numerous block allocations, with cyclic references to make
2634 # sure objects are collected asynchronously by the gc
2635 for i in range(5000):
2636 a = multiprocessing.heap.BufferWrapper(1)
2637 b = multiprocessing.heap.BufferWrapper(1)
2638 # circular references
2639 a.buddy = b
2640 b.buddy = a
2641
Benjamin Petersone711caf2008-06-11 16:44:04 +00002642#
2643#
2644#
2645
Benjamin Petersone711caf2008-06-11 16:44:04 +00002646class _Foo(Structure):
2647 _fields_ = [
2648 ('x', c_int),
2649 ('y', c_double)
2650 ]
2651
2652class _TestSharedCTypes(BaseTestCase):
2653
2654 ALLOWED_TYPES = ('processes',)
2655
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002656 def setUp(self):
2657 if not HAS_SHAREDCTYPES:
2658 self.skipTest("requires multiprocessing.sharedctypes")
2659
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002660 @classmethod
2661 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002662 x.value *= 2
2663 y.value *= 2
2664 foo.x *= 2
2665 foo.y *= 2
2666 string.value *= 2
2667 for i in range(len(arr)):
2668 arr[i] *= 2
2669
2670 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002671 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002672 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002673 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002674 arr = self.Array('d', list(range(10)), lock=lock)
2675 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002676 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002677
2678 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002679 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002680 p.start()
2681 p.join()
2682
2683 self.assertEqual(x.value, 14)
2684 self.assertAlmostEqual(y.value, 2.0/3.0)
2685 self.assertEqual(foo.x, 6)
2686 self.assertAlmostEqual(foo.y, 4.0)
2687 for i in range(10):
2688 self.assertAlmostEqual(arr[i], i*2)
2689 self.assertEqual(string.value, latin('hellohello'))
2690
2691 def test_synchronize(self):
2692 self.test_sharedctypes(lock=True)
2693
2694 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002695 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002696 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002697 foo.x = 0
2698 foo.y = 0
2699 self.assertEqual(bar.x, 2)
2700 self.assertAlmostEqual(bar.y, 5.0)
2701
2702#
2703#
2704#
2705
2706class _TestFinalize(BaseTestCase):
2707
2708 ALLOWED_TYPES = ('processes',)
2709
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002710 @classmethod
2711 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002712 class Foo(object):
2713 pass
2714
2715 a = Foo()
2716 util.Finalize(a, conn.send, args=('a',))
2717 del a # triggers callback for a
2718
2719 b = Foo()
2720 close_b = util.Finalize(b, conn.send, args=('b',))
2721 close_b() # triggers callback for b
2722 close_b() # does nothing because callback has already been called
2723 del b # does nothing because callback has already been called
2724
2725 c = Foo()
2726 util.Finalize(c, conn.send, args=('c',))
2727
2728 d10 = Foo()
2729 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2730
2731 d01 = Foo()
2732 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2733 d02 = Foo()
2734 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2735 d03 = Foo()
2736 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2737
2738 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2739
2740 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2741
Ezio Melotti13925002011-03-16 11:05:33 +02002742 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002743 # garbage collecting locals
2744 util._exit_function()
2745 conn.close()
2746 os._exit(0)
2747
2748 def test_finalize(self):
2749 conn, child_conn = self.Pipe()
2750
2751 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002752 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002753 p.start()
2754 p.join()
2755
2756 result = [obj for obj in iter(conn.recv, 'STOP')]
2757 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2758
2759#
2760# Test that from ... import * works for each module
2761#
2762
2763class _TestImportStar(BaseTestCase):
2764
2765 ALLOWED_TYPES = ('processes',)
2766
2767 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002768 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002769 'multiprocessing', 'multiprocessing.connection',
2770 'multiprocessing.heap', 'multiprocessing.managers',
2771 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002772 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002773 ]
2774
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002775 if HAS_REDUCTION:
2776 modules.append('multiprocessing.reduction')
2777
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002778 if c_int is not None:
2779 # This module requires _ctypes
2780 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002781
2782 for name in modules:
2783 __import__(name)
2784 mod = sys.modules[name]
2785
2786 for attr in getattr(mod, '__all__', ()):
2787 self.assertTrue(
2788 hasattr(mod, attr),
2789 '%r does not have attribute %r' % (mod, attr)
2790 )
2791
2792#
2793# Quick test that logging works -- does not test logging output
2794#
2795
2796class _TestLogging(BaseTestCase):
2797
2798 ALLOWED_TYPES = ('processes',)
2799
2800 def test_enable_logging(self):
2801 logger = multiprocessing.get_logger()
2802 logger.setLevel(util.SUBWARNING)
2803 self.assertTrue(logger is not None)
2804 logger.debug('this will not be printed')
2805 logger.info('nor will this')
2806 logger.setLevel(LOG_LEVEL)
2807
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002808 @classmethod
2809 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002810 logger = multiprocessing.get_logger()
2811 conn.send(logger.getEffectiveLevel())
2812
2813 def test_level(self):
2814 LEVEL1 = 32
2815 LEVEL2 = 37
2816
2817 logger = multiprocessing.get_logger()
2818 root_logger = logging.getLogger()
2819 root_level = root_logger.level
2820
2821 reader, writer = multiprocessing.Pipe(duplex=False)
2822
2823 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002824 p = self.Process(target=self._test_level, args=(writer,))
2825 p.daemon = True
2826 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002827 self.assertEqual(LEVEL1, reader.recv())
2828
2829 logger.setLevel(logging.NOTSET)
2830 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002831 p = self.Process(target=self._test_level, args=(writer,))
2832 p.daemon = True
2833 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002834 self.assertEqual(LEVEL2, reader.recv())
2835
2836 root_logger.setLevel(root_level)
2837 logger.setLevel(level=LOG_LEVEL)
2838
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002839
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002840# class _TestLoggingProcessName(BaseTestCase):
2841#
2842# def handle(self, record):
2843# assert record.processName == multiprocessing.current_process().name
2844# self.__handled = True
2845#
2846# def test_logging(self):
2847# handler = logging.Handler()
2848# handler.handle = self.handle
2849# self.__handled = False
2850# # Bypass getLogger() and side-effects
2851# logger = logging.getLoggerClass()(
2852# 'multiprocessing.test.TestLoggingProcessName')
2853# logger.addHandler(handler)
2854# logger.propagate = False
2855#
2856# logger.warn('foo')
2857# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002858
Benjamin Petersone711caf2008-06-11 16:44:04 +00002859#
Jesse Noller6214edd2009-01-19 16:23:53 +00002860# Test to verify handle verification, see issue 3321
2861#
2862
2863class TestInvalidHandle(unittest.TestCase):
2864
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002865 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002866 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002867 conn = multiprocessing.connection.Connection(44977608)
2868 try:
2869 self.assertRaises((ValueError, IOError), conn.poll)
2870 finally:
2871 # Hack private attribute _handle to avoid printing an error
2872 # in conn.__del__
2873 conn._handle = None
2874 self.assertRaises((ValueError, IOError),
2875 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002876
Jesse Noller6214edd2009-01-19 16:23:53 +00002877#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002878# Functions used to create test cases from the base ones in this module
2879#
2880
Benjamin Petersone711caf2008-06-11 16:44:04 +00002881def create_test_cases(Mixin, type):
2882 result = {}
2883 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002884 Type = type.capitalize()
Richard Oudkerk91257752012-06-15 21:53:34 +01002885 ALL_TYPES = {'processes', 'threads', 'manager'}
Benjamin Petersone711caf2008-06-11 16:44:04 +00002886
2887 for name in list(glob.keys()):
2888 if name.startswith('_Test'):
2889 base = glob[name]
Richard Oudkerk91257752012-06-15 21:53:34 +01002890 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002891 if type in base.ALLOWED_TYPES:
2892 newname = 'With' + Type + name[1:]
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002893 class Temp(base, Mixin, unittest.TestCase):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002894 pass
2895 result[newname] = Temp
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002896 Temp.__name__ = Temp.__qualname__ = newname
Benjamin Petersone711caf2008-06-11 16:44:04 +00002897 Temp.__module__ = Mixin.__module__
2898 return result
2899
2900#
2901# Create test cases
2902#
2903
2904class ProcessesMixin(object):
2905 TYPE = 'processes'
2906 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002907 connection = multiprocessing.connection
2908 current_process = staticmethod(multiprocessing.current_process)
2909 active_children = staticmethod(multiprocessing.active_children)
2910 Pool = staticmethod(multiprocessing.Pool)
2911 Pipe = staticmethod(multiprocessing.Pipe)
2912 Queue = staticmethod(multiprocessing.Queue)
2913 JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2914 Lock = staticmethod(multiprocessing.Lock)
2915 RLock = staticmethod(multiprocessing.RLock)
2916 Semaphore = staticmethod(multiprocessing.Semaphore)
2917 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2918 Condition = staticmethod(multiprocessing.Condition)
2919 Event = staticmethod(multiprocessing.Event)
2920 Barrier = staticmethod(multiprocessing.Barrier)
2921 Value = staticmethod(multiprocessing.Value)
2922 Array = staticmethod(multiprocessing.Array)
2923 RawValue = staticmethod(multiprocessing.RawValue)
2924 RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002925
2926testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2927globals().update(testcases_processes)
2928
2929
2930class ManagerMixin(object):
2931 TYPE = 'manager'
2932 Process = multiprocessing.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002933 Queue = property(operator.attrgetter('manager.Queue'))
2934 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
2935 Lock = property(operator.attrgetter('manager.Lock'))
2936 RLock = property(operator.attrgetter('manager.RLock'))
2937 Semaphore = property(operator.attrgetter('manager.Semaphore'))
2938 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
2939 Condition = property(operator.attrgetter('manager.Condition'))
2940 Event = property(operator.attrgetter('manager.Event'))
2941 Barrier = property(operator.attrgetter('manager.Barrier'))
2942 Value = property(operator.attrgetter('manager.Value'))
2943 Array = property(operator.attrgetter('manager.Array'))
2944 list = property(operator.attrgetter('manager.list'))
2945 dict = property(operator.attrgetter('manager.dict'))
2946 Namespace = property(operator.attrgetter('manager.Namespace'))
2947
2948 @classmethod
2949 def Pool(cls, *args, **kwds):
2950 return cls.manager.Pool(*args, **kwds)
2951
2952 @classmethod
2953 def setUpClass(cls):
2954 cls.manager = multiprocessing.Manager()
2955
2956 @classmethod
2957 def tearDownClass(cls):
2958 multiprocessing.active_children() # discard dead process objs
2959 gc.collect() # do garbage collection
2960 if cls.manager._number_of_objects() != 0:
2961 # This is not really an error since some tests do not
2962 # ensure that all processes which hold a reference to a
2963 # managed object have been joined.
2964 print('Shared objects which still exist at manager shutdown:')
2965 print(cls.manager._debug_info())
2966 cls.manager.shutdown()
2967 cls.manager.join()
2968 cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00002969
2970testcases_manager = create_test_cases(ManagerMixin, type='manager')
2971globals().update(testcases_manager)
2972
2973
2974class ThreadsMixin(object):
2975 TYPE = 'threads'
2976 Process = multiprocessing.dummy.Process
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002977 connection = multiprocessing.dummy.connection
2978 current_process = staticmethod(multiprocessing.dummy.current_process)
2979 active_children = staticmethod(multiprocessing.dummy.active_children)
2980 Pool = staticmethod(multiprocessing.Pool)
2981 Pipe = staticmethod(multiprocessing.dummy.Pipe)
2982 Queue = staticmethod(multiprocessing.dummy.Queue)
2983 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
2984 Lock = staticmethod(multiprocessing.dummy.Lock)
2985 RLock = staticmethod(multiprocessing.dummy.RLock)
2986 Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
2987 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
2988 Condition = staticmethod(multiprocessing.dummy.Condition)
2989 Event = staticmethod(multiprocessing.dummy.Event)
2990 Barrier = staticmethod(multiprocessing.dummy.Barrier)
2991 Value = staticmethod(multiprocessing.dummy.Value)
2992 Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002993
2994testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2995globals().update(testcases_threads)
2996
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002997
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002998class OtherTest(unittest.TestCase):
2999 # TODO: add more tests for deliver/answer challenge.
3000 def test_deliver_challenge_auth_failure(self):
3001 class _FakeConnection(object):
3002 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00003003 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003004 def send_bytes(self, data):
3005 pass
3006 self.assertRaises(multiprocessing.AuthenticationError,
3007 multiprocessing.connection.deliver_challenge,
3008 _FakeConnection(), b'abc')
3009
3010 def test_answer_challenge_auth_failure(self):
3011 class _FakeConnection(object):
3012 def __init__(self):
3013 self.count = 0
3014 def recv_bytes(self, size):
3015 self.count += 1
3016 if self.count == 1:
3017 return multiprocessing.connection.CHALLENGE
3018 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00003019 return b'something bogus'
3020 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003021 def send_bytes(self, data):
3022 pass
3023 self.assertRaises(multiprocessing.AuthenticationError,
3024 multiprocessing.connection.answer_challenge,
3025 _FakeConnection(), b'abc')
3026
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003027#
3028# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3029#
3030
3031def initializer(ns):
3032 ns.test += 1
3033
3034class TestInitializers(unittest.TestCase):
3035 def setUp(self):
3036 self.mgr = multiprocessing.Manager()
3037 self.ns = self.mgr.Namespace()
3038 self.ns.test = 0
3039
3040 def tearDown(self):
3041 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003042 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003043
3044 def test_manager_initializer(self):
3045 m = multiprocessing.managers.SyncManager()
3046 self.assertRaises(TypeError, m.start, 1)
3047 m.start(initializer, (self.ns,))
3048 self.assertEqual(self.ns.test, 1)
3049 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01003050 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003051
3052 def test_pool_initializer(self):
3053 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3054 p = multiprocessing.Pool(1, initializer, (self.ns,))
3055 p.close()
3056 p.join()
3057 self.assertEqual(self.ns.test, 1)
3058
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003059#
3060# Issue 5155, 5313, 5331: Test process in processes
3061# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3062#
3063
3064def _ThisSubProcess(q):
3065 try:
3066 item = q.get(block=False)
3067 except pyqueue.Empty:
3068 pass
3069
3070def _TestProcess(q):
3071 queue = multiprocessing.Queue()
3072 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02003073 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003074 subProc.start()
3075 subProc.join()
3076
3077def _afunc(x):
3078 return x*x
3079
3080def pool_in_process():
3081 pool = multiprocessing.Pool(processes=4)
3082 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01003083 pool.close()
3084 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003085
3086class _file_like(object):
3087 def __init__(self, delegate):
3088 self._delegate = delegate
3089 self._pid = None
3090
3091 @property
3092 def cache(self):
3093 pid = os.getpid()
3094 # There are no race conditions since fork keeps only the running thread
3095 if pid != self._pid:
3096 self._pid = pid
3097 self._cache = []
3098 return self._cache
3099
3100 def write(self, data):
3101 self.cache.append(data)
3102
3103 def flush(self):
3104 self._delegate.write(''.join(self.cache))
3105 self._cache = []
3106
3107class TestStdinBadfiledescriptor(unittest.TestCase):
3108
3109 def test_queue_in_process(self):
3110 queue = multiprocessing.Queue()
3111 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
3112 proc.start()
3113 proc.join()
3114
3115 def test_pool_in_process(self):
3116 p = multiprocessing.Process(target=pool_in_process)
3117 p.start()
3118 p.join()
3119
3120 def test_flushing(self):
3121 sio = io.StringIO()
3122 flike = _file_like(sio)
3123 flike.write('foo')
3124 proc = multiprocessing.Process(target=lambda: flike.flush())
3125 flike.flush()
3126 assert sio.getvalue() == 'foo'
3127
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003128
3129class TestWait(unittest.TestCase):
3130
3131 @classmethod
3132 def _child_test_wait(cls, w, slow):
3133 for i in range(10):
3134 if slow:
3135 time.sleep(random.random()*0.1)
3136 w.send((i, os.getpid()))
3137 w.close()
3138
3139 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003140 from multiprocessing.connection import wait
3141 readers = []
3142 procs = []
3143 messages = []
3144
3145 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003146 r, w = multiprocessing.Pipe(duplex=False)
3147 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003148 p.daemon = True
3149 p.start()
3150 w.close()
3151 readers.append(r)
3152 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003153 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003154
3155 while readers:
3156 for r in wait(readers):
3157 try:
3158 msg = r.recv()
3159 except EOFError:
3160 readers.remove(r)
3161 r.close()
3162 else:
3163 messages.append(msg)
3164
3165 messages.sort()
3166 expected = sorted((i, p.pid) for i in range(10) for p in procs)
3167 self.assertEqual(messages, expected)
3168
3169 @classmethod
3170 def _child_test_wait_socket(cls, address, slow):
3171 s = socket.socket()
3172 s.connect(address)
3173 for i in range(10):
3174 if slow:
3175 time.sleep(random.random()*0.1)
3176 s.sendall(('%s\n' % i).encode('ascii'))
3177 s.close()
3178
3179 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003180 from multiprocessing.connection import wait
3181 l = socket.socket()
3182 l.bind(('', 0))
3183 l.listen(4)
3184 addr = ('localhost', l.getsockname()[1])
3185 readers = []
3186 procs = []
3187 dic = {}
3188
3189 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01003190 p = multiprocessing.Process(target=self._child_test_wait_socket,
3191 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003192 p.daemon = True
3193 p.start()
3194 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01003195 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003196
3197 for i in range(4):
3198 r, _ = l.accept()
3199 readers.append(r)
3200 dic[r] = []
3201 l.close()
3202
3203 while readers:
3204 for r in wait(readers):
3205 msg = r.recv(32)
3206 if not msg:
3207 readers.remove(r)
3208 r.close()
3209 else:
3210 dic[r].append(msg)
3211
3212 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3213 for v in dic.values():
3214 self.assertEqual(b''.join(v), expected)
3215
3216 def test_wait_slow(self):
3217 self.test_wait(True)
3218
3219 def test_wait_socket_slow(self):
Richard Oudkerk104b3f42012-05-08 16:08:07 +01003220 self.test_wait_socket(True)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003221
3222 def test_wait_timeout(self):
3223 from multiprocessing.connection import wait
3224
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003225 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003226 a, b = multiprocessing.Pipe()
3227
3228 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003229 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003230 delta = time.time() - start
3231
3232 self.assertEqual(res, [])
Richard Oudkerk6dbca362012-05-06 16:46:36 +01003233 self.assertLess(delta, expected * 2)
3234 self.assertGreater(delta, expected * 0.5)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003235
3236 b.send(None)
3237
3238 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003239 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003240 delta = time.time() - start
3241
3242 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01003243 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003244
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003245 @classmethod
3246 def signal_and_sleep(cls, sem, period):
3247 sem.release()
3248 time.sleep(period)
3249
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003250 def test_wait_integer(self):
3251 from multiprocessing.connection import wait
3252
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003253 expected = 3
3254 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003255 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003256 p = multiprocessing.Process(target=self.signal_and_sleep,
3257 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003258
3259 p.start()
3260 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003261 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003262
3263 start = time.time()
3264 res = wait([a, p.sentinel, b], expected + 20)
3265 delta = time.time() - start
3266
3267 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01003268 self.assertLess(delta, expected + 2)
3269 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003270
3271 a.send(None)
3272
3273 start = time.time()
3274 res = wait([a, p.sentinel, b], 20)
3275 delta = time.time() - start
3276
3277 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01003278 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003279
3280 b.send(None)
3281
3282 start = time.time()
3283 res = wait([a, p.sentinel, b], 20)
3284 delta = time.time() - start
3285
3286 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01003287 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003288
Richard Oudkerk009b15e2012-05-04 09:44:39 +01003289 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003290 p.join()
3291
Richard Oudkerk59d54042012-05-10 16:11:12 +01003292 def test_neg_timeout(self):
3293 from multiprocessing.connection import wait
3294 a, b = multiprocessing.Pipe()
3295 t = time.time()
3296 res = wait([a], timeout=-1)
3297 t = time.time() - t
3298 self.assertEqual(res, [])
3299 self.assertLess(t, 1)
3300 a.close()
3301 b.close()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003302
Antoine Pitrou709176f2012-04-01 17:19:09 +02003303#
3304# Issue 14151: Test invalid family on invalid environment
3305#
3306
3307class TestInvalidFamily(unittest.TestCase):
3308
3309 @unittest.skipIf(WIN32, "skipped on Windows")
3310 def test_invalid_family(self):
3311 with self.assertRaises(ValueError):
3312 multiprocessing.connection.Listener(r'\\.\test')
3313
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02003314 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
3315 def test_invalid_family_win32(self):
3316 with self.assertRaises(ValueError):
3317 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003318
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003319#
3320# Issue 12098: check sys.flags of child matches that for parent
3321#
3322
3323class TestFlags(unittest.TestCase):
3324 @classmethod
3325 def run_in_grandchild(cls, conn):
3326 conn.send(tuple(sys.flags))
3327
3328 @classmethod
3329 def run_in_child(cls):
3330 import json
3331 r, w = multiprocessing.Pipe(duplex=False)
3332 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
3333 p.start()
3334 grandchild_flags = r.recv()
3335 p.join()
3336 r.close()
3337 w.close()
3338 flags = (tuple(sys.flags), grandchild_flags)
3339 print(json.dumps(flags))
3340
3341 def test_flags(self):
3342 import json, subprocess
3343 # start child process using unusual flags
3344 prog = ('from test.test_multiprocessing import TestFlags; ' +
3345 'TestFlags.run_in_child()')
3346 data = subprocess.check_output(
3347 [sys.executable, '-E', '-S', '-O', '-c', prog])
3348 child_flags, grandchild_flags = json.loads(data.decode('ascii'))
3349 self.assertEqual(child_flags, grandchild_flags)
3350
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003351#
3352# Test interaction with socket timeouts - see Issue #6056
3353#
3354
3355class TestTimeouts(unittest.TestCase):
3356 @classmethod
3357 def _test_timeout(cls, child, address):
3358 time.sleep(1)
3359 child.send(123)
3360 child.close()
3361 conn = multiprocessing.connection.Client(address)
3362 conn.send(456)
3363 conn.close()
3364
3365 def test_timeout(self):
3366 old_timeout = socket.getdefaulttimeout()
3367 try:
3368 socket.setdefaulttimeout(0.1)
3369 parent, child = multiprocessing.Pipe(duplex=True)
3370 l = multiprocessing.connection.Listener(family='AF_INET')
3371 p = multiprocessing.Process(target=self._test_timeout,
3372 args=(child, l.address))
3373 p.start()
3374 child.close()
3375 self.assertEqual(parent.recv(), 123)
3376 parent.close()
3377 conn = l.accept()
3378 self.assertEqual(conn.recv(), 456)
3379 conn.close()
3380 l.close()
3381 p.join(10)
3382 finally:
3383 socket.setdefaulttimeout(old_timeout)
3384
Richard Oudkerke88a2442012-08-14 11:41:32 +01003385#
3386# Test what happens with no "if __name__ == '__main__'"
3387#
3388
3389class TestNoForkBomb(unittest.TestCase):
3390 def test_noforkbomb(self):
3391 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
3392 if WIN32:
3393 rc, out, err = test.script_helper.assert_python_failure(name)
3394 self.assertEqual('', out.decode('ascii'))
3395 self.assertIn('RuntimeError', err.decode('ascii'))
3396 else:
3397 rc, out, err = test.script_helper.assert_python_ok(name)
3398 self.assertEqual('123', out.decode('ascii').rstrip())
3399 self.assertEqual('', err.decode('ascii'))
3400
3401#
3402#
3403#
3404
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003405testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003406 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily,
Richard Oudkerk3165a752012-08-14 12:51:14 +01003407 TestFlags, TestTimeouts, TestNoForkBomb]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003408
Benjamin Petersone711caf2008-06-11 16:44:04 +00003409#
3410#
3411#
3412
3413def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00003414 if sys.platform.startswith("linux"):
3415 try:
3416 lock = multiprocessing.RLock()
3417 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00003418 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00003419
Charles-François Natali221ef672011-11-22 18:55:22 +01003420 check_enough_semaphores()
3421
Benjamin Petersone711caf2008-06-11 16:44:04 +00003422 if run is None:
3423 from test.support import run_unittest as run
3424
3425 util.get_temp_dir() # creates temp directory for use by all processes
3426
3427 multiprocessing.get_logger().setLevel(LOG_LEVEL)
3428
Benjamin Petersone711caf2008-06-11 16:44:04 +00003429 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00003430 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
3431 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003432 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
3433 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00003434 )
3435
3436 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
3437 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01003438 run(suite)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003439
3440def main():
3441 test_main(unittest.TextTestRunner(verbosity=2).run)
3442
3443if __name__ == '__main__':
3444 main()