#!/usr/bin/env python

#
# Unit tests for the multiprocessing package
#

import unittest
import threading
import queue as pyqueue
import time
import sys
import os
import gc
import signal
import array
import copy
import socket
import random
import logging

import multiprocessing.dummy
import multiprocessing.connection
import multiprocessing.managers
import multiprocessing.heap
import multiprocessing.pool
import _multiprocessing

from multiprocessing import util

#
#
#

def latin(s):
    return s.encode('latin')

#
# Constants
#

LOG_LEVEL = util.SUBWARNING
#LOG_LEVEL = logging.WARNING

DELTA = 0.1
CHECK_TIMINGS = False     # making true makes tests take a lot longer
                          # and can sometimes cause some non-serious
                          # failures because some calls block a bit
                          # longer than expected
if CHECK_TIMINGS:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
else:
    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1

HAVE_GETVALUE = not getattr(_multiprocessing,
                            'HAVE_BROKEN_SEM_GETVALUE', False)

#
# Creates a wrapper for a function which records the time it takes to finish
#

class TimingWrapper(object):

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        t = time.time()
        try:
            return self.func(*args, **kwds)
        finally:
            self.elapsed = time.time() - t

#
# Base class for test cases
#

class BaseTestCase(object):

    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        if CHECK_TIMINGS:
            self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        try:
            res = func(*args)
        except NotImplementedError:
            pass
        else:
            return self.assertEqual(value, res)

#
# Return the value of a semaphore
#

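# Probe the different ways a semaphore object may expose its counter:
# multiprocessing semaphores provide get_value(); threading.Semaphore keeps
# the counter in a private attribute (the name-mangled _Semaphore__value or
# plain _value, depending on the Python version); manager proxies expose
# none of these, which is reported as NotImplementedError.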
def get_value(self):
    try:
        return self.get_value()
    except AttributeError:
        try:
            return self._Semaphore__value
        except AttributeError:
            try:
                return self._value
            except AttributeError:
                raise NotImplementedError

#
# Testcases
#

class _TestProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        if self.TYPE == 'threads':
            return

        current = self.current_process()
        authkey = current.get_authkey()

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.is_daemon())
        self.assertTrue(isinstance(authkey, bytes))
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.get_ident(), os.getpid())
        self.assertEqual(current.get_exitcode(), None)

    def _test(self, q, *args, **kwds):
        current = self.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.get_name())
        if self.TYPE != 'threads':
            q.put(bytes(current.get_authkey()))
            q.put(current.pid)

    def test_process(self):
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.set_daemon(True)
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.get_authkey(), current.get_authkey())
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.is_daemon(), True)
        self.assertTrue(p not in self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.get_exitcode(), None)

        p.start()

        self.assertEqual(p.get_exitcode(), None)
        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())

        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.get_name())
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.get_authkey())
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.get_exitcode(), 0)
        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

    def _test_terminate(self):
        time.sleep(1000)

    def test_terminate(self):
        if self.TYPE == 'threads':
            return

        p = self.Process(target=self._test_terminate)
        p.set_daemon(True)
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertTrue(p in self.active_children())
        self.assertEqual(p.get_exitcode(), None)

        p.terminate()

        join = TimingWrapper(p.join)
        self.assertEqual(join(), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertTrue(p not in self.active_children())

        p.join()

        # XXX sometimes get p.get_exitcode() == 0 on Windows ...
        #self.assertEqual(p.get_exitcode(), -signal.SIGTERM)

    def test_cpu_count(self):
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertTrue(p not in self.active_children())

        p.start()
        self.assertTrue(p in self.active_children())

        p.join()
        self.assertTrue(p not in self.active_children())

    def _test_recursion(self, wconn, id):
        from multiprocessing import forking
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = self.Process(
                    target=self._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    def test_recursion(self):
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        expected = [
            [],
            [0],
            [0, 0],
            [0, 1],
            [1],
            [1, 0],
            [1, 1]
            ]
        self.assertEqual(result, expected)

#
#
#

class _UpperCaser(multiprocessing.Process):

    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.child_conn, self.parent_conn = multiprocessing.Pipe()

    def run(self):
        self.parent_conn.close()
        for s in iter(self.child_conn.recv, None):
            self.child_conn.send(s.upper())
        self.child_conn.close()

    def submit(self, s):
        assert type(s) is str
        self.parent_conn.send(s)
        return self.parent_conn.recv()

    def stop(self):
        self.parent_conn.send(None)
        self.parent_conn.close()
        self.child_conn.close()

class _TestSubclassingProcess(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        uppercaser = _UpperCaser()
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

#
#
#

def queue_empty(q):
    if hasattr(q, 'empty'):
        return q.empty()
    else:
        return q.qsize() == 0

def queue_full(q, maxsize):
    if hasattr(q, 'full'):
        return q.full()
    else:
        return q.qsize() == maxsize


class _TestQueue(BaseTestCase):


    def _test_put(self, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.set_daemon(True)
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    def _test_get(self, queue, child_can_start, parent_can_continue):
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.set_daemon(True)
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    def _test_fork(self, queue):
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shut down until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            return
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    def _test_task_done(self, q):
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        queue = self.JoinableQueue()

        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
            return

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

#
#
#

class _TestLock(BaseTestCase):

    def test_lock(self):
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)


class _TestSemaphore(BaseTestCase):

    def _test_semaphore(self, sem):
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        if self.TYPE != 'processes':
            return

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)


class _TestCondition(BaseTestCase):

    def f(self, cond, sleeping, woken, timeout=None):
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()

    def check_invariant(self, cond):
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass

    def test_notify(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.set_daemon(True)
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.set_daemon(True)
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()

    def test_notify_all(self):
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.set_daemon(True)
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.set_daemon(True)
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.set_daemon(True)
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.set_daemon(True)
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

    def test_timeout(self):
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)


class _TestEvent(BaseTestCase):

    def _test_event(self, event):
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        #self.assertEqual(event.is_set(), False)

        self.assertEqual(wait(0.0), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), None)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        self.Process(target=self._test_event, args=(event,)).start()
        self.assertEqual(wait(), None)

#
#
#

class _TestValue(BaseTestCase):

    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def _test(self, values):
        for sv, cv in zip(values, self.codes_values):
            sv.value = cv[2]

    def test_value(self, raw=False):
        if self.TYPE != 'processes':
            return

        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.start()
        proc.join()

        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        if self.TYPE != 'processes':
            return

        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))


class _TestArray(BaseTestCase):

    def f(self, seq):
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    def test_array(self, raw=False):
        if self.TYPE != 'processes':
            return

        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    def test_rawarray(self):
        self.test_array(raw=True)

    def test_getobj_getlock_obj(self):
        if self.TYPE != 'processes':
            return

        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        arr4 = self.RawArray('i', list(range(10)))
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

#
#
#

class _TestContainers(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))

#
#
#

def sqr(x, wait=0.0):
    time.sleep(wait)
    return x*x

class _TestPool(BaseTestCase):

    def test_apply(self):
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_async(self):
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 0.2))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_unordered(self):
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_make_pool(self):
        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        if self.TYPE == 'manager':
            # On Unix a forked process increfs each shared object to
            # which its parent process held a reference.  If the
            # forked process gets terminated then there is likely to
            # be a reference leak.  So to prevent
            # _TestZZZNumberOfObjects from failing we skip this test
            # when using a manager.
            return

        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertTrue(join.elapsed < 0.2)

#
# Test that manager has expected number of shared objects left
#

class _TestZZZNumberOfObjects(BaseTestCase):
    # Because test cases are sorted alphabetically, this one will get
    # run after all the other tests for the manager.  It tests that
    # there have been no "reference leaks" for the manager's shared
    # objects.  Note the comment in _TestPool.test_terminate().
    ALLOWED_TYPES = ('manager',)

    def test_number_of_objects(self):
        EXPECTED_NUMBER = 1                 # the pool object is still alive
        multiprocessing.active_children()   # discard dead process objs
        gc.collect()                        # do garbage collection
        refs = self.manager._number_of_objects()
        if refs != EXPECTED_NUMBER:
            print(self.manager._debugInfo())

        self.assertEqual(refs, EXPECTED_NUMBER)

#
# Test of creating a customized manager class
#

from multiprocessing.managers import BaseManager, BaseProxy, RemoteError

class FooBar(object):
    def f(self):
        return 'f()'
    def g(self):
        raise ValueError
    def _h(self):
        return '_h()'

def baz():
    for i in range(10):
        yield i*i

class IteratorProxy(BaseProxy):
    _exposed_ = ('next', '__next__')
    def __iter__(self):
        return self
    def next(self):
        return self._callmethod('next')
    def __next__(self):
        return self._callmethod('__next__')

class MyManager(BaseManager):
    pass

MyManager.register('Foo', callable=FooBar)
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)


class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        manager = MyManager()
        manager.start()

        foo = manager.Foo()
        bar = manager.Bar()
        baz = manager.baz()

        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]

        self.assertEqual(foo_methods, ['f', 'g'])
        self.assertEqual(bar_methods, ['f', '_h'])

        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(baz), [i*i for i in range(10)])

        manager.shutdown()

#
# Test of connecting to a remote server and using xmlrpclib for serialization
#

_queue = pyqueue.Queue()
def get_queue():
    return _queue

class QueueManager(BaseManager):
    '''manager class used by server process'''
QueueManager.register('get_queue', callable=get_queue)

class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
QueueManager2.register('get_queue')


SERIALIZER = 'xmlrpclib'

class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def _putter(self, address, authkey):
        manager = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        manager.connect()
        queue = manager.get_queue()
        queue.put(('hello world', None, True, 2.25))

    def test_remote(self):
        authkey = os.urandom(32)

        manager = QueueManager(
            address=('localhost', 0), authkey=authkey, serializer=SERIALIZER
            )
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.start()

        manager2 = QueueManager2(
            address=manager.address, authkey=authkey, serializer=SERIALIZER
            )
        manager2.connect()
        queue = manager2.get_queue()

        # Note that xmlrpclib will deserialize object as a list not a tuple
        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])

        # Because we are using xmlrpclib for serialization instead of
        # pickle this will cause a serialization error.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Make queue finalizer run before the server is stopped
        del queue
        manager.shutdown()

#
#
#

SENTINEL = latin('')

class _TestConnection(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _echo(self, conn):
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.set_daemon(True)
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)

        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MB
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            self.assertRaises(IOError, reader.send, 2)
            self.assertRaises(IOError, writer.recv)
            self.assertRaises(IOError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        if self.TYPE != 'processes':
            return

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    def _test(self, address):
        conn = self.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        for family in self.connection.families:
            l = self.connection.Listener(family=family)
            p = self.Process(target=self._test, args=(l.address,))
            p.set_daemon(True)
            p.start()
            conn = l.accept()
            self.assertEqual(conn.recv(), 'hello')
            p.join()
            l.close()

#
# Test of sending connection and socket objects between processes
#
"""
class _TestPicklingConnections(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _listener(self, conn, families):
        for fam in families:
            l = self.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)

        if self.TYPE == 'processes':
            l = socket.socket()
            l.bind(('localhost', 0))
            conn.send(l.getsockname())
            l.listen(1)
            new_conn, addr = l.accept()
            conn.send(new_conn)

        conn.recv()

    def _remote(self, conn):
        for (address, msg) in iter(conn.recv, None):
            client = self.connection.Client(address)
            client.send(msg.upper())
            client.close()

        if self.TYPE == 'processes':
            address, msg = conn.recv()
            client = socket.socket()
            client.connect(address)
            client.sendall(msg.upper())
            client.close()

        conn.close()

    def test_pickling(self):
        try:
            multiprocessing.allow_connection_pickling()
        except ImportError:
            return

        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        if self.TYPE == 'processes':
            msg = latin('This connection uses a normal socket')
            address = lconn.recv()
            rconn.send((address, msg))
            if hasattr(socket, 'fromfd'):
                new_conn = lconn.recv()
                self.assertEqual(new_conn.recv(100), msg.upper())
            else:
                # XXX On Windows with Py2.6 need to backport fromfd()
                discard = lconn.recv_bytes()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()
"""
#
#
#

class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        iterations = 5000
        maxblocks = 50
        blocks = []

        # create and destroy lots of blocks of different sizes
        for i in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            b = multiprocessing.heap.BufferWrapper(size)
            blocks.append(b)
            if len(blocks) > maxblocks:
                i = random.randrange(maxblocks)
                del blocks[i]

        # get the heap object
        heap = multiprocessing.heap.BufferWrapper._heap

        # verify the state of the heap
        all = []
        occupied = 0
        for L in list(heap._len_to_seq.values()):
            for arena, start, stop in L:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        for i in range(len(all)-1):
            (arena, start, stop) = all[i][:3]
            (narena, nstart, nstop) = all[i+1][:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

#
#
#

try:
    from ctypes import Structure, c_int, c_double
    # Value, Array and copy here are the multiprocessing.sharedctypes
    # helpers; the plain ctypes module does not offer this interface.
    from multiprocessing.sharedctypes import Value, Array, copy
except ImportError:
    Structure = object
    c_int = c_double = None

class _Foo(Structure):
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]

class _TestSharedCTypes(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _double(self, x, y, foo, arr, string):
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for i in range(len(arr)):
            arr[i] *= 2

    def test_sharedctypes(self, lock=False):
        if c_int is None:
            return

        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = Array('d', list(range(10)), lock=lock)
        string = Array('c', 20, lock=lock)
        string.value = latin('hello')

        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
        p.start()
        p.join()

        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for i in range(10):
            self.assertAlmostEqual(arr[i], i*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        if c_int is None:
            return

        foo = _Foo(2, 5.0)
        bar = copy(foo)
        foo.x = 0
        foo.y = 0
        self.assertEqual(bar.x, 2)
        self.assertAlmostEqual(bar.y, 5.0)

#
#
#

class _TestFinalize(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def _test_finalize(self, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.start()
        p.join()

        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

#
# Test that from ... import * works for each module
#

class _TestImportStar(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_import(self):
        modules = (
            'multiprocessing', 'multiprocessing.connection',
            'multiprocessing.heap', 'multiprocessing.managers',
            'multiprocessing.pool', 'multiprocessing.process',
            'multiprocessing.reduction', 'multiprocessing.sharedctypes',
            'multiprocessing.synchronize', 'multiprocessing.util'
            )

        for name in modules:
            __import__(name)
            mod = sys.modules[name]

            for attr in getattr(mod, '__all__', ()):
                self.assertTrue(
                    hasattr(mod, attr),
                    '%r does not have attribute %r' % (mod, attr)
                    )

#
# Quick test that logging works -- does not test logging output
#

class _TestLogging(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        logger = multiprocessing.get_logger()
        logger.setLevel(util.SUBWARNING)
        self.assertTrue(logger is not None)
        logger.debug('this will not be printed')
        logger.info('nor will this')
        logger.setLevel(LOG_LEVEL)

    def _test_level(self, conn):
        logger = multiprocessing.get_logger()
        conn.send(logger.getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        logger = multiprocessing.get_logger()
        root_logger = logging.getLogger()
        root_level = root_logger.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        logger.setLevel(LEVEL1)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL1, reader.recv())

        logger.setLevel(logging.NOTSET)
        root_logger.setLevel(LEVEL2)
        self.Process(target=self._test_level, args=(writer,)).start()
        self.assertEqual(LEVEL2, reader.recv())

        root_logger.setLevel(root_level)
        logger.setLevel(level=LOG_LEVEL)

#
# Functions used to create test cases from the base ones in this module
#

def get_attributes(Source, names):
    d = {}
    for name in names:
        obj = getattr(Source, name)
        if type(obj) == type(get_attributes):
            obj = staticmethod(obj)
        d[name] = obj
    return d

def create_test_cases(Mixin, type):
    result = {}
    glob = globals()
    Type = type[0].upper() + type[1:]

    for name in list(glob.keys()):
        if name.startswith('_Test'):
            base = glob[name]
            if type in base.ALLOWED_TYPES:
                newname = 'With' + Type + name[1:]
                class Temp(base, unittest.TestCase, Mixin):
                    pass
                result[newname] = Temp
                Temp.__name__ = newname
                Temp.__module__ = Mixin.__module__
    return result
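
# For example, with the ProcessesMixin defined below, the _TestQueue base
# becomes a concrete test case named WithProcessesTestQueue (i.e.
# 'With' + 'Processes' + 'TestQueue') inheriting from _TestQueue,
# unittest.TestCase and ProcessesMixin; a base class is only instantiated
# for the backend types listed in its ALLOWED_TYPES.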

#
# Create test cases
#

class ProcessesMixin(object):
    TYPE = 'processes'
    Process = multiprocessing.Process
    locals().update(get_attributes(multiprocessing, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'RawValue',
        'RawArray', 'current_process', 'active_children', 'Pipe',
        'connection', 'JoinableQueue'
        )))

testcases_processes = create_test_cases(ProcessesMixin, type='processes')
globals().update(testcases_processes)


class ManagerMixin(object):
    TYPE = 'manager'
    Process = multiprocessing.Process
    manager = object.__new__(multiprocessing.managers.SyncManager)
    locals().update(get_attributes(manager, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'list', 'dict',
        'Namespace', 'JoinableQueue'
        )))

testcases_manager = create_test_cases(ManagerMixin, type='manager')
globals().update(testcases_manager)


class ThreadsMixin(object):
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    locals().update(get_attributes(multiprocessing.dummy, (
        'Queue', 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore',
        'Condition', 'Event', 'Value', 'Array', 'current_process',
        'active_children', 'Pipe', 'connection', 'dict', 'list',
        'Namespace', 'JoinableQueue'
        )))

testcases_threads = create_test_cases(ThreadsMixin, type='threads')
globals().update(testcases_threads)

#
#
#

def test_main(run=None):
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            from test.support import TestSkipped
            raise TestSkipped("OSError raises on RLock creation, see issue 3111!")

    if run is None:
        from test.support import run_unittest as run

    util.get_temp_dir()     # creates temp directory for use by all processes

    multiprocessing.get_logger().setLevel(LOG_LEVEL)

    ProcessesMixin.pool = multiprocessing.Pool(4)
    ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
    ManagerMixin.manager.__init__()
    ManagerMixin.manager.start()
    ManagerMixin.pool = ManagerMixin.manager.Pool(4)

    testcases = (
        sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
        sorted(testcases_manager.values(), key=lambda tc:tc.__name__)
        )

    loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
    suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
    run(suite)

    ThreadsMixin.pool.terminate()
    ProcessesMixin.pool.terminate()
    ManagerMixin.pool.terminate()
    ManagerMixin.manager.shutdown()

    del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool

def main():
    test_main(unittest.TextTestRunner(verbosity=2).run)

if __name__ == '__main__':
    main()