blob: 6940d0e2f7ef40129fcbd88b863a037a10a7f4e6 [file] [log] [blame]
Benjamin Peterson90f5ba52010-03-11 22:53:45 +00001#!/usr/bin/env python3
Georg Brandl86b2fb92008-07-16 03:43:04 +00002
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
Benjamin Petersone711caf2008-06-11 16:44:04 +00008import queue as pyqueue
9import time
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +000010import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000011import sys
12import os
13import gc
Antoine Pitrouc824e9a2011-04-05 18:11:33 +020014import errno
Benjamin Petersone711caf2008-06-11 16:44:04 +000015import signal
16import array
Benjamin Petersone711caf2008-06-11 16:44:04 +000017import socket
18import random
19import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000020import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000021
Benjamin Petersone5384b02008-10-04 22:00:42 +000022
R. David Murraya21e4ca2009-03-31 23:16:50 +000023# Skip tests if _multiprocessing wasn't built.
24_multiprocessing = test.support.import_module('_multiprocessing')
25# Skip tests if sem_open implementation is broken.
26test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more relevant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
30import threading
Benjamin Petersone5384b02008-10-04 22:00:42 +000031
Benjamin Petersone711caf2008-06-11 16:44:04 +000032import multiprocessing.dummy
33import multiprocessing.connection
34import multiprocessing.managers
35import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000036import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000037
Charles-François Natalie51c8da2011-09-21 18:48:21 +020038from multiprocessing import util
39
40try:
41 from multiprocessing import reduction
42 HAS_REDUCTION = True
43except ImportError:
44 HAS_REDUCTION = False
Benjamin Petersone711caf2008-06-11 16:44:04 +000045
Brian Curtinafa88b52010-10-07 01:12:19 +000046try:
47 from multiprocessing.sharedctypes import Value, copy
48 HAS_SHAREDCTYPES = True
49except ImportError:
50 HAS_SHAREDCTYPES = False
51
Antoine Pitroubcb39d42011-08-23 19:46:22 +020052try:
53 import msvcrt
54except ImportError:
55 msvcrt = None
56
Benjamin Petersone711caf2008-06-11 16:44:04 +000057#
58#
59#
60
def latin(s):
    """Return the text *s* encoded to bytes with the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000063
Benjamin Petersone711caf2008-06-11 16:44:04 +000064#
65# Constants
66#
67
# Logging level used by the tests; use logging.DEBUG instead for more
# verbose output.
LOG_LEVEL = util.SUBWARNING

# Short pause (in seconds) used by the tests when they need to give another
# process/thread a moment to make progress.
DELTA = 0.1

# Making this true makes tests take a lot longer and can sometimes cause
# some non-serious failures because some calls block a bit longer than
# expected.
CHECK_TIMINGS = False

# Longer, distinct timeouts are only worthwhile when timings are verified.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when _multiprocessing reports HAVE_BROKEN_SEM_GETVALUE.
_broken_getvalue = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _broken_getvalue
del _broken_getvalue

WIN32 = sys.platform == "win32"
85
# Maximum number of open files reported by the OS.  os.sysconf() does not
# exist on every platform (AttributeError), may not recognise the name
# (ValueError) or may fail at the OS level (OSError); fall back to a
# conservative default in those cases.  Anything else (e.g.
# KeyboardInterrupt) is deliberately allowed to propagate.
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    MAXFD = 256
90
Benjamin Petersone711caf2008-06-11 16:44:04 +000091#
Florent Xiclunafd1b0932010-03-28 00:25:02 +000092# Some tests require ctypes
93#
94
95try:
Florent Xiclunaaa171062010-08-14 15:56:42 +000096 from ctypes import Structure, c_int, c_double
Florent Xiclunafd1b0932010-03-28 00:25:02 +000097except ImportError:
98 Structure = object
99 c_int = c_double = None
100
101#
Benjamin Petersone711caf2008-06-11 16:44:04 +0000102# Creates a wrapper for a function which records the time it takes to finish
103#
104
class TimingWrapper(object):
    """Callable proxy around *func* that remembers how long a call took.

    ``elapsed`` is None until the wrapper has been called; afterwards it
    holds the duration in seconds (measured with time.time()) of the most
    recent call, whether that call returned or raised.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # recorded even when func raises
            self.elapsed = time.time() - started
        return result
117
118#
119# Base class for test cases
120#
121
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test case classes below."""

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Check a ~= b to one decimal place, but only if CHECK_TIMINGS."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert func(*args) == value unless func raises NotImplementedError.

        Only a NotImplementedError raised by the call itself is ignored.
        """
        try:
            res = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
