#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#
6
7import unittest
8import threading
9import Queue
10import time
11import sys
12import os
13import gc
14import signal
15import array
16import copy
17import socket
18import random
19import logging
20
21import multiprocessing.dummy
22import multiprocessing.connection
23import multiprocessing.managers
24import multiprocessing.heap
Benjamin Petersondfd79492008-06-13 19:13:39 +000025import multiprocessing.pool
26import _multiprocessing
27
28from multiprocessing import util
29
30#
31#
32#
33
# Alias for str; applied below to literals that are stored in 'c' typed
# shared values or sent with Connection.send_bytes().
latin = str

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1

# Making this true makes the tests take a lot longer and can sometimes
# cause some non-serious failures, because some calls block a bit longer
# than expected.
CHECK_TIMINGS = False

TIMEOUT1, TIMEOUT2, TIMEOUT3 = (
    (0.82, 0.35, 1.4) if CHECK_TIMINGS else (0.1, 0.1, 0.1)
)

# False only when _multiprocessing advertises HAVE_BROKEN_SEM_GETVALUE.
HAVE_GETVALUE = not getattr(
    _multiprocessing, 'HAVE_BROKEN_SEM_GETVALUE', False
)
55
56#
57# Creates a wrapper for a function which records the time it takes to finish
58#
59
class TimingWrapper(object):
    """Callable proxy that records how long the last call to *func* took.

    ``elapsed`` is ``None`` until the wrapper has been called; afterwards it
    holds the duration of the most recent call in seconds, measured with
    ``time.time()``.  It is updated even when the wrapped call raises.
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        started = time.time()
        try:
            outcome = self.func(*args, **kwds)
        finally:
            # Runs for both a normal return and an exception.
            self.elapsed = time.time() - started
        return outcome
72
73#
74# Base class for test cases
75#
76
class BaseTestCase(object):
    """Shared helpers for the test case classes in this file.

    The helpers call ``self.assertEqual`` and ``self.assertAlmostEqual``,
    which are not defined here and must be supplied by the concrete class.
    """

    # NOTE(review): presumably the flavours a test class may be run under;
    # the code that consumes this tuple is not visible in this block.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        """Compare *a* and *b* to one decimal place if CHECK_TIMINGS is set."""
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        """Assert ``func(*args) == value`` unless it is not implemented.

        A NotImplementedError raised by *func* is silently accepted.
        """
        try:
            actual = func(*args)
        except NotImplementedError:
            return None
        return self.assertEqual(value, actual)
92
93#
94# Return the value of a semaphore
95#
96
def get_value(self):
    """Return the counter of a semaphore-like object.

    Tries ``self.get_value()`` first, then the attributes
    ``_Semaphore__value`` and ``_value``.  Raises NotImplementedError when
    none of them is available.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    for attr in ('_Semaphore__value', '_value'):
        try:
            return getattr(self, attr)
        except AttributeError:
            continue
    raise NotImplementedError
108
109#
110# Testcases
111#
112
class _TestProcess(BaseTestCase):
    """Tests of the Process API: attributes, start/join, terminate, nesting.

    Uses ``TYPE``, ``Process``, ``Queue``, ``Event``, ``Pipe``,
    ``current_process`` and ``active_children`` from ``self``; none of them
    is defined in this class.
    """

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        # Not applicable to the thread-backed flavour.
        if self.TYPE == 'threads':
            return

        me = self.current_process()
        key = me.get_authkey()

        self.assertTrue(me.is_alive())
        self.assertTrue(not me.is_daemon())
        self.assertTrue(isinstance(key, bytes))
        self.assertTrue(len(key) > 0)
        self.assertEqual(me.get_ident(), os.getpid())
        self.assertEqual(me.get_exitcode(), None)

    def _test(self, q, *args, **kwds):
        """Child body for test_process: report what the child observed."""
        me = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(me.get_name())
        if self.TYPE != 'threads':
            q.put(bytes(me.get_authkey()))
            q.put(me.pid)

    def test_process(self):
        results = self.Queue(1)
        e = self.Event()
        child_args = (results, 1, 2)
        child_kwargs = {'hello': 23, 'bye': 2.54}
        child_name = 'SomeProcess'
        child = self.Process(
            target=self._test,
            args=child_args,
            kwargs=child_kwargs,
            name=child_name,
        )
        child.set_daemon(True)
        parent = self.current_process()

        # State before the child is started.
        if self.TYPE != 'threads':
            self.assertEqual(child.get_authkey(), parent.get_authkey())
        self.assertEqual(child.is_alive(), False)
        self.assertEqual(child.is_daemon(), True)
        self.assertTrue(child not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(child.get_exitcode(), None)

        child.start()

        # State while the child is running.
        self.assertEqual(child.get_exitcode(), None)
        self.assertEqual(child.is_alive(), True)
        self.assertTrue(child in self.active_children())

        # The child echoes everything except the queue itself.
        self.assertEqual(results.get(), child_args[1:])
        self.assertEqual(results.get(), child_kwargs)
        self.assertEqual(results.get(), child.get_name())
        if self.TYPE != 'threads':
            self.assertEqual(results.get(), parent.get_authkey())
            self.assertEqual(results.get(), child.pid)

        child.join()

        # State after the child has finished.
        self.assertEqual(child.get_exitcode(), 0)
        self.assertEqual(child.is_alive(), False)
        self.assertTrue(child not in self.active_children())

    def _test_terminate(self):
        """Child body for test_terminate: sleep until killed."""
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        victim = self.Process(target=self._test_terminate)
        victim.set_daemon(True)
        victim.start()

        self.assertEqual(victim.is_alive(), True)
        self.assertTrue(victim in self.active_children())
        self.assertEqual(victim.get_exitcode(), None)

        victim.terminate()

        timed_join = TimingWrapper(victim.join)
        self.assertEqual(timed_join(), None)
        self.assertTimingAlmostEqual(timed_join.elapsed, 0.0)

        self.assertEqual(victim.is_alive(), False)
        self.assertTrue(victim not in self.active_children())

        victim.join()

        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            count = multiprocessing.cpu_count()
        except NotImplementedError:
            count = 1
        self.assertTrue(type(count) is int)
        self.assertTrue(count >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        sleeper = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(sleeper not in self.active_children())

        sleeper.start()
        self.assertTrue(sleeper in self.active_children())

        sleeper.join()
        self.assertTrue(sleeper not in self.active_children())

    def _test_recursion(self, wconn, id):
        """Send *id*, then spawn two children one level deeper (max depth 2)."""
        # NOTE(review): this import is not used in this method; it is kept
        # in case its import-time side effects matter -- confirm.
        from multiprocessing import forking
        wconn.send(id)
        if len(id) >= 2:
            return
        for branch in range(2):
            child = self.Process(
                target=self._test_recursion, args=(wconn, id + [branch])
            )
            child.start()
            child.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        seen = []
        while rconn.poll():
            seen.append(rconn.recv())

        # Each child is joined before the next starts, so ids arrive in
        # depth-first order.
        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1],
        ]
        self.assertEqual(seen, expected)
