blob: 4b36efed148ac9e46c0ca5d8c040e84084a93088 [file] [log] [blame]
Benjamin Petersone711caf2008-06-11 16:44:04 +00001#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import threading
7import queue as pyqueue
8import time
9import sys
10import os
11import gc
12import signal
13import array
14import copy
15import socket
16import random
17import logging
18
19import multiprocessing.dummy
20import multiprocessing.connection
21import multiprocessing.managers
22import multiprocessing.heap
23import multiprocessing.managers
24import multiprocessing.pool
25import _multiprocessing
26
27from multiprocessing import util
28
29#
30#
31#
32
33if sys.version_info >= (3, 0):
34 def latin(s):
35 return s.encode('latin')
36else:
37 latin = str
38
39try:
40 bytes
41except NameError:
42 bytes = str
43 def bytearray(seq):
44 return array.array('c', seq)
45
46#
47# Constants
48#
49
50LOG_LEVEL = util.SUBWARNING
51#LOG_LEVEL = logging.WARNING
52
53DELTA = 0.1
54CHECK_TIMINGS = False # making true makes tests take a lot longer
55 # and can sometimes cause some non-serious
56 # failures because some calls block a bit
57 # longer than expected
58if CHECK_TIMINGS:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
60else:
61 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
62
63HAVE_GETVALUE = not getattr(_multiprocessing,
64 'HAVE_BROKEN_SEM_GETVALUE', False)
65
66#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
70class TimingWrapper(object):
71
72 def __init__(self, func):
73 self.func = func
74 self.elapsed = None
75
76 def __call__(self, *args, **kwds):
77 t = time.time()
78 try:
79 return self.func(*args, **kwds)
80 finally:
81 self.elapsed = time.time() - t
82
83#
84# Base class for test cases
85#
86
87class BaseTestCase(object):
88
89 ALLOWED_TYPES = ('processes', 'manager', 'threads')
90
91 def assertTimingAlmostEqual(self, a, b):
92 if CHECK_TIMINGS:
93 self.assertAlmostEqual(a, b, 1)
94
95 def assertReturnsIfImplemented(self, value, func, *args):
96 try:
97 res = func(*args)
98 except NotImplementedError:
99 pass
100 else:
101 return self.assertEqual(value, res)
102
103#
104# Return the value of a semaphore
105#
106
107def get_value(self):
108 try:
109 return self.get_value()
110 except AttributeError:
111 try:
112 return self._Semaphore__value
113 except AttributeError:
114 try:
115 return self._value
116 except AttributeError:
117 raise NotImplementedError
118
119#
120# Testcases
121#
122
123class _TestProcess(BaseTestCase):
124
125 ALLOWED_TYPES = ('processes', 'threads')
126
127 def test_current(self):
128 if self.TYPE == 'threads':
129 return
130
131 current = self.current_process()
132 authkey = current.get_authkey()
133
134 self.assertTrue(current.is_alive())
135 self.assertTrue(not current.is_daemon())
136 self.assertTrue(isinstance(authkey, bytes))
137 self.assertTrue(len(authkey) > 0)
138 self.assertEqual(current.get_ident(), os.getpid())
139 self.assertEqual(current.get_exitcode(), None)
140
141 def _test(self, q, *args, **kwds):
142 current = self.current_process()
143 q.put(args)
144 q.put(kwds)
145 q.put(current.get_name())
146 if self.TYPE != 'threads':
147 q.put(bytes(current.get_authkey()))
148 q.put(current.pid)
149
150 def test_process(self):
151 q = self.Queue(1)
152 e = self.Event()
153 args = (q, 1, 2)
154 kwargs = {'hello':23, 'bye':2.54}
155 name = 'SomeProcess'
156 p = self.Process(
157 target=self._test, args=args, kwargs=kwargs, name=name
158 )
159 p.set_daemon(True)
160 current = self.current_process()
161
162 if self.TYPE != 'threads':
163 self.assertEquals(p.get_authkey(), current.get_authkey())
164 self.assertEquals(p.is_alive(), False)
165 self.assertEquals(p.is_daemon(), True)
166 self.assertTrue(p not in self.active_children())
167 self.assertTrue(type(self.active_children()) is list)
168 self.assertEqual(p.get_exitcode(), None)
169
170 p.start()
171
172 self.assertEquals(p.get_exitcode(), None)
173 self.assertEquals(p.is_alive(), True)
174 self.assertTrue(p in self.active_children())
175
176 self.assertEquals(q.get(), args[1:])
177 self.assertEquals(q.get(), kwargs)
178 self.assertEquals(q.get(), p.get_name())
179 if self.TYPE != 'threads':
180 self.assertEquals(q.get(), current.get_authkey())
181 self.assertEquals(q.get(), p.pid)
182
183 p.join()
184
185 self.assertEquals(p.get_exitcode(), 0)
186 self.assertEquals(p.is_alive(), False)
187 self.assertTrue(p not in self.active_children())
188
189 def _test_terminate(self):
190 time.sleep(1000)
191
192 def test_terminate(self):
193 if self.TYPE == 'threads':
194 return
195
196 p = self.Process(target=self._test_terminate)
197 p.set_daemon(True)
198 p.start()
199
200 self.assertEqual(p.is_alive(), True)
201 self.assertTrue(p in self.active_children())
202 self.assertEqual(p.get_exitcode(), None)
203
204 p.terminate()
205
206 join = TimingWrapper(p.join)
207 self.assertEqual(join(), None)
208 self.assertTimingAlmostEqual(join.elapsed, 0.0)
209
210 self.assertEqual(p.is_alive(), False)
211 self.assertTrue(p not in self.active_children())
212
213 p.join()
214
215 # XXX sometimes get p.get_exitcode() == 0 on Windows ...
216 #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
217
218 def test_cpu_count(self):
219 try:
220 cpus = multiprocessing.cpu_count()
221 except NotImplementedError:
222 cpus = 1
223 self.assertTrue(type(cpus) is int)
224 self.assertTrue(cpus >= 1)
225
226 def test_active_children(self):
227 self.assertEqual(type(self.active_children()), list)
228
229 p = self.Process(target=time.sleep, args=(DELTA,))
230 self.assertTrue(p not in self.active_children())
231
232 p.start()
233 self.assertTrue(p in self.active_children())
234
235 p.join()
236 self.assertTrue(p not in self.active_children())
237
238 def _test_recursion(self, wconn, id):
239 from multiprocessing import forking
240 wconn.send(id)
241 if len(id) < 2:
242 for i in range(2):
243 p = self.Process(
244 target=self._test_recursion, args=(wconn, id+[i])
245 )
246 p.start()
247 p.join()
248
249 def test_recursion(self):
250 rconn, wconn = self.Pipe(duplex=False)
251 self._test_recursion(wconn, [])
252
253 time.sleep(DELTA)
254 result = []
255 while rconn.poll():
256 result.append(rconn.recv())
257
258 expected = [
259 [],
260 [0],
261 [0, 0],
262 [0, 1],
263 [1],
264 [1, 0],
265 [1, 1]
266 ]
267 self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
295class _TestSubclassingProcess(BaseTestCase):
296
297 ALLOWED_TYPES = ('processes',)
298
299 def test_subclassing(self):
300 uppercaser = _UpperCaser()
301 uppercaser.start()
302 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
303 self.assertEqual(uppercaser.submit('world'), 'WORLD')
304 uppercaser.stop()
305 uppercaser.join()
306
307#
308#
309#
310
311def queue_empty(q):
312 if hasattr(q, 'empty'):
313 return q.empty()
314 else:
315 return q.qsize() == 0
316
317def queue_full(q, maxsize):
318 if hasattr(q, 'full'):
319 return q.full()
320 else:
321 return q.qsize() == maxsize
322
323
324class _TestQueue(BaseTestCase):
325
326
327 def _test_put(self, queue, child_can_start, parent_can_continue):
328 child_can_start.wait()
329 for i in range(6):
330 queue.get()
331 parent_can_continue.set()
332
333 def test_put(self):
334 MAXSIZE = 6
335 queue = self.Queue(maxsize=MAXSIZE)
336 child_can_start = self.Event()
337 parent_can_continue = self.Event()
338
339 proc = self.Process(
340 target=self._test_put,
341 args=(queue, child_can_start, parent_can_continue)
342 )
343 proc.set_daemon(True)
344 proc.start()
345
346 self.assertEqual(queue_empty(queue), True)
347 self.assertEqual(queue_full(queue, MAXSIZE), False)
348
349 queue.put(1)
350 queue.put(2, True)
351 queue.put(3, True, None)
352 queue.put(4, False)
353 queue.put(5, False, None)
354 queue.put_nowait(6)
355
356 # the values may be in buffer but not yet in pipe so sleep a bit
357 time.sleep(DELTA)
358
359 self.assertEqual(queue_empty(queue), False)
360 self.assertEqual(queue_full(queue, MAXSIZE), True)
361
362 put = TimingWrapper(queue.put)
363 put_nowait = TimingWrapper(queue.put_nowait)
364
365 self.assertRaises(pyqueue.Full, put, 7, False)
366 self.assertTimingAlmostEqual(put.elapsed, 0)
367
368 self.assertRaises(pyqueue.Full, put, 7, False, None)
369 self.assertTimingAlmostEqual(put.elapsed, 0)
370
371 self.assertRaises(pyqueue.Full, put_nowait, 7)
372 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
373
374 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
375 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
376
377 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
378 self.assertTimingAlmostEqual(put.elapsed, 0)
379
380 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
381 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
382
383 child_can_start.set()
384 parent_can_continue.wait()
385
386 self.assertEqual(queue_empty(queue), True)
387 self.assertEqual(queue_full(queue, MAXSIZE), False)
388
389 proc.join()
390
391 def _test_get(self, queue, child_can_start, parent_can_continue):
392 child_can_start.wait()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000393 #queue.put(1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000394 queue.put(2)
395 queue.put(3)
396 queue.put(4)
397 queue.put(5)
398 parent_can_continue.set()
399
400 def test_get(self):
401 queue = self.Queue()
402 child_can_start = self.Event()
403 parent_can_continue = self.Event()
404
405 proc = self.Process(
406 target=self._test_get,
407 args=(queue, child_can_start, parent_can_continue)
408 )
409 proc.set_daemon(True)
410 proc.start()
411
412 self.assertEqual(queue_empty(queue), True)
413
414 child_can_start.set()
415 parent_can_continue.wait()
416
417 time.sleep(DELTA)
418 self.assertEqual(queue_empty(queue), False)
419
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000420 # Hangs unexpectedly, remove for now
421 #self.assertEqual(queue.get(), 1)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000422 self.assertEqual(queue.get(True, None), 2)
423 self.assertEqual(queue.get(True), 3)
424 self.assertEqual(queue.get(timeout=1), 4)
425 self.assertEqual(queue.get_nowait(), 5)
426
427 self.assertEqual(queue_empty(queue), True)
428
429 get = TimingWrapper(queue.get)
430 get_nowait = TimingWrapper(queue.get_nowait)
431
432 self.assertRaises(pyqueue.Empty, get, False)
433 self.assertTimingAlmostEqual(get.elapsed, 0)
434
435 self.assertRaises(pyqueue.Empty, get, False, None)
436 self.assertTimingAlmostEqual(get.elapsed, 0)
437
438 self.assertRaises(pyqueue.Empty, get_nowait)
439 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
440
441 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
442 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
443
444 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
445 self.assertTimingAlmostEqual(get.elapsed, 0)
446
447 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
448 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
449
450 proc.join()
451
452 def _test_fork(self, queue):
453 for i in range(10, 20):
454 queue.put(i)
455 # note that at this point the items may only be buffered, so the
456 # process cannot shutdown until the feeder thread has finished
457 # pushing items onto the pipe.
458
459 def test_fork(self):
460 # Old versions of Queue would fail to create a new feeder
461 # thread for a forked process if the original process had its
462 # own feeder thread. This test checks that this no longer
463 # happens.
464
465 queue = self.Queue()
466
467 # put items on queue so that main process starts a feeder thread
468 for i in range(10):
469 queue.put(i)
470
471 # wait to make sure thread starts before we fork a new process
472 time.sleep(DELTA)
473
474 # fork process
475 p = self.Process(target=self._test_fork, args=(queue,))
476 p.start()
477
478 # check that all expected items are in the queue
479 for i in range(20):
480 self.assertEqual(queue.get(), i)
481 self.assertRaises(pyqueue.Empty, queue.get, False)
482
483 p.join()
484
485 def test_qsize(self):
486 q = self.Queue()
487 try:
488 self.assertEqual(q.qsize(), 0)
489 except NotImplementedError:
490 return
491 q.put(1)
492 self.assertEqual(q.qsize(), 1)
493 q.put(5)
494 self.assertEqual(q.qsize(), 2)
495 q.get()
496 self.assertEqual(q.qsize(), 1)
497 q.get()
498 self.assertEqual(q.qsize(), 0)
499
500 def _test_task_done(self, q):
501 for obj in iter(q.get, None):
502 time.sleep(DELTA)
503 q.task_done()
504
505 def test_task_done(self):
506 queue = self.JoinableQueue()
507
508 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
509 return
510
511 workers = [self.Process(target=self._test_task_done, args=(queue,))
512 for i in range(4)]
513
514 for p in workers:
515 p.start()
516
517 for i in range(10):
518 queue.put(i)
519
520 queue.join()
521
522 for p in workers:
523 queue.put(None)
524
525 for p in workers:
526 p.join()
527
528#
529#
530#
531
532class _TestLock(BaseTestCase):
533
534 def test_lock(self):
535 lock = self.Lock()
536 self.assertEqual(lock.acquire(), True)
537 self.assertEqual(lock.acquire(False), False)
538 self.assertEqual(lock.release(), None)
539 self.assertRaises((ValueError, threading.ThreadError), lock.release)
540
541 def test_rlock(self):
542 lock = self.RLock()
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.acquire(), True)
546 self.assertEqual(lock.release(), None)
547 self.assertEqual(lock.release(), None)
548 self.assertEqual(lock.release(), None)
549 self.assertRaises((AssertionError, RuntimeError), lock.release)
550
551
552class _TestSemaphore(BaseTestCase):
553
554 def _test_semaphore(self, sem):
555 self.assertReturnsIfImplemented(2, get_value, sem)
556 self.assertEqual(sem.acquire(), True)
557 self.assertReturnsIfImplemented(1, get_value, sem)
558 self.assertEqual(sem.acquire(), True)
559 self.assertReturnsIfImplemented(0, get_value, sem)
560 self.assertEqual(sem.acquire(False), False)
561 self.assertReturnsIfImplemented(0, get_value, sem)
562 self.assertEqual(sem.release(), None)
563 self.assertReturnsIfImplemented(1, get_value, sem)
564 self.assertEqual(sem.release(), None)
565 self.assertReturnsIfImplemented(2, get_value, sem)
566
567 def test_semaphore(self):
568 sem = self.Semaphore(2)
569 self._test_semaphore(sem)
570 self.assertEqual(sem.release(), None)
571 self.assertReturnsIfImplemented(3, get_value, sem)
572 self.assertEqual(sem.release(), None)
573 self.assertReturnsIfImplemented(4, get_value, sem)
574
575 def test_bounded_semaphore(self):
576 sem = self.BoundedSemaphore(2)
577 self._test_semaphore(sem)
578 # Currently fails on OS/X
579 #if HAVE_GETVALUE:
580 # self.assertRaises(ValueError, sem.release)
581 # self.assertReturnsIfImplemented(2, get_value, sem)
582
583 def test_timeout(self):
584 if self.TYPE != 'processes':
585 return
586
587 sem = self.Semaphore(0)
588 acquire = TimingWrapper(sem.acquire)
589
590 self.assertEqual(acquire(False), False)
591 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
592
593 self.assertEqual(acquire(False, None), False)
594 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
595
596 self.assertEqual(acquire(False, TIMEOUT1), False)
597 self.assertTimingAlmostEqual(acquire.elapsed, 0)
598
599 self.assertEqual(acquire(True, TIMEOUT2), False)
600 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
601
602 self.assertEqual(acquire(timeout=TIMEOUT3), False)
603 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
604
605
606class _TestCondition(BaseTestCase):
607
608 def f(self, cond, sleeping, woken, timeout=None):
609 cond.acquire()
610 sleeping.release()
611 cond.wait(timeout)
612 woken.release()
613 cond.release()
614
615 def check_invariant(self, cond):
616 # this is only supposed to succeed when there are no sleepers
617 if self.TYPE == 'processes':
618 try:
619 sleepers = (cond._sleeping_count.get_value() -
620 cond._woken_count.get_value())
621 self.assertEqual(sleepers, 0)
622 self.assertEqual(cond._wait_semaphore.get_value(), 0)
623 except NotImplementedError:
624 pass
625
626 def test_notify(self):
627 cond = self.Condition()
628 sleeping = self.Semaphore(0)
629 woken = self.Semaphore(0)
630
631 p = self.Process(target=self.f, args=(cond, sleeping, woken))
632 p.set_daemon(True)
633 p.start()
634
635 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson672b8032008-06-11 19:14:14 +0000636 p.set_daemon(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000637 p.start()
638
639 # wait for both children to start sleeping
640 sleeping.acquire()
641 sleeping.acquire()
642
643 # check no process/thread has woken up
644 time.sleep(DELTA)
645 self.assertReturnsIfImplemented(0, get_value, woken)
646
647 # wake up one process/thread
648 cond.acquire()
649 cond.notify()
650 cond.release()
651
652 # check one process/thread has woken up
653 time.sleep(DELTA)
654 self.assertReturnsIfImplemented(1, get_value, woken)
655
656 # wake up another
657 cond.acquire()
658 cond.notify()
659 cond.release()
660
661 # check other has woken up
662 time.sleep(DELTA)
663 self.assertReturnsIfImplemented(2, get_value, woken)
664
665 # check state is not mucked up
666 self.check_invariant(cond)
667 p.join()
668
669 def test_notify_all(self):
670 cond = self.Condition()
671 sleeping = self.Semaphore(0)
672 woken = self.Semaphore(0)
673
674 # start some threads/processes which will timeout
675 for i in range(3):
676 p = self.Process(target=self.f,
677 args=(cond, sleeping, woken, TIMEOUT1))
678 p.set_daemon(True)
679 p.start()
680
681 t = threading.Thread(target=self.f,
682 args=(cond, sleeping, woken, TIMEOUT1))
Benjamin Peterson672b8032008-06-11 19:14:14 +0000683 t.set_daemon(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000684 t.start()
685
686 # wait for them all to sleep
687 for i in range(6):
688 sleeping.acquire()
689
690 # check they have all timed out
691 for i in range(6):
692 woken.acquire()
693 self.assertReturnsIfImplemented(0, get_value, woken)
694
695 # check state is not mucked up
696 self.check_invariant(cond)
697
698 # start some more threads/processes
699 for i in range(3):
700 p = self.Process(target=self.f, args=(cond, sleeping, woken))
701 p.set_daemon(True)
702 p.start()
703
704 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
Benjamin Peterson672b8032008-06-11 19:14:14 +0000705 t.set_daemon(True)
Benjamin Petersone711caf2008-06-11 16:44:04 +0000706 t.start()
707
708 # wait for them to all sleep
709 for i in range(6):
710 sleeping.acquire()
711
712 # check no process/thread has woken up
713 time.sleep(DELTA)
714 self.assertReturnsIfImplemented(0, get_value, woken)
715
716 # wake them all up
717 cond.acquire()
718 cond.notify_all()
719 cond.release()
720
721 # check they have all woken
722 time.sleep(DELTA)
723 self.assertReturnsIfImplemented(6, get_value, woken)
724
725 # check state is not mucked up
726 self.check_invariant(cond)
727
728 def test_timeout(self):
729 cond = self.Condition()
730 wait = TimingWrapper(cond.wait)
731 cond.acquire()
732 res = wait(TIMEOUT1)
733 cond.release()
734 self.assertEqual(res, None)
735 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
736
737
738class _TestEvent(BaseTestCase):
739
740 def _test_event(self, event):
741 time.sleep(TIMEOUT2)
742 event.set()
743
744 def test_event(self):
745 event = self.Event()
746 wait = TimingWrapper(event.wait)
747
748 # Removed temporaily, due to API shear, this does not
749 # work with threading._Event objects. is_set == isSet
750 #self.assertEqual(event.is_set(), False)
751
752 self.assertEqual(wait(0.0), None)
753 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
754 self.assertEqual(wait(TIMEOUT1), None)
755 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
756
757 event.set()
758
759 # See note above on the API differences
760 # self.assertEqual(event.is_set(), True)
761 self.assertEqual(wait(), None)
762 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
763 self.assertEqual(wait(TIMEOUT1), None)
764 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
765 # self.assertEqual(event.is_set(), True)
766
767 event.clear()
768
769 #self.assertEqual(event.is_set(), False)
770
771 self.Process(target=self._test_event, args=(event,)).start()
772 self.assertEqual(wait(), None)
773
774#
775#
776#
777
778class _TestValue(BaseTestCase):
779
780 codes_values = [
781 ('i', 4343, 24234),
782 ('d', 3.625, -4.25),
783 ('h', -232, 234),
784 ('c', latin('x'), latin('y'))
785 ]
786
787 def _test(self, values):
788 for sv, cv in zip(values, self.codes_values):
789 sv.value = cv[2]
790
791
792 def test_value(self, raw=False):
793 if self.TYPE != 'processes':
794 return
795
796 if raw:
797 values = [self.RawValue(code, value)
798 for code, value, _ in self.codes_values]
799 else:
800 values = [self.Value(code, value)
801 for code, value, _ in self.codes_values]
802
803 for sv, cv in zip(values, self.codes_values):
804 self.assertEqual(sv.value, cv[1])
805
806 proc = self.Process(target=self._test, args=(values,))
807 proc.start()
808 proc.join()
809
810 for sv, cv in zip(values, self.codes_values):
811 self.assertEqual(sv.value, cv[2])
812
813 def test_rawvalue(self):
814 self.test_value(raw=True)
815
816 def test_getobj_getlock(self):
817 if self.TYPE != 'processes':
818 return
819
820 val1 = self.Value('i', 5)
821 lock1 = val1.get_lock()
822 obj1 = val1.get_obj()
823
824 val2 = self.Value('i', 5, lock=None)
825 lock2 = val2.get_lock()
826 obj2 = val2.get_obj()
827
828 lock = self.Lock()
829 val3 = self.Value('i', 5, lock=lock)
830 lock3 = val3.get_lock()
831 obj3 = val3.get_obj()
832 self.assertEqual(lock, lock3)
833
834 arr4 = self.RawValue('i', 5)
835 self.assertFalse(hasattr(arr4, 'get_lock'))
836 self.assertFalse(hasattr(arr4, 'get_obj'))
837
838
839class _TestArray(BaseTestCase):
840
841 def f(self, seq):
842 for i in range(1, len(seq)):
843 seq[i] += seq[i-1]
844
845 def test_array(self, raw=False):
846 if self.TYPE != 'processes':
847 return
848
849 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
850 if raw:
851 arr = self.RawArray('i', seq)
852 else:
853 arr = self.Array('i', seq)
854
855 self.assertEqual(len(arr), len(seq))
856 self.assertEqual(arr[3], seq[3])
857 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
858
859 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
860
861 self.assertEqual(list(arr[:]), seq)
862
863 self.f(seq)
864
865 p = self.Process(target=self.f, args=(arr,))
866 p.start()
867 p.join()
868
869 self.assertEqual(list(arr[:]), seq)
870
871 def test_rawarray(self):
872 self.test_array(raw=True)
873
874 def test_getobj_getlock_obj(self):
875 if self.TYPE != 'processes':
876 return
877
878 arr1 = self.Array('i', list(range(10)))
879 lock1 = arr1.get_lock()
880 obj1 = arr1.get_obj()
881
882 arr2 = self.Array('i', list(range(10)), lock=None)
883 lock2 = arr2.get_lock()
884 obj2 = arr2.get_obj()
885
886 lock = self.Lock()
887 arr3 = self.Array('i', list(range(10)), lock=lock)
888 lock3 = arr3.get_lock()
889 obj3 = arr3.get_obj()
890 self.assertEqual(lock, lock3)
891
892 arr4 = self.RawArray('i', list(range(10)))
893 self.assertFalse(hasattr(arr4, 'get_lock'))
894 self.assertFalse(hasattr(arr4, 'get_obj'))
895
896#
897#
898#
899
900class _TestContainers(BaseTestCase):
901
902 ALLOWED_TYPES = ('manager',)
903
904 def test_list(self):
905 a = self.list(list(range(10)))
906 self.assertEqual(a[:], list(range(10)))
907
908 b = self.list()
909 self.assertEqual(b[:], [])
910
911 b.extend(list(range(5)))
912 self.assertEqual(b[:], list(range(5)))
913
914 self.assertEqual(b[2], 2)
915 self.assertEqual(b[2:10], [2,3,4])
916
917 b *= 2
918 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
919
920 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
921
922 self.assertEqual(a[:], list(range(10)))
923
924 d = [a, b]
925 e = self.list(d)
926 self.assertEqual(
927 e[:],
928 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
929 )
930
931 f = self.list([a])
932 a.append('hello')
933 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
934
935 def test_dict(self):
936 d = self.dict()
937 indices = list(range(65, 70))
938 for i in indices:
939 d[i] = chr(i)
940 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
941 self.assertEqual(sorted(d.keys()), indices)
942 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
943 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
944
945 def test_namespace(self):
946 n = self.Namespace()
947 n.name = 'Bob'
948 n.job = 'Builder'
949 n._hidden = 'hidden'
950 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
951 del n.job
952 self.assertEqual(str(n), "Namespace(name='Bob')")
953 self.assertTrue(hasattr(n, 'name'))
954 self.assertTrue(not hasattr(n, 'job'))
955
956#
957#
958#
959
960def sqr(x, wait=0.0):
961 time.sleep(wait)
962 return x*x
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +0000963"""
Benjamin Petersone711caf2008-06-11 16:44:04 +0000964class _TestPool(BaseTestCase):
965
966 def test_apply(self):
967 papply = self.pool.apply
968 self.assertEqual(papply(sqr, (5,)), sqr(5))
969 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
970
971 def test_map(self):
972 pmap = self.pool.map
973 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
974 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
975 list(map(sqr, list(range(100)))))
976
977 def test_async(self):
978 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
979 get = TimingWrapper(res.get)
980 self.assertEqual(get(), 49)
981 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
982
983 def test_async_timeout(self):
984 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
985 get = TimingWrapper(res.get)
986 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
987 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
988
989 def test_imap(self):
990 it = self.pool.imap(sqr, list(range(10)))
991 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
992
993 it = self.pool.imap(sqr, list(range(10)))
994 for i in range(10):
995 self.assertEqual(next(it), i*i)
996 self.assertRaises(StopIteration, it.__next__)
997
998 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
999 for i in range(1000):
1000 self.assertEqual(next(it), i*i)
1001 self.assertRaises(StopIteration, it.__next__)
1002
1003 def test_imap_unordered(self):
1004 it = self.pool.imap_unordered(sqr, list(range(1000)))
1005 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1006
1007 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1008 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1009
1010 def test_make_pool(self):
1011 p = multiprocessing.Pool(3)
1012 self.assertEqual(3, len(p._pool))
1013 p.close()
1014 p.join()
1015
1016 def test_terminate(self):
1017 if self.TYPE == 'manager':
1018 # On Unix a forked process increfs each shared object to
1019 # which its parent process held a reference. If the
1020 # forked process gets terminated then there is likely to
1021 # be a reference leak. So to prevent
1022 # _TestZZZNumberOfObjects from failing we skip this test
1023 # when using a manager.
1024 return
1025
1026 result = self.pool.map_async(
1027 time.sleep, [0.1 for i in range(10000)], chunksize=1
1028 )
1029 self.pool.terminate()
1030 join = TimingWrapper(self.pool.join)
1031 join()
1032 self.assertTrue(join.elapsed < 0.2)
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001033"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001034#
1035# Test that manager has expected number of shared objects left
1036#
1037
1038class _TestZZZNumberOfObjects(BaseTestCase):
1039 # Because test cases are sorted alphabetically, this one will get
1040 # run after all the other tests for the manager. It tests that
1041 # there have been no "reference leaks" for the manager's shared
1042 # objects. Note the comment in _TestPool.test_terminate().
1043 ALLOWED_TYPES = ('manager',)
1044
1045 def test_number_of_objects(self):
1046 EXPECTED_NUMBER = 1 # the pool object is still alive
1047 multiprocessing.active_children() # discard dead process objs
1048 gc.collect() # do garbage collection
1049 refs = self.manager._number_of_objects()
1050 if refs != EXPECTED_NUMBER:
1051 print(self.manager._debugInfo())
1052
1053 self.assertEqual(refs, EXPECTED_NUMBER)
1054
1055#
1056# Test of creating a customized manager class
1057#
1058
1059from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1060
1061class FooBar(object):
1062 def f(self):
1063 return 'f()'
1064 def g(self):
1065 raise ValueError
1066 def _h(self):
1067 return '_h()'
1068
1069def baz():
1070 for i in range(10):
1071 yield i*i
1072
1073class IteratorProxy(BaseProxy):
1074 _exposed_ = ('next', '__next__')
1075 def __iter__(self):
1076 return self
1077 def __next__(self):
1078 return self._callmethod('next')
1079 def __next__(self):
1080 return self._callmethod('__next__')
1081
1082class MyManager(BaseManager):
1083 pass
1084
1085MyManager.register('Foo', callable=FooBar)
1086MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1087MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1088
1089
1090class _TestMyManager(BaseTestCase):
1091
1092 ALLOWED_TYPES = ('manager',)
1093
1094 def test_mymanager(self):
1095 manager = MyManager()
1096 manager.start()
1097
1098 foo = manager.Foo()
1099 bar = manager.Bar()
1100 baz = manager.baz()
1101
1102 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1103 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1104
1105 self.assertEqual(foo_methods, ['f', 'g'])
1106 self.assertEqual(bar_methods, ['f', '_h'])
1107
1108 self.assertEqual(foo.f(), 'f()')
1109 self.assertRaises(ValueError, foo.g)
1110 self.assertEqual(foo._callmethod('f'), 'f()')
1111 self.assertRaises(RemoteError, foo._callmethod, '_h')
1112
1113 self.assertEqual(bar.f(), 'f()')
1114 self.assertEqual(bar._h(), '_h()')
1115 self.assertEqual(bar._callmethod('f'), 'f()')
1116 self.assertEqual(bar._callmethod('_h'), '_h()')
1117
1118 self.assertEqual(list(baz), [i*i for i in range(10)])
1119
1120 manager.shutdown()
1121
1122#
1123# Test of connecting to a remote server and using xmlrpclib for serialization
1124#
1125
1126_queue = pyqueue.Queue()
1127def get_queue():
1128 return _queue
1129
1130class QueueManager(BaseManager):
1131 '''manager class used by server process'''
1132QueueManager.register('get_queue', callable=get_queue)
1133
1134class QueueManager2(BaseManager):
1135 '''manager class which specifies the same interface as QueueManager'''
1136QueueManager2.register('get_queue')
1137
1138
1139SERIALIZER = 'xmlrpclib'
1140
1141class _TestRemoteManager(BaseTestCase):
1142
1143 ALLOWED_TYPES = ('manager',)
1144
1145 def _putter(self, address, authkey):
1146 manager = QueueManager2(
1147 address=address, authkey=authkey, serializer=SERIALIZER
1148 )
1149 manager.connect()
1150 queue = manager.get_queue()
1151 queue.put(('hello world', None, True, 2.25))
1152
1153 def test_remote(self):
1154 authkey = os.urandom(32)
1155
1156 manager = QueueManager(
1157 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1158 )
1159 manager.start()
1160
1161 p = self.Process(target=self._putter, args=(manager.address, authkey))
1162 p.start()
1163
1164 manager2 = QueueManager2(
1165 address=manager.address, authkey=authkey, serializer=SERIALIZER
1166 )
1167 manager2.connect()
1168 queue = manager2.get_queue()
1169
1170 # Note that xmlrpclib will deserialize object as a list not a tuple
1171 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1172
1173 # Because we are using xmlrpclib for serialization instead of
1174 # pickle this will cause a serialization error.
1175 self.assertRaises(Exception, queue.put, time.sleep)
1176
1177 # Make queue finalizer run before the server is stopped
1178 del queue
1179 manager.shutdown()
1180
1181#
1182#
1183#
1184
1185SENTINEL = latin('')
1186
1187class _TestConnection(BaseTestCase):
1188
1189 ALLOWED_TYPES = ('processes', 'threads')
1190
1191 def _echo(self, conn):
1192 for msg in iter(conn.recv_bytes, SENTINEL):
1193 conn.send_bytes(msg)
1194 conn.close()
1195
1196 def test_connection(self):
1197 conn, child_conn = self.Pipe()
1198
1199 p = self.Process(target=self._echo, args=(child_conn,))
1200 p.set_daemon(True)
1201 p.start()
1202
1203 seq = [1, 2.25, None]
1204 msg = latin('hello world')
1205 longmsg = msg * 10
1206 arr = array.array('i', list(range(4)))
1207
1208 if self.TYPE == 'processes':
1209 self.assertEqual(type(conn.fileno()), int)
1210
1211 self.assertEqual(conn.send(seq), None)
1212 self.assertEqual(conn.recv(), seq)
1213
1214 self.assertEqual(conn.send_bytes(msg), None)
1215 self.assertEqual(conn.recv_bytes(), msg)
1216
1217 if self.TYPE == 'processes':
1218 buffer = array.array('i', [0]*10)
1219 expected = list(arr) + [0] * (10 - len(arr))
1220 self.assertEqual(conn.send_bytes(arr), None)
1221 self.assertEqual(conn.recv_bytes_into(buffer),
1222 len(arr) * buffer.itemsize)
1223 self.assertEqual(list(buffer), expected)
1224
1225 buffer = array.array('i', [0]*10)
1226 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1227 self.assertEqual(conn.send_bytes(arr), None)
1228 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1229 len(arr) * buffer.itemsize)
1230 self.assertEqual(list(buffer), expected)
1231
1232 buffer = bytearray(latin(' ' * 40))
1233 self.assertEqual(conn.send_bytes(longmsg), None)
1234 try:
1235 res = conn.recv_bytes_into(buffer)
1236 except multiprocessing.BufferTooShort as e:
1237 self.assertEqual(e.args, (longmsg,))
1238 else:
1239 self.fail('expected BufferTooShort, got %s' % res)
1240
1241 poll = TimingWrapper(conn.poll)
1242
1243 self.assertEqual(poll(), False)
1244 self.assertTimingAlmostEqual(poll.elapsed, 0)
1245
1246 self.assertEqual(poll(TIMEOUT1), False)
1247 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1248
1249 conn.send(None)
1250
1251 self.assertEqual(poll(TIMEOUT1), True)
1252 self.assertTimingAlmostEqual(poll.elapsed, 0)
1253
1254 self.assertEqual(conn.recv(), None)
1255
1256 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1257 conn.send_bytes(really_big_msg)
1258 self.assertEqual(conn.recv_bytes(), really_big_msg)
1259
1260 conn.send_bytes(SENTINEL) # tell child to quit
1261 child_conn.close()
1262
1263 if self.TYPE == 'processes':
1264 self.assertEqual(conn.readable, True)
1265 self.assertEqual(conn.writable, True)
1266 self.assertRaises(EOFError, conn.recv)
1267 self.assertRaises(EOFError, conn.recv_bytes)
1268
1269 p.join()
1270
1271 def test_duplex_false(self):
1272 reader, writer = self.Pipe(duplex=False)
1273 self.assertEqual(writer.send(1), None)
1274 self.assertEqual(reader.recv(), 1)
1275 if self.TYPE == 'processes':
1276 self.assertEqual(reader.readable, True)
1277 self.assertEqual(reader.writable, False)
1278 self.assertEqual(writer.readable, False)
1279 self.assertEqual(writer.writable, True)
1280 self.assertRaises(IOError, reader.send, 2)
1281 self.assertRaises(IOError, writer.recv)
1282 self.assertRaises(IOError, writer.poll)
1283
1284 def test_spawn_close(self):
1285 # We test that a pipe connection can be closed by parent
1286 # process immediately after child is spawned. On Windows this
1287 # would have sometimes failed on old versions because
1288 # child_conn would be closed before the child got a chance to
1289 # duplicate it.
1290 conn, child_conn = self.Pipe()
1291
1292 p = self.Process(target=self._echo, args=(child_conn,))
1293 p.start()
1294 child_conn.close() # this might complete before child initializes
1295
1296 msg = latin('hello')
1297 conn.send_bytes(msg)
1298 self.assertEqual(conn.recv_bytes(), msg)
1299
1300 conn.send_bytes(SENTINEL)
1301 conn.close()
1302 p.join()
1303
1304 def test_sendbytes(self):
1305 if self.TYPE != 'processes':
1306 return
1307
1308 msg = latin('abcdefghijklmnopqrstuvwxyz')
1309 a, b = self.Pipe()
1310
1311 a.send_bytes(msg)
1312 self.assertEqual(b.recv_bytes(), msg)
1313
1314 a.send_bytes(msg, 5)
1315 self.assertEqual(b.recv_bytes(), msg[5:])
1316
1317 a.send_bytes(msg, 7, 8)
1318 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1319
1320 a.send_bytes(msg, 26)
1321 self.assertEqual(b.recv_bytes(), latin(''))
1322
1323 a.send_bytes(msg, 26, 0)
1324 self.assertEqual(b.recv_bytes(), latin(''))
1325
1326 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1327
1328 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1329
1330 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1331
1332 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1333
1334 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1335
1336
1337class _TestListenerClient(BaseTestCase):
1338
1339 ALLOWED_TYPES = ('processes', 'threads')
1340
1341 def _test(self, address):
1342 conn = self.connection.Client(address)
1343 conn.send('hello')
1344 conn.close()
1345
1346 def test_listener_client(self):
1347 for family in self.connection.families:
1348 l = self.connection.Listener(family=family)
1349 p = self.Process(target=self._test, args=(l.address,))
1350 p.set_daemon(True)
1351 p.start()
1352 conn = l.accept()
1353 self.assertEqual(conn.recv(), 'hello')
1354 p.join()
1355 l.close()
1356
1357#
1358# Test of sending connection and socket objects between processes
1359#
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001360"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001361class _TestPicklingConnections(BaseTestCase):
1362
1363 ALLOWED_TYPES = ('processes',)
1364
1365 def _listener(self, conn, families):
1366 for fam in families:
1367 l = self.connection.Listener(family=fam)
1368 conn.send(l.address)
1369 new_conn = l.accept()
1370 conn.send(new_conn)
1371
1372 if self.TYPE == 'processes':
1373 l = socket.socket()
1374 l.bind(('localhost', 0))
1375 conn.send(l.getsockname())
1376 l.listen(1)
1377 new_conn, addr = l.accept()
1378 conn.send(new_conn)
1379
1380 conn.recv()
1381
1382 def _remote(self, conn):
1383 for (address, msg) in iter(conn.recv, None):
1384 client = self.connection.Client(address)
1385 client.send(msg.upper())
1386 client.close()
1387
1388 if self.TYPE == 'processes':
1389 address, msg = conn.recv()
1390 client = socket.socket()
1391 client.connect(address)
1392 client.sendall(msg.upper())
1393 client.close()
1394
1395 conn.close()
1396
1397 def test_pickling(self):
1398 try:
1399 multiprocessing.allow_connection_pickling()
1400 except ImportError:
1401 return
1402
1403 families = self.connection.families
1404
1405 lconn, lconn0 = self.Pipe()
1406 lp = self.Process(target=self._listener, args=(lconn0, families))
1407 lp.start()
1408 lconn0.close()
1409
1410 rconn, rconn0 = self.Pipe()
1411 rp = self.Process(target=self._remote, args=(rconn0,))
1412 rp.start()
1413 rconn0.close()
1414
1415 for fam in families:
1416 msg = ('This connection uses family %s' % fam).encode('ascii')
1417 address = lconn.recv()
1418 rconn.send((address, msg))
1419 new_conn = lconn.recv()
1420 self.assertEqual(new_conn.recv(), msg.upper())
1421
1422 rconn.send(None)
1423
1424 if self.TYPE == 'processes':
1425 msg = latin('This connection uses a normal socket')
1426 address = lconn.recv()
1427 rconn.send((address, msg))
1428 if hasattr(socket, 'fromfd'):
1429 new_conn = lconn.recv()
1430 self.assertEqual(new_conn.recv(100), msg.upper())
1431 else:
1432 # XXX On Windows with Py2.6 need to backport fromfd()
1433 discard = lconn.recv_bytes()
1434
1435 lconn.send(None)
1436
1437 rconn.close()
1438 lconn.close()
1439
1440 lp.join()
1441 rp.join()
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001442"""
Benjamin Petersone711caf2008-06-11 16:44:04 +00001443#
1444#
1445#
1446
1447class _TestHeap(BaseTestCase):
1448
1449 ALLOWED_TYPES = ('processes',)
1450
1451 def test_heap(self):
1452 iterations = 5000
1453 maxblocks = 50
1454 blocks = []
1455
1456 # create and destroy lots of blocks of different sizes
1457 for i in range(iterations):
1458 size = int(random.lognormvariate(0, 1) * 1000)
1459 b = multiprocessing.heap.BufferWrapper(size)
1460 blocks.append(b)
1461 if len(blocks) > maxblocks:
1462 i = random.randrange(maxblocks)
1463 del blocks[i]
1464
1465 # get the heap object
1466 heap = multiprocessing.heap.BufferWrapper._heap
1467
1468 # verify the state of the heap
1469 all = []
1470 occupied = 0
1471 for L in list(heap._len_to_seq.values()):
1472 for arena, start, stop in L:
1473 all.append((heap._arenas.index(arena), start, stop,
1474 stop-start, 'free'))
1475 for arena, start, stop in heap._allocated_blocks:
1476 all.append((heap._arenas.index(arena), start, stop,
1477 stop-start, 'occupied'))
1478 occupied += (stop-start)
1479
1480 all.sort()
1481
1482 for i in range(len(all)-1):
1483 (arena, start, stop) = all[i][:3]
1484 (narena, nstart, nstop) = all[i+1][:3]
1485 self.assertTrue((arena != narena and nstart == 0) or
1486 (stop == nstart))
1487
1488#
1489#
1490#
1491
1492try:
1493 from ctypes import Structure, Value, copy, c_int, c_double
1494except ImportError:
1495 Structure = object
1496 c_int = c_double = None
1497
1498class _Foo(Structure):
1499 _fields_ = [
1500 ('x', c_int),
1501 ('y', c_double)
1502 ]
1503
1504class _TestSharedCTypes(BaseTestCase):
1505
1506 ALLOWED_TYPES = ('processes',)
1507
1508 def _double(self, x, y, foo, arr, string):
1509 x.value *= 2
1510 y.value *= 2
1511 foo.x *= 2
1512 foo.y *= 2
1513 string.value *= 2
1514 for i in range(len(arr)):
1515 arr[i] *= 2
1516
1517 def test_sharedctypes(self, lock=False):
1518 if c_int is None:
1519 return
1520
1521 x = Value('i', 7, lock=lock)
1522 y = Value(ctypes.c_double, 1.0/3.0, lock=lock)
1523 foo = Value(_Foo, 3, 2, lock=lock)
1524 arr = Array('d', list(range(10)), lock=lock)
1525 string = Array('c', 20, lock=lock)
1526 string.value = 'hello'
1527
1528 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1529 p.start()
1530 p.join()
1531
1532 self.assertEqual(x.value, 14)
1533 self.assertAlmostEqual(y.value, 2.0/3.0)
1534 self.assertEqual(foo.x, 6)
1535 self.assertAlmostEqual(foo.y, 4.0)
1536 for i in range(10):
1537 self.assertAlmostEqual(arr[i], i*2)
1538 self.assertEqual(string.value, latin('hellohello'))
1539
1540 def test_synchronize(self):
1541 self.test_sharedctypes(lock=True)
1542
1543 def test_copy(self):
1544 if c_int is None:
1545 return
1546
1547 foo = _Foo(2, 5.0)
1548 bar = copy(foo)
1549 foo.x = 0
1550 foo.y = 0
1551 self.assertEqual(bar.x, 2)
1552 self.assertAlmostEqual(bar.y, 5.0)
1553
1554#
1555#
1556#
1557
1558class _TestFinalize(BaseTestCase):
1559
1560 ALLOWED_TYPES = ('processes',)
1561
1562 def _test_finalize(self, conn):
1563 class Foo(object):
1564 pass
1565
1566 a = Foo()
1567 util.Finalize(a, conn.send, args=('a',))
1568 del a # triggers callback for a
1569
1570 b = Foo()
1571 close_b = util.Finalize(b, conn.send, args=('b',))
1572 close_b() # triggers callback for b
1573 close_b() # does nothing because callback has already been called
1574 del b # does nothing because callback has already been called
1575
1576 c = Foo()
1577 util.Finalize(c, conn.send, args=('c',))
1578
1579 d10 = Foo()
1580 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1581
1582 d01 = Foo()
1583 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1584 d02 = Foo()
1585 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1586 d03 = Foo()
1587 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1588
1589 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1590
1591 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1592
1593 # call mutliprocessing's cleanup function then exit process without
1594 # garbage collecting locals
1595 util._exit_function()
1596 conn.close()
1597 os._exit(0)
1598
1599 def test_finalize(self):
1600 conn, child_conn = self.Pipe()
1601
1602 p = self.Process(target=self._test_finalize, args=(child_conn,))
1603 p.start()
1604 p.join()
1605
1606 result = [obj for obj in iter(conn.recv, 'STOP')]
1607 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1608
1609#
1610# Test that from ... import * works for each module
1611#
1612
1613class _TestImportStar(BaseTestCase):
1614
1615 ALLOWED_TYPES = ('processes',)
1616
1617 def test_import(self):
1618 modules = (
1619 'multiprocessing', 'multiprocessing.connection',
1620 'multiprocessing.heap', 'multiprocessing.managers',
1621 'multiprocessing.pool', 'multiprocessing.process',
1622 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1623 'multiprocessing.synchronize', 'multiprocessing.util'
1624 )
1625
1626 for name in modules:
1627 __import__(name)
1628 mod = sys.modules[name]
1629
1630 for attr in getattr(mod, '__all__', ()):
1631 self.assertTrue(
1632 hasattr(mod, attr),
1633 '%r does not have attribute %r' % (mod, attr)
1634 )
1635
1636#
1637# Quick test that logging works -- does not test logging output
1638#
1639
1640class _TestLogging(BaseTestCase):
1641
1642 ALLOWED_TYPES = ('processes',)
1643
1644 def test_enable_logging(self):
1645 logger = multiprocessing.get_logger()
1646 logger.setLevel(util.SUBWARNING)
1647 self.assertTrue(logger is not None)
1648 logger.debug('this will not be printed')
1649 logger.info('nor will this')
1650 logger.setLevel(LOG_LEVEL)
1651
1652 def _test_level(self, conn):
1653 logger = multiprocessing.get_logger()
1654 conn.send(logger.getEffectiveLevel())
1655
1656 def test_level(self):
1657 LEVEL1 = 32
1658 LEVEL2 = 37
1659
1660 logger = multiprocessing.get_logger()
1661 root_logger = logging.getLogger()
1662 root_level = root_logger.level
1663
1664 reader, writer = multiprocessing.Pipe(duplex=False)
1665
1666 logger.setLevel(LEVEL1)
1667 self.Process(target=self._test_level, args=(writer,)).start()
1668 self.assertEqual(LEVEL1, reader.recv())
1669
1670 logger.setLevel(logging.NOTSET)
1671 root_logger.setLevel(LEVEL2)
1672 self.Process(target=self._test_level, args=(writer,)).start()
1673 self.assertEqual(LEVEL2, reader.recv())
1674
1675 root_logger.setLevel(root_level)
1676 logger.setLevel(level=LOG_LEVEL)
1677
1678#
1679# Functions used to create test cases from the base ones in this module
1680#
1681
1682def get_attributes(Source, names):
1683 d = {}
1684 for name in names:
1685 obj = getattr(Source, name)
1686 if type(obj) == type(get_attributes):
1687 obj = staticmethod(obj)
1688 d[name] = obj
1689 return d
1690
1691def create_test_cases(Mixin, type):
1692 result = {}
1693 glob = globals()
1694 Type = type[0].upper() + type[1:]
1695
1696 for name in list(glob.keys()):
1697 if name.startswith('_Test'):
1698 base = glob[name]
1699 if type in base.ALLOWED_TYPES:
1700 newname = 'With' + Type + name[1:]
1701 class Temp(base, unittest.TestCase, Mixin):
1702 pass
1703 result[newname] = Temp
1704 Temp.__name__ = newname
1705 Temp.__module__ = Mixin.__module__
1706 return result
1707
1708#
1709# Create test cases
1710#
1711
1712class ProcessesMixin(object):
1713 TYPE = 'processes'
1714 Process = multiprocessing.Process
1715 locals().update(get_attributes(multiprocessing, (
1716 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1717 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1718 'RawArray', 'current_process', 'active_children', 'Pipe',
1719 'connection', 'JoinableQueue'
1720 )))
1721
1722testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1723globals().update(testcases_processes)
1724
1725
1726class ManagerMixin(object):
1727 TYPE = 'manager'
1728 Process = multiprocessing.Process
1729 manager = object.__new__(multiprocessing.managers.SyncManager)
1730 locals().update(get_attributes(manager, (
1731 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1732 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1733 'Namespace', 'JoinableQueue'
1734 )))
1735
1736testcases_manager = create_test_cases(ManagerMixin, type='manager')
1737globals().update(testcases_manager)
1738
1739
1740class ThreadsMixin(object):
1741 TYPE = 'threads'
1742 Process = multiprocessing.dummy.Process
1743 locals().update(get_attributes(multiprocessing.dummy, (
1744 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1745 'Condition', 'Event', 'Value', 'Array', 'current_process',
1746 'active_children', 'Pipe', 'connection', 'dict', 'list',
1747 'Namespace', 'JoinableQueue'
1748 )))
1749
1750testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1751globals().update(testcases_threads)
1752
1753#
1754#
1755#
1756
1757def test_main(run=None):
1758 if run is None:
1759 from test.support import run_unittest as run
1760
1761 util.get_temp_dir() # creates temp directory for use by all processes
1762
1763 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1764
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001765 #ProcessesMixin.pool = multiprocessing.Pool(4)
1766 #ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1767 #ManagerMixin.manager.__init__()
1768 #ManagerMixin.manager.start()
1769 #ManagerMixin.pool = ManagerMixin.manager.Pool(4)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001770
1771 testcases = (
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001772 sorted(testcases_processes.values(), key=lambda tc:tc.__name__) #+
1773 #sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
1774 #sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
Benjamin Petersone711caf2008-06-11 16:44:04 +00001775 )
1776
1777 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1778 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1779 run(suite)
1780
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001781 #ThreadsMixin.pool.terminate()
1782 #ProcessesMixin.pool.terminate()
1783 #ManagerMixin.pool.terminate()
1784 #ManagerMixin.manager.shutdown()
Benjamin Petersone711caf2008-06-11 16:44:04 +00001785
Benjamin Petersonb29cbbc2008-06-16 20:57:14 +00001786 #del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
Benjamin Petersone711caf2008-06-11 16:44:04 +00001787
1788def main():
1789 test_main(unittest.TextTestRunner(verbosity=2).run)
1790
1791if __name__ == '__main__':
1792 main()