#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#

import unittest
import threading
import queue as pyqueue
import time
import sys
import os
import gc
import signal
import array
import copy
import socket
import random
import logging

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool
import _multiprocessing

from multiprocessing import util

#
#
#

def latin(s):
    return s.encode('latin')

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1
CHECK_TIMINGS = False     # setting this to True makes the tests take a lot
                          # longer and can sometimes cause some non-serious
                          # failures because some calls block a bit longer
                          # than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.time() - t

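# Typical use in the tests below: wrap a blocking call, invoke it, then check
# how long it actually blocked, e.g.
#
#     put = TimingWrapper(queue.put)
#     self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
#     self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
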
#
# Base class for test cases
#

class BaseTestCase(object):

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

#
# Return the value of a semaphore
#

def get_value(self):
    # multiprocessing semaphores expose get_value(); threading semaphores
    # keep the counter in a private attribute whose name has varied between
    # versions, so fall back through the known spellings.
    try:
        return self.get_value()
    except AttributeError:
        try:
            return self._Semaphore__value
        except AttributeError:
            try:
                return self._value
            except AttributeError:
                raise NotImplementedError

#
# Testcases
#

class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.get_authkey()

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.is_daemon())
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.get_ident(), os.getpid())
        self.assertEqual(current.get_exitcode(), None)

    def _test(self, q, *args, **kwds):
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.get_name())
        if self.TYPE != 'threads':
            q.put(bytes(current.get_authkey()))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.set_daemon(True)
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEquals(p.get_authkey(), current.get_authkey())
        self.assertEquals(p.is_alive(), False)
        self.assertEquals(p.is_daemon(), True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.get_exitcode(), None)

        p.start()

        self.assertEquals(p.get_exitcode(), None)
        self.assertEquals(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        self.assertEquals(q.get(), args[1:])
        self.assertEquals(q.get(), kwargs)
        self.assertEquals(q.get(), p.get_name())
        if self.TYPE != 'threads':
            self.assertEquals(q.get(), current.get_authkey())
            self.assertEquals(q.get(), p.pid)

        p.join()

        self.assertEquals(p.get_exitcode(), 0)
        self.assertEquals(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.set_daemon(True)
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.get_exitcode(), None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1]
            ]
        self.assertEqual(result, expected)

#
#
#

class _UpperCaser(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

#
#
#

def queue_empty(q):
    # Prefer empty()/full() when the queue provides them; otherwise fall
    # back to comparing qsize() against the expected size.
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):


    def _test_put(self, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.set_daemon(True)
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.set_daemon(True)
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shut down until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

#
#
#

class _TestLock(BaseTestCase):

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)


class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):

    def f(self, cond, sleeping, woken, timeout=None):
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        try:
            p.set_daemon(True)
        except AttributeError:
            p.daemon = True
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        try:
            p.set_daemon(True)
        except AttributeError:
            p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.set_daemon(True)
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.set_daemon(True)
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)


class _TestEvent(BaseTestCase):

    def _test_event(self, event):
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily due to API shear: this does not
        # work with threading._Event objects (is_set == isSet).
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), None)

#
#
#

class _TestValue(BaseTestCase):

    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        for sv, cv in zip(values, self.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))


class _TestArray(BaseTestCase):

    def f(self, seq):
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawArray('i', list(range(10)))
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

#
#
#

class _TestContainers(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))

#
#
#

def sqr(x, wait=0.0):
    time.sleep(wait)
    return x*x

class _TestPool(BaseTestCase):

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

#
# Test that manager has expected number of shared objects left
#

class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                # the pool object is still alive
        multiprocessing.active_children()  # discard dead process objs
        gc.collect()                       # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            print(self.manager._debug_info())

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    for i in range(10):
        yield i*i

class IteratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)


class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()

#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

_queue = pyqueue.Queue()
def get_queue():
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()

#
#
#

SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.set_daemon(True)
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.set_daemon(True)
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

#
# Test of sending connection and socket objects between processes
#
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
#
#
#

class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

#
#
#

try:
    from ctypes import Structure, c_int, c_double
    # Value, Array and copy for shared ctypes objects come from
    # multiprocessing.sharedctypes, not from the ctypes module itself.
    from multiprocessing.sharedctypes import Value, Array, copy
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)

#
#
#

class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

#
# Test that from ... import * works for each module
#

class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)

#
# Functions used to create test cases from the base ones in this module
#

def get_attributes(Source, names):
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if type(obj) == type(get_attributes):
            obj = staticmethod(obj)
        d[name] = obj
    return d

def create_test_cases(Mixin, type):
    result = {}
    glob = globals()
    Type = type[0].upper() + type[1:]

    for name in list(glob.keys()):
        if name.startswith('_Test'):
            base = glob[name]
            if type in base.ALLOWED_TYPES:
                newname = 'With' + Type + name[1:]
                class Temp(base, unittest.TestCase, Mixin):
                    pass
                result[newname] = Temp
                Temp.__name__ = newname
                Temp.__module__ = Mixin.__module__
    return result

#
# Create test cases
#

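# Each concrete test case is one of the _Test* classes above combined with a
# mixin; create_test_cases() names the result 'With' + the mixin type + the
# rest of the class name, so _TestQueue with ProcessesMixin becomes
# WithProcessesTestQueue and _TestContainers with ManagerMixin becomes
# WithManagerTestContainers.
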
class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)


class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)


class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)

#
#
#

def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError is raised on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool

def main():
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()
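
#
# Running this file directly invokes main() above, which runs every generated
# test case through unittest's TextTestRunner.  Within the standard library's
# test suite the same tests are normally driven by regrtest instead, e.g.
# (assuming a CPython checkout where the test package is importable):
#
#     python test_multiprocessing.py
#     python -m test.regrtest test_multiprocessing
#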