blob: b16629b69add4e2a65d81baae3eb018d822262a7 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
Mark Dickinsonc4920e82009-11-20 19:30:22 +000020from test import test_support
Jesse Noller1b90efb2009-06-30 17:11:52 +000021from StringIO import StringIO
Benjamin Petersondfd79492008-06-13 19:13:39 +000022
Jesse Noller37040cd2008-09-30 00:15:45 +000023
R. David Murray3db8a342009-03-30 23:05:48 +000024_multiprocessing = test_support.import_module('_multiprocessing')
25
Jesse Noller37040cd2008-09-30 00:15:45 +000026# Work around broken sem_open implementations
R. David Murray3db8a342009-03-30 23:05:48 +000027test_support.import_module('multiprocessing.synchronize')
Jesse Noller37040cd2008-09-30 00:15:45 +000028
Benjamin Petersondfd79492008-06-13 19:13:39 +000029import multiprocessing.dummy
30import multiprocessing.connection
31import multiprocessing.managers
32import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000033import multiprocessing.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +000034
35from multiprocessing import util
36
37#
38#
39#
40
# Constructor applied to the single-character values used with the 'c'
# typecode further down in this file.
latin = str

#
# Constants
#

# Alternative, quieter setting: logging.WARNING
LOG_LEVEL = util.SUBWARNING

DELTA = 0.1

# Setting CHECK_TIMINGS to True makes the tests take a lot longer and can
# sometimes cause non-serious failures, because some calls block a bit
# longer than expected.
CHECK_TIMINGS = False

if not CHECK_TIMINGS:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1
else:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4

# True unless the extension module flags its semaphore get_value as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)

WIN32 = sys.platform == "win32"
64
Benjamin Petersondfd79492008-06-13 19:13:39 +000065#
66# Creates a wrapper for a function which records the time it takes to finish
67#
68
class TimingWrapper(object):
    """Callable proxy that records how long each call to ``func`` takes.

    ``elapsed`` is ``None`` until the first call.  After every call, whether
    it returned or raised, it holds the duration of that call in seconds as
    measured with ``time.time()``.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            result = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional exit path.
            self.elapsed = time.time() - started
        return result
81
82#
83# Base class for test cases
84#
85
class BaseTestCase(object):
    """Mixin holding assertion helpers shared by the test classes.

    The helpers call ``assertEqual`` and ``assertAlmostEqual`` on ``self``,
    so the mixin must be combined with a class that provides them.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Durations are only compared (to one decimal place) when timing
        # checks are switched on; otherwise this is a no-op.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Assert that func(*args) equals value, but silently accept a
        # NotImplementedError raised by the call.
        try:
            outcome = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, outcome)
101
102#
103# Return the value of a semaphore
104#
105
def get_value(self):
    """Return the current value of the semaphore-like object ``self``.

    The value is looked up, in order, through a ``get_value()`` method, a
    ``_Semaphore__value`` attribute and a ``_value`` attribute.

    Raises NotImplementedError when none of these is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
117
118#
119# Testcases
120#
121
class _TestProcess(BaseTestCase):
    # Checks of basic Process behaviour: identity attributes, the
    # start/join lifecycle, terminate(), active_children() and processes
    # that start further processes.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def _test(self, q, *args, **kwds):
        # Child target: report back through q what the child was given and
        # how it sees itself, in the order test_process() reads it.
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if self.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        # state before start()
        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        # state while running
        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # q itself is consumed as the first positional argument of _test,
        # so the child only sees args[1:]
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        # state after join()
        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        # Child target: sleep long enough that only terminate() ends it.
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.exitcode, None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        # Send our own id, then (down to a depth of two) start and join two
        # children whose ids extend ours by 0 and 1.
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # each child is joined before its sibling is started, so the ids
        # arrive in depth-first order
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
267
268#
269#
270#
271
272class _UpperCaser(multiprocessing.Process):
273
274 def __init__(self):
275 multiprocessing.Process.__init__(self)
276 self.child_conn, self.parent_conn = multiprocessing.Pipe()
277
278 def run(self):
279 self.parent_conn.close()
280 for s in iter(self.child_conn.recv, None):
281 self.child_conn.send(s.upper())
282 self.child_conn.close()
283
284 def submit(self, s):
285 assert type(s) is str
286 self.parent_conn.send(s)
287 return self.parent_conn.recv()
288
289 def stop(self):
290 self.parent_conn.send(None)
291 self.parent_conn.close()
292 self.child_conn.close()
293
class _TestSubclassingProcess(BaseTestCase):
    # Checks that a Process subclass with its own run() works end to end.

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, expected in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), expected)
        worker.stop()
        worker.join()
305
306#
307#
308#
309
def queue_empty(q):
    """Return whether ``q`` is empty.

    Uses ``q.empty()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with zero otherwise.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
