blob: a887bca81ba9f7a08b2867671a4c66191dd16aa3 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
#
# Unit tests for the multiprocessing package
#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
21import multiprocessing.dummy
22import multiprocessing.connection
23import multiprocessing.managers
24import multiprocessing.heap
25import multiprocessing.managers
26import multiprocessing.pool
27import _multiprocessing
28
29from multiprocessing import util
30
31#
32#
33#
34
if sys.version_info < (3, 0):
    # On Python 2 the helper is simply the built-in str type.
    latin = str
else:
    def latin(s):
        """Return *s* encoded with the 'latin' codec."""
        return s.encode('latin')
40
#
# Constants
#
44
# Logging level constant for the test run (alternative kept for reference).
LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

# Short pause, in seconds, used by tests that must let another
# process/thread make progress before checking state.
DELTA = 0.1

# Making this true makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

if not CHECK_TIMINGS:
    TIMEOUT1 = TIMEOUT2 = TIMEOUT3 = 0.1
else:
    TIMEOUT1 = 0.82
    TIMEOUT2 = 0.35
    TIMEOUT3 = 1.4

# True unless the extension module flags its semaphore get_value() as broken.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
    )
60
#
# Creates a wrapper for a function which records the time it takes to finish
#
64
class TimingWrapper(object):
    """Callable wrapper that records how long the latest call took.

    After each call, ``elapsed`` holds the wall-clock duration in seconds
    (measured with ``time.time()``), whether the wrapped function returned
    or raised.  Before the first call it is ``None``.
    """

    def __init__(self, func):
        self.elapsed = None
        self.func = func

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Runs on both the normal and the exceptional path.
            self.elapsed = time.time() - started
        return outcome
77
#
# Base class for test cases
#
81
class BaseTestCase(object):
    """Mixin with assertion helpers shared by the test classes below.

    It relies on ``assertEqual``/``assertAlmostEqual`` being supplied by
    whatever class it is combined with.
    """

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare two durations to one decimal place, if timing checks are on."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it raises NotImplementedError.

        A NotImplementedError from *func* is swallowed and None is returned.
        """
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
97
#
# Return the value of a semaphore
#
101
def get_value(self):
    """Return the current counter of the semaphore-like object *self*.

    Tries, in order, the ``get_value()`` method, then the
    ``_Semaphore__value`` attribute, then the ``_value`` attribute.
    Raises NotImplementedError if none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attribute in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attribute)
        except AttributeError:
            continue
    raise NotImplementedError
