blob: 9f4e1a243512db1de0899a63e95aaacc42e3d71d [file] [log] [blame]
Georg Brandl86b2fb92008-07-16 03:43:04 +00001#!/usr/bin/env python
2
Benjamin Petersone711caf2008-06-11 16:44:04 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import queue as pyqueue
10import time
R. David Murraya44c6b32009-07-29 15:40:30 +000011import io
Benjamin Petersone711caf2008-06-11 16:44:04 +000012import sys
13import os
14import gc
15import signal
16import array
17import copy
18import socket
19import random
20import logging
R. David Murraya21e4ca2009-03-31 23:16:50 +000021import test.support
Benjamin Petersone711caf2008-06-11 16:44:04 +000022
Benjamin Petersone5384b02008-10-04 22:00:42 +000023
R. David Murraya21e4ca2009-03-31 23:16:50 +000024# Skip tests if _multiprocessing wasn't built.
25_multiprocessing = test.support.import_module('_multiprocessing')
26# Skip tests if sem_open implementation is broken.
27test.support.import_module('multiprocessing.synchronize')
Benjamin Petersone5384b02008-10-04 22:00:42 +000028
Benjamin Petersone711caf2008-06-11 16:44:04 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersone711caf2008-06-11 16:44:04 +000033import multiprocessing.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +000034
35from multiprocessing import util
36
37#
38#
39#
40
def latin(s):
    """Return the string *s* encoded to bytes using the 'latin' codec."""
    encoded = s.encode('latin')
    return encoded
Benjamin Petersone711caf2008-06-11 16:44:04 +000043
Benjamin Petersone711caf2008-06-11 16:44:04 +000044#
45# Constants
46#
47
# Logging level constant, taken from multiprocessing.util.
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short interval (in seconds) the tests sleep for to let other
# processes/threads make progress.
DELTA = 0.1

# Making this true makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

# Timeouts (in seconds): distinct values when timings are actually
# checked, one short value otherwise.
TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False when the extension module reports HAVE_BROKEN_SEM_GETVALUE.
_BROKEN_GETVALUE = getattr(_multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False)
HAVE_GETVALUE = not _BROKEN_GETVALUE
del _BROKEN_GETVALUE

WIN32 = sys.platform == "win32"
65
Benjamin Petersone711caf2008-06-11 16:44:04 +000066#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
class TimingWrapper(object):
    """Callable wrapper that records how long the wrapped call took.

    After every call, ``elapsed`` holds the wall-clock duration (seconds,
    measured with ``time.time()``) of that call.  It is ``None`` until the
    first call and is updated even when the wrapped function raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        """Call the wrapped function and return whatever it returns."""
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return result
82
83#
84# Base class for test cases
85#
86
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test case classes.

    It relies on ``assertEqual``/``assertAlmostEqual`` being supplied by
    whatever class it is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place.

        The comparison is only made when CHECK_TIMINGS is true; otherwise
        this is a no-op.
        """
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless func is not implemented.

        A NotImplementedError raised by the call itself is swallowed and
        the check is skipped.
        """
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)
102
103#
104# Return the value of a semaphore
105#
106
def get_value(self):
    """Return the current value of the semaphore-like object *self*.

    The value is looked up, in order, via a ``get_value()`` method, a
    ``_Semaphore__value`` attribute and a ``_value`` attribute.  Raises
    NotImplementedError when none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    # Fall back to the private attributes, most specific first.
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
118
119#
120# Testcases
121#
122
class _TestProcess(BaseTestCase):
    """Tests of the basic Process API.

    Covers the attributes of the current process, the lifecycle of a child
    (start/join/exitcode), terminate(), cpu_count(), active_children() and
    processes that start further processes.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes of the object returned by current_process()."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        """Child target: report received arguments and identity through q."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Follow a child through creation, start, data exchange and join."""
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # State before the child has been started.
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child puts its values in this exact order (see _test).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        """Child target: sleep long enough that it has to be terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child promptly."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int (or is not implemented)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A child is listed by active_children() only while it runs."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id* through wconn, then spawn two children one level deeper.

        Recursion stops once *id* has two elements.  Each child is joined
        before the next one is started.
        """
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from child processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass overriding run() works end to end."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through a running _UpperCaser."""
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
306
307#
308#
309#
310
def queue_empty(q):
    """Return whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
316
def queue_full(q, maxsize):
    """Return whether *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