258
259#
260#
261#
262
263class _UpperCaser(multiprocessing.Process):
264
265 def __init__(self):
266 multiprocessing.Process.__init__(self)
267 self.child_conn, self.parent_conn = multiprocessing.Pipe()
268
269 def run(self):
270 self.parent_conn.close()
271 for s in iter(self.child_conn.recv, None):
272 self.child_conn.send(s.upper())
273 self.child_conn.close()
274
275 def submit(self, s):
276 assert type(s) is str
277 self.parent_conn.send(s)
278 return self.parent_conn.recv()
279
280 def stop(self):
281 self.parent_conn.send(None)
282 self.parent_conn.close()
283 self.child_conn.close()
284
class _TestSubclassingProcess(BaseTestCase):
    """Round-trip test for a user-defined Process subclass."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        worker = _UpperCaser()
        worker.start()
        for text, shouted in (('hello', 'HELLO'), ('world', 'WORLD')):
            self.assertEqual(worker.submit(text), shouted)
        worker.stop()
        worker.join()
296
297#
298#
299#
300
def queue_empty(q):
    """Return whether *q* is empty, via ``empty()`` or else ``qsize()``."""
    if not hasattr(q, 'empty'):
        return q.qsize() == 0
    return q.empty()
306
def queue_full(q, maxsize):
    """Return whether *q* is full, via ``full()`` or ``qsize() == maxsize``."""
    if not hasattr(q, 'full'):
        return q.qsize() == maxsize
    return q.full()
312
313
class _TestQueue(BaseTestCase):
    """Tests of put/get (blocking and not), fork, qsize and task_done."""

    def _test_put(self, queue, child_can_start, parent_can_continue):
        """Child body for test_put: drain six items once allowed to start."""
        child_can_start.wait()
        for _ in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        consumer = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue),
        )
        consumer.set_daemon(True)
        consumer.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Fill the queue using each calling convention of put().
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        full_cases = [
            (put, (7, False), {}, 0),
            (put, (7, False, None), {}, 0),
            (put_nowait, (7,), {}, 0),
            (put, (7, True, TIMEOUT1), {}, TIMEOUT1),
            (put, (7, False, TIMEOUT2), {}, 0),
            (put, (7, True), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for call, args, kwds, expected in full_cases:
            self.assertRaises(Queue.Full, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        consumer.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        """Child body for test_get: put 2..5 once allowed to start."""
        child_can_start.wait()
        #queue.put(1)
        for item in (2, 3, 4, 5):
            queue.put(item)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        producer = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue),
        )
        producer.set_daemon(True)
        producer.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # (callable, positional args, keyword args, expected elapsed time)
        empty_cases = [
            (get, (False,), {}, 0),
            (get, (False, None), {}, 0),
            (get_nowait, (), {}, 0),
            (get, (True, TIMEOUT1), {}, TIMEOUT1),
            (get, (False, TIMEOUT2), {}, 0),
            (get, (), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for call, args, kwds, expected in empty_cases:
            self.assertRaises(Queue.Empty, call, *args, **kwds)
            self.assertTimingAlmostEqual(call.elapsed, expected)

        producer.join()

    def _test_fork(self, queue):
        """Child body for test_fork: put the integers 10..19."""
        for value in range(10, 20):
            queue.put(value)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for value in range(10):
            queue.put(value)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        child = self.Process(target=self._test_fork, args=(queue,))
        child.start()

        # check that all expected items are in the queue
        for expected in range(20):
            self.assertEqual(queue.get(), expected)
        self.assertRaises(Queue.Empty, queue.get, False)

        child.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            # qsize() is unavailable here; nothing to check.
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        """Worker body: acknowledge each item until a None arrives."""
        for _ in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [
            self.Process(target=self._test_task_done, args=(queue,))
            for _ in xrange(4)
        ]

        for worker in workers:
            worker.start()

        for item in xrange(10):
            queue.put(item)

        queue.join()

        # One None per worker tells each of them to exit.
        for _ in workers:
            queue.put(None)

        for worker in workers:
            worker.join()
517
518#
519#
520#
521
class _TestLock(BaseTestCase):
    """Tests of acquire/release on Lock and RLock."""

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # A second, non-blocking acquire must fail.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock is an error.
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        for _ in range(3):
            self.assertEqual(lock.acquire(), True)
        for _ in range(3):
            self.assertEqual(lock.release(), None)
        # One release too many is an error.
        self.assertRaises((AssertionError, RuntimeError), lock.release)
540
541
class _TestSemaphore(BaseTestCase):
    """Tests of Semaphore and BoundedSemaphore counting and timeouts."""

    def _test_semaphore(self, sem):
        """Take *sem* from 2 down to 0 and back up, checking the count."""
        check = self.assertReturnsIfImplemented
        check(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        check(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        check(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        check(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        # An unbounded semaphore may be released past its initial value.
        for expected in (3, 4):
            self.assertEqual(sem.release(), None)
            self.assertReturnsIfImplemented(expected, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # (positional args, keyword args, expected elapsed time)
        cases = [
            ((False,), {}, 0.0),
            ((False, None), {}, 0.0),
            ((False, TIMEOUT1), {}, 0),
            ((True, TIMEOUT2), {}, TIMEOUT2),
            ((), {'timeout': TIMEOUT3}, TIMEOUT3),
        ]
        for args, kwds, expected in cases:
            self.assertEqual(acquire(*args, **kwds), False)
            self.assertTimingAlmostEqual(acquire.elapsed, expected)
594
595
class _TestCondition(BaseTestCase):
    """Tests of Condition.notify(), notify_all() and wait() timeouts.

    Each test runs waiters both as ``self.Process`` objects and as plain
    ``threading.Thread`` objects against the same condition.
    """

    def _make_daemon(self, worker):
        """Mark *worker* as a daemon with whichever API it provides."""
        try:
            worker.set_daemon(True)
        except AttributeError:
            worker.daemon = True

    def f(self, cond, sleeping, woken, timeout=None):
        """Waiter body: signal *sleeping*, wait on *cond*, signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        """Check the internal counters of a process-based condition."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # Keep separate references so that both waiters can be joined.
        proc = self.Process(target=self.f, args=(cond, sleeping, woken))
        self._make_daemon(proc)
        proc.start()

        thread = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        self._make_daemon(thread)
        thread.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # Both waiters have been notified by now; reap each of them.
        proc.join()
        thread.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            self._make_daemon(p)
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            self._make_daemon(t)
            t.start()

        # wait for them all to sleep
        for i in xrange(6):
            sleeping.acquire()

        # check they have all timed out
        for i in xrange(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            self._make_daemon(p)
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            self._make_daemon(t)
            t.start()

        # wait for them to all sleep
        for i in xrange(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
732
733
class _TestEvent(BaseTestCase):
    """Tests of Event.wait(), set() and clear()."""

    def _test_event(self, event):
        """Child body: set *event* after sleeping for TIMEOUT2 seconds."""
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # Keep a reference to the child so it can be joined instead of
        # being left running after the test returns.
        setter = self.Process(target=self._test_event, args=(event,))
        setter.start()
        try:
            self.assertEqual(wait(), None)
        finally:
            setter.join()
769
770#
771#
772#
773
class _TestValue(BaseTestCase):
    """Tests of shared Value and RawValue objects."""

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        """Child body: store the third column of codes_values into *values*."""
        for shared, row in zip(values, self.codes_values):
            shared.value = row[2]

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        make = self.RawValue if raw else self.Value
        values = [make(code, initial)
                  for code, initial, _ in self.codes_values]

        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[1])

        child = self.Process(target=self._test, args=(values,))
        child.start()
        child.join()

        # The child's writes must be visible in the parent.
        for shared, row in zip(values, self.codes_values):
            self.assertEqual(shared.value, row[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        # Default lock.
        default_val = self.Value('i', 5)
        default_lock = default_val.get_lock()
        default_obj = default_val.get_obj()

        # Explicit lock=None.
        none_val = self.Value('i', 5, lock=None)
        none_lock = none_val.get_lock()
        none_obj = none_val.get_obj()

        # Caller-supplied lock must be handed back by get_lock().
        lock = self.Lock()
        given_val = self.Value('i', 5, lock=lock)
        given_lock = given_val.get_lock()
        given_obj = given_val.get_obj()
        self.assertEqual(lock, given_lock)

        # Raw values expose neither accessor.
        raw = self.RawValue('i', 5)
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
833
834
class _TestArray(BaseTestCase):
    """Tests of shared Array and RawArray objects."""

    def f(self, seq):
        """Replace *seq* in place by its running (prefix) sums."""
        for idx in range(len(seq) - 1):
            seq[idx + 1] += seq[idx]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        make = self.RawArray if raw else self.Array
        arr = make('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Apply the same slice assignment to the shared array and the list.
        patch = array.array('i', [1, 2, 3, 4])
        arr[4:8] = patch
        seq[4:8] = patch

        self.assertEqual(list(arr[:]), seq)

        # Run f locally on the list and in a child on the shared array.
        self.f(seq)

        child = self.Process(target=self.f, args=(arr,))
        child.start()
        child.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        # Default lock.
        default_arr = self.Array('i', range(10))
        default_lock = default_arr.get_lock()
        default_obj = default_arr.get_obj()

        # Explicit lock=None.
        none_arr = self.Array('i', range(10), lock=None)
        none_lock = none_arr.get_lock()
        none_obj = none_arr.get_obj()

        # Caller-supplied lock must be handed back by get_lock().
        lock = self.Lock()
        given_arr = self.Array('i', range(10), lock=lock)
        given_lock = given_arr.get_lock()
        given_obj = given_arr.get_obj()
        self.assertEqual(lock, given_lock)

        # Raw arrays expose neither accessor.
        raw = self.RawArray('i', range(10))
        for accessor in ('get_lock', 'get_obj'):
            self.assertFalse(hasattr(raw, accessor))
891
892#
893#
894#
895
class _TestContainers(BaseTestCase):
    """Tests of the manager-provided list, dict and Namespace proxies."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        digits = self.list(range(10))
        self.assertEqual(digits[:], range(10))

        grown = self.list()
        self.assertEqual(grown[:], [])

        grown.extend(range(5))
        self.assertEqual(grown[:], range(5))

        self.assertEqual(grown[2], 2)
        self.assertEqual(grown[2:10], [2, 3, 4])

        grown *= 2
        self.assertEqual(grown[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(grown + [5, 6],
                         [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(digits[:], range(10))

        # A shared list holding other shared lists.
        pair = [digits, grown]
        nested = self.list(pair)
        self.assertEqual(
            nested[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # A later change to the inner list shows through the outer one.
        outer = self.list([digits])
        digits.append('hello')
        self.assertEqual(outer[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        shared = self.dict()
        codes = range(65, 70)
        for code in codes:
            shared[code] = chr(code)
        letters = [chr(code) for code in codes]
        pairs = [(code, chr(code)) for code in codes]
        self.assertEqual(shared.copy(), dict(pairs))
        self.assertEqual(sorted(shared.keys()), codes)
        self.assertEqual(sorted(shared.values()), letters)
        self.assertEqual(sorted(shared.items()), pairs)

    def test_namespace(self):
        ns = self.Namespace()
        ns.name = 'Bob'
        ns.job = 'Builder'
        ns._hidden = 'hidden'
        self.assertEqual((ns.name, ns.job), ('Bob', 'Builder'))
        del ns.job
        # The underscore-prefixed attribute is absent from the string form.
        self.assertEqual(str(ns), "Namespace(name='Bob')")
        self.assertTrue(hasattr(ns, 'name'))
        self.assertTrue(not hasattr(ns, 'job'))
951
952#
953#
954#
955
def sqr(x, wait=0.0):
    """Sleep for *wait* seconds, then return the square of *x*."""
    time.sleep(wait)
    squared = x * x
    return squared
class _TestPool(BaseTestCase):
    """Tests of the worker pool held in ``self.pool`` (supplied elsewhere)."""

    def test_apply(self):
        apply_ = self.pool.apply
        self.assertEqual(apply_(sqr, (5,)), sqr(5))
        self.assertEqual(apply_(sqr, (), {'x': 3}), sqr(x=3))

    def test_map(self):
        pool_map = self.pool.map
        self.assertEqual(pool_map(sqr, range(10)), map(sqr, range(10)))
        self.assertEqual(pool_map(sqr, range(100), chunksize=20),
                         map(sqr, range(100)))

    def test_async(self):
        pending = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        timed_get = TimingWrapper(pending.get)
        self.assertEqual(timed_get(), 49)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The task sleeps longer than the timeout given to get().
        pending = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        timed_get = TimingWrapper(pending.get)
        self.assertRaises(multiprocessing.TimeoutError,
                          timed_get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(timed_get.elapsed, TIMEOUT2)

    def test_imap(self):
        results = self.pool.imap(sqr, range(10))
        self.assertEqual(list(results), map(sqr, range(10)))

        results = self.pool.imap(sqr, range(10))
        for n in range(10):
            self.assertEqual(results.next(), n * n)
        self.assertRaises(StopIteration, results.next)

        results = self.pool.imap(sqr, range(1000), chunksize=100)
        for n in range(1000):
            self.assertEqual(results.next(), n * n)
        self.assertRaises(StopIteration, results.next)

    def test_imap_unordered(self):
        squares = map(sqr, range(1000))

        results = self.pool.imap_unordered(sqr, range(1000))
        self.assertEqual(sorted(results), squares)

        results = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
        self.assertEqual(sorted(results), squares)

    def test_make_pool(self):
        pool = multiprocessing.Pool(3)
        self.assertEqual(3, len(pool._pool))
        pool.close()
        pool.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        # Queue far more work than can finish, then terminate at once.
        result = self.pool.map_async(time.sleep, [0.1] * 10000, chunksize=1)
        self.pool.terminate()
        timed_join = TimingWrapper(self.pool.join)
        timed_join()
        self.assertTrue(timed_join.elapsed < 0.2)
Benjamin Petersondfd79492008-06-13 19:13:39 +00001028#
1029# Test that manager has expected number of shared objects left
1030#
1031
class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        remaining = self.manager._number_of_objects()
        if remaining != EXPECTED_NUMBER:
            # Dump the manager's view of its objects to aid diagnosis.
            print(self.manager._debug_info())

        self.assertEqual(remaining, EXPECTED_NUMBER)
1048
1049#
1050# Test of creating a customized manager class
1051#
1052
1053from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1054
class FooBar(object):
    """Small class registered with MyManager to test method exposure."""

    def f(self):
        """Return the string 'f()'."""
        return 'f()'

    def g(self):
        """Always raise ValueError."""
        raise ValueError()

    def _h(self):
        """Underscore-prefixed method; return the string '_h()'."""
        return '_h()'
1062
def baz():
    """Generate the squares of 0 through 9."""
    # range() instead of xrange(): identical results for this ten-element
    # loop, and the name exists on both Python 2 and Python 3.
    for i in range(10):
        yield i * i
1066
class IteratorProxy(BaseProxy):
    """Proxy that iterates by forwarding next()/__next__() to its referent."""

    _exposed_ = ('next', '__next__')

    def __iter__(self):
        # The proxy is its own iterator.
        return self

    def next(self):
        result = self._callmethod('next')
        return result

    def __next__(self):
        result = self._callmethod('__next__')
        return result
1075
class MyManager(BaseManager):
    """Customized manager used by _TestMyManager."""


# 'Foo' uses the default exposure; 'Bar' explicitly exposes 'f' and '_h'.
MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' hands back the generator through an iterator proxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1082
1083
class _TestMyManager(BaseTestCase):
    """Tests of the customized manager class MyManager."""

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        # Shut the manager down even if an assertion below fails, so that
        # its server process is not left running.
        try:
            foo = manager.Foo()
            bar = manager.Bar()
            baz = manager.baz()

            candidates = ('f', 'g', '_h')
            foo_methods = [name for name in candidates if hasattr(foo, name)]
            bar_methods = [name for name in candidates if hasattr(bar, name)]

            # Foo was registered without `exposed`; Bar with ('f', '_h').
            self.assertEqual(foo_methods, ['f', 'g'])
            self.assertEqual(bar_methods, ['f', '_h'])

            self.assertEqual(foo.f(), 'f()')
            self.assertRaises(ValueError, foo.g)
            self.assertEqual(foo._callmethod('f'), 'f()')
            self.assertRaises(RemoteError, foo._callmethod, '_h')

            self.assertEqual(bar.f(), 'f()')
            self.assertEqual(bar._h(), '_h()')
            self.assertEqual(bar._callmethod('f'), 'f()')
            self.assertEqual(bar._callmethod('_h'), '_h()')

            self.assertEqual(list(baz), [i*i for i in range(10)])
        finally:
            manager.shutdown()
1115
1116#
1117# Test of connecting to a remote server and using xmlrpclib for serialization
1118#
1119
# Single module-level queue handed out by get_queue().
_queue = Queue.Queue()


def get_queue():
    """Return the module-level queue (the same object on every call)."""
    return _queue
1123
class QueueManager(BaseManager):
    """Manager class used by the server process."""


# The server side supplies the callable that produces the shared queue.
QueueManager.register('get_queue', callable=get_queue)
1127
class QueueManager2(BaseManager):
    """Manager class which specifies the same interface as QueueManager."""


# Registered without a callable: only the typeid is declared here.
QueueManager2.register('get_queue')
1131
1132
SERIALIZER = 'xmlrpclib'


class _TestRemoteManager(BaseTestCase):
    """Test of connecting to a manager server using SERIALIZER."""

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        """Child body: connect to the server and put one tuple on its queue."""
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        # Stop the server even if something below fails.
        try:
            p = self.Process(target=self._putter,
                             args=(manager.address, authkey))
            p.start()

            manager2 = QueueManager2(
                address=manager.address, authkey=authkey,
                serializer=SERIALIZER
                )
            manager2.connect()
            queue = manager2.get_queue()

            try:
                # Note that xmlrpclib will deserialize object as a list
                # not a tuple
                self.assertEqual(queue.get(),
                                 ['hello world', None, True, 2.25])

                # Because we are using xmlrpclib for serialization instead
                # of pickle this will cause a serialization error.
                self.assertRaises(Exception, queue.put, time.sleep)
            finally:
                # Make queue finalizer run before the server is stopped
                del queue

            # The item has been received, so the putter has done its work;
            # reap it instead of leaving it unjoined.
            p.join()
        finally:
            manager.shutdown()
1173 manager.shutdown()
1174
1175#
1176#
1177#
1178
# Empty message: receiving it makes the echo child stop reading.
SENTINEL = latin('')


class _TestConnection(BaseTestCase):
    """Tests for the connection objects returned by Pipe()."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        """Echo every received byte message back until SENTINEL arrives."""
        while True:
            data = conn.recv_bytes()
            if data == SENTINEL:
                break
            conn.send_bytes(data)
        conn.close()

    def test_connection(self):
        # Round-trip objects, byte strings and buffers through an echo
        # child, then check poll() and the end-of-file behaviour.
        conn, child_conn = self.Pipe()

        echo_proc = self.Process(target=self._echo, args=(child_conn,))
        echo_proc.set_daemon(True)
        echo_proc.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', range(4))
        in_processes = (self.TYPE == 'processes')

        if in_processes:
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if in_processes:
            # receive into the start of a zero-filled array
            target = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            nbytes = conn.recv_bytes_into(target)
            self.assertEqual(nbytes, len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # receive at an offset of three items
            target = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            nbytes = conn.recv_bytes_into(target, 3 * target.itemsize)
            self.assertEqual(nbytes, len(arr) * target.itemsize)
            self.assertEqual(list(target), expected)

            # a message longer than the buffer must raise BufferTooShort
            # carrying the complete message
            target = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(target)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if in_processes:
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        echo_proc.join()

    def test_duplex_false(self):
        # A one-way pipe: data flows from writer to reader only.
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE != 'processes':
            return
        self.assertEqual(reader.readable, True)
        self.assertEqual(reader.writable, False)
        self.assertEqual(writer.readable, False)
        self.assertEqual(writer.writable, True)
        self.assertRaises(IOError, reader.send, 2)
        self.assertRaises(IOError, writer.recv)
        self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._echo, args=(child_conn,))
        child.start()
        child_conn.close()    # this might complete before child initializes

        greeting = latin('hello')
        conn.send_bytes(greeting)
        self.assertEqual(conn.recv_bytes(), greeting)

        conn.send_bytes(SENTINEL)
        conn.close()
        child.join()

    def test_sendbytes(self):
        # send_bytes(buf, offset, size): valid slices arrive intact and
        # out-of-range arguments raise ValueError.
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        # (extra positional arguments, message expected at the other end)
        accepted = [
            ((), msg),
            ((5,), msg[5:]),
            ((7, 8), msg[7:7+8]),
            ((26,), latin('')),
            ((26, 0), latin('')),
            ]
        for extra, expected in accepted:
            a.send_bytes(msg, *extra)
            self.assertEqual(b.recv_bytes(), expected)

        # offset/size combinations that fall outside the message
        rejected = [(27,), (22, 5), (26, 1), (-1,), (4, -1)]
        for extra in rejected:
            self.assertRaises(ValueError, a.send_bytes, msg, *extra)
1329
class _TestListenerClient(BaseTestCase):
    """Check that a Client can reach a Listener for every address family."""

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        """Child body: connect to *address*, send 'hello' and disconnect."""
        client = self.connection.Client(address)
        client.send('hello')
        client.close()

    def test_listener_client(self):
        # One listener/client round trip per family in connection.families.
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test,
                                 args=(listener.address,))
            child.set_daemon(True)
            child.start()
            accepted = listener.accept()
            self.assertEqual(accepted.recv(), 'hello')
            child.join()
            listener.close()
Benjamin Petersondfd79492008-06-13 19:13:39 +00001349#
1350# Test of sending connection and socket objects between processes
1351#
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001352"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001353class _TestPicklingConnections(BaseTestCase):
1354
1355 ALLOWED_TYPES = ('processes',)
1356
1357 def _listener(self, conn, families):
1358 for fam in families:
1359 l = self.connection.Listener(family=fam)
1360 conn.send(l.address)
1361 new_conn = l.accept()
1362 conn.send(new_conn)
1363
1364 if self.TYPE == 'processes':
1365 l = socket.socket()
1366 l.bind(('localhost', 0))
1367 conn.send(l.getsockname())
1368 l.listen(1)
1369 new_conn, addr = l.accept()
1370 conn.send(new_conn)
1371
1372 conn.recv()
1373
1374 def _remote(self, conn):
1375 for (address, msg) in iter(conn.recv, None):
1376 client = self.connection.Client(address)
1377 client.send(msg.upper())
1378 client.close()
1379
1380 if self.TYPE == 'processes':
1381 address, msg = conn.recv()
1382 client = socket.socket()
1383 client.connect(address)
1384 client.sendall(msg.upper())
1385 client.close()
1386
1387 conn.close()
1388
1389 def test_pickling(self):
1390 try:
1391 multiprocessing.allow_connection_pickling()
1392 except ImportError:
1393 return
1394
1395 families = self.connection.families
1396
1397 lconn, lconn0 = self.Pipe()
1398 lp = self.Process(target=self._listener, args=(lconn0, families))
1399 lp.start()
1400 lconn0.close()
1401
1402 rconn, rconn0 = self.Pipe()
1403 rp = self.Process(target=self._remote, args=(rconn0,))
1404 rp.start()
1405 rconn0.close()
1406
1407 for fam in families:
1408 msg = ('This connection uses family %s' % fam).encode('ascii')
1409 address = lconn.recv()
1410 rconn.send((address, msg))
1411 new_conn = lconn.recv()
1412 self.assertEqual(new_conn.recv(), msg.upper())
1413
1414 rconn.send(None)
1415
1416 if self.TYPE == 'processes':
1417 msg = latin('This connection uses a normal socket')
1418 address = lconn.recv()
1419 rconn.send((address, msg))
1420 if hasattr(socket, 'fromfd'):
1421 new_conn = lconn.recv()
1422 self.assertEqual(new_conn.recv(100), msg.upper())
1423 else:
1424 # XXX On Windows with Py2.6 need to backport fromfd()
1425 discard = lconn.recv_bytes()
1426
1427 lconn.send(None)
1428
1429 rconn.close()
1430 lconn.close()
1431
1432 lp.join()
1433 rp.join()
Benjamin Petersonda3a1b12008-06-16 20:52:48 +00001434"""
Benjamin Petersondfd79492008-06-13 19:13:39 +00001435#
1436#
1437#
1438
class _TestHeap(BaseTestCase):
    """Stress the multiprocessing heap and check its block bookkeeping."""

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        # Allocate and free many randomly sized blocks, then verify that
        # the free and allocated blocks tile each arena without gaps.
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for _ in xrange(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # collect every block as (arena index, start, stop, length, state)
        layout = []
        for free_blocks in heap._len_to_seq.values():
            for arena, start, stop in free_blocks:
                layout.append((heap._arenas.index(arena), start, stop,
                               stop - start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            layout.append((heap._arenas.index(arena), start, stop,
                           stop - start, 'occupied'))

        layout.sort()

        # each block must either end where the next one starts, or the
        # next block must open a different arena at offset 0
        for current, following in zip(layout, layout[1:]):
            arena, start, stop = current[:3]
            narena, nstart, nstop = following[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))
1479
1480#
1481#
1482#
1483
try:
    # Only these names are provided by ctypes.  Asking ctypes for Value or
    # copy raises ImportError even when ctypes is available, which would
    # disable every test below.
    from ctypes import Structure, c_int, c_double
except ImportError:
    # No ctypes on this platform: keep _Foo definable and let the tests
    # detect the situation through ``c_int is None``.
    Structure = object
    c_int = c_double = None


class _Foo(Structure):
    """Structure with an int field and a double field, used as shared data."""
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]


class _TestSharedCTypes(BaseTestCase):
    """Tests for ctypes objects shared between processes."""

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        """Child body: double every shared object in place."""
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        # Create shared values, let a child process double them and check
        # that the parent observes the new contents.
        if c_int is None:
            return

        # Value and Array come from the mixin (self.Value / self.Array).
        # NOTE(review): relies on lock=True/False being accepted as the
        # multiprocessing documentation describes -- confirm against the
        # bundled multiprocessing.sharedctypes.
        x = self.Value('i', 7, lock=lock)
        y = self.Value(c_double, 1.0/3.0, lock=lock)
        foo = self.Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', range(10), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same scenario as test_sharedctypes, with lock=True.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        # A copy must keep its values when the original is modified.
        if c_int is None:
            return

        # Imported here so the module-level ``copy`` module is not shadowed.
        from multiprocessing.sharedctypes import copy

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)
1545
1546#
1547#
1548#
1549
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize callbacks and their exit ordering."""

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        """Child body: register finalizers that report through *conn*."""
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # registered without an exit priority
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three objects sharing exit priority 0, registered in this order;
        # the list keeps them referenced until the process exits
        zero_priority = []
        for tag in ('d01', 'd02', 'd03'):
            obj = Foo()
            zero_priority.append(obj)
            util.Finalize(obj, conn.send, args=(tag,), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        # Run the child to completion, then read its reports up to 'STOP'.
        conn, child_conn = self.Pipe()

        child = self.Process(target=self._test_finalize, args=(child_conn,))
        child.start()
        child.join()

        result = list(iter(conn.recv, 'STOP'))
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1600
1601#
1602# Test that from ... import * works for each module
1603#
1604
class _TestImportStar(BaseTestCase):
    """Check that every name listed in a module's __all__ really exists."""

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing',
            'multiprocessing.connection',
            'multiprocessing.heap',
            'multiprocessing.managers',
            'multiprocessing.pool',
            'multiprocessing.process',
            'multiprocessing.reduction',
            'multiprocessing.sharedctypes',
            'multiprocessing.synchronize',
            'multiprocessing.util',
            )

        for name in modules:
            __import__(name)
            module = sys.modules[name]

            # modules without __all__ contribute nothing to check
            exported = getattr(module, '__all__', ())
            for attr in exported:
                message = '%r does not have attribute %r' % (module, attr)
                self.assertTrue(hasattr(module, attr), message)
1627
1628#
1629# Quick test that logging works -- does not test logging output
1630#
1631
class _TestLogging(BaseTestCase):
    """Quick checks of multiprocessing's logger; output is not inspected."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        # Check the logger before it is first used.
        self.assertTrue(logger is not None)
        logger.setLevel(util.SUBWARNING)
        try:
            logger.debug('this will not be printed')
            logger.info('nor will this')
        finally:
            # Restore the level used by the rest of the suite.
            logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        """Child body: send the logger's effective level through *conn*."""
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def _child_effective_level(self, reader, writer):
        """Return the effective log level reported by a fresh child.

        The child is joined before returning so that it is not left behind
        as an unreaped process.
        """
        child = self.Process(target=self._test_level, args=(writer,))
        child.start()
        level = reader.recv()
        child.join()
        return level

    def test_level(self):
        # The child must see the level set on multiprocessing's logger, or
        # the root logger's level when the former is NOTSET.
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)
        try:
            logger.setLevel(LEVEL1)
            self.assertEqual(
                LEVEL1, self._child_effective_level(reader, writer))

            logger.setLevel(logging.NOTSET)
            root_logger.setLevel(LEVEL2)
            self.assertEqual(
                LEVEL2, self._child_effective_level(reader, writer))
        finally:
            # Undo the level changes even when an assertion failed, so
            # later tests do not inherit LEVEL1/LEVEL2.
            root_logger.setLevel(root_level)
            logger.setLevel(level=LOG_LEVEL)
            reader.close()
            writer.close()
1669
1670#
1671# Functions used to create test cases from the base ones in this module
1672#
1673
def get_attributes(Source, names):
    """Return a dict mapping each entry of *names* to ``Source.<name>``.

    Attributes that are plain Python functions are wrapped in staticmethod;
    every other attribute is stored unchanged.
    """
    function_type = type(get_attributes)
    attributes = {}
    for name in names:
        value = getattr(Source, name)
        if type(value) == function_type:
            value = staticmethod(value)
        attributes[name] = value
    return attributes
1682
def create_test_cases(Mixin, type):
    """Build concrete TestCase classes for one flavour of the tests.

    Every module global whose name starts with '_Test' and whose
    ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
    *Mixin*.  The result maps 'With<Type><name without the underscore>'
    to the new class.
    """
    label = type[0].upper() + type[1:]
    cases = {}

    for name, base in globals().items():
        if not name.startswith('_Test'):
            continue
        if type not in base.ALLOWED_TYPES:
            continue

        class Temp(base, unittest.TestCase, Mixin):
            pass

        newname = 'With' + label + name[1:]
        Temp.__name__ = newname
        Temp.__module__ = Mixin.__module__
        cases[newname] = Temp

    return cases
1699
1700#
1701# Create test cases
1702#
1703
class ProcessesMixin(object):
    """Mixin that runs the generic tests against real processes."""

    TYPE = 'processes'
    Process = multiprocessing.Process

    # Copy the listed multiprocessing attributes into the class namespace
    # so that tests can reach them as self.Queue, self.Lock and so on.
    locals().update(get_attributes(multiprocessing, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array', 'RawValue', 'RawArray',
        'current_process', 'active_children',
        'Pipe', 'connection', 'JoinableQueue'
        )))


# Generate the With Processes* test cases and publish them as globals.
testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)
1716
1717
class ManagerMixin(object):
    """Mixin that runs the generic tests against manager proxies."""

    TYPE = 'manager'
    Process = multiprocessing.Process

    # Allocate the manager without running __init__; only its attributes
    # are looked up here.
    manager = object.__new__(multiprocessing.managers.SyncManager)

    # Copy the listed manager attributes into the class namespace.
    locals().update(get_attributes(manager, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'list', 'dict', 'Namespace',
        'JoinableQueue'
        )))


# Generate the WithManager* test cases and publish them as globals.
testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)
1730
1731
class ThreadsMixin(object):
    """Mixin that runs the generic tests against multiprocessing.dummy."""

    TYPE = 'threads'
    Process = multiprocessing.dummy.Process

    # Copy the listed multiprocessing.dummy attributes into the class
    # namespace.
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue',
        'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event',
        'Value', 'Array',
        'current_process', 'active_children',
        'Pipe', 'connection',
        'dict', 'list', 'Namespace',
        'JoinableQueue'
        )))


# Generate the WithThreads* test cases and publish them as globals.
testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)
1744
1745#
1746#
1747#
1748
def test_main(run=None):
    """Build the full suite, run it, and tear down the shared pools.

    *run* is a callable taking the assembled TestSuite; when it is None,
    test.test_support.run_unittest is used.  Raises TestSkipped on Linux
    when an RLock cannot be created (issue 3111).
    """
    if sys.platform.startswith("linux"):
        try:
            # Only checks that creation works; the lock itself is not used.
            multiprocessing.RLock()
        except OSError:
            from test.test_support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.test_support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    # Resources shared by every test case of the matching flavour.
    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    try:
        testcases = (
            sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
            sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
            )

        loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
        suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
        run(suite)
    finally:
        # Tear down even if the run was interrupted or raised, so that no
        # worker or manager process outlives the test run.
        ThreadsMixin.pool.terminate()
        ProcessesMixin.pool.terminate()
        ManagerMixin.pool.terminate()
        ManagerMixin.manager.shutdown()

        del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersondfd79492008-06-13 19:13:39 +00001786
def main():
    """Run the whole suite through a verbose unittest text runner."""
    runner = unittest.TextTestRunner(verbosity=2)
    test_main(runner.run)


if __name__ == '__main__':
    main()