blob: 5e471f161359e22553724b29c82f1caed1567989 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Antoine Pitroude911b22011-12-21 11:03:24 +010011import itertools
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020015import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000016import signal
17import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000018import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Victor Stinner45df8202010-04-28 22:31:17 +000028# import threading after _multiprocessing to raise a more revelant error
29# message: "No module named _multiprocessing". _multiprocessing is not compiled
30# without thread support.
31import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000032
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.dummy
34import multiprocessing.connection
35import multiprocessing.managers
36import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000037import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000038
Charles-François Natalibc8f0822011-09-20 20:36:51 +020039from multiprocessing import util
40
41try:
42 from multiprocessing import reduction
43 HAS_REDUCTION = True
44except ImportError:
45 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000046
Brian Curtinafa88b52010-10-07 01:12:19 +000047try:
48 from multiprocessing.sharedctypes import Value, copy
49 HAS_SHAREDCTYPES = True
50except ImportError:
51 HAS_SHAREDCTYPES = False
52
Antoine Pitroubcb39d42011-08-23 19:46:22 +020053try:
54 import msvcrt
55except ImportError:
56 msvcrt = None
57
Benjamin Petersone711caf2008-06-11 16:44:04 +000058#
59#
60#
61
Benjamin Peterson2bc91df2008-07-13 18:45:30 +000062def latin(s):
63 return s.encode('latin')
Benjamin Petersone711caf2008-06-11 16:44:04 +000064
Benjamin Petersone711caf2008-06-11 16:44:04 +000065#
66# Constants
67#
68
69LOG_LEVEL = util.SUBWARNING
Jesse Noller1f0b6582010-01-27 03:36:01 +000070#LOG_LEVEL = logging.DEBUG
Benjamin Petersone711caf2008-06-11 16:44:04 +000071
72DELTA = 0.1
73CHECK_TIMINGS = False # making true makes tests take a lot longer
74 # and can sometimes cause some non-serious
75 # failures because some calls block a bit
76 # longer than expected
77if CHECK_TIMINGS:
78 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
79else:
80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
81
82HAVE_GETVALUE = not getattr(_multiprocessing,
83 'HAVE_BROKEN_SEM_GETVALUE', False)
84
Jesse Noller6214edd2009-01-19 16:23:53 +000085WIN32 = (sys.platform == "win32")
Antoine Pitrou176f07d2011-06-06 19:35:31 +020086if WIN32:
Antoine Pitrou23bba4c2012-04-18 20:51:15 +020087 from _winapi import WaitForSingleObject, INFINITE, WAIT_OBJECT_0
Antoine Pitrou176f07d2011-06-06 19:35:31 +020088
89 def wait_for_handle(handle, timeout):
90 if timeout is None or timeout < 0.0:
91 timeout = INFINITE
92 else:
93 timeout = int(1000 * timeout)
94 return WaitForSingleObject(handle, timeout) == WAIT_OBJECT_0
95else:
96 from select import select
97 _select = util._eintr_retry(select)
98
99 def wait_for_handle(handle, timeout):
100 if timeout is not None and timeout < 0.0:
101 timeout = None
102 return handle in _select([handle], [], [], timeout)[0]
Jesse Noller6214edd2009-01-19 16:23:53 +0000103
Antoine Pitroubcb39d42011-08-23 19:46:22 +0200104try:
105 MAXFD = os.sysconf("SC_OPEN_MAX")
106except:
107 MAXFD = 256
108
Benjamin Petersone711caf2008-06-11 16:44:04 +0000109#
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000110# Some tests require ctypes
111#
112
113try:
Florent Xiclunaaa171062010-08-14 15:56:42 +0000114 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000115except ImportError:
116 Structure = object
117 c_int = c_double = None
118
Charles-François Natali221ef672011-11-22 18:55:22 +0100119
120def check_enough_semaphores():
121 """Check that the system supports enough semaphores to run the test."""
122 # minimum number of semaphores available according to POSIX
123 nsems_min = 256
124 try:
125 nsems = os.sysconf("SC_SEM_NSEMS_MAX")
126 except (AttributeError, ValueError):
127 # sysconf not available or setting not available
128 return
129 if nsems == -1 or nsems >= nsems_min:
130 return
131 raise unittest.SkipTest("The OS doesn't support enough semaphores "
132 "to run the test (required: %d)." % nsems_min)
133
134
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000135#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000136# Creates a wrapper for a function which records the time it takes to finish
137#
138
139class TimingWrapper(object):
140
141 def __init__(self, func):
142 self.func = func
143 self.elapsed = None
144
145 def __call__(self, *args, **kwds):
146 t = time.time()
147 try:
148 return self.func(*args, **kwds)
149 finally:
150 self.elapsed = time.time() - t
151
152#
153# Base class for test cases
154#
155
156class BaseTestCase(object):
157
158 ALLOWED_TYPES = ('processes', 'manager', 'threads')
159
160 def assertTimingAlmostEqual(self, a, b):
161 if CHECK_TIMINGS:
162 self.assertAlmostEqual(a, b, 1)
163
164 def assertReturnsIfImplemented(self, value, func, *args):
165 try:
166 res = func(*args)
167 except NotImplementedError:
168 pass
169 else:
170 return self.assertEqual(value, res)
171
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000172 # For the sanity of Windows users, rather than crashing or freezing in
173 # multiple ways.
174 def __reduce__(self, *args):
175 raise NotImplementedError("shouldn't try to pickle a test case")
176
177 __reduce_ex__ = __reduce__
178
Benjamin Petersone711caf2008-06-11 16:44:04 +0000179#
180# Return the value of a semaphore
181#
182
183def get_value(self):
184 try:
185 return self.get_value()
186 except AttributeError:
187 try:
188 return self._Semaphore__value
189 except AttributeError:
190 try:
191 return self._value
192 except AttributeError:
193 raise NotImplementedError
194
195#
196# Testcases
197#
198
199class _TestProcess(BaseTestCase):
200
201 ALLOWED_TYPES = ('processes', 'threads')
202
203 def test_current(self):
204 if self.TYPE == 'threads':
205 return
206
207 current = self.current_process()
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000208 authkey = current.authkey
Benjamin Petersone711caf2008-06-11 16:44:04 +0000209
210 self.assertTrue(current.is_alive())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000211 self.assertTrue(not current.daemon)
Ezio Melottie9615932010-01-24 19:26:24 +0000212 self.assertIsInstance(authkey, bytes)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000213 self.assertTrue(len(authkey) > 0)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000214 self.assertEqual(current.ident, os.getpid())
215 self.assertEqual(current.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000216
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000217 def test_daemon_argument(self):
218 if self.TYPE == "threads":
219 return
220
221 # By default uses the current process's daemon flag.
222 proc0 = self.Process(target=self._test)
Antoine Pitrouec785222011-03-02 00:15:44 +0000223 self.assertEqual(proc0.daemon, self.current_process().daemon)
Antoine Pitrou0bd4deb2011-02-25 22:07:43 +0000224 proc1 = self.Process(target=self._test, daemon=True)
225 self.assertTrue(proc1.daemon)
226 proc2 = self.Process(target=self._test, daemon=False)
227 self.assertFalse(proc2.daemon)
228
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000229 @classmethod
230 def _test(cls, q, *args, **kwds):
231 current = cls.current_process()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000232 q.put(args)
233 q.put(kwds)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000234 q.put(current.name)
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000235 if cls.TYPE != 'threads':
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000236 q.put(bytes(current.authkey))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000237 q.put(current.pid)
238
239 def test_process(self):
240 q = self.Queue(1)
241 e = self.Event()
242 args = (q, 1, 2)
243 kwargs = {'hello':23, 'bye':2.54}
244 name = 'SomeProcess'
245 p = self.Process(
246 target=self._test, args=args, kwargs=kwargs, name=name
247 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000248 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000249 current = self.current_process()
250
251 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000252 self.assertEqual(p.authkey, current.authkey)
253 self.assertEqual(p.is_alive(), False)
254 self.assertEqual(p.daemon, True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000255 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000256 self.assertTrue(type(self.active_children()) is list)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000257 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000258
259 p.start()
260
Ezio Melottib3aedd42010-11-20 19:04:17 +0000261 self.assertEqual(p.exitcode, None)
262 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000263 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000264
Ezio Melottib3aedd42010-11-20 19:04:17 +0000265 self.assertEqual(q.get(), args[1:])
266 self.assertEqual(q.get(), kwargs)
267 self.assertEqual(q.get(), p.name)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000268 if self.TYPE != 'threads':
Ezio Melottib3aedd42010-11-20 19:04:17 +0000269 self.assertEqual(q.get(), current.authkey)
270 self.assertEqual(q.get(), p.pid)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000271
272 p.join()
273
Ezio Melottib3aedd42010-11-20 19:04:17 +0000274 self.assertEqual(p.exitcode, 0)
275 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000276 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000277
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000278 @classmethod
279 def _test_terminate(cls):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000280 time.sleep(1000)
281
282 def test_terminate(self):
283 if self.TYPE == 'threads':
284 return
285
286 p = self.Process(target=self._test_terminate)
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000287 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000288 p.start()
289
290 self.assertEqual(p.is_alive(), True)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000291 self.assertIn(p, self.active_children())
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000292 self.assertEqual(p.exitcode, None)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000293
294 p.terminate()
295
296 join = TimingWrapper(p.join)
297 self.assertEqual(join(), None)
298 self.assertTimingAlmostEqual(join.elapsed, 0.0)
299
300 self.assertEqual(p.is_alive(), False)
Benjamin Peterson577473f2010-01-19 00:09:57 +0000301 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000302
303 p.join()
304
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000305 # XXX sometimes get p.exitcode == 0 on Windows ...
306 #self.assertEqual(p.exitcode, -signal.SIGTERM)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000307
308 def test_cpu_count(self):
309 try:
310 cpus = multiprocessing.cpu_count()
311 except NotImplementedError:
312 cpus = 1
313 self.assertTrue(type(cpus) is int)
314 self.assertTrue(cpus >= 1)
315
316 def test_active_children(self):
317 self.assertEqual(type(self.active_children()), list)
318
319 p = self.Process(target=time.sleep, args=(DELTA,))
Benjamin Peterson577473f2010-01-19 00:09:57 +0000320 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000321
Jesus Cea94f964f2011-09-09 20:26:57 +0200322 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000323 p.start()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000324 self.assertIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000325
326 p.join()
Benjamin Peterson577473f2010-01-19 00:09:57 +0000327 self.assertNotIn(p, self.active_children())
Benjamin Petersone711caf2008-06-11 16:44:04 +0000328
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000329 @classmethod
330 def _test_recursion(cls, wconn, id):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000331 from multiprocessing import forking
332 wconn.send(id)
333 if len(id) < 2:
334 for i in range(2):
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000335 p = cls.Process(
336 target=cls._test_recursion, args=(wconn, id+[i])
Benjamin Petersone711caf2008-06-11 16:44:04 +0000337 )
338 p.start()
339 p.join()
340
341 def test_recursion(self):
342 rconn, wconn = self.Pipe(duplex=False)
343 self._test_recursion(wconn, [])
344
345 time.sleep(DELTA)
346 result = []
347 while rconn.poll():
348 result.append(rconn.recv())
349
350 expected = [
351 [],
352 [0],
353 [0, 0],
354 [0, 1],
355 [1],
356 [1, 0],
357 [1, 1]
358 ]
359 self.assertEqual(result, expected)
360
Antoine Pitrou176f07d2011-06-06 19:35:31 +0200361 @classmethod
362 def _test_sentinel(cls, event):
363 event.wait(10.0)
364
365 def test_sentinel(self):
366 if self.TYPE == "threads":
367 return
368 event = self.Event()
369 p = self.Process(target=self._test_sentinel, args=(event,))
370 with self.assertRaises(ValueError):
371 p.sentinel
372 p.start()
373 self.addCleanup(p.join)
374 sentinel = p.sentinel
375 self.assertIsInstance(sentinel, int)
376 self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
377 event.set()
378 p.join()
379 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
380
Benjamin Petersone711caf2008-06-11 16:44:04 +0000381#
382#
383#
384
385class _UpperCaser(multiprocessing.Process):
386
387 def __init__(self):
388 multiprocessing.Process.__init__(self)
389 self.child_conn, self.parent_conn = multiprocessing.Pipe()
390
391 def run(self):
392 self.parent_conn.close()
393 for s in iter(self.child_conn.recv, None):
394 self.child_conn.send(s.upper())
395 self.child_conn.close()
396
397 def submit(self, s):
398 assert type(s) is str
399 self.parent_conn.send(s)
400 return self.parent_conn.recv()
401
402 def stop(self):
403 self.parent_conn.send(None)
404 self.parent_conn.close()
405 self.child_conn.close()
406
407class _TestSubclassingProcess(BaseTestCase):
408
409 ALLOWED_TYPES = ('processes',)
410
411 def test_subclassing(self):
412 uppercaser = _UpperCaser()
Jesus Cea94f964f2011-09-09 20:26:57 +0200413 uppercaser.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000414 uppercaser.start()
415 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
416 self.assertEqual(uppercaser.submit('world'), 'WORLD')
417 uppercaser.stop()
418 uppercaser.join()
419
Antoine Pitrou84a0fbf2012-01-27 10:52:37 +0100420 def test_stderr_flush(self):
421 # sys.stderr is flushed at process shutdown (issue #13812)
422 if self.TYPE == "threads":
423 return
424
425 testfn = test.support.TESTFN
426 self.addCleanup(test.support.unlink, testfn)
427 proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
428 proc.start()
429 proc.join()
430 with open(testfn, 'r') as f:
431 err = f.read()
432 # The whole traceback was printed
433 self.assertIn("ZeroDivisionError", err)
434 self.assertIn("test_multiprocessing.py", err)
435 self.assertIn("1/0 # MARKER", err)
436
437 @classmethod
438 def _test_stderr_flush(cls, testfn):
439 sys.stderr = open(testfn, 'w')
440 1/0 # MARKER
441
442
Benjamin Petersone711caf2008-06-11 16:44:04 +0000443#
444#
445#
446
447def queue_empty(q):
448 if hasattr(q, 'empty'):
449 return q.empty()
450 else:
451 return q.qsize() == 0
452
453def queue_full(q, maxsize):
454 if hasattr(q, 'full'):
455 return q.full()
456 else:
457 return q.qsize() == maxsize
458
459
460class _TestQueue(BaseTestCase):
461
462
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000463 @classmethod
464 def _test_put(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000465 child_can_start.wait()
466 for i in range(6):
467 queue.get()
468 parent_can_continue.set()
469
470 def test_put(self):
471 MAXSIZE = 6
472 queue = self.Queue(maxsize=MAXSIZE)
473 child_can_start = self.Event()
474 parent_can_continue = self.Event()
475
476 proc = self.Process(
477 target=self._test_put,
478 args=(queue, child_can_start, parent_can_continue)
479 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000480 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000481 proc.start()
482
483 self.assertEqual(queue_empty(queue), True)
484 self.assertEqual(queue_full(queue, MAXSIZE), False)
485
486 queue.put(1)
487 queue.put(2, True)
488 queue.put(3, True, None)
489 queue.put(4, False)
490 queue.put(5, False, None)
491 queue.put_nowait(6)
492
493 # the values may be in buffer but not yet in pipe so sleep a bit
494 time.sleep(DELTA)
495
496 self.assertEqual(queue_empty(queue), False)
497 self.assertEqual(queue_full(queue, MAXSIZE), True)
498
499 put = TimingWrapper(queue.put)
500 put_nowait = TimingWrapper(queue.put_nowait)
501
502 self.assertRaises(pyqueue.Full, put, 7, False)
503 self.assertTimingAlmostEqual(put.elapsed, 0)
504
505 self.assertRaises(pyqueue.Full, put, 7, False, None)
506 self.assertTimingAlmostEqual(put.elapsed, 0)
507
508 self.assertRaises(pyqueue.Full, put_nowait, 7)
509 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
510
511 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
512 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
513
514 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
515 self.assertTimingAlmostEqual(put.elapsed, 0)
516
517 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
518 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
519
520 child_can_start.set()
521 parent_can_continue.wait()
522
523 self.assertEqual(queue_empty(queue), True)
524 self.assertEqual(queue_full(queue, MAXSIZE), False)
525
526 proc.join()
527
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000528 @classmethod
529 def _test_get(cls, queue, child_can_start, parent_can_continue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000530 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000531 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000532 queue.put(2)
533 queue.put(3)
534 queue.put(4)
535 queue.put(5)
536 parent_can_continue.set()
537
538 def test_get(self):
539 queue = self.Queue()
540 child_can_start = self.Event()
541 parent_can_continue = self.Event()
542
543 proc = self.Process(
544 target=self._test_get,
545 args=(queue, child_can_start, parent_can_continue)
546 )
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000547 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000548 proc.start()
549
550 self.assertEqual(queue_empty(queue), True)
551
552 child_can_start.set()
553 parent_can_continue.wait()
554
555 time.sleep(DELTA)
556 self.assertEqual(queue_empty(queue), False)
557
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000558 # Hangs unexpectedly, remove for now
559 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000560 self.assertEqual(queue.get(True, None), 2)
561 self.assertEqual(queue.get(True), 3)
562 self.assertEqual(queue.get(timeout=1), 4)
563 self.assertEqual(queue.get_nowait(), 5)
564
565 self.assertEqual(queue_empty(queue), True)
566
567 get = TimingWrapper(queue.get)
568 get_nowait = TimingWrapper(queue.get_nowait)
569
570 self.assertRaises(pyqueue.Empty, get, False)
571 self.assertTimingAlmostEqual(get.elapsed, 0)
572
573 self.assertRaises(pyqueue.Empty, get, False, None)
574 self.assertTimingAlmostEqual(get.elapsed, 0)
575
576 self.assertRaises(pyqueue.Empty, get_nowait)
577 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
578
579 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
580 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
581
582 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
583 self.assertTimingAlmostEqual(get.elapsed, 0)
584
585 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
586 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
587
588 proc.join()
589
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000590 @classmethod
591 def _test_fork(cls, queue):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000592 for i in range(10, 20):
593 queue.put(i)
594 # note that at this point the items may only be buffered, so the
595 # process cannot shutdown until the feeder thread has finished
596 # pushing items onto the pipe.
597
598 def test_fork(self):
599 # Old versions of Queue would fail to create a new feeder
600 # thread for a forked process if the original process had its
601 # own feeder thread. This test checks that this no longer
602 # happens.
603
604 queue = self.Queue()
605
606 # put items on queue so that main process starts a feeder thread
607 for i in range(10):
608 queue.put(i)
609
610 # wait to make sure thread starts before we fork a new process
611 time.sleep(DELTA)
612
613 # fork process
614 p = self.Process(target=self._test_fork, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +0200615 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000616 p.start()
617
618 # check that all expected items are in the queue
619 for i in range(20):
620 self.assertEqual(queue.get(), i)
621 self.assertRaises(pyqueue.Empty, queue.get, False)
622
623 p.join()
624
625 def test_qsize(self):
626 q = self.Queue()
627 try:
628 self.assertEqual(q.qsize(), 0)
629 except NotImplementedError:
630 return
631 q.put(1)
632 self.assertEqual(q.qsize(), 1)
633 q.put(5)
634 self.assertEqual(q.qsize(), 2)
635 q.get()
636 self.assertEqual(q.qsize(), 1)
637 q.get()
638 self.assertEqual(q.qsize(), 0)
639
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000640 @classmethod
641 def _test_task_done(cls, q):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000642 for obj in iter(q.get, None):
643 time.sleep(DELTA)
644 q.task_done()
645
646 def test_task_done(self):
647 queue = self.JoinableQueue()
648
649 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
Florent Xiclunafd1b0932010-03-28 00:25:02 +0000650 self.skipTest("requires 'queue.task_done()' method")
Benjamin Petersone711caf2008-06-11 16:44:04 +0000651
652 workers = [self.Process(target=self._test_task_done, args=(queue,))
653 for i in range(4)]
654
655 for p in workers:
Jesus Cea94f964f2011-09-09 20:26:57 +0200656 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000657 p.start()
658
659 for i in range(10):
660 queue.put(i)
661
662 queue.join()
663
664 for p in workers:
665 queue.put(None)
666
667 for p in workers:
668 p.join()
669
670#
671#
672#
673
674class _TestLock(BaseTestCase):
675
676 def test_lock(self):
677 lock = self.Lock()
678 self.assertEqual(lock.acquire(), True)
679 self.assertEqual(lock.acquire(False), False)
680 self.assertEqual(lock.release(), None)
681 self.assertRaises((ValueError, threading.ThreadError), lock.release)
682
683 def test_rlock(self):
684 lock = self.RLock()
685 self.assertEqual(lock.acquire(), True)
686 self.assertEqual(lock.acquire(), True)
687 self.assertEqual(lock.acquire(), True)
688 self.assertEqual(lock.release(), None)
689 self.assertEqual(lock.release(), None)
690 self.assertEqual(lock.release(), None)
691 self.assertRaises((AssertionError, RuntimeError), lock.release)
692
Jesse Nollerf8d00852009-03-31 03:25:07 +0000693 def test_lock_context(self):
694 with self.Lock():
695 pass
696
Benjamin Petersone711caf2008-06-11 16:44:04 +0000697
698class _TestSemaphore(BaseTestCase):
699
700 def _test_semaphore(self, sem):
701 self.assertReturnsIfImplemented(2, get_value, sem)
702 self.assertEqual(sem.acquire(), True)
703 self.assertReturnsIfImplemented(1, get_value, sem)
704 self.assertEqual(sem.acquire(), True)
705 self.assertReturnsIfImplemented(0, get_value, sem)
706 self.assertEqual(sem.acquire(False), False)
707 self.assertReturnsIfImplemented(0, get_value, sem)
708 self.assertEqual(sem.release(), None)
709 self.assertReturnsIfImplemented(1, get_value, sem)
710 self.assertEqual(sem.release(), None)
711 self.assertReturnsIfImplemented(2, get_value, sem)
712
713 def test_semaphore(self):
714 sem = self.Semaphore(2)
715 self._test_semaphore(sem)
716 self.assertEqual(sem.release(), None)
717 self.assertReturnsIfImplemented(3, get_value, sem)
718 self.assertEqual(sem.release(), None)
719 self.assertReturnsIfImplemented(4, get_value, sem)
720
721 def test_bounded_semaphore(self):
722 sem = self.BoundedSemaphore(2)
723 self._test_semaphore(sem)
724 # Currently fails on OS/X
725 #if HAVE_GETVALUE:
726 # self.assertRaises(ValueError, sem.release)
727 # self.assertReturnsIfImplemented(2, get_value, sem)
728
729 def test_timeout(self):
730 if self.TYPE != 'processes':
731 return
732
733 sem = self.Semaphore(0)
734 acquire = TimingWrapper(sem.acquire)
735
736 self.assertEqual(acquire(False), False)
737 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
738
739 self.assertEqual(acquire(False, None), False)
740 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
741
742 self.assertEqual(acquire(False, TIMEOUT1), False)
743 self.assertTimingAlmostEqual(acquire.elapsed, 0)
744
745 self.assertEqual(acquire(True, TIMEOUT2), False)
746 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
747
748 self.assertEqual(acquire(timeout=TIMEOUT3), False)
749 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
750
751
752class _TestCondition(BaseTestCase):
753
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000754 @classmethod
755 def f(cls, cond, sleeping, woken, timeout=None):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000756 cond.acquire()
757 sleeping.release()
758 cond.wait(timeout)
759 woken.release()
760 cond.release()
761
762 def check_invariant(self, cond):
763 # this is only supposed to succeed when there are no sleepers
764 if self.TYPE == 'processes':
765 try:
766 sleepers = (cond._sleeping_count.get_value() -
767 cond._woken_count.get_value())
768 self.assertEqual(sleepers, 0)
769 self.assertEqual(cond._wait_semaphore.get_value(), 0)
770 except NotImplementedError:
771 pass
772
773 def test_notify(self):
774 cond = self.Condition()
775 sleeping = self.Semaphore(0)
776 woken = self.Semaphore(0)
777
778 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000779 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000780 p.start()
781
782 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000783 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000784 p.start()
785
786 # wait for both children to start sleeping
787 sleeping.acquire()
788 sleeping.acquire()
789
790 # check no process/thread has woken up
791 time.sleep(DELTA)
792 self.assertReturnsIfImplemented(0, get_value, woken)
793
794 # wake up one process/thread
795 cond.acquire()
796 cond.notify()
797 cond.release()
798
799 # check one process/thread has woken up
800 time.sleep(DELTA)
801 self.assertReturnsIfImplemented(1, get_value, woken)
802
803 # wake up another
804 cond.acquire()
805 cond.notify()
806 cond.release()
807
808 # check other has woken up
809 time.sleep(DELTA)
810 self.assertReturnsIfImplemented(2, get_value, woken)
811
812 # check state is not mucked up
813 self.check_invariant(cond)
814 p.join()
815
816 def test_notify_all(self):
817 cond = self.Condition()
818 sleeping = self.Semaphore(0)
819 woken = self.Semaphore(0)
820
821 # start some threads/processes which will timeout
822 for i in range(3):
823 p = self.Process(target=self.f,
824 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000825 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000826 p.start()
827
828 t = threading.Thread(target=self.f,
829 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson72753702008-08-18 18:09:21 +0000830 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000831 t.start()
832
833 # wait for them all to sleep
834 for i in range(6):
835 sleeping.acquire()
836
837 # check they have all timed out
838 for i in range(6):
839 woken.acquire()
840 self.assertReturnsIfImplemented(0, get_value, woken)
841
842 # check state is not mucked up
843 self.check_invariant(cond)
844
845 # start some more threads/processes
846 for i in range(3):
847 p = self.Process(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +0000848 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000849 p.start()
850
851 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson72753702008-08-18 18:09:21 +0000852 t.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +0000853 t.start()
854
855 # wait for them to all sleep
856 for i in range(6):
857 sleeping.acquire()
858
859 # check no process/thread has woken up
860 time.sleep(DELTA)
861 self.assertReturnsIfImplemented(0, get_value, woken)
862
863 # wake them all up
864 cond.acquire()
865 cond.notify_all()
866 cond.release()
867
868 # check they have all woken
Antoine Pitrouf25a8de2011-04-16 21:02:01 +0200869 for i in range(10):
870 try:
871 if get_value(woken) == 6:
872 break
873 except NotImplementedError:
874 break
875 time.sleep(DELTA)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000876 self.assertReturnsIfImplemented(6, get_value, woken)
877
878 # check state is not mucked up
879 self.check_invariant(cond)
880
881 def test_timeout(self):
882 cond = self.Condition()
883 wait = TimingWrapper(cond.wait)
884 cond.acquire()
885 res = wait(TIMEOUT1)
886 cond.release()
Georg Brandl65ffae02010-10-28 09:24:56 +0000887 self.assertEqual(res, False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000888 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
889
Charles-François Natalic8ce7152012-04-17 18:45:57 +0200890 @classmethod
891 def _test_waitfor_f(cls, cond, state):
892 with cond:
893 state.value = 0
894 cond.notify()
895 result = cond.wait_for(lambda : state.value==4)
896 if not result or state.value != 4:
897 sys.exit(1)
898
899 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
900 def test_waitfor(self):
901 # based on test in test/lock_tests.py
902 cond = self.Condition()
903 state = self.Value('i', -1)
904
905 p = self.Process(target=self._test_waitfor_f, args=(cond, state))
906 p.daemon = True
907 p.start()
908
909 with cond:
910 result = cond.wait_for(lambda : state.value==0)
911 self.assertTrue(result)
912 self.assertEqual(state.value, 0)
913
914 for i in range(4):
915 time.sleep(0.01)
916 with cond:
917 state.value += 1
918 cond.notify()
919
920 p.join(5)
921 self.assertFalse(p.is_alive())
922 self.assertEqual(p.exitcode, 0)
923
924 @classmethod
925 def _test_waitfor_timeout_f(cls, cond, state, success):
926 with cond:
927 expected = 0.1
928 dt = time.time()
929 result = cond.wait_for(lambda : state.value==4, timeout=expected)
930 dt = time.time() - dt
931 # borrow logic in assertTimeout() from test/lock_tests.py
932 if not result and expected * 0.6 < dt < expected * 10.0:
933 success.value = True
934
935 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
936 def test_waitfor_timeout(self):
937 # based on test in test/lock_tests.py
938 cond = self.Condition()
939 state = self.Value('i', 0)
940 success = self.Value('i', False)
941
942 p = self.Process(target=self._test_waitfor_timeout_f,
943 args=(cond, state, success))
944 p.daemon = True
945 p.start()
946
947 # Only increment 3 times, so state == 4 is never reached.
948 for i in range(3):
949 time.sleep(0.01)
950 with cond:
951 state.value += 1
952 cond.notify()
953
954 p.join(5)
955 self.assertTrue(success.value)
956
Benjamin Petersone711caf2008-06-11 16:44:04 +0000957
958class _TestEvent(BaseTestCase):
959
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +0000960 @classmethod
961 def _test_event(cls, event):
Benjamin Petersone711caf2008-06-11 16:44:04 +0000962 time.sleep(TIMEOUT2)
963 event.set()
964
965 def test_event(self):
966 event = self.Event()
967 wait = TimingWrapper(event.wait)
968
Ezio Melotti13925002011-03-16 11:05:33 +0200969 # Removed temporarily, due to API shear, this does not
Benjamin Petersone711caf2008-06-11 16:44:04 +0000970 # work with threading._Event objects. is_set == isSet
Benjamin Peterson965ce872009-04-05 21:24:58 +0000971 self.assertEqual(event.is_set(), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000972
Benjamin Peterson965ce872009-04-05 21:24:58 +0000973 # Removed, threading.Event.wait() will return the value of the __flag
974 # instead of None. API Shear with the semaphore backed mp.Event
975 self.assertEqual(wait(0.0), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000976 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000977 self.assertEqual(wait(TIMEOUT1), False)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000978 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
979
980 event.set()
981
982 # See note above on the API differences
Benjamin Peterson965ce872009-04-05 21:24:58 +0000983 self.assertEqual(event.is_set(), True)
984 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000985 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
Benjamin Peterson965ce872009-04-05 21:24:58 +0000986 self.assertEqual(wait(TIMEOUT1), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000987 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
988 # self.assertEqual(event.is_set(), True)
989
990 event.clear()
991
992 #self.assertEqual(event.is_set(), False)
993
Jesus Cea94f964f2011-09-09 20:26:57 +0200994 p = self.Process(target=self._test_event, args=(event,))
995 p.daemon = True
996 p.start()
Benjamin Peterson965ce872009-04-05 21:24:58 +0000997 self.assertEqual(wait(), True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000998
999#
1000#
1001#
1002
1003class _TestValue(BaseTestCase):
1004
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001005 ALLOWED_TYPES = ('processes',)
1006
Benjamin Petersone711caf2008-06-11 16:44:04 +00001007 codes_values = [
1008 ('i', 4343, 24234),
1009 ('d', 3.625, -4.25),
1010 ('h', -232, 234),
1011 ('c', latin('x'), latin('y'))
1012 ]
1013
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00001014 def setUp(self):
1015 if not HAS_SHAREDCTYPES:
1016 self.skipTest("requires multiprocessing.sharedctypes")
1017
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001018 @classmethod
1019 def _test(cls, values):
1020 for sv, cv in zip(values, cls.codes_values):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001021 sv.value = cv[2]
1022
1023
1024 def test_value(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001025 if raw:
1026 values = [self.RawValue(code, value)
1027 for code, value, _ in self.codes_values]
1028 else:
1029 values = [self.Value(code, value)
1030 for code, value, _ in self.codes_values]
1031
1032 for sv, cv in zip(values, self.codes_values):
1033 self.assertEqual(sv.value, cv[1])
1034
1035 proc = self.Process(target=self._test, args=(values,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001036 proc.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001037 proc.start()
1038 proc.join()
1039
1040 for sv, cv in zip(values, self.codes_values):
1041 self.assertEqual(sv.value, cv[2])
1042
1043 def test_rawvalue(self):
1044 self.test_value(raw=True)
1045
1046 def test_getobj_getlock(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001047 val1 = self.Value('i', 5)
1048 lock1 = val1.get_lock()
1049 obj1 = val1.get_obj()
1050
1051 val2 = self.Value('i', 5, lock=None)
1052 lock2 = val2.get_lock()
1053 obj2 = val2.get_obj()
1054
1055 lock = self.Lock()
1056 val3 = self.Value('i', 5, lock=lock)
1057 lock3 = val3.get_lock()
1058 obj3 = val3.get_obj()
1059 self.assertEqual(lock, lock3)
1060
Jesse Nollerb0516a62009-01-18 03:11:38 +00001061 arr4 = self.Value('i', 5, lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001062 self.assertFalse(hasattr(arr4, 'get_lock'))
1063 self.assertFalse(hasattr(arr4, 'get_obj'))
1064
Jesse Nollerb0516a62009-01-18 03:11:38 +00001065 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1066
1067 arr5 = self.RawValue('i', 5)
1068 self.assertFalse(hasattr(arr5, 'get_lock'))
1069 self.assertFalse(hasattr(arr5, 'get_obj'))
1070
Benjamin Petersone711caf2008-06-11 16:44:04 +00001071
1072class _TestArray(BaseTestCase):
1073
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001074 ALLOWED_TYPES = ('processes',)
1075
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001076 @classmethod
1077 def f(cls, seq):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001078 for i in range(1, len(seq)):
1079 seq[i] += seq[i-1]
1080
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001081 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001082 def test_array(self, raw=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001083 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1084 if raw:
1085 arr = self.RawArray('i', seq)
1086 else:
1087 arr = self.Array('i', seq)
1088
1089 self.assertEqual(len(arr), len(seq))
1090 self.assertEqual(arr[3], seq[3])
1091 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1092
1093 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1094
1095 self.assertEqual(list(arr[:]), seq)
1096
1097 self.f(seq)
1098
1099 p = self.Process(target=self.f, args=(arr,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001100 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001101 p.start()
1102 p.join()
1103
1104 self.assertEqual(list(arr[:]), seq)
1105
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001106 @unittest.skipIf(c_int is None, "requires _ctypes")
Mark Dickinson89461ef2011-03-26 10:19:03 +00001107 def test_array_from_size(self):
1108 size = 10
1109 # Test for zeroing (see issue #11675).
1110 # The repetition below strengthens the test by increasing the chances
1111 # of previously allocated non-zero memory being used for the new array
1112 # on the 2nd and 3rd loops.
1113 for _ in range(3):
1114 arr = self.Array('i', size)
1115 self.assertEqual(len(arr), size)
1116 self.assertEqual(list(arr), [0] * size)
1117 arr[:] = range(10)
1118 self.assertEqual(list(arr), list(range(10)))
1119 del arr
1120
1121 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001122 def test_rawarray(self):
1123 self.test_array(raw=True)
1124
Florent Xiclunafd1b0932010-03-28 00:25:02 +00001125 @unittest.skipIf(c_int is None, "requires _ctypes")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001126 def test_getobj_getlock_obj(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001127 arr1 = self.Array('i', list(range(10)))
1128 lock1 = arr1.get_lock()
1129 obj1 = arr1.get_obj()
1130
1131 arr2 = self.Array('i', list(range(10)), lock=None)
1132 lock2 = arr2.get_lock()
1133 obj2 = arr2.get_obj()
1134
1135 lock = self.Lock()
1136 arr3 = self.Array('i', list(range(10)), lock=lock)
1137 lock3 = arr3.get_lock()
1138 obj3 = arr3.get_obj()
1139 self.assertEqual(lock, lock3)
1140
Jesse Nollerb0516a62009-01-18 03:11:38 +00001141 arr4 = self.Array('i', range(10), lock=False)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001142 self.assertFalse(hasattr(arr4, 'get_lock'))
1143 self.assertFalse(hasattr(arr4, 'get_obj'))
Jesse Nollerb0516a62009-01-18 03:11:38 +00001144 self.assertRaises(AttributeError,
1145 self.Array, 'i', range(10), lock='notalock')
1146
1147 arr5 = self.RawArray('i', range(10))
1148 self.assertFalse(hasattr(arr5, 'get_lock'))
1149 self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +00001150
1151#
1152#
1153#
1154
1155class _TestContainers(BaseTestCase):
1156
1157 ALLOWED_TYPES = ('manager',)
1158
1159 def test_list(self):
1160 a = self.list(list(range(10)))
1161 self.assertEqual(a[:], list(range(10)))
1162
1163 b = self.list()
1164 self.assertEqual(b[:], [])
1165
1166 b.extend(list(range(5)))
1167 self.assertEqual(b[:], list(range(5)))
1168
1169 self.assertEqual(b[2], 2)
1170 self.assertEqual(b[2:10], [2,3,4])
1171
1172 b *= 2
1173 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1174
1175 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1176
1177 self.assertEqual(a[:], list(range(10)))
1178
1179 d = [a, b]
1180 e = self.list(d)
1181 self.assertEqual(
1182 e[:],
1183 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1184 )
1185
1186 f = self.list([a])
1187 a.append('hello')
1188 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1189
1190 def test_dict(self):
1191 d = self.dict()
1192 indices = list(range(65, 70))
1193 for i in indices:
1194 d[i] = chr(i)
1195 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1196 self.assertEqual(sorted(d.keys()), indices)
1197 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1198 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1199
1200 def test_namespace(self):
1201 n = self.Namespace()
1202 n.name = 'Bob'
1203 n.job = 'Builder'
1204 n._hidden = 'hidden'
1205 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1206 del n.job
1207 self.assertEqual(str(n), "Namespace(name='Bob')")
1208 self.assertTrue(hasattr(n, 'name'))
1209 self.assertTrue(not hasattr(n, 'job'))
1210
1211#
1212#
1213#
1214
1215def sqr(x, wait=0.0):
1216 time.sleep(wait)
1217 return x*x
Ask Solem2afcbf22010-11-09 20:55:52 +00001218
Antoine Pitroude911b22011-12-21 11:03:24 +01001219def mul(x, y):
1220 return x*y
1221
Benjamin Petersone711caf2008-06-11 16:44:04 +00001222class _TestPool(BaseTestCase):
1223
1224 def test_apply(self):
1225 papply = self.pool.apply
1226 self.assertEqual(papply(sqr, (5,)), sqr(5))
1227 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1228
1229 def test_map(self):
1230 pmap = self.pool.map
1231 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1232 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1233 list(map(sqr, list(range(100)))))
1234
Antoine Pitroude911b22011-12-21 11:03:24 +01001235 def test_starmap(self):
1236 psmap = self.pool.starmap
1237 tuples = list(zip(range(10), range(9,-1, -1)))
1238 self.assertEqual(psmap(mul, tuples),
1239 list(itertools.starmap(mul, tuples)))
1240 tuples = list(zip(range(100), range(99,-1, -1)))
1241 self.assertEqual(psmap(mul, tuples, chunksize=20),
1242 list(itertools.starmap(mul, tuples)))
1243
1244 def test_starmap_async(self):
1245 tuples = list(zip(range(100), range(99,-1, -1)))
1246 self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1247 list(itertools.starmap(mul, tuples)))
1248
Alexandre Vassalottie52e3782009-07-17 09:18:18 +00001249 def test_map_chunksize(self):
1250 try:
1251 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1252 except multiprocessing.TimeoutError:
1253 self.fail("pool.map_async with chunksize stalled on null list")
1254
Benjamin Petersone711caf2008-06-11 16:44:04 +00001255 def test_async(self):
1256 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1257 get = TimingWrapper(res.get)
1258 self.assertEqual(get(), 49)
1259 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1260
1261 def test_async_timeout(self):
1262 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
1263 get = TimingWrapper(res.get)
1264 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1265 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1266
1267 def test_imap(self):
1268 it = self.pool.imap(sqr, list(range(10)))
1269 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1270
1271 it = self.pool.imap(sqr, list(range(10)))
1272 for i in range(10):
1273 self.assertEqual(next(it), i*i)
1274 self.assertRaises(StopIteration, it.__next__)
1275
1276 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1277 for i in range(1000):
1278 self.assertEqual(next(it), i*i)
1279 self.assertRaises(StopIteration, it.__next__)
1280
1281 def test_imap_unordered(self):
1282 it = self.pool.imap_unordered(sqr, list(range(1000)))
1283 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1284
1285 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1286 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1287
1288 def test_make_pool(self):
Victor Stinner2fae27b2011-06-20 17:53:35 +02001289 self.assertRaises(ValueError, multiprocessing.Pool, -1)
1290 self.assertRaises(ValueError, multiprocessing.Pool, 0)
1291
Benjamin Petersone711caf2008-06-11 16:44:04 +00001292 p = multiprocessing.Pool(3)
1293 self.assertEqual(3, len(p._pool))
1294 p.close()
1295 p.join()
1296
1297 def test_terminate(self):
1298 if self.TYPE == 'manager':
1299 # On Unix a forked process increfs each shared object to
1300 # which its parent process held a reference. If the
1301 # forked process gets terminated then there is likely to
1302 # be a reference leak. So to prevent
1303 # _TestZZZNumberOfObjects from failing we skip this test
1304 # when using a manager.
1305 return
1306
1307 result = self.pool.map_async(
1308 time.sleep, [0.1 for i in range(10000)], chunksize=1
1309 )
1310 self.pool.terminate()
1311 join = TimingWrapper(self.pool.join)
1312 join()
Victor Stinner900189b2011-03-24 16:39:07 +01001313 self.assertLess(join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001314
Ask Solem2afcbf22010-11-09 20:55:52 +00001315def raising():
1316 raise KeyError("key")
Jesse Noller1f0b6582010-01-27 03:36:01 +00001317
Ask Solem2afcbf22010-11-09 20:55:52 +00001318def unpickleable_result():
1319 return lambda: 42
1320
1321class _TestPoolWorkerErrors(BaseTestCase):
Jesse Noller1f0b6582010-01-27 03:36:01 +00001322 ALLOWED_TYPES = ('processes', )
Ask Solem2afcbf22010-11-09 20:55:52 +00001323
1324 def test_async_error_callback(self):
1325 p = multiprocessing.Pool(2)
1326
1327 scratchpad = [None]
1328 def errback(exc):
1329 scratchpad[0] = exc
1330
1331 res = p.apply_async(raising, error_callback=errback)
1332 self.assertRaises(KeyError, res.get)
1333 self.assertTrue(scratchpad[0])
1334 self.assertIsInstance(scratchpad[0], KeyError)
1335
1336 p.close()
1337 p.join()
1338
1339 def test_unpickleable_result(self):
1340 from multiprocessing.pool import MaybeEncodingError
1341 p = multiprocessing.Pool(2)
1342
1343 # Make sure we don't lose pool processes because of encoding errors.
1344 for iteration in range(20):
1345
1346 scratchpad = [None]
1347 def errback(exc):
1348 scratchpad[0] = exc
1349
1350 res = p.apply_async(unpickleable_result, error_callback=errback)
1351 self.assertRaises(MaybeEncodingError, res.get)
1352 wrapped = scratchpad[0]
1353 self.assertTrue(wrapped)
1354 self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1355 self.assertIsNotNone(wrapped.exc)
1356 self.assertIsNotNone(wrapped.value)
1357
1358 p.close()
1359 p.join()
1360
1361class _TestPoolWorkerLifetime(BaseTestCase):
1362 ALLOWED_TYPES = ('processes', )
1363
Jesse Noller1f0b6582010-01-27 03:36:01 +00001364 def test_pool_worker_lifetime(self):
1365 p = multiprocessing.Pool(3, maxtasksperchild=10)
1366 self.assertEqual(3, len(p._pool))
1367 origworkerpids = [w.pid for w in p._pool]
1368 # Run many tasks so each worker gets replaced (hopefully)
1369 results = []
1370 for i in range(100):
1371 results.append(p.apply_async(sqr, (i, )))
1372 # Fetch the results and verify we got the right answers,
1373 # also ensuring all the tasks have completed.
1374 for (j, res) in enumerate(results):
1375 self.assertEqual(res.get(), sqr(j))
1376 # Refill the pool
1377 p._repopulate_pool()
Florent Xiclunafb190f62010-03-04 16:10:10 +00001378 # Wait until all workers are alive
Antoine Pitrou540ab062011-04-06 22:51:17 +02001379 # (countdown * DELTA = 5 seconds max startup process time)
1380 countdown = 50
Florent Xiclunafb190f62010-03-04 16:10:10 +00001381 while countdown and not all(w.is_alive() for w in p._pool):
1382 countdown -= 1
1383 time.sleep(DELTA)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001384 finalworkerpids = [w.pid for w in p._pool]
Florent Xiclunafb190f62010-03-04 16:10:10 +00001385 # All pids should be assigned. See issue #7805.
1386 self.assertNotIn(None, origworkerpids)
1387 self.assertNotIn(None, finalworkerpids)
1388 # Finally, check that the worker pids have changed
Jesse Noller1f0b6582010-01-27 03:36:01 +00001389 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1390 p.close()
1391 p.join()
1392
Charles-François Natalif8859e12011-10-24 18:45:29 +02001393 def test_pool_worker_lifetime_early_close(self):
1394 # Issue #10332: closing a pool whose workers have limited lifetimes
1395 # before all the tasks completed would make join() hang.
1396 p = multiprocessing.Pool(3, maxtasksperchild=1)
1397 results = []
1398 for i in range(6):
1399 results.append(p.apply_async(sqr, (i, 0.3)))
1400 p.close()
1401 p.join()
1402 # check the results
1403 for (j, res) in enumerate(results):
1404 self.assertEqual(res.get(), sqr(j))
1405
1406
Benjamin Petersone711caf2008-06-11 16:44:04 +00001407#
1408# Test that manager has expected number of shared objects left
1409#
1410
1411class _TestZZZNumberOfObjects(BaseTestCase):
1412 # Because test cases are sorted alphabetically, this one will get
1413 # run after all the other tests for the manager. It tests that
1414 # there have been no "reference leaks" for the manager's shared
1415 # objects. Note the comment in _TestPool.test_terminate().
1416 ALLOWED_TYPES = ('manager',)
1417
1418 def test_number_of_objects(self):
1419 EXPECTED_NUMBER = 1 # the pool object is still alive
1420 multiprocessing.active_children() # discard dead process objs
1421 gc.collect() # do garbage collection
1422 refs = self.manager._number_of_objects()
Jesse Noller63b3a972009-01-21 02:15:48 +00001423 debug_info = self.manager._debug_info()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001424 if refs != EXPECTED_NUMBER:
Georg Brandl3dbca812008-07-23 16:10:53 +00001425 print(self.manager._debug_info())
Jesse Noller63b3a972009-01-21 02:15:48 +00001426 print(debug_info)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001427
1428 self.assertEqual(refs, EXPECTED_NUMBER)
1429
1430#
1431# Test of creating a customized manager class
1432#
1433
1434from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1435
1436class FooBar(object):
1437 def f(self):
1438 return 'f()'
1439 def g(self):
1440 raise ValueError
1441 def _h(self):
1442 return '_h()'
1443
1444def baz():
1445 for i in range(10):
1446 yield i*i
1447
1448class IteratorProxy(BaseProxy):
Florent Xiclunaaa171062010-08-14 15:56:42 +00001449 _exposed_ = ('__next__',)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001450 def __iter__(self):
1451 return self
1452 def __next__(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001453 return self._callmethod('__next__')
1454
1455class MyManager(BaseManager):
1456 pass
1457
1458MyManager.register('Foo', callable=FooBar)
1459MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1460MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1461
1462
1463class _TestMyManager(BaseTestCase):
1464
1465 ALLOWED_TYPES = ('manager',)
1466
1467 def test_mymanager(self):
1468 manager = MyManager()
1469 manager.start()
1470
1471 foo = manager.Foo()
1472 bar = manager.Bar()
1473 baz = manager.baz()
1474
1475 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1476 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1477
1478 self.assertEqual(foo_methods, ['f', 'g'])
1479 self.assertEqual(bar_methods, ['f', '_h'])
1480
1481 self.assertEqual(foo.f(), 'f()')
1482 self.assertRaises(ValueError, foo.g)
1483 self.assertEqual(foo._callmethod('f'), 'f()')
1484 self.assertRaises(RemoteError, foo._callmethod, '_h')
1485
1486 self.assertEqual(bar.f(), 'f()')
1487 self.assertEqual(bar._h(), '_h()')
1488 self.assertEqual(bar._callmethod('f'), 'f()')
1489 self.assertEqual(bar._callmethod('_h'), '_h()')
1490
1491 self.assertEqual(list(baz), [i*i for i in range(10)])
1492
1493 manager.shutdown()
1494
1495#
1496# Test of connecting to a remote server and using xmlrpclib for serialization
1497#
1498
1499_queue = pyqueue.Queue()
1500def get_queue():
1501 return _queue
1502
1503class QueueManager(BaseManager):
1504 '''manager class used by server process'''
1505QueueManager.register('get_queue', callable=get_queue)
1506
1507class QueueManager2(BaseManager):
1508 '''manager class which specifies the same interface as QueueManager'''
1509QueueManager2.register('get_queue')
1510
1511
1512SERIALIZER = 'xmlrpclib'
1513
1514class _TestRemoteManager(BaseTestCase):
1515
1516 ALLOWED_TYPES = ('manager',)
1517
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001518 @classmethod
1519 def _putter(cls, address, authkey):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001520 manager = QueueManager2(
1521 address=address, authkey=authkey, serializer=SERIALIZER
1522 )
1523 manager.connect()
1524 queue = manager.get_queue()
1525 queue.put(('hello world', None, True, 2.25))
1526
1527 def test_remote(self):
1528 authkey = os.urandom(32)
1529
1530 manager = QueueManager(
1531 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1532 )
1533 manager.start()
1534
1535 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001536 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001537 p.start()
1538
1539 manager2 = QueueManager2(
1540 address=manager.address, authkey=authkey, serializer=SERIALIZER
1541 )
1542 manager2.connect()
1543 queue = manager2.get_queue()
1544
1545 # Note that xmlrpclib will deserialize object as a list not a tuple
1546 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1547
1548 # Because we are using xmlrpclib for serialization instead of
1549 # pickle this will cause a serialization error.
1550 self.assertRaises(Exception, queue.put, time.sleep)
1551
1552 # Make queue finalizer run before the server is stopped
1553 del queue
1554 manager.shutdown()
1555
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001556class _TestManagerRestart(BaseTestCase):
1557
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001558 @classmethod
1559 def _putter(cls, address, authkey):
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001560 manager = QueueManager(
1561 address=address, authkey=authkey, serializer=SERIALIZER)
1562 manager.connect()
1563 queue = manager.get_queue()
1564 queue.put('hello world')
1565
1566 def test_rapid_restart(self):
1567 authkey = os.urandom(32)
1568 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001569 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
Brian Curtin50be1ca2010-11-01 05:10:44 +00001570 srvr = manager.get_server()
1571 addr = srvr.address
1572 # Close the connection.Listener socket which gets opened as a part
1573 # of manager.get_server(). It's not needed for the test.
1574 srvr.listener.close()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001575 manager.start()
1576
1577 p = self.Process(target=self._putter, args=(manager.address, authkey))
Jesus Cea94f964f2011-09-09 20:26:57 +02001578 p.daemon = True
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001579 p.start()
1580 queue = manager.get_queue()
1581 self.assertEqual(queue.get(), 'hello world')
Jesse Noller35d1f002009-03-30 22:59:27 +00001582 del queue
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001583 manager.shutdown()
1584 manager = QueueManager(
Antoine Pitrou043bad02010-04-30 23:20:15 +00001585 address=addr, authkey=authkey, serializer=SERIALIZER)
Antoine Pitrouc824e9a2011-04-05 18:11:33 +02001586 try:
1587 manager.start()
1588 except IOError as e:
1589 if e.errno != errno.EADDRINUSE:
1590 raise
1591 # Retry after some time, in case the old socket was lingering
1592 # (sporadic failure on buildbots)
1593 time.sleep(1.0)
1594 manager = QueueManager(
1595 address=addr, authkey=authkey, serializer=SERIALIZER)
Jesse Noller35d1f002009-03-30 22:59:27 +00001596 manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001597
Benjamin Petersone711caf2008-06-11 16:44:04 +00001598#
1599#
1600#
1601
1602SENTINEL = latin('')
1603
1604class _TestConnection(BaseTestCase):
1605
1606 ALLOWED_TYPES = ('processes', 'threads')
1607
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001608 @classmethod
1609 def _echo(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001610 for msg in iter(conn.recv_bytes, SENTINEL):
1611 conn.send_bytes(msg)
1612 conn.close()
1613
1614 def test_connection(self):
1615 conn, child_conn = self.Pipe()
1616
1617 p = self.Process(target=self._echo, args=(child_conn,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001618 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001619 p.start()
1620
1621 seq = [1, 2.25, None]
1622 msg = latin('hello world')
1623 longmsg = msg * 10
1624 arr = array.array('i', list(range(4)))
1625
1626 if self.TYPE == 'processes':
1627 self.assertEqual(type(conn.fileno()), int)
1628
1629 self.assertEqual(conn.send(seq), None)
1630 self.assertEqual(conn.recv(), seq)
1631
1632 self.assertEqual(conn.send_bytes(msg), None)
1633 self.assertEqual(conn.recv_bytes(), msg)
1634
1635 if self.TYPE == 'processes':
1636 buffer = array.array('i', [0]*10)
1637 expected = list(arr) + [0] * (10 - len(arr))
1638 self.assertEqual(conn.send_bytes(arr), None)
1639 self.assertEqual(conn.recv_bytes_into(buffer),
1640 len(arr) * buffer.itemsize)
1641 self.assertEqual(list(buffer), expected)
1642
1643 buffer = array.array('i', [0]*10)
1644 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1645 self.assertEqual(conn.send_bytes(arr), None)
1646 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1647 len(arr) * buffer.itemsize)
1648 self.assertEqual(list(buffer), expected)
1649
1650 buffer = bytearray(latin(' ' * 40))
1651 self.assertEqual(conn.send_bytes(longmsg), None)
1652 try:
1653 res = conn.recv_bytes_into(buffer)
1654 except multiprocessing.BufferTooShort as e:
1655 self.assertEqual(e.args, (longmsg,))
1656 else:
1657 self.fail('expected BufferTooShort, got %s' % res)
1658
1659 poll = TimingWrapper(conn.poll)
1660
1661 self.assertEqual(poll(), False)
1662 self.assertTimingAlmostEqual(poll.elapsed, 0)
1663
1664 self.assertEqual(poll(TIMEOUT1), False)
1665 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1666
1667 conn.send(None)
1668
1669 self.assertEqual(poll(TIMEOUT1), True)
1670 self.assertTimingAlmostEqual(poll.elapsed, 0)
1671
1672 self.assertEqual(conn.recv(), None)
1673
1674 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1675 conn.send_bytes(really_big_msg)
1676 self.assertEqual(conn.recv_bytes(), really_big_msg)
1677
1678 conn.send_bytes(SENTINEL) # tell child to quit
1679 child_conn.close()
1680
1681 if self.TYPE == 'processes':
1682 self.assertEqual(conn.readable, True)
1683 self.assertEqual(conn.writable, True)
1684 self.assertRaises(EOFError, conn.recv)
1685 self.assertRaises(EOFError, conn.recv_bytes)
1686
1687 p.join()
1688
1689 def test_duplex_false(self):
1690 reader, writer = self.Pipe(duplex=False)
1691 self.assertEqual(writer.send(1), None)
1692 self.assertEqual(reader.recv(), 1)
1693 if self.TYPE == 'processes':
1694 self.assertEqual(reader.readable, True)
1695 self.assertEqual(reader.writable, False)
1696 self.assertEqual(writer.readable, False)
1697 self.assertEqual(writer.writable, True)
1698 self.assertRaises(IOError, reader.send, 2)
1699 self.assertRaises(IOError, writer.recv)
1700 self.assertRaises(IOError, writer.poll)
1701
1702 def test_spawn_close(self):
1703 # We test that a pipe connection can be closed by parent
1704 # process immediately after child is spawned. On Windows this
1705 # would have sometimes failed on old versions because
1706 # child_conn would be closed before the child got a chance to
1707 # duplicate it.
1708 conn, child_conn = self.Pipe()
1709
1710 p = self.Process(target=self._echo, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001711 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001712 p.start()
1713 child_conn.close() # this might complete before child initializes
1714
1715 msg = latin('hello')
1716 conn.send_bytes(msg)
1717 self.assertEqual(conn.recv_bytes(), msg)
1718
1719 conn.send_bytes(SENTINEL)
1720 conn.close()
1721 p.join()
1722
1723 def test_sendbytes(self):
1724 if self.TYPE != 'processes':
1725 return
1726
1727 msg = latin('abcdefghijklmnopqrstuvwxyz')
1728 a, b = self.Pipe()
1729
1730 a.send_bytes(msg)
1731 self.assertEqual(b.recv_bytes(), msg)
1732
1733 a.send_bytes(msg, 5)
1734 self.assertEqual(b.recv_bytes(), msg[5:])
1735
1736 a.send_bytes(msg, 7, 8)
1737 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1738
1739 a.send_bytes(msg, 26)
1740 self.assertEqual(b.recv_bytes(), latin(''))
1741
1742 a.send_bytes(msg, 26, 0)
1743 self.assertEqual(b.recv_bytes(), latin(''))
1744
1745 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1746
1747 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1748
1749 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1750
1751 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1752
1753 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1754
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001755 @classmethod
1756 def _is_fd_assigned(cls, fd):
1757 try:
1758 os.fstat(fd)
1759 except OSError as e:
1760 if e.errno == errno.EBADF:
1761 return False
1762 raise
1763 else:
1764 return True
1765
1766 @classmethod
1767 def _writefd(cls, conn, data, create_dummy_fds=False):
1768 if create_dummy_fds:
1769 for i in range(0, 256):
1770 if not cls._is_fd_assigned(i):
1771 os.dup2(conn.fileno(), i)
1772 fd = reduction.recv_handle(conn)
1773 if msvcrt:
1774 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
1775 os.write(fd, data)
1776 os.close(fd)
1777
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001778 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001779 def test_fd_transfer(self):
1780 if self.TYPE != 'processes':
1781 self.skipTest("only makes sense with processes")
1782 conn, child_conn = self.Pipe(duplex=True)
1783
1784 p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
Jesus Cea94f964f2011-09-09 20:26:57 +02001785 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001786 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001787 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001788 with open(test.support.TESTFN, "wb") as f:
1789 fd = f.fileno()
1790 if msvcrt:
1791 fd = msvcrt.get_osfhandle(fd)
1792 reduction.send_handle(conn, fd, p.pid)
1793 p.join()
1794 with open(test.support.TESTFN, "rb") as f:
1795 self.assertEqual(f.read(), b"foo")
1796
Charles-François Natalibc8f0822011-09-20 20:36:51 +02001797 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001798 @unittest.skipIf(sys.platform == "win32",
1799 "test semantics don't make sense on Windows")
1800 @unittest.skipIf(MAXFD <= 256,
1801 "largest assignable fd number is too small")
1802 @unittest.skipUnless(hasattr(os, "dup2"),
1803 "test needs os.dup2()")
1804 def test_large_fd_transfer(self):
1805 # With fd > 256 (issue #11657)
1806 if self.TYPE != 'processes':
1807 self.skipTest("only makes sense with processes")
1808 conn, child_conn = self.Pipe(duplex=True)
1809
1810 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
Jesus Cea94f964f2011-09-09 20:26:57 +02001811 p.daemon = True
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001812 p.start()
Victor Stinnerd0b10a62011-09-21 01:10:29 +02001813 self.addCleanup(test.support.unlink, test.support.TESTFN)
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001814 with open(test.support.TESTFN, "wb") as f:
1815 fd = f.fileno()
1816 for newfd in range(256, MAXFD):
1817 if not self._is_fd_assigned(newfd):
1818 break
1819 else:
1820 self.fail("could not find an unassigned large file descriptor")
1821 os.dup2(fd, newfd)
1822 try:
1823 reduction.send_handle(conn, newfd, p.pid)
1824 finally:
1825 os.close(newfd)
1826 p.join()
1827 with open(test.support.TESTFN, "rb") as f:
1828 self.assertEqual(f.read(), b"bar")
1829
Jesus Cea4507e642011-09-21 03:53:25 +02001830 @classmethod
1831 def _send_data_without_fd(self, conn):
1832 os.write(conn.fileno(), b"\0")
1833
Charles-François Natalie51c8da2011-09-21 18:48:21 +02001834 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Jesus Cea4507e642011-09-21 03:53:25 +02001835 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
1836 def test_missing_fd_transfer(self):
1837 # Check that exception is raised when received data is not
1838 # accompanied by a file descriptor in ancillary data.
1839 if self.TYPE != 'processes':
1840 self.skipTest("only makes sense with processes")
1841 conn, child_conn = self.Pipe(duplex=True)
1842
1843 p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
1844 p.daemon = True
1845 p.start()
1846 self.assertRaises(RuntimeError, reduction.recv_handle, conn)
1847 p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001848
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001849class _TestListener(BaseTestCase):
1850
1851 ALLOWED_TYPES = ('processes')
1852
1853 def test_multiple_bind(self):
1854 for family in self.connection.families:
1855 l = self.connection.Listener(family=family)
1856 self.addCleanup(l.close)
1857 self.assertRaises(OSError, self.connection.Listener,
1858 l.address, family)
1859
Benjamin Petersone711caf2008-06-11 16:44:04 +00001860class _TestListenerClient(BaseTestCase):
1861
1862 ALLOWED_TYPES = ('processes', 'threads')
1863
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00001864 @classmethod
1865 def _test(cls, address):
1866 conn = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001867 conn.send('hello')
1868 conn.close()
1869
1870 def test_listener_client(self):
1871 for family in self.connection.families:
1872 l = self.connection.Listener(family=family)
1873 p = self.Process(target=self._test, args=(l.address,))
Benjamin Peterson58ea9fe2008-08-19 19:17:39 +00001874 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001875 p.start()
1876 conn = l.accept()
1877 self.assertEqual(conn.recv(), 'hello')
1878 p.join()
1879 l.close()
Charles-François Natalied4a8fc2012-02-08 21:15:58 +01001880
Richard Oudkerkfdb8dcf2012-05-05 19:45:37 +01001881 def test_issue14725(self):
1882 l = self.connection.Listener()
1883 p = self.Process(target=self._test, args=(l.address,))
1884 p.daemon = True
1885 p.start()
1886 time.sleep(1)
1887 # On Windows the client process should by now have connected,
1888 # written data and closed the pipe handle by now. This causes
1889 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue
1890 # 14725.
1891 conn = l.accept()
1892 self.assertEqual(conn.recv(), 'hello')
1893 conn.close()
1894 p.join()
1895 l.close()
1896
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01001897class _TestPoll(unittest.TestCase):
1898
1899 ALLOWED_TYPES = ('processes', 'threads')
1900
1901 def test_empty_string(self):
1902 a, b = self.Pipe()
1903 self.assertEqual(a.poll(), False)
1904 b.send_bytes(b'')
1905 self.assertEqual(a.poll(), True)
1906 self.assertEqual(a.poll(), True)
1907
1908 @classmethod
1909 def _child_strings(cls, conn, strings):
1910 for s in strings:
1911 time.sleep(0.1)
1912 conn.send_bytes(s)
1913 conn.close()
1914
1915 def test_strings(self):
1916 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
1917 a, b = self.Pipe()
1918 p = self.Process(target=self._child_strings, args=(b, strings))
1919 p.start()
1920
1921 for s in strings:
1922 for i in range(200):
1923 if a.poll(0.01):
1924 break
1925 x = a.recv_bytes()
1926 self.assertEqual(s, x)
1927
1928 p.join()
1929
1930 @classmethod
1931 def _child_boundaries(cls, r):
1932 # Polling may "pull" a message in to the child process, but we
1933 # don't want it to pull only part of a message, as that would
1934 # corrupt the pipe for any other processes which might later
1935 # read from it.
1936 r.poll(5)
1937
1938 def test_boundaries(self):
1939 r, w = self.Pipe(False)
1940 p = self.Process(target=self._child_boundaries, args=(r,))
1941 p.start()
1942 time.sleep(2)
1943 L = [b"first", b"second"]
1944 for obj in L:
1945 w.send_bytes(obj)
1946 w.close()
1947 p.join()
1948 self.assertIn(r.recv_bytes(), L)
1949
1950 @classmethod
1951 def _child_dont_merge(cls, b):
1952 b.send_bytes(b'a')
1953 b.send_bytes(b'b')
1954 b.send_bytes(b'cd')
1955
1956 def test_dont_merge(self):
1957 a, b = self.Pipe()
1958 self.assertEqual(a.poll(0.0), False)
1959 self.assertEqual(a.poll(0.1), False)
1960
1961 p = self.Process(target=self._child_dont_merge, args=(b,))
1962 p.start()
1963
1964 self.assertEqual(a.recv_bytes(), b'a')
1965 self.assertEqual(a.poll(1.0), True)
1966 self.assertEqual(a.poll(1.0), True)
1967 self.assertEqual(a.recv_bytes(), b'b')
1968 self.assertEqual(a.poll(1.0), True)
1969 self.assertEqual(a.poll(1.0), True)
1970 self.assertEqual(a.poll(0.0), True)
1971 self.assertEqual(a.recv_bytes(), b'cd')
1972
1973 p.join()
1974
Benjamin Petersone711caf2008-06-11 16:44:04 +00001975#
1976# Test of sending connection and socket objects between processes
1977#
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001978
Richard Oudkerk24524192012-04-30 14:48:51 +01001979# Intermittent fails on Mac OS X -- see Issue14669 and Issue12958
1980@unittest.skipIf(sys.platform == "darwin", "fd passing unreliable on Mac OS X")
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001981@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
Benjamin Petersone711caf2008-06-11 16:44:04 +00001982class _TestPicklingConnections(BaseTestCase):
1983
1984 ALLOWED_TYPES = ('processes',)
1985
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001986 @classmethod
Antoine Pitrou92ff4e12012-04-27 23:51:03 +02001987 def tearDownClass(cls):
1988 from multiprocessing.reduction import resource_sharer
1989 resource_sharer.stop(timeout=5)
1990
1991 @classmethod
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001992 def _listener(cls, conn, families):
Benjamin Petersone711caf2008-06-11 16:44:04 +00001993 for fam in families:
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001994 l = cls.connection.Listener(family=fam)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001995 conn.send(l.address)
1996 new_conn = l.accept()
1997 conn.send(new_conn)
Antoine Pitrou5438ed12012-04-24 22:56:57 +02001998 new_conn.close()
1999 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002000
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002001 l = socket.socket()
2002 l.bind(('localhost', 0))
2003 conn.send(l.getsockname())
2004 l.listen(1)
2005 new_conn, addr = l.accept()
2006 conn.send(new_conn)
2007 new_conn.close()
2008 l.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002009
2010 conn.recv()
2011
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002012 @classmethod
2013 def _remote(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002014 for (address, msg) in iter(conn.recv, None):
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002015 client = cls.connection.Client(address)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002016 client.send(msg.upper())
2017 client.close()
2018
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002019 address, msg = conn.recv()
2020 client = socket.socket()
2021 client.connect(address)
2022 client.sendall(msg.upper())
2023 client.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002024
2025 conn.close()
2026
2027 def test_pickling(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002028 families = self.connection.families
2029
2030 lconn, lconn0 = self.Pipe()
2031 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02002032 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002033 lp.start()
2034 lconn0.close()
2035
2036 rconn, rconn0 = self.Pipe()
2037 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002038 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002039 rp.start()
2040 rconn0.close()
2041
2042 for fam in families:
2043 msg = ('This connection uses family %s' % fam).encode('ascii')
2044 address = lconn.recv()
2045 rconn.send((address, msg))
2046 new_conn = lconn.recv()
2047 self.assertEqual(new_conn.recv(), msg.upper())
2048
2049 rconn.send(None)
2050
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002051 msg = latin('This connection uses a normal socket')
2052 address = lconn.recv()
2053 rconn.send((address, msg))
2054 new_conn = lconn.recv()
Richard Oudkerk4460c342012-04-30 14:48:50 +01002055 buf = []
2056 while True:
2057 s = new_conn.recv(100)
2058 if not s:
2059 break
2060 buf.append(s)
2061 buf = b''.join(buf)
2062 self.assertEqual(buf, msg.upper())
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002063 new_conn.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002064
2065 lconn.send(None)
2066
2067 rconn.close()
2068 lconn.close()
2069
2070 lp.join()
2071 rp.join()
Antoine Pitrou5438ed12012-04-24 22:56:57 +02002072
2073 @classmethod
2074 def child_access(cls, conn):
2075 w = conn.recv()
2076 w.send('all is well')
2077 w.close()
2078
2079 r = conn.recv()
2080 msg = r.recv()
2081 conn.send(msg*2)
2082
2083 conn.close()
2084
2085 def test_access(self):
2086 # On Windows, if we do not specify a destination pid when
2087 # using DupHandle then we need to be careful to use the
2088 # correct access flags for DuplicateHandle(), or else
2089 # DupHandle.detach() will raise PermissionError. For example,
2090 # for a read only pipe handle we should use
2091 # access=FILE_GENERIC_READ. (Unfortunately
2092 # DUPLICATE_SAME_ACCESS does not work.)
2093 conn, child_conn = self.Pipe()
2094 p = self.Process(target=self.child_access, args=(child_conn,))
2095 p.daemon = True
2096 p.start()
2097 child_conn.close()
2098
2099 r, w = self.Pipe(duplex=False)
2100 conn.send(w)
2101 w.close()
2102 self.assertEqual(r.recv(), 'all is well')
2103 r.close()
2104
2105 r, w = self.Pipe(duplex=False)
2106 conn.send(r)
2107 r.close()
2108 w.send('foobar')
2109 w.close()
2110 self.assertEqual(conn.recv(), 'foobar'*2)
2111
Benjamin Petersone711caf2008-06-11 16:44:04 +00002112#
2113#
2114#
2115
2116class _TestHeap(BaseTestCase):
2117
2118 ALLOWED_TYPES = ('processes',)
2119
2120 def test_heap(self):
2121 iterations = 5000
2122 maxblocks = 50
2123 blocks = []
2124
2125 # create and destroy lots of blocks of different sizes
2126 for i in range(iterations):
2127 size = int(random.lognormvariate(0, 1) * 1000)
2128 b = multiprocessing.heap.BufferWrapper(size)
2129 blocks.append(b)
2130 if len(blocks) > maxblocks:
2131 i = random.randrange(maxblocks)
2132 del blocks[i]
2133
2134 # get the heap object
2135 heap = multiprocessing.heap.BufferWrapper._heap
2136
2137 # verify the state of the heap
2138 all = []
2139 occupied = 0
Charles-François Natali778db492011-07-02 14:35:49 +02002140 heap._lock.acquire()
2141 self.addCleanup(heap._lock.release)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002142 for L in list(heap._len_to_seq.values()):
2143 for arena, start, stop in L:
2144 all.append((heap._arenas.index(arena), start, stop,
2145 stop-start, 'free'))
2146 for arena, start, stop in heap._allocated_blocks:
2147 all.append((heap._arenas.index(arena), start, stop,
2148 stop-start, 'occupied'))
2149 occupied += (stop-start)
2150
2151 all.sort()
2152
2153 for i in range(len(all)-1):
2154 (arena, start, stop) = all[i][:3]
2155 (narena, nstart, nstop) = all[i+1][:3]
2156 self.assertTrue((arena != narena and nstart == 0) or
2157 (stop == nstart))
2158
Charles-François Natali778db492011-07-02 14:35:49 +02002159 def test_free_from_gc(self):
2160 # Check that freeing of blocks by the garbage collector doesn't deadlock
2161 # (issue #12352).
2162 # Make sure the GC is enabled, and set lower collection thresholds to
2163 # make collections more frequent (and increase the probability of
2164 # deadlock).
2165 if not gc.isenabled():
2166 gc.enable()
2167 self.addCleanup(gc.disable)
2168 thresholds = gc.get_threshold()
2169 self.addCleanup(gc.set_threshold, *thresholds)
2170 gc.set_threshold(10)
2171
2172 # perform numerous block allocations, with cyclic references to make
2173 # sure objects are collected asynchronously by the gc
2174 for i in range(5000):
2175 a = multiprocessing.heap.BufferWrapper(1)
2176 b = multiprocessing.heap.BufferWrapper(1)
2177 # circular references
2178 a.buddy = b
2179 b.buddy = a
2180
Benjamin Petersone711caf2008-06-11 16:44:04 +00002181#
2182#
2183#
2184
Benjamin Petersone711caf2008-06-11 16:44:04 +00002185class _Foo(Structure):
2186 _fields_ = [
2187 ('x', c_int),
2188 ('y', c_double)
2189 ]
2190
2191class _TestSharedCTypes(BaseTestCase):
2192
2193 ALLOWED_TYPES = ('processes',)
2194
Antoine Pitrou7744e2a2010-11-22 16:26:21 +00002195 def setUp(self):
2196 if not HAS_SHAREDCTYPES:
2197 self.skipTest("requires multiprocessing.sharedctypes")
2198
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002199 @classmethod
2200 def _double(cls, x, y, foo, arr, string):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002201 x.value *= 2
2202 y.value *= 2
2203 foo.x *= 2
2204 foo.y *= 2
2205 string.value *= 2
2206 for i in range(len(arr)):
2207 arr[i] *= 2
2208
2209 def test_sharedctypes(self, lock=False):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002210 x = Value('i', 7, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002211 y = Value(c_double, 1.0/3.0, lock=lock)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002212 foo = Value(_Foo, 3, 2, lock=lock)
Georg Brandl89fad142010-03-14 10:23:39 +00002213 arr = self.Array('d', list(range(10)), lock=lock)
2214 string = self.Array('c', 20, lock=lock)
Brian Curtinafa88b52010-10-07 01:12:19 +00002215 string.value = latin('hello')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002216
2217 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
Jesus Cea94f964f2011-09-09 20:26:57 +02002218 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002219 p.start()
2220 p.join()
2221
2222 self.assertEqual(x.value, 14)
2223 self.assertAlmostEqual(y.value, 2.0/3.0)
2224 self.assertEqual(foo.x, 6)
2225 self.assertAlmostEqual(foo.y, 4.0)
2226 for i in range(10):
2227 self.assertAlmostEqual(arr[i], i*2)
2228 self.assertEqual(string.value, latin('hellohello'))
2229
2230 def test_synchronize(self):
2231 self.test_sharedctypes(lock=True)
2232
2233 def test_copy(self):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002234 foo = _Foo(2, 5.0)
Brian Curtinafa88b52010-10-07 01:12:19 +00002235 bar = copy(foo)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002236 foo.x = 0
2237 foo.y = 0
2238 self.assertEqual(bar.x, 2)
2239 self.assertAlmostEqual(bar.y, 5.0)
2240
2241#
2242#
2243#
2244
2245class _TestFinalize(BaseTestCase):
2246
2247 ALLOWED_TYPES = ('processes',)
2248
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002249 @classmethod
2250 def _test_finalize(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002251 class Foo(object):
2252 pass
2253
2254 a = Foo()
2255 util.Finalize(a, conn.send, args=('a',))
2256 del a # triggers callback for a
2257
2258 b = Foo()
2259 close_b = util.Finalize(b, conn.send, args=('b',))
2260 close_b() # triggers callback for b
2261 close_b() # does nothing because callback has already been called
2262 del b # does nothing because callback has already been called
2263
2264 c = Foo()
2265 util.Finalize(c, conn.send, args=('c',))
2266
2267 d10 = Foo()
2268 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2269
2270 d01 = Foo()
2271 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2272 d02 = Foo()
2273 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2274 d03 = Foo()
2275 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2276
2277 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2278
2279 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2280
Ezio Melotti13925002011-03-16 11:05:33 +02002281 # call multiprocessing's cleanup function then exit process without
Benjamin Petersone711caf2008-06-11 16:44:04 +00002282 # garbage collecting locals
2283 util._exit_function()
2284 conn.close()
2285 os._exit(0)
2286
2287 def test_finalize(self):
2288 conn, child_conn = self.Pipe()
2289
2290 p = self.Process(target=self._test_finalize, args=(child_conn,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002291 p.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00002292 p.start()
2293 p.join()
2294
2295 result = [obj for obj in iter(conn.recv, 'STOP')]
2296 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2297
2298#
2299# Test that from ... import * works for each module
2300#
2301
2302class _TestImportStar(BaseTestCase):
2303
2304 ALLOWED_TYPES = ('processes',)
2305
2306 def test_import(self):
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002307 modules = [
Benjamin Petersone711caf2008-06-11 16:44:04 +00002308 'multiprocessing', 'multiprocessing.connection',
2309 'multiprocessing.heap', 'multiprocessing.managers',
2310 'multiprocessing.pool', 'multiprocessing.process',
Benjamin Petersone711caf2008-06-11 16:44:04 +00002311 'multiprocessing.synchronize', 'multiprocessing.util'
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002312 ]
2313
Charles-François Natalibc8f0822011-09-20 20:36:51 +02002314 if HAS_REDUCTION:
2315 modules.append('multiprocessing.reduction')
2316
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002317 if c_int is not None:
2318 # This module requires _ctypes
2319 modules.append('multiprocessing.sharedctypes')
Benjamin Petersone711caf2008-06-11 16:44:04 +00002320
2321 for name in modules:
2322 __import__(name)
2323 mod = sys.modules[name]
2324
2325 for attr in getattr(mod, '__all__', ()):
2326 self.assertTrue(
2327 hasattr(mod, attr),
2328 '%r does not have attribute %r' % (mod, attr)
2329 )
2330
2331#
2332# Quick test that logging works -- does not test logging output
2333#
2334
2335class _TestLogging(BaseTestCase):
2336
2337 ALLOWED_TYPES = ('processes',)
2338
2339 def test_enable_logging(self):
2340 logger = multiprocessing.get_logger()
2341 logger.setLevel(util.SUBWARNING)
2342 self.assertTrue(logger is not None)
2343 logger.debug('this will not be printed')
2344 logger.info('nor will this')
2345 logger.setLevel(LOG_LEVEL)
2346
Antoine Pitrou0d1b38c2010-11-02 23:50:11 +00002347 @classmethod
2348 def _test_level(cls, conn):
Benjamin Petersone711caf2008-06-11 16:44:04 +00002349 logger = multiprocessing.get_logger()
2350 conn.send(logger.getEffectiveLevel())
2351
2352 def test_level(self):
2353 LEVEL1 = 32
2354 LEVEL2 = 37
2355
2356 logger = multiprocessing.get_logger()
2357 root_logger = logging.getLogger()
2358 root_level = root_logger.level
2359
2360 reader, writer = multiprocessing.Pipe(duplex=False)
2361
2362 logger.setLevel(LEVEL1)
Jesus Cea94f964f2011-09-09 20:26:57 +02002363 p = self.Process(target=self._test_level, args=(writer,))
2364 p.daemon = True
2365 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002366 self.assertEqual(LEVEL1, reader.recv())
2367
2368 logger.setLevel(logging.NOTSET)
2369 root_logger.setLevel(LEVEL2)
Jesus Cea94f964f2011-09-09 20:26:57 +02002370 p = self.Process(target=self._test_level, args=(writer,))
2371 p.daemon = True
2372 p.start()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002373 self.assertEqual(LEVEL2, reader.recv())
2374
2375 root_logger.setLevel(root_level)
2376 logger.setLevel(level=LOG_LEVEL)
2377
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002378
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002379# class _TestLoggingProcessName(BaseTestCase):
2380#
2381# def handle(self, record):
2382# assert record.processName == multiprocessing.current_process().name
2383# self.__handled = True
2384#
2385# def test_logging(self):
2386# handler = logging.Handler()
2387# handler.handle = self.handle
2388# self.__handled = False
2389# # Bypass getLogger() and side-effects
2390# logger = logging.getLoggerClass()(
2391# 'multiprocessing.test.TestLoggingProcessName')
2392# logger.addHandler(handler)
2393# logger.propagate = False
2394#
2395# logger.warn('foo')
2396# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002397
Benjamin Petersone711caf2008-06-11 16:44:04 +00002398#
Jesse Noller6214edd2009-01-19 16:23:53 +00002399# Test to verify handle verification, see issue 3321
2400#
2401
2402class TestInvalidHandle(unittest.TestCase):
2403
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002404 @unittest.skipIf(WIN32, "skipped on Windows")
Jesse Noller6214edd2009-01-19 16:23:53 +00002405 def test_invalid_handles(self):
Antoine Pitrou87cf2202011-05-09 17:04:27 +02002406 conn = multiprocessing.connection.Connection(44977608)
2407 try:
2408 self.assertRaises((ValueError, IOError), conn.poll)
2409 finally:
2410 # Hack private attribute _handle to avoid printing an error
2411 # in conn.__del__
2412 conn._handle = None
2413 self.assertRaises((ValueError, IOError),
2414 multiprocessing.connection.Connection, -1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002415
Jesse Noller6214edd2009-01-19 16:23:53 +00002416#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002417# Functions used to create test cases from the base ones in this module
2418#
2419
2420def get_attributes(Source, names):
2421 d = {}
2422 for name in names:
2423 obj = getattr(Source, name)
2424 if type(obj) == type(get_attributes):
2425 obj = staticmethod(obj)
2426 d[name] = obj
2427 return d
2428
2429def create_test_cases(Mixin, type):
2430 result = {}
2431 glob = globals()
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002432 Type = type.capitalize()
Benjamin Petersone711caf2008-06-11 16:44:04 +00002433
2434 for name in list(glob.keys()):
2435 if name.startswith('_Test'):
2436 base = glob[name]
2437 if type in base.ALLOWED_TYPES:
2438 newname = 'With' + Type + name[1:]
2439 class Temp(base, unittest.TestCase, Mixin):
2440 pass
2441 result[newname] = Temp
2442 Temp.__name__ = newname
2443 Temp.__module__ = Mixin.__module__
2444 return result
2445
2446#
2447# Create test cases
2448#
2449
2450class ProcessesMixin(object):
2451 TYPE = 'processes'
2452 Process = multiprocessing.Process
2453 locals().update(get_attributes(multiprocessing, (
2454 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2455 'Condition', 'Event', 'Value', 'Array', 'RawValue',
2456 'RawArray', 'current_process', 'active_children', 'Pipe',
2457 'connection', 'JoinableQueue'
2458 )))
2459
2460testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2461globals().update(testcases_processes)
2462
2463
2464class ManagerMixin(object):
2465 TYPE = 'manager'
2466 Process = multiprocessing.Process
2467 manager = object.__new__(multiprocessing.managers.SyncManager)
2468 locals().update(get_attributes(manager, (
2469 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2470 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
2471 'Namespace', 'JoinableQueue'
2472 )))
2473
2474testcases_manager = create_test_cases(ManagerMixin, type='manager')
2475globals().update(testcases_manager)
2476
2477
2478class ThreadsMixin(object):
2479 TYPE = 'threads'
2480 Process = multiprocessing.dummy.Process
2481 locals().update(get_attributes(multiprocessing.dummy, (
2482 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
2483 'Condition', 'Event', 'Value', 'Array', 'current_process',
2484 'active_children', 'Pipe', 'connection', 'dict', 'list',
2485 'Namespace', 'JoinableQueue'
2486 )))
2487
2488testcases_threads = create_test_cases(ThreadsMixin, type='threads')
2489globals().update(testcases_threads)
2490
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002491class OtherTest(unittest.TestCase):
2492 # TODO: add more tests for deliver/answer challenge.
2493 def test_deliver_challenge_auth_failure(self):
2494 class _FakeConnection(object):
2495 def recv_bytes(self, size):
Neal Norwitzec105ad2008-08-25 03:05:54 +00002496 return b'something bogus'
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002497 def send_bytes(self, data):
2498 pass
2499 self.assertRaises(multiprocessing.AuthenticationError,
2500 multiprocessing.connection.deliver_challenge,
2501 _FakeConnection(), b'abc')
2502
2503 def test_answer_challenge_auth_failure(self):
2504 class _FakeConnection(object):
2505 def __init__(self):
2506 self.count = 0
2507 def recv_bytes(self, size):
2508 self.count += 1
2509 if self.count == 1:
2510 return multiprocessing.connection.CHALLENGE
2511 elif self.count == 2:
Neal Norwitzec105ad2008-08-25 03:05:54 +00002512 return b'something bogus'
2513 return b''
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002514 def send_bytes(self, data):
2515 pass
2516 self.assertRaises(multiprocessing.AuthenticationError,
2517 multiprocessing.connection.answer_challenge,
2518 _FakeConnection(), b'abc')
2519
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002520#
2521# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2522#
2523
2524def initializer(ns):
2525 ns.test += 1
2526
2527class TestInitializers(unittest.TestCase):
2528 def setUp(self):
2529 self.mgr = multiprocessing.Manager()
2530 self.ns = self.mgr.Namespace()
2531 self.ns.test = 0
2532
2533 def tearDown(self):
2534 self.mgr.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002535 self.mgr.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002536
2537 def test_manager_initializer(self):
2538 m = multiprocessing.managers.SyncManager()
2539 self.assertRaises(TypeError, m.start, 1)
2540 m.start(initializer, (self.ns,))
2541 self.assertEqual(self.ns.test, 1)
2542 m.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002543 m.join()
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002544
2545 def test_pool_initializer(self):
2546 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
2547 p = multiprocessing.Pool(1, initializer, (self.ns,))
2548 p.close()
2549 p.join()
2550 self.assertEqual(self.ns.test, 1)
2551
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002552#
2553# Issue 5155, 5313, 5331: Test process in processes
2554# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2555#
2556
2557def _ThisSubProcess(q):
2558 try:
2559 item = q.get(block=False)
2560 except pyqueue.Empty:
2561 pass
2562
2563def _TestProcess(q):
2564 queue = multiprocessing.Queue()
2565 subProc = multiprocessing.Process(target=_ThisSubProcess, args=(queue,))
Jesus Cea94f964f2011-09-09 20:26:57 +02002566 subProc.daemon = True
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002567 subProc.start()
2568 subProc.join()
2569
2570def _afunc(x):
2571 return x*x
2572
2573def pool_in_process():
2574 pool = multiprocessing.Pool(processes=4)
2575 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002576 pool.close()
2577 pool.join()
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002578
2579class _file_like(object):
2580 def __init__(self, delegate):
2581 self._delegate = delegate
2582 self._pid = None
2583
2584 @property
2585 def cache(self):
2586 pid = os.getpid()
2587 # There are no race conditions since fork keeps only the running thread
2588 if pid != self._pid:
2589 self._pid = pid
2590 self._cache = []
2591 return self._cache
2592
2593 def write(self, data):
2594 self.cache.append(data)
2595
2596 def flush(self):
2597 self._delegate.write(''.join(self.cache))
2598 self._cache = []
2599
2600class TestStdinBadfiledescriptor(unittest.TestCase):
2601
2602 def test_queue_in_process(self):
2603 queue = multiprocessing.Queue()
2604 proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
2605 proc.start()
2606 proc.join()
2607
2608 def test_pool_in_process(self):
2609 p = multiprocessing.Process(target=pool_in_process)
2610 p.start()
2611 p.join()
2612
2613 def test_flushing(self):
2614 sio = io.StringIO()
2615 flike = _file_like(sio)
2616 flike.write('foo')
2617 proc = multiprocessing.Process(target=lambda: flike.flush())
2618 flike.flush()
2619 assert sio.getvalue() == 'foo'
2620
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002621
2622class TestWait(unittest.TestCase):
2623
2624 @classmethod
2625 def _child_test_wait(cls, w, slow):
2626 for i in range(10):
2627 if slow:
2628 time.sleep(random.random()*0.1)
2629 w.send((i, os.getpid()))
2630 w.close()
2631
2632 def test_wait(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002633 from multiprocessing.connection import wait
2634 readers = []
2635 procs = []
2636 messages = []
2637
2638 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002639 r, w = multiprocessing.Pipe(duplex=False)
2640 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002641 p.daemon = True
2642 p.start()
2643 w.close()
2644 readers.append(r)
2645 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002646 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002647
2648 while readers:
2649 for r in wait(readers):
2650 try:
2651 msg = r.recv()
2652 except EOFError:
2653 readers.remove(r)
2654 r.close()
2655 else:
2656 messages.append(msg)
2657
2658 messages.sort()
2659 expected = sorted((i, p.pid) for i in range(10) for p in procs)
2660 self.assertEqual(messages, expected)
2661
2662 @classmethod
2663 def _child_test_wait_socket(cls, address, slow):
2664 s = socket.socket()
2665 s.connect(address)
2666 for i in range(10):
2667 if slow:
2668 time.sleep(random.random()*0.1)
2669 s.sendall(('%s\n' % i).encode('ascii'))
2670 s.close()
2671
2672 def test_wait_socket(self, slow=False):
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002673 from multiprocessing.connection import wait
2674 l = socket.socket()
2675 l.bind(('', 0))
2676 l.listen(4)
2677 addr = ('localhost', l.getsockname()[1])
2678 readers = []
2679 procs = []
2680 dic = {}
2681
2682 for i in range(4):
Antoine Pitrou5bb9a8f2012-03-06 13:43:24 +01002683 p = multiprocessing.Process(target=self._child_test_wait_socket,
2684 args=(addr, slow))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002685 p.daemon = True
2686 p.start()
2687 procs.append(p)
Antoine Pitrou6c64cc12012-03-06 13:42:35 +01002688 self.addCleanup(p.join)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002689
2690 for i in range(4):
2691 r, _ = l.accept()
2692 readers.append(r)
2693 dic[r] = []
2694 l.close()
2695
2696 while readers:
2697 for r in wait(readers):
2698 msg = r.recv(32)
2699 if not msg:
2700 readers.remove(r)
2701 r.close()
2702 else:
2703 dic[r].append(msg)
2704
2705 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
2706 for v in dic.values():
2707 self.assertEqual(b''.join(v), expected)
2708
2709 def test_wait_slow(self):
2710 self.test_wait(True)
2711
2712 def test_wait_socket_slow(self):
2713 self.test_wait(True)
2714
2715 def test_wait_timeout(self):
2716 from multiprocessing.connection import wait
2717
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002718 expected = 5
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002719 a, b = multiprocessing.Pipe()
2720
2721 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002722 res = wait([a, b], expected)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002723 delta = time.time() - start
2724
2725 self.assertEqual(res, [])
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002726 self.assertLess(delta, expected + 1)
2727 self.assertGreater(delta, expected - 1)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002728
2729 b.send(None)
2730
2731 start = time.time()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002732 res = wait([a, b], 20)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002733 delta = time.time() - start
2734
2735 self.assertEqual(res, [a])
Antoine Pitrou37749772012-03-09 18:40:15 +01002736 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002737
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002738 @classmethod
2739 def signal_and_sleep(cls, sem, period):
2740 sem.release()
2741 time.sleep(period)
2742
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002743 def test_wait_integer(self):
2744 from multiprocessing.connection import wait
2745
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002746 expected = 3
2747 sem = multiprocessing.Semaphore(0)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002748 a, b = multiprocessing.Pipe()
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002749 p = multiprocessing.Process(target=self.signal_and_sleep,
2750 args=(sem, expected))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002751
2752 p.start()
2753 self.assertIsInstance(p.sentinel, int)
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002754 self.assertTrue(sem.acquire(timeout=20))
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002755
2756 start = time.time()
2757 res = wait([a, p.sentinel, b], expected + 20)
2758 delta = time.time() - start
2759
2760 self.assertEqual(res, [p.sentinel])
Antoine Pitrou37749772012-03-09 18:40:15 +01002761 self.assertLess(delta, expected + 2)
2762 self.assertGreater(delta, expected - 2)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002763
2764 a.send(None)
2765
2766 start = time.time()
2767 res = wait([a, p.sentinel, b], 20)
2768 delta = time.time() - start
2769
2770 self.assertEqual(res, [p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002771 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002772
2773 b.send(None)
2774
2775 start = time.time()
2776 res = wait([a, p.sentinel, b], 20)
2777 delta = time.time() - start
2778
2779 self.assertEqual(res, [a, p.sentinel, b])
Antoine Pitrou37749772012-03-09 18:40:15 +01002780 self.assertLess(delta, 0.4)
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002781
Richard Oudkerk009b15e2012-05-04 09:44:39 +01002782 p.terminate()
Antoine Pitroubdb1cf12012-03-05 19:28:37 +01002783 p.join()
2784
2785
Antoine Pitrou709176f2012-04-01 17:19:09 +02002786#
2787# Issue 14151: Test invalid family on invalid environment
2788#
2789
2790class TestInvalidFamily(unittest.TestCase):
2791
2792 @unittest.skipIf(WIN32, "skipped on Windows")
2793 def test_invalid_family(self):
2794 with self.assertRaises(ValueError):
2795 multiprocessing.connection.Listener(r'\\.\test')
2796
Antoine Pitrou6d20cba2012-04-03 20:12:23 +02002797 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
2798 def test_invalid_family_win32(self):
2799 with self.assertRaises(ValueError):
2800 multiprocessing.connection.Listener('/var/test.pipe')
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002801
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002802testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
Antoine Pitrou93bba8f2012-04-01 17:25:49 +02002803 TestStdinBadfiledescriptor, TestWait, TestInvalidFamily]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002804
Benjamin Petersone711caf2008-06-11 16:44:04 +00002805#
2806#
2807#
2808
2809def test_main(run=None):
Jesse Nollerd00df3c2008-06-18 14:22:48 +00002810 if sys.platform.startswith("linux"):
2811 try:
2812 lock = multiprocessing.RLock()
2813 except OSError:
Benjamin Petersone549ead2009-03-28 21:42:05 +00002814 raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")
Benjamin Peterson3c0dd062008-06-17 22:43:48 +00002815
Charles-François Natali221ef672011-11-22 18:55:22 +01002816 check_enough_semaphores()
2817
Benjamin Petersone711caf2008-06-11 16:44:04 +00002818 if run is None:
2819 from test.support import run_unittest as run
2820
2821 util.get_temp_dir() # creates temp directory for use by all processes
2822
2823 multiprocessing.get_logger().setLevel(LOG_LEVEL)
2824
Benjamin Peterson41181742008-07-02 20:22:54 +00002825 ProcessesMixin.pool = multiprocessing.Pool(4)
2826 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
2827 ManagerMixin.manager.__init__()
2828 ManagerMixin.manager.start()
2829 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00002830
2831 testcases = (
Benjamin Peterson41181742008-07-02 20:22:54 +00002832 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
2833 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002834 sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
2835 testcases_other
Benjamin Petersone711caf2008-06-11 16:44:04 +00002836 )
2837
2838 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
2839 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002840 try:
2841 run(suite)
2842 finally:
2843 ThreadsMixin.pool.terminate()
2844 ProcessesMixin.pool.terminate()
2845 ManagerMixin.pool.terminate()
2846 ManagerMixin.pool.join()
2847 ManagerMixin.manager.shutdown()
Richard Oudkerka6becaa2012-05-03 18:29:02 +01002848 ManagerMixin.manager.join()
Richard Oudkerk225cb8d2012-05-02 19:36:11 +01002849 ThreadsMixin.pool.join()
2850 ProcessesMixin.pool.join()
2851 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002852
2853def main():
2854 test_main(unittest.TextTestRunner(verbosity=2).run)
2855
2856if __name__ == '__main__':
2857 main()