blob: a8e42e4f06d12653596092b2ac1f9cc8e68c4766 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
Richard Oudkerk3730a172012-06-15 18:26:07 +010021import struct
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +010022import operator
R. David Murraya21e4ca2009-03-31 23:16:50 +000023import test.support
Richard Oudkerke88a2442012-08-14 11:41:32 +010024import test.script_helper
Benjamin Petersone711caf2008-06-11 16:44:04 +000025
Benjamin Petersone5384b02008-10-04 22:00:42 +000026
R. David Murraya21e4ca2009-03-31 23:16:50 +000027# Skip tests if _multiprocessing wasn't built.
28_multiprocessing = test.support.import_module('_multiprocessing')
29# Skip tests if sem_open implementation is broken.
30test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
34import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000035
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.dummy
37import multiprocessing.connection
38import multiprocessing.managers
39import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000040import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000041
Charles-François Natalibc8f0822011-09-20 20:36:51 +020042from multiprocessing import util
43
44try:
45 from multiprocessing import reduction
46 HAS_REDUCTION = True
47except ImportError:
48 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000049
Brian Curtinafa88b52010-10-07 01:12:19 +000050try:
51 from multiprocessing.sharedctypes import Value, copy
52 HAS_SHAREDCTYPES = True
53except ImportError:
54 HAS_SHAREDCTYPES = False
55
Antoine Pitroubcb39d42011-08-23 19:46:22 +020056try:
57 import msvcrt
58except ImportError:
59 msvcrt = None
60
Benjamin Petersone711caf2008-06-11 16:44:04 +000061#
62#
63#
64
def latin(s):
    """Return the string *s* encoded to bytes with the 'latin' codec."""
    return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000067
Benjamin Petersone711caf2008-06-11 16:44:04 +000068#
69# Constants
70#
71
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.DEBUG

# Short pause used by the tests to let another process/thread make progress.
DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

# False when _multiprocessing reports HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020089
Richard Oudkerk59d54042012-05-10 16:11:12 +010090from multiprocessing.connection import wait
Antoine Pitrou176f07d2011-06-06 19:35:31 +020091
def wait_for_handle(handle, timeout):
    """Wait on a single *handle* and return the result of wait().

    A negative *timeout* is converted to None before being passed to
    multiprocessing.connection.wait(); any other value is passed unchanged.
    """
    if timeout is not None and timeout < 0.0:
        timeout = None
    return wait([handle], timeout)
Jesse Noller6214edd2009-01-19 16:23:53 +000096
# Value of the SC_OPEN_MAX system limit, or 256 when it cannot be queried.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # os.sysconf is unavailable, or the name is not supported here
    MAXFD = 256
101
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000103# Some tests require ctypes
104#
105
106try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000107 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000108except ImportError:
109 Structure = object
110 c_int = c_double = None
111
Charles-François Natali221ef672011-11-22 18:55:22 +0100112
def check_enough_semaphores():
    """Check that the system supports enough semaphores to run the test.

    Returns None when the limit cannot be determined, is reported as -1,
    or is at least the POSIX minimum; raises unittest.SkipTest otherwise.
    """
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems == -1 or nsems >= nsems_min:
        return
    raise unittest.SkipTest("The OS doesn't support enough semaphores "
                            "to run the test (required: %d)." % nsems_min)
126
127
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000128#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000129# Creates a wrapper for a function which records the time it takes to finish
130#
131
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped call took.

    ``elapsed`` is None until the first call; afterwards it holds the
    duration in seconds of the most recent call, whether that call
    returned or raised.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # A monotonic clock keeps the measurement immune to system clock
        # adjustments made while the call is running.
        start = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.monotonic() - start