113
#
# Testcases
#
117
class _TestProcess(BaseTestCase):
    """Tests for process objects: attributes, lifecycle and nesting.

    ``TYPE``, ``Process``, ``Queue``, ``Event``, ``Pipe``,
    ``current_process`` and ``active_children`` are not defined here and
    must be provided by the class this mixin is combined with.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """Check the attributes reported for the current process."""
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.get_authkey()

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.is_daemon())
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.get_ident(), os.getpid())
        self.assertEqual(current.get_exitcode(), None)

    def _test(self, q, *args, **kwds):
        """Child target: report args, kwargs, name, authkey and pid via *q*."""
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.get_name())
        if self.TYPE != 'threads':
            q.put(bytes(current.get_authkey()))
            q.put(current.pid)

    def test_process(self):
        """Follow a child through creation, start, data exchange and join."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.set_daemon(True)
        current = self.current_process()

        # State before start().
        if self.TYPE != 'threads':
            self.assertEqual(p.get_authkey(), current.get_authkey())
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.is_daemon(), True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.get_exitcode(), None)

        p.start()

        # State while running.
        self.assertEqual(p.get_exitcode(), None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        # The child echoes back everything it was given (minus the queue).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.get_name())
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.get_authkey())
            self.assertEqual(q.get(), p.pid)

        p.join()

        # State after join().
        self.assertEqual(p.get_exitcode(), 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        """Child target for test_terminate: sleep until terminated."""
        time.sleep(1000)

    def test_terminate(self):
        """Check that terminate() stops a sleeping child."""
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.set_daemon(True)
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.get_exitcode(), None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        # A second join() on a finished process must also work.
        p.join()

        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() returns a positive int or raises NotImplementedError."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """A process appears in active_children() only between start and join."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id* on *wconn*, then recurse in children until len(id) == 2."""
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        """Processes started from within processes report in depth-first order."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)
263
264#
265#
266#
267
268class _UpperCaser(multiprocessing.Process):
269
270 def __init__(self):
271 multiprocessing.Process.__init__(self)
272 self.child_conn, self.parent_conn = multiprocessing.Pipe()
273
274 def run(self):
275 self.parent_conn.close()
276 for s in iter(self.child_conn.recv, None):
277 self.child_conn.send(s.upper())
278 self.child_conn.close()
279
280 def submit(self, s):
281 assert type(s) is str
282 self.parent_conn.send(s)
283 return self.parent_conn.recv()
284
285 def stop(self):
286 self.parent_conn.send(None)
287 self.parent_conn.close()
288 self.child_conn.close()
289
class _TestSubclassingProcess(BaseTestCase):
    """Check that a Process subclass with a custom run() method works."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """Round-trip two strings through an _UpperCaser child process."""
        worker = _UpperCaser()
        worker.start()
        hello_reply = worker.submit('hello')
        self.assertEqual(hello_reply, 'HELLO')
        world_reply = worker.submit('world')
        self.assertEqual(world_reply, 'WORLD')
        worker.stop()
        worker.join()
301
302#
303#
304#
305
def queue_empty(q):
    """Report whether *q* is empty.

    Uses ``q.empty()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with zero.
    """
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
311
def queue_full(q, maxsize):
    """Report whether *q* is full.

    Uses ``q.full()`` when the queue has that method, otherwise compares
    ``q.qsize()`` with *maxsize*.
    """
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
317
318
class _TestQueue(BaseTestCase):
    """Tests for put/get/qsize/task_done on the queue types under test."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child target for test_put: once released, take six items."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """Fill a bounded queue, check that further puts fail, then drain it."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        start_child = self.Event()
        resume_parent = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, start_child, resume_parent)
            )
        consumer.set_daemon(True)
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using each calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        attempts = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for attempt, args, kwds, expected_delay in attempts:
            self.assertRaises(Queue.Full, attempt, *args, **kwds)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_delay)

        # Let the child empty the queue and wait until it has done so.
        start_child.set()
        resume_parent.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child target for test_get: once released, put 2, 3, 4 and 5."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        """Read back the child's items, then check gets fail on an empty queue."""
        queue = self.Queue()
        start_child = self.Event()
        resume_parent = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, start_child, resume_parent)
            )
        producer.set_daemon(True)
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        start_child.set()
        resume_parent.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected blocking time)
        attempts = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for attempt, args, kwds, expected_delay in attempts:
            self.assertRaises(Queue.Empty, attempt, *args, **kwds)
            self.assertTimingAlmostEqual(attempt.elapsed, expected_delay)

        producer.join()

    def _test_fork(self, queue):
        """Child target for test_fork: put the integers 10 through 19."""
        for value in range(10, 20):
            queue.put(value)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for value in range(10):
            queue.put(value)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        """qsize() tracks puts and gets; return early if it is unimplemented."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        # Each step performs one operation and states the size afterwards.
        for operation, args, size_after in (
                (q.put, (1,), 1),
                (q.put, (5,), 2),
                (q.get, (), 1),
                (q.get, (), 0),
                ):
            operation(*args)
            self.assertEqual(q.qsize(), size_after)

    def _test_task_done(self, q):
        """Worker target: acknowledge each item until ``None`` arrives."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """join() on a joinable queue returns once every item is acknowledged."""
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = []
        for _ in range(4):
            workers.append(
                self.Process(target=self._test_task_done, args=(queue,))
                )

        for worker in workers:
            worker.start()

        for item in range(10):
            queue.put(item)

        queue.join()

        # One ``None`` sentinel per worker ends its loop.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
522
523#
524#
525#
526
class _TestLock(BaseTestCase):
    """Tests for the Lock and RLock types under test."""

    def test_lock(self):
        """A lock cannot be re-acquired, and releasing it unheld raises."""
        lock = self.Lock()
        first_attempt = lock.acquire()
        self.assertEqual(first_attempt, True)
        second_attempt = lock.acquire(False)
        self.assertEqual(second_attempt, False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock nests three deep; a fourth release raises."""
        lock = self.RLock()
        depth = 3
        for _ in range(depth):
            self.assertEqual(lock.acquire(), True)
        for _ in range(depth):
            self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)
