blob: cd0784bca3259b8ccf9764c931101ba2db6c4258 [file] [log] [blame]
Jesse Noller76cf55f2008-07-02 16:56:51 +00001#!/usr/bin/env python
2
Benjamin Petersondfd79492008-06-13 19:13:39 +00003#
4# Unit tests for the multiprocessing package
5#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
21import multiprocessing.dummy
22import multiprocessing.connection
23import multiprocessing.managers
24import multiprocessing.heap
25import multiprocessing.managers
26import multiprocessing.pool
27import _multiprocessing
28
29from multiprocessing import util
30
31#
32#
33#
34
35if sys.version_info >= (3, 0):
36 def latin(s):
37 return s.encode('latin')
38else:
39 latin = str
40
41try:
42 bytes
43except NameError:
44 bytes = str
45 def bytearray(seq):
46 return array.array('c', seq)
47
48#
49# Constants
50#
51
52LOG_LEVEL = util.SUBWARNING
53#LOG_LEVEL = logging.WARNING
54
55DELTA = 0.1
56CHECK_TIMINGS = False # making true makes tests take a lot longer
57 # and can sometimes cause some non-serious
58 # failures because some calls block a bit
59 # longer than expected
60if CHECK_TIMINGS:
61 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
62else:
63 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
64
65HAVE_GETVALUE = not getattr(_multiprocessing,
66 'HAVE_BROKEN_SEM_GETVALUE', False)
67
68#
69# Creates a wrapper for a function which records the time it takes to finish
70#
71
72class TimingWrapper(object):
73
74 def __init__(self, func):
75 self.func = func
76 self.elapsed = None
77
78 def __call__(self, *args, **kwds):
79 t = time.time()
80 try:
81 return self.func(*args, **kwds)
82 finally:
83 self.elapsed = time.time() - t
84
85#
86# Base class for test cases
87#
88
89class BaseTestCase(object):
90
91 ALLOWED_TYPES = ('processes', 'manager', 'threads')
92
93 def assertTimingAlmostEqual(self, a, b):
94 if CHECK_TIMINGS:
95 self.assertAlmostEqual(a, b, 1)
96
97 def assertReturnsIfImplemented(self, value, func, *args):
98 try:
99 res = func(*args)
100 except NotImplementedError:
101 pass
102 else:
103 return self.assertEqual(value, res)
104
105#
106# Return the value of a semaphore
107#
108
109def get_value(self):
110 try:
111 return self.get_value()
112 except AttributeError:
113 try:
114 return self._Semaphore__value
115 except AttributeError:
116 try:
117 return self._value
118 except AttributeError:
119 raise NotImplementedError
120
121#
122# Testcases
123#
124
125class _TestProcess(BaseTestCase):
126
127 ALLOWED_TYPES = ('processes', 'threads')
128
129 def test_current(self):
130 if self.TYPE == 'threads':
131 return
132
133 current = self.current_process()
134 authkey = current.get_authkey()
135
136 self.assertTrue(current.is_alive())
137 self.assertTrue(not current.is_daemon())
138 self.assertTrue(isinstance(authkey, bytes))
139 self.assertTrue(len(authkey) > 0)
140 self.assertEqual(current.get_ident(), os.getpid())
141 self.assertEqual(current.get_exitcode(), None)
142
143 def _test(self, q, *args, **kwds):
144 current = self.current_process()
145 q.put(args)
146 q.put(kwds)
147 q.put(current.get_name())
148 if self.TYPE != 'threads':
149 q.put(bytes(current.get_authkey()))
150 q.put(current.pid)
151
152 def test_process(self):
153 q = self.Queue(1)
154 e = self.Event()
155 args = (q, 1, 2)
156 kwargs = {'hello':23, 'bye':2.54}
157 name = 'SomeProcess'
158 p = self.Process(
159 target=self._test, args=args, kwargs=kwargs, name=name
160 )
161 p.set_daemon(True)
162 current = self.current_process()
163
164 if self.TYPE != 'threads':
165 self.assertEquals(p.get_authkey(), current.get_authkey())
166 self.assertEquals(p.is_alive(), False)
167 self.assertEquals(p.is_daemon(), True)
168 self.assertTrue(p not in self.active_children())
169 self.assertTrue(type(self.active_children()) is list)
170 self.assertEqual(p.get_exitcode(), None)
171
172 p.start()
173
174 self.assertEquals(p.get_exitcode(), None)
175 self.assertEquals(p.is_alive(), True)
176 self.assertTrue(p in self.active_children())
177
178 self.assertEquals(q.get(), args[1:])
179 self.assertEquals(q.get(), kwargs)
180 self.assertEquals(q.get(), p.get_name())
181 if self.TYPE != 'threads':
182 self.assertEquals(q.get(), current.get_authkey())
183 self.assertEquals(q.get(), p.pid)
184
185 p.join()
186
187 self.assertEquals(p.get_exitcode(), 0)
188 self.assertEquals(p.is_alive(), False)
189 self.assertTrue(p not in self.active_children())
190
191 def _test_terminate(self):
192 time.sleep(1000)
193
194 def test_terminate(self):
195 if self.TYPE == 'threads':
196 return
197
198 p = self.Process(target=self._test_terminate)
199 p.set_daemon(True)
200 p.start()
201
202 self.assertEqual(p.is_alive(), True)
203 self.assertTrue(p in self.active_children())
204 self.assertEqual(p.get_exitcode(), None)
205
206 p.terminate()
207
208 join = TimingWrapper(p.join)
209 self.assertEqual(join(), None)
210 self.assertTimingAlmostEqual(join.elapsed, 0.0)
211
212 self.assertEqual(p.is_alive(), False)
213 self.assertTrue(p not in self.active_children())
214
215 p.join()
216
217 # XXX sometimes get p.get_exitcode() == 0 on Windows ...
218 #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
219
220 def test_cpu_count(self):
221 try:
222 cpus = multiprocessing.cpu_count()
223 except NotImplementedError:
224 cpus = 1
225 self.assertTrue(type(cpus) is int)
226 self.assertTrue(cpus >= 1)
227
228 def test_active_children(self):
229 self.assertEqual(type(self.active_children()), list)
230
231 p = self.Process(target=time.sleep, args=(DELTA,))
232 self.assertTrue(p not in self.active_children())
233
234 p.start()
235 self.assertTrue(p in self.active_children())
236
237 p.join()
238 self.assertTrue(p not in self.active_children())
239
240 def _test_recursion(self, wconn, id):
241 from multiprocessing import forking
242 wconn.send(id)
243 if len(id) < 2:
244 for i in range(2):
245 p = self.Process(
246 target=self._test_recursion, args=(wconn, id+[i])
247 )
248 p.start()
249 p.join()
250
251 def test_recursion(self):
252 rconn, wconn = self.Pipe(duplex=False)
253 self._test_recursion(wconn, [])
254
255 time.sleep(DELTA)
256 result = []
257 while rconn.poll():
258 result.append(rconn.recv())
259
260 expected = [
261 [],
262 [0],
263 [0, 0],
264 [0, 1],
265 [1],
266 [1, 0],
267 [1, 1]
268 ]
269 self.assertEqual(result, expected)
270
271#
272#
273#
274
275class _UpperCaser(multiprocessing.Process):
276
277 def __init__(self):
278 multiprocessing.Process.__init__(self)
279 self.child_conn, self.parent_conn = multiprocessing.Pipe()
280
281 def run(self):
282 self.parent_conn.close()
283 for s in iter(self.child_conn.recv, None):
284 self.child_conn.send(s.upper())
285 self.child_conn.close()
286
287 def submit(self, s):
288 assert type(s) is str
289 self.parent_conn.send(s)
290 return self.parent_conn.recv()
291
292 def stop(self):
293 self.parent_conn.send(None)
294 self.parent_conn.close()
295 self.child_conn.close()
296
297class _TestSubclassingProcess(BaseTestCase):
298
299 ALLOWED_TYPES = ('processes',)
300
301 def test_subclassing(self):
302 uppercaser = _UpperCaser()
303 uppercaser.start()
304 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
305 self.assertEqual(uppercaser.submit('world'), 'WORLD')
306 uppercaser.stop()
307 uppercaser.join()
308
309#
310#
311#
312
313def queue_empty(q):
314 if hasattr(q, 'empty'):
315 return q.empty()
316 else:
317 return q.qsize() == 0
318
319def queue_full(q, maxsize):
320 if hasattr(q, 'full'):
321 return q.full()
322 else:
323 return q.qsize() == maxsize
324
325
326class _TestQueue(BaseTestCase):
327
328
329 def _test_put(self, queue, child_can_start, parent_can_continue):
330 child_can_start.wait()
331 for i in range(6):
332 queue.get()
333 parent_can_continue.set()
334
335 def test_put(self):
336 MAXSIZE = 6
337 queue = self.Queue(maxsize=MAXSIZE)
338 child_can_start = self.Event()
339 parent_can_continue = self.Event()
340
341 proc = self.Process(
342 target=self._test_put,
343 args=(queue, child_can_start, parent_can_continue)
344 )
345 proc.set_daemon(True)
346 proc.start()
347
348 self.assertEqual(queue_empty(queue), True)
349 self.assertEqual(queue_full(queue, MAXSIZE), False)
350
351 queue.put(1)
352 queue.put(2, True)
353 queue.put(3, True, None)
354 queue.put(4, False)
355 queue.put(5, False, None)
356 queue.put_nowait(6)
357
358 # the values may be in buffer but not yet in pipe so sleep a bit
359 time.sleep(DELTA)
360
361 self.assertEqual(queue_empty(queue), False)
362 self.assertEqual(queue_full(queue, MAXSIZE), True)
363
364 put = TimingWrapper(queue.put)
365 put_nowait = TimingWrapper(queue.put_nowait)
366
367 self.assertRaises(Queue.Full, put, 7, False)
368 self.assertTimingAlmostEqual(put.elapsed, 0)
369
370 self.assertRaises(Queue.Full, put, 7, False, None)
371 self.assertTimingAlmostEqual(put.elapsed, 0)
372
373 self.assertRaises(Queue.Full, put_nowait, 7)
374 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
375
376 self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
377 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
378
379 self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
380 self.assertTimingAlmostEqual(put.elapsed, 0)
381
382 self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
383 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
384
385 child_can_start.set()
386 parent_can_continue.wait()
387
388 self.assertEqual(queue_empty(queue), True)
389 self.assertEqual(queue_full(queue, MAXSIZE), False)
390
391 proc.join()
392
393 def _test_get(self, queue, child_can_start, parent_can_continue):
394 child_can_start.wait()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000395 #queue.put(1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000396 queue.put(2)
397 queue.put(3)
398 queue.put(4)
399 queue.put(5)
400 parent_can_continue.set()
401
402 def test_get(self):
403 queue = self.Queue()
404 child_can_start = self.Event()
405 parent_can_continue = self.Event()
406
407 proc = self.Process(
408 target=self._test_get,
409 args=(queue, child_can_start, parent_can_continue)
410 )
411 proc.set_daemon(True)
412 proc.start()
413
414 self.assertEqual(queue_empty(queue), True)
415
416 child_can_start.set()
417 parent_can_continue.wait()
418
419 time.sleep(DELTA)
420 self.assertEqual(queue_empty(queue), False)
421
Benjamin Petersonda3a1b12008-06-16 20:52:48 +0000422 # Hangs unexpectedly, remove for now
423 #self.assertEqual(queue.get(), 1)
Benjamin Petersondfd79492008-06-13 19:13:39 +0000424 self.assertEqual(queue.get(True, None), 2)
425 self.assertEqual(queue.get(True), 3)
426 self.assertEqual(queue.get(timeout=1), 4)
427 self.assertEqual(queue.get_nowait(), 5)
428
429 self.assertEqual(queue_empty(queue), True)
430
431 get = TimingWrapper(queue.get)
432 get_nowait = TimingWrapper(queue.get_nowait)
433
434 self.assertRaises(Queue.Empty, get, False)
435 self.assertTimingAlmostEqual(get.elapsed, 0)
436
437 self.assertRaises(Queue.Empty, get, False, None)
438 self.assertTimingAlmostEqual(get.elapsed, 0)
439
440 self.assertRaises(Queue.Empty, get_nowait)
441 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
442
443 self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
444 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
445
446 self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
447 self.assertTimingAlmostEqual(get.elapsed, 0)
448
449 self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
450 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
451
452 proc.join()
453
454 def _test_fork(self, queue):
455 for i in range(10, 20):
456 queue.put(i)
457 # note that at this point the items may only be buffered, so the
458 # process cannot shutdown until the feeder thread has finished
459 # pushing items onto the pipe.
460
461 def test_fork(self):
462 # Old versions of Queue would fail to create a new feeder
463 # thread for a forked process if the original process had its
464 # own feeder thread. This test checks that this no longer
465 # happens.
466
467 queue = self.Queue()
468
469 # put items on queue so that main process starts a feeder thread
470 for i in range(10):
471 queue.put(i)
472
473 # wait to make sure thread starts before we fork a new process
474 time.sleep(DELTA)
475
476 # fork process
477 p = self.Process(target=self._test_fork, args=(queue,))
478 p.start()
479
480 # check that all expected items are in the queue
481 for i in range(20):
482 self.assertEqual(queue.get(), i)
483 self.assertRaises(Queue.Empty, queue.get, False)
484
485 p.join()
486
487 def test_qsize(self):
488 q = self.Queue()
489 try:
490 self.assertEqual(q.qsize(), 0)
491 except NotImplementedError:
492 return
493 q.put(1)
494 self.assertEqual(q.qsize(), 1)
495 q.put(5)
496 self.assertEqual(q.qsize(), 2)
497 q.get()
498 self.assertEqual(q.qsize(), 1)
499 q.get()
500 self.assertEqual(q.qsize(), 0)
501
502 def _test_task_done(self, q):
503 for obj in iter(q.get, None):
504 time.sleep(DELTA)
505 q.task_done()
506
507 def test_task_done(self):
508 queue = self.JoinableQueue()
509
510 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
511 return
512
513 workers = [self.Process(target=self._test_task_done, args=(queue,))
514 for i in xrange(4)]
515
516 for p in workers:
517 p.start()
518
519 for i in xrange(10):
520 queue.put(i)
521
522 queue.join()
523
524 for p in workers:
525 queue.put(None)
526
527 for p in workers:
528 p.join()
529
530#
531#
532#
533
534class _TestLock(BaseTestCase):
535
536 def test_lock(self):
537 lock = self.Lock()
538 self.assertEqual(lock.acquire(), True)
539 self.assertEqual(lock.acquire(False), False)
540 self.assertEqual(lock.release(), None)
541 self.assertRaises((ValueError, threading.ThreadError), lock.release)
542
543 def test_rlock(self):
544 lock = self.RLock()
545 self.assertEqual(lock.acquire(), True)
546 self.assertEqual(lock.acquire(), True)
547 self.assertEqual(lock.acquire(), True)
548 self.assertEqual(lock.release(), None)
549 self.assertEqual(lock.release(), None)
550 self.assertEqual(lock.release(), None)
551 self.assertRaises((AssertionError, RuntimeError), lock.release)
552
553
554class _TestSemaphore(BaseTestCase):
555
556 def _test_semaphore(self, sem):
557 self.assertReturnsIfImplemented(2, get_value, sem)
558 self.assertEqual(sem.acquire(), True)
559 self.assertReturnsIfImplemented(1, get_value, sem)
560 self.assertEqual(sem.acquire(), True)
561 self.assertReturnsIfImplemented(0, get_value, sem)
562 self.assertEqual(sem.acquire(False), False)
563 self.assertReturnsIfImplemented(0, get_value, sem)
564 self.assertEqual(sem.release(), None)
565 self.assertReturnsIfImplemented(1, get_value, sem)
566 self.assertEqual(sem.release(), None)
567 self.assertReturnsIfImplemented(2, get_value, sem)
568
569 def test_semaphore(self):
570 sem = self.Semaphore(2)
571 self._test_semaphore(sem)
572 self.assertEqual(sem.release(), None)
573 self.assertReturnsIfImplemented(3, get_value, sem)
574 self.assertEqual(sem.release(), None)
575 self.assertReturnsIfImplemented(4, get_value, sem)
576
577 def test_bounded_semaphore(self):
578 sem = self.BoundedSemaphore(2)
579 self._test_semaphore(sem)
580 # Currently fails on OS/X
581 #if HAVE_GETVALUE:
582 # self.assertRaises(ValueError, sem.release)
583 # self.assertReturnsIfImplemented(2, get_value, sem)
584
585 def test_timeout(self):
586 if self.TYPE != 'processes':
587 return
588
589 sem = self.Semaphore(0)
590 acquire = TimingWrapper(sem.acquire)
591
592 self.assertEqual(acquire(False), False)
593 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
594
595 self.assertEqual(acquire(False, None), False)
596 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
597
598 self.assertEqual(acquire(False, TIMEOUT1), False)
599 self.assertTimingAlmostEqual(acquire.elapsed, 0)
600
601 self.assertEqual(acquire(True, TIMEOUT2), False)
602 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
603
604 self.assertEqual(acquire(timeout=TIMEOUT3), False)
605 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
606
607
608class _TestCondition(BaseTestCase):
609
610 def f(self, cond, sleeping, woken, timeout=None):
611 cond.acquire()
612 sleeping.release()
613 cond.wait(timeout)
614 woken.release()
615 cond.release()
616
617 def check_invariant(self, cond):
618 # this is only supposed to succeed when there are no sleepers
619 if self.TYPE == 'processes':
620 try:
621 sleepers = (cond._sleeping_count.get_value() -
622 cond._woken_count.get_value())
623 self.assertEqual(sleepers, 0)
624 self.assertEqual(cond._wait_semaphore.get_value(), 0)
625 except NotImplementedError:
626 pass
627
628 def test_notify(self):
629 cond = self.Condition()
630 sleeping = self.Semaphore(0)
631 woken = self.Semaphore(0)
632
633 p = self.Process(target=self.f, args=(cond, sleeping, woken))
634 p.set_daemon(True)
635 p.start()
636
637 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
638 p.set_daemon(True)
639 p.start()
640
641 # wait for both children to start sleeping
642 sleeping.acquire()
643 sleeping.acquire()
644
645 # check no process/thread has woken up
646 time.sleep(DELTA)
647 self.assertReturnsIfImplemented(0, get_value, woken)
648
649 # wake up one process/thread
650 cond.acquire()
651 cond.notify()
652 cond.release()
653
654 # check one process/thread has woken up
655 time.sleep(DELTA)
656 self.assertReturnsIfImplemented(1, get_value, woken)
657
658 # wake up another
659 cond.acquire()
660 cond.notify()
661 cond.release()
662
663 # check other has woken up
664 time.sleep(DELTA)
665 self.assertReturnsIfImplemented(2, get_value, woken)
666
667 # check state is not mucked up
668 self.check_invariant(cond)
669 p.join()
670
671 def test_notify_all(self):
672 cond = self.Condition()
673 sleeping = self.Semaphore(0)
674 woken = self.Semaphore(0)
675
676 # start some threads/processes which will timeout
677 for i in range(3):
678 p = self.Process(target=self.f,
679 args=(cond, sleeping, woken, TIMEOUT1))
680 p.set_daemon(True)
681 p.start()
682
683 t = threading.Thread(target=self.f,
684 args=(cond, sleeping, woken, TIMEOUT1))
685 t.set_daemon(True)
686 t.start()
687
688 # wait for them all to sleep
689 for i in xrange(6):
690 sleeping.acquire()
691
692 # check they have all timed out
693 for i in xrange(6):
694 woken.acquire()
695 self.assertReturnsIfImplemented(0, get_value, woken)
696
697 # check state is not mucked up
698 self.check_invariant(cond)
699
700 # start some more threads/processes
701 for i in range(3):
702 p = self.Process(target=self.f, args=(cond, sleeping, woken))
703 p.set_daemon(True)
704 p.start()
705
706 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
707 t.set_daemon(True)
708 t.start()
709
710 # wait for them to all sleep
711 for i in xrange(6):
712 sleeping.acquire()
713
714 # check no process/thread has woken up
715 time.sleep(DELTA)
716 self.assertReturnsIfImplemented(0, get_value, woken)
717
718 # wake them all up
719 cond.acquire()
720 cond.notify_all()
721 cond.release()
722
723 # check they have all woken
724 time.sleep(DELTA)
725 self.assertReturnsIfImplemented(6, get_value, woken)
726
727 # check state is not mucked up
728 self.check_invariant(cond)
729
730 def test_timeout(self):
731 cond = self.Condition()
732 wait = TimingWrapper(cond.wait)
733 cond.acquire()
734 res = wait(TIMEOUT1)
735 cond.release()
736 self.assertEqual(res, None)
737 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
738
739
740class _TestEvent(BaseTestCase):
741
742 def _test_event(self, event):
743 time.sleep(TIMEOUT2)
744 event.set()
745
746 def test_event(self):
747 event = self.Event()
748 wait = TimingWrapper(event.wait)
749
750 # Removed temporaily, due to API shear, this does not
751 # work with threading._Event objects. is_set == isSet
752 #self.assertEqual(event.is_set(), False)
753
754 self.assertEqual(wait(0.0), None)
755 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
756 self.assertEqual(wait(TIMEOUT1), None)
757 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
758
759 event.set()
760
761 # See note above on the API differences
762 # self.assertEqual(event.is_set(), True)
763 self.assertEqual(wait(), None)
764 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
765 self.assertEqual(wait(TIMEOUT1), None)
766 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
767 # self.assertEqual(event.is_set(), True)
768
769 event.clear()
770
771 #self.assertEqual(event.is_set(), False)
772
773 self.Process(target=self._test_event, args=(event,)).start()
774 self.assertEqual(wait(), None)
775
776#
777#
778#
779
780class _TestValue(BaseTestCase):
781
782 codes_values = [
783 ('i', 4343, 24234),
784 ('d', 3.625, -4.25),
785 ('h', -232, 234),
786 ('c', latin('x'), latin('y'))
787 ]
788
789 def _test(self, values):
790 for sv, cv in zip(values, self.codes_values):
791 sv.value = cv[2]
792
793
794 def test_value(self, raw=False):
795 if self.TYPE != 'processes':
796 return
797
798 if raw:
799 values = [self.RawValue(code, value)
800 for code, value, _ in self.codes_values]
801 else:
802 values = [self.Value(code, value)
803 for code, value, _ in self.codes_values]
804
805 for sv, cv in zip(values, self.codes_values):
806 self.assertEqual(sv.value, cv[1])
807
808 proc = self.Process(target=self._test, args=(values,))
809 proc.start()
810 proc.join()
811
812 for sv, cv in zip(values, self.codes_values):
813 self.assertEqual(sv.value, cv[2])
814
815 def test_rawvalue(self):
816 self.test_value(raw=True)
817
818 def test_getobj_getlock(self):
819 if self.TYPE != 'processes':
820 return
821
822 val1 = self.Value('i', 5)
823 lock1 = val1.get_lock()
824 obj1 = val1.get_obj()
825
826 val2 = self.Value('i', 5, lock=None)
827 lock2 = val2.get_lock()
828 obj2 = val2.get_obj()
829
830 lock = self.Lock()
831 val3 = self.Value('i', 5, lock=lock)
832 lock3 = val3.get_lock()
833 obj3 = val3.get_obj()
834 self.assertEqual(lock, lock3)
835
836 arr4 = self.RawValue('i', 5)
837 self.assertFalse(hasattr(arr4, 'get_lock'))
838 self.assertFalse(hasattr(arr4, 'get_obj'))
839
840
841class _TestArray(BaseTestCase):
842
843 def f(self, seq):
844 for i in range(1, len(seq)):
845 seq[i] += seq[i-1]
846
847 def test_array(self, raw=False):
848 if self.TYPE != 'processes':
849 return
850
851 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
852 if raw:
853 arr = self.RawArray('i', seq)
854 else:
855 arr = self.Array('i', seq)
856
857 self.assertEqual(len(arr), len(seq))
858 self.assertEqual(arr[3], seq[3])
859 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
860
861 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
862
863 self.assertEqual(list(arr[:]), seq)
864
865 self.f(seq)
866
867 p = self.Process(target=self.f, args=(arr,))
868 p.start()
869 p.join()
870
871 self.assertEqual(list(arr[:]), seq)
872
873 def test_rawarray(self):
874 self.test_array(raw=True)
875
876 def test_getobj_getlock_obj(self):
877 if self.TYPE != 'processes':
878 return
879
880 arr1 = self.Array('i', range(10))
881 lock1 = arr1.get_lock()
882 obj1 = arr1.get_obj()
883
884 arr2 = self.Array('i', range(10), lock=None)
885 lock2 = arr2.get_lock()
886 obj2 = arr2.get_obj()
887
888 lock = self.Lock()
889 arr3 = self.Array('i', range(10), lock=lock)
890 lock3 = arr3.get_lock()
891 obj3 = arr3.get_obj()
892 self.assertEqual(lock, lock3)
893
894 arr4 = self.RawArray('i', range(10))
895 self.assertFalse(hasattr(arr4, 'get_lock'))
896 self.assertFalse(hasattr(arr4, 'get_obj'))
897
898#
899#
900#
901
902class _TestContainers(BaseTestCase):
903
904 ALLOWED_TYPES = ('manager',)
905
906 def test_list(self):
907 a = self.list(range(10))
908 self.assertEqual(a[:], range(10))
909
910 b = self.list()
911 self.assertEqual(b[:], [])
912
913 b.extend(range(5))
914 self.assertEqual(b[:], range(5))
915
916 self.assertEqual(b[2], 2)
917 self.assertEqual(b[2:10], [2,3,4])
918
919 b *= 2
920 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
921
922 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
923
924 self.assertEqual(a[:], range(10))
925
926 d = [a, b]
927 e = self.list(d)
928 self.assertEqual(
929 e[:],
930 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
931 )
932
933 f = self.list([a])
934 a.append('hello')
935 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
936
937 def test_dict(self):
938 d = self.dict()
939 indices = range(65, 70)
940 for i in indices:
941 d[i] = chr(i)
942 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
943 self.assertEqual(sorted(d.keys()), indices)
944 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
945 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
946
947 def test_namespace(self):
948 n = self.Namespace()
949 n.name = 'Bob'
950 n.job = 'Builder'
951 n._hidden = 'hidden'
952 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
953 del n.job
954 self.assertEqual(str(n), "Namespace(name='Bob')")
955 self.assertTrue(hasattr(n, 'name'))
956 self.assertTrue(not hasattr(n, 'job'))
957
958#
959#
960#
961
962def sqr(x, wait=0.0):
963 time.sleep(wait)
964 return x*x
Benjamin Petersondfd79492008-06-13 19:13:39 +0000965class _TestPool(BaseTestCase):
966
967 def test_apply(self):
968 papply = self.pool.apply
969 self.assertEqual(papply(sqr, (5,)), sqr(5))
970 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
971
972 def test_map(self):
973 pmap = self.pool.map
974 self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
975 self.assertEqual(pmap(sqr, range(100), chunksize=20),
976 map(sqr, range(100)))
977
978 def test_async(self):
979 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
980 get = TimingWrapper(res.get)
981 self.assertEqual(get(), 49)
982 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
983
984 def test_async_timeout(self):
985 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
986 get = TimingWrapper(res.get)
987 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
988 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
989
990 def test_imap(self):
991 it = self.pool.imap(sqr, range(10))
992 self.assertEqual(list(it), map(sqr, range(10)))
993
994 it = self.pool.imap(sqr, range(10))
995 for i in range(10):
996 self.assertEqual(it.next(), i*i)
997 self.assertRaises(StopIteration, it.next)
998
999 it = self.pool.imap(sqr, range(1000), chunksize=100)
1000 for i in range(1000):
1001 self.assertEqual(it.next(), i*i)
1002 self.assertRaises(StopIteration, it.next)
1003
1004 def test_imap_unordered(self):
1005 it = self.pool.imap_unordered(sqr, range(1000))
1006 self.assertEqual(sorted(it), map(sqr, range(1000)))
1007
1008 it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
1009 self.assertEqual(sorted(it), map(sqr, range(1000)))
1010
1011 def test_make_pool(self):
1012 p = multiprocessing.Pool(3)
1013 self.assertEqual(3, len(p._pool))
1014 p.close()
1015 p.join()
1016
1017 def test_terminate(self):
1018 if self.TYPE == 'manager':
1019 # On Unix a forked process increfs each shared object to
1020 # which its parent process held a reference. If the
1021 # forked process gets terminated then there is likely to
1022 # be a reference leak. So to prevent
1023 # _TestZZZNumberOfObjects from failing we skip this test
1024 # when using a manager.
1025 return
1026
1027 result = self.pool.map_async(
1028 time.sleep, [0.1 for i in range(10000)], chunksize=1
1029 )
1030 self.pool.terminate()
1031 join = TimingWrapper(self.pool.join)
1032 join()
1033 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001034#
1035# Test that manager has expected number of shared objects left
1036#
1037
1038class _TestZZZNumberOfObjects(BaseTestCase):
1039 # Because test cases are sorted alphabetically, this one will get
1040 # run after all the other tests for the manager. It tests that
1041 # there have been no "reference leaks" for the manager's shared
1042 # objects. Note the comment in _TestPool.test_terminate().
1043 ALLOWED_TYPES = ('manager',)
1044
1045 def test_number_of_objects(self):
1046 EXPECTED_NUMBER = 1 # the pool object is still alive
1047 multiprocessing.active_children() # discard dead process objs
1048 gc.collect() # do garbage collection
1049 refs = self.manager._number_of_objects()
1050 if refs != EXPECTED_NUMBER:
1051 print self.manager._debugInfo()
1052
1053 self.assertEqual(refs, EXPECTED_NUMBER)
1054
1055#
1056# Test of creating a customized manager class
1057#
1058
1059from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1060
1061class FooBar(object):
1062 def f(self):
1063 return 'f()'
1064 def g(self):
1065 raise ValueError
1066 def _h(self):
1067 return '_h()'
1068
1069def baz():
1070 for i in xrange(10):
1071 yield i*i
1072
1073class IteratorProxy(BaseProxy):
1074 _exposed_ = ('next', '__next__')
1075 def __iter__(self):
1076 return self
1077 def next(self):
1078 return self._callmethod('next')
1079 def __next__(self):
1080 return self._callmethod('__next__')
1081
1082class MyManager(BaseManager):
1083 pass
1084
1085MyManager.register('Foo', callable=FooBar)
1086MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1087MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1088
1089
1090class _TestMyManager(BaseTestCase):
1091
1092 ALLOWED_TYPES = ('manager',)
1093
1094 def test_mymanager(self):
1095 manager = MyManager()
1096 manager.start()
1097
1098 foo = manager.Foo()
1099 bar = manager.Bar()
1100 baz = manager.baz()
1101
1102 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1103 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1104
1105 self.assertEqual(foo_methods, ['f', 'g'])
1106 self.assertEqual(bar_methods, ['f', '_h'])
1107
1108 self.assertEqual(foo.f(), 'f()')
1109 self.assertRaises(ValueError, foo.g)
1110 self.assertEqual(foo._callmethod('f'), 'f()')
1111 self.assertRaises(RemoteError, foo._callmethod, '_h')
1112
1113 self.assertEqual(bar.f(), 'f()')
1114 self.assertEqual(bar._h(), '_h()')
1115 self.assertEqual(bar._callmethod('f'), 'f()')
1116 self.assertEqual(bar._callmethod('_h'), '_h()')
1117
1118 self.assertEqual(list(baz), [i*i for i in range(10)])
1119
1120 manager.shutdown()
1121
1122#
1123# Test of connecting to a remote server and using xmlrpclib for serialization
1124#
1125
1126_queue = Queue.Queue()
1127def get_queue():
1128 return _queue
1129
1130class QueueManager(BaseManager):
1131 '''manager class used by server process'''
1132QueueManager.register('get_queue', callable=get_queue)
1133
1134class QueueManager2(BaseManager):
1135 '''manager class which specifies the same interface as QueueManager'''
1136QueueManager2.register('get_queue')
1137
1138
1139SERIALIZER = 'xmlrpclib'
1140
1141class _TestRemoteManager(BaseTestCase):
1142
1143 ALLOWED_TYPES = ('manager',)
1144
1145 def _putter(self, address, authkey):
1146 manager = QueueManager2(
1147 address=address, authkey=authkey, serializer=SERIALIZER
1148 )
1149 manager.connect()
1150 queue = manager.get_queue()
1151 queue.put(('hello world', None, True, 2.25))
1152
1153 def test_remote(self):
1154 authkey = os.urandom(32)
1155
1156 manager = QueueManager(
1157 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1158 )
1159 manager.start()
1160
1161 p = self.Process(target=self._putter, args=(manager.address, authkey))
1162 p.start()
1163
1164 manager2 = QueueManager2(
1165 address=manager.address, authkey=authkey, serializer=SERIALIZER
1166 )
1167 manager2.connect()
1168 queue = manager2.get_queue()
1169
1170 # Note that xmlrpclib will deserialize object as a list not a tuple
1171 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1172
1173 # Because we are using xmlrpclib for serialization instead of
1174 # pickle this will cause a serialization error.
1175 self.assertRaises(Exception, queue.put, time.sleep)
1176
1177 # Make queue finalizer run before the server is stopped
1178 del queue
1179 manager.shutdown()
1180
1181#
1182#
1183#
1184
1185SENTINEL = latin('')
1186
1187class _TestConnection(BaseTestCase):
1188
1189 ALLOWED_TYPES = ('processes', 'threads')
1190
1191 def _echo(self, conn):
1192 for msg in iter(conn.recv_bytes, SENTINEL):
1193 conn.send_bytes(msg)
1194 conn.close()
1195
1196 def test_connection(self):
1197 conn, child_conn = self.Pipe()
1198
1199 p = self.Process(target=self._echo, args=(child_conn,))
1200 p.set_daemon(True)
1201 p.start()
1202
1203 seq = [1, 2.25, None]
1204 msg = latin('hello world')
1205 longmsg = msg * 10
1206 arr = array.array('i', range(4))
1207
1208 if self.TYPE == 'processes':
1209 self.assertEqual(type(conn.fileno()), int)
1210
1211 self.assertEqual(conn.send(seq), None)
1212 self.assertEqual(conn.recv(), seq)
1213
1214 self.assertEqual(conn.send_bytes(msg), None)
1215 self.assertEqual(conn.recv_bytes(), msg)
1216
1217 if self.TYPE == 'processes':
1218 buffer = array.array('i', [0]*10)
1219 expected = list(arr) + [0] * (10 - len(arr))
1220 self.assertEqual(conn.send_bytes(arr), None)
1221 self.assertEqual(conn.recv_bytes_into(buffer),
1222 len(arr) * buffer.itemsize)
1223 self.assertEqual(list(buffer), expected)
1224
1225 buffer = array.array('i', [0]*10)
1226 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1227 self.assertEqual(conn.send_bytes(arr), None)
1228 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1229 len(arr) * buffer.itemsize)
1230 self.assertEqual(list(buffer), expected)
1231
1232 buffer = bytearray(latin(' ' * 40))
1233 self.assertEqual(conn.send_bytes(longmsg), None)
1234 try:
1235 res = conn.recv_bytes_into(buffer)
1236 except multiprocessing.BufferTooShort, e:
1237 self.assertEqual(e.args, (longmsg,))
1238 else:
1239 self.fail('expected BufferTooShort, got %s' % res)
1240
1241 poll = TimingWrapper(conn.poll)
1242
1243 self.assertEqual(poll(), False)
1244 self.assertTimingAlmostEqual(poll.elapsed, 0)
1245
1246 self.assertEqual(poll(TIMEOUT1), False)
1247 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1248
1249 conn.send(None)
1250
1251 self.assertEqual(poll(TIMEOUT1), True)
1252 self.assertTimingAlmostEqual(poll.elapsed, 0)
1253
1254 self.assertEqual(conn.recv(), None)
1255
1256 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1257 conn.send_bytes(really_big_msg)
1258 self.assertEqual(conn.recv_bytes(), really_big_msg)
1259
1260 conn.send_bytes(SENTINEL) # tell child to quit
1261 child_conn.close()
1262
1263 if self.TYPE == 'processes':
1264 self.assertEqual(conn.readable, True)
1265 self.assertEqual(conn.writable, True)
1266 self.assertRaises(EOFError, conn.recv)
1267 self.assertRaises(EOFError, conn.recv_bytes)
1268
1269 p.join()
1270
1271 def test_duplex_false(self):
1272 reader, writer = self.Pipe(duplex=False)
1273 self.assertEqual(writer.send(1), None)
1274 self.assertEqual(reader.recv(), 1)
1275 if self.TYPE == 'processes':
1276 self.assertEqual(reader.readable, True)
1277 self.assertEqual(reader.writable, False)
1278 self.assertEqual(writer.readable, False)
1279 self.assertEqual(writer.writable, True)
1280 self.assertRaises(IOError, reader.send, 2)
1281 self.assertRaises(IOError, writer.recv)
1282 self.assertRaises(IOError, writer.poll)
1283
1284 def test_spawn_close(self):
1285 # We test that a pipe connection can be closed by parent
1286 # process immediately after child is spawned. On Windows this
1287 # would have sometimes failed on old versions because
1288 # child_conn would be closed before the child got a chance to
1289 # duplicate it.
1290 conn, child_conn = self.Pipe()
1291
1292 p = self.Process(target=self._echo, args=(child_conn,))
1293 p.start()
1294 child_conn.close() # this might complete before child initializes
1295
1296 msg = latin('hello')
1297 conn.send_bytes(msg)
1298 self.assertEqual(conn.recv_bytes(), msg)
1299
1300 conn.send_bytes(SENTINEL)
1301 conn.close()
1302 p.join()
1303
1304 def test_sendbytes(self):
1305 if self.TYPE != 'processes':
1306 return
1307
1308 msg = latin('abcdefghijklmnopqrstuvwxyz')
1309 a, b = self.Pipe()
1310
1311 a.send_bytes(msg)
1312 self.assertEqual(b.recv_bytes(), msg)
1313
1314 a.send_bytes(msg, 5)
1315 self.assertEqual(b.recv_bytes(), msg[5:])
1316
1317 a.send_bytes(msg, 7, 8)
1318 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1319
1320 a.send_bytes(msg, 26)
1321 self.assertEqual(b.recv_bytes(), latin(''))
1322
1323 a.send_bytes(msg, 26, 0)
1324 self.assertEqual(b.recv_bytes(), latin(''))
1325
1326 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1327
1328 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1329
1330 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1331
1332 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1333
1334 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1335
Benjamin Petersondfd79492008-06-13 19:13:39 +00001336class _TestListenerClient(BaseTestCase):
1337
1338 ALLOWED_TYPES = ('processes', 'threads')
1339
1340 def _test(self, address):
1341 conn = self.connection.Client(address)
1342 conn.send('hello')
1343 conn.close()
1344
1345 def test_listener_client(self):
1346 for family in self.connection.families:
1347 l = self.connection.Listener(family=family)
1348 p = self.Process(target=self._test, args=(l.address,))
1349 p.set_daemon(True)
1350 p.start()
1351 conn = l.accept()
1352 self.assertEqual(conn.recv(), 'hello')
1353 p.join()
1354 l.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001355#
1356# Test of sending connection and socket objects between processes
1357#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001358"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001359class _TestPicklingConnections(BaseTestCase):
1360
1361 ALLOWED_TYPES = ('processes',)
1362
1363 def _listener(self, conn, families):
1364 for fam in families:
1365 l = self.connection.Listener(family=fam)
1366 conn.send(l.address)
1367 new_conn = l.accept()
1368 conn.send(new_conn)
1369
1370 if self.TYPE == 'processes':
1371 l = socket.socket()
1372 l.bind(('localhost', 0))
1373 conn.send(l.getsockname())
1374 l.listen(1)
1375 new_conn, addr = l.accept()
1376 conn.send(new_conn)
1377
1378 conn.recv()
1379
1380 def _remote(self, conn):
1381 for (address, msg) in iter(conn.recv, None):
1382 client = self.connection.Client(address)
1383 client.send(msg.upper())
1384 client.close()
1385
1386 if self.TYPE == 'processes':
1387 address, msg = conn.recv()
1388 client = socket.socket()
1389 client.connect(address)
1390 client.sendall(msg.upper())
1391 client.close()
1392
1393 conn.close()
1394
1395 def test_pickling(self):
1396 try:
1397 multiprocessing.allow_connection_pickling()
1398 except ImportError:
1399 return
1400
1401 families = self.connection.families
1402
1403 lconn, lconn0 = self.Pipe()
1404 lp = self.Process(target=self._listener, args=(lconn0, families))
1405 lp.start()
1406 lconn0.close()
1407
1408 rconn, rconn0 = self.Pipe()
1409 rp = self.Process(target=self._remote, args=(rconn0,))
1410 rp.start()
1411 rconn0.close()
1412
1413 for fam in families:
1414 msg = ('This connection uses family %s' % fam).encode('ascii')
1415 address = lconn.recv()
1416 rconn.send((address, msg))
1417 new_conn = lconn.recv()
1418 self.assertEqual(new_conn.recv(), msg.upper())
1419
1420 rconn.send(None)
1421
1422 if self.TYPE == 'processes':
1423 msg = latin('This connection uses a normal socket')
1424 address = lconn.recv()
1425 rconn.send((address, msg))
1426 if hasattr(socket, 'fromfd'):
1427 new_conn = lconn.recv()
1428 self.assertEqual(new_conn.recv(100), msg.upper())
1429 else:
1430 # XXX On Windows with Py2.6 need to backport fromfd()
1431 discard = lconn.recv_bytes()
1432
1433 lconn.send(None)
1434
1435 rconn.close()
1436 lconn.close()
1437
1438 lp.join()
1439 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001440"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001441#
1442#
1443#
1444
1445class _TestHeap(BaseTestCase):
1446
1447 ALLOWED_TYPES = ('processes',)
1448
1449 def test_heap(self):
1450 iterations = 5000
1451 maxblocks = 50
1452 blocks = []
1453
1454 # create and destroy lots of blocks of different sizes
1455 for i in xrange(iterations):
1456 size = int(random.lognormvariate(0, 1) * 1000)
1457 b = multiprocessing.heap.BufferWrapper(size)
1458 blocks.append(b)
1459 if len(blocks) > maxblocks:
1460 i = random.randrange(maxblocks)
1461 del blocks[i]
1462
1463 # get the heap object
1464 heap = multiprocessing.heap.BufferWrapper._heap
1465
1466 # verify the state of the heap
1467 all = []
1468 occupied = 0
1469 for L in heap._len_to_seq.values():
1470 for arena, start, stop in L:
1471 all.append((heap._arenas.index(arena), start, stop,
1472 stop-start, 'free'))
1473 for arena, start, stop in heap._allocated_blocks:
1474 all.append((heap._arenas.index(arena), start, stop,
1475 stop-start, 'occupied'))
1476 occupied += (stop-start)
1477
1478 all.sort()
1479
1480 for i in range(len(all)-1):
1481 (arena, start, stop) = all[i][:3]
1482 (narena, nstart, nstop) = all[i+1][:3]
1483 self.assertTrue((arena != narena and nstart == 0) or
1484 (stop == nstart))
1485
1486#
1487#
1488#
1489
1490try:
1491 from ctypes import Structure, Value, copy, c_int, c_double
1492except ImportError:
1493 Structure = object
1494 c_int = c_double = None
1495
1496class _Foo(Structure):
1497 _fields_ = [
1498 ('x', c_int),
1499 ('y', c_double)
1500 ]
1501
1502class _TestSharedCTypes(BaseTestCase):
1503
1504 ALLOWED_TYPES = ('processes',)
1505
1506 def _double(self, x, y, foo, arr, string):
1507 x.value *= 2
1508 y.value *= 2
1509 foo.x *= 2
1510 foo.y *= 2
1511 string.value *= 2
1512 for i in range(len(arr)):
1513 arr[i] *= 2
1514
1515 def test_sharedctypes(self, lock=False):
1516 if c_int is None:
1517 return
1518
1519 x = Value('i', 7, lock=lock)
1520 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1521 foo = Value(_Foo, 3, 2, lock=lock)
1522 arr = Array('d', range(10), lock=lock)
1523 string = Array('c', 20, lock=lock)
1524 string.value = 'hello'
1525
1526 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1527 p.start()
1528 p.join()
1529
1530 self.assertEqual(x.value, 14)
1531 self.assertAlmostEqual(y.value, 2.0/3.0)
1532 self.assertEqual(foo.x, 6)
1533 self.assertAlmostEqual(foo.y, 4.0)
1534 for i in range(10):
1535 self.assertAlmostEqual(arr[i], i*2)
1536 self.assertEqual(string.value, latin('hellohello'))
1537
1538 def test_synchronize(self):
1539 self.test_sharedctypes(lock=True)
1540
1541 def test_copy(self):
1542 if c_int is None:
1543 return
1544
1545 foo = _Foo(2, 5.0)
1546 bar = copy(foo)
1547 foo.x = 0
1548 foo.y = 0
1549 self.assertEqual(bar.x, 2)
1550 self.assertAlmostEqual(bar.y, 5.0)
1551
1552#
1553#
1554#
1555
1556class _TestFinalize(BaseTestCase):
1557
1558 ALLOWED_TYPES = ('processes',)
1559
1560 def _test_finalize(self, conn):
1561 class Foo(object):
1562 pass
1563
1564 a = Foo()
1565 util.Finalize(a, conn.send, args=('a',))
1566 del a # triggers callback for a
1567
1568 b = Foo()
1569 close_b = util.Finalize(b, conn.send, args=('b',))
1570 close_b() # triggers callback for b
1571 close_b() # does nothing because callback has already been called
1572 del b # does nothing because callback has already been called
1573
1574 c = Foo()
1575 util.Finalize(c, conn.send, args=('c',))
1576
1577 d10 = Foo()
1578 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1579
1580 d01 = Foo()
1581 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1582 d02 = Foo()
1583 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1584 d03 = Foo()
1585 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1586
1587 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1588
1589 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1590
1591 # call mutliprocessing's cleanup function then exit process without
1592 # garbage collecting locals
1593 util._exit_function()
1594 conn.close()
1595 os._exit(0)
1596
1597 def test_finalize(self):
1598 conn, child_conn = self.Pipe()
1599
1600 p = self.Process(target=self._test_finalize, args=(child_conn,))
1601 p.start()
1602 p.join()
1603
1604 result = [obj for obj in iter(conn.recv, 'STOP')]
1605 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1606
1607#
1608# Test that from ... import * works for each module
1609#
1610
1611class _TestImportStar(BaseTestCase):
1612
1613 ALLOWED_TYPES = ('processes',)
1614
1615 def test_import(self):
1616 modules = (
1617 'multiprocessing', 'multiprocessing.connection',
1618 'multiprocessing.heap', 'multiprocessing.managers',
1619 'multiprocessing.pool', 'multiprocessing.process',
1620 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1621 'multiprocessing.synchronize', 'multiprocessing.util'
1622 )
1623
1624 for name in modules:
1625 __import__(name)
1626 mod = sys.modules[name]
1627
1628 for attr in getattr(mod, '__all__', ()):
1629 self.assertTrue(
1630 hasattr(mod, attr),
1631 '%r does not have attribute %r' % (mod, attr)
1632 )
1633
1634#
1635# Quick test that logging works -- does not test logging output
1636#
1637
1638class _TestLogging(BaseTestCase):
1639
1640 ALLOWED_TYPES = ('processes',)
1641
1642 def test_enable_logging(self):
1643 logger = multiprocessing.get_logger()
1644 logger.setLevel(util.SUBWARNING)
1645 self.assertTrue(logger is not None)
1646 logger.debug('this will not be printed')
1647 logger.info('nor will this')
1648 logger.setLevel(LOG_LEVEL)
1649
1650 def _test_level(self, conn):
1651 logger = multiprocessing.get_logger()
1652 conn.send(logger.getEffectiveLevel())
1653
1654 def test_level(self):
1655 LEVEL1 = 32
1656 LEVEL2 = 37
1657
1658 logger = multiprocessing.get_logger()
1659 root_logger = logging.getLogger()
1660 root_level = root_logger.level
1661
1662 reader, writer = multiprocessing.Pipe(duplex=False)
1663
1664 logger.setLevel(LEVEL1)
1665 self.Process(target=self._test_level, args=(writer,)).start()
1666 self.assertEqual(LEVEL1, reader.recv())
1667
1668 logger.setLevel(logging.NOTSET)
1669 root_logger.setLevel(LEVEL2)
1670 self.Process(target=self._test_level, args=(writer,)).start()
1671 self.assertEqual(LEVEL2, reader.recv())
1672
1673 root_logger.setLevel(root_level)
1674 logger.setLevel(level=LOG_LEVEL)
1675
1676#
1677# Functions used to create test cases from the base ones in this module
1678#
1679
1680def get_attributes(Source, names):
1681 d = {}
1682 for name in names:
1683 obj = getattr(Source, name)
1684 if type(obj) == type(get_attributes):
1685 obj = staticmethod(obj)
1686 d[name] = obj
1687 return d
1688
1689def create_test_cases(Mixin, type):
1690 result = {}
1691 glob = globals()
1692 Type = type[0].upper() + type[1:]
1693
1694 for name in glob.keys():
1695 if name.startswith('_Test'):
1696 base = glob[name]
1697 if type in base.ALLOWED_TYPES:
1698 newname = 'With' + Type + name[1:]
1699 class Temp(base, unittest.TestCase, Mixin):
1700 pass
1701 result[newname] = Temp
1702 Temp.__name__ = newname
1703 Temp.__module__ = Mixin.__module__
1704 return result
1705
1706#
1707# Create test cases
1708#
1709
1710class ProcessesMixin(object):
1711 TYPE = 'processes'
1712 Process = multiprocessing.Process
1713 locals().update(get_attributes(multiprocessing, (
1714 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1715 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1716 'RawArray', 'current_process', 'active_children', 'Pipe',
1717 'connection', 'JoinableQueue'
1718 )))
1719
1720testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1721globals().update(testcases_processes)
1722
1723
1724class ManagerMixin(object):
1725 TYPE = 'manager'
1726 Process = multiprocessing.Process
1727 manager = object.__new__(multiprocessing.managers.SyncManager)
1728 locals().update(get_attributes(manager, (
1729 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1730 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1731 'Namespace', 'JoinableQueue'
1732 )))
1733
1734testcases_manager = create_test_cases(ManagerMixin, type='manager')
1735globals().update(testcases_manager)
1736
1737
1738class ThreadsMixin(object):
1739 TYPE = 'threads'
1740 Process = multiprocessing.dummy.Process
1741 locals().update(get_attributes(multiprocessing.dummy, (
1742 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1743 'Condition', 'Event', 'Value', 'Array', 'current_process',
1744 'active_children', 'Pipe', 'connection', 'dict', 'list',
1745 'Namespace', 'JoinableQueue'
1746 )))
1747
1748testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1749globals().update(testcases_threads)
1750
1751#
1752#
1753#
1754
1755def test_main(run=None):
Jesse Noller18623822008-06-18 13:29:52 +00001756 if sys.platform.startswith("linux"):
1757 try:
1758 lock = multiprocessing.RLock()
1759 except OSError:
1760 from test.test_support import TestSkipped
1761 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
Benjamin Petersoned77f2e2008-06-17 22:40:44 +00001762
Benjamin Petersondfd79492008-06-13 19:13:39 +00001763 if run is None:
1764 from test.test_support import run_unittest as run
1765
1766 util.get_temp_dir() # creates temp directory for use by all processes
1767
1768 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1769
Jesse Noller146b7ab2008-07-02 16:44:09 +00001770 ProcessesMixin.pool = multiprocessing.Pool(4)
1771 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1772 ManagerMixin.manager.__init__()
1773 ManagerMixin.manager.start()
1774 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001775
1776 testcases = (
Jesse Noller146b7ab2008-07-02 16:44:09 +00001777 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1778 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
1779 sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001780 )
1781
1782 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1783 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1784 run(suite)
1785
Jesse Noller146b7ab2008-07-02 16:44:09 +00001786 ThreadsMixin.pool.terminate()
1787 ProcessesMixin.pool.terminate()
1788 ManagerMixin.pool.terminate()
1789 ManagerMixin.manager.shutdown()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001790
Jesse Noller146b7ab2008-07-02 16:44:09 +00001791 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001792
1793def main():
1794 test_main(unittest.TextTestRunner(verbosity=2).run)
1795
1796if __name__ == '__main__':
1797 main()