315
def queue_full(q, maxsize):
    """Return whether ``q`` is full.

    Uses ``q.full()`` when the queue has that method and falls back to
    comparing ``q.qsize()`` with ``maxsize`` otherwise.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
321
322
class _TestQueue(BaseTestCase):
    # Exercises put()/get() in their blocking, non-blocking and timed
    # forms, queue use across a newly started process, qsize() and
    # task_done()/join().

    def _test_put(self, queue, child_can_start, parent_can_continue):
        # Child target: once released, drain the six items the parent put.
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue),
            )
        consumer.daemon = True
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # fill the queue, going through each accepted call form once
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        timed_put = TimingWrapper(queue.put)
        timed_put_nowait = TimingWrapper(queue.put_nowait)

        # non-blocking puts on a full queue fail at once
        self.assertRaises(Queue.Full, timed_put, 7, False)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, False, None)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put_nowait, 7)
        self.assertTimingAlmostEqual(timed_put_nowait.elapsed, 0)

        # blocking puts fail after the timeout; with block=False the
        # timeout is expected to be ignored
        self.assertRaises(Queue.Full, timed_put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Full, timed_put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_put.elapsed, 0)

        self.assertRaises(Queue.Full, timed_put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        # Child target: once released, put the items test_get() reads.
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue),
            )
        producer.daemon = True
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        timed_get = TimingWrapper(queue.get)
        timed_get_nowait = TimingWrapper(queue.get_nowait)

        # non-blocking gets on an empty queue fail at once
        self.assertRaises(Queue.Empty, timed_get, False)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, False, None)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get_nowait)
        self.assertTimingAlmostEqual(timed_get_nowait.elapsed, 0)

        # blocking gets fail after the timeout; with block=False the
        # timeout is expected to be ignored
        self.assertRaises(Queue.Empty, timed_get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

        self.assertRaises(Queue.Empty, timed_get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, 0)

        self.assertRaises(Queue.Empty, timed_get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT3)

        producer.join()

    def _test_fork(self, queue):
        for item in range(10, 20):
            queue.put(item)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for item in range(10):
            queue.put(item)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # nothing to check when qsize() is unavailable
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        # Worker target: acknowledge every item until the None sentinel.
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for _ in xrange(4)]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # one sentinel per worker
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
526
527#
528#
529#
530
class _TestLock(BaseTestCase):
    # Checks acquire/release on Lock and RLock, and Lock as a context
    # manager.

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # a second, non-blocking acquire must fail
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # releasing an unlocked lock is an error
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # one release more than there were acquires is an error
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        lock = self.Lock()
        with lock:
            pass
553
Benjamin Petersondfd79492008-06-13 19:13:39 +0000554
class _TestSemaphore(BaseTestCase):
    # Checks the counter of Semaphore/BoundedSemaphore objects and the
    # timeout handling of acquire().

    def _test_semaphore(self, sem):
        # Expects sem to start at 2: take it down to 0, check that a
        # non-blocking acquire then fails, and bring it back up to 2.
        check = self.assertReturnsIfImplemented

        check(2, get_value, sem)
        for remaining in (1, 0):
            self.assertEqual(sem.acquire(), True)
            check(remaining, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        for available in (1, 2):
            self.assertEqual(sem.release(), None)
            check(available, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # an unbounded semaphore can be released past its initial value
        for available in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(available, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        timed_acquire = TimingWrapper(sem.acquire)

        # non-blocking attempts return at once, whatever the timeout
        self.assertEqual(timed_acquire(False), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, None), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0.0)

        self.assertEqual(timed_acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, 0)

        # blocking attempts give up after the timeout
        self.assertEqual(timed_acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT2)

        self.assertEqual(timed_acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(timed_acquire.elapsed, TIMEOUT3)
607
608
class _TestCondition(BaseTestCase):
    # Checks notify(), notify_all() and wait() timeouts on Condition
    # objects, with a mix of processes and threads as waiters.

    def f(self, cond, sleeping, woken, timeout=None):
        # Waiter target: signal "sleeping", wait on the condition (with an
        # optional timeout), then signal "woken".
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # The two waiters get distinct names so that both can be joined at
        # the end of the test.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        proc.daemon = True
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        thread.daemon = True
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # both waiters have been notified, so both can be reaped
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
739
740
class _TestEvent(BaseTestCase):
    # Checks is_set(), set(), clear() and wait() on Event objects,
    # including an event that is set from another process.

    def _test_event(self, event):
        # Child target: set the event after a short delay.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # NOTE: earlier comments here said this check had been removed
        # temporarily because of API shear with threading._Event objects
        # (is_set == isSet); it is active now.
        self.assertEqual(event.is_set(), False)

        # NOTE: earlier comments said the return value checks had been
        # removed because threading.Event.wait() returns the value of the
        # flag instead of None (API shear with the semaphore backed
        # mp.Event); they are active now and expect the flag value.
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined once it has
        # set the event; it is daemonic so that a failure above cannot
        # leave a non-daemonic child behind.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
        p.join()
Benjamin Petersondfd79492008-06-13 19:13:39 +0000778
779#
780#
781#
782
class _TestValue(BaseTestCase):
    # Checks Value/RawValue objects shared with a child process, and the
    # get_lock()/get_obj() accessors.

    # (typecode, initial value, value assigned by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        # Child target: overwrite each shared value with its replacement.
        for shared, (_, _, replacement) in zip(values, self.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, value)
                  for code, value, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        # the assignments made in the child must be visible here
        for shared, (_, _, replacement) in zip(values, self.codes_values):
            self.assertEqual(shared.value, replacement)

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # default lock and explicit lock=None: both accessors must work
        for kwargs in ({}, {'lock': None}):
            val = self.Value('i', 5, **kwargs)
            val.get_lock()
            val.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        locked_val = self.Value('i', 5, lock=lock)
        returned_lock = locked_val.get_lock()
        locked_val.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors
        unlocked_val = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(unlocked_val, 'get_lock'))
        self.assertFalse(hasattr(unlocked_val, 'get_obj'))

        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        raw_val = self.RawValue('i', 5)
        self.assertFalse(hasattr(raw_val, 'get_lock'))
        self.assertFalse(hasattr(raw_val, 'get_obj'))
848
Benjamin Petersondfd79492008-06-13 19:13:39 +0000849
class _TestArray(BaseTestCase):
    # Checks Array/RawArray objects shared with a child process, and the
    # get_lock()/get_obj() accessors.

    def f(self, seq):
        # Turn seq, in place, into its running totals.
        for idx in range(1, len(seq)):
            seq[idx] += seq[idx - 1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        factory = self.RawArray if raw else self.Array
        arr = factory('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # slice assignment, applied to both the shared array and the list
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # apply the same transformation locally to the list ...
        self.f(seq)

        # ... and in a child process to the shared array
        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # default lock and explicit lock=None: both accessors must work
        for kwargs in ({}, {'lock': None}):
            arr = self.Array('i', range(10), **kwargs)
            arr.get_lock()
            arr.get_obj()

        # an explicit lock is handed back by get_lock()
        lock = self.Lock()
        locked_arr = self.Array('i', range(10), lock=lock)
        returned_lock = locked_arr.get_lock()
        locked_arr.get_obj()
        self.assertEqual(lock, returned_lock)

        # lock=False gives an object without the accessors
        unlocked_arr = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(unlocked_arr, 'get_lock'))
        self.assertFalse(hasattr(unlocked_arr, 'get_obj'))
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        raw_arr = self.RawArray('i', range(10))
        self.assertFalse(hasattr(raw_arr, 'get_lock'))
        self.assertFalse(hasattr(raw_arr, 'get_obj'))
Benjamin Petersondfd79492008-06-13 19:13:39 +0000912
913#
914#
915#
916
class _TestContainers(BaseTestCase):
    # Checks the list, dict and Namespace objects handed out by a manager.

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(range(10))
        self.assertEqual(a[:], range(10))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(range(5))
        self.assertEqual(b[:], range(5))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(b[:], doubled)

        self.assertEqual(b + [5, 6], doubled + [5, 6])

        # a was not affected by the operations on b
        self.assertEqual(a[:], range(10))

        # lists nested inside a shared list
        nested = self.list([a, b])
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], doubled]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # neither the deleted nor the underscore attribute shows up
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
971 self.assertTrue(not hasattr(n, 'job'))
972
973#
974#
975#
976
def sqr(x, wait=0.0):
    """Sleep for ``wait`` seconds, then return ``x`` multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    # Checks the apply/map/imap families of Pool methods, pool creation
    # and terminate().

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_map_chunksize(self):
        # an empty iterable with an explicit chunksize must not stall
        pending = self.pool.map_async(sqr, [], chunksize=1)
        try:
            pending.get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(res.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # the task sleeps longer than the timeout passed to get()
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, range(10))
        self.assertEqual(list(it), map(sqr, range(10)))

        it = self.pool.imap(sqr, range(10))
        for n in range(10):
            self.assertEqual(it.next(), n*n)
        self.assertRaises(StopIteration, it.next)

        it = self.pool.imap(sqr, range(1000), chunksize=100)
        for n in range(1000):
            self.assertEqual(it.next(), n*n)
        self.assertRaises(StopIteration, it.next)

    def test_imap_unordered(self):
        expected = map(sqr, range(1000))

        it = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(it), expected)

        it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(it), expected)

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001055#
1056# Test that manager has expected number of shared objects left
1057#
1058
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Check that the manager holds exactly one shared object."""
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        # Snapshot the debug info right after counting so that what gets
        # printed matches the count being reported.
        debug_info = self.manager._debug_info()
        if refs != EXPECTED_NUMBER:
            # Print the snapshot once; fetching and printing it a second
            # time only duplicated the output.
            print(debug_info)

        self.assertEqual(refs, EXPECTED_NUMBER)
1077
1078#
1079# Test of creating a customized manager class
1080#
1081
1082from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1083
class FooBar(object):
    """Plain object registered with MyManager as both 'Foo' and 'Bar'."""

    def f(self):
        """Return the marker string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the marker string '_h()'."""
        return '_h()'
1091
def baz():
    """Generate the squares of 0 through 9 in increasing order."""
    for n in range(10):
        yield n ** 2
1095
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration calls to its referent."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        """Forward a 'next' call to the referent and return its result."""
        method = 'next'
        return self._callmethod(method)

    def __next__(self):
        """Forward a '__next__' call to the referent and return its result."""
        method = '__next__'
        return self._callmethod(method)
1104
class MyManager(BaseManager):
    """Customized manager; its shared types are registered just below."""

# 'Foo' uses the default exposure, 'Bar' limits it to f and _h, and
# 'baz' hands out its result through IteratorProxy.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1111
1112
class _TestMyManager(BaseTestCase):
    """Exercise the proxies produced by the customized MyManager class."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check method exposure and call forwarding of MyManager proxies."""
        manager = MyManager()
        manager.start()

        # Shut the server down even when an assertion fails, so that a
        # failing test does not leave the manager process running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            # Named so that it does not shadow the module-level baz().
            baz_proxy = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz_proxy), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1144