545
546
class _TestSemaphore(BaseTestCase):
    """Tests for the Semaphore and BoundedSemaphore types under test."""

    def _test_semaphore(self, sem):
        """Take *sem* (initial value 2) down to 0 and back up to 2."""
        def expect_value(expected):
            # Skipped silently where the counter cannot be read.
            self.assertReturnsIfImplemented(expected, get_value, sem)

        expect_value(2)
        self.assertEqual(sem.acquire(), True)
        expect_value(1)
        self.assertEqual(sem.acquire(), True)
        expect_value(0)
        self.assertEqual(sem.acquire(False), False)
        expect_value(0)
        self.assertEqual(sem.release(), None)
        expect_value(1)
        self.assertEqual(sem.release(), None)
        expect_value(2)

    def test_semaphore(self):
        """A plain semaphore can be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        for value_after_release in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(
                value_after_release, get_value, sem
                )

    def test_bounded_semaphore(self):
        """Run the shared acquire/release sequence on a bounded semaphore."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on an empty semaphore fails after the expected delay."""
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected blocking time)
        attempts = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
            ]
        for args, kwds, expected_delay in attempts:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected_delay)
599
600
class _TestCondition(BaseTestCase):
    """Tests for notify(), notify_all() and timed waits on a condition."""

    def f(self, cond, sleeping, woken, timeout=None):
        """Waiter target: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE != 'processes':
            return
        try:
            sleeping_count = cond._sleeping_count.get_value()
            woken_count = cond._woken_count.get_value()
            self.assertEqual(sleeping_count - woken_count, 0)
            self.assertEqual(cond._wait_semaphore.get_value(), 0)
        except NotImplementedError:
            pass

    def test_notify(self):
        """Each notify() wakes one of the two waiters."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)
        waiter_args = (cond, sleeping, woken)

        proc = self.Process(target=self.f, args=waiter_args)
        proc.set_daemon(True)
        proc.start()

        thread = threading.Thread(target=self.f, args=waiter_args)
        thread.set_daemon(True)
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake the waiters one at a time, checking the count after each
        for woken_so_far in (1, 2):
            cond.acquire()
            cond.notify()
            cond.release()

            time.sleep(DELTA)
            self.assertReturnsIfImplemented(woken_so_far, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        # Only the thread is joined; the process is left as a daemon.
        thread.join()

    def test_notify_all(self):
        """Waiters with a timeout time out; notify_all() wakes the rest."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        def start_pair(*extra):
            # Start one daemon process, then one daemon thread, running f.
            for factory in (self.Process, threading.Thread):
                waiter = factory(
                    target=self.f, args=(cond, sleeping, woken) + extra
                    )
                waiter.set_daemon(True)
                waiter.start()

        # start some threads/processes which will timeout
        for _ in range(3):
            start_pair(TIMEOUT1)

        # wait for them all to sleep
        for _ in range(6):
            sleeping.acquire()

        # check they have all timed out
        for _ in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for _ in range(3):
            start_pair()

        # wait for them to all sleep
        for _ in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        """A timed wait with no notifier returns None after the timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        outcome = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(outcome, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
731
732
class _TestEvent(BaseTestCase):
    """Tests for wait(), set() and clear() on the event type under test."""

    def _test_event(self, event):
        """Child target: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        """Check wait() on an unset, set, and remotely set event."""
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        # Unset event: wait() blocks for the full timeout.
        for timeout in (0.0, TIMEOUT1):
            self.assertEqual(wait(timeout), None)
            self.assertTimingAlmostEqual(wait.elapsed, timeout)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        # Set event: wait() returns immediately, with or without a timeout.
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # A child sets the event; the untimed wait() must then return.
        setter = self.Process(target=self._test_event, args=(event,))
        setter.start()
        self.assertEqual(wait(), None)
768
769#
770#
771#
772
class _TestValue(BaseTestCase):
    """Tests for the shared Value and RawValue objects."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child target: store the third column into each shared value."""
        for shared, (_, _, replacement) in zip(values, self.codes_values):
            shared.value = replacement

    def test_value(self, raw=False):
        """Values written by a child process are visible to the parent."""
        if self.TYPE != 'processes':
            return

        factory = self.RawValue if raw else self.Value
        values = [factory(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, (_, initial, _) in zip(values, self.codes_values):
            self.assertEqual(shared.value, initial)

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        for shared, (_, _, replacement) in zip(values, self.codes_values):
            self.assertEqual(shared.value, replacement)

    def test_rawvalue(self):
        """Run test_value against RawValue."""
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Value exposes get_lock()/get_obj(); RawValue has neither."""
        if self.TYPE != 'processes':
            return

        # No lock argument.
        default_value = self.Value('i', 5)
        default_value.get_lock()
        default_value.get_obj()

        # lock=None.
        none_lock_value = self.Value('i', 5, lock=None)
        none_lock_value.get_lock()
        none_lock_value.get_obj()

        # A caller-supplied lock must be the one get_lock() returns.
        lock = self.Lock()
        own_lock_value = self.Value('i', 5, lock=lock)
        returned_lock = own_lock_value.get_lock()
        own_lock_value.get_obj()
        self.assertEqual(lock, returned_lock)

        raw_value = self.RawValue('i', 5)
        for missing in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_value, missing))
832
833
class _TestArray(BaseTestCase):
    """Tests for the shared Array and RawArray objects."""

    def f(self, seq):
        """Replace *seq* in place with its running totals."""
        for index in range(1, len(seq)):
            seq[index] += seq[index - 1]

    def test_array(self, raw=False):
        """Indexing, slicing and child-process writes on a shared array."""
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Compute the expected result locally, then do the same in a child.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        """Run test_array against RawArray."""
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        """Array exposes get_lock()/get_obj(); RawArray has neither."""
        if self.TYPE != 'processes':
            return

        # No lock argument.
        default_array = self.Array('i', range(10))
        default_array.get_lock()
        default_array.get_obj()

        # lock=None.
        none_lock_array = self.Array('i', range(10), lock=None)
        none_lock_array.get_lock()
        none_lock_array.get_obj()

        # A caller-supplied lock must be the one get_lock() returns.
        lock = self.Lock()
        own_lock_array = self.Array('i', range(10), lock=lock)
        returned_lock = own_lock_array.get_lock()
        own_lock_array.get_obj()
        self.assertEqual(lock, returned_lock)

        raw_array = self.RawArray('i', range(10))
        for missing in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw_array, missing))
890
891#
892#
893#
894
class _TestContainers(BaseTestCase):
    """Tests for the manager's list, dict and Namespace objects."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        """Slicing, extending, repeating, adding and nesting shared lists."""
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        growing = self.list()
        self.assertEqual(growing[:], [])

        growing.extend(range(5))
        self.assertEqual(growing[:], range(5))

        self.assertEqual(growing[2], 2)
        self.assertEqual(growing[2:10], [2,3,4])

        growing *= 2
        doubled = [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]
        self.assertEqual(growing[:], doubled)

        self.assertEqual(growing + [5, 6], doubled + [5, 6])

        # The first list must be unaffected by the operations above.
        self.assertEqual(digits[:], range(10))

        # A shared list built from two shared lists.
        pair = self.list([digits, growing])
        self.assertEqual(
            pair[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # An append made after nesting must show through the outer list.
        wrapper = self.list([digits])
        digits.append('hello')
        self.assertEqual(
            wrapper[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]
            )

    def test_dict(self):
        """copy(), keys(), values() and items() on a shared dict."""
        d = self.dict()
        indices = range(65, 70)
        for code in indices:
            d[code] = chr(code)
        letters = [chr(code) for code in indices]
        self.assertEqual(d.copy(), dict(zip(indices, letters)))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), letters)
        self.assertEqual(sorted(d.items()), zip(indices, letters))

    def test_namespace(self):
        """Setting, deleting and printing attributes of a shared Namespace."""
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
950
951#
952#
953#
954
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return *x* multiplied by itself."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests for the pool API (apply, map, imap, async calls, terminate)."""

    def test_apply(self):
        """apply() gives the same answer as a direct call."""
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        """map() matches the built-in map, with and without a chunksize."""
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pmap(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_async(self):
        """get() on an async result blocks until the job has finished."""
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(pending.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        """get(timeout=...) raises TimeoutError when the job takes longer."""
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        """imap() yields the results in input order and then stops."""
        squares = self.pool.imap(sqr, range(10))
        self.assertEqual(list(squares), map(sqr, range(10)))

        # Step through by hand, first unchunked and then chunked.
        for count, options in ((10, {}), (1000, {'chunksize': 100})):
            squares = self.pool.imap(sqr, range(count), **options)
            for n in range(count):
                self.assertEqual(squares.next(), n*n)
            self.assertRaises(StopIteration, squares.next)

    def test_imap_unordered(self):
        """imap_unordered() yields every result, in any order."""
        squares = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(squares), map(sqr, range(1000)))

        squares = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(squares), map(sqr, range(1000)))

    def test_make_pool(self):
        """Pool(3) holds three entries in its ``_pool`` list."""
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        """terminate() followed by join() returns in under 0.2 seconds."""
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue up far more work than could finish before terminate().
        delays = [0.1] * 10000
        result = self.pool.map_async(time.sleep, delays, chunksize=1)
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)
#
# Test that manager has expected number of shared objects left
#
1030
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        """Check that the manager holds exactly the expected shared objects.

        If the count is wrong, the manager's debug info is printed before
        the assertion fails.
        """
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            # Written as a call so the line parses on Python 2 and 3; with
            # one argument the output is the same as the print statement.
            print(self.manager._debugInfo())

        self.assertEqual(refs, EXPECTED_NUMBER)
1047
#
# Test of creating a customized manager class
#
1051
1052from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1053
class FooBar(object):
    """Small class with two ordinary methods and one underscore method."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError

    def _h(self):
        """Return the string '_h()'."""
        return '_h()'
1061
def baz():
    """Yield the squares of 0 through 9 in ascending order."""
    for n in range(10):
        yield n ** 2
1065
class IteratorProxy(BaseProxy):
    """Proxy that forwards iteration to its referent via ``_callmethod``.

    Both the ``next`` and ``__next__`` spellings are exposed and forwarded
    under their own names.
    """

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def __next__(self):
        return self._callmethod('__next__')

    def next(self):
        return self._callmethod('next')
1074
class MyManager(BaseManager):
    """Manager subclass that holds the registrations made just below."""


# 'Foo': FooBar with no explicit ``exposed`` list.
MyManager.register(
    'Foo',
    callable=FooBar,
    )
# 'Bar': FooBar with exactly 'f' and '_h' exposed.
MyManager.register(
    'Bar',
    callable=FooBar,
    exposed=('f', '_h'),
    )
# 'baz': the baz generator, accessed through IteratorProxy.
MyManager.register(
    'baz',
    callable=baz,
    proxytype=IteratorProxy,
    )
1081
1082
class _TestMyManager(BaseTestCase):
    """Tests for the customized MyManager class and its registered types."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Check which methods each proxy exposes and that calls round-trip."""
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        candidates = ('f', 'g', '_h')
        foo_methods = [name for name in candidates if hasattr(foo, name)]
        bar_methods = [name for name in candidates if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        # Foo: '_h' is not on the proxy, and calling it by name is refused.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Bar: both exposed methods work directly and through _callmethod.
        for method_name, expected in (('f', 'f()'), ('_h', '_h()')):
            self.assertEqual(getattr(bar, method_name)(), expected)
        for method_name, expected in (('f', 'f()'), ('_h', '_h()')):
            self.assertEqual(bar._callmethod(method_name), expected)

        self.assertEqual(list(baz), [n*n for n in range(10)])

        manager.shutdown()
1114
#
# Test of connecting to a remote server and using xmlrpclib for serialization
#
1118
# Single module-level queue returned by every get_queue() call.
_queue = Queue.Queue()


def get_queue():
    """Return the module-level ``_queue`` object."""
    return _queue
1122
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# Registered with a callable, so this manager can create the queue.
QueueManager.register(
    'get_queue',
    callable=get_queue,
    )
1126
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered by name only, with no callable.
QueueManager2.register(
    'get_queue',
    )
1130
1131
# Serializer name passed to every manager created in _TestRemoteManager.
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    """Test connecting to a manager by address with xmlrpclib serialization."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child target: connect to the server and put one tuple on its queue."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        remote_queue = client.get_queue()
        remote_queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        """A second client reads what the child put on the served queue."""
        authkey = os.urandom(32)

        # Port 0 is requested; the actual address is read back below.
        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        putter = self.Process(
            target=self._putter, args=(manager.address, authkey)
            )
        putter.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        received = queue.get()
        self.assertEqual(received, ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()
1173
1174#
1175#
1176#
1177
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    '''Exercise the connection objects returned by Pipe().'''

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        # Child side: send every byte message straight back until the
        # empty SENTINEL message arrives, then close the connection.
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.set_daemon(True)
        echo_proc.start()

        using_processes = (self.TYPE == 'processes')

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))

        if using_processes:
            self.assertEqual(type(conn.fileno()), int)

        # object round trip
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        # raw bytes round trip
        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if using_processes:
            # receive into the start of a zeroed buffer
            target = array.array('i', [0]*10)
            wanted = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(target)
            self.assertEqual(received, len(arr) * target.itemsize)
            self.assertEqual(list(target), wanted)

            # receive into the buffer at an offset of three items
            target = array.array('i', [0]*10)
            wanted = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            received = conn.recv_bytes_into(target, 3 * target.itemsize)
            self.assertEqual(received, len(arr) * target.itemsize)
            self.assertEqual(list(target), wanted)

            # a buffer shorter than the message must raise BufferTooShort
            # carrying the complete message
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as exc:
                self.assertEqual(exc.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        # nothing pending: poll returns False, with and without a timeout
        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        # the echoed message makes poll return True
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)       # tell child to quit
        child_conn.close()

        if using_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo_proc.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)

        # The direction checks below are only made for real processes.
        if self.TYPE != 'processes':
            return

        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        echo_proc.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra send_bytes arguments, bytes expected at the other end)
        accepted = (
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            )
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # offset/size combinations that must be rejected
        rejected = ((27,), (22, 5), (26, 1), (-1,), (4, -1))
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1328
class _TestListenerClient(BaseTestCase):
    '''Check a Listener/Client handshake for every available family.'''

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        # Child side: connect to the listener, send a greeting, hang up.
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            try:
                p = self.Process(target=self._test, args=(listener.address,))
                p.set_daemon(True)
                p.start()
                conn = listener.accept()
                try:
                    self.assertEqual(conn.recv(), 'hello')
                finally:
                    # Close the accepted connection as well; it was
                    # previously left open after each iteration.
                    conn.close()
                p.join()
            finally:
                # Release the listener even when the check above fails.
                listener.close()
#
# Test of sending connection and socket objects between processes
# (currently disabled: the test class below is wrapped in a string literal)
#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001351"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001352class _TestPicklingConnections(BaseTestCase):
1353
1354 ALLOWED_TYPES = ('processes',)
1355
1356 def _listener(self, conn, families):
1357 for fam in families:
1358 l = self.connection.Listener(family=fam)
1359 conn.send(l.address)
1360 new_conn = l.accept()
1361 conn.send(new_conn)
1362
1363 if self.TYPE == 'processes':
1364 l = socket.socket()
1365 l.bind(('localhost', 0))
1366 conn.send(l.getsockname())
1367 l.listen(1)
1368 new_conn, addr = l.accept()
1369 conn.send(new_conn)
1370
1371 conn.recv()
1372
1373 def _remote(self, conn):
1374 for (address, msg) in iter(conn.recv, None):
1375 client = self.connection.Client(address)
1376 client.send(msg.upper())
1377 client.close()
1378
1379 if self.TYPE == 'processes':
1380 address, msg = conn.recv()
1381 client = socket.socket()
1382 client.connect(address)
1383 client.sendall(msg.upper())
1384 client.close()
1385
1386 conn.close()
1387
1388 def test_pickling(self):
1389 try:
1390 multiprocessing.allow_connection_pickling()
1391 except ImportError:
1392 return
1393
1394 families = self.connection.families
1395
1396 lconn, lconn0 = self.Pipe()
1397 lp = self.Process(target=self._listener, args=(lconn0, families))
1398 lp.start()
1399 lconn0.close()
1400
1401 rconn, rconn0 = self.Pipe()
1402 rp = self.Process(target=self._remote, args=(rconn0,))
1403 rp.start()
1404 rconn0.close()
1405
1406 for fam in families:
1407 msg = ('This connection uses family %s' % fam).encode('ascii')
1408 address = lconn.recv()
1409 rconn.send((address, msg))
1410 new_conn = lconn.recv()
1411 self.assertEqual(new_conn.recv(), msg.upper())
1412
1413 rconn.send(None)
1414
1415 if self.TYPE == 'processes':
1416 msg = latin('This connection uses a normal socket')
1417 address = lconn.recv()
1418 rconn.send((address, msg))
1419 if hasattr(socket, 'fromfd'):
1420 new_conn = lconn.recv()
1421 self.assertEqual(new_conn.recv(100), msg.upper())
1422 else:
1423 # XXX On Windows with Py2.6 need to backport fromfd()
1424 discard = lconn.recv_bytes()
1425
1426 lconn.send(None)
1427
1428 rconn.close()
1429 lconn.close()
1430
1431 lp.join()
1432 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001433"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001434#
1435#
1436#
1437
class _TestHeap(BaseTestCase):
    '''Stress multiprocessing.heap and check its bookkeeping afterwards.'''

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        live = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            live.append(multiprocessing.heap.BufferWrapper(size))
            if len(live) > maxblocks:
                del live[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # collect every free and occupied segment as
        # (arena index, start, stop, length, state)
        segments = []
        for free_list in heap._len_to_seq.values():
            for arena, start, stop in free_list:
                segments.append((heap._arenas.index(arena), start, stop,
                                 stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            segments.append((heap._arenas.index(arena), start, stop,
                             stop - start, 'occupied'))

        segments.sort()

        # verify the state of the heap: consecutive segments either touch
        # or the later one begins a different arena at offset zero
        for current, following in zip(segments, segments[1:]):
            arena, start, stop = current[:3]
            narena, nstart, _ = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1478
1479#
1480#
1481#
1482
try:
    from ctypes import Structure, Value, copy, c_int, c_double
except ImportError:
    # Fallbacks let the module still import; the shared-ctypes tests
    # return early when c_int is None.
    # NOTE(review): `Value` and `copy` look like multiprocessing.sharedctypes
    # names rather than ctypes ones.  If ctypes does not export them, this
    # import always fails and those tests are silently skipped -- confirm.
    Structure = object
    c_int = None
    c_double = None
1488
1489class _Foo(Structure):
1490 _fields_ = [
1491 ('x', c_int),
1492 ('y', c_double)
1493 ]
1494
class _TestSharedCTypes(BaseTestCase):
    '''Share ctypes values, structures and arrays with a child process.

    Every test returns immediately when c_int is None, i.e. when the
    module-level ctypes import fell back to its placeholders.
    '''

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        # Runs in the child: double each shared object in place so the
        # parent can observe the change.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        # Use the c_double imported at module level; the bare name
        # `ctypes` is never bound by that import.
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        # Array comes from the mixin, like the other factories used by
        # the tests; no module-level `Array` is bound by that import.
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = 'hello'

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario, but with lock=True passed to every factory.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        # Changing the original afterwards must not affect the copy.
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1544
1545#
1546#
1547#
1548
class _TestFinalize(BaseTestCase):
    '''Check when, and in which order, util.Finalize callbacks run.'''

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        # Runs in the child.  Each callback sends its label down `conn`
        # so the parent can inspect the order in which they fired.
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        finalize_b = util.Finalize(b, conn.send, args=('b',))
        finalize_b()    # triggers callback for b
        finalize_b()    # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # no exitpriority is given for this one
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers sharing exitpriority 0
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        # finalizers that are not attached to any object
        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        # read labels until the 'STOP' marker
        received = list(iter(conn.recv, 'STOP'))
        self.assertEqual(received,
                         ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1599
1600#
1601# Test that from ... import * works for each module
1602#
1603
class _TestImportStar(BaseTestCase):
    '''Check that every name listed in a module's __all__ really exists.'''

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            # __import__ returns the top-level package, so fetch the
            # submodule itself from sys.modules.
            mod = sys.modules[name]

            # modules without __all__ have nothing to check
            for attr in getattr(mod, '__all__', ()):
                message = '%r does not have attribute %r' % (mod, attr)
                self.assertTrue(hasattr(mod, attr), message)
1626
1627#
1628# Quick test that logging works -- does not test logging output
1629#
1630
class _TestLogging(BaseTestCase):
    '''Quick checks of the multiprocessing logger; output is not tested.'''

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        try:
            self.assertTrue(logger is not None)
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Put the level back even if something above raised.
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        # Runs in the child: report the logger's effective level.
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        # Start a child that reports its effective log level through
        # `writer`, return the value read from `reader`, and always join
        # the child so that no process is left unreaped.
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        try:
            return reader.recv()
        finally:
            child.join()

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        try:
            # a level set directly on the multiprocessing logger
            logger.setLevel(LEVEL1)
            self.assertEqual(LEVEL1,
                             self._child_effective_level(reader, writer))

            # with NOTSET the effective level comes from the root logger
            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(LEVEL2,
                             self._child_effective_level(reader, writer))
        finally:
            # Restore both loggers even when an assertion fails, so that
            # later tests do not inherit the levels set here.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
1668
1669#
1670# Functions used to create test cases from the base ones in this module
1671#
1672
def get_attributes(Source, names):
    '''Return a dict mapping each of `names` to that attribute of `Source`.

    Attributes that are plain Python functions are wrapped in
    staticmethod(); everything else is stored unchanged.
    '''
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1681
def create_test_cases(Mixin, type):
    '''Build concrete TestCase classes from this module's _Test* classes.

    For every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains `type`, create a class deriving from that
    base, unittest.TestCase and `Mixin`.  The class is named
    'With<Type><name without the leading underscore>' and takes the
    mixin's __module__.  Returns a dict mapping the new names to the
    new classes.
    '''
    type_label = type[0].upper() + type[1:]
    cases = {}

    for name, base in globals().items():
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        newname = 'With' + type_label + name[1:]

        class Temp(base, unittest.TestCase, Mixin):
            pass

        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1698
1699#
1700# Create test cases
1701#
1702
# Mixin that binds the tests to the real multiprocessing implementation.
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    # Copy the named multiprocessing attributes into this class body;
    # get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

# Generate the 'WithProcesses...' test cases and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1715
1716
# Mixin that binds the tests to objects created through a SyncManager.
class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    # The manager is allocated without running __init__; test_main() calls
    # __init__() and start() on it before the tests run.
    manager = object.__new__(multiprocessing.managers.SyncManager)
    # Copy the named attributes of that manager instance into this class body.
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithManager...' test cases and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1729
1730
# Mixin that binds the tests to multiprocessing.dummy.
class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    # Copy the named multiprocessing.dummy attributes into this class body;
    # get_attributes() wraps plain functions in staticmethod().
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

# Generate the 'WithThreads...' test cases and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1743
1744#
1745#
1746#
1747
def test_main(run=None):
    '''Build the generated test cases and run them.

    `run` is a callable that takes a unittest.TestSuite; when it is None,
    test.test_support.run_unittest is used.  On Linux, TestSkipped is
    raised if creating an RLock fails with OSError (see issue 3111).

    The pools and the manager shared through the mixin classes are set up
    before the run and torn down afterwards, even when `run` raises.
    '''
    if sys.platform.startswith("linux"):
        try:
            # Only probing whether the lock can be created at all.
            multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down in a finally block so that an exception escaping from
        # run() does not leave the pools and the manager running.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001785
def main():
    '''Run the whole suite through a verbose unittest text runner.'''
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()