#
# Unit tests for the multiprocessing package
#

5import unittest
6import threading
7import queue as pyqueue
8import time
9import sys
10import os
11import gc
12import signal
13import array
14import copy
15import socket
16import random
17import logging
18
19import multiprocessing.dummy
20import multiprocessing.connection
21import multiprocessing.managers
import multiprocessing.heap
24import multiprocessing.pool
25import _multiprocessing
26
27from multiprocessing import util
28
29#
30#
31#
32
33if sys.version_info >= (3, 0):
34 def latin(s):
35 return s.encode('latin')
36else:
37 latin = str
38
39try:
40 bytes
41except NameError:
42 bytes = str
43 def bytearray(seq):
44 return array.array('c', seq)
45
46#
47# Constants
48#
49
50LOG_LEVEL = util.SUBWARNING
51#LOG_LEVEL = logging.WARNING
52
53DELTA = 0.1
CHECK_TIMINGS = False     # Setting this to True makes the tests take a lot
                          # longer and can sometimes cause some non-serious
                          # failures because some calls block a bit longer
                          # than expected.
58if CHECK_TIMINGS:
59 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
60else:
61 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
62
63HAVE_GETVALUE = not getattr(_multiprocessing,
64 'HAVE_BROKEN_SEM_GETVALUE', False)
65
66#
67# Creates a wrapper for a function which records the time it takes to finish
68#
69
70class TimingWrapper(object):
71
72 def __init__(self, func):
73 self.func = func
74 self.elapsed = None
75
76 def __call__(self, *args, **kwds):
77 t = time.time()
78 try:
79 return self.func(*args, **kwds)
80 finally:
81 self.elapsed = time.time() - t
82
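#
# Illustrative sketch only -- the _demo_* helper below is an invented name
# and is never called by the test suite: wrapping time.sleep() shows how
# TimingWrapper records roughly how long a call blocked.
#

def _demo_timing_wrapper():
    sleep = TimingWrapper(time.sleep)
    sleep(0.05)
    return sleep.elapsed          # approximately 0.05 seconds
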
83#
84# Base class for test cases
85#
86
87class BaseTestCase(object):
88
89 ALLOWED_TYPES = ('processes', 'manager', 'threads')
90
91 def assertTimingAlmostEqual(self, a, b):
92 if CHECK_TIMINGS:
93 self.assertAlmostEqual(a, b, 1)
94
95 def assertReturnsIfImplemented(self, value, func, *args):
96 try:
97 res = func(*args)
98 except NotImplementedError:
99 pass
100 else:
101 return self.assertEqual(value, res)
102
103#
104# Return the value of a semaphore
105#
106
107def get_value(self):
108 try:
109 return self.get_value()
110 except AttributeError:
111 try:
112 return self._Semaphore__value
113 except AttributeError:
114 try:
115 return self._value
116 except AttributeError:
117 raise NotImplementedError
118
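#
# Illustrative sketch only -- the helper below is an invented name and is
# never called by the tests: get_value() above works for multiprocessing
# and threading semaphores alike, via whichever method or private
# attribute the implementation happens to provide.
#

def _demo_get_value():
    # returns (3, 3) on platforms where sem_getvalue() is supported
    return (get_value(multiprocessing.Semaphore(3)),
            get_value(threading.Semaphore(3)))
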
119#
120# Testcases
121#
122
123class _TestProcess(BaseTestCase):
124
125 ALLOWED_TYPES = ('processes', 'threads')
126
127 def test_current(self):
128 if self.TYPE == 'threads':
129 return
130
131 current = self.current_process()
132 authkey = current.get_authkey()
133
134 self.assertTrue(current.is_alive())
135 self.assertTrue(not current.is_daemon())
136 self.assertTrue(isinstance(authkey, bytes))
137 self.assertTrue(len(authkey) > 0)
138 self.assertEqual(current.get_ident(), os.getpid())
139 self.assertEqual(current.get_exitcode(), None)
140
141 def _test(self, q, *args, **kwds):
142 current = self.current_process()
143 q.put(args)
144 q.put(kwds)
145 q.put(current.get_name())
146 if self.TYPE != 'threads':
147 q.put(bytes(current.get_authkey()))
148 q.put(current.pid)
149
150 def test_process(self):
151 q = self.Queue(1)
152 e = self.Event()
153 args = (q, 1, 2)
154 kwargs = {'hello':23, 'bye':2.54}
155 name = 'SomeProcess'
156 p = self.Process(
157 target=self._test, args=args, kwargs=kwargs, name=name
158 )
159 p.set_daemon(True)
160 current = self.current_process()
161
162 if self.TYPE != 'threads':
163 self.assertEquals(p.get_authkey(), current.get_authkey())
164 self.assertEquals(p.is_alive(), False)
165 self.assertEquals(p.is_daemon(), True)
166 self.assertTrue(p not in self.active_children())
167 self.assertTrue(type(self.active_children()) is list)
168 self.assertEqual(p.get_exitcode(), None)
169
170 p.start()
171
172 self.assertEquals(p.get_exitcode(), None)
173 self.assertEquals(p.is_alive(), True)
174 self.assertTrue(p in self.active_children())
175
176 self.assertEquals(q.get(), args[1:])
177 self.assertEquals(q.get(), kwargs)
178 self.assertEquals(q.get(), p.get_name())
179 if self.TYPE != 'threads':
180 self.assertEquals(q.get(), current.get_authkey())
181 self.assertEquals(q.get(), p.pid)
182
183 p.join()
184
185 self.assertEquals(p.get_exitcode(), 0)
186 self.assertEquals(p.is_alive(), False)
187 self.assertTrue(p not in self.active_children())
188
189 def _test_terminate(self):
190 time.sleep(1000)
191
192 def test_terminate(self):
193 if self.TYPE == 'threads':
194 return
195
196 p = self.Process(target=self._test_terminate)
197 p.set_daemon(True)
198 p.start()
199
200 self.assertEqual(p.is_alive(), True)
201 self.assertTrue(p in self.active_children())
202 self.assertEqual(p.get_exitcode(), None)
203
204 p.terminate()
205
206 join = TimingWrapper(p.join)
207 self.assertEqual(join(), None)
208 self.assertTimingAlmostEqual(join.elapsed, 0.0)
209
210 self.assertEqual(p.is_alive(), False)
211 self.assertTrue(p not in self.active_children())
212
213 p.join()
214
215 # XXX sometimes get p.get_exitcode() == 0 on Windows ...
216 #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)
217
218 def test_cpu_count(self):
219 try:
220 cpus = multiprocessing.cpu_count()
221 except NotImplementedError:
222 cpus = 1
223 self.assertTrue(type(cpus) is int)
224 self.assertTrue(cpus >= 1)
225
226 def test_active_children(self):
227 self.assertEqual(type(self.active_children()), list)
228
229 p = self.Process(target=time.sleep, args=(DELTA,))
230 self.assertTrue(p not in self.active_children())
231
232 p.start()
233 self.assertTrue(p in self.active_children())
234
235 p.join()
236 self.assertTrue(p not in self.active_children())
237
238 def _test_recursion(self, wconn, id):
239 from multiprocessing import forking
240 wconn.send(id)
241 if len(id) < 2:
242 for i in range(2):
243 p = self.Process(
244 target=self._test_recursion, args=(wconn, id+[i])
245 )
246 p.start()
247 p.join()
248
249 def test_recursion(self):
250 rconn, wconn = self.Pipe(duplex=False)
251 self._test_recursion(wconn, [])
252
253 time.sleep(DELTA)
254 result = []
255 while rconn.poll():
256 result.append(rconn.recv())
257
258 expected = [
259 [],
260 [0],
261 [0, 0],
262 [0, 1],
263 [1],
264 [1, 0],
265 [1, 1]
266 ]
267 self.assertEqual(result, expected)
268
269#
270#
271#
272
273class _UpperCaser(multiprocessing.Process):
274
275 def __init__(self):
276 multiprocessing.Process.__init__(self)
277 self.child_conn, self.parent_conn = multiprocessing.Pipe()
278
279 def run(self):
280 self.parent_conn.close()
281 for s in iter(self.child_conn.recv, None):
282 self.child_conn.send(s.upper())
283 self.child_conn.close()
284
285 def submit(self, s):
286 assert type(s) is str
287 self.parent_conn.send(s)
288 return self.parent_conn.recv()
289
290 def stop(self):
291 self.parent_conn.send(None)
292 self.parent_conn.close()
293 self.child_conn.close()
294
295class _TestSubclassingProcess(BaseTestCase):
296
297 ALLOWED_TYPES = ('processes',)
298
299 def test_subclassing(self):
300 uppercaser = _UpperCaser()
301 uppercaser.start()
302 self.assertEqual(uppercaser.submit('hello'), 'HELLO')
303 self.assertEqual(uppercaser.submit('world'), 'WORLD')
304 uppercaser.stop()
305 uppercaser.join()
306
307#
308#
309#
310
311def queue_empty(q):
312 if hasattr(q, 'empty'):
313 return q.empty()
314 else:
315 return q.qsize() == 0
316
317def queue_full(q, maxsize):
318 if hasattr(q, 'full'):
319 return q.full()
320 else:
321 return q.qsize() == maxsize
322
323
324class _TestQueue(BaseTestCase):
325
326
327 def _test_put(self, queue, child_can_start, parent_can_continue):
328 child_can_start.wait()
329 for i in range(6):
330 queue.get()
331 parent_can_continue.set()
332
333 def test_put(self):
334 MAXSIZE = 6
335 queue = self.Queue(maxsize=MAXSIZE)
336 child_can_start = self.Event()
337 parent_can_continue = self.Event()
338
339 proc = self.Process(
340 target=self._test_put,
341 args=(queue, child_can_start, parent_can_continue)
342 )
343 proc.set_daemon(True)
344 proc.start()
345
346 self.assertEqual(queue_empty(queue), True)
347 self.assertEqual(queue_full(queue, MAXSIZE), False)
348
349 queue.put(1)
350 queue.put(2, True)
351 queue.put(3, True, None)
352 queue.put(4, False)
353 queue.put(5, False, None)
354 queue.put_nowait(6)
355
356 # the values may be in buffer but not yet in pipe so sleep a bit
357 time.sleep(DELTA)
358
359 self.assertEqual(queue_empty(queue), False)
360 self.assertEqual(queue_full(queue, MAXSIZE), True)
361
362 put = TimingWrapper(queue.put)
363 put_nowait = TimingWrapper(queue.put_nowait)
364
365 self.assertRaises(pyqueue.Full, put, 7, False)
366 self.assertTimingAlmostEqual(put.elapsed, 0)
367
368 self.assertRaises(pyqueue.Full, put, 7, False, None)
369 self.assertTimingAlmostEqual(put.elapsed, 0)
370
371 self.assertRaises(pyqueue.Full, put_nowait, 7)
372 self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
373
374 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
375 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
376
377 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
378 self.assertTimingAlmostEqual(put.elapsed, 0)
379
380 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
381 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
382
383 child_can_start.set()
384 parent_can_continue.wait()
385
386 self.assertEqual(queue_empty(queue), True)
387 self.assertEqual(queue_full(queue, MAXSIZE), False)
388
389 proc.join()
390
391 def _test_get(self, queue, child_can_start, parent_can_continue):
392 child_can_start.wait()
393 queue.put(1)
394 queue.put(2)
395 queue.put(3)
396 queue.put(4)
397 queue.put(5)
398 parent_can_continue.set()
399
400 def test_get(self):
401 queue = self.Queue()
402 child_can_start = self.Event()
403 parent_can_continue = self.Event()
404
405 proc = self.Process(
406 target=self._test_get,
407 args=(queue, child_can_start, parent_can_continue)
408 )
409 proc.set_daemon(True)
410 proc.start()
411
412 self.assertEqual(queue_empty(queue), True)
413
414 child_can_start.set()
415 parent_can_continue.wait()
416
417 time.sleep(DELTA)
418 self.assertEqual(queue_empty(queue), False)
419
420 self.assertEqual(queue.get(), 1)
421 self.assertEqual(queue.get(True, None), 2)
422 self.assertEqual(queue.get(True), 3)
423 self.assertEqual(queue.get(timeout=1), 4)
424 self.assertEqual(queue.get_nowait(), 5)
425
426 self.assertEqual(queue_empty(queue), True)
427
428 get = TimingWrapper(queue.get)
429 get_nowait = TimingWrapper(queue.get_nowait)
430
431 self.assertRaises(pyqueue.Empty, get, False)
432 self.assertTimingAlmostEqual(get.elapsed, 0)
433
434 self.assertRaises(pyqueue.Empty, get, False, None)
435 self.assertTimingAlmostEqual(get.elapsed, 0)
436
437 self.assertRaises(pyqueue.Empty, get_nowait)
438 self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
439
440 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
441 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
442
443 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
444 self.assertTimingAlmostEqual(get.elapsed, 0)
445
446 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
447 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
448
449 proc.join()
450
451 def _test_fork(self, queue):
452 for i in range(10, 20):
453 queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shut down until the feeder thread has finished
        # pushing items onto the pipe
457
458 def test_fork(self):
459 # Old versions of Queue would fail to create a new feeder
460 # thread for a forked process if the original process had its
461 # own feeder thread. This test checks that this no longer
462 # happens.
463
464 queue = self.Queue()
465
466 # put items on queue so that main process starts a feeder thread
467 for i in range(10):
468 queue.put(i)
469
470 # wait to make sure thread starts before we fork a new process
471 time.sleep(DELTA)
472
473 # fork process
474 p = self.Process(target=self._test_fork, args=(queue,))
475 p.start()
476
477 # check that all expected items are in the queue
478 for i in range(20):
479 self.assertEqual(queue.get(), i)
480 self.assertRaises(pyqueue.Empty, queue.get, False)
481
482 p.join()
483
484 def test_qsize(self):
485 q = self.Queue()
486 try:
487 self.assertEqual(q.qsize(), 0)
488 except NotImplementedError:
489 return
490 q.put(1)
491 self.assertEqual(q.qsize(), 1)
492 q.put(5)
493 self.assertEqual(q.qsize(), 2)
494 q.get()
495 self.assertEqual(q.qsize(), 1)
496 q.get()
497 self.assertEqual(q.qsize(), 0)
498
499 def _test_task_done(self, q):
500 for obj in iter(q.get, None):
501 time.sleep(DELTA)
502 q.task_done()
503
504 def test_task_done(self):
505 queue = self.JoinableQueue()
506
507 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
508 return
509
510 workers = [self.Process(target=self._test_task_done, args=(queue,))
511 for i in range(4)]
512
513 for p in workers:
514 p.start()
515
516 for i in range(10):
517 queue.put(i)
518
519 queue.join()
520
521 for p in workers:
522 queue.put(None)
523
524 for p in workers:
525 p.join()
526
527#
528#
529#
530
531class _TestLock(BaseTestCase):
532
533 def test_lock(self):
534 lock = self.Lock()
535 self.assertEqual(lock.acquire(), True)
536 self.assertEqual(lock.acquire(False), False)
537 self.assertEqual(lock.release(), None)
538 self.assertRaises((ValueError, threading.ThreadError), lock.release)
539
540 def test_rlock(self):
541 lock = self.RLock()
542 self.assertEqual(lock.acquire(), True)
543 self.assertEqual(lock.acquire(), True)
544 self.assertEqual(lock.acquire(), True)
545 self.assertEqual(lock.release(), None)
546 self.assertEqual(lock.release(), None)
547 self.assertEqual(lock.release(), None)
548 self.assertRaises((AssertionError, RuntimeError), lock.release)
549
550
551class _TestSemaphore(BaseTestCase):
552
553 def _test_semaphore(self, sem):
554 self.assertReturnsIfImplemented(2, get_value, sem)
555 self.assertEqual(sem.acquire(), True)
556 self.assertReturnsIfImplemented(1, get_value, sem)
557 self.assertEqual(sem.acquire(), True)
558 self.assertReturnsIfImplemented(0, get_value, sem)
559 self.assertEqual(sem.acquire(False), False)
560 self.assertReturnsIfImplemented(0, get_value, sem)
561 self.assertEqual(sem.release(), None)
562 self.assertReturnsIfImplemented(1, get_value, sem)
563 self.assertEqual(sem.release(), None)
564 self.assertReturnsIfImplemented(2, get_value, sem)
565
566 def test_semaphore(self):
567 sem = self.Semaphore(2)
568 self._test_semaphore(sem)
569 self.assertEqual(sem.release(), None)
570 self.assertReturnsIfImplemented(3, get_value, sem)
571 self.assertEqual(sem.release(), None)
572 self.assertReturnsIfImplemented(4, get_value, sem)
573
574 def test_bounded_semaphore(self):
575 sem = self.BoundedSemaphore(2)
576 self._test_semaphore(sem)
        # Currently fails on OS X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)
581
582 def test_timeout(self):
583 if self.TYPE != 'processes':
584 return
585
586 sem = self.Semaphore(0)
587 acquire = TimingWrapper(sem.acquire)
588
589 self.assertEqual(acquire(False), False)
590 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
591
592 self.assertEqual(acquire(False, None), False)
593 self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
594
595 self.assertEqual(acquire(False, TIMEOUT1), False)
596 self.assertTimingAlmostEqual(acquire.elapsed, 0)
597
598 self.assertEqual(acquire(True, TIMEOUT2), False)
599 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
600
601 self.assertEqual(acquire(timeout=TIMEOUT3), False)
602 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
603
604
605class _TestCondition(BaseTestCase):
606
607 def f(self, cond, sleeping, woken, timeout=None):
608 cond.acquire()
609 sleeping.release()
610 cond.wait(timeout)
611 woken.release()
612 cond.release()
613
614 def check_invariant(self, cond):
615 # this is only supposed to succeed when there are no sleepers
616 if self.TYPE == 'processes':
617 try:
618 sleepers = (cond._sleeping_count.get_value() -
619 cond._woken_count.get_value())
620 self.assertEqual(sleepers, 0)
621 self.assertEqual(cond._wait_semaphore.get_value(), 0)
622 except NotImplementedError:
623 pass
624
625 def test_notify(self):
626 cond = self.Condition()
627 sleeping = self.Semaphore(0)
628 woken = self.Semaphore(0)
629
630 p = self.Process(target=self.f, args=(cond, sleeping, woken))
631 p.set_daemon(True)
632 p.start()
633
634 p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
635 p.setDaemon(True)
636 p.start()
637
638 # wait for both children to start sleeping
639 sleeping.acquire()
640 sleeping.acquire()
641
642 # check no process/thread has woken up
643 time.sleep(DELTA)
644 self.assertReturnsIfImplemented(0, get_value, woken)
645
646 # wake up one process/thread
647 cond.acquire()
648 cond.notify()
649 cond.release()
650
651 # check one process/thread has woken up
652 time.sleep(DELTA)
653 self.assertReturnsIfImplemented(1, get_value, woken)
654
655 # wake up another
656 cond.acquire()
657 cond.notify()
658 cond.release()
659
660 # check other has woken up
661 time.sleep(DELTA)
662 self.assertReturnsIfImplemented(2, get_value, woken)
663
664 # check state is not mucked up
665 self.check_invariant(cond)
666 p.join()
667
668 def test_notify_all(self):
669 cond = self.Condition()
670 sleeping = self.Semaphore(0)
671 woken = self.Semaphore(0)
672
673 # start some threads/processes which will timeout
674 for i in range(3):
675 p = self.Process(target=self.f,
676 args=(cond, sleeping, woken, TIMEOUT1))
677 p.set_daemon(True)
678 p.start()
679
680 t = threading.Thread(target=self.f,
681 args=(cond, sleeping, woken, TIMEOUT1))
682 t.setDaemon(True)
683 t.start()
684
685 # wait for them all to sleep
686 for i in range(6):
687 sleeping.acquire()
688
689 # check they have all timed out
690 for i in range(6):
691 woken.acquire()
692 self.assertReturnsIfImplemented(0, get_value, woken)
693
694 # check state is not mucked up
695 self.check_invariant(cond)
696
697 # start some more threads/processes
698 for i in range(3):
699 p = self.Process(target=self.f, args=(cond, sleeping, woken))
700 p.set_daemon(True)
701 p.start()
702
703 t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
704 t.setDaemon(True)
705 t.start()
706
707 # wait for them to all sleep
708 for i in range(6):
709 sleeping.acquire()
710
711 # check no process/thread has woken up
712 time.sleep(DELTA)
713 self.assertReturnsIfImplemented(0, get_value, woken)
714
715 # wake them all up
716 cond.acquire()
717 cond.notify_all()
718 cond.release()
719
720 # check they have all woken
721 time.sleep(DELTA)
722 self.assertReturnsIfImplemented(6, get_value, woken)
723
724 # check state is not mucked up
725 self.check_invariant(cond)
726
727 def test_timeout(self):
728 cond = self.Condition()
729 wait = TimingWrapper(cond.wait)
730 cond.acquire()
731 res = wait(TIMEOUT1)
732 cond.release()
733 self.assertEqual(res, None)
734 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
735
736
737class _TestEvent(BaseTestCase):
738
739 def _test_event(self, event):
740 time.sleep(TIMEOUT2)
741 event.set()
742
743 def test_event(self):
744 event = self.Event()
745 wait = TimingWrapper(event.wait)
746
        # Removed temporarily due to API shear: this does not work with
        # threading._Event objects, whose is_set() is spelled isSet().
        #self.assertEqual(event.is_set(), False)
750
751 self.assertEqual(wait(0.0), None)
752 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
753 self.assertEqual(wait(TIMEOUT1), None)
754 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
755
756 event.set()
757
758 # See note above on the API differences
759 # self.assertEqual(event.is_set(), True)
760 self.assertEqual(wait(), None)
761 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
762 self.assertEqual(wait(TIMEOUT1), None)
763 self.assertTimingAlmostEqual(wait.elapsed, 0.0)
764 # self.assertEqual(event.is_set(), True)
765
766 event.clear()
767
768 #self.assertEqual(event.is_set(), False)
769
770 self.Process(target=self._test_event, args=(event,)).start()
771 self.assertEqual(wait(), None)
772
773#
774#
775#
776
777class _TestValue(BaseTestCase):
778
779 codes_values = [
780 ('i', 4343, 24234),
781 ('d', 3.625, -4.25),
782 ('h', -232, 234),
783 ('c', latin('x'), latin('y'))
784 ]
785
786 def _test(self, values):
787 for sv, cv in zip(values, self.codes_values):
788 sv.value = cv[2]
789
790
791 def test_value(self, raw=False):
792 if self.TYPE != 'processes':
793 return
794
795 if raw:
796 values = [self.RawValue(code, value)
797 for code, value, _ in self.codes_values]
798 else:
799 values = [self.Value(code, value)
800 for code, value, _ in self.codes_values]
801
802 for sv, cv in zip(values, self.codes_values):
803 self.assertEqual(sv.value, cv[1])
804
805 proc = self.Process(target=self._test, args=(values,))
806 proc.start()
807 proc.join()
808
809 for sv, cv in zip(values, self.codes_values):
810 self.assertEqual(sv.value, cv[2])
811
812 def test_rawvalue(self):
813 self.test_value(raw=True)
814
815 def test_getobj_getlock(self):
816 if self.TYPE != 'processes':
817 return
818
819 val1 = self.Value('i', 5)
820 lock1 = val1.get_lock()
821 obj1 = val1.get_obj()
822
823 val2 = self.Value('i', 5, lock=None)
824 lock2 = val2.get_lock()
825 obj2 = val2.get_obj()
826
827 lock = self.Lock()
828 val3 = self.Value('i', 5, lock=lock)
829 lock3 = val3.get_lock()
830 obj3 = val3.get_obj()
831 self.assertEqual(lock, lock3)
832
833 arr4 = self.RawValue('i', 5)
834 self.assertFalse(hasattr(arr4, 'get_lock'))
835 self.assertFalse(hasattr(arr4, 'get_obj'))
836
837
838class _TestArray(BaseTestCase):
839
840 def f(self, seq):
841 for i in range(1, len(seq)):
842 seq[i] += seq[i-1]
843
844 def test_array(self, raw=False):
845 if self.TYPE != 'processes':
846 return
847
848 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
849 if raw:
850 arr = self.RawArray('i', seq)
851 else:
852 arr = self.Array('i', seq)
853
854 self.assertEqual(len(arr), len(seq))
855 self.assertEqual(arr[3], seq[3])
856 self.assertEqual(list(arr[2:7]), list(seq[2:7]))
857
858 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
859
860 self.assertEqual(list(arr[:]), seq)
861
862 self.f(seq)
863
864 p = self.Process(target=self.f, args=(arr,))
865 p.start()
866 p.join()
867
868 self.assertEqual(list(arr[:]), seq)
869
870 def test_rawarray(self):
871 self.test_array(raw=True)
872
873 def test_getobj_getlock_obj(self):
874 if self.TYPE != 'processes':
875 return
876
877 arr1 = self.Array('i', list(range(10)))
878 lock1 = arr1.get_lock()
879 obj1 = arr1.get_obj()
880
881 arr2 = self.Array('i', list(range(10)), lock=None)
882 lock2 = arr2.get_lock()
883 obj2 = arr2.get_obj()
884
885 lock = self.Lock()
886 arr3 = self.Array('i', list(range(10)), lock=lock)
887 lock3 = arr3.get_lock()
888 obj3 = arr3.get_obj()
889 self.assertEqual(lock, lock3)
890
891 arr4 = self.RawArray('i', list(range(10)))
892 self.assertFalse(hasattr(arr4, 'get_lock'))
893 self.assertFalse(hasattr(arr4, 'get_obj'))
894
895#
896#
897#
898
899class _TestContainers(BaseTestCase):
900
901 ALLOWED_TYPES = ('manager',)
902
903 def test_list(self):
904 a = self.list(list(range(10)))
905 self.assertEqual(a[:], list(range(10)))
906
907 b = self.list()
908 self.assertEqual(b[:], [])
909
910 b.extend(list(range(5)))
911 self.assertEqual(b[:], list(range(5)))
912
913 self.assertEqual(b[2], 2)
914 self.assertEqual(b[2:10], [2,3,4])
915
916 b *= 2
917 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
918
919 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
920
921 self.assertEqual(a[:], list(range(10)))
922
923 d = [a, b]
924 e = self.list(d)
925 self.assertEqual(
926 e[:],
927 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
928 )
929
930 f = self.list([a])
931 a.append('hello')
932 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
933
934 def test_dict(self):
935 d = self.dict()
936 indices = list(range(65, 70))
937 for i in indices:
938 d[i] = chr(i)
939 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
940 self.assertEqual(sorted(d.keys()), indices)
941 self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
942 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
943
944 def test_namespace(self):
945 n = self.Namespace()
946 n.name = 'Bob'
947 n.job = 'Builder'
948 n._hidden = 'hidden'
949 self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
950 del n.job
951 self.assertEqual(str(n), "Namespace(name='Bob')")
952 self.assertTrue(hasattr(n, 'name'))
953 self.assertTrue(not hasattr(n, 'job'))
954
955#
956#
957#
958
959def sqr(x, wait=0.0):
960 time.sleep(wait)
961 return x*x
962
963class _TestPool(BaseTestCase):
964
965 def test_apply(self):
966 papply = self.pool.apply
967 self.assertEqual(papply(sqr, (5,)), sqr(5))
968 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
969
970 def test_map(self):
971 pmap = self.pool.map
972 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
973 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
974 list(map(sqr, list(range(100)))))
975
976 def test_async(self):
977 res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
978 get = TimingWrapper(res.get)
979 self.assertEqual(get(), 49)
980 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
981
982 def test_async_timeout(self):
983 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
984 get = TimingWrapper(res.get)
985 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
986 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
987
988 def test_imap(self):
989 it = self.pool.imap(sqr, list(range(10)))
990 self.assertEqual(list(it), list(map(sqr, list(range(10)))))
991
992 it = self.pool.imap(sqr, list(range(10)))
993 for i in range(10):
994 self.assertEqual(next(it), i*i)
995 self.assertRaises(StopIteration, it.__next__)
996
997 it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
998 for i in range(1000):
999 self.assertEqual(next(it), i*i)
1000 self.assertRaises(StopIteration, it.__next__)
1001
1002 def test_imap_unordered(self):
1003 it = self.pool.imap_unordered(sqr, list(range(1000)))
1004 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1005
1006 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1007 self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1008
1009 def test_make_pool(self):
1010 p = multiprocessing.Pool(3)
1011 self.assertEqual(3, len(p._pool))
1012 p.close()
1013 p.join()
1014
1015 def test_terminate(self):
1016 if self.TYPE == 'manager':
1017 # On Unix a forked process increfs each shared object to
1018 # which its parent process held a reference. If the
1019 # forked process gets terminated then there is likely to
1020 # be a reference leak. So to prevent
1021 # _TestZZZNumberOfObjects from failing we skip this test
1022 # when using a manager.
1023 return
1024
1025 result = self.pool.map_async(
1026 time.sleep, [0.1 for i in range(10000)], chunksize=1
1027 )
1028 self.pool.terminate()
1029 join = TimingWrapper(self.pool.join)
1030 join()
1031 self.assertTrue(join.elapsed < 0.2)
1032
1033#
1034# Test that manager has expected number of shared objects left
1035#
1036
1037class _TestZZZNumberOfObjects(BaseTestCase):
1038 # Because test cases are sorted alphabetically, this one will get
1039 # run after all the other tests for the manager. It tests that
1040 # there have been no "reference leaks" for the manager's shared
1041 # objects. Note the comment in _TestPool.test_terminate().
1042 ALLOWED_TYPES = ('manager',)
1043
1044 def test_number_of_objects(self):
1045 EXPECTED_NUMBER = 1 # the pool object is still alive
1046 multiprocessing.active_children() # discard dead process objs
1047 gc.collect() # do garbage collection
1048 refs = self.manager._number_of_objects()
1049 if refs != EXPECTED_NUMBER:
1050 print(self.manager._debugInfo())
1051
1052 self.assertEqual(refs, EXPECTED_NUMBER)
1053
1054#
1055# Test of creating a customized manager class
1056#
1057
1058from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
1059
1060class FooBar(object):
1061 def f(self):
1062 return 'f()'
1063 def g(self):
1064 raise ValueError
1065 def _h(self):
1066 return '_h()'
1067
1068def baz():
1069 for i in range(10):
1070 yield i*i
1071
class IteratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def __next__(self):
        # the referent is a generator, so forward to its __next__()
        return self._callmethod('__next__')
1080
1081class MyManager(BaseManager):
1082 pass
1083
1084MyManager.register('Foo', callable=FooBar)
1085MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1086MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
1087
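#
# Illustrative sketch only -- the helper below is an invented name and is
# never called by the tests: iterating over the proxy returned by
# manager.baz() pulls each value from the generator running in the
# manager's process, one _callmethod('__next__') per item.
#

def _demo_iterator_proxy():
    manager = MyManager()
    manager.start()
    try:
        return list(manager.baz())        # [0, 1, 4, ..., 81]
    finally:
        manager.shutdown()
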
1088
1089class _TestMyManager(BaseTestCase):
1090
1091 ALLOWED_TYPES = ('manager',)
1092
1093 def test_mymanager(self):
1094 manager = MyManager()
1095 manager.start()
1096
1097 foo = manager.Foo()
1098 bar = manager.Bar()
1099 baz = manager.baz()
1100
1101 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1102 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1103
1104 self.assertEqual(foo_methods, ['f', 'g'])
1105 self.assertEqual(bar_methods, ['f', '_h'])
1106
1107 self.assertEqual(foo.f(), 'f()')
1108 self.assertRaises(ValueError, foo.g)
1109 self.assertEqual(foo._callmethod('f'), 'f()')
1110 self.assertRaises(RemoteError, foo._callmethod, '_h')
1111
1112 self.assertEqual(bar.f(), 'f()')
1113 self.assertEqual(bar._h(), '_h()')
1114 self.assertEqual(bar._callmethod('f'), 'f()')
1115 self.assertEqual(bar._callmethod('_h'), '_h()')
1116
1117 self.assertEqual(list(baz), [i*i for i in range(10)])
1118
1119 manager.shutdown()
1120
1121#
1122# Test of connecting to a remote server and using xmlrpclib for serialization
1123#
1124
1125_queue = pyqueue.Queue()
1126def get_queue():
1127 return _queue
1128
1129class QueueManager(BaseManager):
1130 '''manager class used by server process'''
1131QueueManager.register('get_queue', callable=get_queue)
1132
1133class QueueManager2(BaseManager):
1134 '''manager class which specifies the same interface as QueueManager'''
1135QueueManager2.register('get_queue')
1136
1137
1138SERIALIZER = 'xmlrpclib'
1139
1140class _TestRemoteManager(BaseTestCase):
1141
1142 ALLOWED_TYPES = ('manager',)
1143
1144 def _putter(self, address, authkey):
1145 manager = QueueManager2(
1146 address=address, authkey=authkey, serializer=SERIALIZER
1147 )
1148 manager.connect()
1149 queue = manager.get_queue()
1150 queue.put(('hello world', None, True, 2.25))
1151
1152 def test_remote(self):
1153 authkey = os.urandom(32)
1154
1155 manager = QueueManager(
1156 address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
1157 )
1158 manager.start()
1159
1160 p = self.Process(target=self._putter, args=(manager.address, authkey))
1161 p.start()
1162
1163 manager2 = QueueManager2(
1164 address=manager.address, authkey=authkey, serializer=SERIALIZER
1165 )
1166 manager2.connect()
1167 queue = manager2.get_queue()
1168
        # Note that xmlrpclib will deserialize the tuple as a list, not a tuple
1170 self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
1171
1172 # Because we are using xmlrpclib for serialization instead of
1173 # pickle this will cause a serialization error.
1174 self.assertRaises(Exception, queue.put, time.sleep)
1175
1176 # Make queue finalizer run before the server is stopped
1177 del queue
1178 manager.shutdown()
1179
1180#
1181#
1182#
1183
1184SENTINEL = latin('')
1185
1186class _TestConnection(BaseTestCase):
1187
1188 ALLOWED_TYPES = ('processes', 'threads')
1189
1190 def _echo(self, conn):
1191 for msg in iter(conn.recv_bytes, SENTINEL):
1192 conn.send_bytes(msg)
1193 conn.close()
1194
1195 def test_connection(self):
1196 conn, child_conn = self.Pipe()
1197
1198 p = self.Process(target=self._echo, args=(child_conn,))
1199 p.set_daemon(True)
1200 p.start()
1201
1202 seq = [1, 2.25, None]
1203 msg = latin('hello world')
1204 longmsg = msg * 10
1205 arr = array.array('i', list(range(4)))
1206
1207 if self.TYPE == 'processes':
1208 self.assertEqual(type(conn.fileno()), int)
1209
1210 self.assertEqual(conn.send(seq), None)
1211 self.assertEqual(conn.recv(), seq)
1212
1213 self.assertEqual(conn.send_bytes(msg), None)
1214 self.assertEqual(conn.recv_bytes(), msg)
1215
1216 if self.TYPE == 'processes':
1217 buffer = array.array('i', [0]*10)
1218 expected = list(arr) + [0] * (10 - len(arr))
1219 self.assertEqual(conn.send_bytes(arr), None)
1220 self.assertEqual(conn.recv_bytes_into(buffer),
1221 len(arr) * buffer.itemsize)
1222 self.assertEqual(list(buffer), expected)
1223
1224 buffer = array.array('i', [0]*10)
1225 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
1226 self.assertEqual(conn.send_bytes(arr), None)
1227 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
1228 len(arr) * buffer.itemsize)
1229 self.assertEqual(list(buffer), expected)
1230
1231 buffer = bytearray(latin(' ' * 40))
1232 self.assertEqual(conn.send_bytes(longmsg), None)
1233 try:
1234 res = conn.recv_bytes_into(buffer)
1235 except multiprocessing.BufferTooShort as e:
1236 self.assertEqual(e.args, (longmsg,))
1237 else:
1238 self.fail('expected BufferTooShort, got %s' % res)
1239
1240 poll = TimingWrapper(conn.poll)
1241
1242 self.assertEqual(poll(), False)
1243 self.assertTimingAlmostEqual(poll.elapsed, 0)
1244
1245 self.assertEqual(poll(TIMEOUT1), False)
1246 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
1247
1248 conn.send(None)
1249
1250 self.assertEqual(poll(TIMEOUT1), True)
1251 self.assertTimingAlmostEqual(poll.elapsed, 0)
1252
1253 self.assertEqual(conn.recv(), None)
1254
1255 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
1256 conn.send_bytes(really_big_msg)
1257 self.assertEqual(conn.recv_bytes(), really_big_msg)
1258
1259 conn.send_bytes(SENTINEL) # tell child to quit
1260 child_conn.close()
1261
1262 if self.TYPE == 'processes':
1263 self.assertEqual(conn.readable, True)
1264 self.assertEqual(conn.writable, True)
1265 self.assertRaises(EOFError, conn.recv)
1266 self.assertRaises(EOFError, conn.recv_bytes)
1267
1268 p.join()
1269
1270 def test_duplex_false(self):
1271 reader, writer = self.Pipe(duplex=False)
1272 self.assertEqual(writer.send(1), None)
1273 self.assertEqual(reader.recv(), 1)
1274 if self.TYPE == 'processes':
1275 self.assertEqual(reader.readable, True)
1276 self.assertEqual(reader.writable, False)
1277 self.assertEqual(writer.readable, False)
1278 self.assertEqual(writer.writable, True)
1279 self.assertRaises(IOError, reader.send, 2)
1280 self.assertRaises(IOError, writer.recv)
1281 self.assertRaises(IOError, writer.poll)
1282
1283 def test_spawn_close(self):
1284 # We test that a pipe connection can be closed by parent
1285 # process immediately after child is spawned. On Windows this
1286 # would have sometimes failed on old versions because
1287 # child_conn would be closed before the child got a chance to
1288 # duplicate it.
1289 conn, child_conn = self.Pipe()
1290
1291 p = self.Process(target=self._echo, args=(child_conn,))
1292 p.start()
1293 child_conn.close() # this might complete before child initializes
1294
1295 msg = latin('hello')
1296 conn.send_bytes(msg)
1297 self.assertEqual(conn.recv_bytes(), msg)
1298
1299 conn.send_bytes(SENTINEL)
1300 conn.close()
1301 p.join()
1302
1303 def test_sendbytes(self):
1304 if self.TYPE != 'processes':
1305 return
1306
1307 msg = latin('abcdefghijklmnopqrstuvwxyz')
1308 a, b = self.Pipe()
1309
1310 a.send_bytes(msg)
1311 self.assertEqual(b.recv_bytes(), msg)
1312
1313 a.send_bytes(msg, 5)
1314 self.assertEqual(b.recv_bytes(), msg[5:])
1315
1316 a.send_bytes(msg, 7, 8)
1317 self.assertEqual(b.recv_bytes(), msg[7:7+8])
1318
1319 a.send_bytes(msg, 26)
1320 self.assertEqual(b.recv_bytes(), latin(''))
1321
1322 a.send_bytes(msg, 26, 0)
1323 self.assertEqual(b.recv_bytes(), latin(''))
1324
1325 self.assertRaises(ValueError, a.send_bytes, msg, 27)
1326
1327 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
1328
1329 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
1330
1331 self.assertRaises(ValueError, a.send_bytes, msg, -1)
1332
1333 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
1334
1335
1336class _TestListenerClient(BaseTestCase):
1337
1338 ALLOWED_TYPES = ('processes', 'threads')
1339
1340 def _test(self, address):
1341 conn = self.connection.Client(address)
1342 conn.send('hello')
1343 conn.close()
1344
1345 def test_listener_client(self):
1346 for family in self.connection.families:
1347 l = self.connection.Listener(family=family)
1348 p = self.Process(target=self._test, args=(l.address,))
1349 p.set_daemon(True)
1350 p.start()
1351 conn = l.accept()
1352 self.assertEqual(conn.recv(), 'hello')
1353 p.join()
1354 l.close()
1355
1356#
1357# Test of sending connection and socket objects between processes
1358#
1359
1360class _TestPicklingConnections(BaseTestCase):
1361
1362 ALLOWED_TYPES = ('processes',)
1363
1364 def _listener(self, conn, families):
1365 for fam in families:
1366 l = self.connection.Listener(family=fam)
1367 conn.send(l.address)
1368 new_conn = l.accept()
1369 conn.send(new_conn)
1370
1371 if self.TYPE == 'processes':
1372 l = socket.socket()
1373 l.bind(('localhost', 0))
1374 conn.send(l.getsockname())
1375 l.listen(1)
1376 new_conn, addr = l.accept()
1377 conn.send(new_conn)
1378
1379 conn.recv()
1380
1381 def _remote(self, conn):
1382 for (address, msg) in iter(conn.recv, None):
1383 client = self.connection.Client(address)
1384 client.send(msg.upper())
1385 client.close()
1386
1387 if self.TYPE == 'processes':
1388 address, msg = conn.recv()
1389 client = socket.socket()
1390 client.connect(address)
1391 client.sendall(msg.upper())
1392 client.close()
1393
1394 conn.close()
1395
1396 def test_pickling(self):
1397 try:
1398 multiprocessing.allow_connection_pickling()
1399 except ImportError:
1400 return
1401
1402 families = self.connection.families
1403
1404 lconn, lconn0 = self.Pipe()
1405 lp = self.Process(target=self._listener, args=(lconn0, families))
1406 lp.start()
1407 lconn0.close()
1408
1409 rconn, rconn0 = self.Pipe()
1410 rp = self.Process(target=self._remote, args=(rconn0,))
1411 rp.start()
1412 rconn0.close()
1413
1414 for fam in families:
1415 msg = ('This connection uses family %s' % fam).encode('ascii')
1416 address = lconn.recv()
1417 rconn.send((address, msg))
1418 new_conn = lconn.recv()
1419 self.assertEqual(new_conn.recv(), msg.upper())
1420
1421 rconn.send(None)
1422
1423 if self.TYPE == 'processes':
1424 msg = latin('This connection uses a normal socket')
1425 address = lconn.recv()
1426 rconn.send((address, msg))
1427 if hasattr(socket, 'fromfd'):
1428 new_conn = lconn.recv()
1429 self.assertEqual(new_conn.recv(100), msg.upper())
1430 else:
1431 # XXX On Windows with Py2.6 need to backport fromfd()
1432 discard = lconn.recv_bytes()
1433
1434 lconn.send(None)
1435
1436 rconn.close()
1437 lconn.close()
1438
1439 lp.join()
1440 rp.join()
1441
1442#
1443#
1444#
1445
1446class _TestHeap(BaseTestCase):
1447
1448 ALLOWED_TYPES = ('processes',)
1449
1450 def test_heap(self):
1451 iterations = 5000
1452 maxblocks = 50
1453 blocks = []
1454
1455 # create and destroy lots of blocks of different sizes
1456 for i in range(iterations):
1457 size = int(random.lognormvariate(0, 1) * 1000)
1458 b = multiprocessing.heap.BufferWrapper(size)
1459 blocks.append(b)
1460 if len(blocks) > maxblocks:
1461 i = random.randrange(maxblocks)
1462 del blocks[i]
1463
1464 # get the heap object
1465 heap = multiprocessing.heap.BufferWrapper._heap
1466
1467 # verify the state of the heap
1468 all = []
1469 occupied = 0
1470 for L in list(heap._len_to_seq.values()):
1471 for arena, start, stop in L:
1472 all.append((heap._arenas.index(arena), start, stop,
1473 stop-start, 'free'))
1474 for arena, start, stop in heap._allocated_blocks:
1475 all.append((heap._arenas.index(arena), start, stop,
1476 stop-start, 'occupied'))
1477 occupied += (stop-start)
1478
1479 all.sort()
1480
1481 for i in range(len(all)-1):
1482 (arena, start, stop) = all[i][:3]
1483 (narena, nstart, nstop) = all[i+1][:3]
1484 self.assertTrue((arena != narena and nstart == 0) or
1485 (stop == nstart))
1486
1487#
1488#
1489#
1490
try:
    from ctypes import Structure, c_int, c_double
    from multiprocessing.sharedctypes import Value, Array, copy
except ImportError:
    Structure = object
    c_int = c_double = None
1496
1497class _Foo(Structure):
1498 _fields_ = [
1499 ('x', c_int),
1500 ('y', c_double)
1501 ]
1502
1503class _TestSharedCTypes(BaseTestCase):
1504
1505 ALLOWED_TYPES = ('processes',)
1506
1507 def _double(self, x, y, foo, arr, string):
1508 x.value *= 2
1509 y.value *= 2
1510 foo.x *= 2
1511 foo.y *= 2
1512 string.value *= 2
1513 for i in range(len(arr)):
1514 arr[i] *= 2
1515
1516 def test_sharedctypes(self, lock=False):
1517 if c_int is None:
1518 return
1519
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        string.value = latin('hello')
1526
1527 p = self.Process(target=self._double, args=(x, y, foo, arr, string))
1528 p.start()
1529 p.join()
1530
1531 self.assertEqual(x.value, 14)
1532 self.assertAlmostEqual(y.value, 2.0/3.0)
1533 self.assertEqual(foo.x, 6)
1534 self.assertAlmostEqual(foo.y, 4.0)
1535 for i in range(10):
1536 self.assertAlmostEqual(arr[i], i*2)
1537 self.assertEqual(string.value, latin('hellohello'))
1538
1539 def test_synchronize(self):
1540 self.test_sharedctypes(lock=True)
1541
1542 def test_copy(self):
1543 if c_int is None:
1544 return
1545
1546 foo = _Foo(2, 5.0)
1547 bar = copy(foo)
1548 foo.x = 0
1549 foo.y = 0
1550 self.assertEqual(bar.x, 2)
1551 self.assertAlmostEqual(bar.y, 5.0)
1552
1553#
1554#
1555#
1556
1557class _TestFinalize(BaseTestCase):
1558
1559 ALLOWED_TYPES = ('processes',)
1560
1561 def _test_finalize(self, conn):
1562 class Foo(object):
1563 pass
1564
1565 a = Foo()
1566 util.Finalize(a, conn.send, args=('a',))
1567 del a # triggers callback for a
1568
1569 b = Foo()
1570 close_b = util.Finalize(b, conn.send, args=('b',))
1571 close_b() # triggers callback for b
1572 close_b() # does nothing because callback has already been called
1573 del b # does nothing because callback has already been called
1574
1575 c = Foo()
1576 util.Finalize(c, conn.send, args=('c',))
1577
1578 d10 = Foo()
1579 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
1580
1581 d01 = Foo()
1582 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
1583 d02 = Foo()
1584 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
1585 d03 = Foo()
1586 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
1587
1588 util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
1589
1590 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
1591
        # call multiprocessing's cleanup function, then exit the process
        # without garbage collecting locals
1594 util._exit_function()
1595 conn.close()
1596 os._exit(0)
1597
1598 def test_finalize(self):
1599 conn, child_conn = self.Pipe()
1600
1601 p = self.Process(target=self._test_finalize, args=(child_conn,))
1602 p.start()
1603 p.join()
1604
1605 result = [obj for obj in iter(conn.recv, 'STOP')]
1606 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
1607
1608#
1609# Test that from ... import * works for each module
1610#
1611
1612class _TestImportStar(BaseTestCase):
1613
1614 ALLOWED_TYPES = ('processes',)
1615
1616 def test_import(self):
1617 modules = (
1618 'multiprocessing', 'multiprocessing.connection',
1619 'multiprocessing.heap', 'multiprocessing.managers',
1620 'multiprocessing.pool', 'multiprocessing.process',
1621 'multiprocessing.reduction', 'multiprocessing.sharedctypes',
1622 'multiprocessing.synchronize', 'multiprocessing.util'
1623 )
1624
1625 for name in modules:
1626 __import__(name)
1627 mod = sys.modules[name]
1628
1629 for attr in getattr(mod, '__all__', ()):
1630 self.assertTrue(
1631 hasattr(mod, attr),
1632 '%r does not have attribute %r' % (mod, attr)
1633 )
1634
1635#
1636# Quick test that logging works -- does not test logging output
1637#
1638
1639class _TestLogging(BaseTestCase):
1640
1641 ALLOWED_TYPES = ('processes',)
1642
1643 def test_enable_logging(self):
1644 logger = multiprocessing.get_logger()
1645 logger.setLevel(util.SUBWARNING)
1646 self.assertTrue(logger is not None)
1647 logger.debug('this will not be printed')
1648 logger.info('nor will this')
1649 logger.setLevel(LOG_LEVEL)
1650
1651 def _test_level(self, conn):
1652 logger = multiprocessing.get_logger()
1653 conn.send(logger.getEffectiveLevel())
1654
1655 def test_level(self):
1656 LEVEL1 = 32
1657 LEVEL2 = 37
1658
1659 logger = multiprocessing.get_logger()
1660 root_logger = logging.getLogger()
1661 root_level = root_logger.level
1662
1663 reader, writer = multiprocessing.Pipe(duplex=False)
1664
1665 logger.setLevel(LEVEL1)
1666 self.Process(target=self._test_level, args=(writer,)).start()
1667 self.assertEqual(LEVEL1, reader.recv())
1668
1669 logger.setLevel(logging.NOTSET)
1670 root_logger.setLevel(LEVEL2)
1671 self.Process(target=self._test_level, args=(writer,)).start()
1672 self.assertEqual(LEVEL2, reader.recv())
1673
1674 root_logger.setLevel(root_level)
1675 logger.setLevel(level=LOG_LEVEL)
1676
1677#
1678# Functions used to create test cases from the base ones in this module
1679#
1680
def get_attributes(Source, names):
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if type(obj) == type(get_attributes):
            # plain functions must be wrapped as staticmethods so that they
            # do not turn into methods when copied onto the mixin classes
            obj = staticmethod(obj)
        d[name] = obj
    return d
1689
1690def create_test_cases(Mixin, type):
1691 result = {}
1692 glob = globals()
1693 Type = type[0].upper() + type[1:]
1694
1695 for name in list(glob.keys()):
1696 if name.startswith('_Test'):
1697 base = glob[name]
1698 if type in base.ALLOWED_TYPES:
1699 newname = 'With' + Type + name[1:]
1700 class Temp(base, unittest.TestCase, Mixin):
1701 pass
1702 result[newname] = Temp
1703 Temp.__name__ = newname
1704 Temp.__module__ = Mixin.__module__
1705 return result
1706
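#
# Illustrative sketch only -- the dummy mixin and helper below are invented
# names and are never used by the real machinery: create_test_cases()
# builds one unittest.TestCase subclass per matching _Test* base class and
# returns them keyed by their generated names.
#

def _demo_create_test_cases():
    class _DummyMixin(object):
        TYPE = 'threads'
    cases = create_test_cases(_DummyMixin, type='threads')
    return sorted(cases)          # e.g. ['WithThreadsTestCondition', ...]
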
1707#
1708# Create test cases
1709#
1710
1711class ProcessesMixin(object):
1712 TYPE = 'processes'
1713 Process = multiprocessing.Process
1714 locals().update(get_attributes(multiprocessing, (
1715 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1716 'Condition', 'Event', 'Value', 'Array', 'RawValue',
1717 'RawArray', 'current_process', 'active_children', 'Pipe',
1718 'connection', 'JoinableQueue'
1719 )))
1720
1721testcases_processes = create_test_cases(ProcessesMixin, type='processes')
1722globals().update(testcases_processes)
1723
1724
1725class ManagerMixin(object):
1726 TYPE = 'manager'
1727 Process = multiprocessing.Process
1728 manager = object.__new__(multiprocessing.managers.SyncManager)
1729 locals().update(get_attributes(manager, (
1730 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1731 'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
1732 'Namespace', 'JoinableQueue'
1733 )))
1734
1735testcases_manager = create_test_cases(ManagerMixin, type='manager')
1736globals().update(testcases_manager)
1737
1738
1739class ThreadsMixin(object):
1740 TYPE = 'threads'
1741 Process = multiprocessing.dummy.Process
1742 locals().update(get_attributes(multiprocessing.dummy, (
1743 'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
1744 'Condition', 'Event', 'Value', 'Array', 'current_process',
1745 'active_children', 'Pipe', 'connection', 'dict', 'list',
1746 'Namespace', 'JoinableQueue'
1747 )))
1748
1749testcases_threads = create_test_cases(ThreadsMixin, type='threads')
1750globals().update(testcases_threads)
1751
1752#
1753#
1754#
1755
1756def test_main(run=None):
1757 if run is None:
1758 from test.support import run_unittest as run
1759
1760 util.get_temp_dir() # creates temp directory for use by all processes
1761
1762 multiprocessing.get_logger().setLevel(LOG_LEVEL)
1763
1764 ProcessesMixin.pool = multiprocessing.Pool(4)
1765 ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
1766 ManagerMixin.manager.__init__()
1767 ManagerMixin.manager.start()
1768 ManagerMixin.pool = ManagerMixin.manager.Pool(4)
1769
1770 testcases = (
1771 sorted(list(testcases_processes.values()), key=lambda tc:tc.__name__) +
1772 sorted(list(testcases_threads.values()), key=lambda tc:tc.__name__) +
1773 sorted(list(testcases_manager.values()), key=lambda tc:tc.__name__)
1774 )
1775
1776 loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
1777 suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
1778 run(suite)
1779
1780 ThreadsMixin.pool.terminate()
1781 ProcessesMixin.pool.terminate()
1782 ManagerMixin.pool.terminate()
1783 ManagerMixin.manager.shutdown()
1784
1785 del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
1786
1787def main():
1788 test_main(unittest.TextTestRunner(verbosity=2).run)
1789
1790if __name__ == '__main__':
1791 main()