144
145#
146# Base class for test cases
147#
148
class BaseTestCase(object):
    # Mixin holding helpers shared by the test classes below.  It calls
    # self.assertEqual/self.assertAlmostEqual, so it is expected to be
    # combined with a class that provides them.

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timings are only compared when CHECK_TIMINGS is enabled.
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) == value, unless func raises
        # NotImplementedError, in which case nothing is checked.
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
171
Benjamin Petersone711caf2008-06-11 16:44:04 +0000172#
173# Return the value of a semaphore
174#
175
def get_value(self):
    """Return the value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes; raises NotImplementedError when none of them
    is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
187
188#
189# Testcases
190#
191
class _TestProcess(BaseTestCase):
    # Tests for the Process API.  TYPE, Process, Queue, Pipe, Event,
    # current_process and active_children are read from self/cls and are
    # expected to be supplied by the concrete test class.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        # Child body: report the received arguments and the child's
        # identity back to the parent through the queue q.
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # values reported by _test(), in the order it puts them
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        # Sleep long enough that the parent has to terminate us.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # join() with a zero or negative timeout must return at once
        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        p.terminate()

        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        # Send our id, then spawn (and join, one at a time) two children
        # with ids id+[0] and id+[1] until ids reach length 2.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # children are joined one at a time, so ids arrive depth-first
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        event.wait(10.0)

    def test_sentinel(self):
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # the sentinel is not available before the process is started
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        # not ready while the child is still waiting on the event
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
382
Benjamin Petersone711caf2008-06-11 16:44:04 +0000383#
384#
385#
386
387class _UpperCaser(multiprocessing.Process):
388
389 def __init__(self):
390 multiprocessing.Process.__init__(self)
391 self.child_conn, self.parent_conn = multiprocessing.Pipe()
392
393 def run(self):
394 self.parent_conn.close()
395 for s in iter(self.child_conn.recv, None):
396 self.child_conn.send(s.upper())
397 self.child_conn.close()
398
399 def submit(self, s):
400 assert type(s) is str
401 self.parent_conn.send(s)
402 return self.parent_conn.recv()
403
404 def stop(self):
405 self.parent_conn.send(None)
406 self.parent_conn.close()
407 self.child_conn.close()
408
class _TestSubclassingProcess(BaseTestCase):
    # Tests that subclass or customise Process, plus checks of what a
    # child writes to sys.stderr and of its exit code.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("test_multiprocessing.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        # The file is deliberately left open: the test checks that it is
        # flushed when the process shuts down.
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER

    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        # Redirect stderr to testfn (left open on purpose), then exit
        # with the given reason.
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # non-integer reasons: check the exit code and the text written
        # to the redirected stderr
        for reason, code in (([1, 2, 3], 1), ('ignore this', 0)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # integer-like reasons become the exit code itself
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
474
Benjamin Petersone711caf2008-06-11 16:44:04 +0000475#
476#
477#
478
def queue_empty(q):
    """Return q.empty() if q has that method, else whether q.qsize() is 0."""
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0
484
def queue_full(q, maxsize):
    """Return q.full() if q has that method, else q.qsize() == maxsize."""
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize
490
491
class _TestQueue(BaseTestCase):
    # Tests for Queue and JoinableQueue.  Queue, JoinableQueue, Event and
    # Process are read from self and are expected to be supplied by the
    # concrete test class.

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        # Child body: once released, drain six items and signal the parent.
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using each calling convention of put()
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must fail with Full, blocking only when asked
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        # Child body: once released, put four items and signal the parent.
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must fail with Empty, blocking only when asked
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # nothing more can be checked when qsize() is not implemented
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        # Worker body: acknowledge each item until the None sentinel.
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # returns once every item has been acknowledged with task_done()
        queue.join()

        # one None sentinel per worker makes each of them exit
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
701
702#
703#
704#
705
class _TestLock(BaseTestCase):
    # Tests for the Lock and RLock objects read from self.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unlocked lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        # one release more than acquires is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        with self.Lock():
            pass
728
Benjamin Petersone711caf2008-06-11 16:44:04 +0000729
class _TestSemaphore(BaseTestCase):
    # Tests for the Semaphore and BoundedSemaphore objects read from self.

    def _test_semaphore(self, sem):
        # Shared checks for a semaphore created with an initial value of 2:
        # take it down to 0, fail a non-blocking acquire, release back to 2.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking acquires return at once, whatever the timeout
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking acquires wait for the timeout before giving up
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
782
783
784class _TestCondition(BaseTestCase):
785
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000786 @classmethod
787 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000788 cond.acquire()
789 sleeping.release()
790 cond.wait(timeout)
791 woken.release()
792 cond.release()
793
794 def check_invariant(self, cond):
795 # this is only supposed to succeed when there are no sleepers
796 if self.TYPE == 'processes':
797 try:
798 sleepers = (cond._sleeping_count.get_value() -
799 cond._woken_count.get_value())
800 self.assertEqual(sleepers, 0)
801 self.assertEqual(cond._wait_semaphore.get_value(), 0)
802 except NotImplementedError:
803 pass
804
805 def test_notify(self):
806 cond = self.Condition()
807 sleeping = self.Semaphore(0)
808 woken = self.Semaphore(0)
809
810 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000811 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000812 p.start()
813
814 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000815 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000816 p.start()
817
818 # wait for both children to start sleeping
819 sleeping.acquire()
820 sleeping.acquire()
821
822 # check no process/thread has woken up
823 time.sleep(DELTA)
824 self.assertReturnsIfImplemented(0, get_value, woken)
825
826 # wake up one process/thread
827 cond.acquire()
828 cond.notify()
829 cond.release()
830
831 # check one process/thread has woken up
832 time.sleep(DELTA)
833 self.assertReturnsIfImplemented(1, get_value, woken)
834
835 # wake up another
836 cond.acquire()
837 cond.notify()
838 cond.release()
839
840 # check other has woken up
841 time.sleep(DELTA)
842 self.assertReturnsIfImplemented(2, get_value, woken)
843
844 # check state is not mucked up
845 self.check_invariant(cond)
846 p.join()
847
848 def test_notify_all(self):
849 cond = self.Condition()
850 sleeping = self.Semaphore(0)
851 woken = self.Semaphore(0)
852
853 # start some threads/processes which will timeout
854 for i in range(3):
855 p = self.Process(target=self.f,
856 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000857 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000858 p.start()
859
860 t = threading.Thread(target=self.f,
861 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000862 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000863 t.start()
864
865 # wait for them all to sleep
866 for i in range(6):
867 sleeping.acquire()
868
869 # check they have all timed out
870 for i in range(6):
871 woken.acquire()
872 self.assertReturnsIfImplemented(0, get_value, woken)
873
874 # check state is not mucked up
875 self.check_invariant(cond)
876
877 # start some more threads/processes
878 for i in range(3):
879 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000880 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000881 p.start()
882
883 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000884 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000885 t.start()
886
887 # wait for them to all sleep
888 for i in range(6):
889 sleeping.acquire()
890
891 # check no process/thread has woken up
892 time.sleep(DELTA)
893 self.assertReturnsIfImplemented(0, get_value, woken)
894
895 # wake them all up
896 cond.acquire()
897 cond.notify_all()
898 cond.release()
899
900 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200901 for i in range(10):
902 try:
903 if get_value(woken) == 6:
904 break
905 except NotImplementedError:
906 break
907 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000908 self.assertReturnsIfImplemented(6, get_value, woken)
909
910 # check state is not mucked up
911 self.check_invariant(cond)
912
913 def test_timeout(self):
914 cond = self.Condition()
915 wait = TimingWrapper(cond.wait)
916 cond.acquire()
917 res = wait(TIMEOUT1)
918 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000919 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000920 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
921
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200922 @classmethod
923 def _test_waitfor_f(cls, cond, state):
924 with cond:
925 state.value = 0
926 cond.notify()
927 result = cond.wait_for(lambda : state.value==4)
928 if not result or state.value != 4:
929 sys.exit(1)
930
931 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
932 def test_waitfor(self):
933 # based on test in test/lock_tests.py
934 cond = self.Condition()
935 state = self.Value('i', -1)
936
937 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
938 p.daemon = True
939 p.start()
940
941 with cond:
942 result = cond.wait_for(lambda : state.value==0)
943 self.assertTrue(result)
944 self.assertEqual(state.value, 0)
945
946 for i in range(4):
947 time.sleep(0.01)
948 with cond:
949 state.value += 1
950 cond.notify()
951
952 p.join(5)
953 self.assertFalse(p.is_alive())
954 self.assertEqual(p.exitcode, 0)
955
956 @classmethod
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100957 def _test_waitfor_timeout_f(cls, cond, state, success, sem):
958 sem.release()
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200959 with cond:
960 expected = 0.1
961 dt = time.time()
962 result = cond.wait_for(lambda : state.value==4, timeout=expected)
963 dt = time.time() - dt
964 # borrow logic in assertTimeout() from test/lock_tests.py
965 if not result and expected * 0.6 < dt < expected * 10.0:
966 success.value = True
967
968 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
969 def test_waitfor_timeout(self):
970 # based on test in test/lock_tests.py
971 cond = self.Condition()
972 state = self.Value('i', 0)
973 success = self.Value('i', False)
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100974 sem = self.Semaphore(0)
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200975
976 p = self.Process(target=self._test_waitfor_timeout_f,
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100977 args=(cond, state, success, sem))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200978 p.daemon = True
979 p.start()
Richard Oudkerk6dbca362012-05-06 16:46:36 +0100980 self.assertTrue(sem.acquire(timeout=10))
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200981
982 # Only increment 3 times, so state == 4 is never reached.
983 for i in range(3):
984 time.sleep(0.01)
985 with cond:
986 state.value += 1
987 cond.notify()
988
989 p.join(5)
990 self.assertTrue(success.value)
991
Richard Oudkerk98449932012-06-05 13:15:29 +0100992 @classmethod
993 def _test_wait_result(cls, c, pid):
994 with c:
995 c.notify()
996 time.sleep(1)
997 if pid is not None:
998 os.kill(pid, signal.SIGINT)
999
1000 def test_wait_result(self):
1001 if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1002 pid = os.getpid()
1003 else:
1004 pid = None
1005
1006 c = self.Condition()
1007 with c:
1008 self.assertFalse(c.wait(0))
1009 self.assertFalse(c.wait(0.1))
1010
1011 p = self.Process(target=self._test_wait_result, args=(c, pid))
1012 p.start()
1013
1014 self.assertTrue(c.wait(10))
1015 if pid is not None:
1016 self.assertRaises(KeyboardInterrupt, c.wait, 10)
1017
1018 p.join()
1019
Benjamin Petersone711caf2008-06-11 16:44:04 +00001020
class _TestEvent(BaseTestCase):

    @classmethod
    def _test_event(cls, event):
        # Child side of test_event: set the event after TIMEOUT2 seconds.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # A fresh event is clear.
        self.assertEqual(event.is_set(), False)

        # Waiting on a clear event times out and returns False, whether
        # the timeout is zero or TIMEOUT1.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once set, wait() returns True immediately, with or without
        # a timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)

        event.clear()

        # A child sets the event later; an untimed wait() must see it.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # Reap the child so it does not outlive the test.
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001061
1062#
Richard Oudkerk3730a172012-06-15 18:26:07 +01001063# Tests for Barrier - adapted from tests in test/lock_tests.py
1064#
1065
# Many of the tests for threading.Barrier use a list as an atomic
# counter: a value is appended to increment the counter, and the
# length of the list gives the value.  We use the class _DummyList
# for the same purpose.
1070
1071class _DummyList(object):
1072
1073 def __init__(self):
1074 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1075 lock = multiprocessing.Lock()
1076 self.__setstate__((wrapper, lock))
1077 self._lengthbuf[0] = 0
1078
1079 def __setstate__(self, state):
1080 (self._wrapper, self._lock) = state
1081 self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1082
1083 def __getstate__(self):
1084 return (self._wrapper, self._lock)
1085
1086 def append(self, _):
1087 with self._lock:
1088 self._lengthbuf[0] += 1
1089
1090 def __len__(self):
1091 with self._lock:
1092 return self._lengthbuf[0]
1093
1094def _wait():
1095 # A crude wait/yield function not relying on synchronization primitives.
1096 time.sleep(0.01)
1097
1098
class Bunch(object):
    """
    A bunch of workers all running the same function.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Start `n` daemon workers, created with `namespace.Process`, that
        each call `f(*args)`.  If `wait_before_exit` is True, the workers
        won't terminate until do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        # Body of every worker: record start, run f, record finish, then
        # hold on until exiting is allowed (for at most 30 seconds).
        ident = os.getpid()
        self.started.append(ident)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(ident)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        # Poll until all n workers have recorded their start.
        while True:
            if len(self.started) >= self.n:
                break
            _wait()

    def wait_for_finished(self):
        # Poll until all n workers have recorded their finish.
        while True:
            if len(self.finished) >= self.n:
                break
            _wait()

    def do_finish(self):
        # Let workers held back by wait_before_exit terminate.
        self._can_exit.set()