322
323
class _TestQueue(BaseTestCase):
    """Tests for Queue and JoinableQueue.

    Covers blocking and non-blocking put()/get() with timeouts, use of a
    queue from a newly started process, qsize() and task_done()/join().
    """

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, remove the six items the parent put."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue and check every put() variant raises Full."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        # Let the child drain the queue, then check it is empty again.
        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child target: once allowed, put the items 2..5 on the queue."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """Read the child's items, then check every get() variant raises Empty."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        """Child target: put the items 10..19 on the queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        """Items put by parent and child all arrive, in order."""
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() tracks put() and get(); skipped where not implemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker target: acknowledge each item until a None arrives."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() returns once four workers have acknowledged ten items."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One None per worker ends each worker's loop.
        for _ in workers:
            queue.put(None)

        for p in workers:
            p.join()
527
528#
529#
530#
531
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock acquire/release behaviour."""

    def test_lock(self):
        """A held Lock cannot be re-acquired, nor released twice."""
        mutex = self.Lock()
        self.assertEqual(mutex.acquire(), True)
        self.assertEqual(mutex.acquire(False), False)
        self.assertEqual(mutex.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), mutex.release)

    def test_rlock(self):
        """An RLock can be acquired three times and released three times."""
        mutex = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(mutex.acquire(), True)
        for _ in range(depth):
            self.assertEqual(mutex.release(), None)
        # One release too many must fail.
        self.assertRaises((AssertionError, RuntimeError), mutex.release)

    def test_lock_context(self):
        """A Lock can be used as a context manager."""
        with self.Lock():
            pass
554
Benjamin Petersone711caf2008-06-11 16:44:04 +0000555
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Exercise a semaphore whose initial value is 2.

        Acquires it down to zero, checks a non-blocking acquire fails, then
        releases it back to 2, checking the value (where available) after
        every step.
        """
        expect_value = self.assertReturnsIfImplemented
        expect_value(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        expect_value(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        """Run the common checks against a BoundedSemaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an exhausted semaphore fails after the timeout."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
608
609
class _TestCondition(BaseTestCase):
    """Tests for Condition: notify(), notify_all() and wait() timeouts."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Sleeper target: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the condition's internal counters show no sleepers left."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        """Each notify() wakes exactly one of two sleepers."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # Kept under its own name so that both sleepers can be joined.
        t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        t.daemon = True
        t.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        t.join()
        p.join()

    def test_notify_all(self):
        """Sleepers time out on their own, or are all woken by notify_all()."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for _ in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """wait() with a timeout returns after roughly that long."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
740
741
class _TestEvent(BaseTestCase):
    """Tests for Event: is_set(), wait() with and without timeout, clear()."""

    def _test_event(self, event):
        """Child target: set the event after sleeping for TIMEOUT2."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait()/is_set() before set(), after set() and across processes."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE(review): earlier comments here said the is_set() and wait()
        # return-value checks had been removed because of API differences
        # between threading and multiprocessing events; the checks are
        # active.
        self.assertEqual(event.is_set(), False)

        # While the event is clear, wait() returns False once it times out.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # Once the event is set, wait() returns True without blocking.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined once it has
        # set the event, instead of being left running.
        p = self.Process(target=self._test_event, args=(event,))
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersone711caf2008-06-11 16:44:04 +0000779
780#
781#
782#
783
class _TestValue(BaseTestCase):
    """Tests for Value and RawValue shared between processes."""

    # (typecode, initial value, value written by the child)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child target: overwrite each shared value with its final value."""
        for shared, (_, _, final) in zip(values, self.codes_values):
            shared.value = final

    def test_value(self, raw=False):
        """Values written by a child process are visible in the parent."""
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(code, initial) for code, initial, _ in self.codes_values]

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, spec in zip(values, self.codes_values):
            self.assertEqual(shared.value, spec[2])

    def test_rawvalue(self):
        """Same as test_value(), using RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """get_lock()/get_obj() exist exactly when the value has a lock."""
        if self.TYPE != 'processes':
            return

        # Default lock, then an explicit lock=None.
        for extra in ({}, {'lock': None}):
            locked = self.Value('i', 5, **extra)
            locked.get_lock()
            locked.get_obj()

        # A lock passed in by the caller is the one handed back.
        own_lock = self.Lock()
        with_own_lock = self.Value('i', 5, lock=own_lock)
        returned_lock = with_own_lock.get_lock()
        with_own_lock.get_obj()
        self.assertEqual(own_lock, returned_lock)

        unlocked = self.Value('i', 5, lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw = self.RawValue('i', 5)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
849
Benjamin Petersone711caf2008-06-11 16:44:04 +0000850
class _TestArray(BaseTestCase):
    """Tests for Array and RawArray shared between processes."""

    def f(self, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    def test_array(self, raw=False):
        """Indexing, slicing and child-side mutation of a shared array."""
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the same transformation locally to the list and, in a
        # child process, to the shared array; both must end up equal.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        """Same as test_array(), using RawArray."""
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        """get_lock()/get_obj() exist exactly when the array has a lock."""
        if self.TYPE != 'processes':
            return

        # Default lock, then an explicit lock=None.
        for extra in ({}, {'lock': None}):
            locked = self.Array('i', list(range(10)), **extra)
            locked.get_lock()
            locked.get_obj()

        # A lock passed in by the caller is the one handed back.
        own_lock = self.Lock()
        with_own_lock = self.Array('i', list(range(10)), lock=own_lock)
        returned_lock = with_own_lock.get_lock()
        with_own_lock.get_obj()
        self.assertEqual(own_lock, returned_lock)

        unlocked = self.Array('i', range(10), lock=False)
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(unlocked, attr))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw = self.RawArray('i', range(10))
        for attr in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, attr))
Benjamin Petersone711caf2008-06-11 16:44:04 +0000913
914#
915#
916#
917
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Slicing, extending, repeating, adding and nesting shared lists."""
        digits = list(range(10))
        a = self.list(list(range(10)))
        self.assertEqual(a[:], digits)

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], [0, 1, 2, 3, 4])

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        self.assertEqual(a[:], digits)

        # A shared list built from two shared lists.
        nested = self.list([a, b])
        self.assertEqual(nested[:], [digits, doubled])

        # Appending to `a` after wrapping it is visible through the wrapper.
        wrapper = self.list([a])
        a.append('hello')
        self.assertEqual(wrapper[:], [digits + ['hello']])

    def test_dict(self):
        """copy(), keys(), values() and items() of a shared dict."""
        shared = self.dict()
        indices = list(range(65, 70))
        for code in indices:
            shared[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(shared.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(shared.keys()), indices)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), list(zip(indices, letters)))

    def test_namespace(self):
        """Setting and deleting attributes on a shared Namespace."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
973
974#
975#
976#
977
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the Pool API: apply, map, async results, imap and terminate."""

    def test_apply(self):
        """apply() accepts positional and keyword arguments."""
        run = self.pool.apply
        self.assertEqual(run(sqr, (5,)), sqr(5))
        self.assertEqual(run(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """map() matches the sequential result, with and without chunksize."""
        small = list(range(10))
        large = list(range(100))
        self.assertEqual(self.pool.map(sqr, small), [sqr(n) for n in small])
        self.assertEqual(self.pool.map(sqr, large, chunksize=20),
                         [sqr(n) for n in large])

    def test_async(self):
        """apply_async().get() returns the result once the task finishes."""
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get() raises TimeoutError when the task outlasts the timeout."""
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields results in order and then stops."""
        results = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(results), [sqr(n) for n in range(10)])

        results = self.pool.imap(sqr, list(range(10)))
        for n in range(10):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

        results = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for n in range(1000):
            self.assertEqual(next(results), n*n)
        self.assertRaises(StopIteration, results.__next__)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        expected = [sqr(n) for n in range(1000)]

        results = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(results), expected)

        results = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(results), expected)

    def test_make_pool(self):
        """Pool(3) creates three workers."""
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        """terminate() lets join() return quickly despite queued work."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001050#
1051# Test that manager has expected number of shared objects left
1052#
1053
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            # Dump the manager's debug info exactly once, and only when
            # the count is wrong, to help diagnose the leak.
            print(self.manager._debug_info())

        self.assertEqual(refs, EXPECTED_NUMBER)
1072
1073#
1074# Test of creating a customized manager class
1075#
1076
1077from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1078
class FooBar(object):
    """Small class with a normal, a failing and an underscore-prefixed method."""

    def f(self):
        """Return the fixed marker string for f."""
        return 'f()'

    def g(self):
        """Always fail with ValueError."""
        raise ValueError()

    def _h(self):
        """Return the fixed marker string for _h."""
        return '_h()'
1086
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    n = 0
    while n < 10:
        yield n * n
        n += 1
1090
class IteratorProxy(BaseProxy):
    """Proxy exposing the iterator protocol of a remote object.

    Iterating the proxy forwards each step to the referent's
    ``__next__`` through ``_callmethod``.
    """

    # Both spellings stay exposed so the registered interface is unchanged.
    _exposed_ = ('next', '__next__')

    def __iter__(self):
        """Return the proxy itself; it is its own iterator."""
        return self

    # The class used to define __next__ twice; the first definition
    # (forwarding to the remote 'next') was silently replaced by this one,
    # so only the effective definition is kept.
    def __next__(self):
        """Fetch the next item by calling ``__next__`` on the referent."""
        return self._callmethod('__next__')
1099
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""


# 'Foo' uses the default exposure rules for FooBar.
MyManager.register('Foo', callable=FooBar)
# 'Bar' explicitly exposes only f and _h.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands out its result through IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1106
1107
class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        # Start a MyManager and check which FooBar methods each registered
        # typeid exposes, how calls and errors come back through the
        # proxies, and that the 'baz' proxy can be iterated.
        manager = MyManager()
        manager.start()

        foo_proxy = manager.Foo()
        bar_proxy = manager.Bar()
        squares = manager.baz()

        candidates = ('f', 'g', '_h')
        visible_on_foo = [attr for attr in candidates
                          if hasattr(foo_proxy, attr)]
        visible_on_bar = [attr for attr in candidates
                          if hasattr(bar_proxy, attr)]

        self.assertEqual(visible_on_foo, ['f', 'g'])
        self.assertEqual(visible_on_bar, ['f', '_h'])

        # Foo: _h is not exposed, so calling it by name is a RemoteError.
        self.assertEqual(foo_proxy.f(), 'f()')
        self.assertRaises(ValueError, foo_proxy.g)
        self.assertEqual(foo_proxy._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo_proxy._callmethod, '_h')

        # Bar: both exposed methods work directly and via _callmethod.
        self.assertEqual(bar_proxy.f(), 'f()')
        self.assertEqual(bar_proxy._h(), '_h()')
        self.assertEqual(bar_proxy._callmethod('f'), 'f()')
        self.assertEqual(bar_proxy._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [i*i for i in range(10)])

        manager.shutdown()
1139
1140#
1141# Test of connecting to a remote server and using xmlrpclib for serialization
1142#
1143
# The one queue object that get_queue() hands out.
_queue = pyqueue.Queue()


def get_queue():
    """Return the module-level queue (always the same object)."""
    return _queue
1147
class QueueManager(BaseManager):
    '''manager class used by server process'''


# Server side: 'get_queue' is backed by the real get_queue() callable.
QueueManager.register('get_queue', callable=get_queue)
1151
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''


# Client side: only the typeid is declared, no callable is attached.
QueueManager2.register('get_queue')
1155
1156
# Serializer name passed to every manager created in the remote tests.
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Connect to the manager at *address* and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        server = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        server.start()

        # A separate process/thread pushes an item through its own
        # connection to the server.
        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        server.shutdown()
1198
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted on the address it just released."""

    def _putter(self, address, authkey):
        """Connect to the manager at *address* and put one string on its queue."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        authkey = os.urandom(32)
        # Ask the test support library for a free port instead of assuming
        # that a fixed one (previously 9999) is available on this machine.
        port = test.support.find_unused_port()
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        # The item has arrived, so the putter is done; reap it while the
        # manager it is connected to is still running.
        p.join()
        # Drop the proxy before shutting the manager down.
        del queue
        manager.shutdown()

        # Restart immediately on the very same address.
        manager = QueueManager(
            address=('localhost', port), authkey=authkey,
            serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Nollerc5d28a02009-03-30 16:37:36 +00001224
Benjamin Petersone711caf2008-06-11 16:44:04 +00001225#
1226#
1227#
1228
# Empty byte string that tells the echo helper to stop.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Send every received byte message back until SENTINEL arrives."""
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.daemon = True
        echo_proc.start()

        payload = [1, 2.25, None]
        short_msg = latin('hello world')
        long_msg = short_msg * 10
        ints = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Object round trip.
        self.assertEqual(conn.send(payload), None)
        self.assertEqual(conn.recv(), payload)

        # Raw bytes round trip.
        self.assertEqual(conn.send_bytes(short_msg), None)
        self.assertEqual(conn.recv_bytes(), short_msg)

        if self.TYPE == 'processes':
            ints_size = len(ints) * ints.itemsize

            # Receive into the start of a zeroed buffer.
            target = array.array('i', [0]*10)
            expected = list(ints) + [0] * (10 - len(ints))
            self.assertEqual(conn.send_bytes(ints), None)
            self.assertEqual(conn.recv_bytes_into(target), ints_size)
            self.assertEqual(list(target), expected)

            # Receive at an offset of three items.
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(ints) + [0] * (10 - 3 - len(ints))
            self.assertEqual(conn.send_bytes(ints), None)
            self.assertEqual(
                conn.recv_bytes_into(target, 3 * target.itemsize),
                ints_size)
            self.assertEqual(list(target), expected)

            # A buffer that is too small must raise BufferTooShort and
            # carry the complete message in the exception arguments.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(long_msg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as exc:
                self.assertEqual(exc.args, (long_msg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        timed_poll = TimingWrapper(conn.poll)

        # Nothing pending: poll() is False, immediately or after a timeout.
        self.assertEqual(timed_poll(), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(timed_poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

        conn.send(None)

        # Data pending: poll() is True without waiting.
        self.assertEqual(timed_poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        huge_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(huge_msg)
        self.assertEqual(conn.recv_bytes(), huge_msg)

        conn.send_bytes(SENTINEL)                    # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo_proc.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        # The direction flags and the IOError checks below only apply to
        # the 'processes' flavour.
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo_proc.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        alphabet = latin('abcdefghijklmnopqrstuvwxyz')
        sender, receiver = self.Pipe()

        # (extra positional arguments, bytes expected on the other end)
        accepted = (
            ((), alphabet),
            ((5,), alphabet[5:]),
            ((7, 8), alphabet[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            sender.send_bytes(alphabet, *extra)
            self.assertEqual(receiver.recv_bytes(), expected)

        # Offset/size combinations that must be rejected with ValueError.
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, sender.send_bytes, alphabet, *extra)
1379
class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Connect to *address*, send a single 'hello' and disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip for every address family the
        # connection module reports.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            sender = self.Process(target=self._test,
                                  args=(listener.address,))
            sender.daemon = True
            sender.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            sender.join()
            listener.close()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001399#
1400# Test of sending connection and socket objects between processes
1401#
# NOTE: everything up to the closing triple quote is a bare string literal,
# so the _TestPicklingConnections case it contains is never defined or run.
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001485#
1486#
1487#
1488
class _TestHeap(BaseTestCase):
    """Consistency check of multiprocessing.heap after heavy churn."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                # Evict a random older block; a separate name is used so
                # the loop counter is not overwritten.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # Collect every free and every occupied segment as
        # (arena index, start, stop, length, state).
        segments = []
        for free_list in list(heap._len_to_seq.values()):
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Neighbouring segments must either touch inside the same arena or
        # the later one must begin a different arena at offset 0.
        for current, following in zip(segments, segments[1:]):
            (arena, start, stop) = current[:3]
            (narena, nstart, nstop) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1529
1530#
1531#
1532#
1533
# Only names that ctypes really provides are imported here.  The earlier
# import also asked ctypes for 'Value' and 'copy', which it does not export,
# so the ImportError branch was always taken and every test below returned
# without checking anything.
try:
    from ctypes import Structure, c_int, c_double
except ImportError:
    Structure = object
    c_int = c_double = None


class _Foo(Structure):
    """Two-field structure (int x, double y) shared between processes."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]


class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Double every shared value in place; used as the child's target."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        # Imported locally so that no module-level name (in particular the
        # 'copy' module) is rebound, and so a missing ctypes is only hit
        # after the guard above.
        from multiprocessing.sharedctypes import Array, Value

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        # A 'c' array holds bytes, so the value has to be bytes as well.
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        # sharedctypes.copy() is the helper under test; it is given a local
        # alias to keep it apart from the standard 'copy' module.
        from multiprocessing.sharedctypes import copy as shared_copy

        foo = _Foo(2, 5.0)
        bar = shared_copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1595
1596#
1597#
1598#
1599
class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Register finalizers that report over *conn*, then exit the process."""
        class Tracked(object):
            pass

        first = Tracked()
        util.Finalize(first, conn.send, args=('a',))
        del first               # triggers callback for a

        second = Tracked()
        finalize_second = util.Finalize(second, conn.send, args=('b',))
        finalize_second()       # triggers callback for b
        finalize_second()       # does nothing because callback has already been called
        del second              # does nothing because callback has already been called

        # No exit priority is given for this one.
        no_priority = Tracked()
        util.Finalize(no_priority, conn.send, args=('c',))

        # Objects with an exit priority, registered in this exact order.
        # Every object is kept referenced so that none of the callbacks
        # fires before the exit function runs.
        survivors = [no_priority]
        for tag, priority in (('d10', 1), ('d01', 0), ('d02', 0), ('d03', 0)):
            tracked = Tracked()
            util.Finalize(tracked, conn.send, args=(tag,),
                          exitpriority=priority)
            survivors.append(tracked)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read everything the child reported up to the 'STOP' marker.
        reported = list(iter(conn.recv, 'STOP'))
        self.assertEqual(reported,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1650
1651#
1652# Test that from ... import * works for each module
1653#
1654
class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        # Every name a module lists in __all__ must really exist on it.
        module_names = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for module_name in module_names:
            __import__(module_name)
            module = sys.modules[module_name]
            exported = getattr(module, '__all__', ())

            for attr in exported:
                self.assertTrue(
                    hasattr(module, attr),
                    '%r does not have attribute %r' % (module, attr)
                    )
1677
1678#
1679# Quick test that logging works -- does not test logging output
1680#
1681
class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        mp_logger = multiprocessing.get_logger()
        mp_logger.setLevel(util.SUBWARNING)
        self.assertTrue(mp_logger is not None)
        mp_logger.debug('this will not be printed')
        mp_logger.info('nor will this')
        mp_logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Report the multiprocessing logger's effective level over *conn*."""
        conn.send(multiprocessing.get_logger().getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        mp_logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        saved_root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Level set directly on the multiprocessing logger.
        mp_logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        # With NOTSET the effective level comes from the root logger.
        mp_logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        # Put both loggers back the way the rest of the suite expects.
        root_logger.setLevel(saved_root_level)
        mp_logger.setLevel(level=LOG_LEVEL)
1719
1720#
Jesse Noller6214edd2009-01-19 16:23:53 +00001721# Test to verify handle verification, see issue 3321
1722#
1723
class TestInvalidHandle(unittest.TestCase):

    def test_invalid_handles(self):
        # Nothing is checked when WIN32 is set.
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1732#
Benjamin Petersone711caf2008-06-11 16:44:04 +00001733# Functions used to create test cases from the base ones in this module
1734#
1735
def get_attributes(Source, names):
    """Collect the attributes *names* of *Source* into a dict.

    Plain Python functions are wrapped in ``staticmethod``; every other
    value is stored unchanged.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        collected[name] = (staticmethod(value)
                           if type(value) == function_type else value)
    return collected
1744
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the test suite.

    Every module-level name starting with ``_Test`` whose ALLOWED_TYPES
    contains *type* is combined with ``unittest.TestCase`` and *Mixin*.
    The new class is named ``With<Type><name without the underscore>``
    and reports *Mixin*'s module as its own.  Returns a dict mapping
    those names to the classes.
    """
    namespace = globals()
    capitalized = type[0].upper() + type[1:]
    generated = {}

    # Iterate over a snapshot of the names currently defined.
    for name in list(namespace):
        if not name.startswith('_Test'):
            continue
        base = namespace[name]
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        case_name = 'With' + capitalized + name[1:]
        generated[case_name] = Temp
        Temp.__name__ = case_name
        Temp.__module__ = Mixin.__module__
    return generated
1761
1762#
1763# Create test cases
1764#
1765
class ProcessesMixin(object):
    # Flavour tag that the shared test bodies compare self.TYPE against.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed multiprocessing attributes into the class namespace.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'JoinableQueue'
        )))


# Generate the 'processes' test cases and publish them as module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1778
1779
class ManagerMixin(object):
    # Flavour tag that the shared test bodies compare self.TYPE against.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The SyncManager is allocated without running __init__ here;
    # test_main() calls __init__() and start() on it later.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))


# Generate the 'manager' test cases and publish them as module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1792
1793
class ThreadsMixin(object):
    # Flavour tag that the shared test bodies compare self.TYPE against.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))


# Generate the 'threads' test cases and publish them as module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1806
class OtherTest(unittest.TestCase):
    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        # A peer that answers the challenge with garbage must be rejected.
        class _BogusPeer(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.deliver_challenge(_BogusPeer(), b'abc')

    def test_answer_challenge_auth_failure(self):
        # The peer first sends the challenge marker, then garbage, then
        # empty messages; answering it must end in AuthenticationError.
        class _ScriptedPeer(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                self.count += 1
                if self.count > 2:
                    return b''
                if self.count == 2:
                    return b'something bogus'
                return multiprocessing.connection.CHALLENGE

            def send_bytes(self, data):
                pass

        with self.assertRaises(multiprocessing.AuthenticationError):
            multiprocessing.connection.answer_challenge(_ScriptedPeer(),
                                                        b'abc')
1835
Benjamin Petersonf47ed4a2009-04-11 20:45:40 +00001836#
1837# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1838#
1839
def initializer(ns):
    """Increase the ``test`` attribute of *ns* by one."""
    ns.test = ns.test + 1
1842
class TestInitializers(unittest.TestCase):
    def setUp(self):
        # A manager-backed namespace whose 'test' attribute starts at 0.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        second_manager = multiprocessing.managers.SyncManager()
        # Something that is not callable is rejected as initializer.
        self.assertRaises(TypeError, second_manager.start, 1)
        second_manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        second_manager.shutdown()

    def test_pool_initializer(self):
        # Something that is not callable is rejected as initializer.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        single_worker = multiprocessing.Pool(1, initializer, (self.ns,))
        single_worker.close()
        single_worker.join()
        self.assertEqual(self.ns.test, 1)
1865
R. David Murraya44c6b32009-07-29 15:40:30 +00001866#
1867# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
1869#
1870
1871def _ThisSubProcess(q):
1872 try:
1873 item = q.get(block=False)
1874 except pyqueue.Empty:
1875 pass
1876
def _TestProcess(q):
    """Start a child running _ThisSubProcess on a new queue and wait for it.

    The argument *q* itself is not used.
    """
    inner_queue = multiprocessing.Queue()
    child = multiprocessing.Process(target=_ThisSubProcess,
                                    args=(inner_queue,))
    child.start()
    child.join()
1882
1883def _afunc(x):
1884 return x*x
1885
def pool_in_process():
    """Create a four-worker pool and map _afunc over the integers 1..7."""
    workers = multiprocessing.Pool(processes=4)
    inputs = [1, 2, 3, 4, 5, 6, 7]
    x = workers.map(_afunc, inputs)
1889
1890class _file_like(object):
1891 def __init__(self, delegate):
1892 self._delegate = delegate
1893 self._pid = None
1894
1895 @property
1896 def cache(self):
1897 pid = os.getpid()
1898 # There are no race conditions since fork keeps only the running thread
1899 if pid != self._pid:
1900 self._pid = pid
1901 self._cache = []
1902 return self._cache
1903
1904 def write(self, data):
1905 self.cache.append(data)
1906
1907 def flush(self):
1908 self._delegate.write(''.join(self.cache))
1909 self._cache = []
1910
class TestStdinBadfiledescriptor(unittest.TestCase):

    def test_queue_in_process(self):
        # A child process that itself creates a queue and a grandchild.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # A child process that itself creates a pool.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        # Data written to _file_like must reach the delegate on flush().
        # A Process object used to be created here but was never started,
        # so it has been dropped.  The check uses assertEqual so that it
        # still runs when Python strips assert statements (-O).
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        self.assertEqual(sio.getvalue(), 'foo')


# Test cases that are not generated from the _Test* templates.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz5d6415e2008-08-25 01:53:32 +00001934
Benjamin Petersone711caf2008-06-11 16:44:04 +00001935#
1936#
1937#
1938
def test_main(run=None):
    """Set up the shared pools and manager, run every test case, clean up.

    *run* is a callable that receives the assembled TestSuite; when it is
    None, ``test.support.run_unittest`` is used.  On Linux the run is
    skipped (unittest.SkipTest) if an RLock cannot be created.  The pools
    and the manager are torn down even if *run* raises.
    """
    if sys.platform.startswith("linux"):
        try:
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by all generated test cases of each flavour.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        # Generated cases run in name order within each flavour; the
        # hand-written cases come last.
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # run() may raise (for instance to report failures); the worker
        # processes and the manager must not be left behind in that case.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001976
def main():
    """Run the whole suite through a verbose text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()