1145#
1146# Test of connecting to a remote server and using xmlrpclib for serialization
1147#
1148
# Single module-level queue; get_queue() always hands out this same object.
_queue = Queue.Queue()
def get_queue():
    """Return the module-level ``_queue``."""
    return _queue
1152
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Registered with a callable, so this class can create the queue itself.
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Registered without a callable: only the 'get_queue' name is declared here.
QueueManager2.register('get_queue')


# Serializer name passed to every QueueManager/QueueManager2 created below.
SERIALIZER = 'xmlrpclib'
1163
class _TestRemoteManager(BaseTestCase):
    """Talk to a manager server through separately connected clients."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child side: connect to the server and enqueue one tuple."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """A value put by a child arrives through a second client."""
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        # Stop the server even if an assertion fails, so a failing test
        # does not leave the manager process running.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            # Note that xmlrpclib will deserialize object as a list not a
            # tuple
            self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

            # Because we are using xmlrpclib for serialization instead of
            # pickle this will cause a serialization error.
            self.assertRaises(Exception, queue.put, time.sleep)

            # The item has been received, so the putter has done its work;
            # reap it instead of leaving the child behind.
            p.join()

            # Make queue finalizer run before the server is stopped
            del queue
        finally:
            manager.shutdown()
1203
class _TestManagerRestart(BaseTestCase):
    """Check that a manager can be restarted at once on the same address."""

    def _putter(self, address, authkey):
        """Child side: connect to the server and enqueue one string."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """Start, use and stop a manager, then start it again right away."""
        authkey = os.urandom(32)
        # Ask the test support library for a free port instead of assuming
        # that a fixed one is available.  The same address is used for both
        # managers below, which is the point of the test.
        address = ('localhost', test_support.find_unused_port())

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()
            queue = manager.get_queue()
            self.assertEqual(queue.get(), 'hello world')
            # The item has arrived, so the putter is done: reap it.
            p.join()
            # Drop the proxy before the server it refers to goes away.
            del queue
        finally:
            manager.shutdown()

        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.start()
        manager.shutdown()
Jesse Noller459a6482009-03-30 15:50:42 +00001229
Benjamin Petersondfd79492008-06-13 19:13:39 +00001230#
1231#
1232#
1233
# Empty byte message that tells the echo child to stop.
SENTINEL = latin('')

class _TestConnection(BaseTestCase):
    """Exercise the connection objects returned by Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Send every received byte message back until SENTINEL arrives."""
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        """Round-trip objects, byte strings and buffers through an echo child."""
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.daemon = True
        echo.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # Receive into a ten-item array, first at the start and then
            # three items in.
            for skip in (0, 3):
                target = array.array('i', [0]*10)
                expected = ([0] * skip + list(arr) +
                            [0] * (10 - skip - len(arr)))
                self.assertEqual(conn.send_bytes(arr), None)
                if skip:
                    received = conn.recv_bytes_into(
                        target, skip * target.itemsize)
                else:
                    received = conn.recv_bytes_into(target)
                self.assertEqual(received, len(arr) * target.itemsize)
                self.assertEqual(list(target), expected)

            # A buffer shorter than the message must raise BufferTooShort
            # carrying the complete message.
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        timed_poll = TimingWrapper(conn.poll)

        self.assertEqual(timed_poll(), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(timed_poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(timed_poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(timed_poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(timed_poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo.join()

    def test_duplex_false(self):
        """A one-way pipe has a read-only end and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return

        for end, can_read, can_write in ((reader, True, False),
                                         (writer, False, True)):
            self.assertEqual(end.readable, can_read)
            self.assertEqual(end.writable, can_write)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo = self.Process(target=self._echo, args=(child_conn,))
        echo.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo.join()

    def test_sendbytes(self):
        """send_bytes() honours its optional offset and size arguments."""
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra positional arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # Offsets and sizes that fall outside the message are rejected.
        for extra in ((27,), (22, 5), (26, 1), (-1,), (4, -1)):
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1384
class _TestListenerClient(BaseTestCase):
    """Connect a Client to a Listener for every available address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child side: connect, send one greeting and disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        """The listener receives the child's greeting for each family."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.daemon = True
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001404#
1405# Test of sending connection and socket objects between processes
1406#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001407"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001408class _TestPicklingConnections(BaseTestCase):
1409
1410 ALLOWED_TYPES = ('processes',)
1411
1412 def _listener(self, conn, families):
1413 for fam in families:
1414 l = self.connection.Listener(family=fam)
1415 conn.send(l.address)
1416 new_conn = l.accept()
1417 conn.send(new_conn)
1418
1419 if self.TYPE == 'processes':
1420 l = socket.socket()
1421 l.bind(('localhost', 0))
1422 conn.send(l.getsockname())
1423 l.listen(1)
1424 new_conn, addr = l.accept()
1425 conn.send(new_conn)
1426
1427 conn.recv()
1428
1429 def _remote(self, conn):
1430 for (address, msg) in iter(conn.recv, None):
1431 client = self.connection.Client(address)
1432 client.send(msg.upper())
1433 client.close()
1434
1435 if self.TYPE == 'processes':
1436 address, msg = conn.recv()
1437 client = socket.socket()
1438 client.connect(address)
1439 client.sendall(msg.upper())
1440 client.close()
1441
1442 conn.close()
1443
1444 def test_pickling(self):
1445 try:
1446 multiprocessing.allow_connection_pickling()
1447 except ImportError:
1448 return
1449
1450 families = self.connection.families
1451
1452 lconn, lconn0 = self.Pipe()
1453 lp = self.Process(target=self._listener, args=(lconn0, families))
1454 lp.start()
1455 lconn0.close()
1456
1457 rconn, rconn0 = self.Pipe()
1458 rp = self.Process(target=self._remote, args=(rconn0,))
1459 rp.start()
1460 rconn0.close()
1461
1462 for fam in families:
1463 msg = ('This connection uses family %s' % fam).encode('ascii')
1464 address = lconn.recv()
1465 rconn.send((address, msg))
1466 new_conn = lconn.recv()
1467 self.assertEqual(new_conn.recv(), msg.upper())
1468
1469 rconn.send(None)
1470
1471 if self.TYPE == 'processes':
1472 msg = latin('This connection uses a normal socket')
1473 address = lconn.recv()
1474 rconn.send((address, msg))
1475 if hasattr(socket, 'fromfd'):
1476 new_conn = lconn.recv()
1477 self.assertEqual(new_conn.recv(100), msg.upper())
1478 else:
1479 # XXX On Windows with Py2.6 need to backport fromfd()
1480 discard = lconn.recv_bytes()
1481
1482 lconn.send(None)
1483
1484 rconn.close()
1485 lconn.close()
1486
1487 lp.join()
1488 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001489"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001490#
1491#
1492#
1493
class _TestHeap(BaseTestCase):
    """Consistency check for the multiprocessing.heap allocator."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Allocate and free many blocks, then check the heap bookkeeping."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                # Use a separate name for the index to delete; the loop
                # counter itself is not needed.
                victim = random.randrange(maxblocks)
                del blocks[victim]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        segments = []
        for L in heap._len_to_seq.values():
            for arena, start, stop in L:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop-start, 'occupied'))

        segments.sort()

        # Each segment must either end where the next one starts, or the
        # next one must open a different arena at offset zero.
        for current, following in zip(segments, segments[1:]):
            (arena, _, stop) = current[:3]
            (narena, nstart, _) = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1534
1535#
1536#
1537#
1538
# Value and copy are provided by multiprocessing.sharedctypes, not by
# ctypes, so only the real ctypes names are imported from ctypes here.
# Asking ctypes for them made this import fail every time, which in turn
# made every test below return early without checking anything.
try:
    from ctypes import Structure, c_int, c_double
    import multiprocessing.sharedctypes
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):
    """Share ctypes values, structures and arrays with a child process."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child side: double every value it is given, in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        """Changes made by the child are visible in the parent.

        `lock` is passed through to every Value/Array that is created.
        """
        if c_int is None:
            return

        # Build the shared objects through the mixin's Value and Array,
        # like the other tests in this file do.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        """Run the same scenario with lock=True."""
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        """A copy keeps its values when the original is modified."""
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        # Reached through the module so that the name `copy` at module
        # level is left alone.
        bar = multiprocessing.sharedctypes.copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1600
1601#
1602#
1603#
1604
class _TestFinalize(BaseTestCase):
    """Check when, and in which order, util.Finalize callbacks run."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child side: register finalizers that report through `conn`."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # Three finalizers with the same exit priority.  Their objects are
        # kept in a list so that they stay alive until the process exits.
        same_priority = []
        for tag in ('d01', 'd02', 'd03'):
            target = Foo()
            same_priority.append(target)
            util.Finalize(target, conn.send, args=(tag,), exitpriority=0)

        # Finalizers that are not attached to any object.
        for tag, priority in (('e', -10), ('STOP', -100)):
            util.Finalize(None, conn.send, args=(tag,), exitpriority=priority)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        """The parent receives the callback tags in the expected order."""
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        # Read tags until the 'STOP' marker comes through.
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1655
1656#
1657# Test that from ... import * works for each module
1658#
1659
class _TestImportStar(BaseTestCase):
    """Every name listed in a module's __all__ must exist in that module."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        """Import each multiprocessing module and verify its __all__."""
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            # A module without __all__ has nothing to check.
            exported = getattr(mod, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1682
1683#
1684# Quick test that logging works -- does not test logging output
1685#
1686
class _TestLogging(BaseTestCase):
    """Quick checks of the multiprocessing logger; output is not tested."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        """The logger exists and accepts messages below its level."""
        logger = multiprocessing.get_logger()
        # Check the logger before it is used rather than after.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child side: report the logger's effective level through `conn`."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        """A child process sees the effective log level set in the parent."""
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # Restore both loggers and close the pipe even when an assertion
        # fails, so that later tests do not inherit the modified levels.
        try:
            logger.setLevel(LEVEL1)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            reported = reader.recv()
            child.join()    # reap the child instead of leaving it behind
            self.assertEqual(LEVEL1, reported)

            # With the logger's own level unset, the root logger's level
            # is the one the child must report.
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            child = self.Process(target=self._test_level, args=(writer,))
            child.start()
            reported = reader.recv()
            child.join()
            self.assertEqual(LEVEL2, reported)
        finally:
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1724
1725#
Jesse Noller9a5b2ad2009-01-19 15:12:22 +00001726# Test to verify handle verification, see issue 3321
1727#
1728
class TestInvalidHandle(unittest.TestCase):
    """Connection objects must reject invalid handles (see issue 3321)."""

    def test_invalid_handles(self):
        """Polling a bogus handle or wrapping -1 raises IOError."""
        if not WIN32:
            bogus = _multiprocessing.Connection(44977608)
            self.assertRaises(IOError, bogus.poll)
            self.assertRaises(IOError, _multiprocessing.Connection, -1)
1737#
Benjamin Petersondfd79492008-06-13 19:13:39 +00001738# Functions used to create test cases from the base ones in this module
1739#
1740
def get_attributes(Source, names):
    """Collect the named attributes of `Source` into a dictionary.

    Attributes that are plain functions are wrapped in staticmethod; every
    other attribute is stored as it is.
    """
    function_type = type(get_attributes)
    collected = {}
    for name in names:
        value = getattr(Source, name)
        collected[name] = (staticmethod(value)
                           if isinstance(value, function_type) else value)
    return collected
1749
def create_test_cases(Mixin, type):
    """Build a concrete TestCase for each matching '_Test*' global.

    A global whose name starts with '_Test' matches when `type` is listed
    in its ALLOWED_TYPES.  The result maps 'With<Type><name minus the
    leading underscore>' to a class deriving from that global,
    unittest.TestCase and `Mixin`.
    """
    prefix = 'With' + type[0].upper() + type[1:]
    result = {}

    for name, base in list(globals().items()):
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        newname = prefix + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        # Give the generated class its final name and the mixin's module.
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        result[newname] = Temp
    return result
1766
1767#
1768# Create test cases
1769#
1770
class ProcessesMixin(object):
    # Mixin for the 'processes' flavour of the generated test cases.
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the listed attributes of the multiprocessing package into the
    # class namespace.  get_attributes() wraps plain functions in
    # staticmethod.
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate one TestCase per allowed '_Test*' class and publish them as
# module globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1783
1784
class ManagerMixin(object):
    # Mixin for the 'manager' flavour of the generated test cases.
    TYPE = 'manager'
    Process = multiprocessing.Process
    # Created with object.__new__, so __init__ has not run at this point;
    # test_main() calls __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the listed attributes of that manager instance into the class
    # namespace.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
       'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate one TestCase per allowed '_Test*' class and publish them as
# module globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1797
1798
class ThreadsMixin(object):
    # Mixin for the 'threads' flavour of the generated test cases.
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the listed attributes of multiprocessing.dummy into the class
    # namespace.  get_attributes() wraps plain functions in staticmethod.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate one TestCase per allowed '_Test*' class and publish them as
# module globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1811
class OtherTest(unittest.TestCase):
    """Authentication failures of the connection challenge helpers."""

    # TODO: add more tests for deliver/answer challenge.
    def test_deliver_challenge_auth_failure(self):
        """deliver_challenge() rejects a peer that answers with garbage."""
        class _FakeConnection(object):
            def recv_bytes(self, size):
                return b'something bogus'

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          fake, b'abc')

    def test_answer_challenge_auth_failure(self):
        """answer_challenge() rejects a bogus reply to its response."""
        class _FakeConnection(object):
            def __init__(self):
                self.count = 0

            def recv_bytes(self, size):
                # First read: the challenge marker.  Second read: a bogus
                # reply.  Any later read: nothing.
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''

            def send_bytes(self, data):
                pass

        fake = _FakeConnection()
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          fake, b'abc')
1840
Jesse Noller7152f6d2009-04-02 05:17:26 +00001841#
1842# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
1843#
1844
def initializer(ns):
    """Increment the ``test`` attribute of `ns` by one."""
    ns.test = ns.test + 1
1847
class TestInitializers(unittest.TestCase):
    """Initializer support of Manager.start() and Pool() (issue 5585)."""

    def setUp(self):
        # A shared namespace whose counter the initializer increments.
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()

    def test_manager_initializer(self):
        """start() rejects a non-callable and runs a callable once."""
        init_args = (self.ns,)
        second = multiprocessing.managers.SyncManager()
        self.assertRaises(TypeError, second.start, 1)
        second.start(initializer, init_args)
        self.assertEqual(self.ns.test, 1)
        second.shutdown()

    def test_pool_initializer(self):
        """Pool() rejects a non-callable and runs a callable in its worker."""
        init_args = (self.ns,)
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, init_args)
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
1870
Jesse Noller1b90efb2009-06-30 17:11:52 +00001871#
1872# Issue 5155, 5313, 5331: Test process in processes
1873# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
1874#
1875
1876def _ThisSubProcess(q):
1877 try:
1878 item = q.get(block=False)
1879 except Queue.Empty:
1880 pass
1881
def _TestProcess(q):
    """Start a nested process around a fresh queue and wait for it.

    The `q` argument is accepted but not used.
    """
    inner_queue = multiprocessing.Queue()
    nested = multiprocessing.Process(target=_ThisSubProcess,
                                     args=(inner_queue,))
    nested.start()
    nested.join()
1887
1888def _afunc(x):
1889 return x*x
1890
def pool_in_process():
    """Create a four-worker pool, map _afunc over seven numbers, clean up."""
    pool = multiprocessing.Pool(processes=4)
    try:
        pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    finally:
        # Release the workers explicitly rather than leaving them to
        # interpreter shutdown.
        pool.close()
        pool.join()
1894
1895class _file_like(object):
1896 def __init__(self, delegate):
1897 self._delegate = delegate
1898 self._pid = None
1899
1900 @property
1901 def cache(self):
1902 pid = os.getpid()
1903 # There are no race conditions since fork keeps only the running thread
1904 if pid != self._pid:
1905 self._pid = pid
1906 self._cache = []
1907 return self._cache
1908
1909 def write(self, data):
1910 self.cache.append(data)
1911
1912 def flush(self):
1913 self._delegate.write(''.join(self.cache))
1914 self._cache = []
1915
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Processes started from within processes (issues 5155, 5313, 5331)."""

    def test_queue_in_process(self):
        """A child can create its own queue and start a process."""
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_TestProcess, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        """A child can create and use its own pool."""
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        """Data written to the file-like wrapper reaches its delegate."""
        sio = StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        flike.flush()
        # assertEqual is not stripped under -O and reports both values on
        # failure, unlike a bare assert statement.
        self.assertEqual(sio.getvalue(), 'foo')
1936
# Stand-alone TestCase classes; test_main() appends them after the
# generated per-type test cases.
testcases_other = [OtherTest, TestInvalidHandle, TestInitializers,
                   TestStdinBadfiledescriptor]
Neal Norwitz0c519b32008-08-25 01:50:24 +00001939
Benjamin Petersondfd79492008-06-13 19:13:39 +00001940#
1941#
1942#
1943
def test_main(run=None):
    """Set up the shared pools and manager, run all tests, tear down.

    `run` is called with the assembled unittest.TestSuite; when it is None,
    test.test_support.run_unittest is used.  Raises unittest.SkipTest on
    Linux when an RLock cannot be created.
    """
    if sys.platform.startswith("linux"):
        try:
            # Only the ability to create the lock matters here.
            multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    # Tear the pools and the manager down even if the runner raises, so
    # that no worker or server process outlives the test run.
    try:
        by_name = lambda tc: tc.__name__
        testcases = (
            sorted(testcases_processes.values(), key=by_name) +
            sorted(testcases_threads.values(), key=by_name) +
            sorted(testcases_manager.values(), key=by_name) +
            testcases_other
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001981
def main():
    """Run the whole suite through a verbose text test runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)

if __name__ == '__main__':
    main()