Richard Oudkerk3730a172012-06-15 18:26:07 +01001142
1143
class AppendTrue(object):
    # Callable that appends True to the wrapped object on every call.

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        target = self.obj
        target.append(True)
1149
1150
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so that nothing stays blocked on the barrier.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return a counter-like list suitable for the flavour under test.
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run f(*args) in N-1 workers plus the calling one, then wait
        # until every worker has finished.
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Pass the barrier 2*n times, checking via the two counters in
        # `results` that all parties advance in lockstep.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        # Report this party's wait() return value through the queue.
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        # After passing the barrier the action must have run exactly once.
        barrier.wait()
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        # The party that got index N//2 aborts the barrier; every other
        # party must see BrokenBarrierError on its second wait().
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A one-party barrier never blocks.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        # After each pass of the barrier, send the pass number to the
        # parent; the lock serializes writes to the shared connection.
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            # Report the test as skipped rather than silently passing.
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                             args=(self.barrier, passes, child_conn, lock))
            p.start()
            # Make sure every worker is reaped, even if an assertion
            # below fails.
            self.addCleanup(p.join)

        # Each pass number must arrive N times before the next one shows up.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1395
1396#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001397#
1398#
1399
class _TestValue(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child side: overwrite every shared value with its final value.
        for shared, spec in zip(values, cls.codes_values):
            shared.value = spec[2]

    def test_value(self, raw=False):
        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        # Every object starts out holding its initial value.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.daemon = True
        child.start()
        child.join()

        # The child's writes are visible in this process.
        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # Default locking and lock=None both provide get_lock()/get_obj().
        val1 = self.Value('i', 5)
        val1.get_lock()
        val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        val2.get_lock()
        val2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessor methods.
        arr4 = self.Value('i', 5, lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # Raw values have no accessor methods either.
        arr5 = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
1467
Benjamin Petersone711caf2008-06-11 16:44:04 +00001468
class _TestArray(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Turn seq into its running sum, in place.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment from an array.array works on both.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Apply f locally to the list and in a child to the shared
        # array; both must end up with the same contents.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking and lock=None both provide get_lock()/get_obj().
        arr1 = self.Array('i', list(range(10)))
        arr1.get_lock()
        arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        arr2.get_lock()
        arr2.get_obj()

        # An explicitly supplied lock is the one handed back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object without the accessor methods.
        arr4 = self.Array('i', range(10), lock=False)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr4, accessor))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # Raw arrays have no accessor methods either.
        arr5 = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(arr5, accessor))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001547
1548#
1549#
1550#
1551
class _TestContainers(BaseTestCase):
    # Tests for the list, dict and Namespace objects handed out by a
    # manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b touched a.
        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A change made to a after it was stored in f shows up when f
        # is read back.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The underscore-prefixed attribute is left out of str().
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1607
1608#
1609#
1610#
1611
def sqr(x, wait=0.0):
    # Return x squared after sleeping for `wait` seconds.
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001615
def mul(x, y):
    # Return the product of x and y.
    product = x * y
    return product
