1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import threading
7import queue as pyqueue
8import time
9import sys
10import os
11import gc
12import signal
13import array
14import copy
15import socket
16import random
17import logging
18
19import multiprocessing.dummy
20import multiprocessing.connection
21import multiprocessing.managers
22import multiprocessing.heap
24import multiprocessing.pool
25import _multiprocessing
26
27from multiprocessing import util
28
29#
30#
31#
32
33if sys.version_info >= (3, 0):
34 def latin(s):
35 return s.encode('latin')
36else:
37 latin = str
38
39#
40# Constants
41#
42
43LOG_LEVEL = util.SUBWARNING
44#LOG_LEVEL = logging.WARNING
45
46DELTA = 0.1
47CHECK_TIMINGS = False # setting this True makes tests take a lot longer
48 # and can sometimes cause some non-serious
49 # failures because some calls block a bit
50 # longer than expected
51if CHECK_TIMINGS:
52 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
53else:
54 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
55
56HAVE_GETVALUE = not getattr(_multiprocessing,
57 'HAVE_BROKEN_SEM_GETVALUE', False)
58
59#
60# Creates a wrapper for a function which records the time it takes to finish
61#
62
63class TimingWrapper(object):
64
65 def __init__(self, func):
66 self.func = func
67 self.elapsed = None
68
69 def __call__(self, *args, **kwds):
70 t = time.time()
71 try:
72 return self.func(*args, **kwds)
73 finally:
74 self.elapsed = time.time() - t
75
76#
77# Base class for test cases
78#
79
80class BaseTestCase(object):
81
82 ALLOWED_TYPES = ('processes', 'manager', 'threads')
83
84 def assertTimingAlmostEqual(self, a, b):
85 if CHECK_TIMINGS:
86 self.assertAlmostEqual(a, b, 1)
87
88 def assertReturnsIfImplemented(self, value, func, *args):
89 try:
90 res = func(*args)
91 except NotImplementedError:
92 pass
93 else:
94 return self.assertEqual(value, res)
95
96#
97# Return the value of a semaphore
98#
99
100def get_value(self):
101 try:
102 return self.get_value()
103 except AttributeError:
104 try:
105 return self._Semaphore__value
106 except AttributeError:
107 try:
108 return self._value
109 except AttributeError:
110 raise NotImplementedError
111
112#
113# Testcases
114#
115
116class _TestProcess(BaseTestCase):
117
118 ALLOWED_TYPES = ('processes', 'threads')
119
120 def test_current(self):
121 if self.TYPE == 'threads':
122 return
123
124 current = self.current_process()
125 authkey = current.get_authkey()
126
127 self.assertTrue(current.is_alive())
128 self.assertTrue(not current.is_daemon())
129 self.assertTrue(isinstance(authkey, bytes))
130 self.assertTrue(len(authkey) > 0)
131 self.assertEqual(current.get_ident(), os.getpid())
132 self.assertEqual(current.get_exitcode(), None)
133
134 def _test(self, q, *args, **kwds):
135 current = self.current_process()
136 q.put(args)
137 q.put(kwds)
138 q.put(current.get_name())
139 if self.TYPE != 'threads':
140 q.put(bytes(current.get_authkey()))
141 q.put(current.pid)
142
143 def test_process(self):
144 q = self.Queue(1)
145 e = self.Event()
146 args = (q, 1, 2)
147 kwargs = {'hello':23, 'bye':2.54}
148 name = 'SomeProcess'
149 p = self.Process(
150 target=self._test, args=args, kwargs=kwargs, name=name
151 )
152 p.set_daemon(True)
153 current = self.current_process()
154
155 if self.TYPE != 'threads':
156 self.assertEquals(p.get_authkey(), current.get_authkey())
157 self.assertEquals(p.is_alive(), False)
158 self.assertEquals(p.is_daemon(), True)
159 self.assertTrue(p not in self.active_children())
160 self.assertTrue(type(self.active_children()) is list)
161 self.assertEqual(p.get_exitcode(), None)
162
163 p.start()
164
165 self.assertEquals(p.get_exitcode(), None)
166 self.assertEquals(p.is_alive(), True)
167 self.assertTrue(p in self.active_children())
168
169 self.assertEquals(q.get(), args[1:])
170 self.assertEquals(q.get(), kwargs)
171 self.assertEquals(q.get(), p.get_name())
172 if self.TYPE != 'threads':
173 self.assertEquals(q.get(), current.get_authkey())
174 self.assertEquals(q.get(), p.pid)
175
176 p.join()
177
178 self.assertEquals(p.get_exitcode(), 0)
179 self.assertEquals(p.is_alive(), False)
180 self.assertTrue(p not in self.active_children())
181
182 def _test_terminate(self):
183 time.sleep(1000)
184
185 def test_terminate(self):
186 if self.TYPE == 'threads':
187 return
188
189 p = self.Process(target=self._test_terminate)
190 p.set_daemon(True)
191 p.start()
192
193 self.assertEqual(p.is_alive(), True)
194 self.assertTrue(p in self.active_children())
195 self.assertEqual(p.get_exitcode(), None)
196
197 p.terminate()
198
199 join = TimingWrapper(p.join)
200 self.assertEqual(join(), None)
201 self.assertTimingAlmostEqual(join.elapsed, 0.0)
202
203 self.assertEqual(p.is_alive(), False)
204 self.assertTrue(p not in self.active_children())
205
206 p.join()
207
208 # XXX sometimes get p.get_exitcode() == 0 on Windows ...
209 #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
210
211 def test_cpu_count(self):
212 try:
213 cpus = multiprocessing.cpu_count()
214 except NotImplementedError:
215 cpus = 1
216 self.assertTrue(type(cpus) is int)
217 self.assertTrue(cpus >= 1)
218
219 def test_active_children(self):
220 self.assertEqual(type(self.active_children()), list)
221
222 p = self.Process(target=time.sleep, args=(DELTA,))
223 self.assertTrue(p not in self.active_children())
224
225 p.start()
226 self.assertTrue(p in self.active_children())
227
228 p.join()
229 self.assertTrue(p not in self.active_children())
230
231 def _test_recursion(self, wconn, id):
232 from multiprocessing import forking
233 wconn.send(id)
234 if len(id) < 2:
235 for i in range(2):
236 p = self.Process(
237 target=self._test_recursion, args=(wconn, id+[i])
238 )
239 p.start()
240 p.join()
241
242 def test_recursion(self):
243 rconn, wconn = self.Pipe(duplex=False)
244 self._test_recursion(wconn, [])
245
246 time.sleep(DELTA)
247 result = []
248 while rconn.poll():
249 result.append(rconn.recv())
250
251 expected = [
252 [],
253 [0],
254 [0, 0],
255 [0, 1],
256 [1],
257 [1, 0],
258 [1, 1]
259 ]
260 self.assertEqual(result, expected)
261
262#
263#
264#
265
266class _UpperCaser(multiprocessing.Process):
267
268 def __init__(self):
269 multiprocessing.Process.__init__(self)
270 self.child_conn, self.parent_conn = multiprocessing.Pipe()
271
272 def run(self):
273 self.parent_conn.close()
274 for s in iter(self.child_conn.recv, None):
275 self.child_conn.send(s.upper())
276 self.child_conn.close()
277
278 def submit(self, s):
279 assert type(s) is str
280 self.parent_conn.send(s)
281 return self.parent_conn.recv()
282
283 def stop(self):
284 self.parent_conn.send(None)
285 self.parent_conn.close()
286 self.child_conn.close()
287
288class _TestSubclassingProcess(BaseTestCase):
289
290 ALLOWED_TYPES = ('processes',)
291
292 def test_subclassing(self):
293 uppercaser = _UpperCaser()
294 uppercaser.start()
295 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
296 self.assertEqual(uppercaser.submit('world'), 'WORLD')
297 uppercaser.stop()
298 uppercaser.join()
299
300#
301#
302#
303
304def queue_empty(q):
305 if hasattr(q, 'empty'):
306 return q.empty()
307 else:
308 return q.qsize() == 0
309
310def queue_full(q, maxsize):
311 if hasattr(q, 'full'):
312 return q.full()
313 else:
314 return q.qsize() == maxsize
315
316
317class _TestQueue(BaseTestCase):
318
319
320 def _test_put(self, queue, child_can_start, parent_can_continue):
321 child_can_start.wait()
322 for i in range(6):
323 queue.get()
324 parent_can_continue.set()
325
326 def test_put(self):
327 MAXSIZE = 6
328 queue = self.Queue(maxsize=MAXSIZE)
329 child_can_start = self.Event()
330 parent_can_continue = self.Event()
331
332 proc = self.Process(
333 target=self._test_put,
334 args=(queue, child_can_start, parent_can_continue)
335 )
336 proc.set_daemon(True)
337 proc.start()
338
339 self.assertEqual(queue_empty(queue), True)
340 self.assertEqual(queue_full(queue, MAXSIZE), False)
341
342 queue.put(1)
343 queue.put(2, True)
344 queue.put(3, True, None)
345 queue.put(4, False)
346 queue.put(5, False, None)
347 queue.put_nowait(6)
348
349 # the values may be in the buffer but not yet in the pipe, so sleep a bit
350 time.sleep(DELTA)
351
352 self.assertEqual(queue_empty(queue), False)
353 self.assertEqual(queue_full(queue, MAXSIZE), True)
354
355 put = TimingWrapper(queue.put)
356 put_nowait = TimingWrapper(queue.put_nowait)
357
358 self.assertRaises(pyqueue.Full, put, 7, False)
359 self.assertTimingAlmostEqual(put.elapsed, 0)
360
361 self.assertRaises(pyqueue.Full, put, 7, False, None)
362 self.assertTimingAlmostEqual(put.elapsed, 0)
363
364 self.assertRaises(pyqueue.Full, put_nowait, 7)
365 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
366
367 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
368 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
369
370 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
371 self.assertTimingAlmostEqual(put.elapsed, 0)
372
373 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
374 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
375
376 child_can_start.set()
377 parent_can_continue.wait()
378
379 self.assertEqual(queue_empty(queue), True)
380 self.assertEqual(queue_full(queue, MAXSIZE), False)
381
382 proc.join()
383
384 def _test_get(self, queue, child_can_start, parent_can_continue):
385 child_can_start.wait()
386 #queue.put(1)
387 queue.put(2)
388 queue.put(3)
389 queue.put(4)
390 queue.put(5)
391 parent_can_continue.set()
392
393 def test_get(self):
394 queue = self.Queue()
395 child_can_start = self.Event()
396 parent_can_continue = self.Event()
397
398 proc = self.Process(
399 target=self._test_get,
400 args=(queue, child_can_start, parent_can_continue)
401 )
402 proc.set_daemon(True)
403 proc.start()
404
405 self.assertEqual(queue_empty(queue), True)
406
407 child_can_start.set()
408 parent_can_continue.wait()
409
410 time.sleep(DELTA)
411 self.assertEqual(queue_empty(queue), False)
412
413 # Hangs unexpectedly, remove for now
414 #self.assertEqual(queue.get(), 1)
415 self.assertEqual(queue.get(True, None), 2)
416 self.assertEqual(queue.get(True), 3)
417 self.assertEqual(queue.get(timeout=1), 4)
418 self.assertEqual(queue.get_nowait(), 5)
419
420 self.assertEqual(queue_empty(queue), True)
421
422 get = TimingWrapper(queue.get)
423 get_nowait = TimingWrapper(queue.get_nowait)
424
425 self.assertRaises(pyqueue.Empty, get, False)
426 self.assertTimingAlmostEqual(get.elapsed, 0)
427
428 self.assertRaises(pyqueue.Empty, get, False, None)
429 self.assertTimingAlmostEqual(get.elapsed, 0)
430
431 self.assertRaises(pyqueue.Empty, get_nowait)
432 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
433
434 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
435 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
436
437 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
438 self.assertTimingAlmostEqual(get.elapsed, 0)
439
440 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
441 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
442
443 proc.join()
444
445 def _test_fork(self, queue):
446 for i in range(10, 20):
447 queue.put(i)
448 # note that at this point the items may only be buffered, so the
449 # process cannot shut down until the feeder thread has finished
450 # pushing items onto the pipe.
451
452 def test_fork(self):
453 # Old versions of Queue would fail to create a new feeder
454 # thread for a forked process if the original process had its
455 # own feeder thread. This test checks that this no longer
456 # happens.
457
458 queue = self.Queue()
459
460 # put items on queue so that main process starts a feeder thread
461 for i in range(10):
462 queue.put(i)
463
464 # wait to make sure thread starts before we fork a new process
465 time.sleep(DELTA)
466
467 # fork process
468 p = self.Process(target=self._test_fork, args=(queue,))
469 p.start()
470
471 # check that all expected items are in the queue
472 for i in range(20):
473 self.assertEqual(queue.get(), i)
474 self.assertRaises(pyqueue.Empty, queue.get, False)
475
476 p.join()
477
478 def test_qsize(self):
479 q = self.Queue()
480 try:
481 self.assertEqual(q.qsize(), 0)
482 except NotImplementedError:
483 return
484 q.put(1)
485 self.assertEqual(q.qsize(), 1)
486 q.put(5)
487 self.assertEqual(q.qsize(), 2)
488 q.get()
489 self.assertEqual(q.qsize(), 1)
490 q.get()
491 self.assertEqual(q.qsize(), 0)
492
493 def _test_task_done(self, q):
494 for obj in iter(q.get, None):
495 time.sleep(DELTA)
496 q.task_done()
497
498 def test_task_done(self):
499 queue = self.JoinableQueue()
500
501 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
502 return
503
504 workers = [self.Process(target=self._test_task_done, args=(queue,))
505 for i in range(4)]
506
507 for p in workers:
508 p.start()
509
510 for i in range(10):
511 queue.put(i)
512
513 queue.join()
514
515 for p in workers:
516 queue.put(None)
517
518 for p in workers:
519 p.join()
520
521#
522#
523#
524
525class _TestLock(BaseTestCase):
526
527 def test_lock(self):
528 lock = self.Lock()
529 self.assertEqual(lock.acquire(), True)
530 self.assertEqual(lock.acquire(False), False)
531 self.assertEqual(lock.release(), None)
532 self.assertRaises((ValueError, threading.ThreadError), lock.release)
533
534 def test_rlock(self):
535 lock = self.RLock()
536 self.assertEqual(lock.acquire(), True)
537 self.assertEqual(lock.acquire(), True)
538 self.assertEqual(lock.acquire(), True)
539 self.assertEqual(lock.release(), None)
540 self.assertEqual(lock.release(), None)
541 self.assertEqual(lock.release(), None)
542 self.assertRaises((AssertionError, RuntimeError), lock.release)
543
544
545class _TestSemaphore(BaseTestCase):
546
547 def _test_semaphore(self, sem):
548 self.assertReturnsIfImplemented(2, get_value, sem)
549 self.assertEqual(sem.acquire(), True)
550 self.assertReturnsIfImplemented(1, get_value, sem)
551 self.assertEqual(sem.acquire(), True)
552 self.assertReturnsIfImplemented(0, get_value, sem)
553 self.assertEqual(sem.acquire(False), False)
554 self.assertReturnsIfImplemented(0, get_value, sem)
555 self.assertEqual(sem.release(), None)
556 self.assertReturnsIfImplemented(1, get_value, sem)
557 self.assertEqual(sem.release(), None)
558 self.assertReturnsIfImplemented(2, get_value, sem)
559
560 def test_semaphore(self):
561 sem = self.Semaphore(2)
562 self._test_semaphore(sem)
563 self.assertEqual(sem.release(), None)
564 self.assertReturnsIfImplemented(3, get_value, sem)
565 self.assertEqual(sem.release(), None)
566 self.assertReturnsIfImplemented(4, get_value, sem)
567
568 def test_bounded_semaphore(self):
569 sem = self.BoundedSemaphore(2)
570 self._test_semaphore(sem)
571 # Currently fails on OS/X
572 #if HAVE_GETVALUE:
573 # self.assertRaises(ValueError, sem.release)
574 # self.assertReturnsIfImplemented(2, get_value, sem)
575
576 def test_timeout(self):
577 if self.TYPE != 'processes':
578 return
579
580 sem = self.Semaphore(0)
581 acquire = TimingWrapper(sem.acquire)
582
583 self.assertEqual(acquire(False), False)
584 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
585
586 self.assertEqual(acquire(False, None), False)
587 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
588
589 self.assertEqual(acquire(False, TIMEOUT1), False)
590 self.assertTimingAlmostEqual(acquire.elapsed, 0)
591
592 self.assertEqual(acquire(True, TIMEOUT2), False)
593 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
594
595 self.assertEqual(acquire(timeout=TIMEOUT3), False)
596 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
597
598
599class _TestCondition(BaseTestCase):
600
601 def f(self, cond, sleeping, woken, timeout=None):
602 cond.acquire()
603 sleeping.release()
604 cond.wait(timeout)
605 woken.release()
606 cond.release()
607
608 def check_invariant(self, cond):
609 # this is only supposed to succeed when there are no sleepers
610 if self.TYPE == 'processes':
611 try:
612 sleepers = (cond._sleeping_count.get_value() -
613 cond._woken_count.get_value())
614 self.assertEqual(sleepers, 0)
615 self.assertEqual(cond._wait_semaphore.get_value(), 0)
616 except NotImplementedError:
617 pass
618
619 def test_notify(self):
620 cond = self.Condition()
621 sleeping = self.Semaphore(0)
622 woken = self.Semaphore(0)
623
624 p = self.Process(target=self.f, args=(cond, sleeping, woken))
625 p.set_daemon(True)
626 p.start()
627
628 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
629 p.set_daemon(True)
630 p.start()
631
632 # wait for both children to start sleeping
633 sleeping.acquire()
634 sleeping.acquire()
635
636 # check no process/thread has woken up
637 time.sleep(DELTA)
638 self.assertReturnsIfImplemented(0, get_value, woken)
639
640 # wake up one process/thread
641 cond.acquire()
642 cond.notify()
643 cond.release()
644
645 # check one process/thread has woken up
646 time.sleep(DELTA)
647 self.assertReturnsIfImplemented(1, get_value, woken)
648
649 # wake up another
650 cond.acquire()
651 cond.notify()
652 cond.release()
653
654 # check other has woken up
655 time.sleep(DELTA)
656 self.assertReturnsIfImplemented(2, get_value, woken)
657
658 # check state is not mucked up
659 self.check_invariant(cond)
660 p.join()
661
662 def test_notify_all(self):
663 cond = self.Condition()
664 sleeping = self.Semaphore(0)
665 woken = self.Semaphore(0)
666
667 # start some threads/processes which will timeout
668 for i in range(3):
669 p = self.Process(target=self.f,
670 args=(cond, sleeping, woken, TIMEOUT1))
671 p.set_daemon(True)
672 p.start()
673
674 t = threading.Thread(target=self.f,
675 args=(cond, sleeping, woken, TIMEOUT1))
676 t.set_daemon(True)
677 t.start()
678
679 # wait for them all to sleep
680 for i in range(6):
681 sleeping.acquire()
682
683 # check they have all timed out
684 for i in range(6):
685 woken.acquire()
686 self.assertReturnsIfImplemented(0, get_value, woken)
687
688 # check state is not mucked up
689 self.check_invariant(cond)
690
691 # start some more threads/processes
692 for i in range(3):
693 p = self.Process(target=self.f, args=(cond, sleeping, woken))
694 p.set_daemon(True)
695 p.start()
696
697 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
698 t.set_daemon(True)
699 t.start()
700
701 # wait for them to all sleep
702 for i in range(6):
703 sleeping.acquire()
704
705 # check no process/thread has woken up
706 time.sleep(DELTA)
707 self.assertReturnsIfImplemented(0, get_value, woken)
708
709 # wake them all up
710 cond.acquire()
711 cond.notify_all()
712 cond.release()
713
714 # check they have all woken
715 time.sleep(DELTA)
716 self.assertReturnsIfImplemented(6, get_value, woken)
717
718 # check state is not mucked up
719 self.check_invariant(cond)
720
721 def test_timeout(self):
722 cond = self.Condition()
723 wait = TimingWrapper(cond.wait)
724 cond.acquire()
725 res = wait(TIMEOUT1)
726 cond.release()
727 self.assertEqual(res, None)
728 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
729
730
731class _TestEvent(BaseTestCase):
732
733 def _test_event(self, event):
734 time.sleep(TIMEOUT2)
735 event.set()
736
737 def test_event(self):
738 event = self.Event()
739 wait = TimingWrapper(event.wait)
740
741 # Removed temporarily: due to API differences this does not
742 # work with threading._Event objects (is_set vs. isSet).
743 #self.assertEqual(event.is_set(), False)
744
745 self.assertEqual(wait(0.0), None)
746 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
747 self.assertEqual(wait(TIMEOUT1), None)
748 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
749
750 event.set()
751
752 # See note above on the API differences
753 # self.assertEqual(event.is_set(), True)
754 self.assertEqual(wait(), None)
755 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
756 self.assertEqual(wait(TIMEOUT1), None)
757 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
758 # self.assertEqual(event.is_set(), True)
759
760 event.clear()
761
762 #self.assertEqual(event.is_set(), False)
763
764 self.Process(target=self._test_event, args=(event,)).start()
765 self.assertEqual(wait(), None)
766
767#
768#
769#
770
771class _TestValue(BaseTestCase):
772
773 codes_values = [
774 ('i', 4343, 24234),
775 ('d', 3.625, -4.25),
776 ('h', -232, 234),
777 ('c', latin('x'), latin('y'))
778 ]
779
780 def _test(self, values):
781 for sv, cv in zip(values, self.codes_values):
782 sv.value = cv[2]
783
784
785 def test_value(self, raw=False):
786 if self.TYPE != 'processes':
787 return
788
789 if raw:
790 values = [self.RawValue(code, value)
791 for code, value, _ in self.codes_values]
792 else:
793 values = [self.Value(code, value)
794 for code, value, _ in self.codes_values]
795
796 for sv, cv in zip(values, self.codes_values):
797 self.assertEqual(sv.value, cv[1])
798
799 proc = self.Process(target=self._test, args=(values,))
800 proc.start()
801 proc.join()
802
803 for sv, cv in zip(values, self.codes_values):
804 self.assertEqual(sv.value, cv[2])
805
806 def test_rawvalue(self):
807 self.test_value(raw=True)
808
809 def test_getobj_getlock(self):
810 if self.TYPE != 'processes':
811 return
812
813 val1 = self.Value('i', 5)
814 lock1 = val1.get_lock()
815 obj1 = val1.get_obj()
816
817 val2 = self.Value('i', 5, lock=None)
818 lock2 = val2.get_lock()
819 obj2 = val2.get_obj()
820
821 lock = self.Lock()
822 val3 = self.Value('i', 5, lock=lock)
823 lock3 = val3.get_lock()
824 obj3 = val3.get_obj()
825 self.assertEqual(lock, lock3)
826
827 arr4 = self.RawValue('i', 5)
828 self.assertFalse(hasattr(arr4, 'get_lock'))
829 self.assertFalse(hasattr(arr4, 'get_obj'))
830
831
832class _TestArray(BaseTestCase):
833
834 def f(self, seq):
835 for i in range(1, len(seq)):
836 seq[i] += seq[i-1]
837
838 def test_array(self, raw=False):
839 if self.TYPE != 'processes':
840 return
841
842 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
843 if raw:
844 arr = self.RawArray('i', seq)
845 else:
846 arr = self.Array('i', seq)
847
848 self.assertEqual(len(arr), len(seq))
849 self.assertEqual(arr[3], seq[3])
850 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
851
852 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
853
854 self.assertEqual(list(arr[:]), seq)
855
856 self.f(seq)
857
858 p = self.Process(target=self.f, args=(arr,))
859 p.start()
860 p.join()
861
862 self.assertEqual(list(arr[:]), seq)
863
864 def test_rawarray(self):
865 self.test_array(raw=True)
866
867 def test_getobj_getlock_obj(self):
868 if self.TYPE != 'processes':
869 return
870
871 arr1 = self.Array('i', list(range(10)))
872 lock1 = arr1.get_lock()
873 obj1 = arr1.get_obj()
874
875 arr2 = self.Array('i', list(range(10)), lock=None)
876 lock2 = arr2.get_lock()
877 obj2 = arr2.get_obj()
878
879 lock = self.Lock()
880 arr3 = self.Array('i', list(range(10)), lock=lock)
881 lock3 = arr3.get_lock()
882 obj3 = arr3.get_obj()
883 self.assertEqual(lock, lock3)
884
885 arr4 = self.RawArray('i', list(range(10)))
886 self.assertFalse(hasattr(arr4, 'get_lock'))
887 self.assertFalse(hasattr(arr4, 'get_obj'))
888
889#
890#
891#
892
893class _TestContainers(BaseTestCase):
894
895 ALLOWED_TYPES = ('manager',)
896
897 def test_list(self):
898 a = self.list(list(range(10)))
899 self.assertEqual(a[:], list(range(10)))
900
901 b = self.list()
902 self.assertEqual(b[:], [])
903
904 b.extend(list(range(5)))
905 self.assertEqual(b[:], list(range(5)))
906
907 self.assertEqual(b[2], 2)
908 self.assertEqual(b[2:10], [2,3,4])
909
910 b *= 2
911 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
912
913 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
914
915 self.assertEqual(a[:], list(range(10)))
916
917 d = [a, b]
918 e = self.list(d)
919 self.assertEqual(
920 e[:],
921 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
922 )
923
924 f = self.list([a])
925 a.append('hello')
926 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
927
928 def test_dict(self):
929 d = self.dict()
930 indices = list(range(65, 70))
931 for i in indices:
932 d[i] = chr(i)
933 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
934 self.assertEqual(sorted(d.keys()), indices)
935 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
936 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
937
938 def test_namespace(self):
939 n = self.Namespace()
940 n.name = 'Bob'
941 n.job = 'Builder'
942 n._hidden = 'hidden'
943 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
944 del n.job
945 self.assertEqual(str(n), "Namespace(name='Bob')")
946 self.assertTrue(hasattr(n, 'name'))
947 self.assertTrue(not hasattr(n, 'job'))
948
949#
950#
951#
952
953def sqr(x, wait=0.0):
954 time.sleep(wait)
955 return x*x
956class _TestPool(BaseTestCase):
957
958 def test_apply(self):
959 papply = self.pool.apply
960 self.assertEqual(papply(sqr, (5,)), sqr(5))
961 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
962
963 def test_map(self):
964 pmap = self.pool.map
965 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
966 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
967 list(map(sqr, list(range(100)))))
968
969 def test_async(self):
970 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
971 get = TimingWrapper(res.get)
972 self.assertEqual(get(), 49)
973 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
974
975 def test_async_timeout(self):
976 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
977 get = TimingWrapper(res.get)
978 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
979 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
980
981 def test_imap(self):
982 it = self.pool.imap(sqr, list(range(10)))
983 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
984
985 it = self.pool.imap(sqr, list(range(10)))
986 for i in range(10):
987 self.assertEqual(next(it), i*i)
988 self.assertRaises(StopIteration, it.__next__)
989
990 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
991 for i in range(1000):
992 self.assertEqual(next(it), i*i)
993 self.assertRaises(StopIteration, it.__next__)
994
995 def test_imap_unordered(self):
996 it = self.pool.imap_unordered(sqr, list(range(1000)))
997 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
998
999 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1000 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1001
1002 def test_make_pool(self):
1003 p = multiprocessing.Pool(3)
1004 self.assertEqual(3, len(p._pool))
1005 p.close()
1006 p.join()
1007
1008 def test_terminate(self):
1009 if self.TYPE == 'manager':
1010 # On Unix a forked process increfs each shared object to
1011 # which its parent process held a reference. If the
1012 # forked process gets terminated then there is likely to
1013 # be a reference leak. So to prevent
1014 # _TestZZZNumberOfObjects from failing we skip this test
1015 # when using a manager.
1016 return
1017
1018 result = self.pool.map_async(
1019 time.sleep, [0.1 for i in range(10000)], chunksize=1
1020 )
1021 self.pool.terminate()
1022 join = TimingWrapper(self.pool.join)
1023 join()
1024 self.assertTrue(join.elapsed < 0.2)
1025#
1026# Test that manager has expected number of shared objects left
1027#
1028
1029class _TestZZZNumberOfObjects(BaseTestCase):
1030 # Because test cases are sorted alphabetically, this one will get
1031 # run after all the other tests for the manager. It tests that
1032 # there have been no "reference leaks" for the manager's shared
1033 # objects. Note the comment in _TestPool.test_terminate().
1034 ALLOWED_TYPES = ('manager',)
1035
1036 def test_number_of_objects(self):
1037 EXPECTED_NUMBER = 1 # the pool object is still alive
1038 multiprocessing.active_children() # discard dead process objs
1039 gc.collect() # do garbage collection
1040 refs = self.manager._number_of_objects()
1041 if refs != EXPECTED_NUMBER:
1042 print(self.manager._debugInfo())
1043
1044 self.assertEqual(refs, EXPECTED_NUMBER)
1045
1046#
1047# Test of creating a customized manager class
1048#
1049
1050from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1051
1052class FooBar(object):
1053 def f(self):
1054 return 'f()'
1055 def g(self):
1056 raise ValueError
1057 def _h(self):
1058 return '_h()'
1059
1060def baz():
1061 for i in range(10):
1062 yield i*i
1063
1064class IteratorProxy(BaseProxy):
1065 _exposed_ = ('next', '__next__')
1066 def __iter__(self):
1067 return self
1068 def __next__(self):
1069 return self._callmethod('__next__')
1072
1073class MyManager(BaseManager):
1074 pass
1075
1076MyManager.register('Foo', callable=FooBar)
1077MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1078MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1079
1080
1081class _TestMyManager(BaseTestCase):
1082
1083 ALLOWED_TYPES = ('manager',)
1084
1085 def test_mymanager(self):
1086 manager = MyManager()
1087 manager.start()
1088
1089 foo = manager.Foo()
1090 bar = manager.Bar()
1091 baz = manager.baz()
1092
1093 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1094 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1095
1096 self.assertEqual(foo_methods, ['f', 'g'])
1097 self.assertEqual(bar_methods, ['f', '_h'])
1098
1099 self.assertEqual(foo.f(), 'f()')
1100 self.assertRaises(ValueError, foo.g)
1101 self.assertEqual(foo._callmethod('f'), 'f()')
1102 self.assertRaises(RemoteError, foo._callmethod, '_h')
1103
1104 self.assertEqual(bar.f(), 'f()')
1105 self.assertEqual(bar._h(), '_h()')
1106 self.assertEqual(bar._callmethod('f'), 'f()')
1107 self.assertEqual(bar._callmethod('_h'), '_h()')
1108
1109 self.assertEqual(list(baz), [i*i for i in range(10)])
1110
1111 manager.shutdown()
1112
1113#
1114# Test of connecting to a remote server and using xmlrpclib for serialization
1115#
1116
1117_queue = pyqueue.Queue()
1118def get_queue():
1119 return _queue
1120
1121class QueueManager(BaseManager):
1122 '''manager class used by server process'''
1123QueueManager.register('get_queue', callable=get_queue)
1124
1125class QueueManager2(BaseManager):
1126 '''manager class which specifies the same interface as QueueManager'''
1127QueueManager2.register('get_queue')
1128
1129
1130SERIALIZER = 'xmlrpclib'
1131
1132class _TestRemoteManager(BaseTestCase):
1133
1134 ALLOWED_TYPES = ('manager',)
1135
1136 def _putter(self, address, authkey):
1137 manager = QueueManager2(
1138 address=address, authkey=authkey, serializer=SERIALIZER
1139 )
1140 manager.connect()
1141 queue = manager.get_queue()
1142 queue.put(('hello world', None, True, 2.25))
1143
1144 def test_remote(self):
1145 authkey = os.urandom(32)
1146
1147 manager = QueueManager(
1148 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1149 )
1150 manager.start()
1151
1152 p = self.Process(target=self._putter, args=(manager.address, authkey))
1153 p.start()
1154
1155 manager2 = QueueManager2(
1156 address=manager.address, authkey=authkey, serializer=SERIALIZER
1157 )
1158 manager2.connect()
1159 queue = manager2.get_queue()
1160
1161 # Note that xmlrpclib will deserialize the object as a list, not a tuple
1162 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1163
1164 # Because we are using xmlrpclib for serialization instead of
1165 # pickle this will cause a serialization error.
1166 self.assertRaises(Exception, queue.put, time.sleep)
1167
1168 # Make queue finalizer run before the server is stopped
1169 del queue
1170 manager.shutdown()
1171
1172#
1173#
1174#
1175
1176SENTINEL = latin('')
1177
1178class _TestConnection(BaseTestCase):
1179
1180 ALLOWED_TYPES = ('processes', 'threads')
1181
1182 def _echo(self, conn):
1183 for msg in iter(conn.recv_bytes, SENTINEL):
1184 conn.send_bytes(msg)
1185 conn.close()
1186
1187 def test_connection(self):
1188 conn, child_conn = self.Pipe()
1189
1190 p = self.Process(target=self._echo, args=(child_conn,))
1191 p.set_daemon(True)
1192 p.start()
1193
1194 seq = [1, 2.25, None]
1195 msg = latin('hello world')
1196 longmsg = msg * 10
1197 arr = array.array('i', list(range(4)))
1198
1199 if self.TYPE == 'processes':
1200 self.assertEqual(type(conn.fileno()), int)
1201
1202 self.assertEqual(conn.send(seq), None)
1203 self.assertEqual(conn.recv(), seq)
1204
1205 self.assertEqual(conn.send_bytes(msg), None)
1206 self.assertEqual(conn.recv_bytes(), msg)
1207
1208 if self.TYPE == 'processes':
1209 buffer = array.array('i', [0]*10)
1210 expected = list(arr) + [0] * (10 - len(arr))
1211 self.assertEqual(conn.send_bytes(arr), None)
1212 self.assertEqual(conn.recv_bytes_into(buffer),
1213 len(arr) * buffer.itemsize)
1214 self.assertEqual(list(buffer), expected)
1215
1216 buffer = array.array('i', [0]*10)
1217 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1218 self.assertEqual(conn.send_bytes(arr), None)
1219 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1220 len(arr) * buffer.itemsize)
1221 self.assertEqual(list(buffer), expected)
1222
1223 buffer = bytearray(latin(' ' * 40))
1224 self.assertEqual(conn.send_bytes(longmsg), None)
1225 try:
1226 res = conn.recv_bytes_into(buffer)
1227 except multiprocessing.BufferTooShort as e:
1228 self.assertEqual(e.args, (longmsg,))
1229 else:
1230 self.fail('expected BufferTooShort, got %s' % res)
1231
1232 poll = TimingWrapper(conn.poll)
1233
1234 self.assertEqual(poll(), False)
1235 self.assertTimingAlmostEqual(poll.elapsed, 0)
1236
1237 self.assertEqual(poll(TIMEOUT1), False)
1238 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1239
1240 conn.send(None)
1241
1242 self.assertEqual(poll(TIMEOUT1), True)
1243 self.assertTimingAlmostEqual(poll.elapsed, 0)
1244
1245 self.assertEqual(conn.recv(), None)
1246
1247 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1248 conn.send_bytes(really_big_msg)
1249 self.assertEqual(conn.recv_bytes(), really_big_msg)
1250
1251 conn.send_bytes(SENTINEL) # tell child to quit
1252 child_conn.close()
1253
1254 if self.TYPE == 'processes':
1255 self.assertEqual(conn.readable, True)
1256 self.assertEqual(conn.writable, True)
1257 self.assertRaises(EOFError, conn.recv)
1258 self.assertRaises(EOFError, conn.recv_bytes)
1259
1260 p.join()
1261
1262 def test_duplex_false(self):
1263 reader, writer = self.Pipe(duplex=False)
1264 self.assertEqual(writer.send(1), None)
1265 self.assertEqual(reader.recv(), 1)
1266 if self.TYPE == 'processes':
1267 self.assertEqual(reader.readable, True)
1268 self.assertEqual(reader.writable, False)
1269 self.assertEqual(writer.readable, False)
1270 self.assertEqual(writer.writable, True)
1271 self.assertRaises(IOError, reader.send, 2)
1272 self.assertRaises(IOError, writer.recv)
1273 self.assertRaises(IOError, writer.poll)
1274
1275 def test_spawn_close(self):
1276 # We test that a pipe connection can be closed by parent
1277 # process immediately after child is spawned. On Windows this
1278 # would have sometimes failed on old versions because
1279 # child_conn would be closed before the child got a chance to
1280 # duplicate it.
1281 conn, child_conn = self.Pipe()
1282
1283 p = self.Process(target=self._echo, args=(child_conn,))
1284 p.start()
1285 child_conn.close() # this might complete before child initializes
1286
1287 msg = latin('hello')
1288 conn.send_bytes(msg)
1289 self.assertEqual(conn.recv_bytes(), msg)
1290
1291 conn.send_bytes(SENTINEL)
1292 conn.close()
1293 p.join()
1294
1295 def test_sendbytes(self):
1296 if self.TYPE != 'processes':
1297 return
1298
1299 msg = latin('abcdefghijklmnopqrstuvwxyz')
1300 a, b = self.Pipe()
1301
1302 a.send_bytes(msg)
1303 self.assertEqual(b.recv_bytes(), msg)
1304
1305 a.send_bytes(msg, 5)
1306 self.assertEqual(b.recv_bytes(), msg[5:])
1307
1308 a.send_bytes(msg, 7, 8)
1309 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1310
1311 a.send_bytes(msg, 26)
1312 self.assertEqual(b.recv_bytes(), latin(''))
1313
1314 a.send_bytes(msg, 26, 0)
1315 self.assertEqual(b.recv_bytes(), latin(''))
1316
1317 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1318
1319 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1320
1321 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1322
1323 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1324
1325 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1326
1327class _TestListenerClient(BaseTestCase):
1328
1329 ALLOWED_TYPES = ('processes', 'threads')
1330
1331 def _test(self, address):
1332 conn = self.connection.Client(address)
1333 conn.send('hello')
1334 conn.close()
1335
1336 def test_listener_client(self):
1337 for family in self.connection.families:
1338 l = self.connection.Listener(family=family)
1339 p = self.Process(target=self._test, args=(l.address,))
1340 p.set_daemon(True)
1341 p.start()
1342 conn = l.accept()
1343 self.assertEqual(conn.recv(), 'hello')
1344 p.join()
1345 l.close()
1346#
1347# Test of sending connection and socket objects between processes
1348#
1349"""
1350class _TestPicklingConnections(BaseTestCase):
1351
1352 ALLOWED_TYPES = ('processes',)
1353
1354 def _listener(self, conn, families):
1355 for fam in families:
1356 l = self.connection.Listener(family=fam)
1357 conn.send(l.address)
1358 new_conn = l.accept()
1359 conn.send(new_conn)
1360
1361 if self.TYPE == 'processes':
1362 l = socket.socket()
1363 l.bind(('localhost', 0))
1364 conn.send(l.getsockname())
1365 l.listen(1)
1366 new_conn, addr = l.accept()
1367 conn.send(new_conn)
1368
1369 conn.recv()
1370
1371 def _remote(self, conn):
1372 for (address, msg) in iter(conn.recv, None):
1373 client = self.connection.Client(address)
1374 client.send(msg.upper())
1375 client.close()
1376
1377 if self.TYPE == 'processes':
1378 address, msg = conn.recv()
1379 client = socket.socket()
1380 client.connect(address)
1381 client.sendall(msg.upper())
1382 client.close()
1383
1384 conn.close()
1385
1386 def test_pickling(self):
1387 try:
1388 multiprocessing.allow_connection_pickling()
1389 except ImportError:
1390 return
1391
1392 families = self.connection.families
1393
1394 lconn, lconn0 = self.Pipe()
1395 lp = self.Process(target=self._listener, args=(lconn0, families))
1396 lp.start()
1397 lconn0.close()
1398
1399 rconn, rconn0 = self.Pipe()
1400 rp = self.Process(target=self._remote, args=(rconn0,))
1401 rp.start()
1402 rconn0.close()
1403
1404 for fam in families:
1405 msg = ('This connection uses family %s' % fam).encode('ascii')
1406 address = lconn.recv()
1407 rconn.send((address, msg))
1408 new_conn = lconn.recv()
1409 self.assertEqual(new_conn.recv(), msg.upper())
1410
1411 rconn.send(None)
1412
1413 if self.TYPE == 'processes':
1414 msg = latin('This connection uses a normal socket')
1415 address = lconn.recv()
1416 rconn.send((address, msg))
1417 if hasattr(socket, 'fromfd'):
1418 new_conn = lconn.recv()
1419 self.assertEqual(new_conn.recv(100), msg.upper())
1420 else:
1421 # XXX On Windows with Py2.6 need to backport fromfd()
1422 discard = lconn.recv_bytes()
1423
1424 lconn.send(None)
1425
1426 rconn.close()
1427 lconn.close()
1428
1429 lp.join()
1430 rp.join()
1431"""
1432#
1433#
1434#
1435
1436class _TestHeap(BaseTestCase):
1437
1438 ALLOWED_TYPES = ('processes',)
1439
1440 def test_heap(self):
1441 iterations = 5000
1442 maxblocks = 50
1443 blocks = []
1444
1445 # create and destroy lots of blocks of different sizes
1446 for i in range(iterations):
1447 size = int(random.lognormvariate(0, 1) * 1000)
1448 b = multiprocessing.heap.BufferWrapper(size)
1449 blocks.append(b)
1450 if len(blocks) > maxblocks:
1451 i = random.randrange(maxblocks)
1452 del blocks[i]
1453
1454 # get the heap object
1455 heap = multiprocessing.heap.BufferWrapper._heap
1456
1457 # verify the state of the heap
1458 all = []
1459 occupied = 0
1460 for L in list(heap._len_to_seq.values()):
1461 for arena, start, stop in L:
1462 all.append((heap._arenas.index(arena), start, stop,
1463 stop-start, 'free'))
1464 for arena, start, stop in heap._allocated_blocks:
1465 all.append((heap._arenas.index(arena), start, stop,
1466 stop-start, 'occupied'))
1467 occupied += (stop-start)
1468
1469 all.sort()
1470
1471 for i in range(len(all)-1):
1472 (arena, start, stop) = all[i][:3]
1473 (narena, nstart, nstop) = all[i+1][:3]
1474 self.assertTrue((arena != narena and nstart == 0) or
1475 (stop == nstart))
1476
1477#
1478#
1479#
1480
1481try:
1482 from ctypes import Structure, c_int, c_double
1483 # Value, Array and copy are provided by multiprocessing.sharedctypes
1484 from multiprocessing.sharedctypes import Value, Array, copy
1483except ImportError:
1484 Structure = object
1485 c_int = c_double = None
1486
1487class _Foo(Structure):
1488 _fields_ = [
1489 ('x', c_int),
1490 ('y', c_double)
1491 ]
1492
1493class _TestSharedCTypes(BaseTestCase):
1494
1495 ALLOWED_TYPES = ('processes',)
1496
1497 def _double(self, x, y, foo, arr, string):
1498 x.value *= 2
1499 y.value *= 2
1500 foo.x *= 2
1501 foo.y *= 2
1502 string.value *= 2
1503 for i in range(len(arr)):
1504 arr[i] *= 2
1505
1506 def test_sharedctypes(self, lock=False):
1507 if c_int is None:
1508 return
1509
1510 x = Value('i', 7, lock=lock)
1511 y = Value(c_double, 1.0/3.0, lock=lock)
1512 foo = Value(_Foo, 3, 2, lock=lock)
1513 arr = Array('d', list(range(10)), lock=lock)
1514 string = Array('c', 20, lock=lock)
1515 string.value = latin('hello') # c_char arrays hold bytes, not str
1516
1517 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1518 p.start()
1519 p.join()
1520
1521 self.assertEqual(x.value, 14)
1522 self.assertAlmostEqual(y.value, 2.0/3.0)
1523 self.assertEqual(foo.x, 6)
1524 self.assertAlmostEqual(foo.y, 4.0)
1525 for i in range(10):
1526 self.assertAlmostEqual(arr[i], i*2)
1527 self.assertEqual(string.value, latin('hellohello'))
1528
1529 def test_synchronize(self):
1530 self.test_sharedctypes(lock=True)
1531
1532 def test_copy(self):
1533 if c_int is None:
1534 return
1535
1536 foo = _Foo(2, 5.0)
1537 bar = copy(foo)
1538 foo.x = 0
1539 foo.y = 0
1540 self.assertEqual(bar.x, 2)
1541 self.assertAlmostEqual(bar.y, 5.0)
1542
1543#
1544#
1545#
1546
1547class _TestFinalize(BaseTestCase):
1548
1549 ALLOWED_TYPES = ('processes',)
1550
1551 def _test_finalize(self, conn):
1552 class Foo(object):
1553 pass
1554
1555 a = Foo()
1556 util.Finalize(a, conn.send, args=('a',))
1557 del a # triggers callback for a
1558
1559 b = Foo()
1560 close_b = util.Finalize(b, conn.send, args=('b',))
1561 close_b() # triggers callback for b
1562 close_b() # does nothing because callback has already been called
1563 del b # does nothing because callback has already been called
1564
1565 c = Foo()
1566 util.Finalize(c, conn.send, args=('c',))
1567
1568 d10 = Foo()
1569 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1570
1571 d01 = Foo()
1572 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1573 d02 = Foo()
1574 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1575 d03 = Foo()
1576 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1577
1578 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1579
1580 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1581
1582 # call multiprocessing's cleanup function then exit process without
1583 # garbage collecting locals
1584 util._exit_function()
1585 conn.close()
1586 os._exit(0)
1587
1588 def test_finalize(self):
1589 conn, child_conn = self.Pipe()
1590
1591 p = self.Process(target=self._test_finalize, args=(child_conn,))
1592 p.start()
1593 p.join()
1594
1595 result = [obj for obj in iter(conn.recv, 'STOP')]
1596 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1597
1598#
1599# Test that from ... import * works for each module
1600#
1601
1602class _TestImportStar(BaseTestCase):
1603
1604 ALLOWED_TYPES = ('processes',)
1605
1606 def test_import(self):
1607 modules = (
1608 'multiprocessing', 'multiprocessing.connection',
1609 'multiprocessing.heap', 'multiprocessing.managers',
1610 'multiprocessing.pool', 'multiprocessing.process',
1611 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1612 'multiprocessing.synchronize', 'multiprocessing.util'
1613 )
1614
1615 for name in modules:
1616 __import__(name)
1617 mod = sys.modules[name]
1618
1619 for attr in getattr(mod, '__all__', ()):
1620 self.assertTrue(
1621 hasattr(mod, attr),
1622 '%r does not have attribute %r' % (mod, attr)
1623 )
1624
1625#
1626# Quick test that logging works -- does not test logging output
1627#
1628
1629class _TestLogging(BaseTestCase):
1630
1631 ALLOWED_TYPES = ('processes',)
1632
1633 def test_enable_logging(self):
1634 logger = multiprocessing.get_logger()
1635 logger.setLevel(util.SUBWARNING)
1636 self.assertTrue(logger is not None)
1637 logger.debug('this will not be printed')
1638 logger.info('nor will this')
1639 logger.setLevel(LOG_LEVEL)
1640
1641 def _test_level(self, conn):
1642 logger = multiprocessing.get_logger()
1643 conn.send(logger.getEffectiveLevel())
1644
1645 def test_level(self):
1646 LEVEL1 = 32
1647 LEVEL2 = 37
1648
1649 logger = multiprocessing.get_logger()
1650 root_logger = logging.getLogger()
1651 root_level = root_logger.level
1652
1653 reader, writer = multiprocessing.Pipe(duplex=False)
1654
1655 logger.setLevel(LEVEL1)
1656 self.Process(target=self._test_level, args=(writer,)).start()
1657 self.assertEqual(LEVEL1, reader.recv())
1658
1659 logger.setLevel(logging.NOTSET)
1660 root_logger.setLevel(LEVEL2)
1661 self.Process(target=self._test_level, args=(writer,)).start()
1662 self.assertEqual(LEVEL2, reader.recv())
1663
1664 root_logger.setLevel(root_level)
1665 logger.setLevel(level=LOG_LEVEL)
1666
1667#
1668# Functions used to create test cases from the base ones in this module
1669#
1670
1671def get_attributes(Source, names):
1672 d = {}
1673 for name in names:
1674 obj = getattr(Source, name)
1675 if type(obj) == type(get_attributes):
1676 obj = staticmethod(obj)
1677 d[name] = obj
1678 return d
1679
1680def create_test_cases(Mixin, type):
1681 result = {}
1682 glob = globals()
1683 Type = type[0].upper() + type[1:]
1684
1685 for name in list(glob.keys()):
1686 if name.startswith('_Test'):
1687 base = glob[name]
1688 if type in base.ALLOWED_TYPES:
1689 newname = 'With' + Type + name[1:]
1690 class Temp(base, unittest.TestCase, Mixin):
1691 pass
1692 result[newname] = Temp
1693 Temp.__name__ = newname
1694 Temp.__module__ = Mixin.__module__
1695 return result
1696
1697#
1698# Create test cases
1699#
1700
1701class ProcessesMixin(object):
1702 TYPE = 'processes'
1703 Process = multiprocessing.Process
1704 locals().update(get_attributes(multiprocessing, (
1705 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1706 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1707 'RawArray', 'current_process', 'active_children', 'Pipe',
1708 'connection', 'JoinableQueue'
1709 )))
1710
1711testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1712globals().update(testcases_processes)
1713
1714
1715class ManagerMixin(object):
1716 TYPE = 'manager'
1717 Process = multiprocessing.Process
1718 manager = object.__new__(multiprocessing.managers.SyncManager)
1719 locals().update(get_attributes(manager, (
1720 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1721 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1722 'Namespace', 'JoinableQueue'
1723 )))
1724
1725testcases_manager = create_test_cases(ManagerMixin, type='manager')
1726globals().update(testcases_manager)
1727
1728
1729class ThreadsMixin(object):
1730 TYPE = 'threads'
1731 Process = multiprocessing.dummy.Process
1732 locals().update(get_attributes(multiprocessing.dummy, (
1733 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1734 'Condition', 'Event', 'Value', 'Array', 'current_process',
1735 'active_children', 'Pipe', 'connection', 'dict', 'list',
1736 'Namespace', 'JoinableQueue'
1737 )))
1738
1739testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1740globals().update(testcases_threads)
1741
1742#
1743#
1744#
1745
1746def test_main(run=None):
1747 if sys.platform.startswith("linux"):
1748 try:
1749 lock = multiprocessing.RLock()
1750 except OSError:
1751 from test.support import TestSkipped
1752 raise TestSkipped("OSError raises on RLock creation, see issue 3111!")
1753
1754 if run is None:
1755 from test.support import run_unittest as run
1756
1757 util.get_temp_dir() # creates temp directory for use by all processes
1758
1759 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1760
1761 ProcessesMixin.pool = multiprocessing.Pool(4)
1762 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1763 ManagerMixin.manager.__init__()
1764 ManagerMixin.manager.start()
1765 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
1766
1767 testcases = (
1768 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
1769 sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
1770 sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
1771 )
1772
1773 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1774 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1775 run(suite)
1776
1777 ThreadsMixin.pool.terminate()
1778 ProcessesMixin.pool.terminate()
1779 ManagerMixin.pool.terminate()
1780 ManagerMixin.manager.shutdown()
1781
1782 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
1783
1784def main():
1785 test_main(unittest.TextTestRunner(verbosity=2).run)
1786
1787if __name__ == '__main__':
1788 main()