144
Benjamin Petersone711caf2008-06-11 16:44:04 +0000145#
146# Return the value of a semaphore
147#
148
def get_value(self):
    """Return the current counter value of the semaphore-like object *self*.

    Tries ``self.get_value()`` first, then the ``_Semaphore__value`` and
    ``_value`` attributes in that order.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr_name in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr_name)
        except AttributeError:
            continue
    raise NotImplementedError
160
161#
162# Testcases
163#
164
class _TestProcess(BaseTestCase):
    """Tests of the Process API (attributes, start/join, terminate, ...)."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Report the test as skipped rather than silently "passing" it.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertFalse(current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertGreater(len(authkey), 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child target: report the received arguments and identity on *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello': 23, 'bye': 2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before the child has been started
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertIs(type(self.active_children()), list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while the child is running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # the child echoes everything after the queue itself (see _test)
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after the child has finished
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child target: sleep long enough that the parent must kill it."""
        time.sleep(1000)

    def test_terminate(self):
        # Report the test as skipped rather than silently "passing" it.
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertIs(type(cpus), int)
        self.assertGreaterEqual(cpus, 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Send *id* on *wconn*, then recurse in two children up to depth 2.

        Each child is joined before the next one is started, so ids arrive
        in depth-first order.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
314
315#
316#
317#
318
class _UpperCaser(multiprocessing.Process):
    """Process that upper-cases every string it receives over a pipe.

    The loop in run() ends when None is received.
    """

    def __init__(self):
        multiprocessing.Process.__init__(self)
        ends = multiprocessing.Pipe()
        self.child_conn, self.parent_conn = ends

    def run(self):
        # this side only ever talks through child_conn
        self.parent_conn.close()
        for text in iter(self.child_conn.recv, None):
            self.child_conn.send(text.upper())
        self.child_conn.close()

    def submit(self, s):
        """Send the str *s* to the worker and return its reply."""
        assert type(s) is str
        conn = self.parent_conn
        conn.send(s)
        return conn.recv()

    def stop(self):
        """Send the None sentinel, then close both connection ends."""
        self.parent_conn.send(None)
        for conn in (self.parent_conn, self.child_conn):
            conn.close()
340
class _TestSubclassingProcess(BaseTestCase):
    """Checks that a multiprocessing.Process subclass can be started and used."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.daemon = True
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
353
354#
355#
356#
357
def queue_empty(q):
    """Tell whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    return q.empty() if hasattr(q, 'empty') else q.qsize() == 0
363
def queue_full(q, maxsize):
    """Tell whether *q* is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with *maxsize* otherwise.
    """
    return q.full() if hasattr(q, 'full') else q.qsize() == maxsize
369
370
class _TestQueue(BaseTestCase):
    """Tests of the put/get/qsize/task_done behaviour of the queue types."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed to start, drain six items from *queue*."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue using every supported calling convention
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # every further put must fail, after the expected delay
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # let the child drain the queue
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child target: once allowed to start, put 2, 3, 4 and 5 on *queue*."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # every further get must fail, after the expected delay
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child target: put the integers 10..19 on *queue*."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # Report the test as skipped rather than silently "passing" it.
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker target: acknowledge items from *q* until None is received."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        # returns once the workers have called task_done() for every item
        queue.join()

        # one None sentinel per worker ends its loop
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()
580
581#
582#
583#
584
class _TestLock(BaseTestCase):
    """Tests of the Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        lock = self.Lock()
        first_attempt = lock.acquire()
        self.assertEqual(first_attempt, True)
        # a second, non-blocking attempt must fail
        second_attempt = lock.acquire(False)
        self.assertEqual(second_attempt, False)
        self.assertEqual(lock.release(), None)
        # releasing a lock that is not held is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        depth = 3
        lock = self.RLock()
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        # one release too many is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
607
Benjamin Petersone711caf2008-06-11 16:44:04 +0000608
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore / BoundedSemaphore counting and acquire timeouts."""

    def _test_semaphore(self, sem):
        # Expects *sem* to start with a value of 2; takes it down to 0 and
        # back up to 2, checking the value (when it can be read) each step.
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore may be released past its initial value
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        # Report the test as skipped rather than silently "passing" it.
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # non-blocking attempts return immediately, whatever the timeout
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # blocking attempts give up after the timeout
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
661
662
class _TestCondition(BaseTestCase):
    """Tests of Condition.wait / notify / notify_all."""

    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Waiter target: wait on *cond*, signalling progress via semaphores.

        Releases *sleeping* just before waiting (with the condition held)
        and *woken* once the wait has returned.
        """
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One waiter created through self.Process plus one plain thread.
        # Both handles are kept so that both can be joined at the end.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified, so neither join can block forever
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken (poll, since waking is not instantaneous)
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        # nobody notifies, so the wait must time out and report False
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
800
801
class _TestEvent(BaseTestCase):
    """Tests of Event.set / clear / is_set / wait."""

    @classmethod
    def _test_event(cls, event):
        """Child target: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # a new event starts out cleared
        self.assertEqual(event.is_set(), False)

        # wait() reports the state of the flag: False when it times out
        # without the event having been set
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # once set, wait() returns True immediately
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # the event gets set from a child while this side blocks in wait()
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        # the child only sleeps and sets the event, so it is about to exit;
        # join it so that it does not outlive the test
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000842
843#
844#
845#
846
class _TestValue(BaseTestCase):
    """Tests of the shared Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        """Child target: store the third element of each entry in *values*."""
        for shared, entry in zip(values, cls.codes_values):
            shared.value = entry[2]

    def test_value(self, raw=False):
        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        # every object starts out with its initial value
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # the child's assignments must be visible here
        for shared, entry in zip(values, self.codes_values):
            self.assertEqual(shared.value, entry[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        # default lock and explicit lock=None: both accessors must work
        for extra in ({}, {'lock': None}):
            wrapped = self.Value('i', 5, **extra)
            wrapped.get_lock()
            wrapped.get_obj()

        # a caller-supplied lock is handed back by get_lock()
        lock = self.Lock()
        with_lock = self.Value('i', 5, lock=lock)
        returned_lock = with_lock.get_lock()
        with_lock.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False: no accessor methods
        unlocked = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked, 'get_lock'))
        self.assertFalse(hasattr(unlocked, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue: no accessor methods either
        raw = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw, 'get_lock'))
        self.assertFalse(hasattr(raw, 'get_obj'))
914
Benjamin Petersone711caf2008-06-11 16:44:04 +0000915
class _TestArray(BaseTestCase):
    """Tests for the shared arrays created with Array and RawArray."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # Replace seq, in place, by its running (prefix) sums.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment must accept an array.array on both sides.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply f() to the plain list here and to the shared array in a
        # child; both must end up with the same contents.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            # Fill with non-zero data, sized from `size` rather than a
            # repeated literal so the two cannot drift apart.
            arr[:] = range(size)
            self.assertEqual(list(arr), list(range(size)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        # Default locking: both accessors must be callable.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # lock=None: both accessors must be callable as well.
        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # A lock supplied by the caller is the one get_lock() hands back.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields an object that has neither accessor.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray objects have neither accessor either.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000994
995#
996#
997#
998
class _TestContainers(BaseTestCase):
    """Tests for the list, dict and Namespace objects served by a manager."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        # None of the operations on b may have touched a.
        self.assertEqual(a[:], list(range(10)))

        # A shared list built from other shared lists.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to a must be visible through the list holding it.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), {i: chr(i) for i in indices})
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # The expected string shows only `name`: `job` was deleted and
        # `_hidden` is not listed.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertFalse(hasattr(n, 'job'))
1054
1055#
1056#
1057#
1058
def sqr(x, wait=0.0):
    """Sleep for `wait` seconds, then return the square of `x`."""
    time.sleep(wait)
    squared = x * x
    return squared
Ask Solem2afcbf22010-11-09 20:55:52 +00001062
class _TestPool(BaseTestCase):
    """Tests for the worker pool available as self.pool."""

    def test_apply(self):
        # Positional and keyword arguments must both reach the worker.
        self.assertEqual(self.pool.apply(sqr, (5,)), sqr(5))
        self.assertEqual(self.pool.apply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # Once with the default chunking and once with an explicit chunksize.
        for count, options in ((10, {}), (100, {'chunksize': 20})):
            mapped = self.pool.map(sqr, list(range(count)), **options)
            self.assertEqual(mapped, [sqr(v) for v in range(count)])

    def test_map_chunksize(self):
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # The worker sleeps for TIMEOUT1, so get() should block about as long.
        timed_get = TimingWrapper(
            self.pool.apply_async(sqr, (7, TIMEOUT1,)).get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The worker sleeps longer than the timeout handed to get().
        timed_get = TimingWrapper(
            self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2)).get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Consume the whole iterator in one go.
        everything = list(self.pool.imap(sqr, list(range(10))))
        self.assertEqual(everything, [sqr(v) for v in range(10)])

        # Step through item by item, with and without explicit chunking.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            it = self.pool.imap(sqr, list(range(count)), **options)
            for i in range(count):
                self.assertEqual(next(it), i*i)
            self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        # Arrival order is unspecified, so compare after sorting.
        for options in ({}, {'chunksize': 53}):
            it = self.pool.imap_unordered(sqr, list(range(1000)), **options)
            self.assertEqual(sorted(it), [sqr(v) for v in range(1000)])

    def test_make_pool(self):
        # A pool needs at least one worker process.
        for bad_size in (-1, 0):
            self.assertRaises(ValueError, multiprocessing.Pool, bad_size)

        fresh_pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(fresh_pool._pool))
        fresh_pool.close()
        fresh_pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate; the join
        # that follows must return quickly.
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertLess(timed_join.elapsed, 0.5)
Jesse Noller1f0b6582010-01-27 03:36:01 +00001141
def raising():
    """Always raise KeyError("key")."""
    error = KeyError("key")
    raise error
Jesse Noller1f0b6582010-01-27 03:36:01 +00001144
def unpickleable_result():
    """Return a freshly created lambda (a callable that returns 42)."""
    answer = lambda: 42
    return answer
1147
class _TestPoolWorkerErrors(BaseTestCase):
    """Check how a pool reports failures that happen in its workers."""
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        pool = multiprocessing.Pool(2)

        # One-slot holder the callback stores the reported exception in.
        seen = [None]
        def remember(exc):
            seen[0] = exc

        outcome = pool.apply_async(raising, error_callback=remember)
        self.assertRaises(KeyError, outcome.get)
        reported = seen[0]
        self.assertTrue(reported)
        self.assertIsInstance(reported, KeyError)

        pool.close()
        pool.join()

    def test_unpickleable_result(self):
        from multiprocessing.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Make sure we don't lose pool processes because of encoding errors.
        for iteration in range(20):

            seen = [None]
            def remember(exc):
                seen[0] = exc

            outcome = pool.apply_async(unpickleable_result,
                                       error_callback=remember)
            self.assertRaises(MaybeEncodingError, outcome.get)
            wrapped = seen[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(wrapped, MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1187
class _TestPoolWorkerLifetime(BaseTestCase):
    """Check that workers are replaced once they reach maxtasksperchild."""
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        pids_before = [worker.pid for worker in pool._pool]
        # Run many tasks so each worker gets replaced (hopefully)
        pending = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Fetch the results and verify we got the right answers,
        # also ensuring all the tasks have completed.
        for number, outcome in enumerate(pending):
            self.assertEqual(outcome.get(), sqr(number))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (50 attempts * DELTA = 5 seconds max startup process time)
        for _ in range(50):
            if all(worker.is_alive() for worker in pool._pool):
                break
            time.sleep(DELTA)
        pids_after = [worker.pid for worker in pool._pool]
        # All pids should be assigned.  See issue #7805.
        self.assertNotIn(None, pids_before)
        self.assertNotIn(None, pids_after)
        # Finally, check that the worker pids have changed
        self.assertNotEqual(sorted(pids_before), sorted(pids_after))
        pool.close()
        pool.join()
1219
Benjamin Petersone711caf2008-06-11 16:44:04 +00001220#
1221# Test that manager has expected number of shared objects left
1222#
1223
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Capture the debug info right after the count so that what gets
        # printed on a mismatch belongs to that count.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the captured info once; fetching it a second time from
            # the manager only duplicated the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1242
1243#
1244# Test of creating a customized manager class
1245#
1246
1247from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1248
class FooBar(object):
    """Small class registered with MyManager under the names Foo and Bar."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1256
def baz():
    """Generate the squares of 0 through 9 in ascending order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1260
class IteratorProxy(BaseProxy):
    """Proxy that can be iterated over; only __next__ is exposed."""
    _exposed_ = ('__next__',)

    def __next__(self):
        # Each step is forwarded through _callmethod.
        return self._callmethod('__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self
1267
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""

# Foo uses the default exposure; Bar exposes exactly f and _h; baz is
# served through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1274
1275
class _TestMyManager(BaseTestCase):
    """Exercise the proxies handed out by the customized MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        # Which of the three FooBar methods does each proxy offer?
        candidates = ('f', 'g', '_h')
        foo_methods = []
        for name in candidates:
            if hasattr(foo, name):
                foo_methods.append(name)
        bar_methods = []
        for name in candidates:
            if hasattr(bar, name):
                bar_methods.append(name)

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: f and g are callable, _h is refused even via _callmethod.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work directly and via _callmethod.
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1307
1308#
1309# Test of connecting to a remote server and using xmlrpclib for serialization
1310#
1311
# The single queue instance handed out by get_queue().
_queue = pyqueue.Queue()

def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1315
class QueueManager(BaseManager):
    '''Manager class for the server process; get_queue is backed by a callable.'''

QueueManager.register('get_queue', callable=get_queue)
1319
class QueueManager2(BaseManager):
    '''Manager class with the same interface as QueueManager, registered without a callable.'''

QueueManager2.register('get_queue')
1323
1324
# Serializer name passed as serializer= to every QueueManager and
# QueueManager2 built by the remote-manager and manager-restart tests.
SERIALIZER = 'xmlrpclib'
1326
class _TestRemoteManager(BaseTestCase):
    """Connect to a manager server by address, serializing with SERIALIZER."""

    ALLOWED_TYPES = ('manager',)

    @classmethod
    def _putter(cls, address, authkey):
        # Child body: connect to the server as a client and put one tuple
        # on the shared queue.
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        client.get_queue().put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        shared_queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = shared_queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, shared_queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del shared_queue
        server.shutdown()
1368
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be started on an address right after
    another manager has been shut down."""

    @classmethod
    def _putter(cls, address, authkey):
        # Child body: connect to the running manager and put one message
        # on the shared queue.
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # Drop the proxy before the server goes away.
        del queue
        manager.shutdown()
        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except IOError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
            # The replacement manager has to be started as well: the retry
            # would otherwise never happen, and the shutdown() below would
            # be called on a manager that was never started.
            manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001410
Benjamin Petersone711caf2008-06-11 16:44:04 +00001411#
1412#
1413#
1414
# Stop marker for _TestConnection._echo: the child keeps echoing messages
# until recv_bytes() returns this value.
SENTINEL = latin('')
1416
class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by Pipe(), including the
    transfer of file descriptors / handles via multiprocessing.reduction."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        # Child body: send every received byte message straight back until
        # SENTINEL arrives, then close the connection.
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round trip of a pickled object.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # Round trip of raw bytes.
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into the start of a larger, zero-filled buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # Receive at an offset of three items into the buffer.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A buffer that is too small must raise BufferTooShort, with
            # the complete message carried in the exception's args.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() returns at once, poll(timeout) waits.
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Data pending: poll(timeout) returns without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            # Each end supports exactly one direction.
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            # Report a skip, as the other process-only tests in this class
            # do, instead of passing silently.
            self.skipTest("only makes sense with processes")

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        # Offset only: everything from that offset on is sent.
        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        # Offset and size.
        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        # An offset equal to the length gives an empty message.
        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Offset/size combinations outside the message, and negative
        # values, are rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        # Return True if os.fstat() accepts fd and False if it fails with
        # EBADF; any other OSError is propagated.
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        # Child body: receive a handle over conn and write data to it.
        if create_dummy_fds:
            # Occupy every unassigned descriptor below 256 first.
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child wrote through the handle it received.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Look for a free descriptor number of at least 256.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(cls, conn):
        # Child body: write one plain byte to the connection's descriptor,
        # with no descriptor accompanying it.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()
Antoine Pitroubcb39d42011-08-23 19:46:22 +02001661
class _TestListenerClient(BaseTestCase):
    """Round trip between a Listener and a Client for each address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        # Child body: connect to the listener, send one greeting, hang up.
        client = cls.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            sender = self.Process(target=self._test,
                                  args=(listener.address,))
            sender.daemon = True
            sender.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            sender.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001682#
# Test of sending connection and socket objects between processes
# (currently disabled: the test class below is wrapped in a string literal)
1684#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001685"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001686class _TestPicklingConnections(BaseTestCase):
1687
1688 ALLOWED_TYPES = ('processes',)
1689
1690 def _listener(self, conn, families):
1691 for fam in families:
1692 l = self.connection.Listener(family=fam)
1693 conn.send(l.address)
1694 new_conn = l.accept()
1695 conn.send(new_conn)
1696
1697 if self.TYPE == 'processes':
1698 l = socket.socket()
1699 l.bind(('localhost', 0))
1700 conn.send(l.getsockname())
1701 l.listen(1)
1702 new_conn, addr = l.accept()
1703 conn.send(new_conn)
1704
1705 conn.recv()
1706
1707 def _remote(self, conn):
1708 for (address, msg) in iter(conn.recv, None):
1709 client = self.connection.Client(address)
1710 client.send(msg.upper())
1711 client.close()
1712
1713 if self.TYPE == 'processes':
1714 address, msg = conn.recv()
1715 client = socket.socket()
1716 client.connect(address)
1717 client.sendall(msg.upper())
1718 client.close()
1719
1720 conn.close()
1721
1722 def test_pickling(self):
1723 try:
1724 multiprocessing.allow_connection_pickling()
1725 except ImportError:
1726 return
1727
1728 families = self.connection.families
1729
1730 lconn, lconn0 = self.Pipe()
1731 lp = self.Process(target=self._listener, args=(lconn0, families))
Jesus Cea94f964f2011-09-09 20:26:57 +02001732 lp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001733 lp.start()
1734 lconn0.close()
1735
1736 rconn, rconn0 = self.Pipe()
1737 rp = self.Process(target=self._remote, args=(rconn0,))
Jesus Cea94f964f2011-09-09 20:26:57 +02001738 rp.daemon = True
Benjamin Petersone711caf2008-06-11 16:44:04 +00001739 rp.start()
1740 rconn0.close()
1741
1742 for fam in families:
1743 msg = ('This connection uses family %s' % fam).encode('ascii')
1744 address = lconn.recv()
1745 rconn.send((address, msg))
1746 new_conn = lconn.recv()
1747 self.assertEqual(new_conn.recv(), msg.upper())
1748
1749 rconn.send(None)
1750
1751 if self.TYPE == 'processes':
1752 msg = latin('This connection uses a normal socket')
1753 address = lconn.recv()
1754 rconn.send((address, msg))
1755 if hasattr(socket, 'fromfd'):
1756 new_conn = lconn.recv()
1757 self.assertEqual(new_conn.recv(100), msg.upper())
1758 else:
1759 # XXX On Windows with Py2.6 need to backport fromfd()
1760 discard = lconn.recv_bytes()
1761
1762 lconn.send(None)
1763
1764 rconn.close()
1765 lconn.close()
1766
1767 lp.join()
1768 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001769"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001770#
1771#
1772#
1773
class _TestHeap(BaseTestCase):
    """Tests for the allocator behind multiprocessing.heap.BufferWrapper."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Drop a randomly chosen block.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        # Hold the heap lock while its bookkeeping is read; it is released
        # again when the test is cleaned up.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or the
        # next one must open a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart = following[:2]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        # Check that freeing of blocks by the garbage collector doesn't deadlock
        # (issue #12352).
        # Make sure the GC is enabled, and set lower collection thresholds to
        # make collections more frequent (and increase the probability of
        # deadlock).
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # perform numerous block allocations, with cyclic references to make
        # sure objects are collected asynchronously by the gc
        for _ in range(5000):
            a = multiprocessing.heap.BufferWrapper(1)
            b = multiprocessing.heap.BufferWrapper(1)
            # circular references
            a.buddy = b
            b.buddy = a
1838
Benjamin Petersone711caf2008-06-11 16:44:04 +00001839#
1840#
1841#
1842
class _Foo(Structure):
    # Two-field structure (an int `x` and a double `y`) used by the
    # shared ctypes tests.
    _fields_ = [('x', c_int), ('y', c_double)]
1848
class _TestSharedCTypes(BaseTestCase):
    # Shared ctypes values, arrays and structures modified by a child process.

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        if HAS_SHAREDCTYPES:
            return
        self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Target of the child process: double every shared object in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for index in range(len(arr)):
            arr[index] *= 2

    def test_sharedctypes(self, lock=False):
        # Build the shared objects, let a child double them, then check that
        # this process observes the doubled values.
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        shared = (x, y, foo, arr, string)
        child = self.Process(target=self._double, args=shared)
        child.daemon = True
        child.start()
        child.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, with lock=True passed to every shared object.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # Changing the source after copy() must not affect the copy.
        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1898
1899#
1900#
1901#
1902
class _TestFinalize(BaseTestCase):
    # Order in which util.Finalize callbacks are run.

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        # Target of the child process.  Every finalizer sends a tag over
        # `conn`.  The statement order and the lifetime of the locals are
        # significant, so nothing here may be reordered or folded into a loop.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.daemon = True
        child.start()
        child.join()

        # read the tags sent by the child up to the 'STOP' sentinel
        received = list(iter(conn.recv, 'STOP'))
        # 'c' was registered without an exitpriority and is not expected
        expected = ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']
        self.assertEqual(received, expected)
1955
1956#
1957# Test that from ... import * works for each module
1958#
1959
class _TestImportStar(BaseTestCase):
    # Every name listed in a module's __all__ must exist in that module.

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = [
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.synchronize', 'multiprocessing.util'
            ]

        optional = (
            (HAS_REDUCTION, 'multiprocessing.reduction'),
            # This module requires _ctypes
            (c_int is not None, 'multiprocessing.sharedctypes'),
            )
        modules.extend(name for available, name in optional if available)

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # modules without __all__ contribute nothing to check
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1988
1989#
1990# Quick test that logging works -- does not test logging output
1991#
1992
class _TestLogging(BaseTestCase):
    # Quick checks of multiprocessing's logger; log output is not examined.

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # check the logger before it is used, not afterwards
        self.assertIsNotNone(logger)
        # put the level back to LOG_LEVEL even if a call below raises
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        logger.setLevel(util.SUBWARNING)
        logger.debug('this will not be printed')
        logger.info('nor will this')

    @classmethod
    def _test_level(cls, conn):
        # Target of the child process: send back the effective level of the
        # multiprocessing logger as seen in the child.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_level(self, reader, writer):
        # Start a daemonic child running _test_level() on `writer` and return
        # the value it sends, read from `reader`.  The child is always joined.
        p = self.Process(target=self._test_level, args=(writer,))
        p.daemon = True
        p.start()
        try:
            return reader.recv()
        finally:
            p.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        # restore both loggers even when an assertion below fails
        self.addCleanup(logger.setLevel, LOG_LEVEL)
        self.addCleanup(root_logger.setLevel, root_level)

        reader, writer = multiprocessing.Pipe(duplex=False)
        self.addCleanup(reader.close)
        self.addCleanup(writer.close)

        # a level set on the multiprocessing logger is seen by the child
        logger.setLevel(LEVEL1)
        self.assertEqual(LEVEL1, self._child_level(reader, writer))

        # with NOTSET on that logger, the root logger's level is reported
        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.assertEqual(LEVEL2, self._child_level(reader, writer))
2035
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002036
Jesse Nollerf4ae35f2009-11-24 14:22:24 +00002037# class _TestLoggingProcessName(BaseTestCase):
2038#
2039# def handle(self, record):
2040# assert record.processName == multiprocessing.current_process().name
2041# self.__handled = True
2042#
2043# def test_logging(self):
2044# handler = logging.Handler()
2045# handler.handle = self.handle
2046# self.__handled = False
2047# # Bypass getLogger() and side-effects
2048# logger = logging.getLoggerClass()(
2049# 'multiprocessing.test.TestLoggingProcessName')
2050# logger.addHandler(handler)
2051# logger.propagate = False
2052#
2053# logger.warn('foo')
2054# assert self.__handled
Jesse Nollerb9a49b72009-11-21 18:09:38 +00002055
Benjamin Petersone711caf2008-06-11 16:44:04 +00002056#
Jesse Noller6214edd2009-01-19 16:23:53 +00002057# Test to verify handle verification, see issue 3321
2058#
2059
class TestInvalidHandle(unittest.TestCase):
    # Connection objects must reject invalid handles (see issue 3321).

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        # presumably an arbitrary number that is not an open handle
        conn = _multiprocessing.Connection(44977608)
        with self.assertRaises(IOError):
            conn.poll()
        # a negative handle must already fail when the object is created
        with self.assertRaises(IOError):
            _multiprocessing.Connection(-1)
Florent Xiclunafd1b0932010-03-28 00:25:02 +00002067
Jesse Noller6214edd2009-01-19 16:23:53 +00002068#
Benjamin Petersone711caf2008-06-11 16:44:04 +00002069# Functions used to create test cases from the base ones in this module
2070#
2071
def get_attributes(Source, names):
    """Collect the attributes `names` of `Source` into a dictionary.

    Plain Python functions are wrapped in staticmethod(), so that they are
    not turned into bound methods once the dictionary is merged into a
    class namespace.  Any other object is stored unchanged.

    Raises AttributeError if `Source` lacks one of the names.
    """
    # the type of plain Python functions, taken from this very function
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        obj = getattr(Source, name)
        if isinstance(obj, function_type):
            obj = staticmethod(obj)
        attributes[name] = obj
    return attributes
2080
def create_test_cases(Mixin, type):
    """Build the concrete test classes for one flavour of the test suite.

    Every module-level name starting with '_Test' whose ALLOWED_TYPES
    contains `type` is combined with unittest.TestCase and `Mixin` into a
    new class named 'With' + capitalized type + the name without its
    leading underscore.  Returns a dict mapping those names to the classes.
    """
    Type = type.capitalize()
    result = {}

    # iterate over a snapshot of the module namespace
    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + Type + name[1:]
        Temp.__name__ = newname
        # report the class as living in the mixin's module
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp

    return result
2097
2098#
2099# Create test cases
2100#
2101
class ProcessesMixin(object):
    # Mixin that takes its primitives straight from the multiprocessing
    # package.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # copy the listed attributes of the package into the class namespace
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue',
        )))

# generate the 'WithProcesses...' classes and publish them in this module
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
2114
2115
class ManagerMixin(object):
    # Mixin whose primitives are looked up on a SyncManager instance.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # allocated here without running __init__()
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # copy the listed attributes of the manager into the class namespace
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue',
        )))

# generate the 'WithManager...' classes and publish them in this module
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
2128
2129
class ThreadsMixin(object):
    # Mixin that takes its primitives from multiprocessing.dummy.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # copy the listed attributes of the module into the class namespace
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue',
        )))

# generate the 'WithThreads...' classes and publish them in this module
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
2142
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer answering the challenge with garbage must be rejected.
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(
                _FakeConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The fake peer sends the challenge marker first and then garbage
        # where the reply to our answer is read.
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(
                _FakeConnection(), b'abc')
2171
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00002172#
2173# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
2174#
2175
def initializer(ns):
    """Add one to the `test` attribute of the namespace `ns`."""
    ns.test += 1
2178
class TestInitializers(unittest.TestCase):
    # Manager.start() and Pool() must call the initializer they are given
    # (see issue 5585).

    def setUp(self):
        # a manager-hosted namespace whose `test` attribute the initializer
        # increments
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        m = multiprocessing.managers.SyncManager()
        # a non-callable initializer is refused
        self.assertRaises(TypeError, m.start, 1)
        m.start(initializer, (self.ns,))
        try:
            self.assertEqual(self.ns.test, 1)
        finally:
            # shut the second manager down even if the assertion fails
            m.shutdown()

    def test_pool_initializer(self):
        # a non-callable initializer is refused
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        p = multiprocessing.Pool(1, initializer, (self.ns,))
        p.close()
        p.join()
        self.assertEqual(self.ns.test, 1)
2201
Alexandre Vassalottic57a84f2009-07-17 12:07:01 +00002202#
2203# Issue 5155, 5313, 5331: Test process in processes
2204# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
2205#
2206
def _ThisSubProcess(q):
    """Attempt one non-blocking get() on `q`; an empty queue is ignored."""
    try:
        # the retrieved item, if any, is deliberately discarded
        q.get(block=False)
    except pyqueue.Empty:
        pass
2212
def _TestProcess(q):
    """Run _ThisSubProcess in a daemonic child process and wait for it.

    The child is handed a newly created queue; the argument `q` itself is
    not used.
    """
    child_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(child_queue,))
    child.daemon = True
    child.start()
    child.join()
2219
def _afunc(x):
    """Return `x` multiplied by itself."""
    return x * x
2222
def pool_in_process():
    """Create a pool of 4 workers, run one map() on it and shut it down."""
    pool = multiprocessing.Pool(processes=4)
    try:
        # only running the call matters; the result is not used
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # always release the workers instead of abandoning the pool
        pool.close()
        pool.join()
2226
class _file_like(object):
    """Buffering wrapper around `delegate`, with one buffer per process.

    write() appends to a list that is replaced by a new empty one whenever
    the current pid differs from the pid recorded at the previous access.
    flush() passes the joined buffer to the delegate's write() and starts a
    new buffer.
    """

    def __init__(self, delegate):
        self._pid = None
        self._delegate = delegate

    @property
    def cache(self):
        current = os.getpid()
        # There are no race conditions since fork keeps only the running thread
        if self._pid != current:
            self._pid = current
            self._cache = []
        return self._cache

    def write(self, data):
        self.cache.append(data)

    def flush(self):
        pending = ''.join(self.cache)
        self._delegate.write(pending)
        self._cache = []
2247
class TestStdinBadfiledescriptor(unittest.TestCase):
    # Issue 5155, 5313, 5331: processes and pools created inside a process.

    def test_queue_in_process(self):
        # a child process that itself starts a process using a queue
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # a child process that creates and uses a pool
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this process object is created but never started
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # assertEqual instead of a bare assert, which is dropped under -O
        self.assertEqual(sio.getvalue(), 'foo')
2268
# Test cases that derive from unittest.TestCase directly and are therefore
# not generated through a mixin.
testcases_other = [
    OtherTest,
    TestInvalidHandle,
    TestInitializers,
    TestStdinBadfiledescriptor,
]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00002271
Benjamin Petersone711caf2008-06-11 16:44:04 +00002272#
2273#
2274#
2275
def test_main(run=None):
    """Assemble the whole multiprocessing test suite and pass it to `run`.

    `run` is called with a single unittest.TestSuite and defaults to
    test.support.run_unittest.  The pools and the manager shared by the
    mixin classes are created before the run and torn down afterwards,
    including when `run` raises.

    Raises unittest.SkipTest on Linux if an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # only creating the lock matters; the object itself is not needed
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    # the manager was allocated without __init__(); initialise it now
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        def by_name(testcase):
            return testcase.__name__

        # generated cases sorted by name within each flavour, then the rest
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # release the shared resources even when the run was interrupted
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00002313
def main():
    """Run the complete suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()