1618
class _TestPool(BaseTestCase):
    # Tests for Pool.  Most tests share one four-worker pool that is
    # created once per class.

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_chunksize(self):
        # map_async() with an explicit chunksize must not stall on an
        # empty iterable.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout passed to get().
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # Queue far more work than could finish, then check that
        # terminate() lets join() return in under half a second.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        # Leaving the pool's with-block makes further submissions fail
        # with ValueError.
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            self.assertRaises(ValueError, p.map_async, sqr, L)
Richard Oudkerkd69cfe82012-06-18 17:47:52 +01001736
def raising():
    # Always fails with KeyError("key"); used as a pool task.
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001739
def unpickleable_result():
    # Hand back a lambda: a pool worker cannot pickle it to send it
    # back as a result.
    result = lambda: 42
    return result
1742
class _TestPoolWorkerErrors(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot list so the callback can hand the exception back.
        seen = [None]
        def errback(exc):
            seen[0] = exc

        res = pool.apply_async(raising, error_callback=errback)
        self.assertRaises(KeyError, res.get)
        self.assertTrue(seen[0])
        self.assertIsInstance(seen[0], KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            seen = [None]
            def errback(exc):
                seen[0] = exc

            res = pool.apply_async(unpickleable_result,
                                   error_callback=errback)
            self.assertRaises(MaybeEncodingError, res.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(seen[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1782
class _TestPoolWorkerLifetime(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        origworkerpids = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        finalworkerpids = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, origworkerpids)
        self.assertNotIn(None, finalworkerpids)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        # Issue #10332: closing a pool whose workers have limited lifetimes
        # before all the tasks completed would make join() hang.
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # check the results
        for j, res in enumerate(results):
            self.assertEqual(res.get(), sqr(j))
1827
Benjamin Petersone711caf2008-06-11 16:44:04 +00001828#
1829# Test of creating a customized manager class
1830#
1831
1832from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1833
class FooBar(object):
    # Plain class registered with MyManager: a method that returns, a
    # method that raises, and an underscore-prefixed method.

    def f(self):
        label = 'f()'
        return label

    def g(self):
        raise ValueError()

    def _h(self):
        label = '_h()'
        return label
1841
def baz():
    # Generator yielding the squares of 0 through 9.
    yield from (n * n for n in range(10))
1845
class IteratorProxy(BaseProxy):
    # Proxy that is its own iterator and forwards each __next__ call
    # to the referent through _callmethod.
    _exposed_ = ('__next__',)

    def __next__(self):
        item = self._callmethod('__next__')
        return item

    def __iter__(self):
        return self
1852
class MyManager(BaseManager):
    """Customized manager; its shared types are registered below."""

# 'Foo' uses the default set of exposed methods.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes exactly f and _h.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands its generator out through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1859
1860
class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        mgr = MyManager()
        mgr.start()
        self.common(mgr)
        mgr.shutdown()

        # If the manager process exited cleanly then the exitcode
        # will be zero.  Otherwise (after a short timeout)
        # terminate() is used, resulting in an exitcode of -SIGTERM.
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context(self):
        # The with-statement both starts and shuts down the manager.
        with MyManager() as mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        # A manager that is already running can also be used as a
        # context manager.
        mgr = MyManager()
        mgr.start()
        with mgr:
            self.common(mgr)
        self.assertEqual(mgr._process.exitcode, 0)

    def common(self, manager):
        # Checks shared by all tests above, run against a live manager.
        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = list(filter(lambda name: hasattr(foo, name),
                                  candidates))
        bar_methods = list(filter(lambda name: hasattr(bar, name),
                                  candidates))

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        # _h is not exposed on Foo, so calling it remotely fails.
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])
1910
Richard Oudkerk73d9a292012-06-14 15:30:10 +01001911
Benjamin Petersone711caf2008-06-11 16:44:04 +00001912#
1913# Test of connecting to a remote server and using xmlrpclib for serialization
1914#
1915
# Single queue instance handed out by get_queue() below.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1919
class QueueManager(BaseManager):
    '''manager class used by server process'''


# The serving side supplies the callable that produces the queue.
QueueManager.register('get_queue', callable=get_queue)
1923
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Registered by name only: no callable is given on this side.
QueueManager2.register('get_queue')
1927
1928
# Serializer name passed to every manager in the remote-manager tests below.
SERIALIZER = 'xmlrpclib'
1930
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager from other processes using SERIALIZER."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        """Child target: connect to the server and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """Check that an item put by a child arrives through a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.daemon = True
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The item has been received, so the child has done its work;
            # reap it instead of leaving it behind.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            # Stop the server even if one of the assertions above failed.
            manager.shutdown()
1972
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just used."""

    @classmethod
    def _putter(cls, address, authkey):
        """Child target: connect to the server and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Shut a manager down and immediately start another on its address."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the child has done its work; reap it.
        p.join()
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement has to be started as well: previously it was
            # only constructed, so the shutdown() below was called on a
            # manager that had never been started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00002014
Benjamin Petersone711caf2008-06-11 16:44:04 +00002015#
2016#
2017#
2018
# Empty message that tells the _echo() child below to stop echoing.
SENTINEL = latin('')
2020
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by ``self.Pipe()``."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Child target: echo each bytes message back until SENTINEL arrives."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Round-trip objects, bytes and buffers through an echoing child."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into a zeroed buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer shorter than the message must raise BufferTooShort
            # carrying the complete message.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            with self.assertRaises(multiprocessing.BufferTooShort) as cm:
                conn.recv_bytes_into(buffer)
            self.assertEqual(cm.exception.args, (longmsg,))

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """Check the direction flags and errors of a one-way pipe."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        """Close the child's end in the parent right after spawning."""
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """Check the offset and size arguments of send_bytes()."""
        if self.TYPE != 'processes':
            # Report a skip, like the other process-only tests in this
            # class, instead of silently passing.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offsets or sizes outside the 26-byte message are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* is an open descriptor, False on EBADF."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child target: receive a handle over *conn* and write *data* to it.

        With *create_dummy_fds* true, every unassigned descriptor below 256
        is first filled with a duplicate of the connection's descriptor.
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """Send an open file's handle to a child, which writes through it."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        """Transfer a descriptor numbered 256 or higher (issue #11657)."""
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find the first free descriptor number at or above 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        """Child target: write one raw byte to *conn*, with no handle attached."""
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        """recv_handle() must fail when no descriptor accompanies the data."""
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        """Connections used as context managers are closed on exit."""
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(IOError, a.recv)
            self.assertRaises(IOError, b.recv)
2284
class _TestListener(BaseTestCase):
    """Tests for ``connection.Listener`` on its own."""

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        """Binding a second listener to an address already in use fails."""
        listener_cls = self.connection.Listener
        for family in self.connection.families:
            first = listener_cls(family=family)
            self.addCleanup(first.close)
            with self.assertRaises(OSError):
                listener_cls(first.address, family)

    def test_context(self):
        """Listener, client and accepted connection all work with ``with``."""
        with self.connection.Listener() as listener, \
                self.connection.Client(listener.address) as client, \
                listener.accept() as accepted:
            client.send(1729)
            self.assertEqual(accepted.recv(), 1729)

        # Once the block has exited, the listener no longer accepts.
        if self.TYPE == 'processes':
            self.assertRaises(IOError, listener.accept)
2305
class _TestListenerClient(BaseTestCase):
    """Tests pairing a ``Listener`` with a ``Client`` in a child."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child target: connect to *address*, send 'hello' and disconnect."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """Accept one client per supported address family."""
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.daemon = True
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            # Close the accepted connection, as test_issue14725 does,
            # rather than leaving one open per family.
            conn.close()
            p.join()
            l.close()

    def test_issue14725(self):
        """accept() must work after the client has already disconnected."""
        l = self.connection.Listener()
        p = self.Process(target=self._test, args=(l.address,))
        p.daemon = True
        p.start()
        time.sleep(1)
        # On Windows the client process should by now have connected,
        # written data and closed the pipe handle.  This causes
        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
        # 14725.
        conn = l.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        p.join()
        l.close()
2342
class _TestPoll(unittest.TestCase):
    """Tests for ``poll()`` on pipe connections."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """An empty message counts as pending data until it is received."""
        receiver, sender = self.Pipe()
        self.assertEqual(receiver.poll(), False)
        sender.send_bytes(b'')
        # Polling twice shows that poll() does not consume the message.
        for _ in range(2):
            self.assertEqual(receiver.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Child target: send each string after a short pause, then close."""
        for payload in strings:
            time.sleep(0.1)
            conn.send_bytes(payload)
        conn.close()

    def test_strings(self):
        """Messages, including empty ones, arrive intact and in order."""
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        receiver, sender = self.Pipe()
        child = self.Process(target=self._child_strings,
                             args=(sender, strings))
        child.start()

        for expected in strings:
            # Poll up to 200 times, stopping at the first success; the
            # outcome itself is not checked before receiving.
            any(receiver.poll(0.01) for _ in range(200))
            received = receiver.recv_bytes()
            self.assertEqual(expected, received)

        child.join()

    @classmethod
    def _child_boundaries(cls, r):
        """Child target: poll the read end once with a five second timeout."""
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A poll in another process must leave whole messages in the pipe."""
        reader, writer = self.Pipe(False)
        child = self.Process(target=self._child_boundaries, args=(reader,))
        child.start()
        time.sleep(2)
        messages = [b"first", b"second"]
        for payload in messages:
            writer.send_bytes(payload)
        writer.close()
        child.join()
        self.assertIn(reader.recv_bytes(), messages)

    @classmethod
    def _child_dont_merge(cls, b):
        """Child target: send three separate messages back to back."""
        for payload in (b'a', b'b', b'cd'):
            b.send_bytes(payload)

    def test_dont_merge(self):
        """Back-to-back messages are received one at a time, never merged."""
        receiver, sender = self.Pipe()
        for timeout in (0.0, 0.1):
            self.assertEqual(receiver.poll(timeout), False)

        child = self.Process(target=self._child_dont_merge, args=(sender,))
        child.start()

        self.assertEqual(receiver.recv_bytes(), b'a')
        for timeout in (1.0, 1.0):
            self.assertEqual(receiver.poll(timeout), True)
        self.assertEqual(receiver.recv_bytes(), b'b')
        for timeout in (1.0, 1.0, 0.0):
            self.assertEqual(receiver.poll(timeout), True)
        self.assertEqual(receiver.recv_bytes(), b'cd')

        child.join()
2420
Benjamin Petersone711caf2008-06-11 16:44:04 +00002421#
2422# Test of sending connection and socket objects between processes
2423#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002424
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    """Send connection and socket objects from one process to another."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        """Stop the resource sharer once the tests in this class are done."""
        from multiprocessing.reduction import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Child target: accept one peer per family, then one raw socket.

        For each listener the address is sent over *conn* first, followed
        by the accepted connection itself.
        """
        for family in families:
            listener = cls.connection.Listener(family=family)
            conn.send(listener.address)
            accepted = listener.accept()
            conn.send(accepted)
            accepted.close()
            listener.close()

        # Same exchange again, this time with a plain socket.
        server_sock = socket.socket()
        server_sock.bind(('localhost', 0))
        server_sock.listen(1)
        conn.send(server_sock.getsockname())
        accepted, addr = server_sock.accept()
        conn.send(accepted)
        accepted.close()
        server_sock.close()

        # Block until the parent sends its final message.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Child target: connect to each address received and send msg.upper().

        (address, msg) pairs are served with ``connection.Client`` until
        None is received; one more pair is then served over a plain socket.
        """
        for target, payload in iter(conn.recv, None):
            peer = cls.connection.Client(target)
            peer.send(payload.upper())
            peer.close()

        target, payload = conn.recv()
        peer = socket.socket()
        peer.connect(target)
        peer.sendall(payload.upper())
        peer.close()

        conn.close()

    def test_pickling(self):
        """Receive connections accepted by one child and fed by another."""
        families = self.connection.families

        lconn, lconn_child = self.Pipe()
        listener_proc = self.Process(target=self._listener,
                                     args=(lconn_child, families))
        listener_proc.daemon = True
        listener_proc.start()
        lconn_child.close()

        rconn, rconn_child = self.Pipe()
        remote_proc = self.Process(target=self._remote, args=(rconn_child,))
        remote_proc.daemon = True
        remote_proc.start()
        rconn_child.close()

        for family in families:
            payload = ('This connection uses family %s' % family).encode('ascii')
            target = lconn.recv()
            rconn.send((target, payload))
            transferred = lconn.recv()
            self.assertEqual(transferred.recv(), payload.upper())

        # Tell the remote child to switch to the plain-socket exchange.
        rconn.send(None)

        payload = latin('This connection uses a normal socket')
        target = lconn.recv()
        rconn.send((target, payload))
        transferred = lconn.recv()
        # Read 100-byte chunks until an empty read signals end of stream.
        received = b''.join(iter(lambda: transferred.recv(100), b''))
        self.assertEqual(received, payload.upper())
        transferred.close()

        # Release the listener child from its final recv().
        lconn.send(None)

        rconn.close()
        lconn.close()

        listener_proc.join()
        remote_proc.join()

    @classmethod
    def child_access(cls, conn):
        """Child target: use a received write end, then a received read end."""
        writer = conn.recv()
        writer.send('all is well')
        writer.close()

        reader = conn.recv()
        payload = reader.recv()
        conn.send(payload*2)

        conn.close()

    def test_access(self):
        """Transferred one-way pipe ends keep working in the receiving process."""
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        child = self.Process(target=self.child_access, args=(child_conn,))
        child.daemon = True
        child.start()
        child_conn.close()

        # Hand the write end to the child and read what it sends.
        reader, writer = self.Pipe(duplex=False)
        conn.send(writer)
        writer.close()
        self.assertEqual(reader.recv(), 'all is well')
        reader.close()

        # Hand the read end to the child and write to it.
        reader, writer = self.Pipe(duplex=False)
        conn.send(reader)
        reader.close()
        writer.send('foobar')
        writer.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2555
Benjamin Petersone711caf2008-06-11 16:44:04 +00002556#
2557#
2558#
2559
class _TestHeap(BaseTestCase):
    """Tests for the allocator behind ``multiprocessing.heap.BufferWrapper``."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then check the heap is contiguous."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Drop a random block (kept in its own name rather than
                # overwriting the loop variable).
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []    # (arena index, start, stop, length, state)
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for seq in list(heap._len_to_seq.values()):
            for arena, start, stop in seq:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or be
        # followed by the first segment (offset 0) of a different arena.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Freeing blocks from the garbage collector must not deadlock."""
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
2624
Benjamin Petersone711caf2008-06-11 16:44:04 +00002625#
2626#
2627#
2628
class _Foo(Structure):
    """ctypes structure with an int field ``x`` and a double field ``y``."""

    _fields_ = [('x', c_int), ('y', c_double)]
2634
class _TestSharedCTypes(BaseTestCase):
    """Tests for shared ctypes values and arrays modified by a child."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        """Skip every test in this class when sharedctypes is unavailable."""
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        """Child target: double every shared object in place."""
        for shared in (x, y):
            shared.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        # Elements are reassigned by index so the shared array is updated.
        for idx in range(len(arr)):
            arr[idx] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made to shared objects by a child are seen by the parent."""
        int_val = Value('i', 7, lock=lock)
        dbl_val = Value(c_double, 1.0/3.0, lock=lock)
        struct_val = Value(_Foo, 3, 2, lock=lock)
        numbers = self.Array('d', list(range(10)), lock=lock)
        text = self.Array('c', 20, lock=lock)
        text.value = latin('hello')

        shared_args = (int_val, dbl_val, struct_val, numbers, text)
        worker = self.Process(target=self._double, args=shared_args)
        worker.daemon = True
        worker.start()
        worker.join()

        self.assertEqual(int_val.value, 14)
        self.assertAlmostEqual(dbl_val.value, 2.0/3.0)
        self.assertEqual(struct_val.x, 6)
        self.assertAlmostEqual(struct_val.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(numbers[idx], idx*2)
        self.assertEqual(text.value, latin('hellohello'))

    def test_synchronize(self):
        """Repeat test_sharedctypes with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """copy() returns a structure independent of the original."""
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        # Changing the original must leave the copy untouched.
        original.x = 0
        original.y = 0
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2684
2685#
2686#
2687#
2688
class _TestFinalize(BaseTestCase):
    """Tests for the order in which ``util.Finalize`` callbacks run."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        """Child target: register finalizers that report over *conn*, then exit.

        Each callback sends its own tag, so the parent can observe which
        callbacks ran and in which order.
        """
        class Target(object):
            pass

        # Callback runs as soon as the object is deleted.
        obj_a = Target()
        util.Finalize(obj_a, conn.send, args=('a',))
        del obj_a

        # Callback runs on the first explicit call only; the second call
        # and the later deletion do nothing more.
        obj_b = Target()
        finalize_b = util.Finalize(obj_b, conn.send, args=('b',))
        finalize_b()
        finalize_b()
        del obj_b

        # Registered without an exit priority; the object stays referenced.
        obj_c = Target()
        util.Finalize(obj_c, conn.send, args=('c',))

        obj_d10 = Target()
        util.Finalize(obj_d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers that share exit priority 0.
        obj_d01 = Target()
        util.Finalize(obj_d01, conn.send, args=('d01',), exitpriority=0)
        obj_d02 = Target()
        util.Finalize(obj_d02, conn.send, args=('d02',), exitpriority=0)
        obj_d03 = Target()
        util.Finalize(obj_d03, conn.send, args=('d03',), exitpriority=0)

        # Finalizers with no associated object.
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """Collect the tags sent by the child and check their order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # Read tags until the 'STOP' marker arrives.
        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2741
2742#
2743# Test that from ... import * works for each module
2744#
2745
class _TestImportStar(BaseTestCase):
    """Check that every name listed in a module's ``__all__`` exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its ``__all__``."""
        module_names = [
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.synchronize',
            'multiprocessing.util',
        ]

        if HAS_REDUCTION:
            module_names += ['multiprocessing.reduction']

        if c_int is not None:
            # This module requires _ctypes
            module_names += ['multiprocessing.sharedctypes']

        for module_name in module_names:
            __import__(module_name)
            mod = sys.modules[module_name]

            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
2774
2775#
2776# Quick test that logging works -- does not test logging output
2777#
2778
class _TestLogging(BaseTestCase):
    """Quick checks that multiprocessing logging works (output is not inspected)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Validate the logger before calling methods on it.
        self.assertIsNotNone(logger)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Always put the level back, even if a logging call raises.
            logger.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Runs in the child: report the logger's effective level to the parent.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _effective_level_in_child(self, reader, writer):
        # Start a child running _test_level, return the level it reports on
        # the pipe, and always join the child so it is not left behind.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            # A level set directly on the multiprocessing logger is seen
            # by the child.
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._effective_level_in_child(reader, writer))

            # With NOTSET on the multiprocessing logger, the child reports
            # the root logger's level instead.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._effective_level_in_child(reader, writer))
        finally:
            # Restore global logging state and release the pipe even when
            # an assertion above fails.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
2821
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002822
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002823# class _TestLoggingProcessName(BaseTestCase):
2824#
2825# def handle(self, record):
2826# assert record.processName == multiprocessing.current_process().name
2827# self.__handled = True
2828#
2829# def test_logging(self):
2830# handler = logging.Handler()
2831# handler.handle = self.handle
2832# self.__handled = False
2833# # Bypass getLogger() and side-effects
2834# logger = logging.getLoggerClass()(
2835# 'multiprocessing.test.TestLoggingProcessName')
2836# logger.addHandler(handler)
2837# logger.propagate = False
2838#
2839# logger.warn('foo')
2840# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002841
Benjamin Petersone711caf2008-06-11 16:44:04 +00002842#
Jesse Noller6214edd2009-01-19 16:23:53 +00002843# Test to verify handle verification, see issue 3321
2844#
2845
class TestInvalidHandle(unittest.TestCase):
    """Handle verification for Connection objects, see issue 3321."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        rejected = (ValueError, IOError)

        # An arbitrary handle number: polling it must raise.
        bogus = multiprocessing.connection.Connection(44977608)
        try:
            self.assertRaises(rejected, bogus.poll)
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            bogus._handle = None

        # A negative handle must be refused by the constructor itself.
        self.assertRaises(rejected,
                          multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002859
Jesse Noller6214edd2009-01-19 16:23:53 +00002860#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002861# Functions used to create test cases from the base ones in this module
2862#
2863
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes from the ``_Test*`` bases in this module.

    Every module-level name starting with ``_Test`` whose ``ALLOWED_TYPES``
    contains *type* is combined with *Mixin* and ``unittest.TestCase``.
    The result maps ``'With' + type.capitalize() + name[1:]`` to the new
    class; each class takes its ``__module__`` from *Mixin*.
    """
    generated = {}
    namespace = globals()
    prefix = 'With' + type.capitalize()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    # Iterate over a snapshot of the module namespace.
    for name, base in list(namespace.items()):
        if not name.startswith('_Test'):
            continue
        allowed = set(base.ALLOWED_TYPES)
        assert allowed <= ALL_TYPES, allowed
        if type not in base.ALLOWED_TYPES:
            continue

        newname = prefix + name[1:]

        class Temp(base, Mixin, unittest.TestCase):
            pass

        Temp.__name__ = Temp.__qualname__ = newname
        Temp.__module__ = Mixin.__module__
        generated[newname] = Temp
    return generated
2882
2883#
2884# Create test cases
2885#
2886
class ProcessesMixin(object):
    """Mixin binding the test primitives to the process-based implementation.

    Callables are wrapped in staticmethod so that looking them up on a test
    instance does not bind them as methods.
    """
    TYPE = 'processes'

    # Process handling
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)

    # Pools, pipes and queues
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)

    # Shared ctypes objects
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002908
# Generate the process-based test classes and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2911
2912
class ManagerMixin(object):
    """Mixin whose primitives come from a manager shared by the test class.

    ``setUpClass`` stores one manager on the class; the properties below
    resolve the corresponding attribute on ``self.manager`` at access time.
    """
    TYPE = 'manager'
    Process = multiprocessing.Process

    # Queues
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))

    # Synchronization primitives
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))

    # Shared values and containers
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwds):
        """Create a pool through the class-level manager."""
        return cls.manager.Pool(*args, **kwds)

    @classmethod
    def setUpClass(cls):
        """Create the manager used by every test in the class."""
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        """Report leftover shared objects, then shut the manager down."""
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        manager = cls.manager
        if manager._number_of_objects() != 0:
            # This is not really an error since some tests do not
            # ensure that all processes which hold a reference to a
            # managed object have been joined.
            print('Shared objects which still exist at manager shutdown:')
            print(manager._debug_info())
        manager.shutdown()
        manager.join()
        cls.manager = None
Benjamin Petersone711caf2008-06-11 16:44:04 +00002952
# Generate the manager-based test classes and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2955
2956
class ThreadsMixin(object):
    """Mixin binding the test primitives to the thread-based implementation.

    Every attribute comes from ``multiprocessing.dummy``. Callables are
    wrapped in staticmethod so that looking them up on a test instance does
    not bind them as methods.
    """
    TYPE = 'threads'

    # Process handling
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)

    # Pools, pipes and queues.  Pool must come from multiprocessing.dummy
    # like everything else here; binding multiprocessing.Pool would make
    # the thread-flavoured tests exercise a process pool.
    Pool = staticmethod(multiprocessing.dummy.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)

    # Synchronization primitives
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)

    # Shared values
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002976
# Generate the thread-based test classes and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2979
Richard Oudkerkfc7b0ec2012-10-08 14:56:24 +01002980
class OtherTest(unittest.TestCase):
    """Authentication-failure checks for the connection challenge helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # The fake peer answers with bytes that are not a valid digest.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer first sends the challenge prefix, then a bogus
        # reply, and empty bytes on any later read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
3009
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00003010#
3011# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3012#
3013
def initializer(ns):
    """Increment the ``test`` attribute of *ns* by one."""
    ns.test = ns.test + 1
3016
class TestInitializers(unittest.TestCase):
    """Manager.start()/Pool.__init__() initializer feature, see issue 5585."""

    def setUp(self):
        # A manager namespace gives the initializer a shared counter that
        # the test can read back.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            manager.start(1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer is rejected.
        with self.assertRaises(TypeError):
            multiprocessing.Pool(initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3041
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003042#
3043# Issue 5155, 5313, 5331: Test process in processes
3044# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
3045#
3046
3047def _ThisSubProcess(q):
3048 try:
3049 item = q.get(block=False)
3050 except pyqueue.Empty:
3051 pass
3052
def _TestProcess(q):
    """Start a daemonic child running ``_ThisSubProcess`` and wait for it.

    The *q* argument is not used; the child gets a freshly created queue.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.daemon = True
    child.start()
    child.join()
3059
3060def _afunc(x):
3061 return x*x
3062
def pool_in_process():
    """Map ``_afunc`` over a short list on a four-worker pool, then clean up."""
    workers = multiprocessing.Pool(processes=4)
    # The mapped values are not inspected.
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00003068
3069class _file_like(object):
3070 def __init__(self, delegate):
3071 self._delegate = delegate
3072 self._pid = None
3073
3074 @property
3075 def cache(self):
3076 pid = os.getpid()
3077 # There are no race conditions since fork keeps only the running thread
3078 if pid != self._pid:
3079 self._pid = pid
3080 self._cache = []
3081 return self._cache
3082
3083 def write(self, data):
3084 self.cache.append(data)
3085
3086 def flush(self):
3087 self._delegate.write(''.join(self.cache))
3088 self._cache = []
3089
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issue 5155, 5313, 5331: starting processes from inside processes."""

    def test_queue_in_process(self):
        # The child itself creates a queue and a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # The child creates and uses a pool of its own.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is constructed but never
        # started; kept as in the original - confirm whether that is
        # intentional.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Use a unittest assertion: a bare assert is stripped under -O and
        # reports no values on failure.
        self.assertEqual(sio.getvalue(), 'foo')
3110
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003111
class TestWait(unittest.TestCase):
    """Tests for multiprocessing.connection.wait().

    Elapsed time is measured with time.monotonic() so that wall-clock
    adjustments cannot distort the measured intervals.  Pipes, sockets and
    child processes are registered with addCleanup() so they are released
    even when an assertion fails part-way through a test.
    """

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Child: send ten (index, pid) tuples, optionally with random
        # pauses, then close the write end.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocessing.connection import wait
        readers = []
        procs = []
        messages = []

        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            # Closing an already closed connection is harmless, so this is
            # safe alongside the explicit close on EOF below.
            self.addCleanup(r.close)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # The parent keeps only the read end, so EOF is seen once the
            # child closes its copy of the write end.
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Child: connect and send the numbers 0..9, one per line.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocessing.connection import wait
        l = socket.socket()
        # Make sure the listening socket is released even if binding or
        # accepting fails; closing a socket twice is harmless.
        self.addCleanup(l.close)
        l.bind(('', 0))
        l.listen(4)
        addr = ('localhost', l.getsockname()[1])
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        for i in range(4):
            r, _ = l.accept()
            self.addCleanup(r.close)
            readers.append(r)
            dic[r] = []
        l.close()

        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    # An empty read means the peer closed the connection.
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocessing.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)

        # Nothing is readable: wait() should return empty after roughly
        # `expected` seconds.
        start = time.monotonic()
        res = wait([a, b], expected)
        delta = time.monotonic() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # Data is pending on `a`: wait() should return promptly.
        start = time.monotonic()
        res = wait([a, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Child: tell the parent we are running, then stay alive for
        # `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocessing.connection import wait

        expected = 3
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        # Cleanups run last-in first-out: terminate the child, then join it.
        self.addCleanup(p.join)
        self.addCleanup(p.terminate)
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

        # Only the sentinel becomes ready, once the child exits after
        # about `expected` seconds.
        start = time.monotonic()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [p.sentinel, b])
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.monotonic()
        res = wait([a, p.sentinel, b], 20)
        delta = time.monotonic() - start

        self.assertEqual(res, [a, p.sentinel, b])
        self.assertLess(delta, 0.4)

    def test_neg_timeout(self):
        from multiprocessing.connection import wait
        a, b = multiprocessing.Pipe()
        self.addCleanup(a.close)
        self.addCleanup(b.close)
        # A negative timeout must return immediately with nothing ready.
        t = time.monotonic()
        res = wait([a], timeout=-1)
        t = time.monotonic() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01003285
Antoine Pitrou709176f2012-04-01 17:19:09 +02003286#
3287# Issue 14151: Test invalid family on invalid environment
3288#
3289
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: Listener must reject an address of the wrong family."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe style address, used where WIN32 is false.
        pipe_address = r'\\.\test'
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(pipe_address)

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A filesystem-path style address, used where WIN32 is true.
        path_address = '/var/test.pipe'
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(path_address)
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02003301
Richard Oudkerk77c84f22012-05-18 14:28:02 +01003302#
3303# Issue 12098: check sys.flags of child matches that for parent
3304#
3305
class TestFlags(unittest.TestCase):
    """Issue 12098: check sys.flags of child matches that for parent."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Report this interpreter's flags over the connection.
        flags = tuple(sys.flags)
        conn.send(flags)

    @classmethod
    def run_in_child(cls):
        # Spawn a grandchild, collect its flags and print them together
        # with our own as one JSON document on stdout.
        import json
        reader, writer = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(writer,))
        grandchild.start()
        grandchild_flags = reader.recv()
        grandchild.join()
        reader.close()
        writer.close()
        own_flags = tuple(sys.flags)
        print(json.dumps((own_flags, grandchild_flags)))

    def test_flags(self):
        import json
        import subprocess
        # start child process using unusual flags
        prog = ('from test.test_multiprocessing import TestFlags; ' +
                'TestFlags.run_in_child()')
        command = [sys.executable, '-E', '-S', '-O', '-c', prog]
        data = subprocess.check_output(command)
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3333
Richard Oudkerkb15e6222012-07-27 14:19:00 +01003334#
3335# Test interaction with socket timeouts - see Issue #6056
3336#
3337
class TestTimeouts(unittest.TestCase):
    """Interaction with socket default timeouts, see Issue #6056."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: pause for a second, send one value over the pipe, then a
        # second value over a fresh client connection to the listener.
        time.sleep(1)
        child.send(123)
        child.close()
        client = multiprocessing.connection.Client(address)
        client.send(456)
        client.close()

    def test_timeout(self):
        saved_timeout = socket.getdefaulttimeout()
        try:
            # The default timeout is far shorter than the child's pause.
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            listener = multiprocessing.connection.Listener(family='AF_INET')
            worker = multiprocessing.Process(target=self._test_timeout,
                                             args=(child, listener.address))
            worker.start()
            child.close()

            self.assertEqual(parent.recv(), 123)
            parent.close()

            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 456)
            accepted.close()
            listener.close()
            worker.join(10)
        finally:
            socket.setdefaulttimeout(saved_timeout)
3367
Richard Oudkerke88a2442012-08-14 11:41:32 +01003368#
3369# Test what happens with no "if __name__ == '__main__'"
3370#
3371
class TestNoForkBomb(unittest.TestCase):
    """Test what happens with no "if __name__ == '__main__'"."""

    def test_noforkbomb(self):
        here = os.path.dirname(__file__)
        name = os.path.join(here, 'mp_fork_bomb.py')
        if not WIN32:
            # Expect a clean run that prints '123' and nothing on stderr.
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))
        else:
            # Expect a failure with a RuntimeError on stderr and no stdout.
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
3383
3384#
3385#
3386#
3387
# Stand-alone test cases that are not generated from a mixin.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
    TestWait,
    TestInvalidFamily,
    TestFlags,
    TestTimeouts,
    TestNoForkBomb,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00003391
Benjamin Petersone711caf2008-06-11 16:44:04 +00003392#
3393#
3394#
3395
def test_main(run=None):
    """Assemble every multiprocessing test case into one suite and run it.

    *run* is a callable that receives the suite; when it is None,
    ``test.support.run_unittest`` is used.  On Linux, unittest.SkipTest is
    raised if creating an RLock raises OSError.
    """
    if sys.platform.startswith("linux"):
        try:
            # Kept bound for the duration of the call, as in the original.
            probe_lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    check_enough_semaphores()

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Generated cases are ordered by class name within each flavour; the
    # stand-alone cases follow in their listed order.
    by_name = operator.attrgetter('__name__')
    testcases = []
    for generated in (testcases_processes, testcases_threads,
                      testcases_manager):
        testcases.extend(sorted(generated.values(), key=by_name))
    testcases.extend(testcases_other)

    load = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(load(tc) for tc in testcases)
    run(suite)
Benjamin Petersone711caf2008-06-11 16:44:04 +00003422
def main():
    """Run the whole suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()